编程语言
首页 > 编程语言> > RabbitMQ: Java code example

RabbitMQ: Java code example

作者:互联网

 

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>io.veer</groupId>
  <artifactId>rabbitmq</artifactId>
  <version>1.0-SNAPSHOT</version>


  <properties>
    <java.version>17</java.version>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-api</artifactId>
      <version>5.8.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.14.2</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.36</version>
    </dependency>
  </dependencies>

</project>

 

Provider:

package io.veer;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TestProvider{
  public static void main(String[] args) throws TimeoutException, IOException{
    // 创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.8.105");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("venal");

    // 创建connection
    Connection connection = connectionFactory.newConnection();

    // 通过connection创建channel
    Channel channel = connection.createChannel();

    /**
     * 通道声明消息队列, 如果队列已经存在, 在queueDeclare必须和已存在的queue完全一致
     * param1: 队列名, 不存在自动创建
     * param2: durable 是否持久化队列, 队列已存在为durable, 则设为false报错, durable仅保证queue重启不丢失, 若要保证数据也不丢失, 发布消息时设置MessageProperties.PERSISTENT_TEXT_PLAIN
     * param3: exclusive 是否独占队列
     * param4: autoDelete 是否在消费完成自动删除队列, 生产者 & 消费者queueDeclare声明必须相同, 消费者消费完线程退出, autoDelete才会生效
     * param5: 额外参数
     * 生产者 & 消费者 queueDeclare必须完全相同
     */
    channel.queueDeclare("veneer", true, false, false, null);

    /**
     * param1: exchange name  "" 表示 default exchange
     * param2: queue name
     * param3: extra config
     * param4: message body 二进制流
     */
    for(int i = 0; i < 5; i++){
      // 向不存在的queue, 发布消息, 则消息被忽略
      // channel绑定了VirtualHost, 向不同VirtualHost发布消息, 则消息被忽略. 必须向绑定的VirtualHost发布消息
      channel.basicPublish("", "veneer", MessageProperties.PERSISTENT_TEXT_PLAIN, "venal rabbitmq".getBytes());
    }

    channel.close();
    connection.close();
  }
}

 

Consumer:

package io.veer;


import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class TestConsumer{
  public static void main(String[] args) throws IOException, TimeoutException{
    // 创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.8.107");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("venal");

    // 创建connection
    Connection connection = connectionFactory.newConnection();

    // 通过connection创建channel
    Channel channel = connection.createChannel();

    /**
     * 如果通道声明的queue, 则queue必须和已存在的queue完全一致
     * queueDeclare声明的queue和basicConsume消费的queue可不相同
     * 声明不存在的queue, 会自动创建queue
     */
    channel.queueDeclare("ruzz", false, false, false, null);

    /**
     * param1: 消费的queue name, queue不存在, 报404异常, no queue 'veneer' in vhost 'venal'
     * param2: autoAck
     * param3: 回调接口
     */
    channel.basicConsume("veneer", true, new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        try{
          Thread.sleep(500);  // 每秒消费一次
        }catch(InterruptedException e){
          throw new RuntimeException(e);
        }
        System.out.println("\033[37;7m" + String.format("Consumer  Message: %s, Thread: %s", new String(body, StandardCharsets.UTF_8), Thread.currentThread()) + "\033[0m");
      }
    });

    /**
     * 都未关闭, 一直阻塞消费
     * connection未关闭, channel.close() 消费一次, 阻塞
     * channel未关闭, connection.close() 不消费, 直接退出
     * 都关闭, 不消费, 直接退出
     */
    // channel.close();  // 关闭后, 阻塞, 但是无法获取message, 默认一直运行接收message
    // connection.close();  // 多线程, 关闭后程序会结束

    // 创建了新线程消费, 会先打印
    System.out.println("\033[37;7m" + Thread.currentThread() + "\033[0m");
  }

}

 

RabbitMQ工具类:

package io.veer.util;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQUtil{
  private static final ConnectionFactory connectionFactory;

  static{
    connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.8.105");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("venal");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
  }

  public static Connection getConnection(){
    try{
      return connectionFactory.newConnection();
    }catch(RuntimeException | IOException | TimeoutException e){
      e.printStackTrace();
    }
    return null;
  }

  public static void closeConnectionAndChannel(Connection connection, Channel channel){
    try{
      channel.close();
      connection.close();  // 会自动关闭queueDeclare的通道
    }catch(IOException | TimeoutException e){
      throw new RuntimeException(e);
    }
  }
}

 

Worker:

Provider

package io.veer.workerqueue;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

public class Provider{
  public static void main(String[] args) throws IOException{
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = Objects.requireNonNull(connection).createChannel();
    channel.queueDeclare("worker", true, false, false, null);

    for(int b = 0; b < 100; b++){
      channel.basicPublish("", "worker", MessageProperties.PERSISTENT_TEXT_PLAIN, (b + " worker queue").getBytes(StandardCharsets.UTF_8));
    }

    RabbitMQUtil.closeConnectionAndChannel(connection, channel);
  }
}

