编程语言
首页 > 编程语言> > java-使用Spring框架在Apache骆驼中回滚消息

java-使用Spring框架在Apache骆驼中回滚消息

作者:互联网

我的应用程序的工作流程:我已经设置了activemq并编写了spring框架来侦听activemq的队列.每当队列中有消息时,侦听器将获取消息,然后将消息出队并执行我的业务逻辑.

现在处于测试用例中,如果我的业务逻辑中存在任何运行时错误,则该消息应回滚到队列.以便消费者可以再次使用该消息并再次执行我的业务逻辑.

我怎样才能用春骆驼做到这一点?

我为ActiveMqConsumer编写的代码

public class ActiveMqConsumer {
public static void main(String[] args){
    try {
        PropertyConfigurator.configure("C:/Users/awsdemo/src/main/resources/log4j.properties");
        ApplicationContext springcontext = new FileSystemXmlApplicationContext("C:/Users/awsdemo/src/main/resources/activecamel.xml");
        CamelContext context = springcontext.getBean("activeContext", CamelContext.class);
        //context.addComponent("activemq", activeMQComponent("tcp://localhost:61616?broker.persistent=false"));
        context.start();
        //Thread.sleep(1000);
        //context.stop();

    } catch ( Exception e ) {
        System.out.println(e);
    }

}
}

ActiveMQRouterBuilder的代码

public class ActiveMQRouterBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
    String activeMqURI = "activemq:queue:ThermalMap";
    System.out.print(activeMqURI);
    from( activeMqURI).to("bean:activemqProcessor?method=processMessage");

}
}

ActiveMQProcessor的代码

public class ActiveMQProcessor{
public void processMessage(Exchange exchange) throws Exception{
    System.out.println("\ninside processMessage :Consumer1");
    //System.out.println(exchange.getIn().getBody());
    Object object = exchange.getIn().getBody();
    FunctionNames functionNamesObject=new FunctionNames();
    //Call Intergration function to execute .exe file
    try {
                 /* my business logic*/
    } catch (IOException e) {
                     /* message should rollback here to activemq*/

        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (InterruptedException e) {
                     /* or message should rollback here to activemq*/
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    System.out.println("ActiveMQProcessor: finished");
}

}

以上三个文件结合起来充当使用者.这三个文件在activecamel.xml文件中配置. activecamel.xml包含以下代码

<camelContext id="activeContext" xmlns="http://camel.apache.org/schema/spring">
    <routeBuilder ref="activeMQRouter" />
</camelContext>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1" />
</bean>

<bean id="activeMQRouter" class="main.java.com.aranin.activemq.ActiveMQRouterBuilder"/>

<bean id="activemqProcessor" class="main.java.com.aranin.activemq.ActiveMQProcessor"/>

在ActiveMQProcessor中,我编写了业务逻辑,如果有任何错误,它将引发错误以捕获块.在catch块中,我应该将代码编写为回滚消息.回传消息应使用什么代码?

解决方法:

如果在路由中使用transacted(),则应该从ActiveMQProcessor引发异常,而Camel将自动回滚TX.

  from( activeMqURI)
    .transacted()      
    .to("bean:activemqProcessor?method=processMessage");

您还需要将ActiveMQ配置为事务处理

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1" />
    <property name="transacted" value="true"/>
</bean>

并设置一个JMS事务管理器.查看更多详细信息:http://camel.apache.org/transactional-client.html

尽管是后者,但如果您已进行事务处理= true,则Camel应该默认设置一个.但是,仍然最好在xml文件中定义一个事务管理器,并在activemq配置中进行引用.来自该链接的所有详细信息.

如果您有《骆驼在行动》一书的副本,请阅读第9章.

标签:apache-camel,activemq,spring,java
来源: https://codeday.me/bug/20191030/1967652.html