其他分享
首页 > 其他分享 > 基于配置类的方式发送和订阅消费消息

基于配置类的方式发送和订阅消费消息

作者:互联网

package com.itheima.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ message configuration class.
 *
 * <p>Registers a JSON message converter and, using the configuration-class
 * approach, one fanout exchange, two queues, and the bindings between them.
 *
 * <p>Created 2019-3-8 14:15 by CrazyStone.
 */
@Configuration
public class RabbitMQConfig {

    /** Name of the fanout exchange declared by this configuration. */
    private static final String EXCHANGE_NAME = "fanout_exchange";

    /** Name of the queue bound for e-mail handling. */
    private static final String EMAIL_QUEUE_NAME = "fanout_queue_email";

    /** Name of the queue bound for SMS handling. */
    private static final String SMS_QUEUE_NAME = "fanout_queue_sms";

    /** Routing key passed when binding; both bindings use the empty string. */
    private static final String EMPTY_ROUTING_KEY = "";

    /**
     * Supplies the JSON-format message converter.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public MessageConverter messageConverter() {
        MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }

    /**
     * Step 1: defines the fanout-type exchange.
     *
     * @return the exchange built by {@link ExchangeBuilder} under the name {@code fanout_exchange}
     */
    @Bean
    public Exchange fanout_exchange() {
        ExchangeBuilder builder = ExchangeBuilder.fanoutExchange(EXCHANGE_NAME);
        return builder.build();
    }

    /**
     * Step 2a: defines the queue named {@code fanout_queue_email}.
     *
     * @return the e-mail queue
     */
    @Bean
    public Queue fanout_queue_email() {
        Queue emailQueue = new Queue(EMAIL_QUEUE_NAME);
        return emailQueue;
    }

    /**
     * Step 2b: defines the queue named {@code fanout_queue_sms}.
     *
     * @return the SMS queue
     */
    @Bean
    public Queue fanout_queue_sms() {
        Queue smsQueue = new Queue(SMS_QUEUE_NAME);
        return smsQueue;
    }

    /**
     * Step 3a: binds the e-mail queue to the fanout exchange.
     *
     * @return the binding between {@code fanout_queue_email} and {@code fanout_exchange}
     */
    @Bean
    public Binding bindingEmail() {
        return bindToFanoutExchange(fanout_queue_email());
    }

    /**
     * Step 3b: binds the SMS queue to the fanout exchange.
     *
     * @return the binding between {@code fanout_queue_sms} and {@code fanout_exchange}
     */
    @Bean
    public Binding bindingSms() {
        return bindToFanoutExchange(fanout_queue_sms());
    }

    /**
     * Builds a binding from the given queue to {@link #fanout_exchange()} with
     * an empty routing key and no extra arguments.
     *
     * @param queue the queue to bind
     * @return the resulting binding
     */
    private Binding bindToFanoutExchange(Queue queue) {
        Exchange exchange = fanout_exchange();
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with(EMPTY_ROUTING_KEY)
                .noargs();
    }

}
package com.itheima;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.itheima.domain.User;

