java-在Apache Storm bolt中使用Apache Camel ProducerTemplate
作者:互联网
我正在尝试编写简单的Storm Camel项目.
我的Storm拓扑分析了tweet,一个螺栓应该将tweet文本发送到apache骆驼路线,而后者又使用websocket通知了一些webapp.
当尝试使用一次CamelContext构建时,由于从螺栓接收到NotSerializableExceptions,因此我无法使其工作.
我已经尝试过的:
>在Bolt的构造函数中传递CamelContext-导致NotSerializableException
>在storm conf中传递CamelContext,并在bolt的prepare(…)方法中使用它来加强对其的访问.结果是 :
14484 [main]错误org.apache.storm.zookeeper.server.NIOServerCnxnFactory-线程Thread [main,5,main]死亡
java.lang.IllegalArgumentException:拓扑conf无法json序列化
在backtype.storm.testing $submit_local_topology.invoke(testing.clj:262)〜[storm-core-0.9.4.jar:0.9.4]
在backtype.storm.LocalCluster $_submitTopology.invoke(LocalCluster.clj:43)〜[storm-core-0.9.4.jar:0.9.4]
在backtype.storm.LocalCluster.submitTopology(未知来源)〜[storm-core-0.9.4.jar:0.9.4]
骆驼路线:
public class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("direct:main")
.to("websocket:localhost:8085/main?sendToAll=true");
}
}
风暴拓扑:
Tweet Spout使用twitter4j stremaing API传播推文.
public class TwitterStreamTopology {
public static void main(String[] args) {
CamelContext producerTemplate = new RouteStarter().buildRoute();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("tweetSpout", new TweetSpout(keywords), 1);
builder.setBolt("websocket", new WebSocketBolt()).shuffleGrouping("tweetSpout");
Config conf = new Config();
conf.put("producerTemplate", producerTemplate.createProducerTemplate());
conf.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("mytopology", conf, builder.createTopology());
Utils.sleep(20000);
cluster.shutdown();
}
}
WebsocketBolt:
public class WebSocketBolt extends BaseBasicBolt {
private ProducerTemplate producerTemplate;
@Override
public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
Status s = (Status) input.getValueByField("tweet");
producerTemplate.sendBody("direct:main", s.getText());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
super.prepare(stormConf, context);
this.producerTemplate = (ProducerTemplate) stormConf.get("producerTemplate");
}
}
有没有办法很好地做到这一点?
还是应该让骆驼路由被http访问,并在bolt prepare(…)方法中创建一些HttpClient?这看起来仍然有些矫kill过正,并且必须有一种使其变得更容易的方法.
感谢您的帮助!
解决方法:
问题的根本原因在于,您正在将ProducerTemplate添加到您的风暴配置中,并且由于无法序列化而引发了异常.如果那是您自己的课,则可以更改代码以使其工作,但是由于那是骆驼课,所以我建议您使用其他方法.
> WebSocketBolt:将您的producerTemplate私有成员更改为临时成员:这样它就不会尝试序列化(将其放入conf时遇到同样的问题).
> WebSocketBolt:在prepare方法而不是拓扑中初始化producerTemplate.
像这样:
public class WebSocketBolt extends BaseBasicBolt {
private transient ProducerTemplate producerTemplate;
@Override
public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
Status s = (Status) input.getValueByField("tweet");
producerTemplate.sendBody("direct:main", s.getText());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
super.prepare(stormConf, context);
CamelContext producerTemplate = new RouteStarter().buildRoute();
this.producerTemplate = producerTemplate.createProducerTemplate();
}
}
标签:apache-camel,apache-storm,java 来源: https://codeday.me/bug/20191120/2043781.html