From 0c0dab4da601e4f175e91d408fec6a3b6286f60f Mon Sep 17 00:00:00 2001 From: wangwei_123 <1255324804@qq.com> Date: Tue, 9 Dec 2025 16:54:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=8B=E6=8C=81=E8=A1=A8=E6=89=B9=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../module/mqtt/config/BatchProcessorConfig.java | 37 +- .../module/mqtt/config/TdengineBatchConfig.java | 190 +++-- .../yudao/module/mqtt/kafka/KafkaConfig.java | 29 + .../module/mqtt/kafka/KafkaMessageConsumer.java | 58 +- .../cn/iocoder/yudao/module/mqtt/mqtt/Client.java | 122 ++- .../yudao/module/mqtt/mqtt/ThreadPoolConfig.java | 2 +- .../processor/BatchDeviceMessageProcessor.java | 917 +++++++++++++++++++++ .../mqtt/processor/DeviceMessageProcessor.java | 116 ++- .../controller/admin/AlarmMessageController.java | 107 --- .../controller/admin/HandTdengineController.java | 7 + .../yudao/module/hand/dal/AlarmMessageDO.java | 63 -- .../module/hand/mapper/AlarmMessageMapper.java | 33 - .../yudao/module/hand/mapper/TdengineMapper.java | 17 +- .../module/hand/service/AlarmMessageService.java | 67 -- .../module/hand/service/FenceAlarmService.java | 4 + .../module/hand/service/HandAlarmService.java | 4 + .../module/hand/service/HandDetectorService.java | 5 +- .../yudao/module/hand/service/TdengineService.java | 12 +- .../hand/service/impl/AlarmMessageServiceImpl.java | 117 --- .../hand/service/impl/FenceAlarmServiceImpl.java | 15 +- .../hand/service/impl/HandAlarmServiceImpl.java | 16 + .../hand/service/impl/HandDetectorServiceImpl.java | 49 +- .../hand/service/impl/TdengineServiceImpl.java | 89 +- .../yudao/module/hand/util/RedisKeyUtil.java | 8 +- .../iocoder/yudao/module/hand/util/RedisUtil.java | 4 + .../yudao/module/hand/vo/AlarmMessageLog.java | 55 ++ .../module/hand/vo/AlarmMessagePageReqVO.java | 41 - .../yudao/module/hand/vo/AlarmMessageRespVO.java | 51 -- .../module/hand/vo/AlarmMessageSaveReqVO.java | 36 - .../cn/iocoder/yudao/module/hand/vo/Geofence.java | 4 + .../yudao/module/hand/vo/HandTdenginePageVO.java | 10 + .../yudao/module/hand/vo/TdengineDataVo.java | 2 + .../main/resources/mapper/AlarmMessageMapper.xml | 12 - .../src/main/resources/mapper/TdengineMapper.xml | 97 ++- cc-admin-master/yudao-module-system/pom.xml | 11 +- .../src/main/resources/application-dev.yaml | 2 +- .../src/main/resources/application-local.yaml | 2 +- .../src/main/resources/application-prod.yaml | 2 +- .../src/main/resources/application.yaml | 2 + 39 files changed, 1607 insertions(+), 808 deletions(-) create mode 100644 cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java delete mode 100644 cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/controller/admin/AlarmMessageController.java delete mode 100644 cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/dal/AlarmMessageDO.java delete mode 100644 cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/mapper/AlarmMessageMapper.java delete mode 100644 cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/AlarmMessageService.java delete mode 100644 cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/AlarmMessageServiceImpl.java create mode 100644 cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageLog.java delete mode 100644 cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessagePageReqVO.java delete mode 100644 cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageRespVO.java delete mode 100644 cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageSaveReqVO.java delete mode 100644 cc-admin-master/yudao-module-hand/src/main/resources/mapper/AlarmMessageMapper.xml diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java index 4063256..7cba2c9 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java @@ -1,6 +1,7 @@ package cn.iocoder.yudao.module.mqtt.config; import cn.iocoder.yudao.module.hand.service.TdengineService; +import cn.iocoder.yudao.module.hand.vo.AlarmMessageLog; import cn.iocoder.yudao.module.hand.vo.TdengineDataVo; import cn.iocoder.yudao.module.hand.vo.HandOriginalLog; import jakarta.annotation.Resource; @@ -20,33 +21,35 @@ public class BatchProcessorConfig { @Bean public TdengineBatchConfig handLogBatchProcessor() { return new TdengineBatchConfig<>( - "HandLogProcessor", // 处理器名字 - list -> tdengineService.saveHandLogBatch(list), // 【核心】将具体的保存逻辑作为 Lambda 传入 - 50000, // 队列容量 - 1000, // 批次大小 - 5000 // 执行频率 + "HandLogProcessor", + list -> tdengineService.saveHandLogBatch(list), + 50000, + 1000, + 5000 // 内部每 5秒 执行一次 ); } @Bean public TdengineBatchConfig handLogBatchDataProcessor() { return new TdengineBatchConfig<>( - "handLogBatchDataProcessor", // 处理器名字 - list -> tdengineService.saveDataLogBatch(list), // 【核心】将具体的保存逻辑作为 Lambda 传入 - 50000, // 队列容量 - 1000, // 批次大小 - 5000 // 执行频率 + "handLogBatchDataProcessor", + list -> tdengineService.saveDataLogBatch(list), + 50000, + 1000, + 5000 // 内部每 5秒 执行一次 ); } + @Bean + public TdengineBatchConfig messageLogBatchProcessor() { + return new TdengineBatchConfig<>( + "messageLogBatchProcessor", + list -> tdengineService.createAlarmRecord(list), + 50000, + 1000, + 30000 // 内部每 30秒 执行一次 + ); - // --- 创建一个调度器来统一调用所有处理器的 flush 方法 --- - @Scheduled(fixedRateString = "${batch.processor.flush.rate:5000}") // 从配置读取频率,默认5秒 - public void scheduleFlush() { - // 调用 handLogProcessor 的刷新方法 - handLogBatchProcessor().flush(); - handLogBatchDataProcessor().flush(); - // 如果有其他处理器,也在这里调用 } } diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java index cb6460d..0a9bc43 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java @@ -1,119 +1,169 @@ package cn.iocoder.yudao.module.mqtt.config; +import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; +import java.util.Collection; import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @Slf4j public class TdengineBatchConfig { - // --- 这些参数可以在构造时传入,使其更灵活 --- private final int queueCapacity; private final int batchSize; - private final long offerTimeoutMs; + private final long maxWaitTimeMs; + private final String processorName; - private final String processorName; // 用于日志,区分不同的处理器实例 private final BlockingQueue dataQueue; - private final Consumer> batchAction; // 【核心】用于处理一个批次的具体业务逻辑 + private final Consumer> batchAction; + private final AtomicBoolean running = new AtomicBoolean(true); + private Thread workerThread; - private final AtomicBoolean shuttingDown = new AtomicBoolean(false); - - /** - * 构造函数 - * @param processorName 处理器名称,用于日志区分 - * @param batchAction 一个函数,接收一个 List 并执行相应的批量操作(例如,写入数据库) - * @param queueCapacity 队列容量 - * @param batchSize 批次大小 - * @param fixedRateMs 执行频率 - */ public TdengineBatchConfig(String processorName, Consumer> batchAction, - int queueCapacity, int batchSize, long fixedRateMs) { + int queueCapacity, int batchSize, long maxWaitTimeMs) { this.processorName = processorName; this.batchAction = batchAction; this.queueCapacity = queueCapacity; this.batchSize = batchSize; - this.offerTimeoutMs = 100L; + this.maxWaitTimeMs = maxWaitTimeMs; this.dataQueue = new LinkedBlockingQueue<>(this.queueCapacity); + startWorker(); } - public void addToBatch(T data) { - if (data == null) { - return; + private void startWorker() { + this.workerThread = new Thread(this::processLoop, "TD-Worker-" + processorName); + this.workerThread.setDaemon(true); + this.workerThread.start(); + log.info("[{}] 批处理线程已启动,批次大小: {}, 最大等待: {}ms", + processorName, batchSize, maxWaitTimeMs); + } + + private void processLoop() { + List buffer = new ArrayList<>(batchSize); + long lastFlushTime = System.currentTimeMillis(); + + while (running.get()) { + try { + long now = System.currentTimeMillis(); + long waitTime = maxWaitTimeMs - (now - lastFlushTime); + if (waitTime <= 0) waitTime = 1; + + T firstItem = dataQueue.poll(waitTime, TimeUnit.MILLISECONDS); + + if (firstItem != null) { + buffer.add(firstItem); + dataQueue.drainTo(buffer, batchSize - buffer.size()); + } + + boolean sizeReached = buffer.size() >= batchSize; + boolean timeReached = (System.currentTimeMillis() - lastFlushTime) >= maxWaitTimeMs; + + if ((sizeReached || timeReached) && !buffer.isEmpty()) { + doFlush(buffer); + buffer.clear(); + lastFlushTime = System.currentTimeMillis(); + } + + } catch (InterruptedException e) { + log.warn("[{}] 工作线程被中断", processorName); + Thread.currentThread().interrupt(); + break; + } catch (Exception e) { + log.error("[{}] Loop异常", processorName, e); + } } - // 如果正在关闭,则不再接受新数据 - if (shuttingDown.get()) { - log.warn("[{}] 正在关闭,已拒绝添加新数据。", this.processorName); - return; + if (!buffer.isEmpty()) doFlush(buffer); + } + + private void doFlush(List batch) { + try { + if (log.isDebugEnabled()) { + log.debug("[{}] 触发批量入库,数量: {}", processorName, batch.size()); + } + batchAction.accept(new ArrayList<>(batch)); + } catch (Exception e) { + log.error("[{}] 批量入库失败!数量: {}", processorName, batch.size(), e); } + } + + /** + * 单条添加(保留兼容性) + */ + public void addToBatch(T data) { + if (data == null || !running.get()) return; + try { - if (!dataQueue.offer(data, offerTimeoutMs, TimeUnit.MILLISECONDS)) { - log.warn("[{}] 缓冲区已满且在 {} 毫秒内无法添加,数据可能被丢弃!当前队列大小: {}", - this.processorName, this.offerTimeoutMs, dataQueue.size()); + if (!dataQueue.offer(data, 100, TimeUnit.MILLISECONDS)) { + log.warn("[{}] 队列已满({}),丢弃数据", processorName, queueCapacity); } } catch (InterruptedException e) { - log.error("[{}] 添加数据到缓冲区时线程被中断", this.processorName, e); Thread.currentThread().interrupt(); } } - // 注意:@Scheduled 注解不能用在非 Spring Bean 的普通类方法上。 - // 我们将在下一步的配置类中解决这个问题。 - public void flush() { - if (dataQueue.isEmpty()) { + /** + * 批量添加(新增优化方法)- 关键优化点! + */ + public void addToBatch(Collection dataList) { + if (dataList == null || dataList.isEmpty() || !running.get()) { return; } - List batchList = new ArrayList<>(batchSize); - try { - int drainedCount = dataQueue.drainTo(batchList, batchSize); - if (drainedCount > 0) { - log.debug("[{}] 定时任务触发,准备将 {} 条数据进行批量处理...", this.processorName, drainedCount); - // 调用构造时传入的业务逻辑 - this.batchAction.accept(batchList); - log.debug("[{}] 成功批量处理 {} 条数据。", this.processorName, drainedCount); + + int added = 0; + int dropped = 0; + + for (T data : dataList) { + if (data == null) continue; + + try { + // 批量添加时使用较短的超时,避免阻塞太久 + if (dataQueue.offer(data, 10, TimeUnit.MILLISECONDS)) { + added++; + } else { + dropped++; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("[{}] 批量添加被中断,已添加: {}, 剩余: {}", + processorName, added, dataList.size() - added); + break; } - } catch (Exception e) { - log.error("[{}] 批量处理数据时发生严重错误!数据量: {}", this.processorName, batchList.size(), e); + } + + if (dropped > 0) { + log.warn("[{}] 批量添加完成,成功: {}, 丢弃: {} (队列已满)", + processorName, added, dropped); + } else if (log.isDebugEnabled()) { + log.debug("[{}] 批量添加完成,数量: {}", processorName, added); } } + /** + * 获取当前队列大小(用于监控) + */ + public int getQueueSize() { + return dataQueue.size(); + } + @PreDestroy public void onShutdown() { - log.info("[{}] 应用即将关闭,开始执行最后的缓冲区数据刷新...", this.processorName); - // 1. 设置关闭标志,阻止新数据进入 - shuttingDown.set(true); - - // 2. 将队列中剩余的所有数据一次性取出到一个临时列表 - List remainingData = new ArrayList<>(); - dataQueue.drainTo(remainingData); - - if (remainingData.isEmpty()) { - log.info("[{}] 缓冲区为空,无需处理。", this.processorName); - return; - } - log.info("[{}] 停机处理中,剩余 {} 条数据待处理...", this.processorName, remainingData.size()); - - // 3. 对取出的数据进行分批处理 - for (int i = 0; i < remainingData.size(); i += batchSize) { - // 计算当前批次的结束索引 - int end = Math.min(i + batchSize, remainingData.size()); - // 获取子列表作为当前批次 - List batch = remainingData.subList(i, end); + log.info("[{}] 正在停止...", processorName); + running.set(false); + if (workerThread != null) { + workerThread.interrupt(); try { - log.debug("[{}] 正在处理最后批次的数据,数量: {}", this.processorName, batch.size()); - this.batchAction.accept(batch); - } catch (Exception e) { - log.error("[{}] 关闭过程中批量处理数据时发生严重错误!数据量: {}", this.processorName, batch.size(), e); + workerThread.join(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } - - log.info("[{}] 缓冲区数据已全部处理完毕。", this.processorName); + log.info("[{}] 已停止。剩余队列: {}", processorName, dataQueue.size()); } + } \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java index fc609e6..5af7af7 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java @@ -3,7 +3,12 @@ package cn.iocoder.yudao.module.mqtt.kafka; import org.apache.kafka.common.TopicPartition; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.util.backoff.FixedBackOff; @@ -31,4 +36,28 @@ public class KafkaConfig { // 当重试耗尽后,会调用 recoverer 将消息送入 DLT return new DefaultErrorHandler(recoverer, backOff); } + + @Bean("batchFactory") + public KafkaListenerContainerFactory batchFactory(ConsumerFactory consumerFactory, + KafkaTemplate kafkaTemplate) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + + // 【核心设置】开启批量监听!没有这就还是单条处理 + factory.setBatchListener(true); + + // 【并发设置】根据你的分区数设置,比如你有 3 个分区就设为 3 + factory.setConcurrency(3); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); + factory.getContainerProperties().setPollTimeout(3000); + + // 创建一个 DeadLetterPublishingRecoverer,它知道如何将失败消息发送到 DLT + DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate, + (record, ex) -> new TopicPartition(KafkaTopicType.DEAD_LETTER_TOPIC.getValue(), record.partition())); + DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L)); + factory.setCommonErrorHandler(errorHandler); + + return factory; + } } diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java index 40f3524..57a7257 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java @@ -1,16 +1,24 @@ package cn.iocoder.yudao.module.mqtt.kafka; +import cn.iocoder.yudao.module.mqtt.processor.BatchDeviceMessageProcessor; import cn.iocoder.yudao.module.mqtt.processor.DeviceMessageProcessor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.core.task.TaskExecutor; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + import static cn.iocoder.yudao.module.mqtt.kafka.KafkaTopicType.DEAD_LETTER_TOPIC; @Slf4j @@ -19,40 +27,44 @@ public class KafkaMessageConsumer { private final TaskExecutor taskExecutor; private final DeviceMessageProcessor deviceMessageProcessor; + + private final BatchDeviceMessageProcessor batchDeviceMessageProcessor; + private final KafkaTemplate kafkaTemplate; public KafkaMessageConsumer(DeviceMessageProcessor deviceMessageProcessor, @Qualifier("mqttExecutor") TaskExecutor taskExecutor, + BatchDeviceMessageProcessor batchDeviceMessageProcessor, KafkaTemplate kafkaTemplate) { this.deviceMessageProcessor = deviceMessageProcessor; this.taskExecutor = taskExecutor; + this.batchDeviceMessageProcessor = batchDeviceMessageProcessor; this.kafkaTemplate = kafkaTemplate; } - @KafkaListener(topics = "zds_up") - public void handleMessage(@Payload String payload, - @Header(KafkaHeaders.RECEIVED_KEY) String key, - @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { - log.debug("从 Kafka Topic 收到消息,准备处理: topic=[{}]", topic); - - // 3. 异步执行,保证kafka不阻塞 - taskExecutor.execute(() -> { - try { - // 核心业务逻辑调用 - this.deviceMessageProcessor.process(key, payload); - } catch (Exception e) { - // 1. 捕获所有运行时异常 - log.error("处理 Kafka 消息时发生严重错误!将消息发送到死信队列. Key: {}, Payload: {}", key, payload, e); - try { - // 2. 将失败的消息发送到 DLT - // 可以在消息头中添加额外信息,如原始主题、异常信息等,方便排查 - kafkaTemplate.send(DEAD_LETTER_TOPIC.getValue(), key, payload); - } catch (Exception dltEx) { - log.error("无法将消息发送到死信队列!消息可能丢失. Key: {}, Payload: {}", key, payload, dltEx); - } - } - }); + // 使用你定义的 batchFactory + + @KafkaListener(topics = "zds_up", containerFactory = "batchFactory") + public void handleBatchMessage(List> records) { + + if (records.isEmpty()) { + return; + } + log.info(">>> 开始同步处理 {} 条消息", records.size()); + try { + // 1. [成功路径] 核心业务逻辑在消费者线程中同步执行 + batchDeviceMessageProcessor.processBatch(records); + + log.info("成功处理并确认 {} 条消息", records.size()); + } catch (Exception e) { + // 2. [失败路径] 业务处理失败,转入异步DLT处理流程 + log.error("批量处理失败,将异步发送 {} 条消息到死信队列", records.size(), e); + + throw e; + } } + + } diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java index 4fc5bba..069574b 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java @@ -20,7 +20,6 @@ import java.util.concurrent.BlockingQueue; @Slf4j @Component public class Client { - private final OnMessageCallback onMessageCallback; // 1. 注入我们新的 Kafka 生产者回调 // --- 1. 配置注入 (保持不变) --- @Value("${mqtt.enable:false}") @@ -43,11 +42,29 @@ public class Client { private String[] subscribeTopics; @Value("${mqtt.subscribe.qos}") private int[] subscribeQos; - @Value("${mqtt.default.publishQos:1}") + @Value("${mqtt.default.publishQos:0}") private int publishQos; - private IMqttClient mqttClient; - // 2. 构造函数注入 Spring Bean + //使用异步客户端接口 + private IMqttAsyncClient mqttClient; + + // 【修改点2】定义全局通用的发送回调,避免每次发送都创建对象,减少GC + private final IMqttActionListener globalPublishCallback = new IMqttActionListener() { + @Override + public void onSuccess(IMqttToken asyncActionToken) { + // 高并发下,成功通常不打印日志,否则磁盘IO会爆炸 + // log.debug("发送成功: {}", asyncActionToken.getTopics()); + } + + @Override + public void onFailure(IMqttToken asyncActionToken, Throwable exception) { + // 只有失败才打印日志 + log.error("MQTT异步发送失败, Topic: {}", + asyncActionToken.getTopics() != null ? asyncActionToken.getTopics()[0] : "null", + exception); + } + }; + public Client(OnMessageCallback onMessageCallback) { this.onMessageCallback = onMessageCallback; } @@ -64,19 +81,32 @@ public class Client { private void connect() { try { String finalClientId = baseClientId + "-" + UUID.randomUUID().toString().substring(0, 8); - mqttClient = new MqttClient(hostUrl, finalClientId, new MemoryPersistence()); - MqttConnectOptions options = createMqttConnectOptions(); + //实例化 MqttAsyncClient + mqttClient = new MqttAsyncClient(hostUrl, finalClientId, new MemoryPersistence()); - // 3. 将注入的回调 Bean 设置给 Paho 客户端 mqttClient.setCallback(onMessageCallback); - log.info("正在连接到 MQTT Broker: {}", hostUrl); - mqttClient.connect(options); - subscribeToTopics(); + MqttConnectOptions options = createMqttConnectOptions(); + + log.info("正在异步连接到 MQTT Broker: {}", hostUrl); + + // 【修改点4】执行异步连接,并在回调中处理订阅 + mqttClient.connect(options, null, new IMqttActionListener() { + @Override + public void onSuccess(IMqttToken asyncActionToken) { + log.info("MQTT 连接成功 (Async)"); + subscribeToTopics(); + } + + @Override + public void onFailure(IMqttToken asyncActionToken, Throwable exception) { + log.error("MQTT 连接失败", exception); + } + }); } catch (MqttException e) { - log.error("连接到 MQTT Broker 时发生初始错误: {}", e.getMessage(), e); + log.error("初始化 MQTT 客户端失败: {}", e.getMessage(), e); } } @@ -87,50 +117,75 @@ public class Client { options.setConnectionTimeout(connectionTimeout); options.setKeepAliveInterval(keepAliveInterval); options.setCleanSession(cleanSession); - options.setAutomaticReconnect(true); // 启用自动重连 + options.setAutomaticReconnect(true); + + + //对于 10w QPS,必须拉满到 MQTT 协议上限 (65535) + options.setMaxInflight(65535); return options; } - /** - * 4. 创建一个事件监听器,用于处理连接成功事件 - * 这取代了之前传递 Runnable 的方式,实现了更好的解耦 - */ -/* @Async - @EventListener - public void handleMqttConnectionComplete(MqttConnectedEvent event) { - log.info("监听到 MQTT 连接成功事件,开始执行订阅操作..."); - subscribeToTopics(); - }*/ private void subscribeToTopics() { if (mqttClient == null || !mqttClient.isConnected()) { - log.error("无法订阅主题,因为 MQTT 客户端尚未连接。"); + log.warn("无法订阅,客户端未连接"); return; } try { log.info("执行 MQTT 主题订阅: {}", (Object) subscribeTopics); - mqttClient.subscribe(subscribeTopics, subscribeQos); + // 异步订阅 + mqttClient.subscribe(subscribeTopics, subscribeQos, null, new IMqttActionListener() { + @Override + public void onSuccess(IMqttToken asyncActionToken) { + log.info("订阅成功"); + } + @Override + public void onFailure(IMqttToken asyncActionToken, Throwable exception) { + log.error("订阅失败", exception); + } + }); } catch (MqttException e) { - log.error("订阅 MQTT 主题时发生错误: {}", e.getMessage(), e); + log.error("订阅请求异常", e); } } - // --- 4. 发布和清理方法 (保持不变) --- + // --- 发布方法 --- + + /** + * 兼容旧代码的重载方法 + */ public void publish(String topic, String payload) { - publish(topic, payload.getBytes(), this.publishQos, false); + publishAsync(topic, payload); } - public void publish(String topic, byte[] payload, int qos, boolean retained) { + /** + * 推荐使用的异步发送方法 + */ + public void publishAsync(String topic, String payload) { + if (payload == null) return; + publishAsync(topic, payload.getBytes(StandardCharsets.UTF_8), this.publishQos, false); + } + + /** + * 核心异步发送逻辑 + */ + public void publishAsync(String topic, byte[] payload, int qos, boolean retained) { if (mqttClient == null || !mqttClient.isConnected()) { - log.error("MQTT 客户端未连接,无法发布消息。"); + // 降级:只打印日志,不抛异常,避免中断业务循环 + // 在高并发场景下,这里可以考虑加一个计数器,每N次打印一次日志 + log.warn("MQTT未连接,丢弃消息: {}", topic); return; } try { MqttMessage message = new MqttMessage(payload); message.setQos(qos); message.setRetained(retained); - mqttClient.publish(topic, message); + + // 【修改点6】调用异步发送接口,传入全局回调 + mqttClient.publish(topic, message, null, globalPublishCallback); + } catch (MqttException e) { - log.error("发布 MQTT 消息到主题 [{}] 时发生错误: {}", topic, e.getMessage(), e); + // 这里通常是队列满了(MaxInflight满了)或者参数错误 + log.error("MQTT消息入队失败: {}", topic, e); } } @@ -138,13 +193,14 @@ public class Client { public void cleanup() { if (mqttClient != null) { try { + // 异步断开 if (mqttClient.isConnected()) { mqttClient.disconnect(); } mqttClient.close(); - log.info("MQTT 客户端已成功关闭。"); + log.info("MQTT 客户端已关闭"); } catch (MqttException e) { - log.error("关闭 MQTT 客户端时发生错误: {}", e.getMessage(), e); + log.error("关闭 MQTT 客户端异常", e); } } } diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java index e968b4a..4bfe0a5 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java @@ -33,7 +33,7 @@ public class ThreadPoolConfig { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(10); - executor.setQueueCapacity(2000); // 队列大一点,应对瞬间并发 + executor.setQueueCapacity(5000); // 队列大一点,应对瞬间并发 executor.setThreadNamePrefix("alarm-push-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java new file mode 100644 index 0000000..8cc4221 --- /dev/null +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java @@ -0,0 +1,917 @@ +package cn.iocoder.yudao.module.mqtt.processor; + +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import cn.iocoder.yudao.framework.common.util.object.BeanUtils; +import cn.iocoder.yudao.module.hand.dal.*; +import cn.iocoder.yudao.module.hand.enums.*; +import cn.iocoder.yudao.module.hand.service.*; +import cn.iocoder.yudao.module.hand.util.*; +import cn.iocoder.yudao.module.hand.vo.*; +import cn.iocoder.yudao.module.mqtt.config.TdengineBatchConfig; +import cn.iocoder.yudao.module.mqtt.mqtt.Client; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.core.task.TaskExecutor; +import org.springframework.stereotype.Component; + +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * 批量设备消息处理器 + *

+ * 核心优化: + * 1. 批量获取基础数据(租户信息、设备信息、报警规则) + * 2. 内存中完成所有业务逻辑计算 + * 3. 批量执行所有数据库写操作 + */ +@Slf4j +@Component +public class BatchDeviceMessageProcessor { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Resource + private RedisUtil redisUtil; + @Resource + private TdengineBatchConfig tdengineBatchProcessor; + @Resource + private TdengineBatchConfig tdengineBatchConfig; + @Resource + private TdengineBatchConfig alarmProcessor; + @Resource + private HandDetectorService handDetectorService; + @Resource + private HandAlarmService handAlarmService; + @Resource + private AlarmRuleService alarmRuleService; + @Resource + private FenceService fenceService; + @Resource + private FenceAlarmService fenceAlarmService; + @Resource + private Client client; + @Resource(name = "mqttAlarmExecutor") + private TaskExecutor alarmExecutor; + + /** + * 批量处理 Kafka 消息的主入口 + */ + public void processBatch(List> records) { + long startTime = System.currentTimeMillis(); + + if (CollectionUtils.isEmpty(records)) { + return; + } + + log.info("[批量处理] 开始处理,消息数量: {}", records.size()); + + try { + // === 阶段1: 数据准备 === + BatchContext context = prepareBatchContext(records); + + if (context.isEmpty()) { + log.warn("[批量处理] 无有效数据,处理结束"); + return; + } + + // === 阶段2: 业务逻辑处理 === + processBatchLogic(records, context); + + // === 阶段3: 批量持久化 === + persistBatchData(context); + + log.info("[批量处理] 完成,消息数量: {},耗时: {} ms", + records.size(), System.currentTimeMillis() - startTime); + + } catch (Exception e) { + log.error("[批量处理] 发生异常", e); + // 根据业务需求决定是否需要回滚或重试 + } + } + + private BatchContext prepareBatchContext(List> records) { + BatchContext context = new BatchContext(); + + // 1. 提取所有有效的 SNs + List sns = records.stream() + .map(ConsumerRecord::key) + .filter(StringUtils::isNotBlank) + .distinct() + .toList(); + + if (sns.isEmpty()) { + return context; + } + + // 2. 批量获取租户信息 + context.snToTenantMap = getTenantIdsInBatch(sns); + + // 3. 按租户分组 SN + Map> tenantToSnsMap = sns.stream() + .filter(context.snToTenantMap::containsKey) + .collect(Collectors.groupingBy(context.snToTenantMap::get)); + + // 4. 批量获取设备信息 + context.snToDeviceMap = getDeviceVosInBatch(tenantToSnsMap); + + // 5. 批量获取报警规则 + Set tenantIds = tenantToSnsMap.keySet(); + context.tenantAlarmRules = getAlarmRulesForTenants(tenantIds); + + // 6. 批量获取围栏信息 + context.fenceCache = getFenceInfoBatch(context.snToDeviceMap.values()); + + return context; + } + + /** + * 阶段2: 处理业务逻辑(内存操作) + */ + private void processBatchLogic(List> records, BatchContext context) { + for (ConsumerRecord record : records) { + try { + processSingleMessage(record, context); + } catch (Exception e) { + log.error("[批量处理] 处理单条消息失败,SN: {}", record.key(), e); + // 继续处理下一条,不中断整个批次 + } + } + } + + /** + * 处理单条消息 + */ + private void processSingleMessage(ConsumerRecord record, BatchContext context) { + String sn = record.key(); + String payload = record.value(); + + // 1. 基础校验 + Long tenantId = context.snToTenantMap.get(sn); + HandDataVo device = context.snToDeviceMap.get(sn); + + if (tenantId == null || device == null) { + log.warn("[批量处理] SN {} 无租户或设备信息,跳过", sn); + return; + } + + // 2. 保存原始日志 + context.originalLogs.add(createOriginalLog(sn, payload, tenantId)); + + // 3. 检查设备是否启用 + if (EnableStatus.DISABLED.value().equals(device.getEnableStatus())) { + log.debug("[批量处理] 设备未启用,SN: {}", sn); + return; + } + + // 4. 数据解析与转换 + HandDataVo handVo = parseAndConvertData(sn, payload, device); + + // 5. 获取报警规则 + Map> ruleMap = context.tenantAlarmRules.get(tenantId); + AlarmRuleDO alarmRule = getAlarmRule(handVo, ruleMap); + + // 6. 气体报警处理 + processGasAlarm(handVo, alarmRule, context); + + // 7. 电池报警处理 + processBatteryAlarm(handVo, context); + + // 8. 围栏报警处理 + processFenceAlarm(handVo, context); + + // 9. 保存处理后的数据日志 + context.processedLogs.add(createTdengineDataVo(handVo)); + + // 10. 记录需要更新到 Redis 的数据 + context.redisUpdates.put(sn, handVo); + } + + /** + * 阶段3: 批量持久化数据 + */ + private void persistBatchData(BatchContext context) { + // 1. 批量保存原始日志 + if (!context.originalLogs.isEmpty()) { + tdengineBatchProcessor.addToBatch(context.originalLogs); + log.debug("[批量持久化] 原始日志: {} 条", context.originalLogs.size()); + } + + // 2. 批量保存处理后日志 + if (!context.processedLogs.isEmpty()) { + tdengineBatchConfig.addToBatch(context.processedLogs); + log.debug("[批量持久化] 处理日志: {} 条", context.processedLogs.size()); + } + + // 3. 批量保存报警消息日志 + if (!context.alarmMessageLogs.isEmpty()) { + alarmProcessor.addToBatch(context.alarmMessageLogs); + log.debug("[批量持久化] 报警消息: {} 条", context.alarmMessageLogs.size()); + } + + // 4. 批量创建气体报警 + if (!context.gasAlarmsToCreate.isEmpty()) { + handAlarmService.batchCreateHandAlarm(context.gasAlarmsToCreate); + log.debug("[批量持久化] 新增气体报警: {} 条", context.gasAlarmsToCreate.size()); + } + + // 5. 批量更新气体报警 + if (!context.gasAlarmsToUpdate.isEmpty()) { + handAlarmService.batchUpdateById(context.gasAlarmsToUpdate); + log.debug("[批量持久化] 更新气体报警: {} 条", context.gasAlarmsToUpdate.size()); + } + + // 6. 批量创建围栏报警 + if (!context.fenceAlarmsToCreate.isEmpty()) { + fenceAlarmService.batchCreateFenceAlarm(context.fenceAlarmsToCreate); + log.debug("[批量持久化] 新增围栏报警: {} 条", context.fenceAlarmsToCreate.size()); + } + + // 7. 批量更新围栏报警 + if (!context.fenceAlarmsToUpdate.isEmpty()) { + fenceAlarmService.batchUpdateById(context.fenceAlarmsToUpdate); + log.debug("[批量持久化] 更新围栏报警: {} 条", context.fenceAlarmsToUpdate.size()); + } + + // 8. 批量更新 Redis + if (!context.redisUpdates.isEmpty()) { + batchUpdateRedis(context.redisUpdates, context.snToTenantMap); + log.debug("[批量持久化] Redis 更新: {} 条", context.redisUpdates.size()); + } + } + + // ========== 基础数据获取方法 ========== + + /** + * 批量获取租户ID映射 + */ + private Map getTenantIdsInBatch(List sns) { + Map result = new HashMap<>(); + + try { + List tenantIdObjs = redisUtil.hmget( + RedisKeyUtil.getDeviceTenantMappingKey(), + new ArrayList<>(sns) + ); + + for (int i = 0; i < sns.size(); i++) { + Object tenantIdObj = tenantIdObjs.get(i); + if (tenantIdObj != null && StringUtils.isNotBlank(tenantIdObj.toString())) { + result.put(sns.get(i), Long.parseLong(tenantIdObj.toString())); + } + } + } catch (Exception e) { + log.error("[批量获取] 获取租户ID失败", e); + } + + return result; + } + + /** + * 批量获取设备信息(优先从 Redis) + */ + private Map getDeviceVosInBatch(Map> tenantToSnsMap) { + Map result = new HashMap<>(); + List cacheMissSns = new ArrayList<>(); + + // 从 Redis 批量获取 + tenantToSnsMap.forEach((tenantId, sns) -> { + try { + String redisKey = RedisKeyUtil.getTenantDeviceHashKey(tenantId); + List cachedObjects = redisUtil.hmget(redisKey, new ArrayList<>(sns)); + + for (int i = 0; i < sns.size(); i++) { + Object obj = cachedObjects.get(i); + String sn = sns.get(i); + + if (obj != null) { + result.put(sn, BeanUtils.toBean(obj, HandDataVo.class)); + } else { + cacheMissSns.add(sn); + } + } + } catch (Exception e) { + log.error("[批量获取] 从Redis获取设备信息失败,tenantId: {}", tenantId, e); + cacheMissSns.addAll(sns); + } + }); + + // 缓存未命中,从数据库加载 + if (!cacheMissSns.isEmpty()) { + try { + QueryWrapper doQueryWrapper = new QueryWrapper<>(); + doQueryWrapper.in("sn", cacheMissSns); + List detectors = handDetectorService.listAll(doQueryWrapper); + Map detectorMap = detectors.stream() + .collect(Collectors.toMap(HandDetectorDO::getSn, Function.identity())); + + cacheMissSns.forEach(sn -> { + HandDetectorDO detector = detectorMap.get(sn); + if (detector != null) { + result.put(sn, BeanUtils.toBean(detector, HandDataVo.class)); + } + }); + + log.info("[批量获取] 从数据库加载设备: {} 条", detectors.size()); + } catch (Exception e) { + log.error("[批量获取] 从数据库加载设备失败", e); + } + } + + return result; + } + + /** + * 批量获取报警规则 + */ + private Map>> getAlarmRulesForTenants(Set tenantIds) { + Map>> result = new HashMap<>(); + + try { + for (Long tenantId : tenantIds) { + Map> rules = alarmRuleService.selectCacheListMap(tenantId); + if (rules != null && !rules.isEmpty()) { + result.put(tenantId, rules); + } + } + } catch (Exception e) { + log.error("[批量获取] 获取报警规则失败", e); + } + + return result; + } + + /** + * 批量获取围栏信息 + */ + private Map> getFenceInfoBatch(Collection devices) { + Map> result = new HashMap<>(); + + try { + // 收集所有需要查询的围栏ID + Set allFenceIds = devices.stream() + .map(HandDataVo::getFenceIds) + .filter(StringUtils::isNotBlank) + .flatMap(ids -> Arrays.stream(ids.split(","))) + .map(Long::parseLong) + .collect(Collectors.toSet()); + + if (!allFenceIds.isEmpty()) { + List fences = fenceService.getFenceList(new ArrayList<>(allFenceIds)); + Map fenceMap = fences.stream() + .collect(Collectors.toMap(Geofence::getId, Function.identity())); + + // 为每个设备构建其对应的围栏列表 + devices.forEach(device -> { + if (StringUtils.isNotBlank(device.getFenceIds())) { + List deviceFences = Arrays.stream(device.getFenceIds().split(",")) + .map(Long::parseLong) + .map(fenceMap::get) + .filter(Objects::nonNull) + .toList(); + + if (!deviceFences.isEmpty()) { + result.put(device.getId(), deviceFences); + } + } + }); + } + } catch (Exception e) { + log.error("[批量获取] 获取围栏信息失败", e); + } + + return result; + } + + // ========== 业务逻辑处理方法 ========== + + /** + * 数据解析与转换 + */ + private HandDataVo parseAndConvertData(String sn, String payload, HandDataVo device) { + try { + JSONObject json = JSONUtil.parseObj(payload); + + Double gasValue = json.getDouble("gas", null); + String loc = json.getStr("loc"); + String battery = json.getStr("battery"); + + device.setValue(gasValue); + device.setSn(sn); + device.setBattery(battery); + device.setTime(new Date()); + device.setOnlineStatus(OnlineStatusType.ONLINE.getType()); + + // 解析位置信息 + if (StringUtils.isNotBlank(loc)) { + String coords = loc.substring(1, loc.length() - 1); + String[] parts = coords.split(","); + + if (parts.length == 3) { + Map converted = CoordinateTransferUtils.wgs84ToGcj02( + Double.parseDouble(parts[0]), + Double.parseDouble(parts[1]) + ); + + device.setLongitude(converted.get("lon")); + device.setLatitude(converted.get("lat")); + device.setGpsType(Integer.parseInt(parts[2])); + } + } + } catch (Exception e) { + log.error("[数据转换] 解析失败,SN: {}, payload: {}", sn, payload, e); + } + + return device; + } + + /** + * 气体报警处理 + */ + private void processGasAlarm(HandDataVo handVo, AlarmRuleDO alarmRule, BatchContext context) { + if (alarmRule == null) { + handVo.setGasStatus(HandAlarmType.NORMAL.getType()); + return; + } + + LocalDateTime now = LocalDateTime.now(); + + // 处理离线报警恢复 + if (OnlineStatusType.ONLINE.getType().equals(handVo.getOnlineStatus()) + && AlarmLevelEnum.OFFLINE.value().equals(handVo.getAlarmLevel()) + && handVo.getAlarmId() != null) { + + HandAlarmDO alarmToEnd = new HandAlarmDO(); + alarmToEnd.setId(handVo.getAlarmId()); + alarmToEnd.setTAlarmEnd(now); + alarmToEnd.setStatus(EnableStatus.HANDLE.value()); + context.gasAlarmsToUpdate.add(alarmToEnd); + + handVo.setAlarmId(null); + handVo.setAlarmLevel(0); + return; + } + + boolean isCurrentlyAlarming = handVo.getAlarmLevel() != null && handVo.getAlarmLevel() > 0; + boolean shouldAlarm = alarmRule.getAlarmLevel() > 0; + + // 报警恢复 + if (isCurrentlyAlarming && !shouldAlarm) { + handVo.setAlarmLevel(0); + handVo.setTAlarmEnd(now); + handVo.setGasStatus(HandAlarmType.NORMAL.getType()); + + // 发送报警结束消息 + sendAlarmMessage(handVo, alarmRule.getGasTypeName(), handVo.getValue(), false, context); + handVo.setLastPushValue(null); + return; + } + + // 正常状态 + if (!shouldAlarm) { + handVo.setAlarmLevel(0); + handVo.setGasStatus(HandAlarmType.NORMAL.getType()); + return; + } + + // 报警触发或持续 + Integer newLevel = alarmRule.getAlarmLevel(); + + // 首次报警 + if (!isCurrentlyAlarming) { + handVo.setFirstValue(handVo.getValue()); + handVo.setTAlarmStart(now); + handVo.setMaxValue(handVo.getValue()); + handVo.setMaxAlarmLevel(newLevel); + + // 创建新报警 + HandAlarmSaveReqVO newAlarm = new HandAlarmSaveReqVO(); + newAlarm.setDetectorId(handVo.getId()); + newAlarm.setSn(handVo.getSn()); + newAlarm.setVAlarmFirst(handVo.getFirstValue()); + newAlarm.setGasType(handVo.getGasChemical()); + newAlarm.setPicX(handVo.getLongitude()); + newAlarm.setPicY(handVo.getLatitude()); + newAlarm.setAlarmType(AlarmType.GAS.getType()); + newAlarm.setAlarmLevel(newLevel); + newAlarm.setTAlarmStart(now); + newAlarm.setTenantId(Math.toIntExact(handVo.getTenantId())); + newAlarm.setUnit(handVo.getUnit()); + newAlarm.setCreator("system"); + newAlarm.setCreateTime(now); + + context.gasAlarmsToCreate.add(newAlarm); + context.pendingAlarmIds.put(handVo.getSn(), newAlarm); // 用于后续回填ID + } + + handVo.setAlarmLevel(newLevel); + + // 报警升级 + if (handVo.getMaxAlarmLevel() == null || newLevel > handVo.getMaxAlarmLevel()) { + handVo.setMaxAlarmLevel(newLevel); + } + + // 更新最大值 + if (shouldUpdateMaxValue(handVo.getValue(), handVo.getMaxValue(), alarmRule.getDirection())) { + handVo.setMaxValue(handVo.getValue()); + + // 更新已存在的报警记录 + if (handVo.getAlarmId() != null) { + HandAlarmDO alarmToUpdate = new HandAlarmDO(); + alarmToUpdate.setId(handVo.getAlarmId()); + alarmToUpdate.setVAlarmMaximum(handVo.getMaxValue()); + alarmToUpdate.setAlarmLevel(newLevel); + alarmToUpdate.setUpdater("system"); + alarmToUpdate.setUpdateTime(now); + context.gasAlarmsToUpdate.add(alarmToUpdate); + } + } + + // 推送报警消息(值变化时) + if (handVo.getLastPushValue() == null + || !handVo.getLastPushValue().equals(handVo.getValue())) { + sendAlarmMessage(handVo, alarmRule.getGasTypeName(), handVo.getValue(), true, context); + handVo.setLastPushValue(handVo.getValue()); + } + + handVo.setGasStatus(HandAlarmType.ALARM.getType()); + } + + /** + * 电池报警处理 + */ + private void processBatteryAlarm(HandDataVo handVo, BatchContext context) { + handVo.setBatteryStatus(EnableStatus.DISABLED.value()); + + if (handVo.getBatteryAlarmValue() == null) { + return; + } + + try { + int batteryPercentage = BatteryConverterUtils.getBatteryPercentage( + Integer.parseInt(handVo.getBattery()) + ); + + int threshold = handVo.getBatteryAlarmValue().intValue(); + boolean isCurrentlyAlarming = EnableStatus.ENABLED.value().equals(handVo.getBatteryStatus()); + boolean shouldAlarm = batteryPercentage < threshold; + + LocalDateTime now = LocalDateTime.now(); + + // 从正常变为报警 + if (shouldAlarm && !isCurrentlyAlarming) { + HandAlarmSaveReqVO newAlarm = new HandAlarmSaveReqVO(); + newAlarm.setDetectorId(handVo.getId()); + newAlarm.setSn(handVo.getSn()); + newAlarm.setAlarmType(AlarmType.BATTERY.getType()); + newAlarm.setTAlarmStart(now); + newAlarm.setVAlarmFirst((double) batteryPercentage); + newAlarm.setPicX(handVo.getLatitude()); + newAlarm.setPicY(handVo.getLongitude()); + newAlarm.setTenantId(Math.toIntExact(handVo.getTenantId())); + newAlarm.setCreator("system"); + newAlarm.setCreateTime(now); + + context.gasAlarmsToCreate.add(newAlarm); + handVo.setBatteryStatus(EnableStatus.ENABLED.value()); + + } else if (!shouldAlarm && isCurrentlyAlarming && handVo.getBatteryStatusAlarmId() != null) { + // 从报警恢复正常 + HandAlarmDO alarmToEnd = new HandAlarmDO(); + alarmToEnd.setId(handVo.getBatteryStatusAlarmId()); + alarmToEnd.setTAlarmEnd(now); + alarmToEnd.setUpdater("system"); + alarmToEnd.setUpdateTime(now); + + context.gasAlarmsToUpdate.add(alarmToEnd); + handVo.setBatteryStatus(EnableStatus.DISABLED.value()); + handVo.setBatteryStatusAlarmId(null); + } + } catch (Exception e) { + log.error("[电池报警] 处理失败,SN: {}", handVo.getSn(), e); + } + } + + /** + * 围栏报警处理 + */ + private void processFenceAlarm(HandDataVo handVo, BatchContext context) { + if (StringUtils.isBlank(handVo.getFenceIds())) { + handVo.setFenceStatus(HandAlarmType.NORMAL.getType()); + return; + } + + List fenceList = context.fenceCache.get(handVo.getId()); + if (fenceList == null || fenceList.isEmpty()) { + handVo.setFenceStatus(HandAlarmType.NORMAL.getType()); + return; + } + + FenceType fenceType = FenceType.fromType(handVo.getFenceType()); + if (null == fenceType) { + log.error("[围栏报警] 围栏类型无效,SN: {}", handVo.getSn()); + return; + } + + boolean isInside = GeofenceUtils.isInsideAnyFence( + handVo.getLongitude(), + handVo.getLatitude(), + fenceList + ); + + Boolean isViolating = switch (fenceType) { + case INSIDE -> isInside; + case OUTSIDE -> !isInside; + default -> null; + }; + + if (isViolating != null) { + handleFenceAlarmLifecycle(isViolating, handVo, fenceType, fenceList, context); + } + } + + /** + * 处理围栏报警生命周期 + */ + private void handleFenceAlarmLifecycle(boolean isViolating, HandDataVo handVo, + FenceType fenceType, List fenceList, BatchContext context) { + + boolean hasOngoingAlarm = handVo.getFenceAlarmId() != null; + LocalDateTime now = LocalDateTime.now(); + + if (isViolating) { + double distance = GeofenceUtils.fenceDistance(handVo, fenceType, fenceList); + double roundedDistance = Math.round(distance * 100.0) / 100.0; + + if (!hasOngoingAlarm) { + // 触发新报警 + FenceAlarmSaveReqVO newAlarm = new FenceAlarmSaveReqVO(); + newAlarm.setDetectorId(handVo.getId()); + newAlarm.setSn(handVo.getSn()); + newAlarm.setType(fenceType.getType()); + newAlarm.setTAlarmStart(now); + newAlarm.setPicX(handVo.getLongitude()); + newAlarm.setPicY(handVo.getLatitude()); + newAlarm.setDistance(roundedDistance); + newAlarm.setMaxDistance(roundedDistance); + newAlarm.setTenantId(handVo.getTenantId()); + newAlarm.setCreator("system"); + newAlarm.setCreateTime(now); + + context.fenceAlarmsToCreate.add(newAlarm); + context.pendingFenceAlarmIds.put(handVo.getSn(), newAlarm); + + handVo.setDistance(roundedDistance); + handVo.setMaxDistance(roundedDistance); + + } else { + // 更新持续报警 + handVo.setDistance(roundedDistance); + + FenceAlarmDO alarmToUpdate = new FenceAlarmDO(); + alarmToUpdate.setId(handVo.getFenceAlarmId()); + alarmToUpdate.setDistance(roundedDistance); + + if (handVo.getMaxDistance() == null || roundedDistance > handVo.getMaxDistance()) { + alarmToUpdate.setMaxDistance(roundedDistance); + handVo.setMaxDistance(roundedDistance); + } + + alarmToUpdate.setUpdater("system"); + alarmToUpdate.setUpdateTime(now); + context.fenceAlarmsToUpdate.add(alarmToUpdate); + } + + handVo.setFenceStatus(HandAlarmType.ALARM.getType()); + + } else { + // 结束报警 + if (hasOngoingAlarm) { + FenceAlarmDO alarmToEnd = new FenceAlarmDO(); + alarmToEnd.setId(handVo.getFenceAlarmId()); + alarmToEnd.setTAlarmEnd(now); + alarmToEnd.setStatus(EnableStatus.HANDLE.value()); + alarmToEnd.setUpdater("system"); + alarmToEnd.setUpdateTime(now); + + context.fenceAlarmsToUpdate.add(alarmToEnd); + + handVo.setFenceAlarmId(null); + handVo.setDistance(null); + handVo.setMaxDistance(null); + } + + handVo.setFenceStatus(HandAlarmType.NORMAL.getType()); + } + } + + // ========== 工具方法 ========== + + /** + * 获取报警规则 + */ + private AlarmRuleDO getAlarmRule(HandDataVo handVo, Map> ruleMap) { + if (handVo.getValue() == null || ruleMap == null) { + return null; + } + + double gasValue = handVo.getValue(); + Long gasTypeId = handVo.getGasTypeId(); + + List rules = ruleMap.get(gasTypeId); + + // 兼容 Redis 反序列化的 String key + if (rules == null && gasTypeId != null) { + rules = ruleMap.get(gasTypeId.toString()); + } + + if (rules == null || rules.isEmpty()) { + return null; + } + + AlarmRuleDO result = null; + for (AlarmRuleDO rule : rules) { + boolean inRange = (rule.getMin() == null || rule.getMin() <= gasValue) + && (rule.getMax() == null || gasValue <= rule.getMax()); + + if (inRange) { + if (result == null || rule.getAlarmLevel() > result.getAlarmLevel()) { + result = rule; + } + } + } + + return result; + } + + /** + * 判断是否需要更新最大值 + */ + private boolean shouldUpdateMaxValue(Double currentValue, Double maxValue, Integer direction) { + if (currentValue == null || maxValue == null || direction == null) { + return false; + } + + return (MaxDirection.DOWN.value().equals(direction) && currentValue < maxValue) + || (MaxDirection.UP.value().equals(direction) && currentValue > maxValue); + } + + /** + * 发送报警消息(异步) + */ + private void sendAlarmMessage(HandDataVo handVo, String gasName, Double value, + boolean isAlarming, BatchContext context) { + + String valueStr = (value != null && value % 1 == 0) + ? String.valueOf(value.intValue()) + : String.valueOf(value); + + String statusText = isAlarming ? "报警" : "报警结束"; + String msgContent = String.format("%s%s,%s气体浓度为%s", + handVo.getName(), statusText, gasName, valueStr); + + // 记录报警消息日志 + AlarmMessageLog log = new AlarmMessageLog(); + log.setDetectorId(handVo.getId()); + log.setHolderName(handVo.getName()); + log.setSn(handVo.getSn()); + log.setDeptId(handVo.getDeptId()); + log.setTenantId(handVo.getTenantId()); + log.setMessage(msgContent); + log.setRemark("系统自动触发报警推送"); + + try { + List targetSns = handDetectorService.getSnListByDept( + handVo.getDeptId(), + handVo.getTenantId(), + handVo.getSn() + ); + + if (targetSns != null && !targetSns.isEmpty()) { + log.setPushSnList(String.join(",", targetSns)); + + // 异步推送 MQTT 消息 + publishAlarmToMqtt(targetSns, msgContent); + } + } catch (Exception e) { + this.log.error("[报警推送] 准备推送数据失败", e); + } + + context.alarmMessageLogs.add(log); + } + + /** + * 异步推送 MQTT 报警消息 + */ + private void publishAlarmToMqtt(List targetSns, String message) { + alarmExecutor.execute(() -> { + Map payload = Map.of("message", message); + + try { + String json = OBJECT_MAPPER.writeValueAsString(payload); + + for (String sn : targetSns) { + if (StringUtils.isBlank(sn)) continue; + + try { + String topic = sn + "/zds_down"; + client.publish(topic, json); + } catch (Exception e) { + log.error("[MQTT推送] 失败,SN: {}", sn, e); + } + } + } catch (JsonProcessingException e) { + log.error("[MQTT推送] JSON序列化失败", e); + } + }); + } + + /** + * 批量更新 Redis + */ + private void batchUpdateRedis(Map updates, Map snToTenantMap) { + // 按租户分组 + Map> updatesByTenant = updates.entrySet().stream() + .filter(e -> snToTenantMap.containsKey(e.getKey())) + .collect(Collectors.groupingBy( + e -> snToTenantMap.get(e.getKey()), + Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue) + )); + + // 按租户批量更新 + updatesByTenant.forEach((tenantId, deviceMap) -> { + try { + handDetectorService.batchUpdateRedisData(tenantId, deviceMap); + } catch (Exception e) { + log.error("[Redis更新] 失败,tenantId: {}", tenantId, e); + } + }); + } + + /** + * 创建原始日志对象 + */ + private HandOriginalLog createOriginalLog(String sn, String payload, Long tenantId) { + HandOriginalLog log = new HandOriginalLog(); + log.setSn(sn); + log.setPayload(payload); + log.setTenantId(tenantId); + log.setTs(new Timestamp(System.currentTimeMillis())); + return log; + } + + /** + * 创建处理后的日志对象 + */ + private TdengineDataVo createTdengineDataVo(HandDataVo handVo) { + TdengineDataVo vo = BeanUtils.toBean(handVo, TdengineDataVo.class); + vo.setTenantId(handVo.getTenantId()); + vo.setTs(new Timestamp(System.currentTimeMillis())); + return vo; + } + + // ========== 内部类:批处理上下文 ========== + + /** + * 批处理上下文,存储整个批次的数据 + */ + private static class BatchContext { + // 基础数据 + Map snToTenantMap = new HashMap<>(); + Map snToDeviceMap = new HashMap<>(); + Map>> tenantAlarmRules = new HashMap<>(); + Map> fenceCache = new HashMap<>(); + + // 待保存的日志 + List originalLogs = new ArrayList<>(); + List processedLogs = new ArrayList<>(); + List alarmMessageLogs = new ArrayList<>(); + + // 待保存的报警 + List gasAlarmsToCreate = new ArrayList<>(); + List gasAlarmsToUpdate = new ArrayList<>(); + List fenceAlarmsToCreate = new ArrayList<>(); + List fenceAlarmsToUpdate = new ArrayList<>(); + + // 待回填的ID(用于新创建的报警记录) + Map pendingAlarmIds = new HashMap<>(); + Map pendingFenceAlarmIds = new HashMap<>(); + + // 待更新的 Redis 数据 + Map redisUpdates = new HashMap<>(); + + boolean isEmpty() { + return snToDeviceMap.isEmpty(); + } + } +} \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/DeviceMessageProcessor.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/DeviceMessageProcessor.java index fc1dcd1..e36e3e2 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/DeviceMessageProcessor.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/DeviceMessageProcessor.java @@ -27,6 +27,7 @@ import java.sql.Timestamp; import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; @Slf4j @Component @@ -39,6 +40,9 @@ public class DeviceMessageProcessor { private TdengineBatchConfig tdengineBatchProcessor; @Resource private TdengineBatchConfig tdengineBatchConfig; + + @Resource + private TdengineBatchConfig alarmProcessor; @Resource private HandDetectorService handDetectorService; @Resource @@ -54,9 +58,9 @@ public class DeviceMessageProcessor { @Resource private Client client; - @Resource(name = "mqttAlarmExecutor") private TaskExecutor alarmExecutor; + public void process(String topic, String payload) { log.debug("[设备上报] 开始处理 -> 主题: {}, 内容: {}", topic, payload); @@ -70,73 +74,52 @@ public class DeviceMessageProcessor { // 保存原始日志 logSave(topic, payload, tenantId); - // topic 即为设备 sn,是锁的最佳 key - String deviceProcessLockKey = RedisKeyUtil.getDeviceProcessLockKey(tenantId, topic); - RLock lock = redissonClient.getLock(deviceProcessLockKey); - try { - // 尝试在 10 秒内获取锁,防止因锁等待导致 Kafka 消费者线程长时间阻塞 - if (!lock.tryLock(10, TimeUnit.SECONDS)) { - log.warn("未能获取设备 {} 的处理锁,消息将被丢弃或重试。", topic); - // 抛出异常,外层逻辑可以捕获并发送到死信队列 - throw new RuntimeException("获取设备锁超时: " + topic); + // 从 Redis 中获取设备信息 + Object meterObj = redisUtil.hget(RedisKeyUtil.getTenantDeviceHashKey(tenantId), topic); + HandDataVo detector; + + if (meterObj == null) { // 缓存未命中 + // 从数据库查询 DO 对象 + HandDetectorDO one = handDetectorService.getBySn(topic); + if (one == null) { + log.warn("[数据不一致] 在租户 {} 的设备列表中未找到 SN {} 的详细信息。", tenantId, topic); + return; // 直接返回,无需再操作 } - try { - // 从 Redis 中获取设备信息 - Object meterObj = redisUtil.hget(RedisKeyUtil.getTenantDeviceHashKey(tenantId), topic); - HandDataVo detector; - - if (meterObj == null) { // 缓存未命中 - // 从数据库查询 DO 对象 - HandDetectorDO one = handDetectorService.getBySn(topic); - if (one == null) { - log.warn("[数据不一致] 在租户 {} 的设备列表中未找到 SN {} 的详细信息。", tenantId, topic); - return; // 直接返回,无需再操作 - } - detector = BeanUtils.toBean(one, HandDataVo.class); - } else { // 缓存命中 - detector = BeanUtils.toBean(meterObj, HandDataVo.class); - } + detector = BeanUtils.toBean(one, HandDataVo.class); + } else { // 缓存命中 + detector = BeanUtils.toBean(meterObj, HandDataVo.class); + } - if (EnableStatus.DISABLED.value().equals(detector.getEnableStatus())) { - log.info("未启用的手持探测器 sn: {}", topic); - return; - } + if (EnableStatus.DISABLED.value().equals(detector.getEnableStatus())) { + log.info("未启用的手持探测器 sn: {}", topic); + return; + } - // 数据解析与转换 - HandDataVo handVo = dataConvert(topic, payload, detector); + // 数据解析与转换 + HandDataVo handVo = dataConvert(topic, payload, detector); - // 获取气种的报警规则 - Map> ruleMap = alarmRuleService.selectCacheListMap(tenantId); - AlarmRuleDO alarmRule = getAlarmRule(handVo, ruleMap); + // 获取气种的报警规则 + Map> ruleMap = alarmRuleService.selectCacheListMap(tenantId); + AlarmRuleDO alarmRule = getAlarmRule(handVo, ruleMap); - // 气体报警逻辑处理 - HandDataVo vo = gasHandAlarm(alarmRule, handVo); - // 气体报警保存 - saveGasAlarm(vo, alarmRule); + // 气体报警逻辑处理 + HandDataVo vo = gasHandAlarm(alarmRule, handVo); + // 气体报警保存 + saveGasAlarm(vo, alarmRule); - //围栏报警逻辑 - fanceAlarm(handVo); + //围栏报警逻辑(高并发情况下尚未优化) + fanceAlarm(handVo); - //电量报警逻辑 - batteryAlarm(handVo); + //电量报警逻辑 + //batteryAlarm(handVo); - // 无论是否发生告警,设备的状态(如电量、位置、最新值)都需要更新 - handDetectorService.updateRedisData(tenantId, topic, handVo); - handLogSave(handVo); + // 无论是否发生告警,设备的状态(如电量、位置、最新值)都需要更新 + handDetectorService.updateRedisData(tenantId, topic, handVo); + handLogSave(handVo); - } finally { - // 确保锁一定被释放 - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("获取设备 {} 的锁时被中断", topic, e); - throw new RuntimeException("处理设备消息时线程被中断: " + topic, e); - } } + private void handLogSave(HandDataVo handVo) { TdengineDataVo bean = BeanUtils.toBean(handVo, TdengineDataVo.class); bean.setTenantId(handVo.getTenantId()); @@ -458,7 +441,6 @@ public class DeviceMessageProcessor { return redisData; } - String sn = redisData.getSn(); String gasName = alarmRule.getGasTypeName(); LocalDateTime now = LocalDateTime.now(); @@ -539,7 +521,6 @@ public class DeviceMessageProcessor { * @param isAlarming true=报警中, false=报警结束 */ private void sendGroupMessage(HandDataVo redisData, String gasName, Double value, boolean isAlarming) { - if (null == redisData.getDeptId()) return; // 1. 准备数据(在主线程做,只做一次) String valueStr = (value != null && value % 1 == 0) ? String.valueOf(value.intValue()) : String.valueOf(value); @@ -558,12 +539,11 @@ public class DeviceMessageProcessor { } // 2. 查人(查同组所有设备的SN) - List listAll = handDetectorService.getListAll(redisData.getDeptId(), redisData.getTenantId(),redisData.getSn()); + List listAll = handDetectorService.getSnListByDept(redisData.getDeptId(), redisData.getTenantId(), redisData.getSn()); if (listAll == null || listAll.isEmpty()) return; alarmExecutor.execute(() -> { - for (HandDetectorDO device : listAll) { - String deviceSn = device.getSn(); + for (String deviceSn : listAll) { if (deviceSn == null || deviceSn.isEmpty()) continue; String topic = deviceSn + "/zds_down"; @@ -574,6 +554,18 @@ public class DeviceMessageProcessor { } } }); + //报警推送消息 + AlarmMessageLog alarmMessageLog = new AlarmMessageLog(); + alarmMessageLog.setDetectorId(redisData.getId()); + alarmMessageLog.setHolderName(redisData.getName()); + alarmMessageLog.setSn(redisData.getSn()); + alarmMessageLog.setDeptId(redisData.getDeptId()); + alarmMessageLog.setTenantId(redisData.getTenantId()); + alarmMessageLog.setMessage(msgContent); + alarmMessageLog.setPushSnList(StringUtils.join(listAll, ",")); + alarmMessageLog.setMessage(msgContent); + alarmMessageLog.setRemark("系统自动触发报警推送"); + alarmProcessor.addToBatch(alarmMessageLog); } /** diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/controller/admin/AlarmMessageController.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/controller/admin/AlarmMessageController.java deleted file mode 100644 index 063898b..0000000 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/controller/admin/AlarmMessageController.java +++ /dev/null @@ -1,107 +0,0 @@ -package cn.iocoder.yudao.module.hand.controller.admin; - -import cn.iocoder.yudao.module.hand.dal.AlarmMessageDO; -import cn.iocoder.yudao.module.hand.service.AlarmMessageService; -import cn.iocoder.yudao.module.hand.vo.AlarmMessagePageReqVO; -import cn.iocoder.yudao.module.hand.vo.AlarmMessageRespVO; -import cn.iocoder.yudao.module.hand.vo.AlarmMessageSaveReqVO; -import org.springframework.web.bind.annotation.*; -import jakarta.annotation.Resource; -import org.springframework.validation.annotation.Validated; -import org.springframework.security.access.prepost.PreAuthorize; -import io.swagger.v3.oas.annotations.tags.Tag; -import io.swagger.v3.oas.annotations.Parameter; -import io.swagger.v3.oas.annotations.Operation; - -import jakarta.validation.constraints.*; -import jakarta.validation.*; -import jakarta.servlet.http.*; -import java.util.*; -import java.io.IOException; - -import cn.iocoder.yudao.framework.common.pojo.PageParam; -import cn.iocoder.yudao.framework.common.pojo.PageResult; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.framework.common.util.object.BeanUtils; -import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success; - -import cn.iocoder.yudao.framework.excel.core.util.ExcelUtils; - -import cn.iocoder.yudao.framework.apilog.core.annotation.ApiAccessLog; -import static cn.iocoder.yudao.framework.apilog.core.enums.OperateTypeEnum.*; - - - -@Tag(name = "管理后台 - GAS手持探测器推送") -@RestController -@RequestMapping("/gas/alarm-message") -@Validated -public class AlarmMessageController { - - @Resource - private AlarmMessageService alarmMessageService; - - @PostMapping("/create") - @Operation(summary = "创建GAS手持探测器推送") - @PreAuthorize("@ss.hasPermission('gas:alarm-message:create')") - public CommonResult createAlarmMessage(@Valid @RequestBody AlarmMessageSaveReqVO createReqVO) { - return success(alarmMessageService.createAlarmMessage(createReqVO)); - } - - @PutMapping("/update") - @Operation(summary = "更新GAS手持探测器推送") - @PreAuthorize("@ss.hasPermission('gas:alarm-message:update')") - public CommonResult updateAlarmMessage(@Valid @RequestBody AlarmMessageSaveReqVO updateReqVO) { - alarmMessageService.updateAlarmMessage(updateReqVO); - return success(true); - } - - @DeleteMapping("/delete") - @Operation(summary = "删除GAS手持探测器推送") - @Parameter(name = "id", description = "编号", required = true) - @PreAuthorize("@ss.hasPermission('gas:alarm-message:delete')") - public CommonResult deleteAlarmMessage(@RequestParam("id") Long id) { - alarmMessageService.deleteAlarmMessage(id); - return success(true); - } - - @DeleteMapping("/delete-list") - @Parameter(name = "ids", description = "编号", required = true) - @Operation(summary = "批量删除GAS手持探测器推送") - @PreAuthorize("@ss.hasPermission('gas:alarm-message:delete')") - public CommonResult deleteAlarmMessageList(@RequestParam("ids") List ids) { - alarmMessageService.deleteAlarmMessageListByIds(ids); - return success(true); - } - - @GetMapping("/get") - @Operation(summary = "获得GAS手持探测器推送") - @Parameter(name = "id", description = "编号", required = true, example = "1024") - @PreAuthorize("@ss.hasPermission('gas:alarm-message:query')") - public CommonResult getAlarmMessage(@RequestParam("id") Long id) { - AlarmMessageDO alarmMessage = alarmMessageService.getAlarmMessage(id); - return success(BeanUtils.toBean(alarmMessage, AlarmMessageRespVO.class)); - } - - @GetMapping("/page") - @Operation(summary = "获得GAS手持探测器推送分页") - @PreAuthorize("@ss.hasPermission('gas:alarm-message:query')") - public CommonResult> getAlarmMessagePage(@Valid AlarmMessagePageReqVO pageReqVO) { - PageResult pageResult = alarmMessageService.getAlarmMessagePage(pageReqVO); - return success(BeanUtils.toBean(pageResult, AlarmMessageRespVO.class)); - } - - @GetMapping("/export-excel") - @Operation(summary = "导出GAS手持探测器推送 Excel") - @PreAuthorize("@ss.hasPermission('gas:alarm-message:export')") - @ApiAccessLog(operateType = EXPORT) - public void exportAlarmMessageExcel(@Valid AlarmMessagePageReqVO pageReqVO, - HttpServletResponse response) throws IOException { - pageReqVO.setPageSize(PageParam.PAGE_SIZE_NONE); - List list = alarmMessageService.getAlarmMessagePage(pageReqVO).getList(); - // 导出 Excel - ExcelUtils.write(response, "GAS手持探测器推送.xls", "数据", AlarmMessageRespVO.class, - BeanUtils.toBean(list, AlarmMessageRespVO.class)); - } - -} \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/controller/admin/HandTdengineController.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/controller/admin/HandTdengineController.java index a53308b..be23e37 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/controller/admin/HandTdengineController.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/controller/admin/HandTdengineController.java @@ -70,4 +70,11 @@ public class HandTdengineController { return success(list); } + @GetMapping("/alarmMessagePage") + @Operation(summary = "报警推送记录") + @PreAuthorize("@ss.hasPermission('gas:hand-td:alarmMessagePage')") + public CommonResult> alarmMessagePage(@Valid HandTdenginePageVO po) { + PageResult list = tdengineService.getAlarmMessageLog(po); + return success(list); + } } \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/dal/AlarmMessageDO.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/dal/AlarmMessageDO.java deleted file mode 100644 index 88963ae..0000000 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/dal/AlarmMessageDO.java +++ /dev/null @@ -1,63 +0,0 @@ -package cn.iocoder.yudao.module.hand.dal; - -import lombok.*; -import java.util.*; -import java.time.LocalDateTime; -import java.time.LocalDateTime; -import com.baomidou.mybatisplus.annotation.*; -import cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO; - -/** - * GAS手持探测器推送 DO - * - * @author 超级管理员 - */ -@TableName("gas_alarm_message") -@KeySequence("gas_alarm_message_seq") // 用于 Oracle、PostgreSQL、Kingbase、DB2、H2 数据库的主键自增。如果是 MySQL 等数据库,可不写。 -@Data -@EqualsAndHashCode(callSuper = true) -@ToString(callSuper = true) -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class AlarmMessageDO extends BaseDO { - - /** - * 主键ID - */ - @TableId - private Long id; - /** - * 手持表id - */ - private Long detectorId; - /** - * 持有人名称 - */ - private String name; - /** - * 设备编号 - */ - private String sn; - /** - * 消息 - */ - private String message; - /** - * 推送设备sn,逗号分割 - */ - private String pushSnList; - /** - * 备注 - */ - private String remark; - /** - * 部门id - */ - private Long deptId; - /** - * 租户id - */ - private Long tenantId; - -} \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/mapper/AlarmMessageMapper.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/mapper/AlarmMessageMapper.java deleted file mode 100644 index 6b3358a..0000000 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/mapper/AlarmMessageMapper.java +++ /dev/null @@ -1,33 +0,0 @@ -package cn.iocoder.yudao.module.hand.mapper; - -import java.util.*; - -import cn.iocoder.yudao.framework.common.pojo.PageResult; -import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX; -import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX; -import cn.iocoder.yudao.module.hand.dal.AlarmMessageDO; -import cn.iocoder.yudao.module.hand.vo.AlarmMessagePageReqVO; -import org.apache.ibatis.annotations.Mapper; - -/** - * GAS手持探测器推送 Mapper - * - * @author 超级管理员 - */ -@Mapper -public interface AlarmMessageMapper extends BaseMapperX { - - default PageResult selectPage(AlarmMessagePageReqVO reqVO) { - return selectPage(reqVO, new LambdaQueryWrapperX() - .eqIfPresent(AlarmMessageDO::getDetectorId, reqVO.getDetectorId()) - .likeIfPresent(AlarmMessageDO::getName, reqVO.getName()) - .eqIfPresent(AlarmMessageDO::getSn, reqVO.getSn()) - .eqIfPresent(AlarmMessageDO::getMessage, reqVO.getMessage()) - .eqIfPresent(AlarmMessageDO::getPushSnList, reqVO.getPushSnList()) - .eqIfPresent(AlarmMessageDO::getRemark, reqVO.getRemark()) - .eqIfPresent(AlarmMessageDO::getDeptId, reqVO.getDeptId()) - .betweenIfPresent(AlarmMessageDO::getCreateTime, reqVO.getCreateTime()) - .orderByDesc(AlarmMessageDO::getId)); - } - -} \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/mapper/TdengineMapper.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/mapper/TdengineMapper.java index 0f695db..6eac7dc 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/mapper/TdengineMapper.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/mapper/TdengineMapper.java @@ -1,10 +1,7 @@ package cn.iocoder.yudao.module.hand.mapper; -import cn.iocoder.yudao.module.hand.vo.HandOriginalLog; -import cn.iocoder.yudao.module.hand.vo.HandTdenginePageVO; -import cn.iocoder.yudao.module.hand.vo.HandTdenginePo; -import cn.iocoder.yudao.module.hand.vo.TdengineDataVo; +import cn.iocoder.yudao.module.hand.vo.*; import com.baomidou.mybatisplus.annotation.InterceptorIgnore; // 确保导入这个注解 import com.baomidou.mybatisplus.core.metadata.IPage; import org.apache.ibatis.annotations.Mapper; @@ -17,13 +14,12 @@ import java.util.List; public interface TdengineMapper { @InterceptorIgnore(tenantLine = "true") - void insertHandLogBatch(@Param("sn") String sn,@Param("tenantId")Long tenantId, - @Param("logList") List groupedLogs); + void insertHandLogBatch(@Param("logList") List logList); @InterceptorIgnore(tenantLine = "true") - void saveDataLogBatch(@Param("sn")String sn,@Param("tenantId")Long tenantId, - @Param("dataVoList")List dataVoList); + void saveDataLogBatch(@Param("dataVoList") List dataVoList); + IPage selectPage(IPage page,@Param("vo") HandTdenginePageVO vo); @@ -32,4 +28,9 @@ public interface TdengineMapper { @InterceptorIgnore(tenantLine = "true") List HistoricalSn(@Param("po") HandTdenginePo po); + @InterceptorIgnore(tenantLine = "true") + void saveMeaagesLogList(List list); + + IPage selectMessagePage(IPage page,@Param("vo") HandTdenginePageVO vo); + } diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/AlarmMessageService.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/AlarmMessageService.java deleted file mode 100644 index f73322c..0000000 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/AlarmMessageService.java +++ /dev/null @@ -1,67 +0,0 @@ -package cn.iocoder.yudao.module.hand.service; - -import java.util.*; - -import cn.iocoder.yudao.module.hand.dal.AlarmMessageDO; -import cn.iocoder.yudao.module.hand.dal.HandDetectorDO; -import cn.iocoder.yudao.module.hand.vo.AlarmMessagePageReqVO; -import cn.iocoder.yudao.module.hand.vo.AlarmMessageSaveReqVO; -import cn.iocoder.yudao.module.hand.vo.HandDataVo; -import jakarta.validation.*; -import cn.iocoder.yudao.framework.common.pojo.PageResult; -import cn.iocoder.yudao.framework.common.pojo.PageParam; - -/** - * GAS手持探测器推送 Service 接口 - * - * @author 超级管理员 - */ -public interface AlarmMessageService { - - /** - * 创建GAS手持探测器推送 - * - * @param createReqVO 创建信息 - * @return 编号 - */ - Long createAlarmMessage(@Valid AlarmMessageSaveReqVO createReqVO); - - /** - * 更新GAS手持探测器推送 - * - * @param updateReqVO 更新信息 - */ - void updateAlarmMessage(@Valid AlarmMessageSaveReqVO updateReqVO); - - /** - * 删除GAS手持探测器推送 - * - * @param id 编号 - */ - void deleteAlarmMessage(Long id); - - /** - * 批量删除GAS手持探测器推送 - * - * @param ids 编号 - */ - void deleteAlarmMessageListByIds(List ids); - - /** - * 获得GAS手持探测器推送 - * - * @param id 编号 - * @return GAS手持探测器推送 - */ - AlarmMessageDO getAlarmMessage(Long id); - - /** - * 获得GAS手持探测器推送分页 - * - * @param pageReqVO 分页查询 - * @return GAS手持探测器推送分页 - */ - PageResult getAlarmMessagePage(AlarmMessagePageReqVO pageReqVO); - - void createAlarmRecord(HandDataVo redisData, List listAll, String msgContent); -} \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/FenceAlarmService.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/FenceAlarmService.java index acef572..eb17e72 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/FenceAlarmService.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/FenceAlarmService.java @@ -64,4 +64,8 @@ public interface FenceAlarmService { void update( FenceAlarmDO updateReqVO); + void batchCreateFenceAlarm(List fenceAlarmsToCreate); + + void batchUpdateById(List fenceAlarmsToUpdate); + } \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandAlarmService.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandAlarmService.java index bb47031..e8f77b5 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandAlarmService.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandAlarmService.java @@ -63,4 +63,8 @@ public interface HandAlarmService { int updateById(HandAlarmDO alarm); void insertBatch(List doList); + + void batchCreateHandAlarm(List gasAlarmsToCreate); + + void batchUpdateById(List gasAlarmsToUpdate); } \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandDetectorService.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandDetectorService.java index ea6595b..4f8ac8d 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandDetectorService.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandDetectorService.java @@ -75,5 +75,8 @@ public interface HandDetectorService { void dataMigrate(HandDetectorSaveReqVO updateReqVO); - List getListAll(Long deptId,Long tenantId, String sn); + List getSnListByDept(Long deptId, Long tenantId, String excludeSn); + + void batchUpdateRedisData(Long tenantId, Map deviceMap); + } \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/TdengineService.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/TdengineService.java index be10b38..5ee752e 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/TdengineService.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/TdengineService.java @@ -1,10 +1,8 @@ package cn.iocoder.yudao.module.hand.service; import cn.iocoder.yudao.framework.common.pojo.PageResult; -import cn.iocoder.yudao.module.hand.vo.HandOriginalLog; -import cn.iocoder.yudao.module.hand.vo.HandTdenginePageVO; -import cn.iocoder.yudao.module.hand.vo.HandTdenginePo; -import cn.iocoder.yudao.module.hand.vo.TdengineDataVo; +import cn.iocoder.yudao.module.hand.dal.HandDetectorDO; +import cn.iocoder.yudao.module.hand.vo.*; import java.util.List; @@ -21,4 +19,10 @@ public interface TdengineService { List HistoricalSn(HandTdenginePo po); + void createAlarmRecord(List alarmMessageLogList); + + + PageResult getAlarmMessageLog(HandTdenginePageVO pageReqVO); + + } diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/AlarmMessageServiceImpl.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/AlarmMessageServiceImpl.java deleted file mode 100644 index 9108f26..0000000 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/AlarmMessageServiceImpl.java +++ /dev/null @@ -1,117 +0,0 @@ -package cn.iocoder.yudao.module.hand.service.impl; - -import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.StrUtil; -import cn.iocoder.yudao.module.hand.dal.AlarmMessageDO; -import cn.iocoder.yudao.module.hand.dal.HandDetectorDO; -import cn.iocoder.yudao.module.hand.mapper.AlarmMessageMapper; -import cn.iocoder.yudao.module.hand.service.AlarmMessageService; -import cn.iocoder.yudao.module.hand.vo.AlarmMessagePageReqVO; -import cn.iocoder.yudao.module.hand.vo.AlarmMessageSaveReqVO; -import cn.iocoder.yudao.module.hand.vo.HandDataVo; -import org.springframework.stereotype.Service; -import jakarta.annotation.Resource; -import org.springframework.validation.annotation.Validated; -import org.springframework.transaction.annotation.Transactional; - -import java.util.*; -import java.util.stream.Collectors; - -import cn.iocoder.yudao.framework.common.pojo.PageResult; -import cn.iocoder.yudao.framework.common.pojo.PageParam; -import cn.iocoder.yudao.framework.common.util.object.BeanUtils; - - -import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; -import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList; -import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.diffList; -import static cn.iocoder.yudao.module.hand.enums.ErrorCodeConstants.ALARM_MESSAGE_NOT_EXISTS; - -/** - * GAS手持探测器推送 Service 实现类 - * - * @author 超级管理员 - */ -@Service -@Validated -public class AlarmMessageServiceImpl implements AlarmMessageService { - - @Resource - private AlarmMessageMapper alarmMessageMapper; - - @Override - public Long createAlarmMessage(AlarmMessageSaveReqVO createReqVO) { - // 插入 - AlarmMessageDO alarmMessage = BeanUtils.toBean(createReqVO, AlarmMessageDO.class); - alarmMessageMapper.insert(alarmMessage); - - // 返回 - return alarmMessage.getId(); - } - - @Override - public void updateAlarmMessage(AlarmMessageSaveReqVO updateReqVO) { - // 校验存在 - validateAlarmMessageExists(updateReqVO.getId()); - // 更新 - AlarmMessageDO updateObj = BeanUtils.toBean(updateReqVO, AlarmMessageDO.class); - alarmMessageMapper.updateById(updateObj); - } - - @Override - public void deleteAlarmMessage(Long id) { - // 校验存在 - validateAlarmMessageExists(id); - // 删除 - alarmMessageMapper.deleteById(id); - } - - @Override - public void deleteAlarmMessageListByIds(List ids) { - // 删除 - alarmMessageMapper.deleteByIds(ids); - } - - - private void validateAlarmMessageExists(Long id) { - if (alarmMessageMapper.selectById(id) == null) { - throw exception(ALARM_MESSAGE_NOT_EXISTS); - } - } - - @Override - public AlarmMessageDO getAlarmMessage(Long id) { - return alarmMessageMapper.selectById(id); - } - - @Override - public PageResult getAlarmMessagePage(AlarmMessagePageReqVO pageReqVO) { - return alarmMessageMapper.selectPage(pageReqVO); - } - - @Override - @Transactional(rollbackFor = Exception.class) - public void createAlarmRecord(HandDataVo redisData, List listAll, String msgContent) { - String pushSnListStr = ""; - if (CollUtil.isNotEmpty(listAll)) { - pushSnListStr = listAll.stream() - .map(HandDetectorDO::getSn) - .filter(StrUtil::isNotBlank) - .collect(Collectors.joining(",")); - } - // 2. 构建实体对象 - AlarmMessageDO alarmDO = new AlarmMessageDO(); - alarmDO.setDetectorId(redisData.getId()); - alarmDO.setName(redisData.getName()); - alarmDO.setSn(redisData.getSn()); - alarmDO.setTenantId(redisData.getTenantId()); - alarmDO.setDeptId(redisData.getDeptId()); - alarmDO.setMessage(msgContent); - alarmDO.setPushSnList(pushSnListStr); - alarmDO.setRemark("系统自动触发报警推送"); - - // 3. 落库 - alarmMessageMapper.insert(alarmDO); - } - -} \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/FenceAlarmServiceImpl.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/FenceAlarmServiceImpl.java index 65f285b..4a35d5d 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/FenceAlarmServiceImpl.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/FenceAlarmServiceImpl.java @@ -15,7 +15,7 @@ import org.springframework.transaction.annotation.Transactional; import java.util.*; import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.framework.common.pojo.PageParam; -import cn.iocoder.yudao.framework.common.util.object.BeanUtils; +import cn.iocoder.yudao.framework.common.util.object.BeanUtils; // 假设您使用的是若依/yudao框架的工具类 import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList; @@ -93,4 +93,17 @@ public class FenceAlarmServiceImpl implements FenceAlarmService { fenceAlarmMapper.updateById(updateReqVO); } + @Override + public void batchCreateFenceAlarm(List fenceAlarmsToCreate) { + + List fenceAlarmDOs = BeanUtils.toBean(fenceAlarmsToCreate, FenceAlarmDO.class); + fenceAlarmMapper.insertBatch(fenceAlarmDOs); + } + + @Override + public void batchUpdateById(List fenceAlarmsToUpdate) { + + fenceAlarmMapper.updateBatch(fenceAlarmsToUpdate); + } + } \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandAlarmServiceImpl.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandAlarmServiceImpl.java index 0fee6f2..9ab539d 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandAlarmServiceImpl.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandAlarmServiceImpl.java @@ -96,4 +96,20 @@ public class HandAlarmServiceImpl implements HandAlarmService { handAlarmMapper.insertBatch(doList); } + @Override + @Transactional(rollbackFor = Exception.class) + @TenantIgnore + public void batchCreateHandAlarm(List gasAlarmsToCreate) { + + List bean = BeanUtils.toBean(gasAlarmsToCreate, HandAlarmDO.class); + handAlarmMapper.insertBatch(bean); + } + + @Override + @Transactional(rollbackFor = Exception.class) + @TenantIgnore + public void batchUpdateById(List gasAlarmsToUpdate) { + handAlarmMapper.updateBatch(gasAlarmsToUpdate); + } + } \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandDetectorServiceImpl.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandDetectorServiceImpl.java index 0f5bd8f..844373c 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandDetectorServiceImpl.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandDetectorServiceImpl.java @@ -255,6 +255,7 @@ public class HandDetectorServiceImpl implements HandDetectorService { } @Override + @TenantIgnore public List listAll(QueryWrapper handDetectorDOQueryWrapper) { return handDetectorMapper.selectList(handDetectorDOQueryWrapper); @@ -333,17 +334,55 @@ public class HandDetectorServiceImpl implements HandDetectorService { @Override @TenantIgnore - public List getListAll(Long deptId,Long tenantId,String sn) { - + public List getSnListByDept(Long deptId, Long tenantId, String excludeSn) { QueryWrapper queryWrapper = new QueryWrapper<>(); + // 【核心优化1】只查询 sn 字段,不查其他几十个字段,极大减少网络传输和内存占用 + queryWrapper.select("sn"); if (deptId != null){ queryWrapper.eq("dept_id", deptId); } - if (sn != null) { - queryWrapper.ne("sn", sn); + // 排除当前的 SN (excludeSn) + if (excludeSn != null) { + queryWrapper.ne("sn", excludeSn); } queryWrapper.eq("tenant_id", tenantId); - return handDetectorMapper.selectList(queryWrapper); + + // 【核心优化2】使用 selectObjs + // selectObjs 直接返回第一列的值(List),性能最高 + List result = handDetectorMapper.selectObjs(queryWrapper); + + if (result == null) { + return Collections.emptyList(); + } + return result.stream() + .filter(Objects::nonNull) + .map(Object::toString) + .collect(Collectors.toList()); + } + + @Override + public void batchUpdateRedisData(Long tenantId, Map deviceMap) { + if (tenantId == null || deviceMap == null || deviceMap.isEmpty()) { + return; + } + + try { + String redisKey = RedisKeyUtil.getTenantDeviceHashKey(tenantId); + + // 将 Map 转换为 Map + Map redisMap = new HashMap<>(deviceMap); + + // 使用 hmset 批量更新 + boolean success = redisUtil.hmset(redisKey, redisMap); + + if (success) { + log.debug("[Redis批量更新] 成功,租户: {}, 设备数量: {}", tenantId, deviceMap.size()); + } else { + log.error("[Redis批量更新] 失败,租户: {}, 设备数量: {}", tenantId, deviceMap.size()); + } + } catch (Exception e) { + log.error("[Redis批量更新] 异常,租户: {}, 设备数量: {}", tenantId, deviceMap.size(), e); + } } } \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/TdengineServiceImpl.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/TdengineServiceImpl.java index 0e3e41c..ce3ca3f 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/TdengineServiceImpl.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/TdengineServiceImpl.java @@ -1,20 +1,21 @@ package cn.iocoder.yudao.module.hand.service.impl; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.framework.security.core.LoginUser; +import cn.iocoder.yudao.module.hand.vo.AlarmMessageLog; +import cn.iocoder.yudao.module.hand.dal.HandDetectorDO; import cn.iocoder.yudao.module.hand.mapper.TdengineMapper; import cn.iocoder.yudao.module.hand.service.TdengineService; -import cn.iocoder.yudao.module.hand.vo.HandOriginalLog; -import cn.iocoder.yudao.module.hand.vo.HandTdenginePageVO; -import cn.iocoder.yudao.module.hand.vo.HandTdenginePo; -import cn.iocoder.yudao.module.hand.vo.TdengineDataVo; +import cn.iocoder.yudao.module.hand.vo.*; import com.baomidou.dynamic.datasource.annotation.DS; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import jakarta.annotation.Resource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.validation.annotation.Validated; import java.util.List; @@ -26,50 +27,50 @@ import static cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUti @Service @Validated @DS("tdengine") +@Slf4j public class TdengineServiceImpl implements TdengineService { - private static final Logger log = LoggerFactory.getLogger(TdengineServiceImpl.class); @Resource private TdengineMapper tdengineMapper; @Override public void saveHandLogBatch(List batchList) { - if (batchList.isEmpty()) { + if (batchList == null || batchList.isEmpty()) { return; } - // 1. 按设备SN分组 - Map> groupedBySn = batchList.stream() - .collect(Collectors.groupingBy(HandOriginalLog::getSn)); - - // 遍历Map,每个Entry对应一个子表 - for (Map.Entry> entry : groupedBySn.entrySet()) { - String sn = entry.getKey(); - List logs = entry.getValue(); - - Long tenantId = logs.get(0).getTenantId(); - tdengineMapper.insertHandLogBatch(sn, tenantId, logs); + int batchSize = 1000; + // 手动分片逻辑 (如果不引入 Guava) + for (int i = 0; i < batchList.size(); i += batchSize) { + int end = Math.min(i + batchSize, batchList.size()); + List subList = batchList.subList(i, end); + + try { + tdengineMapper.insertHandLogBatch(subList); + } catch (Exception e) { + log.error("批量保存原始日志失败", e); + // 可以在这里做简单的重试或死信处理 + } } } @Override public void saveDataLogBatch(List batchList) { - if (batchList.isEmpty()) { + if (batchList == null || batchList.isEmpty()) { return; } - // 1. 按设备SN分组 - Map> groupedBySn = batchList.stream() - .collect(Collectors.groupingBy(TdengineDataVo::getSn)); - - // 遍历Map,每个Entry对应一个子表 - for (Map.Entry> entry : groupedBySn.entrySet()) { - String sn = entry.getKey(); - List logs = entry.getValue(); - Long tenantId = logs.get(0).getTenantId(); + int batchSize = 1000; - tdengineMapper.saveDataLogBatch(sn, tenantId, logs); + for (int i = 0; i < batchList.size(); i += batchSize) { + int end = Math.min(i + batchSize, batchList.size()); + List subList = batchList.subList(i, end); + try { + tdengineMapper.saveDataLogBatch(subList); + } catch (Exception e) { + log.error("批量保存设备数据日志失败, 本批次数量: {}", subList.size(), e); + } } } @@ -118,4 +119,32 @@ public class TdengineServiceImpl implements TdengineService { } return tdengineMapper.HistoricalSn(po); } + + @Override + @Transactional(rollbackFor = Exception.class) + public void createAlarmRecord(List alarmMessageLogList) { + + // 3. 落库 + tdengineMapper.saveMeaagesLogList(alarmMessageLogList); + } + + + @Override + public PageResult getAlarmMessageLog(HandTdenginePageVO pageReqVO) { + + IPage page = new Page<>(pageReqVO.getPageNo(), pageReqVO.getPageSize()); + + LoginUser loginUser = getLoginUser(); + if (loginUser != null){ + pageReqVO.setTenantId(loginUser.getTenantId()); + } + IPage resultPage = tdengineMapper.selectMessagePage(page, pageReqVO); + + List doList = resultPage.getRecords(); + if (doList == null || doList.isEmpty()) { + return PageResult.empty(resultPage.getTotal()); + } + + return new PageResult<>(doList, resultPage.getTotal()); + } } diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/util/RedisKeyUtil.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/util/RedisKeyUtil.java index 57a61f3..a375f24 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/util/RedisKeyUtil.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/util/RedisKeyUtil.java @@ -41,13 +41,7 @@ public class RedisKeyUtil { return DEVICE_INFO_PREFIX + ":" + tenantId; } - /** - * 获取处理锁 Key (加入租户维度更安全) - * 结构示例: lock:device:{tenantId}:{sn} - */ - public static String getDeviceProcessLockKey(Long tenantId, String sn) { - return LOCK_PREFIX + ":" + tenantId + ":" + sn; - } + public static String getDeviceTenantMappingKey() { return DEVICE_TENANT_MAPPING_KEY; diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/util/RedisUtil.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/util/RedisUtil.java index bf615e0..4ccb757 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/util/RedisUtil.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/util/RedisUtil.java @@ -187,6 +187,10 @@ public class RedisUtil { public Map hmget(String key) { return redisTemplate.opsForHash().entries(key); } + + public List hmget(String key, Collection fields) { + return redisTemplate.opsForHash().multiGet(key, fields); + } /** * 获取hashKey对应的所有键值 * diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageLog.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageLog.java new file mode 100644 index 0000000..bc0611a --- /dev/null +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageLog.java @@ -0,0 +1,55 @@ +package cn.iocoder.yudao.module.hand.vo; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.*; +import com.baomidou.mybatisplus.annotation.*; +import cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO; + +import java.sql.Timestamp; + +/** + * GAS手持探测器推送 DO + * + * @author 超级管理员 + */ +@Data +public class AlarmMessageLog { + + /** + * 手持表id + */ + private Long detectorId; + /** + * 持有人名称 + */ + private String holderName; + /** + * 设备编号 + */ + private String sn; + /** + * 消息 + */ + private String message; + /** + * 推送设备sn,逗号分割 + */ + private String pushSnList; + /** + * 备注 + */ + private String remark; + /** + * 部门id + */ + private Long deptId; + /** + * 租户id + */ + private Long tenantId; + /** + * 更新时间 + */ + private Timestamp ts; + +} \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessagePageReqVO.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessagePageReqVO.java deleted file mode 100644 index 047cad3..0000000 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessagePageReqVO.java +++ /dev/null @@ -1,41 +0,0 @@ -package cn.iocoder.yudao.module.hand.vo; - -import lombok.*; -import java.util.*; -import io.swagger.v3.oas.annotations.media.Schema; -import cn.iocoder.yudao.framework.common.pojo.PageParam; -import org.springframework.format.annotation.DateTimeFormat; -import java.time.LocalDateTime; - -import static cn.iocoder.yudao.framework.common.util.date.DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND; - -@Schema(description = "管理后台 - GAS手持探测器推送分页 Request VO") -@Data -public class AlarmMessagePageReqVO extends PageParam { - - @Schema(description = "手持表id", example = "10665") - private Long detectorId; - - @Schema(description = "持有人名称", example = "王五") - private String name; - - @Schema(description = "设备编号") - private String sn; - - @Schema(description = "消息") - private String message; - - @Schema(description = "推送设备sn,逗号分割") - private String pushSnList; - - @Schema(description = "备注", example = "随便") - private String remark; - - @Schema(description = "部门id", example = "12286") - private Long deptId; - - @Schema(description = "创建时间") - @DateTimeFormat(pattern = FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND) - private LocalDateTime[] createTime; - -} \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageRespVO.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageRespVO.java deleted file mode 100644 index 3026820..0000000 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageRespVO.java +++ /dev/null @@ -1,51 +0,0 @@ -package cn.iocoder.yudao.module.hand.vo; - -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.*; -import java.util.*; -import org.springframework.format.annotation.DateTimeFormat; -import java.time.LocalDateTime; -import com.alibaba.excel.annotation.*; - -@Schema(description = "管理后台 - GAS手持探测器推送 Response VO") -@Data -@ExcelIgnoreUnannotated -public class AlarmMessageRespVO { - - @Schema(description = "主键ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "13733") - @ExcelProperty("主键ID") - private Long id; - - @Schema(description = "手持表id", example = "10665") - @ExcelProperty("手持表id") - private Long detectorId; - - @Schema(description = "持有人名称", example = "王五") - @ExcelProperty("持有人名称") - private String name; - - @Schema(description = "设备编号") - @ExcelProperty("设备编号") - private String sn; - - @Schema(description = "消息") - @ExcelProperty("消息") - private String message; - - @Schema(description = "推送设备sn,逗号分割") - @ExcelProperty("推送设备sn,逗号分割") - private String pushSnList; - - @Schema(description = "备注", example = "随便") - @ExcelProperty("备注") - private String remark; - - @Schema(description = "部门id", example = "12286") - @ExcelProperty("部门id") - private Long deptId; - - @Schema(description = "创建时间", requiredMode = Schema.RequiredMode.REQUIRED) - @ExcelProperty("创建时间") - private LocalDateTime createTime; - -} \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageSaveReqVO.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageSaveReqVO.java deleted file mode 100644 index 7357145..0000000 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageSaveReqVO.java +++ /dev/null @@ -1,36 +0,0 @@ -package cn.iocoder.yudao.module.hand.vo; - -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.*; -import java.util.*; -import jakarta.validation.constraints.*; - -@Schema(description = "管理后台 - GAS手持探测器推送新增/修改 Request VO") -@Data -public class AlarmMessageSaveReqVO { - - @Schema(description = "主键ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "13733") - private Long id; - - @Schema(description = "手持表id", example = "10665") - private Long detectorId; - - @Schema(description = "持有人名称", example = "王五") - private String name; - - @Schema(description = "设备编号") - private String sn; - - @Schema(description = "消息") - private String message; - - @Schema(description = "推送设备sn,逗号分割") - private String pushSnList; - - @Schema(description = "备注", example = "随便") - private String remark; - - @Schema(description = "部门id", example = "12286") - private Long deptId; - -} \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/Geofence.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/Geofence.java index 27a6e7e..8a0d89e 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/Geofence.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/Geofence.java @@ -6,6 +6,10 @@ import java.util.Collections; import java.util.List; @Data public class Geofence { + + private Long id; // <--- 添加 ID 字段 + + private final List vertices; private final double minX, minY, maxX, maxY; // 包围盒 diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandTdenginePageVO.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandTdenginePageVO.java index 3f3aed6..36a5812 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandTdenginePageVO.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandTdenginePageVO.java @@ -11,6 +11,16 @@ public class HandTdenginePageVO extends PageParam { private String sn; + /** + * 持有人名称 + */ + private String holderName; + + /** + * 推送设备sn,逗号分割 + */ + private String pushSnList; + private Timestamp startTime; private Timestamp endTime; diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/TdengineDataVo.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/TdengineDataVo.java index 852f145..0a6269b 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/TdengineDataVo.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/TdengineDataVo.java @@ -36,4 +36,6 @@ public class TdengineDataVo { @Schema(description = "租户id") private Long tenantId; + + } diff --git a/cc-admin-master/yudao-module-hand/src/main/resources/mapper/AlarmMessageMapper.xml b/cc-admin-master/yudao-module-hand/src/main/resources/mapper/AlarmMessageMapper.xml deleted file mode 100644 index a3cbf5e..0000000 --- a/cc-admin-master/yudao-module-hand/src/main/resources/mapper/AlarmMessageMapper.xml +++ /dev/null @@ -1,12 +0,0 @@ - - - - - - - \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml b/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml index ae9ff39..6169ef9 100644 --- a/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml +++ b/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml @@ -10,27 +10,35 @@ - - INSERT INTO hand_original_log_${sn} - USING hand_original_log - TAGS(#{sn}, #{tenantId}) - (ts, payload) - VALUES - + INSERT INTO + + + hand_original_log_${log.sn} + + USING hand_original_log + + TAGS(#{log.sn}, #{log.tenantId}) + + (ts, payload) + VALUES (#{log.ts}, #{log.payload}) - - INSERT INTO device_data_log_#{sn} - USING device_data_log - TAGS(#{sn}, #{tenantId}) - (ts, battery, `value`, longitude, latitude, `name`) - VALUES - + INSERT INTO + + + device_data_log_${log.sn} + + USING device_data_log + + TAGS(#{log.sn}, #{log.tenantId}) + + (ts, battery, `value`, longitude, latitude, `name`) + + VALUES (#{log.ts}, #{log.battery}, #{log.value}, #{log.longitude}, #{log.latitude}, #{log.name}) - + + + INSERT INTO + + + `message_log_#{item.sn}` + + USING alarm_message_log + + TAGS ( + #{item.sn}, + #{item.detectorId}, + #{item.tenantId} + ) + + (ts, holder_name, message, push_sn_list, remark, dept_id) + + VALUES ( + now, + #{item.holderName}, + #{item.message}, + #{item.pushSnList}, + #{item.remark}, + #{item.deptId} + ) + + + + \ No newline at end of file diff --git a/cc-admin-master/yudao-module-system/pom.xml b/cc-admin-master/yudao-module-system/pom.xml index afe5f3f..9689dec 100644 --- a/cc-admin-master/yudao-module-system/pom.xml +++ b/cc-admin-master/yudao-module-system/pom.xml @@ -23,7 +23,10 @@ yudao-module-infra ${revision} - + + jakarta.mail + jakarta.mail-api + cn.iocoder.boot @@ -88,6 +91,12 @@ org.springframework.boot spring-boot-starter-mail + + + org.eclipse.angus + jakarta.mail + + diff --git a/cc-admin-master/yudao-server/src/main/resources/application-dev.yaml b/cc-admin-master/yudao-server/src/main/resources/application-dev.yaml index 902ccec..d0cd515 100644 --- a/cc-admin-master/yudao-server/src/main/resources/application-dev.yaml +++ b/cc-admin-master/yudao-server/src/main/resources/application-dev.yaml @@ -235,7 +235,7 @@ mqtt: topic: +/zds_up,+/zds_down qos: 1,1 default: - publishQos: 1 + publishQos: 0 offlineTime: 180 # 超过 180 秒无数据则判为数据超时 pool: coreSize: 10 diff --git a/cc-admin-master/yudao-server/src/main/resources/application-local.yaml b/cc-admin-master/yudao-server/src/main/resources/application-local.yaml index 015c57b..20498f5 100644 --- a/cc-admin-master/yudao-server/src/main/resources/application-local.yaml +++ b/cc-admin-master/yudao-server/src/main/resources/application-local.yaml @@ -235,7 +235,7 @@ mqtt: topic: +/zds_up,+/zds_down qos: 1,1 default: - publishQos: 1 + publishQos: 0 offlineTime: 180 # 超过 180 秒无数据则判为数据超时 pool: coreSize: 10 diff --git a/cc-admin-master/yudao-server/src/main/resources/application-prod.yaml b/cc-admin-master/yudao-server/src/main/resources/application-prod.yaml index 4f02524..e347c7b 100644 --- a/cc-admin-master/yudao-server/src/main/resources/application-prod.yaml +++ b/cc-admin-master/yudao-server/src/main/resources/application-prod.yaml @@ -246,7 +246,7 @@ mqtt: topic: +/zds_up,+/zds_down qos: 1,1 default: - publishQos: 1 + publishQos: 0 offlineTime: 180 # 超过 180 秒无数据则判为数据超时 pool: coreSize: 10 diff --git a/cc-admin-master/yudao-server/src/main/resources/application.yaml b/cc-admin-master/yudao-server/src/main/resources/application.yaml index c6dd217..ed0c593 100644 --- a/cc-admin-master/yudao-server/src/main/resources/application.yaml +++ b/cc-admin-master/yudao-server/src/main/resources/application.yaml @@ -136,6 +136,8 @@ spring: consumer: auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解 value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + max-poll-records: 300 + fetch-max-wait: 3000 # 等待拉取消息的最大时间(毫秒) properties: spring.json.trusted.packages: '*' group-id: consumer-${spring.application.name} # 消费者分组