/**
 * Spring Boot tests that declare the fanout messaging components through
 * {@link AmqpAdmin} and publish {@code User} messages through
 * {@link RabbitTemplate}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ApplicationTest {

    /** Exchange declared by {@link #amqpAdmin()} and targeted by {@link #psubPublisher()}. */
    private static final String FANOUT_EXCHANGE = "fanout_exchange";

    /** Routing key used for every binding and send in this class. */
    private static final String EMPTY_ROUTING_KEY = "";

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Uses the AmqpAdmin administration API to declare the messaging components:
     * the fanout exchange, then both queues, then both bindings.
     */
    @Test
    public void amqpAdmin() {
        // Queues for e-mail and SMS handling, in declaration order.
        String[] queueNames = {"fanout_queue_email", "fanout_queue_sms"};

        // 1. Declare the fanout-type exchange.
        FanoutExchange exchange = new FanoutExchange(FANOUT_EXCHANGE);
        amqpAdmin.declareExchange(exchange);

        // 2. Declare both queues.
        for (String queueName : queueNames) {
            amqpAdmin.declareQueue(new Queue(queueName));
        }

        // 3. Bind each queue to the exchange.
        for (String queueName : queueNames) {
            Binding binding = new Binding(
                    queueName, Binding.DestinationType.QUEUE, FANOUT_EXCHANGE, EMPTY_ROUTING_KEY, null);
            amqpAdmin.declareBinding(binding);
        }
    }

    /**
     * Publish/Subscribe mode sender: publishes a user to {@code fanout_exchange}.
     */
    @Test
    public void psubPublisher() {
        publishUser(FANOUT_EXCHANGE, "石头");
    }

    /**
     * Publish/Subscribe mode sender: publishes a user to {@code my_fanout_exchange}.
     */
    @Test
    public void my_psubPublisher() {
        publishUser("my_fanout_exchange", "天生自然");
    }

    /**
     * Creates a {@code User} with id 1 and the given username and sends it to
     * the given exchange with an empty routing key.
     *
     * @param exchangeName the exchange to publish to
     * @param username the username to set on the published user
     */
    private void publishUser(String exchangeName, String username) {
        User payload = new User();
        payload.setId(1);
        payload.setUsername(username);
        rabbitTemplate.convertAndSend(exchangeName, EMPTY_ROUTING_KEY, payload);
    }
}
package com.itheima.service;

import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * Message consumers for the Publish/Subscribe (fanout) example.
 *
 * <p>Each listener decodes the raw message body as text and prints it to
 * standard output with a business-specific prefix.
 */
@Service
public class RabbitMQService {

    /**
     * Publish/Subscribe mode receiver for the e-mail business; consumes
     * {@code fanout_queue_email}.
     *
     * @param message the received message
     */
    @RabbitListener(queues = "fanout_queue_email")
    public void psubConsumerEmail(Message message) {
        System.out.println("邮件业务接收到消息: " + bodyAsText(message));
    }

    /**
     * Publish/Subscribe mode receiver for the SMS business; consumes
     * {@code fanout_queue_sms}.
     *
     * @param message the received message
     */
    @RabbitListener(queues = "fanout_queue_sms")
    public void psubConsumerSms(Message message) {
        System.out.println("短信业务接收到消息: " + bodyAsText(message));
    }

    /**
     * Publish/Subscribe mode receiver for the e-mail business; consumes
     * {@code my_fanout_queue_email}.
     *
     * <p>NOTE(review): this class does not declare that queue; confirm it is
     * declared elsewhere or already exists on the broker.
     *
     * @param message the received message
     */
    @RabbitListener(queues = "my_fanout_queue_email")
    public void my_psubConsumerEmail(Message message) {
        System.out.println("邮件业务接收到消息: " + bodyAsText(message));
    }

    /**
     * Publish/Subscribe mode receiver for the SMS business; consumes
     * {@code my_fanout_queue_sms}.
     *
     * <p>NOTE(review): this class does not declare that queue; confirm it is
     * declared elsewhere or already exists on the broker.
     *
     * @param message the received message
     */
    @RabbitListener(queues = "my_fanout_queue_sms")
    public void my_psubConsumerSms(Message message) {
        System.out.println("短信业务接收到消息: " + bodyAsText(message));
    }

    /**
     * Decodes the message body as UTF-8 text.
     *
     * <p>The charset is given explicitly so the result does not depend on the
     * JVM's platform default, which would garble non-ASCII payloads on systems
     * whose default is not UTF-8. This assumes publishers encode bodies as
     * UTF-8 (presumably via a JSON message converter; verify against the
     * sender configuration).
     *
     * @param message the received message
     * @return the body bytes decoded as UTF-8
     */
    private static String bodyAsText(Message message) {
        return new String(message.getBody(), StandardCharsets.UTF_8);
    }
}

 

 

 

标签:订阅,基于,queue,springframework,发送,fanout,import,org,public
来源: https://www.cnblogs.com/tszr/p/15918102.html