其他分享
首页 > 其他分享> > 04 Work Queues

04 Work Queues

作者:互联网

Work Queues

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

轮训分发消息

在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。

抽取工具类

/**
 * 连接工厂创建信道的工具类
 */
public class RabbitMQUtils {
    //得到一个连接的channel
    public static Channel getChannel() throws Exception, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP 连接RabbitMQ的队列
        factory.setHost("112.124.22.24");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();
        return channel;
    }

}

启动两个线程

//这是一个工作线程  相当于消费者
public class Worker01 {

    //队列名称
    public static final String QUEUE_NAME = "hello";

    //接收消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        //消息的接受
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println("接收到的消息"+new String(message.getBody()));
        };

        //消息接受被取消时  执行下面的内容
        CancelCallback cancelCallback =consumerTag ->{
            System.out.println(consumerTag+"消息者取消消费接口回调逻辑");
        };
        //消息的接受
        /**
         * 消费者消费信息
         *1.消费哪个队列
         * 2.消费成功之后是否要自动应答  true:自动应答 false  代表手动应答
         * 3.消费者未成功消费的回调
         * 4.消费者取消 消费的 回调
         */
        System.out.println("C2等待接受消息.......");
        /**
         * 发送一个消息
         * 1.发送到哪个交换机
         * 2.路由的key值是哪个  本次是队列的名称
         * 3.其他参数信息
         *4.发送消息的消息体
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

    }
}


开启两个线程 C1 C2

启动一个发送线程

//生产者发送大量的消息
public class Task01 {

    //队列名称
    public static final String QUEUE_NAME = "hello";

    //发送大量消息
    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMQUtils.getChannel();
        //队列的声明
        /**
         * 生成一个队列
         * 1.队列名称
         * 2.队列里面的消息是否 持久化(磁盘)  默认情况消息存储在内存中
         * 3.该队列是否只供一个消费者进行消费 是否进行消费  是否进行消息共享,true:只能一个消费者消费   false:可以多个消费者消费
         * 4.是否自动删除  最后一个消费者断开连接以后 该队列是否自动删除 true 自动删除  false 不自动删除
         * 5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //从控制台接受信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            /**
             * 发送一个消息
             * 1.发送到哪个交换机
             * 2.路由的key值是哪个  本次是队列的名称
             * 3.其他参数信息
             *4.发送消息的消息体
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("消息发送完毕"+message);

        }
    }
}

结果展示

通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且是按照有序的一个接收一次消息

不公平分发

RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好

比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ 并不知道这种情况它依然很公平的进行分发。

为了避免这种情况,我们可以设置参数 channel.basicQos(1);

在消费者中设置信道

 //设置不公平分发
int prefetchCount = 1;
channel.basicQos(prefetchCount);

意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。

预取值

该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认

例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。

设置了预取值之后,分两条给C1 5条给C2不会给C1处理

 //预取值是5
int prefetchCount = 5;
channel.basicQos(prefetchCount);

标签:消费者,04,队列,Queues,Work,线程,消息,public,channel
来源: https://www.cnblogs.com/flypigggg/p/15612951.html