添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接


目录

​前言​

​MQTT 协议简介​

​为何选择 MQTT​

​MQTT 通讯运作方式​

​MQTT 协议帧格式​

​MQTT服务器搭建和使用 ​

​公共MQTT 测试服务器​

​MQTT服务器搭建​

​各种MQTT代理服务程序比较​

​Mosquitto安装​

​MQTT使用方法​

​测试MQTT服务器​

​程序中使用MQTT​


前言

MQTT 协议简介

MQTT(Message Queuing Telemetry Transport),是一个物联网传输协议,它被设计用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。

MQTT是专门针对物联网开发的轻量级传输协议。MQTT协议针对低带宽网络,低计算能力的设备,做了特殊的优化,使得其能适应各种物联网应用场景。

为何选择 MQTT

低协议开销
MQTT 的独特之处在于,它的每消息标题可以短至 2 个字节。MQ 和 HTTP 都拥有高得多的每消息开销。对于 HTTP,为每个新请求消息重新建立 HTTP 连接会导致重大的开销。MQ 和 MQTT 所使用的永久连接显著减少了这一开销。

对不稳定网络的容忍
MQTT 和 MQ 能够从断开等故障中恢复,而且没有进一步的代码需求。但是,HTTP 无法原生地实现此目的,需要客户端重试编码,这可能增加幂等性问题。

低功耗
MQTT 是专门针对低功耗目标而设计的。HTTP 的设计没有考虑此因素,因此增加了功耗。

数百万个连接的客户端
在 HTTP 堆栈上,维护数百万个并发连接,需要做许多的工作来提供支持。尽管可以实现此支持,但大多数商业产品都为处理这一数量级的永久连接而进行了优化。IBM 提供了 IBM MessageSight,这是一个单机架装载服务器,经过测试能处理多达 100 万个通过 MQTT 并发连接的设备。相反,MQ 不是为大量并发客户端而设计的。

推送通知
您需要能够及时地将通知传递给客户。为此,必须采用某种定期轮询或推送方法;从电池、系统负载和带宽角度讲,推送是最佳解决方案。

我们的企业可能需要在没有第三方中介的情况下发送敏感的信息。这降低了特定于操作系统的解决方案(比如 Apple iOS、Google Play 通知)作为主要传输机制的价值。

HTTP 只允许使用一种称为COMET 的方法,使用持久的 HTTP 请求来执行推送。从客户端和服务器的角度讲,此方法都很昂贵。MQ 和 MQTT 都支持推送,这是它们的一个基本特性。

客户端平台差异
HTTP 和 MQTT 客户端都已在大量平台上实现。MQTT 的简单性有助于以极少的精力在额外的客户端上实现 MQTT。

防火墙容错
一些企业防火墙将出站连接限制到一些已定义的端口。这些端口通常被限制为 HTTP(80 端口)、HTTPS(443 端口)等。HTTP 显然可以在这些情况下运行。MQTT 可​​ ​封装​ ​在一个 WebSockets 连接中,显示为一个 HTTP 升级请求,从而允许在这些情况下运行。MQ 不允许采用这种模式。

事实上,MQTT的应用非常之广泛,几乎现在随便找一家大型的硬件、互联网企业,都可以找到MQTT的身影,例如Facebook、BP、alibaba、baidu等等

【MQTT】MQTT简介+安装+使用python MQTT客户端_服务质量

MQTT 通讯运作方式

使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;

1.在MQTT通讯过程中,有三种身份,分别是发布者(publisher)、代理(broker)、和订阅者(subscriber)。

2.MQTT传输的消息分为:主题(topic)和负载(payload)两部分:

客户端向代理发布topic的消息到代理(MQTT服务程序),代理将该消息推送到所有订阅该topic的客户端。

(发布消息的客户端就是发布者,订阅topic消息的客户端就是订阅者)

【MQTT】MQTT简介+安装+使用python MQTT客户端_网络_02

MQTT 协议帧格式

在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。

协议帧 = 固定头部+可变头部+消息体

  1. 固定头部

固定头和可变头一般调用接口函数自动完成拼装,并不需要我们手动写,只做了解。

固定头部(2Byte)

Byte1

Byte2

Bit7

Bit6

Bit5

Bit4

Bit3

Bit2

