编程语言
首页 > 编程语言> > java-在Apache Storm bolt中使用Apache Camel ProducerTemplate

java-在Apache Storm bolt中使用Apache Camel ProducerTemplate

作者:互联网

我正在尝试编写简单的Storm Camel项目.
我的Storm拓扑分析了tweet,一个螺栓应该将tweet文本发送到apache骆驼路线,而后者又使用websocket通知了一些webapp.

当尝试使用一次CamelContext构建时,由于从螺栓接收到NotSerializableExceptions,因此我无法使其工作.

我已经尝试过的:

>在Bolt的构造函数中传递CamelContext-导致NotSerializableException
>在storm conf中传递CamelContext,并在bolt的prepare(…)方法中使用它来加强对其的访问.结果是 :

14484 [main]错误org.apache.storm.zookeeper.server.NIOServerCnxnFactory-线程Thread [main,5,main]死亡
java.lang.IllegalArgumentException:拓扑conf无法json序列化
    在backtype.storm.testing $submit_local_topology.invoke(testing.clj:262)〜[storm-core-0.9.4.jar:0.9.4]
    在backtype.storm.LocalCluster $_submitTopology.invoke(LocalCluster.clj:43)〜[storm-core-0.9.4.jar:0.9.4]
    在backtype.storm.LocalCluster.submitTopology(未知来源)〜[storm-core-0.9.4.jar:0.9.4]

骆驼路线:

public class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:main")
                .to("websocket:localhost:8085/main?sendToAll=true");
    }
}

风暴拓扑:
Tweet Spout使用twitter4j stremaing API传播推文.

public class TwitterStreamTopology {

    public static void main(String[] args) {
        CamelContext producerTemplate = new RouteStarter().buildRoute();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("tweetSpout", new TweetSpout(keywords), 1);
        builder.setBolt("websocket", new WebSocketBolt()).shuffleGrouping("tweetSpout");
        Config conf = new Config();
        conf.put("producerTemplate", producerTemplate.createProducerTemplate());
        conf.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("mytopology", conf, builder.createTopology());

        Utils.sleep(20000);
        cluster.shutdown();
    }
}

WebsocketBolt:

public class WebSocketBolt extends BaseBasicBolt {
    private ProducerTemplate producerTemplate;

    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
        Status s = (Status) input.getValueByField("tweet");
        producerTemplate.sendBody("direct:main", s.getText());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        this.producerTemplate = (ProducerTemplate) stormConf.get("producerTemplate");
    }
}

有没有办法很好地做到这一点?

还是应该让骆驼路由被http访问,并在bolt prepare(…)方法中创建一些HttpClient?这看起来仍然有些矫kill过正,并且必须有一种使其变得更容易的方法.

感谢您的帮助!

解决方法:

问题的根本原因在于,您正在将ProducerTemplate添加到您的风暴配置中,并且由于无法序列化而引发了异常.如果那是您自己的课,则可以更改代码以使其工作,但是由于那是骆驼课,所以我建议您使用其他方法.

> WebSocketBolt:将您的producerTemplate私有成员更改为临时成员:这样它就不会尝试序列化(将其放入conf时遇到同样的问题).
> WebSocketBolt:在prepare方法而不是拓扑中初始化producerTemplate.

像这样:

public class WebSocketBolt extends BaseBasicBolt {
    private transient ProducerTemplate producerTemplate;

    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
        Status s = (Status) input.getValueByField("tweet");
        producerTemplate.sendBody("direct:main", s.getText());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        CamelContext producerTemplate = new RouteStarter().buildRoute();
        this.producerTemplate = producerTemplate.createProducerTemplate();
    }
}

标签:apache-camel,apache-storm,java
来源: https://codeday.me/bug/20191120/2043781.html