其他分享
首页 > 其他分享> > Flume出现The server disconnected before a response was received错误

Flume出现The server disconnected before a response was received错误

作者:互联网

一、错误日志如下:

下午4点43:28.444分    ERROR    KafkaSink    
Failed to publish events
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
    at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:243)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
下午4点43:28.445分    ERROR    SinkRunner    
Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
    at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:267)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
    at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:243)
    ... 3 more
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
下午4点43:31.657分    INFO    ReliableTaildirEventReader    
Last read was never committed - resetting position

二、错误原因分析

  将Atlas中的元数据导出之后,使用Flume进行采集,因此需要分析文件大小,仔细查找之后发现最大的一个JSON文件有200M,故原因就是采集的JSON文件过大导致

三、解决思路

  1、自定义Flume拦截器,对大文件单独处理,看看能否将JSON文件中的数据拆分开

  2、同时调整如下参数:

#批次处理行数
a1.sources.r1.batchSize = 10000
#interceptor
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.meiyijia.pd.flume.interceptor.BigDataInterceptor$Builder
a1.sources.r1.interceptors.i1.param=parameter
#关闭没有新增内容的文件超时时间(毫秒
a1.sources.r1.idleTimeout = 1000

#最大数据大小
a1.sinks.k1.kafka.producer.max.request.size = 1053741824
#客户端总缓存大小
a1.sinks.k1.kafka.producer.buffer.memory = 15053741824
a1.sinks.k1.kafka.producer.max.block.ms = 30000
a1.sinks.k1.kafka.producer.request.timeout.ms = 10000
a1.sinks.k1.kafka.flumeBatchSize = 10000
a1.sinks.k1.kafka.linger.ms = 1
a1.sinks.k1.kafka.batch.size = 10000
#失败重试次数
a1.sinks.k1.kafka.producer.retries = 3

# channel
a1.channels.c1.type = memory
#channel中最多缓存多少
a1.channels.c1.capacity = 2000000
#channel一次最多吐给sink多少
a1.channels.c1.transactionCapacity = 12000
#event的活跃时间
a1.channels.c1.keep-alive = 3

四、参考文档:

https://www.csdn.net/tags/Ntjacg3sODcwMTUtYmxvZwO0O0OO0O0O.html

https://flume.liyifeng.org/#

标签:Flume,received,java,disconnected,kafka,a1,apache,org,FutureRecordMetadata
来源: https://www.cnblogs.com/qq1035807396/p/16446453.html