消息佇列MQ目前支援用戶端使用MQTT協議連接並消費數據,僅支援TLS加密
默認支援標準的MQTT v3.1.1版本
連接協議 | 證書 | 地址 | 埠 |
---|---|---|---|
MQTT | 證書下載 | IPv4: 47.52.102.24 | 11006 |
推薦使用第三方SDK,請訪問https://github.com/mqtt/mqtt.github.io/wiki/libraries
消费端开发流程如下:
用戶端可通過發送MQTT connnect報文與伺服器建立連接,connect報文中三要素填寫方法如下:
參數 | 是否必須 | 參數說明 |
---|---|---|
clientId | 否 | 用戶自訂合法的UTF-8字串,可為空 |
username | 是 | 填寫實例名稱 |
password | 是 | 填寫token,演算法見訪問鑒權 其中:res=mqs/$實例名稱 |
另外,connnect報文中,keepalive、will、session欄位的使用限制如下:
功能 | 是否支持 | 說明 |
---|---|---|
keepalive | 支持 | 支持範圍為:30~4800s |
will | 不支持 | will、will retain 的flag必須為0,will qos必須為0 |
session | 不支持 | cleansession標記必須為1 |
訂閱報文
訂閱報文中topic格式如下,不支持通配符
$sys/pb/consume/$實例名稱/$TOPIC/$SUB
訂閱時request QoS必須大於0,否則訂閱失敗
訂閱確認報文
平台採用MQTT SubAck報文進行訂閱確認,返回成功時返回碼固定為0x01
Request QoS | SubAck返回碼 | 說明 |
---|---|---|
0 | 0x80 | 訂閱失敗 |
1 | 0x01 | 訂閱成功,最大QoS為1 |
2 | 0x01 | 訂閱成功,最大QoS為1 |
取消訂閱報文
用戶端使用MQTT Unsubscribe報文進行訂閱取消
取消訂閱確認報文
服務端使用MQTT UnsubAck報文進行訂閱取消確認
訂閱成功後MQ會根據生產消息的情況,通過publish報文的主動推送給用戶端
消費時MQ推送的消息只會是QoS1
用戶端消費到數據後需要按照獲取到的數據先後順序使用puback報文回復
用戶端接收到publish報文後,按照MQTT協議解析其payload數據段,然後按照如下步驟解析payload數據內容
下載地址:https://github.com/protocolbuffers/protobuf/releases
onenet-mq.proto介面檔如下:
syntax = "proto3";
package mq; message Msg{
uint64 msgid = 1; //MQ中該消息的真實id
bytes data = 2; //具體的數據
uint64 timestamp = 3; //精確到ms
}
保存onenet-mq.proto檔至本地
根據語言編譯該檔,以Java為例
protoc --java_out=$DST_DIR $SRC_DIR/onenet-mq.proto
以Java為例,編譯後生成OnenetMq.java檔,將該原始檔案添加到專案
調用parseFrom() 方法,創建Msg物件obj
調用obj.getMsgid() 獲取消息id
調用obj.getData() 獲取消息數據(查看數據格式)
調用obj.getTimestamp() 獲取消息毫秒級時間戳記
Java示例如下:
OnenetMq.Msg obj;
obj = OnenetMq.Msg.parseFrom(mqttPayload);
System.out.println(obj.getMsgid());
System.out.println(new String(obj.getData().toByteArray())); System.out.println(obj.getTimestamp());