消息隊列MQ消費端開發指南

消費協議

消息佇列MQ目前支援用戶端使用MQTT協議連接並消費數據,僅支援TLS加密

默認支援標準的MQTT v3.1.1版本
連接協議 證書 地址
MQTT 證書下載 IPv4: 47.52.102.24 11006

SDK 下載

推薦使用第三方SDK,請訪問https://github.com/mqtt/mqtt.github.io/wiki/libraries

開發流程

消费端开发流程如下:

1 用戶端連接

用戶端可通過發送MQTT connnect報文與伺服器建立連接,connect報文中三要素填寫方法如下:

參數 是否必須 參數說明
clientId 用戶自訂合法的UTF-8字串,可為空
username 填寫實例名稱
password 填寫token,演算法見訪問鑒權 其中:res=mqs/$實例名稱

另外,connnect報文中,keepalive、will、session欄位的使用限制如下:

功能 是否支持 說明
keepalive 支持 支持範圍為:30~4800s
will 不支持 will、will retain 的flag必須為0,will qos必須為0
session 不支持 cleansession標記必須為1

2 訂閱主題

  • 訂閱報文

    訂閱報文中topic格式如下,不支持通配符

    $sys/pb/consume/$實例名稱/$TOPIC/$SUB

    訂閱時request QoS必須大於0,否則訂閱失敗

  • 訂閱確認報文

    平台採用MQTT SubAck報文進行訂閱確認,返回成功時返回碼固定為0x01

Request QoS SubAck返回碼 說明
0 0x80 訂閱失敗
1 0x01 訂閱成功,最大QoS為1
2 0x01 訂閱成功,最大QoS為1
  • 取消訂閱報文

    用戶端使用MQTT Unsubscribe報文進行訂閱取消

  • 取消訂閱確認報文

    服務端使用MQTT UnsubAck報文進行訂閱取消確認

3 消息消費

訂閱成功後MQ會根據生產消息的情況,通過publish報文的主動推送給用戶端

消費時MQ推送的消息只會是QoS1

用戶端消費到數據後需要按照獲取到的數據先後順序使用puback報文回復

4 數據解析

用戶端接收到publish報文後,按照MQTT協議解析其payload數據段,然後按照如下步驟解析payload數據內容

Step 1 安裝protobuf

下載地址:https://github.com/protocolbuffers/protobuf/releases

Step 2 下載下述.proto文件

onenet-mq.proto介面檔如下:

syntax = "proto3";
package mq; message Msg{
uint64 msgid = 1; //MQ中該消息的真實id
bytes data = 2; //具體的數據
uint64 timestamp = 3; //精確到ms
}

保存onenet-mq.proto檔至本地

Step 3 編譯proto文件

根據語言編譯該檔,以Java為例

protoc --java_out=$DST_DIR $SRC_DIR/onenet-mq.proto

Step 4 將編譯生成的源文件添加到項目

以Java為例,編譯後生成OnenetMq.java檔,將該原始檔案添加到專案

Step 5 讀取數據

  • 調用parseFrom() 方法,創建Msg物件obj

  • 調用obj.getMsgid() 獲取消息id

  • 調用obj.getData() 獲取消息數據(查看數據格式

  • 調用obj.getTimestamp() 獲取消息毫秒級時間戳記

Java示例如下:

OnenetMq.Msg obj;
obj = OnenetMq.Msg.parseFrom(mqttPayload); 
System.out.println(obj.getMsgid());
System.out.println(new String(obj.getData().toByteArray())); System.out.println(obj.getTimestamp());

个搜索结果,搜索内容 “

    0 个搜索结果,搜索内容 “