From 599f82eebaeef9de3f0660dcdaf65b94c403ff27 Mon Sep 17 00:00:00 2001 From: wangwei_123 <1255324804@qq.com> Date: Wed, 10 Dec 2025 17:52:35 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=8B=E6=8C=81=E8=A1=A8=E6=8A=A5=E8=AD=A6?= =?UTF-8?q?=E9=80=BB=E8=BE=91=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yudao/module/mqtt/kafka/KafkaConfig.java | 2 +- .../module/mqtt/kafka/KafkaMessageConsumer.java | 10 +-- .../cn/iocoder/yudao/module/mqtt/mqtt/Client.java | 45 ++++++------ .../yudao/module/mqtt/mqtt/ThreadPoolConfig.java | 2 +- .../processor/BatchDeviceMessageProcessor.java | 82 +++++++++++++++++++--- 5 files changed, 96 insertions(+), 45 deletions(-) diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java index 5af7af7..8e39084 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java @@ -48,7 +48,7 @@ public class KafkaConfig { factory.setBatchListener(true); // 【并发设置】根据你的分区数设置,比如你有 3 个分区就设为 3 - factory.setConcurrency(3); + factory.setConcurrency(6); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); factory.getContainerProperties().setPollTimeout(3000); diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java index 57a7257..da9b533 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java @@ -24,24 +24,16 @@ import static cn.iocoder.yudao.module.mqtt.kafka.KafkaTopicType.DEAD_LETTER_TOPI @Slf4j @Service // <--- 添加这个注解! public class KafkaMessageConsumer { - - private final TaskExecutor taskExecutor; private final DeviceMessageProcessor deviceMessageProcessor; private final BatchDeviceMessageProcessor batchDeviceMessageProcessor; - private final KafkaTemplate kafkaTemplate; public KafkaMessageConsumer(DeviceMessageProcessor deviceMessageProcessor, - @Qualifier("mqttExecutor") TaskExecutor taskExecutor, - BatchDeviceMessageProcessor batchDeviceMessageProcessor, - KafkaTemplate kafkaTemplate) { + BatchDeviceMessageProcessor batchDeviceMessageProcessor) { this.deviceMessageProcessor = deviceMessageProcessor; - this.taskExecutor = taskExecutor; this.batchDeviceMessageProcessor = batchDeviceMessageProcessor; - this.kafkaTemplate = kafkaTemplate; - } // 使用你定义的 batchFactory diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java index 069574b..5d236b7 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java @@ -21,6 +21,22 @@ import java.util.concurrent.BlockingQueue; @Component public class Client { private final OnMessageCallback onMessageCallback; // 1. 注入我们新的 Kafka 生产者回调 + // 【修改点2】定义全局通用的发送回调,避免每次发送都创建对象,减少GC + private final IMqttActionListener globalPublishCallback = new IMqttActionListener() { + @Override + public void onSuccess(IMqttToken asyncActionToken) { + // 高并发下,成功通常不打印日志,否则磁盘IO会爆炸 + // log.debug("发送成功: {}", asyncActionToken.getTopics()); + } + + @Override + public void onFailure(IMqttToken asyncActionToken, Throwable exception) { + // 只有失败才打印日志 + log.error("MQTT异步发送失败, Topic: {}", + asyncActionToken.getTopics() != null ? asyncActionToken.getTopics()[0] : "null", + exception); + } + }; // --- 1. 配置注入 (保持不变) --- @Value("${mqtt.enable:false}") private Boolean enable; @@ -44,27 +60,9 @@ public class Client { private int[] subscribeQos; @Value("${mqtt.default.publishQos:0}") private int publishQos; - //使用异步客户端接口 private IMqttAsyncClient mqttClient; - // 【修改点2】定义全局通用的发送回调,避免每次发送都创建对象,减少GC - private final IMqttActionListener globalPublishCallback = new IMqttActionListener() { - @Override - public void onSuccess(IMqttToken asyncActionToken) { - // 高并发下,成功通常不打印日志,否则磁盘IO会爆炸 - // log.debug("发送成功: {}", asyncActionToken.getTopics()); - } - - @Override - public void onFailure(IMqttToken asyncActionToken, Throwable exception) { - // 只有失败才打印日志 - log.error("MQTT异步发送失败, Topic: {}", - asyncActionToken.getTopics() != null ? asyncActionToken.getTopics()[0] : "null", - exception); - } - }; - public Client(OnMessageCallback onMessageCallback) { this.onMessageCallback = onMessageCallback; } @@ -118,9 +116,12 @@ public class Client { options.setKeepAliveInterval(keepAliveInterval); options.setCleanSession(cleanSession); options.setAutomaticReconnect(true); + options.setCleanSession(false); + // 建议设置:超时和保活时间 + options.setConnectionTimeout(10); + options.setKeepAliveInterval(20); - - //对于 10w QPS,必须拉满到 MQTT 协议上限 (65535) + //对于 10w QPS,必须拉满到 MQTT 协议上限 (65535) options.setMaxInflight(65535); return options; } @@ -138,6 +139,7 @@ public class Client { public void onSuccess(IMqttToken asyncActionToken) { log.info("订阅成功"); } + @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { log.error("订阅失败", exception); @@ -150,9 +152,6 @@ public class Client { // --- 发布方法 --- - /** - * 兼容旧代码的重载方法 - */ public void publish(String topic, String payload) { publishAsync(topic, payload); } diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java index 4bfe0a5..99936b1 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java @@ -33,7 +33,7 @@ public class ThreadPoolConfig { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(10); - executor.setQueueCapacity(5000); // 队列大一点,应对瞬间并发 + executor.setQueueCapacity(10000); // 队列大一点,应对瞬间并发 executor.setThreadNamePrefix("alarm-push-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java index 8cc4221..705f883 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java @@ -14,6 +14,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.RateLimiter; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -64,6 +65,9 @@ public class BatchDeviceMessageProcessor { @Resource(name = "mqttAlarmExecutor") private TaskExecutor alarmExecutor; + private final RateLimiter mqttRateLimiter = RateLimiter.create(50.0); + + /** * 批量处理 Kafka 消息的主入口 */ @@ -218,13 +222,37 @@ public class BatchDeviceMessageProcessor { alarmProcessor.addToBatch(context.alarmMessageLogs); log.debug("[批量持久化] 报警消息: {} 条", context.alarmMessageLogs.size()); } - // 4. 批量创建气体报警 if (!context.gasAlarmsToCreate.isEmpty()) { + // 2. 批量保存 + // MybatisPlus 会把生成的 ID 填入 doList 的对象中 handAlarmService.batchCreateHandAlarm(context.gasAlarmsToCreate); log.debug("[批量持久化] 新增气体报警: {} 条", context.gasAlarmsToCreate.size()); - } + // 3. 原有的回填 Redis 逻辑 (现在 alarmVo.getId() 有值了) + context.pendingAlarmIds.forEach((sn, alarmVo) -> { + // 1. 确保数据库确实生成了 ID + if (alarmVo.getId() != null) { + // 2. 找到准备更新到 Redis 的设备对象 + HandDataVo redisVo = context.redisUpdates.get(sn); + + // 3. 如果 Redis 更新队列里没有,尝试从全局缓存取 + if (redisVo == null) { + redisVo = context.snToDeviceMap.get(sn); + if (redisVo != null) { + context.redisUpdates.put(sn, redisVo); + } + } + // 4. 执行回填 + if (redisVo != null) { + redisVo.setAlarmId(alarmVo.getId()); + // log.info("SN: {} 回填气体报警ID: {}", sn, alarmVo.getId()); + } + } else { + log.error("SN: {} 气体报警保存后 ID 仍为空,请检查 Service 层事务或 ID 生成策略", sn); + } + }); + } // 5. 批量更新气体报警 if (!context.gasAlarmsToUpdate.isEmpty()) { handAlarmService.batchUpdateById(context.gasAlarmsToUpdate); @@ -235,6 +263,23 @@ public class BatchDeviceMessageProcessor { if (!context.fenceAlarmsToCreate.isEmpty()) { fenceAlarmService.batchCreateFenceAlarm(context.fenceAlarmsToCreate); log.debug("[批量持久化] 新增围栏报警: {} 条", context.fenceAlarmsToCreate.size()); + + // ★★★【核心修改:围栏报警 ID 回填】★★★ + context.pendingFenceAlarmIds.forEach((sn, fenceAlarmVo) -> { + if (fenceAlarmVo.getId() != null) { + HandDataVo redisVo = context.redisUpdates.get(sn); + if (redisVo == null) { + redisVo = context.snToDeviceMap.get(sn); + if (redisVo != null) context.redisUpdates.put(sn, redisVo); + } + + if (redisVo != null) { + // 假设 HandDataVo 有 setFenceAlarmId 方法,或者业务逻辑复用 setAlarmId + // 请根据你的 HandDataVo 实际字段修改下面这行: + redisVo.setFenceAlarmId(fenceAlarmVo.getId()); + } + } + }); } // 7. 批量更新围栏报警 @@ -444,6 +489,7 @@ public class BatchDeviceMessageProcessor { return; } + LocalDateTime now = LocalDateTime.now(); // 处理离线报警恢复 @@ -455,6 +501,8 @@ public class BatchDeviceMessageProcessor { alarmToEnd.setId(handVo.getAlarmId()); alarmToEnd.setTAlarmEnd(now); alarmToEnd.setStatus(EnableStatus.HANDLE.value()); + alarmToEnd.setRemark("系统已自动处理报警"); + context.gasAlarmsToUpdate.add(alarmToEnd); handVo.setAlarmId(null); @@ -474,6 +522,12 @@ public class BatchDeviceMessageProcessor { // 发送报警结束消息 sendAlarmMessage(handVo, alarmRule.getGasTypeName(), handVo.getValue(), false, context); handVo.setLastPushValue(null); + HandAlarmDO alarmToEnd = new HandAlarmDO(); + alarmToEnd.setId(handVo.getAlarmId()); + alarmToEnd.setTAlarmEnd(now); + alarmToEnd.setStatus(EnableStatus.HANDLE.value()); + alarmToEnd.setRemark("系统已自动处理报警"); + context.gasAlarmsToUpdate.add(alarmToEnd); return; } @@ -495,17 +549,20 @@ public class BatchDeviceMessageProcessor { handVo.setMaxAlarmLevel(newLevel); // 创建新报警 - HandAlarmSaveReqVO newAlarm = new HandAlarmSaveReqVO(); + HandAlarmDO newAlarm = new HandAlarmDO(); newAlarm.setDetectorId(handVo.getId()); newAlarm.setSn(handVo.getSn()); + newAlarm.setName(handVo.getName()); newAlarm.setVAlarmFirst(handVo.getFirstValue()); + newAlarm.setVAlarmMaximum(handVo.getFirstValue()); newAlarm.setGasType(handVo.getGasChemical()); + newAlarm.setDeptId(handVo.getDeptId()); newAlarm.setPicX(handVo.getLongitude()); newAlarm.setPicY(handVo.getLatitude()); newAlarm.setAlarmType(AlarmType.GAS.getType()); newAlarm.setAlarmLevel(newLevel); newAlarm.setTAlarmStart(now); - newAlarm.setTenantId(Math.toIntExact(handVo.getTenantId())); + newAlarm.setTenantId(handVo.getTenantId()); newAlarm.setUnit(handVo.getUnit()); newAlarm.setCreator("system"); newAlarm.setCreateTime(now); @@ -570,7 +627,7 @@ public class BatchDeviceMessageProcessor { // 从正常变为报警 if (shouldAlarm && !isCurrentlyAlarming) { - HandAlarmSaveReqVO newAlarm = new HandAlarmSaveReqVO(); + HandAlarmDO newAlarm = new HandAlarmDO(); newAlarm.setDetectorId(handVo.getId()); newAlarm.setSn(handVo.getSn()); newAlarm.setAlarmType(AlarmType.BATTERY.getType()); @@ -578,7 +635,7 @@ public class BatchDeviceMessageProcessor { newAlarm.setVAlarmFirst((double) batteryPercentage); newAlarm.setPicX(handVo.getLatitude()); newAlarm.setPicY(handVo.getLongitude()); - newAlarm.setTenantId(Math.toIntExact(handVo.getTenantId())); + newAlarm.setTenantId(handVo.getTenantId()); newAlarm.setCreator("system"); newAlarm.setCreateTime(now); @@ -655,7 +712,7 @@ public class BatchDeviceMessageProcessor { if (!hasOngoingAlarm) { // 触发新报警 - FenceAlarmSaveReqVO newAlarm = new FenceAlarmSaveReqVO(); + FenceAlarmDO newAlarm = new FenceAlarmDO(); newAlarm.setDetectorId(handVo.getId()); newAlarm.setSn(handVo.getSn()); newAlarm.setType(fenceType.getType()); @@ -823,6 +880,9 @@ public class BatchDeviceMessageProcessor { for (String sn : targetSns) { if (StringUtils.isBlank(sn)) continue; + // 限流 + mqttRateLimiter.acquire(); + try { String topic = sn + "/zds_down"; client.publish(topic, json); @@ -898,14 +958,14 @@ public class BatchDeviceMessageProcessor { List alarmMessageLogs = new ArrayList<>(); // 待保存的报警 - List gasAlarmsToCreate = new ArrayList<>(); + List gasAlarmsToCreate = new ArrayList<>(); List gasAlarmsToUpdate = new ArrayList<>(); - List fenceAlarmsToCreate = new ArrayList<>(); + List fenceAlarmsToCreate = new ArrayList<>(); List fenceAlarmsToUpdate = new ArrayList<>(); // 待回填的ID(用于新创建的报警记录) - Map pendingAlarmIds = new HashMap<>(); - Map pendingFenceAlarmIds = new HashMap<>(); + Map pendingAlarmIds = new HashMap<>(); + Map pendingFenceAlarmIds = new HashMap<>(); // 待更新的 Redis 数据 Map redisUpdates = new HashMap<>();