java-Spring Integration Inbound-Channel-Adapter逐行读取大文件
作者:互联网
我目前正在将Spring Integration 4.1.0与Spring 4.1.2结合使用.
我要求能够逐行读取文件并将读取的每一行都用作消息.基本上,我想允许我们的消息源之一“重播”,但是这些消息不是分别保存在各自独立的文件中,而是全部保存在同一个文件中.我对此用例没有事务要求.
我的要求与这篇帖子类似,只是文件与JVM运行所在的服务器位于同一台服务器上:spring integration – read a remote file line by line
如我所见,我有以下选择:
1.使用int-file:inbound-channel-adapter读取文件,然后“拆分”该文件,以使1条消息现在变为多条消息.
样本配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Option 1: pick up the file, convert it to a String, then split that String into messages. -->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<!-- Looks in /tmp for a file matching "myfile.txt" and publishes it to channel1. -->
<int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
<!-- Converts the file to a String payload; as the surrounding text explains, this is the step
     that pulls the whole file into memory and exhausts the heap for large files. -->
<int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
<int:channel id="channel1"/>
<!-- Splits the String from channel2 into individual messages; the output is discarded via nullChannel in this sample. -->
<int:splitter input-channel="channel2" output-channel="nullChannel"/>
<int:channel id="channel2"/>
</beans>
问题在于文件非常大,使用上述技术时,整个文件首先被读取到内存中,然后被拆分,JVM耗尽了堆空间.真正需要的步骤是:读取一行并将行转换为消息,发送消息,从内存中删除消息并重复.
2.使用带有end="false"的int-file:tail-inbound-channel-adapter(基本上表示从文件开头读取).根据需要为每个文件启动和停止此适配器(在每次启动前更改文件名).
样本配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Option 2: tail the file from its beginning so that every line becomes one message. -->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<!-- end="false" makes the adapter read from the start of the file rather than from its end
     (per the surrounding text). The delay / reopen / file-delay values are tuning knobs of the
     tailer; NOTE(review): confirm their exact units and meaning against the adapter documentation. -->
