irpas技术客

MQTT协议接入电信AEP(AIOT)平台示例_人各有病_电信aep

大大的周 7076

python 使用MQTT协议接入电信AEP(AIOT)平台示例 注册产品创建设备创建激活设备订阅消息python MQTT程序代码块数据上报指令下发AEP平台各种指令状态介绍指令下发响应 通过线程实现同时收发数据并自动响应回复

电信物联网新平台AEP(Aiot)官网:https://·/ 天翼物联网平台(AIoT)通用组件服务支撑7群:569682371

注册

通过官网右上角单点注册

注册完成登录并订购使能服务 ?

免费订购并开通使用 ? ?

产品创建 订购完成会立即跳转到设备管理页面或单点右上角控制台进入,点击创建产品 ?随意创建一个MQTT的物模型非透传产品即可 产品名称:自定义产品分类:智慧城市-环境感知-智能空调 (选择与自身产品设备相近似的物模型类型,因为创建完成后除了默认的标准属性和服务,属性和服务可自定义创建使用。)接入方式:设备直连网络类型:移动蜂窝或WiFi都行通信协议:MQTT数据加密方式:明文 (通常都是明文)认证方式:特征串认证安全类型:一型一密 (建议一型一密)设备型号:自定义是否透传:否 (本篇演示物模型非透传)消息格式:JSON归属机构:无(自定义可选)产品描述:无(自定义可选) ? ? 设备创建 点击确定创建完成,添加一个测试设备 设备名称:自定义设备编号:自定义 (如有真实设备以实际为准)标签:自定义(可选) 设备目前是已注册状态,通过设备右侧的快捷按钮认证信息查看到设备特征串,即MQTT连接时的所需passWord ? ? 激活设备 电信AEP官网MQTT文档:https://·/sbjr/42#see 接入地址(HOST):

mqtt.ctwing.cn:1883

2. 通过控制台页面获取到设备ID以及特征串

设备ID(client_id):

1514560220220217

特征串 (passWord):

q1LpBvswC6qDi6lu9F7W6PpqxQdyBDrYSfv4rGCjcTU

? ?

订阅消息 通过控制台页面查看服务定义-服务列表本篇只演示数据上报和指令收到后的指令下发响应操作 服务标识服务名称服务类型data_report业务数据上报数据上报set_run_resp设定运行状态响应指令下发响应

? ?

python MQTT程序代码块 # -*- coding: utf-8 -*- import paho.mqtt.client as mqtt import json import time HOST = "mqtt.ctwing.cn" # 接入地址 PORT = 1883 # 端口号 client_id = "1514560220220217" # 设备ID userName = "admin" # 用户名-自定义 passWord = "q1LpBvswC6qDi6lu9F7W6PpqxQdyBDrYSfv4rGCjcTU" # 特征串 def on_connect(client, userdata, flags, rc): print('\n'"Connected with result code " + str(rc)) client.subscribe('\n'"set_run") # 指令接收订阅消息 def on_message(client, userdata, msg): # 接收指令下发数据 print('\n'"主题:" + msg.topic + " 消息:" + str(msg.payload.decode("utf-8"))) def on_subscribe(client, userdata, mid, granted_qos): print("On Subscribed: qos = %d" % granted_qos) def on_disconnect(client, userdata, rc): if rc != 0: print("Unexpected disconnection %s" % rc) client = mqtt.Client(client_id) client.username_pw_set(userName, passWord) client.on_connect = on_connect client.on_message = on_message client.on_subscribe = on_subscribe client.on_disconnect = on_disconnect client.connect(HOST, PORT, 60) client.loop_start() while True: time.sleep(2) inputData = input('\033[1;31;40m'' 发送数据:''\033[0m') param = json.dumps(json.loads(inputData)) # string转字典dict[str,int] client.publish("data_report", payload=param, qos=0) # 发送消息

? ?

数据上报 电信AEP MQTT业务数据开发文档: https://·/sbjr/43#see 通过服务定义-服务列表-查看详情编写上报数据的key/value JSON数据 # 发送数据 {"temperature":16,"onoff_state":0,"mode":0,"fan_up_down":0,"fan_left_right":0,"current_speed":0}

?

程序启动发送注册报文到平台成功后,设备状态已激活。

离线灰点:设备主动离线或者无心跳则判断离线在线绿点:表示设备连接在线中··· ?

通过数据查看按钮,查看刚才上报成功的数据 ? ?

指令下发 电信AEP MQTT业务数据开发文档: https://·/sbjr/43#see 指令下发格式参考

通过设备操作-按钮中的指令下发实现快速模拟下发功能 ? ?

然后通过指令下发日志操作按钮,实时查看指令下发状态

? ?

AEP平台各种指令状态介绍

