RabbitMQ: Java code example
作者:互联网
pom.xml:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>io.veer</groupId> <artifactId>rabbitmq</artifactId> <version>1.0-SNAPSHOT</version> <properties> <java.version>17</java.version> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.8.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.14.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.36</version> </dependency> </dependencies> </project>
Provider:
package io.veer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import java.io.IOException; import java.util.concurrent.TimeoutException; public class TestProvider{ public static void main(String[] args) throws TimeoutException, IOException{ // 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.8.105"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("venal"); // 创建connection Connection connection = connectionFactory.newConnection(); // 通过connection创建channel Channel channel = connection.createChannel(); /** * 通道声明消息队列, 如果队列已经存在, 在queueDeclare必须和已存在的queue完全一致 * param1: 队列名, 不存在自动创建 * param2: durable 是否持久化队列, 队列已存在为durable, 则设为false报错, durable仅保证queue重启不丢失, 若要保证数据也不丢失, 发布消息时设置MessageProperties.PERSISTENT_TEXT_PLAIN * param3: exclusive 是否独占队列 * param4: autoDelete 是否在消费完成自动删除队列, 生产者 & 消费者queueDeclare声明必须相同, 消费者消费完线程退出, autoDelete才会生效 * param5: 额外参数 * 生产者 & 消费者 queueDeclare必须完全相同 */ channel.queueDeclare("veneer", true, false, false, null); /** * param1: exchange name "" 表示 default exchange * param2: queue name * param3: extra config * param4: message body 二进制流 */ for(int i = 0; i < 5; i++){ // 向不存在的queue, 发布消息, 则消息被忽略 // channel绑定了VirtualHost, 向不同VirtualHost发布消息, 则消息被忽略. 必须向绑定的VirtualHost发布消息 channel.basicPublish("", "veneer", MessageProperties.PERSISTENT_TEXT_PLAIN, "venal rabbitmq".getBytes()); } channel.close(); connection.close(); } }
Consumer:
package io.veer; import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class TestConsumer{ public static void main(String[] args) throws IOException, TimeoutException{ // 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.8.107"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("venal"); // 创建connection Connection connection = connectionFactory.newConnection(); // 通过connection创建channel Channel channel = connection.createChannel(); /** * 如果通道声明的queue, 则queue必须和已存在的queue完全一致 * queueDeclare声明的queue和basicConsume消费的queue可不相同 * 声明不存在的queue, 会自动创建queue */ channel.queueDeclare("ruzz", false, false, false, null); /** * param1: 消费的queue name, queue不存在, 报404异常, no queue 'veneer' in vhost 'venal' * param2: autoAck * param3: 回调接口 */ channel.basicConsume("veneer", true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ try{ Thread.sleep(500); // 每秒消费一次 }catch(InterruptedException e){ throw new RuntimeException(e); } System.out.println("\033[37;7m" + String.format("Consumer Message: %s, Thread: %s", new String(body, StandardCharsets.UTF_8), Thread.currentThread()) + "\033[0m"); } }); /** * 都未关闭, 一直阻塞消费 * connection未关闭, channel.close() 消费一次, 阻塞 * channel未关闭, connection.close() 不消费, 直接退出 * 都关闭, 不消费, 直接退出 */ // channel.close(); // 关闭后, 阻塞, 但是无法获取message, 默认一直运行接收message // connection.close(); // 多线程, 关闭后程序会结束 // 创建了新线程消费, 会先打印 System.out.println("\033[37;7m" + Thread.currentThread() + "\033[0m"); } }
RabbitMQ工具类:
package io.veer.util; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitMQUtil{ private static final ConnectionFactory connectionFactory; static{ connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.8.105"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("venal"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); } public static Connection getConnection(){ try{ return connectionFactory.newConnection(); }catch(RuntimeException | IOException | TimeoutException e){ e.printStackTrace(); } return null; } public static void closeConnectionAndChannel(Connection connection, Channel channel){ try{ channel.close(); connection.close(); // 会自动关闭queueDeclare的通道 }catch(IOException | TimeoutException e){ throw new RuntimeException(e); } } }
Worker:
Provider
package io.veer.workerqueue; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import io.veer.util.RabbitMQUtil; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Objects; public class Provider{ public static void main(String[] args) throws IOException{ Connection connection = RabbitMQUtil.getConnection(); Channel channel = Objects.requireNonNull(connection).createChannel(); channel.queueDeclare("worker", true, false, false, null); for(int b = 0; b < 100; b++){ channel.basicPublish("", "worker", MessageProperties.PERSISTENT_TEXT_PLAIN, (b + " worker queue").getBytes(StandardCharsets.UTF_8)); } RabbitMQUtil.closeConnectionAndChannel(connection, channel); } }
Consumer
package io.veer.workerqueue; import com.rabbitmq.client.*; import io.veer.util.RabbitMQUtil; import java.io.IOException; import java.util.Objects; public class Consumer1{ public static void main(String[] args) throws IOException{ Connection connection = RabbitMQUtil.getConnection(); Channel channel = Objects.requireNonNull(connection).createChannel(); channel.queueDeclare("worker", true, false, false, null); channel.basicQos(1); // 一次只接受一条未确认消息, 否则接受全部消息, 清空server的queue // 关闭自动确认, 没有确认不能接受下一条message, Unacked + 1 channel.basicConsume("worker", false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ try{ System.out.println("\033[37;7m" + new String(body) + "\033[0m"); channel.basicAck(envelope.getDeliveryTag(), false); // false: 每次确认一条 Thread.sleep(1000); }catch(Exception e){ throw new RuntimeException(e); } } }); } }
package io.veer.workerqueue; import com.rabbitmq.client.*; import io.veer.util.RabbitMQUtil; import java.io.IOException; import java.util.Objects; public class Consumer2{ public static void main(String[] args) throws IOException{ Connection connection = RabbitMQUtil.getConnection(); Channel channel = Objects.requireNonNull(connection).createChannel(); channel.queueDeclare("worker", true, false, false, null); channel.basicQos(1); channel.basicConsume("worker", false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ try{ System.out.println("\033[37;7m" + new String(body) + "\033[0m"); channel.basicAck(envelope.getDeliveryTag(), false); Thread.sleep(500); }catch(IOException | InterruptedException e){ throw new RuntimeException(e); } } }); } }
Exchange Fanout 没有routing key:
provider:
package io.veer.fanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import io.veer.util.RabbitMQUtil; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Objects; public class Provider{ public static void main(String[] args) throws IOException{ Connection connection = RabbitMQUtil.getConnection(); Channel channel = Objects.requireNonNull(connection).createChannel(); /** * 声明通道为exchange * param1: exchange name * param1: exchange type */ channel.exchangeDeclare("log_fanout", "fanout"); /** * 发送消息 * param1: exchange name * param2: routingKey fanout类型中无意义 */ channel.basicPublish("log_fanout", "", null, "exchange: logs, type: fanout".getBytes(StandardCharsets.UTF_8)); RabbitMQUtil.closeConnectionAndChannel(connection, channel); } }
Consumer
package io.veer.fanout; import com.rabbitmq.client.*; import io.veer.util.RabbitMQUtil; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Objects; public class Consumer{ public static void main(String[] args) throws IOException{ for(int i = 0; i < 5; i++){ new Thread(() -> { try{ new Consumer().consume(); }catch(IOException e){ throw new RuntimeException(e); } }).start(); } } public void consume() throws IOException{ Connection connection = RabbitMQUtil.getConnection(); Channel channel = Objects.requireNonNull(connection).createChannel(); // 通道声明为exchange channel.exchangeDeclare("log_fanout", "fanout"); // 临时队列 String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, "log_fanout", ""); channel.basicConsume(queue, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ String format = String.format("%s %s", Thread.currentThread(), new String(body, StandardCharsets.UTF_8)); System.out.println("\033[37;7m" + format + "\033[0m"); } }); } }
Exchange Direct 通过routing key转发至不同queue
provider:
package io.veer.direct; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import io.veer.util.RabbitMQUtil; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Objects; public class Provider{ public static void main(String[] args) throws IOException{ Connection connection = RabbitMQUtil.getConnection(); Channel channel = Objects.requireNonNull(connection).createChannel(); channel.exchangeDeclare("log_direct", "direct"); String routingKey = "error"; String body = String.format("routingKey: %s, %s", routingKey, Thread.currentThread().getName()); channel.basicPublish("log_direct", routingKey, null, body.getBytes(StandardCharsets.UTF_8)); RabbitMQUtil.closeConnectionAndChannel(connection, channel); } }
consumer
package io.veer.direct; import com.rabbitmq.client.*; import io.veer.util.RabbitMQUtil; import java.io.IOException; import java.util.Objects; public class Consumer1{ public static void main(String[] args) throws IOException{ Connection connection = RabbitMQUtil.getConnection(); Channel channel = Objects.requireNonNull(connection).createChannel(); String exchangeName = "log_direct"; channel.exchangeDeclare(exchangeName, "direct"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, exchangeName, "info"); channel.basicConsume(queue, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ System.out.println("\033[37;7m" + String.format("Consumer1 %s", new String(body)) + "\033[0m"); } }); } }
package io.veer.direct; import com.rabbitmq.client.*; import io.veer.util.RabbitMQUtil; import java.io.IOException; import java.util.Objects; public class Consumer2{ public static void main(String[] args) throws IOException{ Connection connection = RabbitMQUtil.getConnection(); Channel channel = Objects.requireNonNull(connection).createChannel(); String exchangeName = "log_direct"; channel.exchangeDeclare(exchangeName, "direct"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, exchangeName, "info"); channel.queueBind(queue, exchangeName, "error"); channel.basicConsume(queue, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ System.out.println("\033[37;7m" + String.format("Consumer2 %s", new String(body)) + "\033[0m"); } }); } }
Exchange Topic routing key使用通配符 * #:
provider
package io.veer.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import io.veer.util.RabbitMQUtil; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Objects; public class Provider{ public static void main(String[] args) throws IOException{ Connection connection = RabbitMQUtil.getConnection(); Channel channel = Objects.requireNonNull(connection).createChannel(); String exchangeName = "topic"; channel.exchangeDeclare(exchangeName, "topic"); String routingKey = "user.insert.ruzz"; channel.basicPublish(exchangeName, routingKey, null, String.format("Provider routingKey: %s", routingKey).getBytes(StandardCharsets.UTF_8)); RabbitMQUtil.closeConnectionAndChannel(connection, channel); } }
Consumer
package io.veer.topic; import com.rabbitmq.client.*; import io.veer.util.RabbitMQUtil; import java.io.IOException; import java.util.Objects; public class Consumer1{ public static void main(String[] args) throws IOException{ Connection connection = RabbitMQUtil.getConnection(); Channel channel = Objects.requireNonNull(connection).createChannel(); String exchangeName = "topic"; channel.exchangeDeclare(exchangeName, "topic"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, exchangeName, "user.*"); channel.basicConsume(queue, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ System.out.println("\033[37;7m" + new String(body) + "\033[0m"); } }); } }
package io.veer.topic; import com.rabbitmq.client.*; import io.veer.util.RabbitMQUtil; import java.io.IOException; import java.util.Objects; public class Consumer2{ public static void main(String[] args) throws IOException{ Connection connection = RabbitMQUtil.getConnection(); Channel channel = Objects.requireNonNull(connection).createChannel(); String exchangeName = "topic"; channel.exchangeDeclare(exchangeName, "topic"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, exchangeName, "user.#"); channel.basicConsume(queue, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ System.out.println("\033[37;7m" + new String(body) + "\033[0m"); } }); } }
Springboot:
依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置
spring: application: name: rabbitmq rabbitmq: host: 192.168.8.105 port: 5672 virtual-host: veil username: veil password: veil
测试类:
package io.veer.rabbit; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; @SpringBootTest(classes = RabbitApplication.class) public class TestRabbitMQ{ @Resource private RabbitTemplate rabbitTemplate; @Test public void tun(){ rabbitTemplate.convertAndSend("tun", "tun"); } @Test public void worker(){ for(int i = 0; i < 10; i++){ rabbitTemplate.convertAndSend("worker", "worker " + i); } } @Test public void fanout(){ rabbitTemplate.convertAndSend("fanout", "", "fanout"); } @Test public void direct(){ rabbitTemplate.convertAndSend("direct", "error", "redict"); } @Test public void topic(){ rabbitTemplate.convertAndSend("topic", "user.save.insert", "topic"); } }
consumer类:
package io.veer.rabbit.consumer; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queuesToDeclare = @Queue(value = "tun", durable = "false", autoDelete = "true")) public class Tun{ @RabbitHandler public void tun(String message){ System.out.println("\033[37;7m" + message + "\033[0m"); } }
package io.veer.rabbit.consumer; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Worker{ @RabbitListener(queuesToDeclare = @Queue(value = "worker")) // 直接加到方法上, 代替RabbitHandler public void worker1(String message){ System.out.println("\033[37;7m" + "worker1: " + message + "\033[0m"); } @RabbitListener(queuesToDeclare = @Queue(value = "worker")) public void worker2(String message){ System.out.println("\033[37;7m" + "worker2: " + message + "\033[0m"); } }
package io.veer.rabbit.consumer; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Fanout{ @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "fanout", type = "fanout"))}) public void fanout1(String message){ System.out.println("\033[37;7m" + message + "\033[0m"); } @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "fanout", type = "fanout"))}) public void fanout2(String message){ System.out.println("\033[37;7m" + message + "\033[0m"); } }
package io.veer.rabbit.consumer; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Direct{ @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "direct", type = "direct"), key = {"info"})}) public void direct1(String message){ System.out.println("\033[37;7m" + message + "\033[0m"); } @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "direct", type = "direct"), key = {"info", "error"})}) public void direct2(String message){ System.out.println("\033[37;7m" + message + "\033[0m"); } }
package io.veer.rabbit.consumer; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Topic{ @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "topic", type = "topic"), key = {"user.*"})}) public void topic1(String msg){ System.out.println("\033[37;7m" + msg + "\033[0m"); } @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "topic", type = "topic"), key = {"user.#"})}) public void topic2(String msg){ System.out.println("\033[37;7m" + msg + "\033[0m"); } }
标签:code,Java,String,public,IOException,io,import,example,channel 来源: https://www.cnblogs.com/dissipate/p/16248492.html