RabbitMQ Publish/Subscribe 发布订阅
作者:互联网
RabbitMQ 官网
将它收到的所有消息广播到它知道的所有队列
1、发布/订阅--fanout类型--生产者
using RabbitMQ.Client;
using System;
using System.Text;
namespace PublishSubscribeSendConsole
{
class Program
{
/// <summary>
/// 发布/订阅--fanout类型--生产者
/// </summary>
/// <param name="args"></param>
public static void Main(string[] args)
{
///交换机名称
string exchangeName = "exchange_fanout";
///1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
HostName = "127.0.0.1",//rabbitmq ip
Port = 5672,//端口号
UserName = "guest",//用户名
Password = "guest",//密码
//VirtualHost="/tj"//虚拟主机
};
///2. 创建连接对象
using (IConnection connection = factory.CreateConnection())
{
///3. 创建连接会话对象,信道
using (IModel channel = connection.CreateModel())
{
///4. 绑定交换机
channel.ExchangeDeclare(
exchange: exchangeName, //交换机名称
type: ExchangeType.Fanout //交换机类型,fanout广播
);
for (int i = 0; i < 2; i++)
{
///消息内容
string message = $"Hello World {i} !!!";
///5. 发送消息
channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: Encoding.UTF8.GetBytes(message));
Console.WriteLine(" [x] Sent {0}", message);
}
}
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
2、发布/订阅--fanout类型--消费者
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace PublishSubscribeReceiveFirstConsole
{
class Program
{
/// <summary>
/// 发布/订阅--fanout类型--消费者
/// </summary>
/// <param name="args"></param>
public static void Main(string[] args)
{
///交换机名称
string exchangeName = "exchange_fanout";
///1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
HostName = "127.0.0.1",
Port = 5672,//端口号
UserName = "guest",//用户名
Password = "guest",//密码
//VirtualHost = "/tj"
};
///2. 创建连接对象
using (var connection = factory.CreateConnection())
{
///3. 创建连接会话对象
using (var channel = connection.CreateModel())
{
///4. 绑定交换机
channel.ExchangeDeclare(
exchange: exchangeName, //交换机名称
type: ExchangeType.Fanout //交换机类型,fanout广播
);
///队列名称,排他队列
string queueName = channel.QueueDeclare().QueueName;
///队列和交换机绑定
channel.QueueBind(
queue: queueName, //队列名称
exchange: exchangeName,//交换机名称
routingKey: ""
);
///事件基本消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
///接收到消息事件
consumer.Received += (model, ea) =>
{
///打印消息
Console.WriteLine(" [x] Received {0}", Encoding.UTF8.GetString(ea.Body.ToArray()));
};
///消费消息
channel.BasicConsume(
queueName, //队列名称
true, //自动确认
consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
}
3、发布/订阅--fanout类型--消费者
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace PublishSubscribeReceiveSecondConsole
{
class Program
{
/// <summary>
/// 发布/订阅--fanout类型--消费者
/// </summary>
/// <param name="args"></param>
public static void Main(string[] args)
{
///交换机名称
string exchangeName = "exchange_fanout";
///1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
HostName = "127.0.0.1",
Port = 5672,//端口号
UserName = "guest",//用户名
Password = "guest",//密码
//VirtualHost = "/tj"
};
///2. 创建连接对象
using (var connection = factory.CreateConnection())
{
///3. 创建连接会话对象
using (var channel = connection.CreateModel())
{
///4. 绑定交换机
channel.ExchangeDeclare(
exchange: exchangeName, //交换机名称
type: ExchangeType.Fanout //交换机类型,fanout广播
);
///队列名称,排他队列
string queueName = channel.QueueDeclare().QueueName;
///队列和交换机绑定
channel.QueueBind(
queue: queueName, //队列名称
exchange: exchangeName,//交换机名称
routingKey: ""
);
///事件基本消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
///接收到消息事件
consumer.Received += (model, ea) =>
{
///打印消息
Console.WriteLine(" [x] Received {0}", Encoding.UTF8.GetString(ea.Body.ToArray()));
};
///消费消息
channel.BasicConsume(
queueName, //队列名称
true, //自动确认
consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
}
*
*
*
标签:Console,fanout,--,RabbitMQ,Subscribe,交换机,Publish,using,channel 来源: https://blog.csdn.net/KingCruel/article/details/121469229