Bit1

Bit0

Bit0~ Bit7

Message type

UDP flag

Qos level

RETAIN

Remaining Length

【MQTT】MQTT简介+安装+使用python MQTT客户端_服务器_03

Message type(报文类型)有如下种类:

CONNECT//请求连接

CONNACK//请求应答

PUBLISH//发布消息

PUBACK//发布应答

PUBREC//发布已接收,保证传递1

PUBREL//发布释放,保证传递2

PUBCOMP//发布完成,保证传递3

SUBSCRIBE//订阅请求

SUBACK//订阅应答

UNSUBSCRIBE//取消订阅

UNSUBACK//取消订阅应答

PINGREQ//ping请求

PINGRESP//ping响应

DISCONNECT//断开连接

  1. 可变头部

固定头和可变头一般调用接口函数自动完成拼装,并不需要我们手动写,只做了解。

可变头的内容因数据包类型而不同,较常的应用是作为包的标识,很多类型数据包中都包括一个2字节的数据包标识字段。

【MQTT】MQTT简介+安装+使用python MQTT客户端_服务器_04

【MQTT】MQTT简介+安装+使用python MQTT客户端_网络_05

  1. 消息体

包含CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四种类型的消息。

【MQTT】MQTT简介+安装+使用python MQTT客户端_网络_06

  1. MQTT协议规定的方法

(重要,无论在那个框架里这是通用的)

(1)Connect    与服务器建立连接。
(2)Disconnect    与服务器断开TCP/IP会话。
(3)Subscribe    订阅。
(4)UnSubscribe    取消订阅。
(5)Publish    发送消息请求,发送完成后返回应用程序线程。

  1. MQTT支持三种消息发布服务质量(QoS)

对消息中间件,估计大家不得不关心的就是消息的可靠性,也就是消息的发布服务质量,可喜的是,MQTT支持三种消息发布服务质量(QoS):

“至多一次”(QoS==0),消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。

“至少一次”(QoS==1),确保消息到达,但消息重复可能会发生。

“只有一次”(QoS==2),确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。

通过设置协议帧中的“可变头部”中的Qos位指定

【MQTT】MQTT简介+安装+使用python MQTT客户端_网络_07

MQTT服务器搭建和使用

公共MQTT 测试服务器

MQTT是成熟的物联网协议,网上已经公开很多公用的MQTT代理/服务器 供开发人员测试,其中常用的一个就是:

地址:mq.tongxinmao.com

端口:18831

用MQTT客户端连上以上服务器,就可以发布和订阅MQTT消息。

MQTT服务器搭建

各种MQTT代理服务程序比较

备注:

截至2018-12-18
QoS 0:服务质量 0,最多传输一次。
QoS 1:服务质量1,至少传输一次。
QoS 2:服务质量2,仅仅传输一次。
auth:验证,身份验证授权。
bridge:桥接,服务器代理之间连接
$SYS:主题过滤器通配符,订阅后能够接收到所有以此通配符开头的主题的消息。
dynamic topics:动态主题
cluster:集群

Server

QoS 0

QoS 1

QoS 2

auth

bridge

$SYS

SSL

dynamic topics

cluster

websockets

plugin system

2lemetry

§

Apache ActiveMQ

Apache ActiveMQ Artemis

Bevywise IoT Platform

rm

rm

emitter

§

emqttd

flespi

GnatMQ

HBMQTT

HiveMQ

IBM MessageSight

§

JoramMQ

Mongoose

?

?

?

?

?

?

?

?

?

moquette

?

?

?

rm

mosca

?

?

?

?

mosquitto

§

MQTT.js

§

MqttWk

?

RabbitMQ

?

?

?

RSMB

?

Software AG Universal Messaging

rm

Solace

§

SwiftMQ

Trafero Tstack

VerneMQ

WebSphere MQ

?

?

?

说明:✔表示支持,✘表示不支持,?表示未知,§表示支持但有限制,rm表示(roadmap)路线图规划中也就是计划支持。

(本次选择相对简单的 Mosquitto举例子)

Mosquitto安装

Mosquitto是一个实现了MQTT3.1协议的代理服务器,由MQTT协议创始人之一的Andy Stanford-Clark开发,它为我们提供了非常棒的轻量级数据交换的解决方案。

