其他分享
首页 > 其他分享 > Vertx之MQTT客户端服务端发送

Vertx之MQTT客户端服务端发送

作者:互联网

介绍

本文演示如何使用 Vert.x 实现 MQTT 消息的发送与接收,包含服务端和客户端两部分示例代码。

1. Maven项目依赖

<!--
  Dependencies for the Vert.x MQTT sample.
  NOTE(review): the io.vertx and jackson artifacts carry no <version> here;
  presumably the versions come from a parent POM or an imported BOM that is
  not shown in this snippet. Confirm before copying this block elsewhere.
-->
<dependencies>
	<dependency>
		<groupId>io.vertx</groupId>
		<artifactId>vertx-web</artifactId>
	</dependency>
	<!-- MQTT server and client API used by the sample verticles -->
	<dependency>
		<groupId>io.vertx</groupId>
		<artifactId>vertx-mqtt</artifactId>
	</dependency>
	<dependency>
		<groupId>io.vertx</groupId>
		<artifactId>vertx-config-yaml</artifactId>
	</dependency>
	<dependency>
		<groupId>com.fasterxml.jackson.core</groupId>
		<artifactId>jackson-databind</artifactId>
	</dependency>
	<!-- Project-local snapshot artifact; the only dependency with an explicit version -->
	<dependency>
		<groupId>com.lance.common</groupId>
		<artifactId>vertx-common-core</artifactId>
		<version>0.0.1-SNAPSHOT</version>
	</dependency>
</dependencies>

2.YAML文件配置

# Listen address of the MQTT server: the server verticle maps this section
# onto ConfigProperties and passes host/port to MqttServerOptions.
server:
  host: 127.0.0.1
  port: 18003

3.MQTT服务端配置

/**
 * Verticle that starts an MQTT server on the host/port taken from the
 * verticle configuration, accepts every connecting client, acknowledges
 * incoming publishes according to their QoS, and answers each received
 * message with a fixed greeting on {@link MqttClientApp#MQTT_TOPIC}.
 *
 * <p>NOTE(review): {@code log} is not declared in this class as shown;
 * presumably it is supplied by an annotation such as Lombok's
 * {@code @Slf4j} that is not visible here - confirm.
 */
public class MqttServerApp extends AbstractVerticle {

  /**
   * Creates the MQTT server and starts listening.
   *
   * @param startPromise completed once the server is listening, failed with
   *                     the listen error otherwise, so that the deployment
   *                     result reflects whether the server actually started
   * @throws Exception if the configuration cannot be mapped to
   *                   {@code ConfigProperties}
   */
  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    ConfigProperties properties = config().mapTo(ConfigProperties.class);
    int port = properties.getServer().getPort();
    log.info("===>json: {}, port: {}", properties, port);

    MqttServer mqttServer = MqttServer.create(vertx, create(properties));
    mqttServer.endpointHandler(this::handleEndpoint)
        .exceptionHandler(t -> log.error("MQTT exception fail: ", t))
        .listen(ar -> {
          if (ar.succeeded()) {
            log.warn("MQTT server is listening on port: {}", ar.result().actualPort());
            // Signal successful start; without this the deployment never completes.
            startPromise.complete();
          } else {
            log.error("Fail on starting the server: ", ar.cause());
            // Propagate the failure to whoever deployed this verticle.
            startPromise.fail(ar.cause());
          }
        });
  }

  /**
   * Logs the connection details of a newly connected client, accepts the
   * connection and installs the message handlers on the endpoint.
   *
   * @param endpoint the endpoint representing the remote client
   */
  private void handleEndpoint(MqttEndpoint endpoint) {
    // shows main connect info
    log.info("MQTT client [{}] request to connect, clean session = {}", endpoint.clientIdentifier(), endpoint.isCleanSession());
    if (endpoint.auth() != null) {
      // Only the username is logged; credentials must never reach the log.
      log.info("[username = {}]", endpoint.auth().getUsername());
    }

    log.info("[properties = {}]", endpoint.connectProperties());
    if (endpoint.will() != null) {
      log.info("[will topic: {}, msg: {}, QoS: {}, isRetain: {}]", endpoint.will().getWillTopic(), endpoint.will().getWillMessageBytes(), endpoint.will().getWillQos(), endpoint.will().isWillRetain());
    }

    log.info("[keep alive timeout = {}]", endpoint.keepAliveTimeSeconds());
    // Accept the connection. This server keeps no session state between
    // connections, so "session present" is reported as false.
    endpoint.accept(false);
    receiver(endpoint);
    endpoint.disconnectMessageHandler(disconnectMessage -> log.info("Received disconnect from client, reason code = {}", disconnectMessage.code()));
  }

  /**
   * Installs the publish handlers on the endpoint: every received message
   * is logged, acknowledged as required by its QoS level, and answered via
   * {@link #send(MqttEndpoint)}.
   *
   * @param endpoint the endpoint to receive messages from
   */
  private void receiver(MqttEndpoint endpoint) {
    endpoint.publishHandler(p -> {
          // Decode explicitly as UTF-8 (the encoding Buffer.buffer(String) uses)
          // instead of the platform default charset.
          log.info("Server received message [{}] with QoS [{}]", p.payload().toString(java.nio.charset.StandardCharsets.UTF_8), p.qosLevel());
          if (p.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
            // QoS 1: answer with PUBACK
            endpoint.publishAcknowledge(p.messageId());
          } else if (p.qosLevel() == MqttQoS.EXACTLY_ONCE) {
            // QoS 2: answer with PUBREC; PUBCOMP follows in the release handler
            endpoint.publishReceived(p.messageId());
          }
          send(endpoint);
        })
        .publishReleaseHandler(endpoint::publishComplete);
  }

  /**
   * Publishes the fixed server greeting to the given endpoint with QoS 0
   * and logs the outcome.
   *
   * @param endpoint the endpoint to publish to
   */
  private void send(MqttEndpoint endpoint) {
    Buffer payload = Buffer.buffer("server: hello world.");
    endpoint.publish(MqttClientApp.MQTT_TOPIC, payload, MqttQoS.AT_MOST_ONCE, false, false, s -> {
      if (s.succeeded()) {
        log.info("===>Server publish success: {}", s.result());
      } else {
        log.error("===>Server publish fail: ", s.cause());
      }
    });
  }

  /**
   * Builds the server options from the configured host and port.
   *
   * @param configProperties the mapped verticle configuration
   * @return options with host and port applied
   */
  private MqttServerOptions create(ConfigProperties configProperties) {
    MqttServerOptions options = new MqttServerOptions();
    options.setPort(configProperties.getServer().getPort());
    options.setHost(configProperties.getServer().getHost());
    return options;
  }
}

