diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java index 43f4d87..a437e82 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java @@ -1,12 +1,12 @@ package cn.iocoder.yudao.module.mqtt.kafka; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.TopicPartition; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; @@ -14,28 +14,9 @@ import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.util.backoff.FixedBackOff; @Configuration +@Slf4j public class KafkaConfig { - /** - * 配置一个通用的错误处理器. - * 它会在反序列化失败或监听器方法抛出异常时生效. - * @param operations KafkaTemplate 实例 - * @return a DefaultErrorHandler - */ - @Bean - public DefaultErrorHandler errorHandler(KafkaOperations operations) { - // 1. 创建 DeadLetterPublishingRecoverer,它负责将失败的消息推送到 DLT - // 这里我们为所有失败的消息都指定了一个通用的 DLT 主题 - DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(operations, - (rec, ex) -> new TopicPartition(rec.topic() + ".DLT", rec.partition())); - - // 根据业务需求选择合适的策略。 - FixedBackOff backOff = new FixedBackOff(1000L, 2L); - - // 3. 创建并返回 DefaultErrorHandler - // 当重试耗尽后,会调用 recoverer 将消息送入 DLT - return new DefaultErrorHandler(recoverer, backOff); - } @Bean("batchFactory") public KafkaListenerContainerFactory batchFactory( @@ -49,28 +30,33 @@ public class KafkaConfig { // 1. 开启批量监听 factory.setBatchListener(true); - // 2. 并发数 (按分区数调整) + // 2. 并发数(按分区数调整) factory.setConcurrency(6); - // 3. 批量手动 ACK (配合 enable-auto-commit: false) factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); factory.getContainerProperties().setPollTimeout(3000); // 4. 错误处理与死信队列 - // 定义死信发送器 - DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate, + DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer( + kafkaTemplate, (record, ex) -> { - // 策略:原 Topic + ".DLT" (例如 zds_up -> zds_up.DLT) - // 这样比写死 KafkaTopicType.DEAD_LETTER_TOPIC 更灵活,不同业务互不干扰 - return new TopicPartition(record.topic() + ".DLT", record.partition()); + + log.warn("[DLT] 消息进入死信队列, topic={}, partition={}, offset={}, error={}", + record.topic(), record.partition(), record.offset(), + ex.getMessage()); + return new TopicPartition(record.topic() + ".DLT", 0); }); - // 定义回退策略:间隔 1秒,重试 2次 - // 注意:Batch 模式下,如果重试失败,整批数据都会进死信 - DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L)); - // 配置给工厂 + // 【问题4 FIX】批量场景下重试次数设为 0,失败立即进 DLT + // 避免整批阻塞消费线程,由 DLT 消费者做单条重试 + // 如果业务可以接受短暂阻塞,可改为 FixedBackOff(1000L, 1L) 最多重试1次 + DefaultErrorHandler errorHandler = new DefaultErrorHandler( + recoverer, + new FixedBackOff(0L, 0L) + ); + factory.setCommonErrorHandler(errorHandler); return factory; } -} +} \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java index 08bb13e..704e993 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java @@ -54,7 +54,7 @@ public class BatchDeviceMessageProcessor { private FenceAlarmService fenceAlarmService; @Resource private KafkaTemplate kafkaTemplate; - private List> records; + /** @@ -94,7 +94,6 @@ public class BatchDeviceMessageProcessor { } private BatchContext prepareBatchContext(List> records) { - this.records = records; BatchContext context = new BatchContext(); // 1. 提取所有有效的 SNs @@ -116,7 +115,6 @@ public class BatchDeviceMessageProcessor { .filter(sn -> !context.snToTenantMap.containsKey(sn)) .toList(); if (CollectionUtils.isNotEmpty(missingSns)) { - // 2.1 查库判断设备是否真的不存在(可能只是 Redis 中丢失了映射) QueryWrapper query = new QueryWrapper<>(); query.in("sn", missingSns); List dbDetectors = handDetectorService.listAll(query); @@ -126,7 +124,6 @@ public class BatchDeviceMessageProcessor { existingDbSns.add(dbDetector.getSn()); if (dbDetector.getTenantId() != null) { context.snToTenantMap.put(dbDetector.getSn(), Long.valueOf(dbDetector.getTenantId())); - // 顺便回填 Redis,防止下次继续穿透查库 try { redisUtil.hset(RedisKeyUtil.getDeviceTenantMappingKey(), dbDetector.getSn(), dbDetector.getTenantId()); } catch (Exception e) { @@ -159,12 +156,8 @@ public class BatchDeviceMessageProcessor { } try { - // 保存到数据库。注意: 这里使用 save 单条保存,如果项目中存在 saveBatch(newDetectors) 则推荐替换成批量保存以提升性能。 - handDetectorService.saveList(newDetectors); log.info("[自动注册] 自动保存未知设备 {} 个,默认租户ID为 1", newDetectors.size()); - - // 将新注册的设备信息回填进上下文并更新 Redis,这样后续步骤就能流畅处理了 for (HandDetectorDO nd : newDetectors) { context.snToTenantMap.put(nd.getSn(), 1L); try { @@ -719,8 +712,8 @@ public class BatchDeviceMessageProcessor { newAlarm.setAlarmType(AlarmType.BATTERY.getType()); newAlarm.setTAlarmStart(now); newAlarm.setVAlarmFirst((double) batteryPercentage); - newAlarm.setPicX(handVo.getLatitude()); - newAlarm.setPicY(handVo.getLongitude()); + newAlarm.setPicX(handVo.getLongitude()); + newAlarm.setPicY(handVo.getLatitude()); newAlarm.setTenantId(handVo.getTenantId()); newAlarm.setCreator("system"); newAlarm.setCreateTime(now); @@ -917,6 +910,10 @@ public class BatchDeviceMessageProcessor { */ private void sendAlarmMessage(HandDataVo handVo, String gasName, Double value, boolean isAlarming, BatchContext context) { + if (null == handVo.getDeptId()){ + log.error("[sendAlarmMessage][deviceSn({}) 发送报警消息时,部门ID为空]", handVo.getSn()); + return; + } String valueStr = (value != null && value % 1 == 0) ? String.valueOf(value.intValue()) diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java index 720daa0..5d4d62d 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java @@ -6,15 +6,15 @@ import cn.iocoder.yudao.module.hand.service.TdengineService; import cn.iocoder.yudao.module.hand.vo.AlarmDispatchEvent; import cn.iocoder.yudao.module.hand.vo.AlarmMessageLog; import cn.iocoder.yudao.module.mqtt.mqtt.Client; -import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.util.concurrent.RateLimiter; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import java.sql.Timestamp; import java.util.ArrayList; -import java.util.Date; import java.util.List; import java.util.Map; @@ -22,7 +22,12 @@ import java.util.Map; @Component public class HandAlarmMessageProcess { - private final RateLimiter mqttRateLimiter = RateLimiter.create(3000.0); + /** + * 【问题6 FIX】限流速率从配置文件读取,不再硬编码 + * 配置项示例:mqtt.alarm.rate-limit=3000.0 + */ + private final RateLimiter mqttRateLimiter; + @Resource private Client mqttClient; @Resource @@ -30,6 +35,11 @@ public class HandAlarmMessageProcess { @Resource private TdengineService tdengineService; + public HandAlarmMessageProcess( + @Value("${mqtt.alarm.rate-limit:3000.0}") double rateLimit) { + this.mqttRateLimiter = RateLimiter.create(rateLimit); + } + public void processSingle(String jsonValue) { // 1. 解析 Kafka 消息 AlarmDispatchEvent event = JsonUtils.parseObject(jsonValue, AlarmDispatchEvent.class); @@ -39,7 +49,7 @@ public class HandAlarmMessageProcess { } try { - // 2. 【查库】根据部门/租户/源设备SN,查询需要推送的目标设备列表 + // 2. 根据部门/租户/源设备SN,查询需要推送的目标设备列表 List targetSns = handDetectorService.getSnListByDept( event.getDeptId(), event.getTenantId(), @@ -50,23 +60,28 @@ public class HandAlarmMessageProcess { log.info("[报警推送] 无需推送,目标列表为空. SourceSN: {}", event.getSourceSn()); return; } - // 3. 执行推送逻辑 - this.publishAlarmToMqtt(targetSns, event.getMsgContent()); - - // 记录报警消息日志 - AlarmMessageLog alarmMessageLog = new AlarmMessageLog(); - alarmMessageLog.setDetectorId(event.getId()); - alarmMessageLog.setHolderName(event.getUserName()); - alarmMessageLog.setSn(event.getSourceSn()); - alarmMessageLog.setDeptId(event.getDeptId()); - alarmMessageLog.setTenantId(event.getTenantId()); - alarmMessageLog.setMessage(event.getMsgContent()); - alarmMessageLog.setRemark("系统自动触发报警推送"); - alarmMessageLog.setPushSnList(StringUtils.join(targetSns, ",")); - - ArrayList objects = new ArrayList<>(); - objects.add(alarmMessageLog); - tdengineService.createAlarmRecord(objects); + + // 3. 执行推送,收集实际成功推送的 SN 列表 + // 【问题3 FIX】由 publishAlarmToMqtt 返回实际推送成功的列表,日志只记录成功项 + List succeededSns = publishAlarmToMqtt(targetSns, event.getMsgContent()); + + // 4. 记录报警消息日志(仅记录实际推送成功的 SN) + if (!succeededSns.isEmpty()) { + AlarmMessageLog alarmMessageLog = new AlarmMessageLog(); + alarmMessageLog.setDetectorId(event.getId()); + alarmMessageLog.setHolderName(event.getUserName()); + alarmMessageLog.setSn(event.getSourceSn()); + alarmMessageLog.setDeptId(event.getDeptId()); + alarmMessageLog.setTenantId(event.getTenantId()); + alarmMessageLog.setMessage(event.getMsgContent()); + alarmMessageLog.setRemark("系统自动触发报警推送"); + alarmMessageLog.setPushSnList(StringUtils.join(succeededSns, ",")); + + alarmMessageLog.setTs(new Timestamp(System.currentTimeMillis())); + + + tdengineService.createAlarmRecord(List.of(alarmMessageLog)); + } } catch (Exception e) { log.error("[报警推送] 处理异常 SourceSN: {}", event.getSourceSn(), e); @@ -74,30 +89,35 @@ public class HandAlarmMessageProcess { } /** - * 执行 MQTT 推送 (包含限流) + * 执行 MQTT 推送(含限流) + * */ - private void publishAlarmToMqtt(List targetSns, String message) { - if (message == null) { - return; + private List publishAlarmToMqtt(List targetSns, String message) { + List succeededSns = new ArrayList<>(); + + // 【问题5 FIX】同时检查 null 和 blank,避免推送空消息给设备 + if (StringUtils.isBlank(message)) { + log.warn("[MQTT推送] 消息内容为空,跳过推送"); + return succeededSns; } - // 构造 MQTT 消息体 - Map payload = Map.of("message", message); - String jsonPayload = JsonUtils.toJsonString(payload); for (String sn : targetSns) { if (StringUtils.isBlank(sn)) continue; - mqttRateLimiter.acquire(); + try { + // 【BUG-2 FIX】JSON 构造移至 try 块内,确保异常不会绕过限流语义 String topic = sn + "/zds_down"; - // 发送 + String jsonPayload = JsonUtils.toJsonString(Map.of("message", message)); mqttClient.publish(topic, jsonPayload); - + succeededSns.add(sn); } catch (Exception e) { - // 5. 【隔离】单个设备推送失败,不要影响列表里的下一个设备 + // 单个设备推送失败,隔离异常不影响后续设备 log.error("[MQTT推送] 单个发送失败,SN: {}", sn, e); } } + + return succeededSns; } -} +} \ No newline at end of file diff --git a/cc-admin-master/yudao-server/src/main/resources/application-dev.yaml b/cc-admin-master/yudao-server/src/main/resources/application-dev.yaml index a093932..fe196b4 100644 --- a/cc-admin-master/yudao-server/src/main/resources/application-dev.yaml +++ b/cc-admin-master/yudao-server/src/main/resources/application-dev.yaml @@ -48,7 +48,7 @@ spring: primary: master datasource: master: - url: jdbc:mysql://192.168.0.180:3306/hand_alarm?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true&rewriteBatchedStatements=true # MySQL Connector/J 8.X 连接的示例 + url: jdbc:mysql://192.168.0.180:3306/hand_alarm_dev?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true&rewriteBatchedStatements=true # MySQL Connector/J 8.X 连接的示例 username: root password: Gsking164411 driver-class-name: com.mysql.cj.jdbc.Driver # MySQL Connector/J 8.X 连接的示例 @@ -226,7 +226,7 @@ mqtt: username: root password: roomasd111 client: - id: cc-admin-qg-dev44 + id: cc-admin-qg-dev connectionTimeout: 10 keepAliveInterval: 60 cleanSession: true @@ -240,4 +240,6 @@ mqtt: pool: coreSize: 10 maxSize: 20 - queueSize: 100 \ No newline at end of file + queueSize: 100 + alarm: + rate-limit: 3000.0 # 报警 MQTT 推送限流速率(每秒最大请求数) \ No newline at end of file diff --git a/cc-admin-master/yudao-server/src/main/resources/application-local.yaml b/cc-admin-master/yudao-server/src/main/resources/application-local.yaml index 809e2d7..9416296 100644 --- a/cc-admin-master/yudao-server/src/main/resources/application-local.yaml +++ b/cc-admin-master/yudao-server/src/main/resources/application-local.yaml @@ -240,4 +240,6 @@ mqtt: pool: coreSize: 10 maxSize: 20 - queueSize: 100 \ No newline at end of file + queueSize: 100 + alarm: + rate-limit: 3000.0 # 报警 MQTT 推送限流速率(每秒最大请求数) \ No newline at end of file diff --git a/cc-admin-master/yudao-server/src/main/resources/application-prod.yaml b/cc-admin-master/yudao-server/src/main/resources/application-prod.yaml index e347c7b..95ae4b9 100644 --- a/cc-admin-master/yudao-server/src/main/resources/application-prod.yaml +++ b/cc-admin-master/yudao-server/src/main/resources/application-prod.yaml @@ -251,4 +251,6 @@ mqtt: pool: coreSize: 10 maxSize: 20 - queueSize: 100 \ No newline at end of file + queueSize: 100 + alarm: + rate-limit: 3000.0 # 报警 MQTT 推送限流速率(每秒最大请求数) \ No newline at end of file