在Linux系统上安装Mosquitto,建议使用源码安装模式,最新的源码可从http://mosquitto.org/files/source/地址中获取。解压之后,我们可以在源码目录里面找到主要的配置文件config.mk,其中包含了所有Mosquitto的安装选项,详细的参数说明如下:

C++ Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47


# 是否支持tcpd/libwrap功能.

#WITH_WRAP:=yes



# 是否开启SSL/TLS支持

#WITH_TLS:=yes



# 是否开启TLS/PSK支持

#WITH_TLS_PSK:=yes



# Comment out to disable client client threading support.

#WITH_THREADING:=yes



# 是否使用严格的协议版本(老版本兼容会有点问题)

#WITH_STRICT_PROTOCOL:=yes



# 是否开启桥接模式

#WITH_BRIDGE:=yes



# 是否开启持久化功能

#WITH_PERSISTENCE:=yes



# 是否监控运行状态

#WITH_MEMORY_TRACKING:=yes

这里需要注意的是,默认情况下Mosquitto的安装需要OpenSSL的支持;如果不需要SSL,则需要关闭config.mk里面的某些与SSL功能有关的选项(WITH_TLS、WITH_TLS_PSK)。接着,就是运行make install进行安装,完成之后会在系统命令行里发现mosquitto、mosquitto_passwd、mosquitto_pub和mosquitto_sub四个工具(截图如下),分别用于启动代理、管理密码、发布消息和订阅消息。

【MQTT】MQTT简介+安装+使用python MQTT客户端_网络_08

配置&运行

安装完成之后,所有配置文件会被放置于/etc/mosquitto/目录下,其中最重要的就是Mosquitto的配置文件,即mosquitto.conf,以下是详细的配置参数说明。

C++ Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353


# =================================================================

# General configuration

# =================================================================



# 客户端心跳的间隔时间

#retry_interval 20



# 系统状态的刷新时间

#sys_interval 10



# 系统资源的回收时间,0表示尽快处理

#store_clean_interval 10



# 服务进程的PID

#pid_file /var/run/mosquitto.pid



# 服务进程的系统用户

#user mosquitto



# 客户端心跳消息的最大并发数

#max_inflight_messages 10



# 客户端心跳消息缓存队列

#max_queued_messages 100



# 用于设置客户端长连接的过期时间,默认永不过期

#persistent_client_expiration



# =================================================================

# Default listener

# =================================================================



# 服务绑定的IP地址

#bind_address



# 服务绑定的端口号

#port 1883



# 允许的最大连接数,-1表示没有限制

#max_connections -1



# cafile:CA证书文件

# capath:CA证书目录

# certfile:PEM证书文件

# keyfile:PEM密钥文件

#cafile

#capath

#certfile

#keyfile



# 必须提供证书以保证数据安全性

#require_certificate false



# 若require_certificate值为true,use_identity_as_username也必须为true

#use_identity_as_username false



# 启用PSK(Pre-shared-key)支持

#psk_hint



# SSL/TSL加密算法,可以使用“openssl ciphers”命令获取

# as the output of that command.

#ciphers



# =================================================================

# Persistence

# =================================================================



# 消息自动保存的间隔时间

#autosave_interval 1800



# 消息自动保存功能的开关

#autosave_on_changes false



# 持久化功能的开关

persistence true



# 持久化DB文件

#persistence_file mosquitto.db



# 持久化DB文件目录

#persistence_location /var/lib/mosquitto/



# =================================================================

# Logging

# =================================================================



# 4种日志模式:stdout、stderr、syslog、topic

# none 则表示不记日志,此配置可以提升些许性能

log_dest none



# 选择日志的级别(可设置多项)

#log_type error

#log_type warning

#log_type notice

#log_type information



# 是否记录客户端连接信息

#connection_messages true



# 是否记录日志时间

#log_timestamp true



# =================================================================

# Security

# =================================================================



# 客户端ID的前缀限制,可用于保证安全性

#clientid_prefixes



# 允许匿名用户

#allow_anonymous true



# 用户/密码文件,默认格式:username:password

#password_file



# PSK格式密码文件,默认格式:identity:key

#psk_file



# pattern write sensor/%u/data

# ACL权限配置,常用语法如下:

# 用户限制:user <username>

