分类: 其他平台
2020-09-12 10:50:59
MQTT简介:
MQTT 3.1.1协议:
MQTT.fx工具操作参考流程:
https://blog.csdn.net/qq_45097019/article/details/107418853
C-SDK-V3.0.1 Linux操作参考流程:
https://blog.csdn.net/qq_45097019/article/details/107449929
1. 业务流程
1.1. CONNECT
connect业务流程如上所示:
1) 客户端发送CONNECT包至服务端进行会话连接,等待服务端返回CONNACK包
2) 服务端返回CONNACK包
1.2. SUBSCRIBE
sub业务流程如上所示:
1) 客户端发送SUBSCRIBE包至服务器订阅主题,等待服务器端返回SUBACK包
2) 服务端返回SUBACK包
3) 当服务端有主题的信息更新时,发送PUBLISH包给客户端
1.3. PUBLISH QoS0
publish QoS0业务流程如上所示:
1) 客户端发送PUBLISH包给服务端
1.4. PUBLISH Qos1
publish QoS1业务流程如上所示:
1) 客户端发送PUSBLISH包给服务端,等待服务端返回PUBACK包
2) 服务端返回PUBACK包
3) 如果服务端返回PUBACK包超时,则客户端重新发送PUSBLISH包
2. 主线程流程(main)
1) 设置默认的事件处理函数为example_event_handle
2) IOT_MQTT_Construct:初始化MQTT协议栈并进行会话连接
3) example_subscribe:订阅一个主题的例程
4) example_publish:对一个主题发布消息更新的例程
5) IOT_MQTT_Yield:MQTT空闲调度,用于存活检测与服务器发送的包处理
3. Connect流程(IOT_MQTT_Construct)
1) 获取设备的product_key,device_name,device_secret
2) IOT_Sign_MQTT:生成MQTT的基础连接配置信息,通过设备的product_key,device_name,device_secret生成clientid,password,hostname和username,以及根据加密配置选择端口号
3) 根据product_key和device_name生成device_id
4) 根据用户配置设置存活检测间隔,读写缓冲区大小等参数
5) wrapper_mqtt_init:建立会话结构体,配置基础参数,设置网络接口
6) wrapper_mqtt_connect:建立网络的连接并进行会话的连接
7) iotx_mqtt_report_funcs:发送设备的系统和固件版本信息给服务器
2.1.wrapper_mqtt_init
1) 分配一个iotx_mc_client_t结构体作为会话对象
2) iotx_mc_init:会话的参数配置及网络接口配置
2.2.iotx_mc_init
1) 初始化会话的资源锁
2) 配置会话的参数,包括协议版本号,存活检测间隔时间,clientID,username等参数
3) iotx_net_init:网络的接口配置,host和port的设置,read,write,connect,disconnect等通用网络接口的配置
2.3.wrapper_mqtt_connect
1) 进行网络的连接,网络连接成功则进行会话的连接
2) _mqtt_connect:会话的连接
2.4._mqtt_connect
1) MQTTConnect:发送CONNECT包至服务器
2) iotx_mc_wait_CONNACK:等待服务器的CONNACK回包
3) 设置keepalive_probes存活检测计数器为0
4) 设置client_state会话状态为IOTX_MC_STATE_CONNECTED已连接状态
2.5.MQTTConnect
1) MQTTSerialize_connect:根据会话的连接信息进行CONNECT包的组包
2) iotx_mc_send_packet:将组好的包通过网络接口发送
2.6.MQTTSerialize_connect
CONNECT包的定义如下:
byte |
bit7 |
bit6 |
bit5 |
bit4 |
bit3 |
bit2 |
bit1 |
bit0 |
固定 包头 |
0 |
0 |
0 |
0 |
1 |
0 |
0 |
0 |
0 |
|
包类型(CONNECT) |
保留 |
||||||||
1-N |
X |
X |
X |
X |
X |
X |
X |
X |
|
剩余长度 |
|||||||||
0 |
0 |
0 |
0 |
0 |
0 |
0 |
0 |
0 |
可变 包头 |
协议名称长度MSB |
|||||||||
1 |
0 |
0 |
0 |
0 |
0 |
1 |
0 |
0 |
|
协议名称长度LSB |
|||||||||
2 |
0 |
1 |
0 |
0 |
1 |
1 |
0 |
1 |
|
协议名称’M’ |
|||||||||
3 |
0 |
1 |
0 |
1 |
0 |
0 |
0 |
1 |
|
协议名称’Q’ |
|||||||||
4 |
0 |
1 |
0 |
1 |
0 |
1 |
0 |
0 |
|
协议名称’T’ |
|||||||||
5 |
0 |
1 |
0 |
1 |
0 |
1 |
0 |
0 |
|
协议名称’T’ |
|||||||||
6 |
0 |
0 |
0 |
0 |
0 |
1 |
0 |
0 |
|
协议版本号 |
|||||||||
7 |
X |
X |
X |
X |
X |
X |
X |
0 |
|
UserName |
Password |
Will Retain |
Will Qos |
Will Flag |
Clean Session |
保留 |
|||
8 |
X |
X |
X |
X |
X |
X |
X |
X |
|
KeepAlive MSB |
|||||||||
9 |
X |
X |
X |
X |
X |
X |
X |
X |
|
KeepAlive LSB |
|||||||||
0-M |
X |
X |
X |
X |
X |
X |
X |
X |
负载 |
Client Identifier UTF-8字符串 |
|||||||||
M-J |
X |
X |
X |
X |
X |
X |
X |
X |
|
Will Topic UTF-8字符串(如果设置了WillFlag) |
|||||||||
J-K |
X |
X |
X |
X |
X |
X |
X |
X |
|
Will Message UTF-8字符串(如果设置了WillFlag) |
|||||||||
K-L |
X |
X |
X |
X |
X |
X |
X |
X |
|
User Name UTF-8字符串(如果设置了UserName) |
|||||||||
L-S |
X |
X |
X |
X |
X |
X |
X |
X |
|
Password UTF-8字符串(如果设置了Password) |
1) 根据flag和对应的字符串计算可变包头加负载需要的字节数,并转换为固定包头中剩余字节需要的字节数及值
2) 根据包类型设置写缓冲区中固定包头的header部分
3) 根据可变包头和负载的长度设置写缓冲区中固定包头的剩余字节部分
4) 根据使用的MQTT版本(当前为V3.1.1)将协议名称长度(4)设置写缓冲区中可变包头的协议名称长度部分
5) 根据使用的MQTT版本(当前为V3.1.1)将协议名称(“MQTT”)设置写缓冲区中可变包头的协议名称部分
6) 根据使用的MQTT版本(当前为V3.1.1)将协议版本号(4)设置写缓冲区中可变包头的协议版本号部分
7) 根据会话的flag设置写缓冲区中可变包头的flag部分
8) 根据会话的KeepAlive值设置写缓冲区中可变包头的KeepAlive部分
9) 根据会话的ClientId字符串根据UTF-8格式设置写缓冲区中负载的Client Identifier部分
10) 如果设置了WillFlag,则根据会话的WillTopic字符串根据UTF-8格式设置写缓冲区中负载的WillTopic部分
11) 如果设置了WillFlag,则根据会话的WillMessage字符串根据UTF-8格式设置写缓冲区中负载的WillMessage部分
12) 如果设置了UserName,则根据会话的UserName字符串根据UTF-8格式设置写缓冲区中负载的UserName部分
13) 如果设置了Password,则根据会话的Password字符串根据UTF-8格式设置写缓冲区中负载的Password部分
2.7.iotx_mc_wait_CONNACK
1) iotx_mc_read_packet:从网络接口中读取一个包
2) 判断包是否为CONNACK包,如果不是并且未超时则继续读取
3) iotx_mc_handle_recv_CONNACK:处理CONNACK包
2.8.iotx_mc_read_packet
1) 读取1个字节至会话读缓冲区中,获取packet_type
2) iotx_mc_decode_packet:读取剩余字节字段并对剩余字节字段解码
3) MQTTPacket_encode:将剩余字节数读取至会话读缓冲区中
4) 如果剩余数据的长度大于会话读缓冲区,则申请一个临时缓冲区,读取剩余数据至临时缓冲区中后马上释放,将包的类型设置为MQTT_CPT_RESERVED,运行系统的默认处理函数后返回
5) 如果剩余数据的长度小于等于读缓冲区,则读取剩余数据至会话读缓冲区中,设置传入参数packettype为包的类型后返回
2.9.iotx_mc_handle_recv_CONNACK
CONNACK包的定义如下:
byte |
bit7 |
bit6 |
bit5 |
bit4 |
bit3 |
bit2 |
bit1 |
bit0 |
固定 包头 |
0 |
0 |
0 |
1 |
0 |
0 |
0 |
0 |
0 |
|
包类型(CONNACK) |
保留 |
||||||||
1 |
0 |
0 |
0 |
0 |
0 |
0 |
1 |
0 |
|
剩余长度,CONNACK中固定为2 |
|||||||||
0 |
0 |
0 |
0 |
0 |
0 |
0 |
0 |
X |
可变包头 |
保留 |
SessionPresent |
||||||||
1 |
X |
X |
X |
X |
X |
X |
X |
X |
|
CONNECT返回值 |
1) MQTTDeserialize_connack:对会话读缓冲区中的内容进行CONNACK协议解包,获取返回类型
2) 返回CONNACK的返回类型
2.10.iotx_mqtt_report_funcs
1) iotx_set_report_func:设置上报函数,DEMO中设置使用IOT_MQTT_Publish_Simple函数
2) iotx_report_devinfo:上报设备的系统信息
3) iotx_report_firmware_version:上报设备的固件版本信息
4. Subscribe例程流程(example_subscribe):
1) 生成订阅的主题字符串
2) IOT_MQTT_Subscribe:根据主题字符串进行订阅并挂载订阅处理函数example_message_arrive
3.1.IOT_MQTT_Subscribe
1) 检查主题和订阅处理函数是否为空,为空则返回异常
2) wrapper_mqtt_subscribe:根据主题字符串进行订阅并挂载订阅处理函数
3.2.wrapper_mqtt_subscribe
1) iotx_mc_get_next_packetid:获取会话的packet_id号,每获取一次进行自增
2) iotx_mc_check_topic:检查主题的字符串格式是否合法
3) MQTTSubscribe:根据主题字符串进行订阅并挂载订阅处理函数
3.3.MQTTSubscribe
1) 在主题字符串中寻找+或者#字符,以判断主题字符串为名字类型还是通配符类型,如果为名字类型,则可以对主题字符串进行压缩处理
2) 申请一个iotx_mc_topic_handle_t结构实体用于挂载订阅处理函数和传入参数
3) MQTTSerialize_subscribe:根据订阅参数生成SUBSCRIBE包
4) iotx_mc_send_packet:发送SUBSCRIBE包
5) 检查list_sub_handle订阅列表上是否有和该订阅相同主题,处理函数的项,没有则将该节点挂载至订阅列表上
3.4.MQTTSerialize_subscribe
SUBSCRIBE包的定义如下:
byte |
bit7 |
bit6 |
bit5 |
bit4 |
bit3 |
bit2 |
bit1 |
bit0 |
固定 包头 |
0 |
1 |
0 |
0 |
0 |
0 |
0 |
1 |
0 |
|
包类型(SUBSCRIBE) |
保留 |
||||||||
1-N |
X |
X |
X |
X |
X |
X |
X |
X |
|
剩余长度 |
|||||||||
0 |
X |
X |
X |
X |
X |
X |
X |
X |
可变 包头 |
Packet Identifier MSB |
|||||||||
1 |
X |
X |
X |
X |
X |
X |
X |
X |
|
Packet Identifier LSB |
|||||||||
0 |
X |
X |
X |
X |
X |
X |
X |
X |
负载 |
主题字符串长度MSB |
|||||||||
1 |
X |
X |
X |
X |
X |
X |
X |
X |
|
主题字符串长度LSB |
|||||||||
2-N |
X |
X |
X |
X |
X |
X |
X |
X |
|
主题字符串 |
|||||||||
N+1 |
0 |
0 |
0 |
0 |
0 |
0 |
X |
X |
|
保留 |
QoS |
l 包中主题字符串为UTF-8格式,UTF-8格式的字符串存储为2+N的结构,前面的2字节用于存储字符串的长度,后面的N为最大65536长度的字符串
l 包中主题可以有多个,每个主题都需要主题字符串长度,主题字符串,QoS三个部分
1) 根据主题字符串的长度计算可变包头加负载的字节数,并转换为固定包头中剩余字节需要的字节数及值
2) 根据包类型设置写缓冲区中固定包头的header部分
3) 根据可变包头和负载的长度设置写缓冲区中固定包头的剩余字节部分
4) 根据packetid设置写缓冲区中可变包头的Packet Identifier部分
5) 根据主题的数量设置写缓冲区中负载的主题部分
5. Publish例程流程(example_publish):
1) 生成更新的主题字符串
2) 生成更新的主题负载信息
3) IOT_MQTT_Publish_Simple:发布主题的信息更新
4.1.IOT_MQTT_Publish_Simple
1) 构建发送的信息包
2) wrapper_mqtt_publish:发布主题的信息更新
4.2.wrapper_mqtt_publish
1) iotx_mc_check_topic:检查主题有效性
2) 如果QoS大于0,则通过iotx_mc_get_next_packetid生成packetid号
3) MQTTPublish:发布主题的信息更新
4.3.MQTTPublish
1) MQTTSerialize_publish:根据主题和负载信息进行PUBLISH包打包
2) iotx_mc_push_pubInfo_to:如果QOS大于0,则将PUBLISH包的内容加入至PUSBLISH ACK等待队列中
3) iotx_mc_send_packet:发送PUSBLISH包
4.4.MQTTSerialize_publish
PUSBLISH包的定义如下:
byte |
bit7 |
bit6 |
bit5 |
bit4 |
bit3 |
bit2 |
bit1 |
bit0 |
固定 包头 |
0 |
0 |
0 |
1 |
1 |
X |
X |
X |
X |
|
包类型(PUBLISH) |
Dup Flag |
QoS |
RETAIN |
||||||
1 |
X |
X |
X |
X |
X |
X |
X |
X |
|
剩余字节 |
|||||||||
0 |
X |
X |
X |
X |
X |
X |
X |
X |
可变 包头 |
主题字符串长度MSB |
|||||||||
1 |
X |
X |
X |
X |
X |
X |
X |
X |
|
主题字符串长度LSB |
|||||||||
2-N |
X |
X |
X |
X |
X |
X |
X |
X |
|
主题字符串 |
|||||||||
N+1 |
X |
X |
X |
X |
X |
X |
X |
X |
|
Packet Identifier MSB(QoS大于0) |
|||||||||
N+2 |
X |
X |
X |
X |
X |
X |
X |
X |
|
Packet Identifier LSB(QoS大于0) |
|||||||||
0-N |
X |
X |
X |
X |
X |
X |
X |
X |
负载 |
负载 |
1) 根据QoS等级,主题字符串的长度计算可变包头加负载的字节数,并转换为固定包头中剩余字节需要的字节数及值
2) 根据包类型设置写缓冲区中固定包头的header部分
3) 根据可变包头和负载的长度设置写缓冲区中固定包头的剩余字节部分
4) 根据主题设置写缓冲区中可变包头的主题部分
5) 根据QoS等级决定是否将packetId设置写缓冲区中可变包头的Packet Identifier部分
6) 根据负载的内容设置写缓冲区中负载的部分
4.5.iotx_mc_push_pubInfo_to
1) 申请iotx_mc_pub_info_t数据结构体加上会话写缓冲区有效内容的空间
2) 拷贝会话写缓冲区中的有效内容至刚申请的对应空间中
3) 设置节点的node_state状态为IOTX_MC_NODE_STATE_NORMANL
4) 将节点挂载至list_pub_wait_ack队列上
6. 空闲处理流程(IOT_MQTT_Yield)
1) wrapper_mqtt_yield:空闲调度,用于存活检测与服务器发送的包处理
5.1.wrapper_mqtt_yield
1) iotx_mc_keepalive:存活检测
2) _mqtt_cycle:服务器发送的包处理
5.2.iotx_mc_keepalive
1) iotx_mc_keepalive_sub:存活检测
2) iotx_mc_handle_reconnect:会话重新连接
5.3.iotx_mc_keepalive_sub
1) 检查是否到达下一次存活检测时间,没到则返回
2) MQTTKeepalive:发送PINGREQ包用于向服务器通知设备存活
3) keepalive_probes存活计数器自增
5.4.MQTTKeepalive
1) MQTTSerialize_pingreq:打包PINGREQ包
2) iotx_mc_send_packet:发送PINGREQ包
5.5.MQTTSerialize_pingreq
1) MQTTSerialize_zero:打包PINGREQ包
5.6.MQTTSerialize_zero
PINGREQ包的定义如下
byte |
bit7 |
bit6 |
bit5 |
bit4 |
bit3 |
bit2 |
bit1 |
bit0 |
固定 包头 |
0 |
1 |
1 |
0 |
0 |
0 |
0 |
0 |
0 |
|
包类型(PINGREQ) |
保留 |
||||||||
1 |
0 |
0 |
0 |
0 |
0 |
0 |
0 |
0 |
|
剩余长度 |
1) 根据包类型设置写缓冲区中固定包头的header部分
2) 根据可变包头和负载的长度设置写缓冲区中固定包头的剩余字节部分
5.7._mqtt_cycle
1) 在cycle限制时间内一直读取包
2) iotx_mc_cycle:读取并处理服务器发送的包
3) MQTTPubInfoProc:检查PUBLISH ACK是否收到,收到则从队列中移除,超时则进行PUBLISH包的重发
5.8.iotx_mc_cycle
1) 存活检测计数器超过上限,则设置会话状态为断开连接状态,用于后续重连检测
2) iotx_mc_read_packet:从网络中读取1个包
3) 如果从服务器收到任何包,则清空存活检测计数器
4) 根据收到的包类型进行特定处理
5) iotx_mc_handle_recv_PUBACK:收到PUBACK包的处理
6) iotx_mc_handle_recv_SUBACK:收到SUBACK包的处理
7) iotx_mc_handle_recv_PUBLISH:收到PUBLISH包的处理
5.9.iotx_mc_handle_recv_PUBACK
1) MQTTDeserialize_ack:对PUBACK包进行解包以获得packetid
2) iotx_mc_mask_pubInfo_from:PUSBLISH ACK队列的节点匹配
3) 如有系统默认处理函数,则进行调用
5.10.MQTTDeserialize_ack
PUBACK包的定义如下:
byte |
bit7 |
bit6 |
bit5 |
bit4 |
bit3 |
bit2 |
bit1 |
bit0 |
固定 包头 |
0 |
1 |
1 |
0 |
0 |
0 |
0 |
0 |
0 |
|
包类型(PINGREQ) |
保留 |
||||||||
1 |
0 |
0 |
0 |
0 |
0 |
0 |
1 |
0 |
|
剩余字节 |
|||||||||
0 |
X |
X |
X |
X |
X |
X |
X |
X |
可变 包头 |
Packet Identifier MSB |
|||||||||
1 |
X |
X |
X |
X |
X |
X |
X |
X |
|
Packet Identifier LSB |
1) 对会话读缓冲区中的内容进行PUBACK协议解包,获取packetid
5.11.iotx_mc_mask_pubInfo_from
1) 历编list_pub_wait_ack列表中的节点匹配packetid,并将其节点的状态标识设置为IOTX_MC_NODE_STATE_INVALID说明收到了pusblish ack包
5.12.iotx_mc_handle_recv_SUBACK
1) MQTTDeserialize_suback:对SUBACK包进行解包
2) 遍历订阅结果,判断其订阅是否失败,如果值为0x80表示失败
3) 如有系统默认处理函数,则进行调用
5.13.MQTTDeserialize_suback
SUBACK包的定义如下:
byte |
bit7 |
bit6 |
bit5 |
bit4 |
bit3 |
bit2 |
bit1 |
bit0 |
固定 包头 |
0 |
1 |
0 |
0 |
1 |
0 |
0 |
0 |
0 |
|
包类型(SUBACK) |
保留 |
||||||||
1 |
X |
X |
X |
X |
X |
X |
X |
X |
|
剩余长度 |
|||||||||
0 |
X |
X |
X |
X |
X |
X |
X |
X |
可变 包头 |
Packet Identifier MSB |
|||||||||
1 |
X |
X |
X |
X |
X |
X |
X |
X |
|
Packet Identifier LSB |
|||||||||
0 |
X |
0 |
0 |
0 |
0 |
0 |
X |
X |
负载 |
订阅返回值 |
|||||||||
N |
X |
0 |
0 |
0 |
0 |
0 |
X |
X |
|
订阅返回值 |
l 订阅返回值可以有多个,其数量需要根据剩余长度减去可变包头后算出
1) 从会话读缓冲区中获得header字节的type字段,移动缓冲区指针,并判断其是否为SUBACK包
2) 从会话读缓冲区中获得剩余字节数,移动缓冲区指针
3) 从会话读缓冲区中获取packetid号,移动缓冲区指针
4) 根据剩余长度减去packetid长度推算订阅结果的数量,并从会话读缓冲区中获取其值
5.14.iotx_mc_handle_recv_PUBLISH
1) MQTTDeserialize_publish:对PUSBLISH包进行解析
2) iotx_mc_deliver_message:将PUBLISH包中的主题与list_sub_handle订阅队列中的主题进行匹配,匹配成功则调用其处理函数
3) 检测QOS,如果大于0,则使用MQTTPuback进行回包处理
5.15.MQTTDeserialize_publish
PUSBLISH包的定义如上不再重复
1) 从会话读缓冲区中获得header字节,并解析其type,dup,qos,retain字段,然后移动缓冲区指针,并判断其是否为PUBLISH包
2) 从会话读缓冲区中获得剩余字节数,移动缓冲区指针
3) 从会话读缓冲区中读取主题字符串,方法为为先读取头2字节获取字符串的长度,再根据其长度读取字符串,然后移动缓冲区指针
4) 如果QoS大于0,则从会话读缓冲区中获得packetid,移动缓冲区指针
5) 通过读缓冲区当前地址指针和起始地址指针计算负载的长度
6) 设置负载的数据指针为读缓冲区当前地址指针,从而获得负载的数据
5.16.iotx_mc_deliver_message
1) 遍历订阅的主题队列list_sub_handle,通过传入的topicName参数与列表节点的topic_filter进行名字或者通配符匹配
2) 如果匹配成功则调用节点的处理函数,匹配失败则调用系统默认的处理函数
5.17.MQTTPubInfoProc
1) 遍历list_pub_wait_ack队列
2) 如果节点状态为IOTX_MC_NODE_STATE_INVALID则认为对应的PUSBLISH包发送成功,移除该节点
3) 如果超时没收到PUSBLISH ACK,则重新发送PUBLISH包