RabbitMQ-Work Queues
作者:互联网
工作队列(Work Queues)
场景:假设生产者向队列中添加一条数据的时间为1秒,消费者从队列中消费一条数据执行完业务逻辑需要5秒,在这种情况下队列就会不断堆积最终导致服务瘫痪。
解决方案:运行多个消费者,同时消费队列中的任务
生产者
定义一个task_Queue队列;1秒向队列中发送一条消息
var factory = new ConnectionFactory
{
Uri = new Uri("amqp://admin:admin@192.168.65.133:5672"),
AutomaticRecoveryEnabled = true
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,//可持久化
exclusive: false,
autoDelete: false,
arguments: null);
int count = 0;
while (true)
{
count++;
var message = $"Task {count}";
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine("Send {0}", message);
//暂停一秒
Task.Delay(1000).Wait();
}
}
消费者
定义一个task_Queue队列;5秒消费一条消息
var factory = new ConnectionFactory
{
Uri = new Uri("amqp://admin:admin@192.168.65.133:5672"),
AutomaticRecoveryEnabled = true
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,//可持久化
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine("Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("Received {0}", message);
Task.Delay(5000).Wait(); //等待5秒
Console.WriteLine("Task Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//手动确认
};
channel.BasicConsume(queue: "task_queue",
autoAck: false,//关闭自动确认
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
测试
启动5个消费者
可视化界面中查看连接数为5个
启动1个生产者
可视化界面连接多了1个生产者
测试结果
5个消费者一起消费队列中的任务。
运行一段时间后 rabbitmq 队列中 也没有过多的堆积任务
公众号同步更新
标签:false,队列,Queues,Work,RabbitMQ,queue,var,Console,channel 来源: https://www.cnblogs.com/xianchengzhang/p/15971370.html