Consumer

package io.veer.workerqueue;


import com.rabbitmq.client.*;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.util.Objects;

public class Consumer1{
  public static void main(String[] args) throws IOException{

    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = Objects.requireNonNull(connection).createChannel();
    channel.queueDeclare("worker", true, false, false, null);
    channel.basicQos(1);  // 一次只接受一条未确认消息, 否则接受全部消息, 清空server的queue
    // 关闭自动确认, 没有确认不能接受下一条message, Unacked + 1
    channel.basicConsume("worker", false, new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        try{
          System.out.println("\033[37;7m" + new String(body) + "\033[0m");
          channel.basicAck(envelope.getDeliveryTag(), false);  // false: 每次确认一条
          Thread.sleep(1000);
        }catch(Exception e){
          throw new RuntimeException(e);
        }
      }
    });
  }

}
package io.veer.workerqueue;


import com.rabbitmq.client.*;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.util.Objects;

public class Consumer2{
  public static void main(String[] args) throws IOException{
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = Objects.requireNonNull(connection).createChannel();
    channel.queueDeclare("worker", true, false, false, null);
    channel.basicQos(1);
    channel.basicConsume("worker", false, new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        try{
          System.out.println("\033[37;7m" + new String(body) + "\033[0m");
          channel.basicAck(envelope.getDeliveryTag(), false);
          Thread.sleep(500);
        }catch(IOException | InterruptedException e){
          throw new RuntimeException(e);
        }
      }
    });
  }
}

 

Exchange Fanout 没有routing key:

provider:

package io.veer.fanout;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

public class Provider{
  public static void main(String[] args) throws IOException{
    Connection connection = RabbitMQUtil.getConnection();

    Channel channel = Objects.requireNonNull(connection).createChannel();

    /**
     * 声明通道为exchange
     * param1: exchange name
     * param1: exchange type
     */
    channel.exchangeDeclare("log_fanout", "fanout");

    /**
     * 发送消息
     * param1: exchange name
     * param2: routingKey fanout类型中无意义
     */
    channel.basicPublish("log_fanout", "", null, "exchange: logs, type: fanout".getBytes(StandardCharsets.UTF_8));

    RabbitMQUtil.closeConnectionAndChannel(connection, channel);
  }
}

Consumer

package io.veer.fanout;


import com.rabbitmq.client.*;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

public class Consumer{
  public static void main(String[] args) throws IOException{
    for(int i = 0; i < 5; i++){
      new Thread(() -> {
        try{
          new Consumer().consume();
        }catch(IOException e){
          throw new RuntimeException(e);
        }
      }).start();
    }
  }

  public void consume() throws IOException{
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = Objects.requireNonNull(connection).createChannel();

    // 通道声明为exchange
    channel.exchangeDeclare("log_fanout", "fanout");

    // 临时队列
    String queue = channel.queueDeclare().getQueue();

    channel.queueBind(queue, "log_fanout", "");

    channel.basicConsume(queue, true, new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        String format = String.format("%s %s", Thread.currentThread(), new String(body, StandardCharsets.UTF_8));
        System.out.println("\033[37;7m" + format + "\033[0m");
      }
    });
  }
}

 

Exchange Direct 通过routing key转发至不同queue

provider:

package io.veer.direct;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

public class Provider{
  public static void main(String[] args) throws IOException{
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = Objects.requireNonNull(connection).createChannel();
    channel.exchangeDeclare("log_direct", "direct");

    String routingKey = "error";
    String body = String.format("routingKey: %s, %s", routingKey, Thread.currentThread().getName());
    channel.basicPublish("log_direct", routingKey, null, body.getBytes(StandardCharsets.UTF_8));

    RabbitMQUtil.closeConnectionAndChannel(connection, channel);
  }
}

consumer

package io.veer.direct;


import com.rabbitmq.client.*;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.util.Objects;

public class Consumer1{
  public static void main(String[] args) throws IOException{
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = Objects.requireNonNull(connection).createChannel();

    String exchangeName = "log_direct";

    channel.exchangeDeclare(exchangeName, "direct");

    String queue = channel.queueDeclare().getQueue();
    channel.queueBind(queue, exchangeName, "info");

    channel.basicConsume(queue, true, new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        System.out.println("\033[37;7m" + String.format("Consumer1 %s", new String(body)) + "\033[0m");
      }
    });
  }
}
package io.veer.direct;


import com.rabbitmq.client.*;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.util.Objects;

public class Consumer2{
  public static void main(String[] args) throws IOException{
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = Objects.requireNonNull(connection).createChannel();

    String exchangeName = "log_direct";
    channel.exchangeDeclare(exchangeName, "direct");

    String queue = channel.queueDeclare().getQueue();

    channel.queueBind(queue, exchangeName, "info");
    channel.queueBind(queue, exchangeName, "error");

    channel.basicConsume(queue, true, new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        System.out.println("\033[37;7m" + String.format("Consumer2 %s", new String(body)) + "\033[0m");
      }
    });
  }
}

 

