消息队列MQ消费端开发指南

消费协议

消息队列MQ目前支持客户端使用MQTT协议连接并消费数据,仅支持TLS加密

默认支持标准的 MQTT v3.1.1版本
连接协议 证书 地址 端口
MQTT 证书下载 IPv4: 183.230.40.96
IPv6: 2409:8060:8ea:601::13:7c64
8883

SDK 下载

推荐使用第三方SDK,请访问https://github.com/mqtt/mqtt.github.io/wiki/libraries

开发流程

消费端开发流程如下:

1 客户端连接

客户端可通过发送MQTT connnect报文与服务器建立连接,connect报文中三要素填写方法如下:

参数 是否必须 参数说明
clientId 用户自定义合法的UTF-8字符串,可为空
username 填写实例名称
password 填写token,算法见访问鉴权
其中:res=mqs/$实例名称

另外,connnect报文中,keepalive、will、session字段的使用限制如下:

功能是否支持说明
keepalive支持支持范围为:30~4800s
will不支持will、will retain 的flag必须为0,will qos必须为0
session不支持cleansession标记必须为1

2 订阅主题

  • 订阅报文

    订阅报文中topic格式如下,不支持通配符

    $sys/pb/consume/$实例名称/$TOPIC/$SUB

    订阅时request QoS必须大于0,否则订阅失败

  • 订阅确认报文

    平台采用MQTT SubAck报文进行订阅确认,返回成功时返回码固定为0x01

Request QoSSubAck返回码说明
00x80订阅失败
10x01订阅成功,最大QoS为1
20x01订阅成功,最大QoS为1
  • 取消订阅报文

    客户端使用MQTT Unsubscribe报文进行订阅取消

  • 取消订阅确认报文

    服务端使用MQTT UnsubAck报文进行订阅取消确认

3 消息消费

  • 订阅成功后MQ会根据生产消息的情况,通过publish报文的主动推送给客户端
  • 消费时MQ推送的消息只会是QoS1
  • 客户端消费到数据后需要按照获取到的数据先后顺序使用puback报文回复

4 数据解析

客户端接收到publish报文后,按照MQTT协议解析其payload数据段,然后按照如下步骤解析payload数据内容

Step 1 安装protobuf

下载地址: https://github.com/protocolbuffers/protobuf/releases

Step 2 下载下述.proto文件

onenet-mq.proto接口文件如下:

syntax = "proto3";

package mq;

message Msg{
    uint64 msgid          = 1; //MQ中该消息的真实id
    bytes data            = 2; //具体的数据
    uint64 timestamp      = 3; //精确到ms
}

保存onenet-mq.proto文件至本地

Step 3 编译proto文件

根据语言编译该文件,以Java为例

protoc --java_out=$DST_DIR $SRC_DIR/onenet-mq.proto

Step 4 将编译生成的源文件添加到项目

以Java为例,编译后生成OnenetMq.java文件,将该源文件添加到项目

Step 5 读取数据

  • 调用 parseFrom() 方法,创建Msg对象obj
  • 调用 obj.getMsgid() 获取消息id
  • 调用 obj.getData() 获取消息数据(查看数据格式
  • 调用 obj.getTimestamp() 获取消息毫秒级时间戳

Java示例如下:

OnenetMq.Msg obj;
obj = OnenetMq.Msg.parseFrom(mqttPayload);
System.out.println(obj.getMsgid());
System.out.println(new String(obj.getData().toByteArray()));
System.out.println(obj.getTimestamp());

个搜索结果,搜索内容 “

    0 个搜索结果,搜索内容 “