遗愿消息 - LWT(Last Will and Testament)
根据MQTT协议,在创建链接时可设置遗愿消息(Will Message)。 它属于链接的一个配置信息,当链接意外断开时将发送该消息。
包含但不限于以下情况将发送该信息:
- 服务端发现I/O或网络错误;
- Client未在Keep Alive time规定时间内进行通信;
- Client关闭网络链接时未发送0x00代码的“DISCONNECT”包 (非正常关闭);
- Server 关闭网络链接时未收到0x00代码的“DISCONNECT”包(非正常关闭);
遗愿被储存在服务端并与Session进行关联。遗愿消息存储在CONNECT的payload中,由三部分组成:
- Will Properties : 遗愿属性,一个自定义map。
- Will Topic : 遗愿主题。
- Will Payload fields: 遗愿payload,与普通message的payload一样,可自定义。
保留消息 -Retain Message
保留消息是指发布Message的retain标识为1,当服务端收到一条保留消息:
- 服务端替换该Topic已有的保留消息;
- 订阅者会立即收到消息;
- 未来的订阅者订阅后会立即收到该消息。
即Topic只会保存最新的一条保留信息,已订阅的会立即收到,未来订阅的会在订阅时收到该保留消息。
清除Topic中的保留消息:
如果保留消息的Payload为空(0字节),服务端会该Topic中的保留消息进行移除,并且Topic将不会存储该保留消息。
遗愿消息+保留消息实现设备状态监测
1、订阅者首先启动,订阅device/status
2、设备创建连接时设置保留遗愿消息:{"status":"offline"}
3、设备链接成功后,立即发送保留消息:{"status":"online"}
此时,订阅者会立即收到消息:{"status":"online"} 设备上线
场景1: 订阅者重启
1、订阅者会立即收到保留消息:{"status":"online"} 此时设备时在线的。
场景2: 设备重启
1、设备断开,触发遗愿消息:{"status":"offline"} 设备离线
2、设备连接,收到上线的保留消息:{"status":"online"}
至此,完成设备状态监测。
源码 - Java
Device:
import com.alibaba.fastjson2.JSONObject;
import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.MqttClientBuilder;
import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5WillPublish;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
* publish message
*
* @author chenchen
* @Date 2023/11/17
*/
public class Publisher {
public static void main(String[] args) {
MqttClientBuilder clientBuilder = MqttClient.builder()
.identifier("cc-status-monitor")
.addConnectedListener((context) -> {
MqttClientState clientState = context.getClientConfig().getState();
System.out.println(clientState.isConnected());
})
.addDisconnectedListener(context -> System.out.println(context.getCause().getMessage()))
.serverHost("broker.hivemq.com")
.serverPort(1883);
// will message with retain
Mqtt5WillPublish offline = Mqtt5WillPublish.builder()
.topic("device/status")
.qos(MqttQos.AT_LEAST_ONCE)
.retain(false)
.payload(JSONObject.toJSONString(Map.of("status", "offline")).getBytes(StandardCharsets.UTF_8))
.build();
// set will message when create connect
Mqtt5BlockingClient client = clientBuilder.useMqttVersion5()
.willPublish(offline)
.buildBlocking();
// connect
client.connect();
// success connect, publish a retain massage of online
if (client.getState().isConnected()) {
Mqtt5WillPublish online = Mqtt5WillPublish.builder()
.topic("device/status")
.qos(MqttQos.AT_LEAST_ONCE)
.retain(true)
.payload(JSONObject.toJSONString(Map.of("status", "online")).getBytes(StandardCharsets.UTF_8))
.build();
client.publish(online);
}
}
}
Subscribe:
import cn.hutool.core.util.StrUtil;
import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscription;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
/**
* subscribe
* @author chenchen
* @Date 2023/11/17
*/
public class Subscriber {
public static void main(String[] args) {
Mqtt5AsyncClient client = MqttClient.builder()
.addConnectedListener((context) -> {
MqttClientState clientState = context.getClientConfig().getState();
System.out.println(clientState.isConnected());
})
.addDisconnectedListener(context -> System.out.println(context.getCause().getMessage()))
.serverHost("broker.hivemq.com")
.serverPort(1883)
.useMqttVersion5()
.buildAsync();
// subscribe topic
Mqtt5Subscribe subscribe = Mqtt5Subscribe.builder()
.addSubscription(Mqtt5Subscription.builder()
.topicFilter("device/status")
.build())
.build();
// connect
client.connect();
// handle message
client.subscribe(subscribe, mqtt5Publish -> {
Optional<ByteBuffer> payload = mqtt5Publish.getPayload();
payload.ifPresent(byteBuffer -> System.out.println("receive message with payload:" + StrUtil.str(byteBuffer, StandardCharsets.UTF_8)));
});
}
}
引用
《物联网系统开发:从0到1构建IoT平台》付强
Will : https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901042
Return: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901104
hivemq : https://github.com/hivemq/hivemq-mqtt-client
Mqtt Server: broker.hivemq.com
评论 (0)