首页
关于
Search
1
联想510s mini 安装 Ventura
898 阅读
2
基于K8s + Jenkins+Docker + Gitee 自动部署 - 配置 Jenkins Item + Gitee Webhook (二)
568 阅读
3
Spring Boot Schedule疑问及线程池配置
467 阅读
4
Server Send Events With Spring Boot
410 阅读
5
Ngrok使用自有服务器服务器及域名 - 解决Mac client问题
389 阅读
默认分类
SSH
typecho
Spring boot
其他
mysql
k8s
jenkins
docker
Java
mqtt
MongoDB
登录
/
注册
Search
标签搜索
k8s
docker
ssh
mysql
db
gitee
jenkins
ngrok
黑苹果
MQTT
CC
累计撰写
19
篇文章
累计收到
0
条评论
首页
栏目
默认分类
SSH
typecho
Spring boot
其他
mysql
k8s
jenkins
docker
Java
mqtt
MongoDB
页面
关于
搜索到
1
篇与
mqtt
的结果
2023-11-17
MQTT - LWT(遗愿消息) + Retain 实现设备状态监测
遗愿消息 - LWT(Last Will and Testament)根据MQTT协议,在创建链接时可设置遗愿消息(Will Message)。 它属于链接的一个配置信息,当链接意外断开时将发送该消息。包含但不限于以下情况将发送该信息:服务端发现I/O或网络错误;Client未在Keep Alive time规定时间内进行通信;Client关闭网络链接时未发送0x00代码的“DISCONNECT”包 (非正常关闭);Server 关闭网络链接时未收到0x00代码的“DISCONNECT”包(非正常关闭);遗愿被储存在服务端并与Session进行关联。遗愿消息存储在CONNECT的payload中,由三部分组成:Will Properties : 遗愿属性,一个自定义map。Will Topic : 遗愿主题。Will Payload fields: 遗愿payload,与普通message的payload一样,可自定义。保留消息 -Retain Message保留消息是指发布Message的retain标识为1,当服务端收到一条保留消息:服务端替换该Topic已有的保留消息;订阅者会立即收到消息;未来的订阅者订阅后会立即收到该消息。即Topic只会保存最新的一条保留信息,已订阅的会立即收到,未来订阅的会在订阅时收到该保留消息。清除Topic中的保留消息:如果保留消息的Payload为空(0字节),服务端会该Topic中的保留消息进行移除,并且Topic将不会存储该保留消息。遗愿消息+保留消息实现设备状态监测1、订阅者首先启动,订阅device/status2、设备创建连接时设置保留遗愿消息:{"status":"offline"}3、设备链接成功后,立即发送保留消息:{"status":"online"}此时,订阅者会立即收到消息:{"status":"online"} 设备上线场景1: 订阅者重启1、订阅者会立即收到保留消息:{"status":"online"} 此时设备时在线的。场景2: 设备重启1、设备断开,触发遗愿消息:{"status":"offline"} 设备离线2、设备连接,收到上线的保留消息:{"status":"online"}至此,完成设备状态监测。源码 - JavaDevice:import com.alibaba.fastjson2.JSONObject; import com.hivemq.client.mqtt.MqttClient; import com.hivemq.client.mqtt.MqttClientBuilder; import com.hivemq.client.mqtt.MqttClientState; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5WillPublish; import java.nio.charset.StandardCharsets; import java.util.Map; /** * publish message * * @author chenchen * @Date 2023/11/17 */ public class Publisher { public static void main(String[] args) { MqttClientBuilder clientBuilder = MqttClient.builder() .identifier("cc-status-monitor") .addConnectedListener((context) -> { MqttClientState clientState = context.getClientConfig().getState(); System.out.println(clientState.isConnected()); }) .addDisconnectedListener(context -> System.out.println(context.getCause().getMessage())) .serverHost("broker.hivemq.com") .serverPort(1883); // will message with retain Mqtt5WillPublish offline = Mqtt5WillPublish.builder() .topic("device/status") .qos(MqttQos.AT_LEAST_ONCE) .retain(false) .payload(JSONObject.toJSONString(Map.of("status", "offline")).getBytes(StandardCharsets.UTF_8)) .build(); // set will message when create connect Mqtt5BlockingClient client = clientBuilder.useMqttVersion5() .willPublish(offline) .buildBlocking(); // connect client.connect(); // success connect, publish a retain massage of online if (client.getState().isConnected()) { Mqtt5WillPublish online = Mqtt5WillPublish.builder() .topic("device/status") .qos(MqttQos.AT_LEAST_ONCE) .retain(true) .payload(JSONObject.toJSONString(Map.of("status", "online")).getBytes(StandardCharsets.UTF_8)) .build(); client.publish(online); } } } Subscribe:import cn.hutool.core.util.StrUtil; import com.hivemq.client.mqtt.MqttClient; import com.hivemq.client.mqtt.MqttClientState; import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe; import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscription; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Optional; /** * subscribe * @author chenchen * @Date 2023/11/17 */ public class Subscriber { public static void main(String[] args) { Mqtt5AsyncClient client = MqttClient.builder() .addConnectedListener((context) -> { MqttClientState clientState = context.getClientConfig().getState(); System.out.println(clientState.isConnected()); }) .addDisconnectedListener(context -> System.out.println(context.getCause().getMessage())) .serverHost("broker.hivemq.com") .serverPort(1883) .useMqttVersion5() .buildAsync(); // subscribe topic Mqtt5Subscribe subscribe = Mqtt5Subscribe.builder() .addSubscription(Mqtt5Subscription.builder() .topicFilter("device/status") .build()) .build(); // connect client.connect(); // handle message client.subscribe(subscribe, mqtt5Publish -> { Optional<ByteBuffer> payload = mqtt5Publish.getPayload(); payload.ifPresent(byteBuffer -> System.out.println("receive message with payload:" + StrUtil.str(byteBuffer, StandardCharsets.UTF_8))); }); } }引用《物联网系统开发:从0到1构建IoT平台》付强Will : https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901042Return: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901104hivemq : https://github.com/hivemq/hivemq-mqtt-clientMqtt Server: broker.hivemq.com
2023年11月17日
369 阅读
0 评论
0 点赞