基于配置类的方式发送和订阅消费消息
作者:互联网
package com.itheima.config; import org.springframework.amqp.core.*; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Classname RabbitMQConfig * @Description RabbitMQ消息配置类 * @Date 2019-3-8 14:15 * @Created by CrazyStone */ @Configuration public class RabbitMQConfig { /** * 定制JSON格式的消息转换器 * @return */ @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * 使用基于配置类的方式定制消息中间件 * @return */ // 1、定义fanout类型的交换器 @Bean public Exchange fanout_exchange(){ return ExchangeBuilder.fanoutExchange("fanout_exchange").build(); } // 2、定义两个不同名称的消息队列 @Bean public Queue fanout_queue_email(){ return new Queue("fanout_queue_email"); } @Bean public Queue fanout_queue_sms(){ return new Queue("fanout_queue_sms"); } // 3、将两个不同名称的消息队列与交换器进行绑定 @Bean public Binding bindingEmail(){ return BindingBuilder.bind(fanout_queue_email()).to(fanout_exchange()).with("").noargs(); } @Bean public Binding bindingSms(){ return BindingBuilder.bind(fanout_queue_sms()).to(fanout_exchange()).with("").noargs(); } }
package com.itheima; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.itheima.domain.User; @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class ApplicationTest { @Autowired private AmqpAdmin amqpAdmin; @Autowired private RabbitTemplate rabbitTemplate; /** * 使用AmqpAdmin管理员API定制消息组件 */ @Test public void amqpAdmin() { // 1、定义fanout类型的交换器 amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange")); // 2、定义两个默认持久化队列,分别处理email和sms amqpAdmin.declareQueue(new Queue("fanout_queue_email")); amqpAdmin.declareQueue(new Queue("fanout_queue_sms")); // 3、将队列分别与交换器进行绑定 amqpAdmin.declareBinding( new Binding("fanout_queue_email", Binding.DestinationType.QUEUE, "fanout_exchange", "", null)); amqpAdmin.declareBinding( new Binding("fanout_queue_sms", Binding.DestinationType.QUEUE, "fanout_exchange", "", null)); } /** * 1、Publish/Subscribe工作模式消息发送端 */ @Test public void psubPublisher() { User user = new User(); user.setId(1); user.setUsername("石头"); rabbitTemplate.convertAndSend("fanout_exchange", "", user); } /** * 1、Publish/Subscribe工作模式消息发送端 */ @Test public void my_psubPublisher() { User user = new User(); user.setId(1); user.setUsername("天生自然"); rabbitTemplate.convertAndSend("my_fanout_exchange", "", user); } }
package com.itheima.service; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class RabbitMQService { /** * Publish/Subscribe工作模式接收,处理邮件业务 * * @param message */ @RabbitListener(queues = "fanout_queue_email") public void psubConsumerEmail(Message message) { byte[] body = message.getBody(); String s = new String(body); System.out.println("邮件业务接收到消息: " + s); } /** * Publish/Subscribe工作模式接收,处理短信业务 * * @param message */ @RabbitListener(queues = "fanout_queue_sms") public void psubConsumerSms(Message message) { byte[] body = message.getBody(); String s = new String(body); System.out.println("短信业务接收到消息: " + s); } /** * Publish/Subscribe工作模式接收,处理邮件业务 * * @param message */ @RabbitListener(queues = "my_fanout_queue_email") public void my_psubConsumerEmail(Message message) { byte[] body = message.getBody(); String s = new String(body); System.out.println("邮件业务接收到消息: " + s); } /** * Publish/Subscribe工作模式接收,处理短信业务 * * @param message */ @RabbitListener(queues = "my_fanout_queue_sms") public void my_psubConsumerSms(Message message) { byte[] body = message.getBody(); String s = new String(body); System.out.println("短信业务接收到消息: " + s); } }
标签:订阅,基于,queue,springframework,发送,fanout,import,org,public 来源: https://www.cnblogs.com/tszr/p/15918102.html