Flink出现network.partition.ProducerFailedException: java.lang.NullPointerException
作者:互联网
一、错误日志
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'xx.xxx.xxx.xxx/xxx.xxx.xxx.xxx:34750'. at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:325) ~[flinkjob:?] at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:214) ~[flinkjob:?] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[flinkjob:?] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[flinkjob:?] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[flinkjob:?] at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:112) ~[flinkjob:?] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[flinkjob:?] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[flinkjob:?] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[flinkjob:?] at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[flinkjob:?] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[flinkjob:?] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[flinkjob:?] at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[flinkjob:?] at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[flinkjob:?] at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[flinkjob:?] at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[flinkjob:?] at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[flinkjob:?] at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flinkjob:?] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144] Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException: java.lang.NullPointerException at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:224) ~[flinkjob:?] at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:110) ~[flinkjob:?] at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:173) ~[flinkjob:?] at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346) ~
二、原因分析
由于启动参数中只设置了一个插槽,而多个Task任务都来共享这个插槽,导致网络出现繁忙、一些Task容器无法使用到插槽,从而出现network.partition.ProducerFailedException
三、解决方案
发生报错之前,我的启动参数如下:
bin/flink run -d -m yarn-cluster -yjm 2048m -ytm 4096m -p 1 -ys 1 -yD taskmanager.memory.managed.size=265M -c 全类名 flinkjob.jar
解决方案是:合理设置并行度以及插槽数,在启动参数中指定即可
bin/flink run -d -m yarn-cluster -yjm 2048m -ytm 4096m -p 5 -ys 3 -yD taskmanager.memory.managed.size=265M -c 全类名 flinkjob.jar
参数说明:
-m:指定yarn集群
-yjm:指定jobmanager的内存
-ytm:指定taskmanager的内存
-p:指定并行度
-ys:指定插槽的数量
-yD:后台运行
taskmanager.memory.managed.size:taskmanager直接管理内存
标签:lang,netty,Flink,java,flink,io,apache,org 来源: https://www.cnblogs.com/qq1035807396/p/16626856.html