编程语言
首页 > 编程语言> > java-如何设置TOPOLOGY_MAX_SPOUT_PENDING参数

java-如何设置TOPOLOGY_MAX_SPOUT_PENDING参数

作者:互联网

在拓扑中,我从Kafka队列中读取触发消息.收到触发消息后,我需要向螺栓发送大约4096条消息.在螺栓中,经过一些处理后,它将发布到另一个Kafka队列(另一个拓扑将在以后使用此队列).

我正在尝试设置TOPOLOGY_MAX_SPOUT_PENDING参数以限制要发送的邮件数量.但我看到它没有任何作用.是否因为我在一个nextTuple()方法中发出了所有元组?如果是这样,应该如何解决?

解决方法:

如果您正在阅读kafka,则应使用随风暴一起提供的KafkaSpout.不要尝试实现自己的喷嘴,相信我,我在生产中使用了KafkaSpout,它运行非常顺利.每条Kafka消息仅生成一个元组.

正如在this nice page from the manual上看到的那样,您可以像下面这样设置topology.max.spout.pending:

Config conf = new Config();
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("mytopology", conf, topology);

为每个喷嘴设置topology.max.spout.pending,如果您有四个喷嘴,则拓扑内部的不完整元组的最大数量等于喷嘴的数量* topology.max.spout.pending.

另一个提示是,您应该使用storm UI来查看是否正确设置了topology.max.spout.pending.

请记住,topology.max.spout.pending仅是拓扑内部未处理的元组数,至少在生产系统上,拓扑将永远不会停止消耗来自kafka的消息…如果要消耗4096个批次,则需要在螺栓上实施缓存逻辑,或使用除风暴之外的其他方法(面向微型批处理的方法).

标签:apache-kafka,apache-storm,java
来源: https://codeday.me/bug/20191119/2037925.html