Flink实战之入库任务调优
作者:互联网
背景
在调试flink写hdfs和hive时,任务总是报各种各样的异常,其中255问题最多,异常信息如下:
java.lang.Exception: Exception from container-launch.
Container id: container_1597847003686_5818_01_000002
Exit code: 255
Stack trace: ExitCodeException exitCode=255:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:604)
at org.apache.hadoop.util.Shell.run(Shell.java:507)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:789)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Container exited with a non-zero exit code 255
at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
这段异常是yarn报出来的,根本原因是Direct Memory OOM了。那么该如何调优呢,容我慢慢道来。
我们先看下Flink的内存模型。
Flink内存模型
JVM Heap内存
堆内存包括:
- Framework Heap内存:flink框架使用的堆内存
- Task Heap内存:任务使用堆内存(java对象,基于内存的backend存储的state对象)
配置参数:
taskmanager.memory.framework.heap.size
taskmanager.memory.task.heap.size
JVM Off-Heap内存
对外内存:
- Framework Off-Heap内存:flink框架使用的对外内存
- Task Off-Heap内存:任务使用的对外内存
配置参数:
taskmanager.memory.framework.off-heap.size
taskmanager.memory.task.off-heap.size
Framework vs Task
区分:是否计入Slot资源
Framework:flink框架运行使用的内存
Task:任务运行使用的内存,包括heap、off-heap、managed、direct
Heap vs Off-Heap
区分:jvm堆内存和对外内存
Heap:jvm堆
Off-Heap:包括Direct、Native
Framework Heap+Task Heap = -Xmx
Framework off-heap +task off-heap + network = -XX:MaxDirectMemorySize
Network Memory(网络buffer)
属于Directory Memory
用途:
用于task之间缓冲数据,input buffer pool / output buffer pool
配置参数:
taskmanager.memory.network.min
taskmanager.memory.network.max
taskmanager.memory.network.fraction
Managed Memory(托管内存)
属于Native Memory
用途:
- streaming任务RocksDB Backend
- batch任务的sort、hash table、中间结果缓存
- python任务的UDF使用
配置参数:
设置大小:taskmanager.memory.managed.size
设置比率:taskmanager.memory.managed.fraction
JVM Metaspace & Overhead
都是jvm本身的开销
JVM Metaspace
用途:存放JVM加载的类的元数据,加载的类越多需要空间越大
所以如果任务需要加载大量第三方库时,可以调大Metaspace内存
配置参数:
taskmanager.memory.jvm-metaspace.size
JVM Overhead
属于Native Memory
用途:用于其他JVM开销,比如Code Cache、Thread Stack、garbage collection space 等。
配置参数:
taskmanager.memory.jvm-overhead.min
taskmanager.memory.jvm-overhead.max
taskmanager.memory.jvm-overhead.fraction
看完上面的总结,想必大家已经有了大概了解,回到我们的入库任务,理解入库任务主要会使用哪一块的内存,那么如何调优也就一目了然了。
入库任务使用内存
入库任务底层原理都是基于StreamingFileSink写Hdfs文件。借助BulkWriter进行写入,数据是先写到Direct Memory当中,然后在文件滚动时flush到hdfs。所以主要使用的Direct Memory,其属于task off-heap内存。
同时我们任务使用了RocksDB的状态后端,但是状态不是很大,也就1M左右。所以可以适当减少Managed Memory的大小。最终效果是调大了task off-heap的内存,调小了Managed Memory的内存,然后任务就不再报255了。
taskmanager.memory.task.off-heap.size和taskmanager.memory.managed.fraction
标签:java,scala,Flink,taskmanager,调优,内存,memory,akka,入库 来源: https://blog.csdn.net/weixin_41608066/article/details/110549951