RT-Thread之mqttclient软件包
作者:互联网
关于mqttclient软件包
这是一个基于socket API之上的跨平台MQTT客户端,拥有非常简洁的API接口,以极少的资源实现QOS2的服务质量,并且无缝衔接了mbedtls加密库。
优势:
- 基于标准BSD socket之上开发,只要是兼容BSD socket的系统均可使用。
- 稳定:无论是
掉线重连
,丢包重发
,都是严格遵循MQTT协议标准
执行,除此之外对大数据量的测试无论是收是发,都是非常稳定(一次发送135K
数据,3秒一次),高频测试也是非常稳定(7个主题同时收发,每秒一次,也就是1秒14个mqtt报文,服务质量QoS0、QoS1、QoS2都有)。因为作者以极少的资源设计了记录机制
,对采用QoS1服务质量的报文必须保证到达一次,对QoS2服务质量的报文有且只有收到一次(如果不相信它稳定性的同学可以自己去修改源码,专门为QoS2服务质量去测试,故意不回复PUBREC
,让服务器重发QoS2报文,看看客户端是否有且只有处理一次),而对于掉线重连的稳定性,则是基本操作了,没啥好说的,因此在测试中稳定性极好。 - 轻量级:整个代码工程极其简单,不使用mbedtls情况下,占用资源极少,作者曾使用esp8266模组与云端通信,整个工程代码消耗的RAM不足15k(包括系统占用的开销,对数据的处理开销,而此次还是未优化的情况下)。
- 无缝衔接mbedtls加密传输。
- 拥有极简的API接口,随意配置,使用起来非常简单。
- 有非常好的代码风格与思想:整个代码采用分层式设计,代码实现采用异步处理的思想,降低耦合,提高性能。
- MQTT协议支持主题通配符
“#”、“+”
。 - 订阅的主题与消息处理完全分离,让编程逻辑更加简单易用,用户无需理会错综复杂的逻辑关系。
- 不对外产生依赖。
- mqttclient内部已实现保活处理机制,无需用户过多关心理会,用户只需专心处理应用功能即可。
整体框架
拥有非常明确的分层框架。
API
mqttclient
拥有非常简洁的api
接口,并且api见名知其义,非常易于使用。
int mqtt_init(mqtt_client_t* c, client_init_params_t* init);
int mqtt_release(mqtt_client_t* c);
int mqtt_connect(mqtt_client_t* c);
int mqtt_disconnect(mqtt_client_t* c);
int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t msg_handler);
int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter);
int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg);
int mqtt_keep_alive(mqtt_client_t* c);
int mqtt_yield(mqtt_client_t* c, int timeout_ms);
核心
mqtt_client_t 是核心结构
typedef struct mqtt_client {
unsigned short packet_id;
unsigned char ping_outstanding;
unsigned char ack_handler_number;
unsigned char *read_buf;
unsigned char *write_buf;
unsigned int cmd_timeout;
unsigned int read_buf_size;
unsigned int write_buf_size;
unsigned int reconnect_try_duration;
void *reconnect_date;
reconnect_handler_t reconnect_handler;
client_state_t client_state;
platform_mutex_t write_lock;
platform_mutex_t global_lock;
list_t msg_handler_list;
list_t ack_handler_list;
network_t *network;
platform_thread_t *thread;
platform_timer_t reconnect_timer;
platform_timer_t last_sent;
platform_timer_t last_received;
connect_params_t *connect_params;
} mqtt_client_t;
该结构主要维护以下内容:
- 读写数据缓冲区
read_buf、write_buf
- 命令超时时间
cmd_timeout
(主要是读写阻塞时间、等待响应的时间、重连等待时间) - 维护
ack
链表ack_handler_list
,这是异步实现的核心,所有等待响应的报文都会被挂载到这个链表上 - 维护消息处理列表
msg_handler_list
,这是mqtt
协议必须实现的内容,所有来自服务器的publish
报文都会被处理(前提是订阅了对应的消息) - 维护一个网卡接口
network
- 维护一个内部线程
thread
,所有来自服务器的mqtt包都会在这里被处理! - 两个定时器,分别是掉线重连定时器与保活定时器
reconnect_timer、last_sent、last_received
- 一些连接的参数
connect_params
初始化
主要是配置mqtt_client_t
结构的相关信息,如果没有指定初始化参数,则系统会提供默认的参数。
但连接部分的参数则必须指定:
init_params.connect_params.network_params.addr = "[你的mqtt服务器IP地址或者是域名]";
init_params.connect_params.network_params.port = "1883"; //端口号
init_params.connect_params.user_name = "jiejietop";
init_params.connect_params.password = "123456";
init_params.connect_params.client_id = "clientid";
mqtt_init(&client, &init_params);
连接服务器
mqtt_connect(&client);
订阅报文
参数只有 mqtt_client_t
类型的指针,字符串类型的主题
(支持通配符),主题的服务质量
,以及收到报文的处理函数
,如不指定则有默认处理函数。
mqtt_subscribe(&client, "testtopic0", QOS0, topic_test1_handler);
mqtt_subscribe(&client, "testtopic1", QOS1, NULL);
mqtt_subscribe(&client, "testtopic2", QOS2, NULL);
发布报文
参数只有 mqtt_client_t
类型的指针,字符串类型的主题
(支持通配符),要发布的消息(包括服务质量
、消息主体
)。
mqtt_message_t msg;
msg.qos = 2;
msg.payload = (void *) buf;
mqtt_publish(&client, "testtopic1", &msg);
其他的API接口都是非常简单的,在后文会提及到。
使用mqttclient软件包
目前作者已经将mqttclient制作成RT-Thread的软件包了,大家可以通过env工具或者 RT-Thread Studio 直接使用软件包。
env工具
随着 package
系统的不断壮大,会有越来越多的软件包加入进来,所以本地看到 menuconfig
中的软件包列表可能会与服务器 不同步 。使用 pkgs --upgrade
命令即可解决该问题,这个命令不仅会对本地的包信息进行更新同步,还会对 env 的功能脚本进行升级,建议定期使用。
本次测试使用野火STM32F429挑战者开发板
- 所以用
pkgs --upgrade
命令先同步一下软件包。 - menuconfig命令打开配置。
- 在以下路径下选中mqttclient软件包,除此之外还要打开lwip、以太网接口 或者 SAL->套接字接口。
Location:
-> RT-Thread online packages
-> IoT - internet of things
-> mqttclient
- 然后就是随意配置了。
mbedtls
默认不打开mbedtls。
salof
salof 全称是:Synchronous Asynchronous Log Output Framework
(同步异步日志输出框架),它是一个异步日志输出库,在空闲时候输出对应的日志信息,并且该库与mqttclient无缝衔接。
配置对应的日志输出级别:
#define BASE_LEVEL (0)
#define ASSERT_LEVEL (BASE_LEVEL + 1) /* 日志输出级别:断言级别(非常高优先级) */
#define ERR_LEVEL (ASSERT_LEVEL + 1) /* 日志输出级别:错误级别(高优先级) */
#define WARN_LEVEL (ERR_LEVEL + 1) /* 日志输出级别:警告级别(中优先级) */
#define INFO_LEVEL (WARN_LEVEL + 1) /* 日志输出级别:信息级别(低优先级) */
#define DEBUG_LEVEL (INFO_LEVEL + 1) /* 日志输出级别:调试级别(更低优先级) */
#define LOG_LEVEL WARN_LEVEL /* 日志输出级别 */
日志其他选项:
- 终端带颜色
- 时间戳
- 标签
mqtt
配置mqtt等待应答列表的最大值,对于qos1 qos2服务质量有要求的可以将其设置大一点,当然也必须资源跟得上,它主要是保证qos1 qos2的mqtt报文能准确到达服务器。
#define MQTT_ACK_HANDLER_NUM_MAX 64
选择MQTT协议的版本,默认为4,表示使用MQTT 3.1.1版本,而3则表示为MQTT 3.1版本。
#define MQTT_VERSION 4 // 4 is mqtt 3.1.1
设置默认的保活时间,它主要是保证MQTT客户端与服务器的保持活性连接,单位为 秒 ,比如MQTT客户端与服务器100S没有发送数据了,有没有接收到数据,此时MQTT客户端会发送一个ping包,确认一下这个会话是否存在,如果收到服务器的应答,那么说明这个会话还是存在的,可以随时收发数据,而如果不存在了,就清除会话。
#define MQTT_KEEP_ALIVE_INTERVAL 100 // unit: second
默认的命令超时,它主要是用于socket读写超时,在MQTT初始化时可以指定:
#define MQTT_DEFAULT_CMD_TIMEOUT 4000
默认主题的长度,主题是支持通配符的,如果主题太长则会被截断:
#define MQTT_TOPIC_LEN_MAX 64
默认的算法数据缓冲区的大小,如果要发送大量数据则修改大一些,在MQTT初始化时可以指定:
#define MQTT_DEFAULT_BUF_SIZE 1024
线程相关的配置,如线程栈,线程优先级,线程时间片等:
在linux环境下可以是不需要理会这些参数的,而在RTOS平台则需要配置,如果不使用mbedtls,线程栈2048字节已足够,而使用mbedtls加密后,需要配置4096字节以上。
#define MQTT_THREAD_STACK_SIZE 2048 // 线程栈
#define MQTT_THREAD_PRIO 5 // 线程优先级
#define MQTT_THREAD_TICK 50 // 线程时间片
默认的重连时间间隔,当发生掉线时,会以这个时间间隔尝试重连:
#define MQTT_RECONNECT_DEFAULT_DURATION 1000
其他不需要怎么配置的东西:
#define MQTT_MAX_PACKET_ID (0xFFFF - 1) // mqtt报文id
#define MQTT_MAX_CMD_TIMEOUT 20000 //最大的命令超时参数
#define MQTT_MIN_CMD_TIMEOUT 1000 //最小的命令超时参数
ps:以上参数基本不需要怎么配置的,直接用即可~
- 最后通过
scons --target=mdk5
命令生成mdk工程,然后编译下载到开发板后运行就行了(需要使用mqttclient测试代码),目前作者提供服务器仅供测试。
RT-Thread Studio使用
- 通过RT-Thread Setting打开lwip、以太网接口然后选择在线软件包添加到工程中,然后保存配置就可以看到工程已经添加了mqttclient软件包了。
**注意:**如果遇到添加软件包失败的话,很可能是因为RT-Thread Studio中的软件包还没更新或者更新失败,那么可以到软件安装目录RT-ThreadStudio\platform\env_released\env\packages\packages
下手动更新软件包,然后将master重置到最新的分支就行了:
mqttclient实现
连接服务器
int mqtt_connect(mqtt_client_t* c);
连接服务器则是使用非异步的方式设计,因为必须等待连接上服务器才能进行下一步操作。
过程如下
- 调用底层的连接函数连接上服务器:
c->network->connect(c->network);
- 序列化
mqtt
的CONNECT
报文并且发送
MQTTSerialize_connect(c->write_buf, c->write_buf_size, &connect_data)
mqtt_send_packet(c, len, &connect_timer)
- 等待来自服务器的
CONNACK
报文
mqtt_wait_packet(c, CONNACK, &connect_timer)
- 连接成功后创建一个内部线程
mqtt_yield_thread
platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, MQTT_THREAD_STACK_SIZE, MQTT_THREAD_PRIO, MQTT_THREAD_TICK)
订阅报文
int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t handler)
订阅报文使用异步设计来实现的:
过程如下:
- 序列化订阅报文并且发送给服务器
MQTTSerialize_subscribe(c->write_buf, c->write_buf_size, 0, mqtt_get_next_packet_id(c), 1, &topic, (int*)&qos)
mqtt_send_packet(c, len, &timer)
- 创建对应的消息处理节点,这个消息节点在收到服务器的
SUBACK
订阅应答报文后会挂载到消息处理列表msg_handler_list
上
mqtt_msg_handler_create(topic_filter, qos, handler)
- 在发送了报文给服务器那就要等待服务器的响应了,记录这个等待
SUBACK
mqtt_ack_list_record(c, SUBACK, mqtt_get_next_packet_id(c), len, msg_handler)
取消订阅
与订阅报文的逻辑基本差不多的~
发布报文
int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg)
核心思想都差不多,过程如下:
- 先序列化发布报文,然后发送到服务器
MQTTSerialize_publish(c->write_buf, c->write_buf_size, 0, msg->qos, msg->retained, msg->id,
topic, (unsigned char*)msg->payload, msg->payloadlen);
mqtt_send_packet(c, len, &timer)
- 对于QOS0的逻辑,不做任何处理,对于QOS1和QOS2的报文则需要记录下来,在没收到服务器应答的时候进行重发
if (QOS1 == msg->qos) {
rc = mqtt_ack_list_record(c, PUBACK, mqtt_get_next_packet_id(c), len, NULL);
} else if (QOS2 == msg->qos) {
rc = mqtt_ack_list_record(c, PUBREC, mqtt_get_next_packet_id(c), len, NULL);
}
内部线程
static void mqtt_yield_thread(void *arg)
主要是对mqtt_yield
函数的返回值做处理,比如在disconnect
的时候销毁这个线程。
核心的处理函数
- 数据包的处理
mqtt_packet_handle
static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)
对不同的包使用不一样的处理:
switch (packet_type) {
case 0: /* timed out reading packet */
break;
case CONNACK:
break;
case PUBACK:
case PUBCOMP:
rc = mqtt_puback_and_pubcomp_packet_handle(c, timer);
break;
case SUBACK:
rc = mqtt_suback_packet_handle(c, timer);
break;
case UNSUBACK:
rc = mqtt_unsuback_packet_handle(c, timer);
break;
case PUBLISH:
rc = mqtt_publish_packet_handle(c, timer);
break;
case PUBREC:
case PUBREL:
rc = mqtt_pubrec_and_pubrel_packet_handle(c, timer);
break;
case PINGRESP:
c->ping_outstanding = 0;
break;
default:
goto exit;
}
并且做保活的处理:
mqtt_keep_alive(c)
ack
链表的扫描,当收到服务器的报文时,对ack列表进行扫描操作
mqtt_ack_list_scan(c);
当超时后就销毁ack链表节点:
mqtt_ack_handler_destroy(ack_handler);
当然下面这几种报文则需要重发操作:(PUBACK 、PUBREC、 PUBREL 、PUBCOMP
,保证QOS1 QOS2的服务质量)
if ((ack_handler->type == PUBACK) || (ack_handler->type == PUBREC) || (ack_handler->type == PUBREL) || (ack_handler->type == PUBCOMP))
mqtt_ack_handler_resend(c, ack_handler);
- 保持活性的时间过去了,可能掉线了,需要重连操作
mqtt_try_reconnect(c);
重连成功后尝试重新订阅报文,保证恢复原始状态~
mqtt_try_resubscribe(c)
发布应答与发布完成报文的处理
static int mqtt_puback_and_pubcomp_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
- 反序列化报文
MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size)
- 取消对应的ack记录
mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);
订阅应答报文的处理
static int mqtt_suback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
- 反序列化报文
MQTTDeserialize_suback(&packet_id, 1, &count, (int*)&granted_qos, c->read_buf, c->read_buf_size)
- 取消对应的ack记录
mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);
- 安装对应的订阅消息处理函数,如果是已存在的则不会安装
mqtt_msg_handlers_install(c, msg_handler);
取消订阅应答报文的处理
static int mqtt_unsuback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
- 反序列化报文
MQTTDeserialize_unsuback(&packet_id, c->read_buf, c->read_buf_size)
- 取消对应的ack记录
mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler)
- 销毁对应的订阅消息处理函数
mqtt_msg_handler_destory(msg_handler);
来自服务器的发布报文的处理
static int mqtt_publish_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
- 反序列化报文
MQTTDeserialize_publish(&msg.dup, &qos, &msg.retained, &msg.id, &topic_name,
(unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->read_buf, c->read_buf_size)
- 对于QOS0、QOS1的报文,直接去处理消息
mqtt_deliver_message(c, &topic_name, &msg);
- 对于QOS1的报文,还需要发送一个
PUBACK
应答报文给服务器
MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBACK, 0, msg.id);
- 而对于QOS2的报文则需要发送
PUBREC
报文给服务器,除此之外还需要记录PUBREL
到ack链表上,等待服务器的发布释放报文,最后再去处理这个消息
MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBREC, 0, msg.id);
mqtt_ack_list_record(c, PUBREL, msg.id + 1, len, NULL)
mqtt_deliver_message(c, &topic_name, &msg);
说明:一旦注册到ack列表上的报文,当具有重复的报文是不会重新被注册的,它会通过
mqtt_ack_list_node_is_exist
函数判断这个节点是否存在,主要是依赖等待响应的消息类型与msgid。
发布收到与发布释放报文的处理
static int mqtt_pubrec_and_pubrel_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
- 反序列化报文
MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size)
- 产生一个对应的应答报文
mqtt_publish_ack_packet(c, packet_id, packet_type);
- 取消对应的ack记录
mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler)
_杰杰_
博客专家
发布了115 篇原创文章 · 获赞 283 · 访问量 19万+
私信
关注
标签:RT,mqttclient,Thread,ack,报文,packet,mqtt,client,msg 来源: https://blog.csdn.net/jiejiemcu/article/details/104523754