其他分享
首页 > 其他分享> > Flink实战之入库任务调优

Flink实战之入库任务调优

作者:互联网

背景

在调试flink写hdfs和hive时,任务总是报各种各样的异常,其中255问题最多,异常信息如下:

java.lang.Exception: Exception from container-launch.
Container id: container_1597847003686_5818_01_000002
Exit code: 255
Stack trace: ExitCodeException exitCode=255: 
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:604)
	at org.apache.hadoop.util.Shell.run(Shell.java:507)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:789)
	at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)


Container exited with a non-zero exit code 255

	at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

这段异常是yarn报出来的,根本原因是Direct Memory OOM了。那么该如何调优呢,容我慢慢道来。
我们先看下Flink的内存模型。

Flink内存模型

JVM Heap内存

堆内存包括:

  1. Framework Heap内存:flink框架使用的堆内存
  2. Task Heap内存:任务使用堆内存(java对象,基于内存的backend存储的state对象)

配置参数:

taskmanager.memory.framework.heap.size

taskmanager.memory.task.heap.size

JVM Off-Heap内存

对外内存:

  1. Framework Off-Heap内存:flink框架使用的对外内存
  2. Task Off-Heap内存:任务使用的对外内存

配置参数:

taskmanager.memory.framework.off-heap.size

taskmanager.memory.task.off-heap.size

Framework vs Task

区分:是否计入Slot资源

Framework:flink框架运行使用的内存

Task:任务运行使用的内存,包括heap、off-heap、managed、direct

Heap vs Off-Heap

区分:jvm堆内存和对外内存

Heap:jvm堆

Off-Heap:包括Direct、Native

Framework Heap+Task Heap = -Xmx

Framework off-heap +task off-heap + network = -XX:MaxDirectMemorySize

Network Memory(网络buffer)

属于Directory Memory

用途:

用于task之间缓冲数据,input buffer pool / output buffer pool

配置参数:

taskmanager.memory.network.min
taskmanager.memory.network.max
taskmanager.memory.network.fraction

Managed Memory(托管内存)

属于Native Memory

用途:

  1. streaming任务RocksDB Backend
  2. batch任务的sort、hash table、中间结果缓存
  3. python任务的UDF使用

配置参数:

设置大小:taskmanager.memory.managed.size

设置比率:taskmanager.memory.managed.fraction

JVM Metaspace & Overhead

都是jvm本身的开销

JVM Metaspace

用途:存放JVM加载的类的元数据,加载的类越多需要空间越大

所以如果任务需要加载大量第三方库时,可以调大Metaspace内存

配置参数:

taskmanager.memory.jvm-metaspace.size

JVM Overhead

属于Native Memory

用途:用于其他JVM开销,比如Code Cache、Thread Stack、garbage collection space 等。

配置参数:

taskmanager.memory.jvm-overhead.min
taskmanager.memory.jvm-overhead.max
taskmanager.memory.jvm-overhead.fraction

看完上面的总结,想必大家已经有了大概了解,回到我们的入库任务,理解入库任务主要会使用哪一块的内存,那么如何调优也就一目了然了。

入库任务使用内存

入库任务底层原理都是基于StreamingFileSink写Hdfs文件。借助BulkWriter进行写入,数据是先写到Direct Memory当中,然后在文件滚动时flush到hdfs。所以主要使用的Direct Memory,其属于task off-heap内存。
同时我们任务使用了RocksDB的状态后端,但是状态不是很大,也就1M左右。所以可以适当减少Managed Memory的大小。最终效果是调大了task off-heap的内存,调小了Managed Memory的内存,然后任务就不再报255了。
taskmanager.memory.task.off-heap.size和taskmanager.memory.managed.fraction

标签:java,scala,Flink,taskmanager,调优,内存,memory,akka,入库
来源: https://blog.csdn.net/weixin_41608066/article/details/110549951