From 0a98e1e802ff1c22f63b5a264a35c876b04597fa Mon Sep 17 00:00:00 2001 From: wangwei_123 <1255324804@qq.com> Date: Tue, 7 Apr 2026 14:50:29 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E6=88=90=E8=8B=B1=E6=96=87=E9=80=97?= =?UTF-8?q?=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yudao/module/mqtt/kafka/KafkaConfig.java | 1 - .../module/mqtt/kafka/KafkaMessageConsumer.java | 21 +--- .../yudao/module/mqtt/mqtt/OnMessageCallback.java | 1 - .../processor/BatchDeviceMessageProcessor.java | 2 +- .../mqtt/processor/HandAlarmMessageProcess.java | 126 --------------------- .../mqtt/processor/HandAlarmMessageProcessor.java | 125 ++++++++++++++++++++ 6 files changed, 131 insertions(+), 145 deletions(-) delete mode 100644 cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java create mode 100644 cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcessor.java diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java index a437e82..3418e5d 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java @@ -47,7 +47,6 @@ public class KafkaConfig { return new TopicPartition(record.topic() + ".DLT", 0); }); - // 【问题4 FIX】批量场景下重试次数设为 0,失败立即进 DLT // 避免整批阻塞消费线程,由 DLT 消费者做单条重试 // 如果业务可以接受短暂阻塞,可改为 FixedBackOff(1000L, 1L) 最多重试1次 DefaultErrorHandler errorHandler = new DefaultErrorHandler( diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java index 8633b61..7a2ced4 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java @@ -1,38 +1,27 @@ package cn.iocoder.yudao.module.mqtt.kafka; import cn.iocoder.yudao.module.mqtt.processor.BatchDeviceMessageProcessor; -import cn.iocoder.yudao.module.mqtt.processor.DeviceMessageProcessor; -import cn.iocoder.yudao.module.mqtt.processor.HandAlarmMessageProcess; +import cn.iocoder.yudao.module.mqtt.processor.HandAlarmMessageProcessor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.core.task.TaskExecutor; import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.Acknowledgment; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; -import java.util.stream.IntStream; @Slf4j @Service // <--- 添加这个注解! public class KafkaMessageConsumer { - private final HandAlarmMessageProcess handAlarmMessageProcess; + private final HandAlarmMessageProcessor handAlarmMessageProcessor; private final BatchDeviceMessageProcessor batchDeviceMessageProcessor; - public KafkaMessageConsumer(HandAlarmMessageProcess handAlarmMessageProcess, + public KafkaMessageConsumer(HandAlarmMessageProcessor handAlarmMessageProcessor, BatchDeviceMessageProcessor batchDeviceMessageProcessor) { - this.handAlarmMessageProcess = handAlarmMessageProcess; + this.handAlarmMessageProcessor = handAlarmMessageProcessor; this.batchDeviceMessageProcessor = batchDeviceMessageProcessor; } @@ -67,7 +56,7 @@ public class KafkaMessageConsumer { // 遍历 List,一条条处理 for (ConsumerRecord record : records) { try { - handAlarmMessageProcess.processSingle(record.value()); + handAlarmMessageProcessor.processSingle(record.value()); } catch (Exception e) { log.error("单条消息处理失败,已跳过。Key: {}, Value: {}", record.key(), record.value(), e); diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/OnMessageCallback.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/OnMessageCallback.java index 5bea96b..640e4dd 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/OnMessageCallback.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/OnMessageCallback.java @@ -60,7 +60,6 @@ public class OnMessageCallback implements MqttCallbackExtended { if (from == null) return; // 3. 转发 Kafka (异步 IO) - // 配合 application.yml 的 linger.ms,底层会自动批量发送 String payload = new String(message.getPayload()); kafkaTemplate.send(suffix, sn, payload); diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java index ce4fc7b..e590c4b 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java @@ -921,7 +921,7 @@ public class BatchDeviceMessageProcessor { : String.valueOf(value); String statusText = isAlarming ? "报警" : "报警结束"; - String msgContent = String.format("%s%s,%s气体浓度为%s", + String msgContent = String.format("%s%s,%s气体浓度为%s", handVo.getUserName(), statusText, gasName, valueStr); try { diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java deleted file mode 100644 index b1ad1f8..0000000 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java +++ /dev/null @@ -1,126 +0,0 @@ -package cn.iocoder.yudao.module.mqtt.processor; - -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.module.hand.service.HandDetectorService; -import cn.iocoder.yudao.module.hand.service.TdengineService; -import cn.iocoder.yudao.module.hand.vo.AlarmDispatchEvent; -import cn.iocoder.yudao.module.hand.vo.AlarmMessageLog; -import cn.iocoder.yudao.module.mqtt.mqtt.Client; -import com.google.common.util.concurrent.RateLimiter; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -@Slf4j -@Component -public class HandAlarmMessageProcess { - - /** - * 【问题6 FIX】限流速率从配置文件读取,不再硬编码 - * 配置项示例:mqtt.alarm.rate-limit=3000.0 - */ - private final RateLimiter mqttRateLimiter; - - @Resource - private Client mqttClient; - @Resource - private HandDetectorService handDetectorService; - @Resource - private TdengineService tdengineService; - - public HandAlarmMessageProcess( - @Value("${mqtt.alarm.rate-limit:3000.0}") double rateLimit) { - this.mqttRateLimiter = RateLimiter.create(rateLimit); - } - - public void processSingle(String jsonValue) { - // 1. 解析 Kafka 消息 - AlarmDispatchEvent event = JsonUtils.parseObject(jsonValue, AlarmDispatchEvent.class); - if (event == null) { - log.warn("[报警推送] 消息解析为空,忽略: {}", jsonValue); - return; - } - - try { - // 2. 根据部门/租户/源设备SN,查询需要推送的目标设备列表 - List targetSns = handDetectorService.getSnListByDept( - event.getDeptId(), - event.getTenantId(), - event.getSourceSn() - ); - - if (targetSns == null || targetSns.isEmpty()) { - log.info("[报警推送] 无需推送,目标列表为空. SourceSN: {}", event.getSourceSn()); - return; - } - - // 3. 执行推送,收集实际成功推送的 SN 列表 - List succeededSns = publishAlarmToMqtt(targetSns, event.getMsgContent()); - - // 4. 记录报警消息日志(仅记录实际推送成功的 SN) - if (!succeededSns.isEmpty()) { - AlarmMessageLog alarmMessageLog = new AlarmMessageLog(); - alarmMessageLog.setDetectorId(event.getId()); - alarmMessageLog.setHolderName(event.getUserName()); - alarmMessageLog.setSn(event.getSourceSn()); - alarmMessageLog.setDeptId(event.getDeptId()); - alarmMessageLog.setTenantId(event.getTenantId()); - alarmMessageLog.setMessage(event.getMsgContent()); - alarmMessageLog.setRemark("系统自动触发报警推送"); - alarmMessageLog.setPushSnList(StringUtils.join(succeededSns, ",")); - - alarmMessageLog.setTs(new Timestamp(System.currentTimeMillis())); - - - tdengineService.createAlarmRecord(List.of(alarmMessageLog)); - } - - } catch (Exception e) { - log.error("[报警推送] 处理异常 SourceSN: {}", event.getSourceSn(), e); - } - } - - /** - * 执行 MQTT 推送(含限流) - * - */ - private List publishAlarmToMqtt(List targetSns, String message) { - List succeededSns = new ArrayList<>(); - - // 【问题5 FIX】同时检查 null 和 blank,避免推送空消息给设备 - if (StringUtils.isBlank(message)) { - log.warn("[MQTT推送] 消息内容为空,跳过推送"); - return succeededSns; - } - - for (String sn : targetSns) { - if (StringUtils.isBlank(sn)) continue; - - mqttRateLimiter.acquire(); - - try { - // 【BUG-2 FIX】JSON 构造移至 try 块内,确保异常不会绕过限流语义 - String topic = sn + "/zds_down"; - String jsonPayload = JsonUtils.toJsonString(Map.of("message", message)); - mqttClient.publish(topic, jsonPayload); - succeededSns.add(sn); - } catch (Exception e) { - // 单个设备推送失败,隔离异常不影响后续设备 - log.error("[MQTT推送] 单个发送失败,SN: {}", sn, e); - } - } - - return succeededSns; - } - - public static void main(String[] args) { - - } -} \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcessor.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcessor.java new file mode 100644 index 0000000..ff611ec --- /dev/null +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcessor.java @@ -0,0 +1,125 @@ +package cn.iocoder.yudao.module.mqtt.processor; + +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.hand.service.HandDetectorService; +import cn.iocoder.yudao.module.hand.service.TdengineService; +import cn.iocoder.yudao.module.hand.vo.AlarmDispatchEvent; +import cn.iocoder.yudao.module.hand.vo.AlarmMessageLog; +import cn.iocoder.yudao.module.mqtt.mqtt.Client; +import com.google.common.util.concurrent.RateLimiter; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +@Slf4j +@Component +public class HandAlarmMessageProcessor { + + /** + * 【问题6 FIX】限流速率从配置文件读取,不再硬编码 + * 配置项示例:mqtt.alarm.rate-limit=3000.0 + */ + private final RateLimiter mqttRateLimiter; + + @Resource + private Client mqttClient; + @Resource + private HandDetectorService handDetectorService; + @Resource + private TdengineService tdengineService; + + public HandAlarmMessageProcessor( + @Value("${mqtt.alarm.rate-limit:3000.0}") double rateLimit) { + this.mqttRateLimiter = RateLimiter.create(rateLimit); + } + + public void processSingle(String jsonValue) { + // 1. 解析 Kafka 消息 + AlarmDispatchEvent event = JsonUtils.parseObject(jsonValue, AlarmDispatchEvent.class); + if (event == null) { + log.warn("[报警推送] 消息解析为空,忽略: {}", jsonValue); + return; + } + + try { + // 2. 根据部门/租户/源设备SN,查询需要推送的目标设备列表 + List targetSns = handDetectorService.getSnListByDept( + event.getDeptId(), + event.getTenantId(), + event.getSourceSn() + ); + + if (targetSns == null || targetSns.isEmpty()) { + log.info("[报警推送] 无需推送,目标列表为空. SourceSN: {}", event.getSourceSn()); + return; + } + + // 3. 执行推送,收集实际成功推送的 SN 列表 + List succeededSns = publishAlarmToMqtt(targetSns, event.getMsgContent()); + + // 4. 记录报警消息日志(仅记录实际推送成功的 SN) + if (!succeededSns.isEmpty()) { + AlarmMessageLog alarmMessageLog = new AlarmMessageLog(); + alarmMessageLog.setDetectorId(event.getId()); + alarmMessageLog.setHolderName(event.getUserName()); + alarmMessageLog.setSn(event.getSourceSn()); + alarmMessageLog.setDeptId(event.getDeptId()); + alarmMessageLog.setTenantId(event.getTenantId()); + alarmMessageLog.setMessage(event.getMsgContent()); + alarmMessageLog.setRemark("系统自动触发报警推送"); + alarmMessageLog.setPushSnList(StringUtils.join(succeededSns, ",")); + + alarmMessageLog.setTs(new Timestamp(System.currentTimeMillis())); + + + tdengineService.createAlarmRecord(List.of(alarmMessageLog)); + } + + } catch (Exception e) { + log.error("[报警推送] 处理异常 SourceSN: {}", event.getSourceSn(), e); + } + } + + /** + * 执行 MQTT 推送(含限流) + * + */ + private List publishAlarmToMqtt(List targetSns, String message) { + List succeededSns = new ArrayList<>(); + + // 【问题5 FIX】同时检查 null 和 blank,避免推送空消息给设备 + if (StringUtils.isBlank(message)) { + log.warn("[MQTT推送] 消息内容为空,跳过推送"); + return succeededSns; + } + + for (String sn : targetSns) { + if (StringUtils.isBlank(sn)) continue; + + mqttRateLimiter.acquire(); + + try { + String topic = sn + "/zds_down"; + String jsonPayload = JsonUtils.toJsonString(Map.of("message", message)); + mqttClient.publish(topic, jsonPayload); + succeededSns.add(sn); + } catch (Exception e) { + // 单个设备推送失败,隔离异常不影响后续设备 + log.error("[MQTT推送] 单个发送失败,SN: {}", sn, e); + } + } + + return succeededSns; + } + + public static void main(String[] args) { + + } +} \ No newline at end of file