Chinaunix首页 | 论坛 | 博客
  • 博客访问: 3576807
  • 博文数量: 365
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 2522
  • 用 户 组: 普通用户
  • 注册时间: 2019-10-28 13:40
文章分类

全部博文(365)

文章存档

2023年(8)

2022年(130)

2021年(155)

2020年(50)

2019年(22)

我的朋友

分类: Python/Ruby

2021-08-23 17:30:34

import json

import sys

# 引入mqtt

import paho.mqtt.client as mqtt

# 使用独立线程运行

from threading import Thread

# 建立mqtt连接

def on_connect(client, userdata, flag, rc):

    if rc == 0:

        # 连接成功

        print("Connection successful")

    elif rc == 1:

        # 协议版本错误

        print("Protocol version error")

    elif rc == 2:

        # 无效的客户端标识

        print("Invalid client identity")

    elif rc == 3:

        # 服务器无法使用

        print("server unavailable")

    elif rc == 4:

        # 错误的用户名或密码

        print("Wrong user name or password")

    elif rc == 5:

        # 未经授权

        print("unaccredited")

    print("Connect with the result code " + str(rc))

    # 订阅频道

    # client.subscribe('31765425213673472', qos=2)

# 当与代理断开连接时调用

def on_disconnect(client, userdata, rc):

    #  rc == 0回调被调用以响应disconnect()调用

    # 如果以任何其他值断开连接是意外的,例如可能出现网络错误。

    if rc != 0:

        print("Unexpected disconnection %s" % rc)

# 当收到关于客户订阅的主题的消息时调用。

def on_message(client, userdata, msg):

    print(msg.topic + " " + str(msg.payload))

    json_msg = json.loads(msg.payload.decode('utf-8'))

    # 加入个人逻辑

    pass

# 当使用使用publish()发送的消息已经传输到代理时被调用。

def on_publish(client, obj, mid):

    print("on_Publish, mid: " + str(mid))

# 当代理响应订阅请求时被调用

def on_subscribe(client, userdata, mid, granted_qos):

    print("on_Subscribed: " + str(mid) + " " + str(granted_qos))

# 当代理响应取消订阅请求时调用。

def on_unsubscribe(client, userdata, mid):

    print("on_unsubscribe, mid: " + str(mid))

# 当客户端有日志信息时调用

def on_log(client, obj, level, string):

    print("on_Log:" + string)

# mqtt订阅启动函数

def mqtt_subscribe():

    global client

    client.loop_forever()

# mqtt发布启动函数

def mqtt_publish(sensor_data, topic='xxxxxxxx', qos=2):

    global client

    try:

        client.publish(topic=topic, payload=sensor_data, qos=qos)

    except KeyboardInterrupt:

        print("EXIT")

        # 这是网络循环的阻塞形式,直到客户端调用disconnect()时才会返回。它会自动处理重新连接。

        client.disconnect()

        sys.exit(0)

client = mqtt.Client()

# 启动函数

def mqtt_run():

    # 账号密码验证放到最前面

    client.username_pw_set('user', 'user')

    # client = mqtt.Client()

    # 建立mqtt连接

    client.on_connect = on_connect

    client.on_subscribe = on_subscribe

    client.on_message = on_message

    # 当与代理断开连接时调用

    client.on_disconnect = on_disconnect

    client.on_log = on_log

    # 绑定 MQTT 服务器地址

    broker_ip = ''

    # MQTT服务器的端口号

    # client.connect(host=broker_ip, port=1883, keepalive=6000)

    client.connect(host=broker_ip, port=1883)

    client.reconnect_delay_set(min_delay=1, max_delay=2000)

    client.subscribe('xxxxxxxx', qos=0)

    # 创建线程去持续接收订阅信息

    subscribe_thread = Thread(target=mqtt_subscribe)

    subscribe_thread.start()

if __name__ == "__main__":

    mqtt_run()

阅读(2699) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~