平台各种指令状态: 状态信息共分为:指令已保存(指令已到达平台但未下发到设备)、 指令已发送(指令下发过程中,平台未收到设备返回的ack确认)、 指令TTL超时(指令缓存时长超过命令设置的ttl值)、 指令已送达(指令已到达设备,且平台收到设备返回的ack确认)、 指令已完成 (指令下发到设备,且平台收到了设备主动回复的响应)、 指令下发超时 (指令下发到设备,平台超过既定时间未收到设备的ACK确认)、 指令失败(指令下发失败或设备返回执行结果为失败)。 其中,对于TCP透传设备,无法到达指令已送达/已完成状态,指令的最终状态即为指令已发送;对于其他协议透传设备,无法到达指令已完成状态,指令的最终状态即为指令已送达。

? ?

指令下发响应 这里我们把发送消息主题改成:set_run_resp 指令响应的主题client.publish("set_run_resp", payload=param, qos=0) # 发送消息 平台指令成功下发到MQTT程序控制台展示{"temperature":22,"onoff_state":1,"mode":1,"fan_up_down":1,"fan_left_right":1,"current_speed":1}} 通过编译器控制台发送指令响应回复

响应内容需携带指令消息的taskId,进行响应回复

{"taskId": 2, "resultPayload": {"result": 0}}

通过指令下发日志查看指令状态是否已完成 ? ?

点击指令详情查看是否有**返回内容 ** ? ? ? ?

通过线程实现同时收发数据并自动响应回复

? ?

# -*- coding: utf-8 -*- import threading import paho.mqtt.client as mqtt import json import time HOST = "mqtt.ctwing.cn" # 接入地址 PORT = 1883 # 端口号 client_id = "1514560220220217" # 设备ID userName = "admin" # 用户名(自定义即可) passWord = "q1LpBvswC6qDi6lu9F7W6PpqxQdyBDrYSfv4rGCjcTU" # 特征串 def on_connect(client, userdata, flags, rc): print("Connected with result code " + str(rc)) # # 订阅消息 client.subscribe('\n'"set_run") # 指令接收订阅消息 # todo 用于临时存储taskId,响应回复 List = [] def on_message(client, userdata, msg): print("\n 主题:" + msg.topic + " 消息:" + str(msg.payload)) taskId = json.loads(msg.payload)["taskId"] temperature = json.loads(msg.payload)["payload"]["temperature"] Command = {"taskId": int(taskId), "resultPayload": {"result": 0}} List.append(json.dumps(Command)) # todo 如果此处立即响应指令,指令虽然已完成,但平台大多无响应内容 # client.publish("set_run_resp", payload=List.pop(), qos=1) # 发送消息 # print("主题:" + msg.topic + " 消息:" + str(msg.payload)) # print('响应回复2') # List.clear() """ 指令接收示例; 主题:set_run 消息:{"taskId":7,"payload":{"temperature":22,"onoff_state":0,"mode":1,"fan_up_down":0,"fan_left_right":1,"current_speed":0}} """ def on_subscribe(client, userdata, mid, granted_qos): print("On Subscribed: qos = %d" % granted_qos) def on_disconnect(client, userdata, rc): if rc != 0: print("Unexpected disconnection %s" % rc) client = mqtt.Client(client_id) client.username_pw_set(userName, passWord) client.on_connect = on_connect client.on_message = on_message client.on_subscribe = on_subscribe client.on_disconnect = on_disconnect print("开始连接MQTT:", HOST + ':' + str(PORT)) client.connect(HOST, PORT, 60) print("MQTT连接完成", HOST + ':' + str(PORT)) client.loop_start() """ loop()是一个心跳函数,用来保持客户端与服务器的连接。 比如keepalive参数为60秒,那么60秒内必须loop()一下或者发布一下消息,不然连接会断,就无法继续发布或者接受消息。 loop_start()是启用一个进程保持loop()的重复调用,就不需要定期心跳了,对应的有loop_stop()。 loop_forever()用来保持无穷阻塞调用loop() """ # todo 用于线程结束 BreakThread = [''] def command(client): while True: if BreakThread[0] == 'exit': break elif len(List) >= 1: paramCommand = List.pop() client.publish("set_run_resp", payload=paramCommand, qos=1) # 发送消息 print('\033[1;39;33m', paramCommand, '响应回复成功''\033[0m') List.clear() def publish(client): while True: time.sleep(5) inputData = input('\033[1;31;40m'' 发送数据:''\033[0m') BreakThread[0] = inputData if inputData == 'exit': break else: param = json.dumps(json.loads(inputData)) # string转字典dict[str,int] client.publish("data_report", payload=param, qos=0) # 发送消息 print('\033[1;39;33m', '数据发送成功''\033[0m') """ 发送数据示例: {"temperature":16,"onoff_state":0,"mode":0,"fan_up_down":0,"fan_left_right":0,"current_speed":0} """ def main(): Threading1 = threading.Thread(target=command, args=(client,)) Threading2 = threading.Thread(target=publish, args=(client,)) Threading1.start() Threading2.start() Threading1.join() Threading2.join() if __name__ == '__main__': main()

? ?

平台有技术支撑群,如果文章有其他不明白可以群里咨询管理员-QQ群支撑群:569682371能更快的帮助你加速开发进程


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #电信aep