|
@@ -36,10 +36,6 @@ public class IotDeviceUpstreamServer {
|
|
|
* 连接超时时间(毫秒)
|
|
|
*/
|
|
|
private static final int CONNECTION_TIMEOUT_MS = 10000;
|
|
|
- /**
|
|
|
- * 主题分隔符
|
|
|
- */
|
|
|
- private static final String TOPIC_SEPARATOR = ",";
|
|
|
/**
|
|
|
* 默认 QoS 级别
|
|
|
*/
|
|
@@ -84,42 +80,40 @@ public class IotDeviceUpstreamServer {
|
|
|
*/
|
|
|
public void start() {
|
|
|
if (isRunning) {
|
|
|
- log.warn("服务已经在运行中,请勿重复启动");
|
|
|
+ log.warn("[start][服务已经在运行中,请勿重复启动]");
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
- log.info("[start] 开始启动服务");
|
|
|
+ log.info("[start][开始启动服务]");
|
|
|
|
|
|
// 1. 启动 HTTP 服务器
|
|
|
CompletableFuture<Void> httpFuture = server.listen(emqxProperties.getAuthPort())
|
|
|
.toCompletionStage()
|
|
|
.toCompletableFuture()
|
|
|
- .thenAccept(v -> log.info("[start] HTTP服务器启动完成,端口: {}", server.actualPort()));
|
|
|
+ .thenAccept(v -> log.info("[start][HTTP服务器启动完成,端口: {}]", server.actualPort()));
|
|
|
|
|
|
// 2. 连接 MQTT Broker
|
|
|
CompletableFuture<Void> mqttFuture = connectMqtt()
|
|
|
.toCompletionStage()
|
|
|
.toCompletableFuture()
|
|
|
.thenAccept(v -> {
|
|
|
- // 3. 添加 MQTT 断开重连监听器
|
|
|
+ // 2.1 添加 MQTT 断开重连监听器
|
|
|
client.closeHandler(closeEvent -> {
|
|
|
- log.warn("[closeHandler] MQTT连接已断开,准备重连");
|
|
|
+ log.warn("[closeHandler][MQTT连接已断开,准备重连]");
|
|
|
reconnectWithDelay();
|
|
|
});
|
|
|
-
|
|
|
- // 4. 设置 MQTT 消息处理器
|
|
|
+ // 2. 设置 MQTT 消息处理器
|
|
|
setupMessageHandler();
|
|
|
});
|
|
|
|
|
|
- // 等待所有服务启动完成
|
|
|
+ // 3. 等待所有服务启动完成
|
|
|
CompletableFuture.allOf(httpFuture, mqttFuture)
|
|
|
.orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS)
|
|
|
.whenComplete((result, error) -> {
|
|
|
if (error != null) {
|
|
|
- log.error("[start] 服务启动失败", error);
|
|
|
+ log.error("[start][服务启动失败]", error);
|
|
|
} else {
|
|
|
isRunning = true;
|
|
|
- log.info("[start] 所有服务启动完成");
|
|
|
+ log.info("[start][所有服务启动完成]");
|
|
|
}
|
|
|
});
|
|
|
}
|
|
@@ -129,7 +123,7 @@ public class IotDeviceUpstreamServer {
|
|
|
*/
|
|
|
private void setupMessageHandler() {
|
|
|
client.publishHandler(mqttMessageHandler::handle);
|
|
|
- log.debug("[setupMessageHandler] MQTT消息处理器设置完成");
|
|
|
+ log.debug("[setupMessageHandler][MQTT消息处理器设置完成]");
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -137,12 +131,12 @@ public class IotDeviceUpstreamServer {
|
|
|
*/
|
|
|
private void reconnectWithDelay() {
|
|
|
if (!isRunning) {
|
|
|
- log.info("[reconnectWithDelay] 服务已停止,不再尝试重连");
|
|
|
+ log.info("[reconnectWithDelay][服务已停止,不再尝试重连]");
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
vertx.setTimer(RECONNECT_DELAY_MS, id -> {
|
|
|
- log.info("[reconnectWithDelay] 开始重新连接MQTT");
|
|
|
+ log.info("[reconnectWithDelay][开始重新连接 MQTT]");
|
|
|
connectMqtt();
|
|
|
});
|
|
|
}
|
|
@@ -155,28 +149,28 @@ public class IotDeviceUpstreamServer {
|
|
|
private Future<Void> connectMqtt() {
|
|
|
return client.connect(emqxProperties.getMqttPort(), emqxProperties.getMqttHost())
|
|
|
.compose(connAck -> {
|
|
|
- log.info("[connectMqtt] MQTT客户端连接成功");
|
|
|
+ log.info("[connectMqtt][MQTT客户端连接成功]");
|
|
|
return subscribeToTopics();
|
|
|
})
|
|
|
- .recover(err -> {
|
|
|
- log.error("[connectMqtt] 连接MQTT Broker失败: {}", err.getMessage());
|
|
|
+ .recover(error -> {
|
|
|
+ log.error("[connectMqtt][连接MQTT Broker失败:]", error);
|
|
|
reconnectWithDelay();
|
|
|
- return Future.failedFuture(err);
|
|
|
+ return Future.failedFuture(error);
|
|
|
});
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 订阅设备上行消息主题
|
|
|
*
|
|
|
- * @return 订阅结果的Future
|
|
|
+ * @return 订阅结果的 Future
|
|
|
*/
|
|
|
private Future<Void> subscribeToTopics() {
|
|
|
String[] topics = emqxProperties.getMqttTopics();
|
|
|
if (ArrayUtil.isEmpty(topics)) {
|
|
|
- log.warn("[subscribeToTopics] 未配置MQTT主题,跳过订阅");
|
|
|
+ log.warn("[subscribeToTopics][未配置MQTT主题,跳过订阅]");
|
|
|
return Future.succeededFuture();
|
|
|
}
|
|
|
- log.info("[subscribeToTopics] 开始订阅设备上行消息主题");
|
|
|
+ log.info("[subscribeToTopics][开始订阅设备上行消息主题]");
|
|
|
|
|
|
Future<Void> compositeFuture = Future.succeededFuture();
|
|
|
for (String topic : topics) {
|
|
@@ -186,11 +180,11 @@ public class IotDeviceUpstreamServer {
|
|
|
}
|
|
|
compositeFuture = compositeFuture.compose(v -> client.subscribe(trimmedTopic, DEFAULT_QOS.value())
|
|
|
.<Void>map(ack -> {
|
|
|
- log.info("[subscribeToTopics] 成功订阅主题: {}", trimmedTopic);
|
|
|
+ log.info("[subscribeToTopics][成功订阅主题: {}]", trimmedTopic);
|
|
|
return null;
|
|
|
})
|
|
|
- .recover(err -> {
|
|
|
- log.error("[subscribeToTopics] 订阅主题失败: {}, 原因: {}", trimmedTopic, err.getMessage());
|
|
|
+ .recover(error -> {
|
|
|
+ log.error("[subscribeToTopics][订阅主题失败: {}]", trimmedTopic, error);
|
|
|
return Future.<Void>succeededFuture(); // 继续订阅其他主题
|
|
|
}));
|
|
|
}
|
|
@@ -202,10 +196,10 @@ public class IotDeviceUpstreamServer {
|
|
|
*/
|
|
|
public void stop() {
|
|
|
if (!isRunning) {
|
|
|
- log.warn("[stop] 服务未运行,无需停止");
|
|
|
+ log.warn("[stop][服务未运行,无需停止]");
|
|
|
return;
|
|
|
}
|
|
|
- log.info("[stop] 开始关闭服务");
|
|
|
+ log.info("[stop][开始关闭服务]");
|
|
|
isRunning = false;
|
|
|
|
|
|
try {
|
|
@@ -224,14 +218,14 @@ public class IotDeviceUpstreamServer {
|
|
|
.orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS)
|
|
|
.whenComplete((result, error) -> {
|
|
|
if (error != null) {
|
|
|
- log.error("[stop] 服务关闭过程中发生异常", error);
|
|
|
+ log.error("[stop][服务关闭过程中发生异常]", error);
|
|
|
} else {
|
|
|
- log.info("[stop] 所有服务关闭完成");
|
|
|
+ log.info("[stop][所有服务关闭完成]");
|
|
|
}
|
|
|
});
|
|
|
} catch (Exception e) {
|
|
|
- log.error("[stop] 关闭服务异常", e);
|
|
|
- throw new RuntimeException("关闭IoT设备上行服务失败", e);
|
|
|
+ log.error("[stop][关闭服务异常]", e);
|
|
|
+ throw new RuntimeException("关闭 IoT 设备上行服务失败", e);
|
|
|
}
|
|
|
}
|
|
|
}
|