RabbitMQ生产者消费者模型(二)
作者:互联网
作为主流的MQ消息队列中间件,RabbitMQ也是具备了生产者消费者的模型,那么也就是说
生产者把消息发送后,消费者来作为接收具体的消息。本文章主要详细的概述RabbitMQ的生产者
投递和消费者监听。
一、消息传递流程
下面主要详细的总结下RabbitMQ消息队列服务器消息彻底的整体流程,具体汇总如下:
- 生产者只负责把消息投递到Exchange,这个过程不需要刻意的关注Queue
- 而由Exchange把消息传递给Queue
- 作为消费者的程序来负责监听Queue的消息
- 为了保障消息传递的准确性以及及时性,Exchange与Queue会存在一定的绑定关系就是路由Key
二、MQ投递
依据RabbitMQ的架构模型,在生产者模型和消费者模型中,其实生产者和消费者并不知道
对方的存在,这是异步通信的特性。作为生产者,它只需要把消息投递到Exchange,在这个过程
中生产者并不需要关注Queue,事实上生产者也是无法关注到Queue的,那么消息是如何让消费者
来监听并且接收的了?这就是说会在Exchange和Queue之间建立一种映射关系,而这层关系就不是
生产者所需要关注的了。作为消费者也不需要刻意的关注Exchange,而只需要监听Queue。
2.1、引入RabbitMQ的jar
要使用RabbitMQ的前提是需要引入RabbitMQ的jar,那么就需要在pom.xml文件里面新增RabbitMQ
的服务端和客户端,具体如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
2.2、生产者投递步骤
生产者把消息需要投递给Exchange,那么它的步骤具体总结如下:
ConnectionFactory类负责获取连接工厂
Connection类的对象获取一个连接
Channel创建数据通道信道,可以发送和接收消息
下面具体是完整的生产者投递的代码,具体如下:
package com.example.rabbitmq.quickstart;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer
{
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//配置连接mq的地址信息
connectionFactory.setHost("101.**.***.84");
connectionFactory.setPort(5672);
connectionFactory.setUsername("wuya");
connectionFactory.setPassword("java");
connectionFactory.setVirtualHost("/");
//连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//通过connection来创建Channel
Channel channel = connection.createChannel();
//通过channel来发送具体的数据信息
String msg = "Hello RabbitMQ";
channel.basicPublish("saas", "", null, msg.getBytes());
//发送消息成功后,关闭具体的连接
channel.close();
connection.close();
}
}
在如上中,我们可以看到我们首先需要连接到RabbitMQ的服务器,然后在发送消息message的时候我们需要
指定具体的Exchange,因为对于生产者来说,它只关注的是把消息投递给Exchange。
2.3、消费者监听
生产者把消息投递到Exchange,那么作为消费者就需要来监听具体的消息了。监听的整个过程首先也是
需要建立RabbitMQ的服务器,这部分涉及到的代码具体如下:
package com.example.rabbitmq.quickstart;
import com.rabbitmq.client.*;
public class Consumer
{
//定义exchange
private static final String EXCHANGE = "saas";
//定义队列
private static final String queueName="saas";
public static void main(String[] args) throws Exception
{
try{
//创建连接工厂
ConnectionFactory connectionFactory=new ConnectionFactory();
//配置连接mq的地址信息
connectionFactory.setHost("101.**.***.84");
connectionFactory.setPort(5672);
connectionFactory.setUsername("wuya");
connectionFactory.setPassword("java");
connectionFactory.setVirtualHost("/");
//连接工厂创建连接
Connection connection=connectionFactory.newConnection();
//通过connection来创建Channel
Channel channel=connection.createChannel();
//设置exchange类型为fanout
channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.FANOUT);
/*
定义一个队列
* 一个队列来接收数据后,消费端才可以从队列里面来接收具体的数据
* param1:队列名称
* param2:是否持久化
* param3:队列是否独占此连接
* param4:队列不再使用时是否自动删除此队列
* param5:队列参数
* */
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,EXCHANGE,"");
//创建一个消费者来消费数据
DefaultConsumer consumer=new DefaultConsumer(channel)
{
@Override
public void handleDelivery(
String consumerTag,
com.rabbitmq.client.Envelope envelope,
AMQP.BasicProperties properties,
byte [] body) throws java.io.IOException
{
String message=new String(body);
System.out.println("接收到的消息为:"+message);
};
};
// 监听队列,从队列中获取数据
System.out.println("消费者程序启动成功,准备接收生产者的数据:\n");
channel.basicConsume(queueName,consumer);
}catch (Exception e){
e.printStackTrace();
}
}
}
在如上中,我们看到Exchange与生产端的Exchange名字是一样的,那么只有这样才能够建立绑定关系,
再说的更加简单点来说,生产者把消息给到Exchange,然后Exchange与Queue之间有一个层映射关系,
那么只有这样消费者监听队列才能够收取message的消息。
2.4、绑定关系
刚才说到Exchange与Queue之间的绑定关系,下面就针对这部分具体的演示下。我们先启动消费者
的程序,启动成功后,就会自动的创建Exchange和Queue,就可以从Exchange的绑定以及Queue的绑定
中能够获取到对应的绑定关系。
2.4.1、Exchange绑定关系
下面的图是消费者的程序启动后创建的Exchange,以及它的绑定关系,具体如下:
2.4.2、消费者绑定关系
在Exchange的绑定关系中,点击To里面saas,就会自动的跳转到Queue,具体如下所示:
2.5、406错误避免
很多初学者在学习RabbitMQ的时候,总是提前创建好Exchange和Queue,这样结果导致消费者的程序报很多
的错误,具体错误如下:
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:783)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:252)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:242)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:222)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:227)
at com.example.rabbitmq.quickstart.Consumer.main(Consumer.java:31)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'saas' in vhost '/': received 'fanout' but current is 'direct', class-id=40, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'saas' in vhost '/': received 'fanout' but current is 'direct', class-id=40, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
at java.lang.Thread.run(Thread.java:748)
其实遇到该问题,最简单解决问题的方式就是删除自己创建的Exchange和Queue。删除后,再次执行消费者的
程序,它会自动创建Exchange和Queue,而且也就不会再报一系列的具体问题了。解决了如上的问题后,再次
执行生产者的程序,就可以看到生产者发送的消息就能够被消费者这边监听到。感谢您的阅读,下个文章主要
介绍Exchange详解。
标签:java,Exchange,生产者,模型,rabbitmq,client,RabbitMQ,com,impl 来源: https://www.cnblogs.com/weke/p/15835674.html