MQTT--Python进行发布、订阅测试

  1. 安装 paho-mqtt
  2. 发布并订阅

安装 paho-mqtt

pip install paho-mqtt

发布并订阅

# -*- coding: utf-8 -*-
import sys
import logging
import json
import random
import zlib
import time
import traceback
import paho.mqtt.client as mqtt
import configparser
from logging.handlers import RotatingFileHandler
import traceback

import common

config = configparser.ConfigParser()
config.read("config.ini")

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    stream=sys.stdout)

Rthandler = RotatingFileHandler(sys.path[0] + '/log/log.log', maxBytes=10 * 1024 * 1024, backupCount=10)
Rthandler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
Rthandler.setFormatter(formatter)
logging.getLogger('').addHandler(Rthandler)

class MqttClient():
    """Mqtt处理类"""
    def __init__(self):
        logging.info("socket MQTT connect......")
        self.ip = config.get("config", "mqtt_ip")
        self.port = int(config.get("config", "mqtt_port"))
        self.clientId = 'alertserver'+ str(random.randint(20, 20000000000))

    def start(self):
        try:
            self.mqttc = mqtt.Client(self.clientId)
            self.mqttc.on_connect = self.on_connect
            self.mqttc.on_message = self.on_message
            self.mqttc.on_disconnect = self.on_disconnect
            self.mqttc.username_pw_set(config.get("config", "mqtt_username"), config.get("config", "mqtt_password"))
            self.mqttc.connect(self.ip, self.port, 20)
            self.mqttc.loop_forever()
        except (Exception) as  e:
            logging.info(e)
            logging.info(traceback.format_exc())
            time.sleep(20)
            self.start()

    def on_connect(self, client, userdata, flags, rc):
        # client.subscribe("alert")
        # 请求初始数据
        try:
            topic = "alert"
            # 发送数据
            msg={}
            msg["alert"]=1
            z1=json.dumps(msg)
            z2=zlib.compress(z1,9)
            z3=bytearray(z2)
            client.publish(topic, z3, 2)
            logs = (u"Publish topic:" + topic)
            logging.info(logs)
        except (Exception) as  e:
            logging.info(e)
        logging.info(u"MQTT 连接成功!:" + str(rc))
        client.subscribe(topic) # 订阅自己的消息

    # 接收数据
    def on_message(self, client, userdata, msg):
        logging.info(msg.topic + " " + str(msg.payload))
        # self.get_msg(client, userdata, msg)

    def on_disconnect(self, client, userdata, rc):
        logging.info(u"MQTT 断开连接!")
        if rc != 0:
            logging.info(u"MQTT 异常断开!")

    def get_msg(self, client, userdata, msg):
        if (msg.topic == "alert" ):
            try:
                z1 = msg.payload
                z2 = str(z1)  # 转字符串
                z3 = zlib.decompress(z2)  # 解压缩
                alertMsg = json.loads(z3)  # json转字典
                # init = json.loads(msg.payload)
                logging.info(alertMsg)
            except (Exception) as  e:
                logging.info(e)


# mqtt 线程
class MqttThread (threading.Thread):
    def __init__(self, client):
        threading.Thread.__init__(self)
        self.client = client

    def run(self):
        self.client.start()

if __name__ == '__main__':
    client = MqttClient()
    t = MqttThread(client)
    t.start()
    msg = time.time()
    common.mqttc.publish("alert", msg, 2)

这里引了一个 import common,common 的内容只有一行 global mqttc,是为了方便主线程调用发布,如果直接在主线程文件中写入 global mqttc,却不起作用,不理解为什么。

发布主要是 client.publish(topic, z3, 2),z3 也可以是字符串。如果不需要发布把这里的去掉即可。

订阅主要是 client.subscribe(topic)。同样如果不需要订阅把这些去掉即可。


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 bin07280@qq.com

文章标题:MQTT--Python进行发布、订阅测试

文章字数:601

本文作者:Bin

发布时间:2018-09-16, 16:06:22

最后更新:2019-08-06, 00:07:35

原始链接:http://coolview.github.io/2018/09/16/MQTT--Python%E8%BF%9B%E8%A1%8C%E5%8F%91%E5%B8%83%E3%80%81%E8%AE%A2%E9%98%85%E6%B5%8B%E8%AF%95/

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录