rabbitMQ的几种工作模式及代码demo(二)-----订阅模式之广播fanout交换机
作者:互联网
- 订阅模式
- 订阅模式相比于点对点模式,多了一个交换机exchange角色。
- 生产者会将消息发送给exchange
- exchange一端接收消息。一端处理消息。例如,交给特定的队列、发送给所有的队列、或者将消息丢弃。
- exchange有三种类型:Fanout广播、Direct定向、Topic通配符。
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- exchange只负责转发消息,不负责存储消息。所以如果没有任何队列和交换机绑定,或者没有符合路由规则的队列,那么消息就会被丢弃。
- Publish/Subscribe发布与订阅模式
- 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
- 广播交换机-生产者代码
public class Producer { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); /* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) 参数: 1. exchange:交换机名称 2. type:交换机类型 DIRECT("direct"),:定向 FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。 TOPIC("topic"),通配符的方式 HEADERS("headers");参数匹配 3. durable:是否持久化 4. autoDelete:自动删除 5. internal:内部使用。 一般false 6. arguments:参数 */ String exchangeName = "test_fanout"; //5. 创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); //6. 创建队列 String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); //7. 绑定队列和交换机 /* queueBind(String queue, String exchange, String routingKey) 参数: 1. queue:队列名称 2. exchange:交换机名称 3. routingKey:路由键,绑定规则 如果交换机的类型为fanout ,routingKey设置为"" */ channel.queueBind(queue1Name,exchangeName,""); channel.queueBind(queue2Name,exchangeName,""); String body = "日志信息:张三调用了findAll方法...日志级别:info..."; //8. 发送消息 channel.basicPublish(exchangeName,"",null,body.getBytes()); //9. 释放资源 channel.close(); connection.close(); } }
- 广播交换机-对应队列1的消费者代码
public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_fanout_queue1"; Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer); } }
- 广播交换机-对应队列2的消费者代码
public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_fanout_queue2"; Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("将日志信息打印到控制台....."); } }; channel.basicConsume(queue2Name,true,consumer); } }
- 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
标签:fanout,String,exchange,队列,demo,模式,交换机,channel 来源: https://www.cnblogs.com/rbwbear/p/15557669.html