<int-file:tail-inbound-channel-adapter id="apache"
channel="exchangeSpringQueueChannel"
task-executor="exchangeFileReplayTaskExecutor"
file="C:\p2-test.txt"
delay="1"
end="false"
reopen="true"
file-delay="10000" />
<!-- Channel that receives one message per line read from the file. -->
<int:channel id="exchangeSpringQueueChannel" />
<!-- Single-threaded executor referenced by the tail adapter above. -->
<task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" />
</beans>
3.从Spring Integration调用Spring Batch,并使用ItemReader处理文件.这样当然可以对整个过程进行更细粒度的控制,但是需要大量工作来设置作业存储库等(而且我并不关心作业历史,因此我要么告诉作业不要记录状态,和/或使用内存中的MapJobRepository).
4.通过扩展MessageProducerSupport创建我自己的FileLineByLineInboundChannelAdapter.
许多代码可以从ApacheCommonsFileTailingMessageProducer借用(另请参见http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter).下面是一个示例,但还需要做一些工作,把读取操作放到它自己的线程中,以便在逐行读取时能够响应stop()命令.
package com.xxx.exchgateway.common.util.springintegration;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* A lot of the logic for this class came from {@link #ApacheCommonsFileTailingMessageProducer}.
* See {@link http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter}
*/
public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {
private volatile File file;
/**
* The name of the file you wish to tail.
* @param file The absolute path of the file.
*/
public void setFile(File file) {
Assert.notNull("'file' cannot be null");
this.file = file;
}
protected File getFile() {
if (this.file == null) {
throw new IllegalStateException("No 'file' has been provided");
}
return this.file;
}
@Override
public String getComponentType() {
return "file:line-by-line-inbound-channel-adapter";
}
private void readFile() {
FileInputStream fstream;
try {
fstream = new FileInputStream(getFile());
BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
String strLine;
// Read File Line By Line, make sure we honor if someone manually sets the isRunning=false (via clicking the stop() method in JMX)
while ((strLine = br.readLine()) != null && isRunning()) {
send(strLine);
}
//Close the input stream
IOUtils.closeQuietly(br);
IOUtils.closeQuietly(fstream);
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
protected void doStart() {
super.doStart();
// TODO this needs to be moved into it's own thread since isRunning() will return "false" until this method has completed
// and we want to honor the stop() command while we read line-by-line
readFile();
}
protected void send(String line) {
Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build();
super.sendMessage(message);
}
@Override
public Message<String> receive() {
// TODO Auto-generated method stub
return null;
}
}
在我看来,我的用例并不超出人们可能喜欢做的典型事情的范围,因此,我为无法找到开箱即用的解决方案感到惊讶.但是,我已经搜索了很多,并查看了很多示例,但是不幸的是,还没有找到适合我需要的东西.
我以为也许我错过了框架已经提供的某个显而易见的东西(尽管这可能属于Spring Integration和Spring Batch之间的模糊地带).有人可以告诉我,我的想法是否完全偏离了方向,是否有我错过的简单解决方案,或者提出其他建议?
解决方法:
Spring Integration 4.x具有使用迭代器作为消息的一个不错的新功能:
Starting with version 4.1, the AbstractMessageSplitter supports the Iterator type for the value to split.
这允许将Iterator作为消息发送,而不是将整个文件读入内存.
下面是一个简单的Spring上下文示例,它将CSV文件的每一行拆分为一条消息:
<!-- Picks up files from the input directory (default /tmp) and publishes each File to inputFiles. -->
<int-file:inbound-channel-adapter
    directory="${inputFileDirectory:/tmp}"
    channel="inputFiles"/>
<!-- Hands each file to the executor so that files are split on pool threads. -->
<int:channel id="inputFiles">
    <int:dispatcher task-executor="executor"/>
</int:channel>
<!-- FileSplitter returns an Iterator over the lines, so one message is emitted per line. -->
<int:splitter
    input-channel="inputFiles"
    output-channel="output">
    <bean
        class="FileSplitter"
        p:commentPrefix="${commentPrefix:#}" />
</int:splitter>
<!-- The placeholder name was misspelled as "aueueCapacity", which would have silently
     ignored a configured "queueCapacity" property. -->
<task:executor
    id="executor"
    pool-size="${poolSize:8}"
    queue-capacity="${queueCapacity:0}"
    rejection-policy="CALLER_RUNS" />
<int:channel id="output"/>
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
public class FileSplitter extends AbstractMessageSplitter {
private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);
private String commentPrefix = "#";
public Object splitMessage(Message<?> message) {
if(log.isDebugEnabled()) {
log.debug(message.toString());
}
try {
Object payload = message.getPayload();
Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");
return new BufferedReaderFileIterator((File) payload);
}
catch (IOException e) {
String msg = "Unable to transform file: " + e.getMessage();
log.error(msg);
throw new MessageTransformationException(msg, e);
}
}
public void setCommentPrefix(String commentPrefix) {
this.commentPrefix = commentPrefix;
}
public class BufferedReaderFileIterator implements Iterator<String> {
private File file;
private BufferedReader bufferedReader;
private String line;
public BufferedReaderFileIterator(File file) throws IOException {
this.file = file;
this.bufferedReader = new BufferedReader(new FileReader(file));
readNextLine();
}
@Override
public boolean hasNext() {
return line != null;
}
@Override
public String next() {
try {
String res = this.line;
readNextLine();
return res;
}
catch (IOException e) {
log.error("Error reading file", e);
throw new RuntimeException(e);
}
}
void readNextLine() throws IOException {
do {
line = bufferedReader.readLine();
}
while(line != null && line.trim().startsWith(commentPrefix));
if(log.isTraceEnabled()) {
log.trace("Read next line: {}", line);
}
if(line == null) {
close();
}
}
void close() throws IOException {
bufferedReader.close();
file.delete();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
}
请注意,splitMessage()处理程序方法返回的是一个Iterator对象.
标签:spring-batch,spring,java,spring-integration 来源: https://codeday.me/bug/20191121/2048590.html