# 话题限制:topic [read|write] <topic>

# 正则限制:pattern write sensor/%u/data

#acl_file



# =================================================================

# Bridges

# =================================================================



# 允许服务之间使用“桥接”模式(可用于分布式部署)

#connection <name>

#address <host>[:<port>]

#topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix]



# 设置桥接的客户端ID

#clientid



# 桥接断开时,是否清除远程服务器中的消息

#cleansession false



# 是否发布桥接的状态信息

#notifications true



# 设置桥接模式下,消息将会发布到的话题地址

# $SYS/broker/connection/<clientid>/state

#notification_topic



# 设置桥接的keepalive数值

#keepalive_interval 60



# 桥接模式,目前有三种:automatic、lazy、once

#start_type automatic



# 桥接模式automatic的超时时间

#restart_timeout 30



# 桥接模式lazy的超时时间

#idle_timeout 60



# 桥接客户端的用户名

#username



# 桥接客户端的密码

#password



# bridge_cafile:桥接客户端的CA证书文件

# bridge_capath:桥接客户端的CA证书目录

# bridge_certfile:桥接客户端的PEM证书文件

# bridge_keyfile:桥接客户端的PEM密钥文件

#bridge_cafile

#bridge_capath

#bridge_certfile

#bridge_keyfile



# 自己的配置可以放到以下目录中

include_dir /etc/mosquitto/conf.d

最后,启动Mosquitto服务很简单,直接运行命令行即可开启服务:

mosquitto -c /etc/mosquitto/mosquitto.conf -d

MQTT使用方法

测试MQTT服务器

方法:

如现在有一个MQTT服务器(MQTT Broker),可以把我门刚才搭建的服务器来拿来做这个测试,暂且叫做S:

地址:mq.tongxinmao.com

端口:18831

【MQTT】MQTT简介+安装+使用python MQTT客户端_服务器_09

两个需要MQTT通信的设备均下载安装MQTT客户端软件,两个设备都运行MQTT客户端软件,且都连接上MQTT服务器S:

然后一个设备A的MQTT客户端订阅一个主题:“/topic/qos0”(订阅主题的client就是subscriber);

另一个设备B的MQTT客户端向MQTT服务器A发布一个主题为:“/topic/qos0”的消息”hello,BZL first mqtt message!”。(发布主题的client就是Publisher)

可以看到订阅主题“/topic/qos0”的A的客户端收到了消息”hello,BZL first mqtt message!”。

初体验实例:

我们安装通信猫通信调试软件(内嵌MQTT客户端):​ ​通信猫调试助手http://www.tongxinmao.com/Topic/Detail/id/6​

(这里为了简便,发布和订阅都用同一个电脑)

打开软件,点击“网络”--->”MQTT”,在参数界面,输入MQTT服务器地址和端口号:

地址:mq.tongxinmao.com

端口:18831

用户名和密码:TEST,TEST

输入订阅主题:“/topic/qos0”

输入发布主题:“/topic/qos0”

勾选”启动“连接上MQTT服务器,

【MQTT】MQTT简介+安装+使用python MQTT客户端_服务器_10

然后在数据框输入要发布是消息,然后点击发布。这样所有订阅主题“/topic/qos0”的客户端都可以接收到该消息:

【MQTT】MQTT简介+安装+使用python MQTT客户端_服务质量_11

把上面的MQTT服务器地址和端口号替换成我们刚才部署的 Mosquitto的地址和监听端口,就可以测试我们自己搭建的MQTT服务器了。

程序中使用MQTT

我们上面是用的客户端工具测试MQTT, 程序中使用MQTT指的是程序成为MQTT通信系统的客户端,向MQTT服务器订阅主题或发布MQTT消息。

程序要向MQTT服务器订阅主题或者发布MQTT消息,则工程中包含MQTT客户端的库文件,然后程序调用库中的MQTT接口函数,向MQTT服务器订阅主题或发布消息。

以python为例,paho.mqtt.client是python的一个MQTT客户端库(包):

Python Code

1
2
3
4
5


# encoding: utf-8
#!/usr/bin/python3

import paho.mqtt.client as mqtt                 #导入mqtt客户端的包
client = mqtt.Client()                          #调用mqtt客户端库的函数创建对象
client.username_pw_set(”myUsername”, ”myPassword”)

