Vertx之MQTT客户端服务端发送
作者:互联网
介绍
本文演示如何使用 Vert.x 实现 MQTT 消息的收发,包含客户端和服务端两部分。
1. Maven项目依赖
<!-- Dependencies of the MQTT demo. Most entries carry no <version>; presumably the versions
     are managed by a parent POM or BOM that is not shown here (TODO confirm). -->
<dependencies>
<!-- NOTE(review): vertx-web is not referenced by the code shown in this article; confirm it is needed. -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
<!-- Provides the MqttServer / MqttClient APIs used by both verticles. -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mqtt</artifactId>
</dependency>
<!-- YAML support for Vert.x config; presumably used to load the server host/port settings. -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-config-yaml</artifactId>
</dependency>
<!-- Jackson data binding; presumably required by config().mapTo(ConfigProperties.class). -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Project-local shared module; presumably the home of ConfigProperties (TODO confirm). -->
<dependency>
<groupId>com.lance.common</groupId>
<artifactId>vertx-common-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
2.YAML文件配置
server:
  host: 127.0.0.1
  port: 18003
3.MQTT服务端配置
/**
 * Verticle that starts an MQTT server, accepts every connecting client, acknowledges the
 * messages it receives and answers each of them with a fixed greeting.
 *
 * <p>The listening host and port are read from the verticle configuration, mapped onto
 * {@code ConfigProperties}.
 */
public class MqttServerApp extends AbstractVerticle {
  private static final String CLIENT_ID = "clientHello";

  /**
   * Creates the MQTT server and starts listening.
   *
   * @param startPromise completed once the server is listening, failed with the bind error
   *     otherwise, so that the deployment result reflects the real server state
   * @throws Exception if the configuration cannot be mapped to {@code ConfigProperties}
   */
  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    ConfigProperties properties = config().mapTo(ConfigProperties.class);
    int port = properties.getServer().getPort();
    log.info("===>json: {}, port: {}", properties, port);

