diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java index a437e82..3418e5d 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java @@ -47,7 +47,6 @@ public class KafkaConfig { return new TopicPartition(record.topic() + ".DLT", 0); }); - // 【问题4 FIX】批量场景下重试次数设为 0,失败立即进 DLT // 避免整批阻塞消费线程,由 DLT 消费者做单条重试 // 如果业务可以接受短暂阻塞,可改为 FixedBackOff(1000L, 1L) 最多重试1次 DefaultErrorHandler errorHandler = new DefaultErrorHandler( diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java index 8633b61..7a2ced4 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java @@ -1,38 +1,27 @@ package cn.iocoder.yudao.module.mqtt.kafka; import cn.iocoder.yudao.module.mqtt.processor.BatchDeviceMessageProcessor; -import cn.iocoder.yudao.module.mqtt.processor.DeviceMessageProcessor; -import cn.iocoder.yudao.module.mqtt.processor.HandAlarmMessageProcess; +import cn.iocoder.yudao.module.mqtt.processor.HandAlarmMessageProcessor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.core.task.TaskExecutor; import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.Acknowledgment; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; -import java.util.stream.IntStream; @Slf4j @Service // <--- 添加这个注解! public class KafkaMessageConsumer { - private final HandAlarmMessageProcess handAlarmMessageProcess; + private final HandAlarmMessageProcessor handAlarmMessageProcessor; private final BatchDeviceMessageProcessor batchDeviceMessageProcessor; - public KafkaMessageConsumer(HandAlarmMessageProcess handAlarmMessageProcess, + public KafkaMessageConsumer(HandAlarmMessageProcessor handAlarmMessageProcessor, BatchDeviceMessageProcessor batchDeviceMessageProcessor) { - this.handAlarmMessageProcess = handAlarmMessageProcess; + this.handAlarmMessageProcessor = handAlarmMessageProcessor; this.batchDeviceMessageProcessor = batchDeviceMessageProcessor; } @@ -67,7 +56,7 @@ public class KafkaMessageConsumer { // 遍历 List,一条条处理 for (ConsumerRecord record : records) { try { - handAlarmMessageProcess.processSingle(record.value()); + handAlarmMessageProcessor.processSingle(record.value()); } catch (Exception e) { log.error("单条消息处理失败,已跳过。Key: {}, Value: {}", record.key(), record.value(), e); diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/OnMessageCallback.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/OnMessageCallback.java index 5bea96b..640e4dd 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/OnMessageCallback.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/OnMessageCallback.java @@ -60,7 +60,6 @@ public class OnMessageCallback implements MqttCallbackExtended { if (from == null) return; // 3. 转发 Kafka (异步 IO) - // 配合 application.yml 的 linger.ms,底层会自动批量发送 String payload = new String(message.getPayload()); kafkaTemplate.send(suffix, sn, payload); diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java index ce4fc7b..e590c4b 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java @@ -921,7 +921,7 @@ public class BatchDeviceMessageProcessor { : String.valueOf(value); String statusText = isAlarming ? "报警" : "报警结束"; - String msgContent = String.format("%s%s,%s气体浓度为%s", + String msgContent = String.format("%s%s,%s气体浓度为%s", handVo.getUserName(), statusText, gasName, valueStr); try { diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcessor.java similarity index 96% rename from cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java rename to cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcessor.java index b1ad1f8..ff611ec 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcessor.java @@ -20,7 +20,7 @@ import java.util.Map; @Slf4j @Component -public class HandAlarmMessageProcess { +public class HandAlarmMessageProcessor { /** * 【问题6 FIX】限流速率从配置文件读取,不再硬编码 @@ -35,7 +35,7 @@ public class HandAlarmMessageProcess { @Resource private TdengineService tdengineService; - public HandAlarmMessageProcess( + public HandAlarmMessageProcessor( @Value("${mqtt.alarm.rate-limit:3000.0}") double rateLimit) { this.mqttRateLimiter = RateLimiter.create(rateLimit); } @@ -106,7 +106,6 @@ public class HandAlarmMessageProcess { mqttRateLimiter.acquire(); try { - // 【BUG-2 FIX】JSON 构造移至 try 块内,确保异常不会绕过限流语义 String topic = sn + "/zds_down"; String jsonPayload = JsonUtils.toJsonString(Map.of("message", message)); mqttClient.publish(topic, jsonPayload);