java - Spring Integration Inbound-Channel-Adapter to read large files line by line

Author: Internet

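I'm currently using Spring Integration 4.1.0 with Spring 4.1.2.
I have a requirement to read a file line by line and use each line as a message. Basically, I want to allow a "replay" for one of our message sources, but the messages are not saved in individual files; they are all saved in a single file. I have no transactional requirements for this use case.
My requirements are similar to this posting, except that the file resides on the same server as the one the JVM is running on: spring integration – read a remote file line by line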

As I see it, I have the following options:

1. Use int-file:inbound-channel-adapter to read the file and then "split" that file, so that 1 message becomes multiple messages.
Sample config file:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

        <int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
        <int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
        <int:channel id="channel1"/>
        <int:splitter input-channel="channel2" output-channel="nullChannel"/>
        <int:channel id="channel2"/>
    </beans>

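The problem is that the file is very large, and with the above technique the whole file is first read into memory and then split, so the JVM runs out of heap space. The steps really required are: read a line and convert it to a message, send the message, remove the message from memory, and repeat.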
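The loop described above (read one line, hand it off, let it go, repeat) can be sketched with the plain JDK. This is only an illustration under stated assumptions: `LineByLineReader` and `forEachLine` are hypothetical names, not part of Spring Integration, and the `Consumer<String>` stands in for whatever would build and send a message in the real flow.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

/** Hypothetical helper (not a Spring class): streams input one line at a time. */
final class LineByLineReader {

    /** Opens the file, forwards each line to the handler, and closes the file afterwards. */
    static long forEachLine(Path file, Consumer<String> handler) {
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            return forEachLine(reader, handler);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Same loop over any Reader; returns the number of lines handed to the handler. */
    static long forEachLine(Reader source, Consumer<String> handler) {
        BufferedReader reader = source instanceof BufferedReader
                ? (BufferedReader) source
                : new BufferedReader(source);
        long count = 0;
        try {
            String line;
            // Only the current line is referenced here, so memory use does not grow with file size
            while ((line = reader.readLine()) != null) {
                handler.accept(line); // in a real flow: build a message from the line and send it
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }

    public static void main(String[] args) {
        long n = forEachLine(new StringReader("first\nsecond\nthird\n"), System.out::println);
        System.out.println(n + " lines");
    }
}
```

The handler runs synchronously on the reading thread, so a slow consumer naturally throttles the reader rather than letting lines pile up.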

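2. Use int-file:tail-inbound-channel-adapter with end="false" (which basically says to read from the start of the file). Start and stop this adapter as needed for each file (changing the file name before each start).
Sample config file: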

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
        http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int-file:tail-inbound-channel-adapter id="apache"
        channel="exchangeSpringQueueChannel"
        task-executor="exchangeFileReplayTaskExecutor"
        file="C:\p2-test.txt"
        delay="1"
        end="false"
        reopen="true"
        file-delay="10000" />

    <int:channel id="exchangeSpringQueueChannel" />
    <task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" />
</beans>

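3. Have Spring Integration call into Spring Batch and use an ItemReader to process the file. This certainly allows more fine-grained control over the whole process, but it takes a fair amount of work to set up the job repository and so on (and I don't care about the job history, so I'd either tell the job not to log status and/or use the in-memory MapJobRepository).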

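4. Create my own FileLineByLineInboundChannelAdapter by extending MessageProducerSupport.
Much of the code can be borrowed from ApacheCommonsFileTailingMessageProducer (also see http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter). Below is a sample, but it needs some work to move the reading into its own thread so that the stop() command is honored while reading line by line.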

    package com.xxx.exchgateway.common.util.springintegration;

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import org.apache.commons.io.IOUtils;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.integration.core.MessageSource;
    import org.springframework.integration.endpoint.MessageProducerSupport;
    import org.springframework.integration.file.FileHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.util.Assert;

    /**
     * A lot of the logic for this class came from
     * {@link org.springframework.integration.file.tail.ApacheCommonsFileTailingMessageProducer}.
     * See <a href="http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter">this forum thread</a>.
     */
    public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {
        private volatile File file;

        /**
         * The name of the file you wish to tail.
         * @param file The absolute path of the file.
         */
        public void setFile(File file) {
            Assert.notNull(file, "'file' cannot be null");
            this.file = file;
        }

        protected File getFile() {
            if (this.file == null) {
                throw new IllegalStateException("No 'file' has been provided");
            }
            return this.file;
        }

        @Override
        public String getComponentType() {
            return "file:line-by-line-inbound-channel-adapter";
        }

        private void readFile() {
            FileInputStream fstream;
            try {
                fstream = new FileInputStream(getFile());

                BufferedReader br = new BufferedReader(new InputStreamReader(fstream));

                String strLine;

                // Read File Line By Line, make sure we honor if someone manually sets the isRunning=false (via clicking the stop() method in JMX)
                while ((strLine = br.readLine()) != null && isRunning()) {
                    send(strLine);
                }

                //Close the input stream
                IOUtils.closeQuietly(br);
                IOUtils.closeQuietly(fstream);
            } catch (FileNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        @Override
        protected void doStart() {
            super.doStart();

            // TODO this needs to be moved into its own thread since isRunning() will return "false" until this method has completed
            // and we want to honor the stop() command while we read line-by-line
            readFile();
        }

        protected void send(String line) {
            Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build();
            super.sendMessage(message);
        }

        @Override
        public Message<String> receive() {
            // TODO Auto-generated method stub
            return null;
        }
    }
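The open TODO in the sample, moving the reading onto its own thread so that a stop request is honored between lines, could look roughly like the JDK-only sketch below. It is an assumption-laden illustration, not the adapter itself: `StoppableLineReader`, `start`, `stop` and `awaitCompletion` are hypothetical names, and the `Consumer<String>` stands in for the adapter's `send(line)`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

/** Hypothetical sketch: reads lines on a worker thread and checks a stop flag between lines. */
final class StoppableLineReader {
    private volatile boolean running;
    private Thread worker;

    /** Returns immediately; the actual reading happens on a separate thread. */
    synchronized void start(Reader source, Consumer<String> handler) {
        if (running || (worker != null && worker.isAlive())) {
            throw new IllegalStateException("already running");
        }
        running = true;
        worker = new Thread(() -> readLines(source, handler), "line-reader");
        worker.start();
    }

    /** Requests a stop; the worker exits before it reads the next line. */
    void stop() {
        running = false;
    }

    /** Blocks until the worker has finished, either at end of input or after a stop request. */
    void awaitCompletion() {
        Thread t;
        synchronized (this) {
            t = worker;
        }
        if (t == null) {
            return;
        }
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    private void readLines(Reader source, Consumer<String> handler) {
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            // The flag is checked first, so a stop request prevents the next read
            while (running && (line = reader.readLine()) != null) {
                handler.accept(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // surfaces on the worker thread in this sketch
        } finally {
            running = false;
        }
    }
}
```

Because `start()` returns as soon as the worker is launched, a lifecycle method that calls it would complete right away, and a later `stop()` takes effect at the next line boundary.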

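It seems to me that my use case is not outside the realm of typical things people might want to do, so I'm surprised that I cannot find an out-of-the-box solution. I have searched quite a lot and looked at many examples, but unfortunately have yet to find something that suits my needs.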

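I'm assuming that perhaps I've missed something obvious that the framework already offers (although perhaps this falls into the blurry line between Spring Integration and Spring Batch). Can someone let me know whether I'm totally off-base with my ideas, whether there is a simple solution I've missed, or offer alternative suggestions?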

Solution:

Spring Integration 4.x has a nice new feature that allows an Iterator to be used as the value to split:

Spring Integration Reference

Starting with version 4.1, the AbstractMessageSplitter supports the Iterator type for the value to split.

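This makes it possible to hand an Iterator to the splitter, which pulls one element at a time, instead of reading the whole file into memory.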

Here is a simple example of a Spring context that splits CSV files into one message per line:

<int-file:inbound-channel-adapter 
        directory="${inputFileDirectory:/tmp}"
        channel="inputFiles"/>

<int:channel id="inputFiles">
    <int:dispatcher task-executor="executor"/>
</int:channel>

<int:splitter 
    input-channel="inputFiles" 
    output-channel="output">
    <bean 
        class="FileSplitter" 
        p:commentPrefix="${commentPrefix:#}" />
</int:splitter>

<task:executor 
    id="executor" 
    pool-size="${poolSize:8}" 
    queue-capacity="${queueCapacity:0}" 
    rejection-policy="CALLER_RUNS" />

<int:channel id="output"/>

Here is the splitter implementation:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

public class FileSplitter extends AbstractMessageSplitter {
    private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);

    private String commentPrefix = "#";

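    // Returns a lazy Iterator; the splitter pulls one line at a time and emits it as a message
    @Override
    public Object splitMessage(Message<?> message) {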
        if(log.isDebugEnabled()) {
            log.debug(message.toString());
        }
        try {

            Object payload = message.getPayload();
            Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload"); 

            return new BufferedReaderFileIterator((File) payload);
        } 
        catch (IOException e) {
            String msg = "Unable to transform file: " + e.getMessage();
            log.error(msg);
            throw new MessageTransformationException(msg, e);
        }
    }

    public void setCommentPrefix(String commentPrefix) {
        this.commentPrefix = commentPrefix;
    }

    public class BufferedReaderFileIterator implements Iterator<String> {

        private File file;
        private BufferedReader bufferedReader;
        private String line;

        public BufferedReaderFileIterator(File file) throws IOException {
            this.file = file;
            this.bufferedReader = new BufferedReader(new FileReader(file));
            readNextLine();
        }

        @Override
        public boolean hasNext() {
            return line != null;
        }

        @Override
        public String next() {
            try {
                String res = this.line;
                readNextLine();
                return res;
            } 
            catch (IOException e) {
                log.error("Error reading file", e);
                throw new RuntimeException(e);
            }   
        }

        void readNextLine() throws IOException {
            do {
                line = bufferedReader.readLine();
            }
            while(line != null && line.trim().startsWith(commentPrefix));

            if(log.isTraceEnabled()) {
                log.trace("Read next line: {}", line);
            }

            if(line == null) {
                close();
            }
        }

        // Closes the reader and deletes the source file once the last line has been read
        void close() throws IOException {
            bufferedReader.close();
            file.delete();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

    }

}

Note the Iterator object returned from the splitMessage() handler method.
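For comparison, the same lazy, comment-skipping iteration can be sketched more compactly on Java 8 or later with `BufferedReader.lines()`. This is a hedged alternative, not the code from the answer: `LazyLineIterators` and `nonCommentLines` are hypothetical names, and unlike the iterator above, this version neither closes the reader nor deletes the file when the input is exhausted, so the caller would have to do that.

```java
import java.io.BufferedReader;
import java.io.Reader;
import java.io.StringReader;
import java.util.Iterator;

/** Hypothetical helper: a lazy iterator over the non-comment lines of a Reader. */
final class LazyLineIterators {

    /** Lines are read only as the iterator is advanced; read errors surface as UncheckedIOException. */
    static Iterator<String> nonCommentLines(Reader source, String commentPrefix) {
        BufferedReader reader = new BufferedReader(source);
        return reader.lines()
                // same rule as the splitter above: skip lines whose trimmed text starts with the prefix
                .filter(line -> !line.trim().startsWith(commentPrefix))
                .iterator();
    }

    public static void main(String[] args) {
        Iterator<String> it = nonCommentLines(new StringReader("# header\na,1\nb,2\n"), "#");
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

An iterator like this could be returned from a `splitMessage()` implementation in the same way as the hand-written one, assuming the file is opened with a `FileReader` or `Files.newBufferedReader` and closed by the caller.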

Tags: spring-batch, spring, java, spring-integration
Source: https://codeday.me/bug/20191121/2048590.html