消息队列MQ目前支持客户端使用MQTT协议连接并消费数据,仅支持TLS加密
默认支持标准的 MQTT v3.1.1版本
连接协议 | 证书 | 地址 | 端口 |
---|---|---|---|
MQTT | 证书下载 | IPv4: 183.230.40.96 IPv6: 2409:8060:8ea:601::13:7c64 |
8883 |
推荐使用第三方SDK,请访问https://github.com/mqtt/mqtt.github.io/wiki/libraries
消费端开发流程如下:
客户端可通过发送MQTT connnect报文与服务器建立连接,connect报文中三要素填写方法如下:
参数 | 是否必须 | 参数说明 |
---|---|---|
clientId | 否 | 用户自定义合法的UTF-8字符串,可为空 |
username | 是 | 填写实例名称 |
password | 是 | 填写token,算法见访问鉴权 其中:res=mqs/$实例名称 |
另外,connnect报文中,keepalive、will、session字段的使用限制如下:
功能 | 是否支持 | 说明 |
---|---|---|
keepalive | 支持 | 支持范围为:30~4800s |
will | 不支持 | will、will retain 的flag必须为0,will qos必须为0 |
session | 不支持 | cleansession标记必须为1 |
订阅报文
订阅报文中topic格式如下,不支持通配符
$sys/pb/consume/$实例名称/$TOPIC/$SUB
订阅时request QoS必须大于0,否则订阅失败
订阅确认报文
平台采用MQTT SubAck报文进行订阅确认,返回成功时返回码固定为0x01
Request QoS | SubAck返回码 | 说明 |
---|---|---|
0 | 0x80 | 订阅失败 |
1 | 0x01 | 订阅成功,最大QoS为1 |
2 | 0x01 | 订阅成功,最大QoS为1 |
取消订阅报文
客户端使用MQTT Unsubscribe报文进行订阅取消
取消订阅确认报文
服务端使用MQTT UnsubAck报文进行订阅取消确认
客户端接收到publish报文后,按照MQTT协议解析其payload数据段,然后按照如下步骤解析payload数据内容
下载地址: https://github.com/protocolbuffers/protobuf/releases
onenet-mq.proto接口文件如下:
syntax = "proto3";
package mq;
message Msg{
uint64 msgid = 1; //MQ中该消息的真实id
bytes data = 2; //具体的数据
uint64 timestamp = 3; //精确到ms
}
保存onenet-mq.proto文件至本地
根据语言编译该文件,以Java为例
protoc --java_out=$DST_DIR $SRC_DIR/onenet-mq.proto
以Java为例,编译后生成OnenetMq.java文件,将该源文件添加到项目
Java示例如下:
OnenetMq.Msg obj;
obj = OnenetMq.Msg.parseFrom(mqttPayload);
System.out.println(obj.getMsgid());
System.out.println(new String(obj.getData().toByteArray()));
System.out.println(obj.getTimestamp());