    MqttServer mqttServer = MqttServer.create(vertx, create(properties));
    mqttServer.endpointHandler(endpoint -> {
        // shows main connect info
        log.info("MQTT client [{}] request to connect, clean session = {}", endpoint.clientIdentifier(), endpoint.isCleanSession());
        if (endpoint.auth() != null) {
          // Credentials must never reach the log: only the user name is reported.
          log.info("[username = {}]", endpoint.auth().getUsername());
        }
        log.info("[properties = {}]", endpoint.connectProperties());
        if (endpoint.will() != null) {
          log.info("[will topic: {}, msg: {}, QoS: {}, isRetain: {}]", endpoint.will().getWillTopic(), endpoint.will().getWillMessageBytes(), endpoint.will().getWillQos(), endpoint.will().isWillRetain());
        }
        log.info("[keep alive timeout = {}]", endpoint.keepAliveTimeSeconds());

        // Accept the connection. This server keeps no session state between connections,
        // so CONNACK must report "no session present".
        endpoint.accept(false);
        receiver(endpoint);
        endpoint.disconnectMessageHandler(disconnectMessage -> log.info("Received disconnect from client, reason code = {}", disconnectMessage.code()));
      })
      .exceptionHandler(t -> log.error("MQTT exception fail: ", t))
      .listen(ar -> {
        if (ar.succeeded()) {
          log.warn("MQTT server is listening on port: {}", ar.result().actualPort());
          startPromise.complete();
        } else {
          log.error("Fail on starting the server: ", ar.cause());
          // Propagate the failure so the deployment does not hang or appear successful.
          startPromise.fail(ar.cause());
        }
      });
  }

  /**
   * Registers the handlers that process messages published by the client.
   *
   * <p>QoS 1 messages are answered with PUBACK, QoS 2 messages with PUBREC (and PUBCOMP once
   * the client releases them). Every received message triggers a reply via {@link #send}.
   *
   * @param endpoint the accepted client endpoint
   */
  private void receiver(MqttEndpoint endpoint) {
    endpoint.publishHandler(p -> {
        // Buffer.toString() decodes as UTF-8, matching how Buffer.buffer(String) encodes,
        // instead of depending on the platform default charset.
        log.info("Server received message [{}] with QoS [{}]", p.payload().toString(), p.qosLevel());
        if (p.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
          endpoint.publishAcknowledge(p.messageId());
        } else if (p.qosLevel() == MqttQoS.EXACTLY_ONCE) {
          endpoint.publishReceived(p.messageId());
        }
        send(endpoint);
      })
      .publishReleaseHandler(endpoint::publishComplete);
  }

  /**
   * Publishes a fixed greeting to the client on {@code MqttClientApp.MQTT_TOPIC} with QoS 0
   * and logs the outcome.
   *
   * @param endpoint the client endpoint to publish to
   */
  private void send(MqttEndpoint endpoint) {
    Buffer payload = Buffer.buffer("server: hello world.");
    endpoint.publish(MqttClientApp.MQTT_TOPIC, payload, MqttQoS.AT_MOST_ONCE, false, false, s -> {
      if (s.succeeded()) {
        log.info("===>Server publish success: {}", s.result());
      } else {
        log.error("===>Server publish fail: ", s.cause());
      }
    });
  }

  /**
   * Builds the server options from the configured host and port.
   *
   * @param configProperties the mapped verticle configuration
   * @return options carrying the configured host and port
   */
  private MqttServerOptions create(ConfigProperties configProperties) {
    MqttServerOptions options = new MqttServerOptions();
    options.setPort(configProperties.getServer().getPort());
    options.setHost(configProperties.getServer().getHost());
    return options;
  }
}
4.MQTT客户端配置
public class MqttClientApp extends AbstractVerticle {
public static final String MQTT_TOPIC = "hello_topic";
@Override
public void start() {
MqttClient client = MqttClient.create(vertx, create());
// handler will be called when we have a message in topic we subscribe for
client.publishHandler(p -> {
log.info("Client received message on [{}] payload [{}] with QoS [{}]", p.topicName(), p.payload().toString(Charset.defaultCharset()), p.qosLevel());
});
client.connect(18003, "127.0.0.1", s -> {
if (s.succeeded()) {
log.info("Client connect success.");
subscribe(client);
} else {
log.error("Client connect fail: ", s.cause());
}
}).exceptionHandler(event -> {
log.error("client fail: ", event.getCause());
});
}
private void subscribe(MqttClient client) {
client.subscribe(MQTT_TOPIC, 0, e -> {
if (e.succeeded()) {
log.info("===>subscribe success: {}", e.result());
vertx.setPeriodic(10_000, l -> publish(client));
} else {
log.error("===>subscribe fail: ", e.cause());
}
});
}
private void publish(MqttClient client) {
Buffer payload = Buffer.buffer("client: hello world.");
client.publish(MQTT_TOPIC, payload, MqttQoS.AT_MOST_ONCE, false, false, s -> {
if (s.succeeded()) {
log.info("===>Client publish success: {}", s.result());
} else {
log.error("===>Client publish fail: ", s.cause());
}
});
}
private MqttClientOptions create() {
MqttClientOptions options = new MqttClientOptions();
options.setClientId("ClientId_" + RandomStringUtils.randomAlphanumeric(6));
options.setMaxMessageSize(100_000_000);
options.setKeepAliveInterval(2);
return options;
}
}
5. 结果
2022-01-25 19:06:53.244 WARN 21 --- [ntloop-thread-1] lver.dns.DnsServerAddressStreamProviders---[ 70] : Can not find io.netty.resolver.dns.macos.MacOSDnsServerAddressStreamProvider in the classpath, fallback to system defaults. This may result in incorrect DNS resolutions on MacOS.
2022-01-25 19:06:53.291 INFO 21 --- [ntloop-thread-1] io.vertx.mqtt.impl.MqttClientImpl ---[ ] : Connection with 127.0.0.1:18003 established successfully
2022-01-25 19:06:53.432 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 34] : Client connect success.
2022-01-25 19:06:53.512 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 47] : ===>subscribe success: 1
2022-01-25 19:07:03.537 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 59] : ===>publish success: 2
2022-01-25 19:07:03.551 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 29] : Client received message on [hello_topic] payload [server: hello world.] with QoS [AT_MOST_ONCE]
2022-01-25 19:07:13.518 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 59] : ===>publish success: 3
2022-01-25 19:07:13.521 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 29] : Client received message on [hello_topic] payload [server: hello world.] with QoS [AT_MOST_ONCE]
2022-01-25 19:07:23.521 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 59] : ===>publish success: 4
6.项目完整地址
标签:info,endpoint,log,---,MQTT,client,服务端,Vertx 来源: https://blog.csdn.net/li6151770/article/details/122690873