Exchange Topic routing key使用通配符 * #:

provider

package io.veer.topic;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

public class Provider{
  public static void main(String[] args) throws IOException{
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = Objects.requireNonNull(connection).createChannel();

    String exchangeName = "topic";
    channel.exchangeDeclare(exchangeName, "topic");

    String routingKey = "user.insert.ruzz";

    channel.basicPublish(exchangeName, routingKey, null, String.format("Provider routingKey: %s", routingKey).getBytes(StandardCharsets.UTF_8));

    RabbitMQUtil.closeConnectionAndChannel(connection, channel);
  }
}

Consumer

package io.veer.topic;


import com.rabbitmq.client.*;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.util.Objects;

public class Consumer1{
  public static void main(String[] args) throws IOException{
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = Objects.requireNonNull(connection).createChannel();

    String exchangeName = "topic";
    channel.exchangeDeclare(exchangeName, "topic");

    String queue = channel.queueDeclare().getQueue();
    channel.queueBind(queue, exchangeName, "user.*");
    channel.basicConsume(queue, true, new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        System.out.println("\033[37;7m" + new String(body) + "\033[0m");
      }
    });
  }
}
package io.veer.topic;


import com.rabbitmq.client.*;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.util.Objects;

public class Consumer2{
  public static void main(String[] args) throws IOException{
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = Objects.requireNonNull(connection).createChannel();

    String exchangeName = "topic";
    channel.exchangeDeclare(exchangeName, "topic");

    String queue = channel.queueDeclare().getQueue();

    channel.queueBind(queue, exchangeName, "user.#");

    channel.basicConsume(queue, true, new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        System.out.println("\033[37;7m" + new String(body) + "\033[0m");
      }
    });
  }
}

 

 

Springboot:

依赖

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: 192.168.8.105
    port: 5672
    virtual-host: veil
    username: veil
    password: veil

 

测试类:

package io.veer.rabbit;


import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest(classes = RabbitApplication.class)
public class TestRabbitMQ{
  @Resource
  private RabbitTemplate rabbitTemplate;

  @Test
  public void tun(){
    rabbitTemplate.convertAndSend("tun", "tun");
  }

  @Test
  public void worker(){
    for(int i = 0; i < 10; i++){
      rabbitTemplate.convertAndSend("worker", "worker " + i);
    }
  }

  @Test
  public void fanout(){
    rabbitTemplate.convertAndSend("fanout", "", "fanout");
  }

  @Test
  public void direct(){
    rabbitTemplate.convertAndSend("direct", "error", "redict");
  }

  @Test
  public void topic(){
    rabbitTemplate.convertAndSend("topic", "user.save.insert", "topic");
  }
}

 

consumer类:

package io.veer.rabbit.consumer;


import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queuesToDeclare = @Queue(value = "tun", durable = "false", autoDelete = "true"))
public class Tun{
  @RabbitHandler
  public void tun(String message){
    System.out.println("\033[37;7m" + message + "\033[0m");
  }
}
package io.veer.rabbit.consumer;


import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Worker{
  @RabbitListener(queuesToDeclare = @Queue(value = "worker"))  // 直接加到方法上, 代替RabbitHandler
  public void worker1(String message){
    System.out.println("\033[37;7m" + "worker1: " + message + "\033[0m");
  }

  @RabbitListener(queuesToDeclare = @Queue(value = "worker"))
  public void worker2(String message){
    System.out.println("\033[37;7m" + "worker2: " + message + "\033[0m");
  }
}
package io.veer.rabbit.consumer;


import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Fanout{
  @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "fanout", type = "fanout"))})
  public void fanout1(String message){
    System.out.println("\033[37;7m" + message + "\033[0m");
  }

  @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "fanout", type = "fanout"))})
  public void fanout2(String message){
    System.out.println("\033[37;7m" + message + "\033[0m");
  }
}
package io.veer.rabbit.consumer;


import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Direct{
  @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "direct", type = "direct"), key = {"info"})})
  public void direct1(String message){
    System.out.println("\033[37;7m" + message + "\033[0m");
  }

  @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "direct", type = "direct"), key = {"info", "error"})})
  public void direct2(String message){
    System.out.println("\033[37;7m" + message + "\033[0m");
  }
}
package io.veer.rabbit.consumer;


import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Topic{
  @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "topic", type = "topic"), key = {"user.*"})})
  public void topic1(String msg){
    System.out.println("\033[37;7m" + msg + "\033[0m");
  }

  @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "topic", type = "topic"), key = {"user.#"})})
  public void topic2(String msg){
    System.out.println("\033[37;7m" + msg + "\033[0m");
  }
}

 

标签:code,Java,String,public,IOException,io,import,example,channel
来源: https://www.cnblogs.com/dissipate/p/16248492.html