MQTT - LWT(遗愿消息) + Retain 实现设备状态监测

MQTT - LWT(遗愿消息) + Retain 实现设备状态监测

cc
cc
2023-11-17 / 0 评论 / 371 阅读 / 正在检测是否收录...

mqtt.jpeg

遗愿消息 - LWT(Last Will and Testament)

根据MQTT协议,在创建链接时可设置遗愿消息(Will Message)。 它属于链接的一个配置信息,当链接意外断开时将发送该消息。

包含但不限于以下情况将发送该信息:

  • 服务端发现I/O或网络错误;
  • Client未在Keep Alive time规定时间内进行通信;
  • Client关闭网络链接时未发送0x00代码的“DISCONNECT”包 (非正常关闭);
  • Server 关闭网络链接时未收到0x00代码的“DISCONNECT”包(非正常关闭);

遗愿被储存在服务端并与Session进行关联。遗愿消息存储在CONNECT的payload中,由三部分组成:

  • Will Properties : 遗愿属性,一个自定义map。
  • Will Topic : 遗愿主题。
  • Will Payload fields: 遗愿payload,与普通message的payload一样,可自定义。

保留消息 -Retain Message

保留消息是指发布Message的retain标识为1,当服务端收到一条保留消息:

  • 服务端替换该Topic已有的保留消息;
  • 订阅者会立即收到消息;
  • 未来的订阅者订阅后会立即收到该消息。

即Topic只会保存最新的一条保留信息,已订阅的会立即收到,未来订阅的会在订阅时收到该保留消息。

清除Topic中的保留消息:

如果保留消息的Payload为空(0字节),服务端会该Topic中的保留消息进行移除,并且Topic将不会存储该保留消息。

遗愿消息+保留消息实现设备状态监测

lp2sqc58.png

1、订阅者首先启动,订阅device/status
2、设备创建连接时设置保留遗愿消息:{"status":"offline"}
3、设备链接成功后,立即发送保留消息:{"status":"online"}

此时,订阅者会立即收到消息:{"status":"online"} 设备上线

场景1: 订阅者重启

1、订阅者会立即收到保留消息:{"status":"online"} 此时设备时在线的。

场景2: 设备重启

1、设备断开,触发遗愿消息:{"status":"offline"} 设备离线
2、设备连接,收到上线的保留消息:{"status":"online"}

至此,完成设备状态监测。

源码 - Java

Device:

import com.alibaba.fastjson2.JSONObject;
import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.MqttClientBuilder;
import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5WillPublish;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * publish message
 *
 * @author chenchen
 * @Date 2023/11/17
 */
public class Publisher {
    public static void main(String[] args) {
        MqttClientBuilder clientBuilder = MqttClient.builder()
                .identifier("cc-status-monitor")
                .addConnectedListener((context) -> {
                    MqttClientState clientState = context.getClientConfig().getState();
                    System.out.println(clientState.isConnected());
                })
                .addDisconnectedListener(context -> System.out.println(context.getCause().getMessage()))
                .serverHost("broker.hivemq.com")
                .serverPort(1883);

        // will message with retain
        Mqtt5WillPublish offline = Mqtt5WillPublish.builder()
                .topic("device/status")
                .qos(MqttQos.AT_LEAST_ONCE)
                .retain(false)
                .payload(JSONObject.toJSONString(Map.of("status", "offline")).getBytes(StandardCharsets.UTF_8))
                .build();


        // set will message when create connect
        Mqtt5BlockingClient client = clientBuilder.useMqttVersion5()
                .willPublish(offline)
                .buildBlocking();

        // connect
        client.connect();
        
        // success connect, publish a retain massage of online
        if (client.getState().isConnected()) {
            Mqtt5WillPublish online = Mqtt5WillPublish.builder()
                    .topic("device/status")
                    .qos(MqttQos.AT_LEAST_ONCE)
                    .retain(true)
                    .payload(JSONObject.toJSONString(Map.of("status", "online")).getBytes(StandardCharsets.UTF_8))
                    .build();
            client.publish(online);
        }
    }
}

Subscribe:

import cn.hutool.core.util.StrUtil;
import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscription;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/**
 * subscribe
 * @author chenchen
 * @Date 2023/11/17
 */
public class Subscriber {
    public static void main(String[] args) {
        Mqtt5AsyncClient client = MqttClient.builder()
                .addConnectedListener((context) -> {
                    MqttClientState clientState = context.getClientConfig().getState();
                    System.out.println(clientState.isConnected());
                })
                .addDisconnectedListener(context -> System.out.println(context.getCause().getMessage()))
                .serverHost("broker.hivemq.com")
                .serverPort(1883)
                .useMqttVersion5()
                .buildAsync();

        // subscribe topic
        Mqtt5Subscribe subscribe = Mqtt5Subscribe.builder()
                .addSubscription(Mqtt5Subscription.builder()
                        .topicFilter("device/status")
                        .build())
                .build();
        // connect
        client.connect();

        // handle message
        client.subscribe(subscribe, mqtt5Publish -> {
            Optional<ByteBuffer> payload = mqtt5Publish.getPayload();
            payload.ifPresent(byteBuffer -> System.out.println("receive message with payload:" + StrUtil.str(byteBuffer, StandardCharsets.UTF_8)));
        });
    }
}

引用

《物联网系统开发:从0到1构建IoT平台》付强
Will : https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901042
Return: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901104
hivemq : https://github.com/hivemq/hivemq-mqtt-client
Mqtt Server: broker.hivemq.com
0

评论 (0)

取消