rabbitMQ第二种模型(work quene)工作队列
作者:互联网
也叫任务模型,当消息处理比较耗时的时候,可能生产消息的数度会远远大于消费的速度,常此以往,消息就会堆积越来越多,无法及时处理
此时就可以使用work模型:让多个消费者绑定一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的
默认情况下,rabbitMQ将按顺序将每一个消息发送给下一个使用者。平均而言,每一个消费者都会收到相同数量的消息,这种分发消息的方式称为循环
生产者:
public class Provider2 { public static void main(String[] args) throws IOException { Connection connection = rabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work",true,false,false,null); for(int i=0;i<10;i++) { channel.basicPublish("","work",null,(i+"hello workqueue").getBytes()); } rabbitMQUtils.connectionAndchannelClose(connection,channel); } }
消费者1:
public class Customer1 { public static void main(String[] args) throws IOException { Connection connection = rabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work",true,false,false,null); channel.basicConsume("work",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("消费者-1"+new String(body)); } }); } }
消费者2:
public class Customer2 { public static void main(String[] args) throws IOException { Connection connection = rabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work",true,false,false,null); channel.basicConsume("work",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("消费者-2"+new String(body)); } }); } }
能者多劳
public class Customer1 { public static void main(String[] args) throws IOException { Connection connection = rabbitMQUtils.getConnection(); final Channel channel = connection.createChannel(); channel.basicQos(1); channel.queueDeclare("work",true,false,false,null); channel.basicConsume("work",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者-1"+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
public class Customer2 { public static void main(String[] args) throws IOException { Connection connection = rabbitMQUtils.getConnection(); final Channel channel = connection.createChannel(); channel.basicQos(1);//每一次只消费一个消息 channel.queueDeclare("work",true,false,false,null); //参数2:消息自动确认 true:消费者自动向rabbitMQ确认消息消费 false:不会自动确认消息 channel.basicConsume("work",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("消费者-2"+new String(body)); //手动确认 参数1:手动确认消息标识 确认队列中那个具体消息 参数2:false每次确认一个 channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
标签:body,quene,false,String,work,rabbitMQ,public,channel 来源: https://www.cnblogs.com/yz-bky/p/13052275.html