client.connect("114.55.242.49", 1883, 60)      #调用mqtt客户端库client对象的connect方法连接上MQTT服务器
client.subscribe("/topic/qos0")                #订阅主题
client.loop_forever()

使用Python发送、订阅消息

需要用到的包为paho-mqtt
pip install paho-mqtt

Python Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48


# -*- coding: utf-8 -*-
# 以下代码在2019年2月28日 python3.6环境下运行通过
import paho.mqtt.client as mqtt
import json
import time

HOST = "10.8.9.21"
PORT = 1883
client_id = "1083421xxxxx"                       # 没有就不写,此处部分内容用xxx代替原内容,下同

def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("data/receive")         # 订阅消息


def on_message(client, userdata, msg):
print("主题:"+msg.topic+" 消息:"+str(msg.payload.decode('utf-8')))


def on_subscribe(client, userdata, mid, granted_qos):
print("On Subscribed: qos = %d" % granted_qos)


def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection %s" % rc)

data = {
"type":2,
"timestamp": time.time(),
"messageId":"9fcda359-89f5-4933-xxxx",
"command":"xx/recommend",
"data":{
"openId":"xxxx",
"appId":xxxx,
"recommendType":"temRecommend"
}
}
param = json.dumps(data)
client = mqtt.Client(client_id)
client.username_pw_set("xxxxxx", "xxxxxx")
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
client.on_disconnect = on_disconnect
client.connect(HOST, PORT, 60)
client.publish("data/send", payload=param, qos=0)     # 发送消息
client.loop_forever()

python MQTT客户端

需要安装的python库

使用python编写程序进行测试MQTT的发布和订阅功能。首先要安装:​ ​pip install paho-mqtt​

测试发布(pub)

我的MQTT部署在阿里云的服务器上面,所以我在本机上编写了python程序进行测试。

然后在shell里面重新打开一个终端,订阅一个主题为“chat” ​ ​mosquitto_sub -t chat​

在本机上测试远程的MQTT的发布功能就是把自己作为一个发送信息的人,当自己发送信息的时候,所有订阅过该主题(topic)的对象都将收到自己发送的信息。
mqtt_client.py

# encoding: utf-8
import paho.mqtt.client as mqtt

HOST = "101.200.46.138"
PORT = 1883


def test():

client = mqtt.Client()

client.connect(HOST, PORT, 60)

client.publish("chat","hello liefyuan",2) # 发布一个主题为'chat',内容为‘hello liefyuan’的信息

client.loop_forever()


if __name__ == '__main__':

test()

发布/订阅测试

# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt

MQTTHOST = "101.200.46.138"
MQTTPORT = 1883

mqttClient = mqtt.Client()

# 连接MQTT服务器

def on_mqtt_connect():

mqttClient.connect(MQTTHOST, MQTTPORT, 60)

mqttClient.loop_start()


# publish 消息

def on_publish(topic, payload, qos):

mqttClient.publish(topic, payload, qos)


# 消息处理函数
def on_message_come(lient, userdata, msg):
print(msg.topic + " " + ":" + str(msg.payload))


# subscribe 消息
def on_subscribe():

mqttClient.subscribe("/server", 1)

mqttClient.on_message = on_message_come # 消息到来处理函数


def main():

on_mqtt_connect()

on_publish("/test/server", "Hello Python!", 1)

on_subscribe()

while True:

pass


if __name__ == '__main__':

main()

注解函数:

client.connect(self, host, port, keepalive, bind_address)

client.publish(self, topic, payload, qos, retain)

client.subscribe(self, topic, qos)

测试订阅(sub)

在本机上编写程序测试订阅功能,就是让自己的程序作为一个接收者,同一个主题没有发布(pub)信息的时候,就自己一直等候。

# encoding: utf-8
import paho.mqtt.client as mqtt


def on_connect(client, userdata, flags, rc):

print("Connected with result code "+str(rc))

client.subscribe("chat")


def on_message(client, userdata, msg):

print(msg.topic+" " + ":" + str(msg.payload))


client = mqtt.Client()

client.on_connect = on_connect

client.on_message = on_message

client.connect("www.liefyuan.top", 1883, 60)

client.loop_forever()

C语言 MQTT客户端