MQTT--Python进行发布、订阅测试
安装 paho-mqtt
pip install paho-mqtt
发布并订阅
# -*- coding: utf-8 -*-
import sys
import logging
import json
import random
import zlib
import time
import traceback
import paho.mqtt.client as mqtt
import configparser
from logging.handlers import RotatingFileHandler
import traceback
import common
config = configparser.ConfigParser()
config.read("config.ini")
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
stream=sys.stdout)
Rthandler = RotatingFileHandler(sys.path[0] + '/log/log.log', maxBytes=10 * 1024 * 1024, backupCount=10)
Rthandler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
Rthandler.setFormatter(formatter)
logging.getLogger('').addHandler(Rthandler)
class MqttClient():
"""Mqtt处理类"""
def __init__(self):
logging.info("socket MQTT connect......")
self.ip = config.get("config", "mqtt_ip")
self.port = int(config.get("config", "mqtt_port"))
self.clientId = 'alertserver'+ str(random.randint(20, 20000000000))
def start(self):
try:
self.mqttc = mqtt.Client(self.clientId)
self.mqttc.on_connect = self.on_connect
self.mqttc.on_message = self.on_message
self.mqttc.on_disconnect = self.on_disconnect
self.mqttc.username_pw_set(config.get("config", "mqtt_username"), config.get("config", "mqtt_password"))
self.mqttc.connect(self.ip, self.port, 20)
self.mqttc.loop_forever()
except (Exception) as e:
logging.info(e)
logging.info(traceback.format_exc())
time.sleep(20)
self.start()
def on_connect(self, client, userdata, flags, rc):
# client.subscribe("alert")
# 请求初始数据
try:
topic = "alert"
# 发送数据
msg={}
msg["alert"]=1
z1=json.dumps(msg)
z2=zlib.compress(z1,9)
z3=bytearray(z2)
client.publish(topic, z3, 2)
logs = (u"Publish topic:" + topic)
logging.info(logs)
except (Exception) as e:
logging.info(e)
logging.info(u"MQTT 连接成功!:" + str(rc))
client.subscribe(topic) # 订阅自己的消息
# 接收数据
def on_message(self, client, userdata, msg):
logging.info(msg.topic + " " + str(msg.payload))
# self.get_msg(client, userdata, msg)
def on_disconnect(self, client, userdata, rc):
logging.info(u"MQTT 断开连接!")
if rc != 0:
logging.info(u"MQTT 异常断开!")
def get_msg(self, client, userdata, msg):
if (msg.topic == "alert" ):
try:
z1 = msg.payload
z2 = str(z1) # 转字符串
z3 = zlib.decompress(z2) # 解压缩
alertMsg = json.loads(z3) # json转字典
# init = json.loads(msg.payload)
logging.info(alertMsg)
except (Exception) as e:
logging.info(e)
# mqtt 线程
class MqttThread (threading.Thread):
def __init__(self, client):
threading.Thread.__init__(self)
self.client = client
def run(self):
self.client.start()
if __name__ == '__main__':
client = MqttClient()
t = MqttThread(client)
t.start()
msg = time.time()
common.mqttc.publish("alert", msg, 2)
这里引了一个 import common
,common 的内容只有一行 global mqttc
,是为了方便主线程调用发布,如果直接在主线程文件中写入 global mqttc
,却不起作用,不理解为什么。
发布主要是 client.publish(topic, z3, 2)
,z3 也可以是字符串。如果不需要发布把这里的去掉即可。
订阅主要是 client.subscribe(topic)
。同样如果不需要订阅把这些去掉即可。
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 bin07280@qq.com
文章标题:MQTT--Python进行发布、订阅测试
文章字数:601
本文作者:Bin
发布时间:2018-09-16, 16:06:22
最后更新:2019-08-06, 00:07:35
原始链接:http://coolview.github.io/2018/09/16/MQTT--Python%E8%BF%9B%E8%A1%8C%E5%8F%91%E5%B8%83%E3%80%81%E8%AE%A2%E9%98%85%E6%B5%8B%E8%AF%95/版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。