4.MQTT客户端配置

public class MqttClientApp extends AbstractVerticle {
  public static final String MQTT_TOPIC = "hello_topic";

  @Override
  public void start() {
    MqttClient client = MqttClient.create(vertx, create());

    // handler will be called when we have a message in topic we subscribe for
    client.publishHandler(p -> {
      log.info("Client received message on [{}] payload [{}] with QoS [{}]", p.topicName(), p.payload().toString(Charset.defaultCharset()), p.qosLevel());
    });

    client.connect(18003, "127.0.0.1", s -> {
      if (s.succeeded()) {
        log.info("Client connect success.");
        subscribe(client);
      } else {
        log.error("Client connect fail: ", s.cause());
      }
    }).exceptionHandler(event -> {
      log.error("client fail: ", event.getCause());
    });
  }

  private void subscribe(MqttClient client) {
    client.subscribe(MQTT_TOPIC, 0, e -> {
      if (e.succeeded()) {
        log.info("===>subscribe success: {}", e.result());
        vertx.setPeriodic(10_000, l -> publish(client));
      } else {
        log.error("===>subscribe fail: ", e.cause());
      }
    });
  }

  private void publish(MqttClient client) {
    Buffer payload = Buffer.buffer("client: hello world.");
    client.publish(MQTT_TOPIC, payload, MqttQoS.AT_MOST_ONCE, false, false, s -> {
      if (s.succeeded()) {
        log.info("===>Client publish success: {}", s.result());
      } else {
        log.error("===>Client publish fail: ", s.cause());
      }
    });
  }

  private MqttClientOptions create() {
    MqttClientOptions options = new MqttClientOptions();
    options.setClientId("ClientId_" + RandomStringUtils.randomAlphanumeric(6));
    options.setMaxMessageSize(100_000_000);
    options.setKeepAliveInterval(2);
    return options;
  }
}

5. 结果

2022-01-25 19:06:53.244  WARN 21 --- [ntloop-thread-1] lver.dns.DnsServerAddressStreamProviders---[  70] : Can not find io.netty.resolver.dns.macos.MacOSDnsServerAddressStreamProvider in the classpath, fallback to system defaults. This may result in incorrect DNS resolutions on MacOS.
2022-01-25 19:06:53.291  INFO 21 --- [ntloop-thread-1] io.vertx.mqtt.impl.MqttClientImpl       ---[    ] : Connection with 127.0.0.1:18003 established successfully
2022-01-25 19:06:53.432  INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp            ---[  34] : Client connect success.
2022-01-25 19:06:53.512  INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp            ---[  47] : ===>subscribe success: 1
2022-01-25 19:07:03.537  INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp            ---[  59] : ===>publish success: 2
2022-01-25 19:07:03.551  INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp            ---[  29] : Client received message on [hello_topic] payload [server: hello world.] with QoS [AT_MOST_ONCE]
2022-01-25 19:07:13.518  INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp            ---[  59] : ===>publish success: 3
2022-01-25 19:07:13.521  INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp            ---[  29] : Client received message on [hello_topic] payload [server: hello world.] with QoS [AT_MOST_ONCE]
2022-01-25 19:07:23.521  INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp            ---[  59] : ===>publish success: 4

6.项目完整地址

Vertx之MQTT客户端服务端发送 Github 地址

Vertx之MQTT客户端服务端发送 Gitee 地址

标签:info,endpoint,log,---,MQTT,client,服务端,Vertx
来源: https://blog.csdn.net/li6151770/article/details/122690873