其他分享
首页 > 其他分享> > Springboot集成RabbitMQ(二)

Springboot集成RabbitMQ(二)

作者:互联网

上一篇说到了简单的使用RabbitMQ,生产者和消费者只又一个,也就是一对一,下面介绍复杂 的情况。

一.多对多的使用

1.一对多

@Test
    public void oneToMany() throws Exception {
        for (int i=0;i<50;i++){
            neoSender.send(i);
        }
    }

 

一个生产者发送消息,多个消费者。接收端注册了两个消费者,发送端加入参数,发送10条消息也就是遍历10次。测试结果

Receiver 2: spirng boot neo queue ****** 1
Receiver 1: spirng boot neo queue ****** 0
Receiver 2: spirng boot neo queue ****** 3
Receiver 1: spirng boot neo queue ****** 2
Receiver 2: spirng boot neo queue ****** 5
Receiver 1: spirng boot neo queue ****** 4
Receiver 2: spirng boot neo queue ****** 7
Receiver 1: spirng boot neo queue ****** 6
Receiver 2: spirng boot neo queue ****** 9
Receiver 1: spirng boot neo queue ****** 8

从上面可以看出来,消息会随机并均匀的发送给消费者。

2.多对多

复制一个消费者的类,命名为NeoSender2

@Test
    public void manyToMany() throws Exception {
        for (int i=0;i<100;i++){
            neoSender.send(i);
            neoSender2.send(i);
        }
    }

测试结果

Receiver 2: spirng boot neo queue ****** 0
Receiver 1: spirng boot neo queue ****** 0
Receiver 2: spirng boot neo queue ****** 1
Receiver 1: spirng boot neo queue ****** 1
Receiver 2: spirng boot neo queue ****** 2
Receiver 1: spirng boot neo queue ****** 2
Receiver 2: spirng boot neo queue ****** 3
Receiver 2: spirng boot neo queue ****** 4
Receiver 1: spirng boot neo queue ****** 3
Receiver 2: spirng boot neo queue ****** 5
Receiver 1: spirng boot neo queue ****** 4
Receiver 2: spirng boot neo queue ****** 6
Receiver 1: spirng boot neo queue ****** 5
Receiver 2: spirng boot neo queue ****** 7
Receiver 1: spirng boot neo queue ****** 6
Receiver 2: spirng boot neo queue ****** 8
Receiver 1: spirng boot neo queue ****** 7
Receiver 2: spirng boot neo queue ****** 9
Receiver 1: spirng boot neo queue ****** 8
Receiver 1: spirng boot neo queue ****** 9

结论和一对多一样。

二.高级特性

SpringBoot可以做到无需任何配置发送接受对象。

    //生产者
        public void send(User user) {
        System.out.println("Sender object: " + user.toString());
        this.rabbitTemplate.convertAndSend("object", user);
    }


  。。。


    //消费者
  @RabbitHandler
    public void process(User user) {
        System.out.println("Receiver object : " + user);
    }

测试结果:

Sender object: User{name='xpc', pass='123456'}
Receiver object : User{name='xpc', pass='123456'}

Topic Exchange

topic 是 RabbitMQ 中最灵活的一种方式,可以根据 routing_key 自由的绑定不同的队列

首先对 topic 规则配置,这里使用两个队列来测试

三个生产者

    public void send() {
        String context = "hi, i am message all";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.1", context);
    }

    public void send1() {
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context);
    }

    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context);
    }

配置类

    final static String message = "topic.message";
    final static String messages = "topic.messages";

    @Bean
    public Queue queueMessage() {
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(TopicRabbitConfig.messages);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }

发送send指挥匹配到topic.#只有Receiver2能接收到消息,测试结果如下

发送多次消息

    @Test
    public void topic() throws Exception {
        sender.send();
        sender.send();
        sender.send();
        sender.send();
        sender.send();
    }

控制台打印接收端接受:

Topic Receiver2  : hi, i am message all
Topic Receiver2  : hi, i am message all
Topic Receiver2  : hi, i am message all
Topic Receiver2  : hi, i am message all
Topic Receiver2  : hi, i am message all

发送send1会匹配到topic.#和topic.message,Receiver1和Receiver2都能接收到消息,测试结果如下

@Test
    public void topic1() throws Exception {
        sender.send1();
    }
Topic Receiver1  : hi, i am message 1
Topic Receiver2  : hi, i am message 1

send2和send同理,指只会匹配topic.#,只有Receiver2能接收消息

Fanout Exchange

Fanout 就是广播模式或者订阅模式,给交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

Fanout 相关配置

@Configuration
public class FanoutRabbitConfig {

    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }

}
@Test
    public void fanoutSender() throws Exception {
        sender.send();
    }

测试结果

fanout Receiver B: hi, fanout msg 
2019-03-21 20:36:08.477  INFO 13508 --- [       Thread-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
fanout Receiver A  : hi, fanout msg 
2019-03-21 20:36:08.482  INFO 13508 --- [       Thread-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
fanout Receiver C: hi, fanout msg 

无论发送端routing_key写了任何字符都会无效,所有配置了相同路由器的队列都会接收到消息

标签:集成,neo,Springboot,spirng,boot,RabbitMQ,queue,Receiver,public
来源: https://www.cnblogs.com/xingpengcheng/p/10574434.html