Browse Source

手持表批处理

master
wangwei_123 6 hours ago
parent
commit
0c0dab4da6
  1. 37
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java
  2. 190
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java
  3. 29
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java
  4. 58
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java
  5. 122
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java
  6. 2
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java
  7. 917
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java
  8. 116
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/DeviceMessageProcessor.java
  9. 107
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/controller/admin/AlarmMessageController.java
  10. 7
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/controller/admin/HandTdengineController.java
  11. 33
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/mapper/AlarmMessageMapper.java
  12. 17
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/mapper/TdengineMapper.java
  13. 67
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/AlarmMessageService.java
  14. 4
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/FenceAlarmService.java
  15. 4
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandAlarmService.java
  16. 5
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandDetectorService.java
  17. 12
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/TdengineService.java
  18. 117
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/AlarmMessageServiceImpl.java
  19. 15
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/FenceAlarmServiceImpl.java
  20. 16
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandAlarmServiceImpl.java
  21. 49
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandDetectorServiceImpl.java
  22. 89
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/TdengineServiceImpl.java
  23. 8
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/util/RedisKeyUtil.java
  24. 4
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/util/RedisUtil.java
  25. 28
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageLog.java
  26. 41
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessagePageReqVO.java
  27. 51
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageRespVO.java
  28. 36
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageSaveReqVO.java
  29. 4
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/Geofence.java
  30. 10
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandTdenginePageVO.java
  31. 2
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/TdengineDataVo.java
  32. 12
      cc-admin-master/yudao-module-hand/src/main/resources/mapper/AlarmMessageMapper.xml
  33. 97
      cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml
  34. 11
      cc-admin-master/yudao-module-system/pom.xml
  35. 2
      cc-admin-master/yudao-server/src/main/resources/application-dev.yaml
  36. 2
      cc-admin-master/yudao-server/src/main/resources/application-local.yaml
  37. 2
      cc-admin-master/yudao-server/src/main/resources/application-prod.yaml
  38. 2
      cc-admin-master/yudao-server/src/main/resources/application.yaml

37
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java

@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.mqtt.config;
import cn.iocoder.yudao.module.hand.service.TdengineService;
import cn.iocoder.yudao.module.hand.vo.AlarmMessageLog;
import cn.iocoder.yudao.module.hand.vo.TdengineDataVo;
import cn.iocoder.yudao.module.hand.vo.HandOriginalLog;
import jakarta.annotation.Resource;
@ -20,33 +21,35 @@ public class BatchProcessorConfig {
@Bean
public TdengineBatchConfig<HandOriginalLog> handLogBatchProcessor() {
return new TdengineBatchConfig<>(
"HandLogProcessor", // 处理器名字
list -> tdengineService.saveHandLogBatch(list), // 【核心】将具体的保存逻辑作为 Lambda 传入
50000, // 队列容量
1000, // 批次大小
5000 // 执行频率
"HandLogProcessor",
list -> tdengineService.saveHandLogBatch(list),
50000,
1000,
5000 // 内部每 5秒 执行一次
);
}
@Bean
public TdengineBatchConfig<TdengineDataVo> handLogBatchDataProcessor() {
return new TdengineBatchConfig<>(
"handLogBatchDataProcessor", // 处理器名字
list -> tdengineService.saveDataLogBatch(list), // 【核心】将具体的保存逻辑作为 Lambda 传入
50000, // 队列容量
1000, // 批次大小
5000 // 执行频率
"handLogBatchDataProcessor",
list -> tdengineService.saveDataLogBatch(list),
50000,
1000,
5000 // 内部每 5秒 执行一次
);
}
@Bean
public TdengineBatchConfig<AlarmMessageLog> messageLogBatchProcessor() {
return new TdengineBatchConfig<>(
"messageLogBatchProcessor",
list -> tdengineService.createAlarmRecord(list),
50000,
1000,
30000 // 内部每 30秒 执行一次
);
// --- 创建一个调度器来统一调用所有处理器的 flush 方法 ---
@Scheduled(fixedRateString = "${batch.processor.flush.rate:5000}") // 从配置读取频率,默认5秒
public void scheduleFlush() {
// 调用 handLogProcessor 的刷新方法
handLogBatchProcessor().flush();
handLogBatchDataProcessor().flush();
// 如果有其他处理器,也在这里调用
}
}

190
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java

@ -1,119 +1,169 @@
package cn.iocoder.yudao.module.mqtt.config;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@Slf4j
public class TdengineBatchConfig<T> {
// --- 这些参数可以在构造时传入,使其更灵活 ---
private final int queueCapacity;
private final int batchSize;
private final long offerTimeoutMs;
private final long maxWaitTimeMs;
private final String processorName;
private final String processorName; // 用于日志,区分不同的处理器实例
private final BlockingQueue<T> dataQueue;
private final Consumer<List<T>> batchAction; // 【核心】用于处理一个批次的具体业务逻辑
private final Consumer<List<T>> batchAction;
private final AtomicBoolean running = new AtomicBoolean(true);
private Thread workerThread;
private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
/**
* 构造函数
* @param processorName 处理器名称用于日志区分
* @param batchAction 一个函数接收一个 List<T> 并执行相应的批量操作例如写入数据库
* @param queueCapacity 队列容量
* @param batchSize 批次大小
* @param fixedRateMs 执行频率
*/
public TdengineBatchConfig(String processorName, Consumer<List<T>> batchAction,
int queueCapacity, int batchSize, long fixedRateMs) {
int queueCapacity, int batchSize, long maxWaitTimeMs) {
this.processorName = processorName;
this.batchAction = batchAction;
this.queueCapacity = queueCapacity;
this.batchSize = batchSize;
this.offerTimeoutMs = 100L;
this.maxWaitTimeMs = maxWaitTimeMs;
this.dataQueue = new LinkedBlockingQueue<>(this.queueCapacity);
startWorker();
}
public void addToBatch(T data) {
if (data == null) {
return;
private void startWorker() {
this.workerThread = new Thread(this::processLoop, "TD-Worker-" + processorName);
this.workerThread.setDaemon(true);
this.workerThread.start();
log.info("[{}] 批处理线程已启动,批次大小: {}, 最大等待: {}ms",
processorName, batchSize, maxWaitTimeMs);
}
private void processLoop() {
List<T> buffer = new ArrayList<>(batchSize);
long lastFlushTime = System.currentTimeMillis();
while (running.get()) {
try {
long now = System.currentTimeMillis();
long waitTime = maxWaitTimeMs - (now - lastFlushTime);
if (waitTime <= 0) waitTime = 1;
T firstItem = dataQueue.poll(waitTime, TimeUnit.MILLISECONDS);
if (firstItem != null) {
buffer.add(firstItem);
dataQueue.drainTo(buffer, batchSize - buffer.size());
}
boolean sizeReached = buffer.size() >= batchSize;
boolean timeReached = (System.currentTimeMillis() - lastFlushTime) >= maxWaitTimeMs;
if ((sizeReached || timeReached) && !buffer.isEmpty()) {
doFlush(buffer);
buffer.clear();
lastFlushTime = System.currentTimeMillis();
}
} catch (InterruptedException e) {
log.warn("[{}] 工作线程被中断", processorName);
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("[{}] Loop异常", processorName, e);
}
}
// 如果正在关闭,则不再接受新数据
if (shuttingDown.get()) {
log.warn("[{}] 正在关闭,已拒绝添加新数据。", this.processorName);
return;
if (!buffer.isEmpty()) doFlush(buffer);
}
private void doFlush(List<T> batch) {
try {
if (log.isDebugEnabled()) {
log.debug("[{}] 触发批量入库,数量: {}", processorName, batch.size());
}
batchAction.accept(new ArrayList<>(batch));
} catch (Exception e) {
log.error("[{}] 批量入库失败!数量: {}", processorName, batch.size(), e);
}
}
/**
* 单条添加保留兼容性
*/
public void addToBatch(T data) {
if (data == null || !running.get()) return;
try {
if (!dataQueue.offer(data, offerTimeoutMs, TimeUnit.MILLISECONDS)) {
log.warn("[{}] 缓冲区已满且在 {} 毫秒内无法添加,数据可能被丢弃!当前队列大小: {}",
this.processorName, this.offerTimeoutMs, dataQueue.size());
if (!dataQueue.offer(data, 100, TimeUnit.MILLISECONDS)) {
log.warn("[{}] 队列已满({}),丢弃数据", processorName, queueCapacity);
}
} catch (InterruptedException e) {
log.error("[{}] 添加数据到缓冲区时线程被中断", this.processorName, e);
Thread.currentThread().interrupt();
}
}
// 注意:@Scheduled 注解不能用在非 Spring Bean 的普通类方法上。
// 我们将在下一步的配置类中解决这个问题。
public void flush() {
if (dataQueue.isEmpty()) {
/**
* 批量添加新增优化方法- 关键优化点
*/
public void addToBatch(Collection<T> dataList) {
if (dataList == null || dataList.isEmpty() || !running.get()) {
return;
}
List<T> batchList = new ArrayList<>(batchSize);
try {
int drainedCount = dataQueue.drainTo(batchList, batchSize);
if (drainedCount > 0) {
log.debug("[{}] 定时任务触发,准备将 {} 条数据进行批量处理...", this.processorName, drainedCount);
// 调用构造时传入的业务逻辑
this.batchAction.accept(batchList);
log.debug("[{}] 成功批量处理 {} 条数据。", this.processorName, drainedCount);
int added = 0;
int dropped = 0;
for (T data : dataList) {
if (data == null) continue;
try {
// 批量添加时使用较短的超时,避免阻塞太久
if (dataQueue.offer(data, 10, TimeUnit.MILLISECONDS)) {
added++;
} else {
dropped++;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("[{}] 批量添加被中断,已添加: {}, 剩余: {}",
processorName, added, dataList.size() - added);
break;
}
} catch (Exception e) {
log.error("[{}] 批量处理数据时发生严重错误!数据量: {}", this.processorName, batchList.size(), e);
}
if (dropped > 0) {
log.warn("[{}] 批量添加完成,成功: {}, 丢弃: {} (队列已满)",
processorName, added, dropped);
} else if (log.isDebugEnabled()) {
log.debug("[{}] 批量添加完成,数量: {}", processorName, added);
}
}
/**
* 获取当前队列大小用于监控
*/
public int getQueueSize() {
return dataQueue.size();
}
@PreDestroy
public void onShutdown() {
log.info("[{}] 应用即将关闭,开始执行最后的缓冲区数据刷新...", this.processorName);
// 1. 设置关闭标志,阻止新数据进入
shuttingDown.set(true);
// 2. 将队列中剩余的所有数据一次性取出到一个临时列表
List<T> remainingData = new ArrayList<>();
dataQueue.drainTo(remainingData);
if (remainingData.isEmpty()) {
log.info("[{}] 缓冲区为空,无需处理。", this.processorName);
return;
}
log.info("[{}] 停机处理中,剩余 {} 条数据待处理...", this.processorName, remainingData.size());
// 3. 对取出的数据进行分批处理
for (int i = 0; i < remainingData.size(); i += batchSize) {
// 计算当前批次的结束索引
int end = Math.min(i + batchSize, remainingData.size());
// 获取子列表作为当前批次
List<T> batch = remainingData.subList(i, end);
log.info("[{}] 正在停止...", processorName);
running.set(false);
if (workerThread != null) {
workerThread.interrupt();
try {
log.debug("[{}] 正在处理最后批次的数据,数量: {}", this.processorName, batch.size());
this.batchAction.accept(batch);
} catch (Exception e) {
log.error("[{}] 关闭过程中批量处理数据时发生严重错误!数据量: {}", this.processorName, batch.size(), e);
workerThread.join(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
log.info("[{}] 缓冲区数据已全部处理完毕。", this.processorName);
log.info("[{}] 已停止。剩余队列: {}", processorName, dataQueue.size());
}
}

29
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java

@ -3,7 +3,12 @@ package cn.iocoder.yudao.module.mqtt.kafka;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;
@ -31,4 +36,28 @@ public class KafkaConfig {
// 当重试耗尽后,会调用 recoverer 将消息送入 DLT
return new DefaultErrorHandler(recoverer, backOff);
}
@Bean("batchFactory")
public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory<String, String> consumerFactory,
KafkaTemplate<String, String> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 【核心设置】开启批量监听!没有这就还是单条处理
factory.setBatchListener(true);
// 【并发设置】根据你的分区数设置,比如你有 3 个分区就设为 3
factory.setConcurrency(3);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
factory.getContainerProperties().setPollTimeout(3000);
// 创建一个 DeadLetterPublishingRecoverer,它知道如何将失败消息发送到 DLT
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(KafkaTopicType.DEAD_LETTER_TOPIC.getValue(), record.partition()));
DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}

58
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java

@ -1,16 +1,24 @@
package cn.iocoder.yudao.module.mqtt.kafka;
import cn.iocoder.yudao.module.mqtt.processor.BatchDeviceMessageProcessor;
import cn.iocoder.yudao.module.mqtt.processor.DeviceMessageProcessor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.TaskExecutor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static cn.iocoder.yudao.module.mqtt.kafka.KafkaTopicType.DEAD_LETTER_TOPIC;
@Slf4j
@ -19,40 +27,44 @@ public class KafkaMessageConsumer {
private final TaskExecutor taskExecutor;
private final DeviceMessageProcessor deviceMessageProcessor;
private final BatchDeviceMessageProcessor batchDeviceMessageProcessor;
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaMessageConsumer(DeviceMessageProcessor deviceMessageProcessor,
@Qualifier("mqttExecutor") TaskExecutor taskExecutor,
BatchDeviceMessageProcessor batchDeviceMessageProcessor,
KafkaTemplate<String, String> kafkaTemplate) {
this.deviceMessageProcessor = deviceMessageProcessor;
this.taskExecutor = taskExecutor;
this.batchDeviceMessageProcessor = batchDeviceMessageProcessor;
this.kafkaTemplate = kafkaTemplate;
}
@KafkaListener(topics = "zds_up")
public void handleMessage(@Payload String payload,
@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.debug("从 Kafka Topic 收到消息,准备处理: topic=[{}]", topic);
// 3. 异步执行,保证kafka不阻塞
taskExecutor.execute(() -> {
try {
// 核心业务逻辑调用
this.deviceMessageProcessor.process(key, payload);
} catch (Exception e) {
// 1. 捕获所有运行时异常
log.error("处理 Kafka 消息时发生严重错误!将消息发送到死信队列. Key: {}, Payload: {}", key, payload, e);
try {
// 2. 将失败的消息发送到 DLT
// 可以在消息头中添加额外信息,如原始主题、异常信息等,方便排查
kafkaTemplate.send(DEAD_LETTER_TOPIC.getValue(), key, payload);
} catch (Exception dltEx) {
log.error("无法将消息发送到死信队列!消息可能丢失. Key: {}, Payload: {}", key, payload, dltEx);
}
}
});
// 使用你定义的 batchFactory
@KafkaListener(topics = "zds_up", containerFactory = "batchFactory")
public void handleBatchMessage(List<ConsumerRecord<String, String>> records) {
if (records.isEmpty()) {
return;
}
log.info(">>> 开始同步处理 {} 条消息", records.size());
try {
// 1. [成功路径] 核心业务逻辑在消费者线程中同步执行
batchDeviceMessageProcessor.processBatch(records);
log.info("成功处理并确认 {} 条消息", records.size());
} catch (Exception e) {
// 2. [失败路径] 业务处理失败,转入异步DLT处理流程
log.error("批量处理失败,将异步发送 {} 条消息到死信队列", records.size(), e);
throw e;
}
}
}

122
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java

@ -20,7 +20,6 @@ import java.util.concurrent.BlockingQueue;
@Slf4j
@Component
public class Client {
private final OnMessageCallback onMessageCallback; // 1. 注入我们新的 Kafka 生产者回调
// --- 1. 配置注入 (保持不变) ---
@Value("${mqtt.enable:false}")
@ -43,11 +42,29 @@ public class Client {
private String[] subscribeTopics;
@Value("${mqtt.subscribe.qos}")
private int[] subscribeQos;
@Value("${mqtt.default.publishQos:1}")
@Value("${mqtt.default.publishQos:0}")
private int publishQos;
private IMqttClient mqttClient;
// 2. 构造函数注入 Spring Bean
//使用异步客户端接口
private IMqttAsyncClient mqttClient;
// 【修改点2】定义全局通用的发送回调,避免每次发送都创建对象,减少GC
private final IMqttActionListener globalPublishCallback = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
// 高并发下,成功通常不打印日志,否则磁盘IO会爆炸
// log.debug("发送成功: {}", asyncActionToken.getTopics());
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
// 只有失败才打印日志
log.error("MQTT异步发送失败, Topic: {}",
asyncActionToken.getTopics() != null ? asyncActionToken.getTopics()[0] : "null",
exception);
}
};
public Client(OnMessageCallback onMessageCallback) {
this.onMessageCallback = onMessageCallback;
}
@ -64,19 +81,32 @@ public class Client {
private void connect() {
try {
String finalClientId = baseClientId + "-" + UUID.randomUUID().toString().substring(0, 8);
mqttClient = new MqttClient(hostUrl, finalClientId, new MemoryPersistence());
MqttConnectOptions options = createMqttConnectOptions();
//实例化 MqttAsyncClient
mqttClient = new MqttAsyncClient(hostUrl, finalClientId, new MemoryPersistence());
// 3. 将注入的回调 Bean 设置给 Paho 客户端
mqttClient.setCallback(onMessageCallback);
log.info("正在连接到 MQTT Broker: {}", hostUrl);
mqttClient.connect(options);
subscribeToTopics();
MqttConnectOptions options = createMqttConnectOptions();
log.info("正在异步连接到 MQTT Broker: {}", hostUrl);
// 【修改点4】执行异步连接,并在回调中处理订阅
mqttClient.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
log.info("MQTT 连接成功 (Async)");
subscribeToTopics();
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
log.error("MQTT 连接失败", exception);
}
});
} catch (MqttException e) {
log.error("连接到 MQTT Broker 时发生初始错误: {}", e.getMessage(), e);
log.error("初始化 MQTT 客户端失败: {}", e.getMessage(), e);
}
}
@ -87,50 +117,75 @@ public class Client {
options.setConnectionTimeout(connectionTimeout);
options.setKeepAliveInterval(keepAliveInterval);
options.setCleanSession(cleanSession);
options.setAutomaticReconnect(true); // 启用自动重连
options.setAutomaticReconnect(true);
//对于 10w QPS,必须拉满到 MQTT 协议上限 (65535)
options.setMaxInflight(65535);
return options;
}
/**
* 4. 创建一个事件监听器用于处理连接成功事件
* 这取代了之前传递 Runnable 的方式实现了更好的解耦
*/
/* @Async
@EventListener
public void handleMqttConnectionComplete(MqttConnectedEvent event) {
log.info("监听到 MQTT 连接成功事件,开始执行订阅操作...");
subscribeToTopics();
}*/
private void subscribeToTopics() {
if (mqttClient == null || !mqttClient.isConnected()) {
log.error("无法订阅主题因为 MQTT 客户端未连接");
log.warn("无法订阅,客户端未连接");
return;
}
try {
log.info("执行 MQTT 主题订阅: {}", (Object) subscribeTopics);
mqttClient.subscribe(subscribeTopics, subscribeQos);
// 异步订阅
mqttClient.subscribe(subscribeTopics, subscribeQos, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
log.info("订阅成功");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
log.error("订阅失败", exception);
}
});
} catch (MqttException e) {
log.error("订阅 MQTT 主题时发生错误: {}", e.getMessage(), e);
log.error("订阅请求异常", e);
}
}
// --- 4. 发布和清理方法 (保持不变) ---
// --- 发布方法 ---
/**
* 兼容旧代码的重载方法
*/
public void publish(String topic, String payload) {
publish(topic, payload.getBytes(), this.publishQos, false);
publishAsync(topic, payload);
}
public void publish(String topic, byte[] payload, int qos, boolean retained) {
/**
* 推荐使用的异步发送方法
*/
public void publishAsync(String topic, String payload) {
if (payload == null) return;
publishAsync(topic, payload.getBytes(StandardCharsets.UTF_8), this.publishQos, false);
}
/**
* 核心异步发送逻辑
*/
public void publishAsync(String topic, byte[] payload, int qos, boolean retained) {
if (mqttClient == null || !mqttClient.isConnected()) {
log.error("MQTT 客户端未连接,无法发布消息。");
// 降级:只打印日志,不抛异常,避免中断业务循环
// 在高并发场景下,这里可以考虑加一个计数器,每N次打印一次日志
log.warn("MQTT未连接,丢弃消息: {}", topic);
return;
}
try {
MqttMessage message = new MqttMessage(payload);
message.setQos(qos);
message.setRetained(retained);
mqttClient.publish(topic, message);
// 【修改点6】调用异步发送接口,传入全局回调
mqttClient.publish(topic, message, null, globalPublishCallback);
} catch (MqttException e) {
log.error("发布 MQTT 消息到主题 [{}] 时发生错误: {}", topic, e.getMessage(), e);
// 这里通常是队列满了(MaxInflight满了)或者参数错误
log.error("MQTT消息入队失败: {}", topic, e);
}
}
@ -138,13 +193,14 @@ public class Client {
public void cleanup() {
if (mqttClient != null) {
try {
// 异步断开
if (mqttClient.isConnected()) {
mqttClient.disconnect();
}
mqttClient.close();
log.info("MQTT 客户端已成功关闭");
log.info("MQTT 客户端已关闭");
} catch (MqttException e) {
log.error("关闭 MQTT 客户端时发生错误: {}", e.getMessage(), e);
log.error("关闭 MQTT 客户端异常", e);
}
}
}

2
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java

@ -33,7 +33,7 @@ public class ThreadPoolConfig {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(2000); // 队列大一点,应对瞬间并发
executor.setQueueCapacity(5000); // 队列大一点,应对瞬间并发
executor.setThreadNamePrefix("alarm-push-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();

917
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java

@ -0,0 +1,917 @@
package cn.iocoder.yudao.module.mqtt.processor;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.hand.dal.*;
import cn.iocoder.yudao.module.hand.enums.*;
import cn.iocoder.yudao.module.hand.service.*;
import cn.iocoder.yudao.module.hand.util.*;
import cn.iocoder.yudao.module.hand.vo.*;
import cn.iocoder.yudao.module.mqtt.config.TdengineBatchConfig;
import cn.iocoder.yudao.module.mqtt.mqtt.Client;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 批量设备消息处理器
* <p>
* 核心优化
* 1. 批量获取基础数据租户信息设备信息报警规则
* 2. 内存中完成所有业务逻辑计算
* 3. 批量执行所有数据库写操作
*/
@Slf4j
@Component
public class BatchDeviceMessageProcessor {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Resource
private RedisUtil redisUtil;
@Resource
private TdengineBatchConfig<HandOriginalLog> tdengineBatchProcessor;
@Resource
private TdengineBatchConfig<TdengineDataVo> tdengineBatchConfig;
@Resource
private TdengineBatchConfig<AlarmMessageLog> alarmProcessor;
@Resource
private HandDetectorService handDetectorService;
@Resource
private HandAlarmService handAlarmService;
@Resource
private AlarmRuleService alarmRuleService;
@Resource
private FenceService fenceService;
@Resource
private FenceAlarmService fenceAlarmService;
@Resource
private Client client;
@Resource(name = "mqttAlarmExecutor")
private TaskExecutor alarmExecutor;
/**
* 批量处理 Kafka 消息的主入口
*/
public void processBatch(List<ConsumerRecord<String, String>> records) {
long startTime = System.currentTimeMillis();
if (CollectionUtils.isEmpty(records)) {
return;
}
log.info("[批量处理] 开始处理,消息数量: {}", records.size());
try {
// === 阶段1: 数据准备 ===
BatchContext context = prepareBatchContext(records);
if (context.isEmpty()) {
log.warn("[批量处理] 无有效数据,处理结束");
return;
}
// === 阶段2: 业务逻辑处理 ===
processBatchLogic(records, context);
// === 阶段3: 批量持久化 ===
persistBatchData(context);
log.info("[批量处理] 完成,消息数量: {},耗时: {} ms",
records.size(), System.currentTimeMillis() - startTime);
} catch (Exception e) {
log.error("[批量处理] 发生异常", e);
// 根据业务需求决定是否需要回滚或重试
}
}
private BatchContext prepareBatchContext(List<ConsumerRecord<String, String>> records) {
BatchContext context = new BatchContext();
// 1. 提取所有有效的 SNs
List<String> sns = records.stream()
.map(ConsumerRecord::key)
.filter(StringUtils::isNotBlank)
.distinct()
.toList();
if (sns.isEmpty()) {
return context;
}
// 2. 批量获取租户信息
context.snToTenantMap = getTenantIdsInBatch(sns);
// 3. 按租户分组 SN
Map<Long, List<String>> tenantToSnsMap = sns.stream()
.filter(context.snToTenantMap::containsKey)
.collect(Collectors.groupingBy(context.snToTenantMap::get));
// 4. 批量获取设备信息
context.snToDeviceMap = getDeviceVosInBatch(tenantToSnsMap);
// 5. 批量获取报警规则
Set<Long> tenantIds = tenantToSnsMap.keySet();
context.tenantAlarmRules = getAlarmRulesForTenants(tenantIds);
// 6. 批量获取围栏信息
context.fenceCache = getFenceInfoBatch(context.snToDeviceMap.values());
return context;
}
/**
* 阶段2: 处理业务逻辑内存操作
*/
private void processBatchLogic(List<ConsumerRecord<String, String>> records, BatchContext context) {
for (ConsumerRecord<String, String> record : records) {
try {
processSingleMessage(record, context);
} catch (Exception e) {
log.error("[批量处理] 处理单条消息失败,SN: {}", record.key(), e);
// 继续处理下一条,不中断整个批次
}
}
}
/**
* 处理单条消息
*/
private void processSingleMessage(ConsumerRecord<String, String> record, BatchContext context) {
String sn = record.key();
String payload = record.value();
// 1. 基础校验
Long tenantId = context.snToTenantMap.get(sn);
HandDataVo device = context.snToDeviceMap.get(sn);
if (tenantId == null || device == null) {
log.warn("[批量处理] SN {} 无租户或设备信息,跳过", sn);
return;
}
// 2. 保存原始日志
context.originalLogs.add(createOriginalLog(sn, payload, tenantId));
// 3. 检查设备是否启用
if (EnableStatus.DISABLED.value().equals(device.getEnableStatus())) {
log.debug("[批量处理] 设备未启用,SN: {}", sn);
return;
}
// 4. 数据解析与转换
HandDataVo handVo = parseAndConvertData(sn, payload, device);
// 5. 获取报警规则
Map<Long, List<AlarmRuleDO>> ruleMap = context.tenantAlarmRules.get(tenantId);
AlarmRuleDO alarmRule = getAlarmRule(handVo, ruleMap);
// 6. 气体报警处理
processGasAlarm(handVo, alarmRule, context);
// 7. 电池报警处理
processBatteryAlarm(handVo, context);
// 8. 围栏报警处理
processFenceAlarm(handVo, context);
// 9. 保存处理后的数据日志
context.processedLogs.add(createTdengineDataVo(handVo));
// 10. 记录需要更新到 Redis 的数据
context.redisUpdates.put(sn, handVo);
}
/**
* 阶段3: 批量持久化数据
*/
private void persistBatchData(BatchContext context) {
// 1. 批量保存原始日志
if (!context.originalLogs.isEmpty()) {
tdengineBatchProcessor.addToBatch(context.originalLogs);
log.debug("[批量持久化] 原始日志: {} 条", context.originalLogs.size());
}
// 2. 批量保存处理后日志
if (!context.processedLogs.isEmpty()) {
tdengineBatchConfig.addToBatch(context.processedLogs);
log.debug("[批量持久化] 处理日志: {} 条", context.processedLogs.size());
}
// 3. 批量保存报警消息日志
if (!context.alarmMessageLogs.isEmpty()) {
alarmProcessor.addToBatch(context.alarmMessageLogs);
log.debug("[批量持久化] 报警消息: {} 条", context.alarmMessageLogs.size());
}
// 4. 批量创建气体报警
if (!context.gasAlarmsToCreate.isEmpty()) {
handAlarmService.batchCreateHandAlarm(context.gasAlarmsToCreate);
log.debug("[批量持久化] 新增气体报警: {} 条", context.gasAlarmsToCreate.size());
}
// 5. 批量更新气体报警
if (!context.gasAlarmsToUpdate.isEmpty()) {
handAlarmService.batchUpdateById(context.gasAlarmsToUpdate);
log.debug("[批量持久化] 更新气体报警: {} 条", context.gasAlarmsToUpdate.size());
}
// 6. 批量创建围栏报警
if (!context.fenceAlarmsToCreate.isEmpty()) {
fenceAlarmService.batchCreateFenceAlarm(context.fenceAlarmsToCreate);
log.debug("[批量持久化] 新增围栏报警: {} 条", context.fenceAlarmsToCreate.size());
}
// 7. 批量更新围栏报警
if (!context.fenceAlarmsToUpdate.isEmpty()) {
fenceAlarmService.batchUpdateById(context.fenceAlarmsToUpdate);
log.debug("[批量持久化] 更新围栏报警: {} 条", context.fenceAlarmsToUpdate.size());
}
// 8. 批量更新 Redis
if (!context.redisUpdates.isEmpty()) {
batchUpdateRedis(context.redisUpdates, context.snToTenantMap);
log.debug("[批量持久化] Redis 更新: {} 条", context.redisUpdates.size());
}
}
// ========== 基础数据获取方法 ==========
/**
* 批量获取租户ID映射
*/
private Map<String, Long> getTenantIdsInBatch(List<String> sns) {
Map<String, Long> result = new HashMap<>();
try {
List<Object> tenantIdObjs = redisUtil.hmget(
RedisKeyUtil.getDeviceTenantMappingKey(),
new ArrayList<>(sns)
);
for (int i = 0; i < sns.size(); i++) {
Object tenantIdObj = tenantIdObjs.get(i);
if (tenantIdObj != null && StringUtils.isNotBlank(tenantIdObj.toString())) {
result.put(sns.get(i), Long.parseLong(tenantIdObj.toString()));
}
}
} catch (Exception e) {
log.error("[批量获取] 获取租户ID失败", e);
}
return result;
}
/**
* 批量获取设备信息优先从 Redis
*/
private Map<String, HandDataVo> getDeviceVosInBatch(Map<Long, List<String>> tenantToSnsMap) {
Map<String, HandDataVo> result = new HashMap<>();
List<String> cacheMissSns = new ArrayList<>();
// 从 Redis 批量获取
tenantToSnsMap.forEach((tenantId, sns) -> {
try {
String redisKey = RedisKeyUtil.getTenantDeviceHashKey(tenantId);
List<Object> cachedObjects = redisUtil.hmget(redisKey, new ArrayList<>(sns));
for (int i = 0; i < sns.size(); i++) {
Object obj = cachedObjects.get(i);
String sn = sns.get(i);
if (obj != null) {
result.put(sn, BeanUtils.toBean(obj, HandDataVo.class));
} else {
cacheMissSns.add(sn);
}
}
} catch (Exception e) {
log.error("[批量获取] 从Redis获取设备信息失败,tenantId: {}", tenantId, e);
cacheMissSns.addAll(sns);
}
});
// 缓存未命中,从数据库加载
if (!cacheMissSns.isEmpty()) {
try {
QueryWrapper<HandDetectorDO> doQueryWrapper = new QueryWrapper<>();
doQueryWrapper.in("sn", cacheMissSns);
List<HandDetectorDO> detectors = handDetectorService.listAll(doQueryWrapper);
Map<String, HandDetectorDO> detectorMap = detectors.stream()
.collect(Collectors.toMap(HandDetectorDO::getSn, Function.identity()));
cacheMissSns.forEach(sn -> {
HandDetectorDO detector = detectorMap.get(sn);
if (detector != null) {
result.put(sn, BeanUtils.toBean(detector, HandDataVo.class));
}
});
log.info("[批量获取] 从数据库加载设备: {} 条", detectors.size());
} catch (Exception e) {
log.error("[批量获取] 从数据库加载设备失败", e);
}
}
return result;
}
/**
* 批量获取报警规则
*/
private Map<Long, Map<Long, List<AlarmRuleDO>>> getAlarmRulesForTenants(Set<Long> tenantIds) {
Map<Long, Map<Long, List<AlarmRuleDO>>> result = new HashMap<>();
try {
for (Long tenantId : tenantIds) {
Map<Long, List<AlarmRuleDO>> rules = alarmRuleService.selectCacheListMap(tenantId);
if (rules != null && !rules.isEmpty()) {
result.put(tenantId, rules);
}
}
} catch (Exception e) {
log.error("[批量获取] 获取报警规则失败", e);
}
return result;
}
/**
* 批量获取围栏信息
*/
private Map<Long, List<Geofence>> getFenceInfoBatch(Collection<HandDataVo> devices) {
Map<Long, List<Geofence>> result = new HashMap<>();
try {
// 收集所有需要查询的围栏ID
Set<Long> allFenceIds = devices.stream()
.map(HandDataVo::getFenceIds)
.filter(StringUtils::isNotBlank)
.flatMap(ids -> Arrays.stream(ids.split(",")))
.map(Long::parseLong)
.collect(Collectors.toSet());
if (!allFenceIds.isEmpty()) {
List<Geofence> fences = fenceService.getFenceList(new ArrayList<>(allFenceIds));
Map<Long, Geofence> fenceMap = fences.stream()
.collect(Collectors.toMap(Geofence::getId, Function.identity()));
// 为每个设备构建其对应的围栏列表
devices.forEach(device -> {
if (StringUtils.isNotBlank(device.getFenceIds())) {
List<Geofence> deviceFences = Arrays.stream(device.getFenceIds().split(","))
.map(Long::parseLong)
.map(fenceMap::get)
.filter(Objects::nonNull)
.toList();
if (!deviceFences.isEmpty()) {
result.put(device.getId(), deviceFences);
}
}
});
}
} catch (Exception e) {
log.error("[批量获取] 获取围栏信息失败", e);
}
return result;
}
// ========== 业务逻辑处理方法 ==========
/**
* 数据解析与转换
*/
private HandDataVo parseAndConvertData(String sn, String payload, HandDataVo device) {
try {
JSONObject json = JSONUtil.parseObj(payload);
Double gasValue = json.getDouble("gas", null);
String loc = json.getStr("loc");
String battery = json.getStr("battery");
device.setValue(gasValue);
device.setSn(sn);
device.setBattery(battery);
device.setTime(new Date());
device.setOnlineStatus(OnlineStatusType.ONLINE.getType());
// 解析位置信息
if (StringUtils.isNotBlank(loc)) {
String coords = loc.substring(1, loc.length() - 1);
String[] parts = coords.split(",");
if (parts.length == 3) {
Map<String, Double> converted = CoordinateTransferUtils.wgs84ToGcj02(
Double.parseDouble(parts[0]),
Double.parseDouble(parts[1])
);
device.setLongitude(converted.get("lon"));
device.setLatitude(converted.get("lat"));
device.setGpsType(Integer.parseInt(parts[2]));
}
}
} catch (Exception e) {
log.error("[数据转换] 解析失败,SN: {}, payload: {}", sn, payload, e);
}
return device;
}
/**
* 气体报警处理
*/
private void processGasAlarm(HandDataVo handVo, AlarmRuleDO alarmRule, BatchContext context) {
if (alarmRule == null) {
handVo.setGasStatus(HandAlarmType.NORMAL.getType());
return;
}
LocalDateTime now = LocalDateTime.now();
// 处理离线报警恢复
if (OnlineStatusType.ONLINE.getType().equals(handVo.getOnlineStatus())
&& AlarmLevelEnum.OFFLINE.value().equals(handVo.getAlarmLevel())
&& handVo.getAlarmId() != null) {
HandAlarmDO alarmToEnd = new HandAlarmDO();
alarmToEnd.setId(handVo.getAlarmId());
alarmToEnd.setTAlarmEnd(now);
alarmToEnd.setStatus(EnableStatus.HANDLE.value());
context.gasAlarmsToUpdate.add(alarmToEnd);
handVo.setAlarmId(null);
handVo.setAlarmLevel(0);
return;
}
boolean isCurrentlyAlarming = handVo.getAlarmLevel() != null && handVo.getAlarmLevel() > 0;
boolean shouldAlarm = alarmRule.getAlarmLevel() > 0;
// 报警恢复
if (isCurrentlyAlarming && !shouldAlarm) {
handVo.setAlarmLevel(0);
handVo.setTAlarmEnd(now);
handVo.setGasStatus(HandAlarmType.NORMAL.getType());
// 发送报警结束消息
sendAlarmMessage(handVo, alarmRule.getGasTypeName(), handVo.getValue(), false, context);
handVo.setLastPushValue(null);
return;
}
// 正常状态
if (!shouldAlarm) {
handVo.setAlarmLevel(0);
handVo.setGasStatus(HandAlarmType.NORMAL.getType());
return;
}
// 报警触发或持续
Integer newLevel = alarmRule.getAlarmLevel();
// 首次报警
if (!isCurrentlyAlarming) {
handVo.setFirstValue(handVo.getValue());
handVo.setTAlarmStart(now);
handVo.setMaxValue(handVo.getValue());
handVo.setMaxAlarmLevel(newLevel);
// 创建新报警
HandAlarmSaveReqVO newAlarm = new HandAlarmSaveReqVO();
newAlarm.setDetectorId(handVo.getId());
newAlarm.setSn(handVo.getSn());
newAlarm.setVAlarmFirst(handVo.getFirstValue());
newAlarm.setGasType(handVo.getGasChemical());
newAlarm.setPicX(handVo.getLongitude());
newAlarm.setPicY(handVo.getLatitude());
newAlarm.setAlarmType(AlarmType.GAS.getType());
newAlarm.setAlarmLevel(newLevel);
newAlarm.setTAlarmStart(now);
newAlarm.setTenantId(Math.toIntExact(handVo.getTenantId()));
newAlarm.setUnit(handVo.getUnit());
newAlarm.setCreator("system");
newAlarm.setCreateTime(now);
context.gasAlarmsToCreate.add(newAlarm);
context.pendingAlarmIds.put(handVo.getSn(), newAlarm); // 用于后续回填ID
}
handVo.setAlarmLevel(newLevel);
// 报警升级
if (handVo.getMaxAlarmLevel() == null || newLevel > handVo.getMaxAlarmLevel()) {
handVo.setMaxAlarmLevel(newLevel);
}
// 更新最大值
if (shouldUpdateMaxValue(handVo.getValue(), handVo.getMaxValue(), alarmRule.getDirection())) {
handVo.setMaxValue(handVo.getValue());
// 更新已存在的报警记录
if (handVo.getAlarmId() != null) {
HandAlarmDO alarmToUpdate = new HandAlarmDO();
alarmToUpdate.setId(handVo.getAlarmId());
alarmToUpdate.setVAlarmMaximum(handVo.getMaxValue());
alarmToUpdate.setAlarmLevel(newLevel);
alarmToUpdate.setUpdater("system");
alarmToUpdate.setUpdateTime(now);
context.gasAlarmsToUpdate.add(alarmToUpdate);
}
}
// 推送报警消息(值变化时)
if (handVo.getLastPushValue() == null
|| !handVo.getLastPushValue().equals(handVo.getValue())) {
sendAlarmMessage(handVo, alarmRule.getGasTypeName(), handVo.getValue(), true, context);
handVo.setLastPushValue(handVo.getValue());
}
handVo.setGasStatus(HandAlarmType.ALARM.getType());
}
/**
* 电池报警处理
*/
private void processBatteryAlarm(HandDataVo handVo, BatchContext context) {
handVo.setBatteryStatus(EnableStatus.DISABLED.value());
if (handVo.getBatteryAlarmValue() == null) {
return;
}
try {
int batteryPercentage = BatteryConverterUtils.getBatteryPercentage(
Integer.parseInt(handVo.getBattery())
);
int threshold = handVo.getBatteryAlarmValue().intValue();
boolean isCurrentlyAlarming = EnableStatus.ENABLED.value().equals(handVo.getBatteryStatus());
boolean shouldAlarm = batteryPercentage < threshold;
LocalDateTime now = LocalDateTime.now();
// 从正常变为报警
if (shouldAlarm && !isCurrentlyAlarming) {
HandAlarmSaveReqVO newAlarm = new HandAlarmSaveReqVO();
newAlarm.setDetectorId(handVo.getId());
newAlarm.setSn(handVo.getSn());
newAlarm.setAlarmType(AlarmType.BATTERY.getType());
newAlarm.setTAlarmStart(now);
newAlarm.setVAlarmFirst((double) batteryPercentage);
newAlarm.setPicX(handVo.getLatitude());
newAlarm.setPicY(handVo.getLongitude());
newAlarm.setTenantId(Math.toIntExact(handVo.getTenantId()));
newAlarm.setCreator("system");
newAlarm.setCreateTime(now);
context.gasAlarmsToCreate.add(newAlarm);
handVo.setBatteryStatus(EnableStatus.ENABLED.value());
} else if (!shouldAlarm && isCurrentlyAlarming && handVo.getBatteryStatusAlarmId() != null) {
// 从报警恢复正常
HandAlarmDO alarmToEnd = new HandAlarmDO();
alarmToEnd.setId(handVo.getBatteryStatusAlarmId());
alarmToEnd.setTAlarmEnd(now);
alarmToEnd.setUpdater("system");
alarmToEnd.setUpdateTime(now);
context.gasAlarmsToUpdate.add(alarmToEnd);
handVo.setBatteryStatus(EnableStatus.DISABLED.value());
handVo.setBatteryStatusAlarmId(null);
}
} catch (Exception e) {
log.error("[电池报警] 处理失败,SN: {}", handVo.getSn(), e);
}
}
/**
* 围栏报警处理
*/
private void processFenceAlarm(HandDataVo handVo, BatchContext context) {
if (StringUtils.isBlank(handVo.getFenceIds())) {
handVo.setFenceStatus(HandAlarmType.NORMAL.getType());
return;
}
List<Geofence> fenceList = context.fenceCache.get(handVo.getId());
if (fenceList == null || fenceList.isEmpty()) {
handVo.setFenceStatus(HandAlarmType.NORMAL.getType());
return;
}
FenceType fenceType = FenceType.fromType(handVo.getFenceType());
if (null == fenceType) {
log.error("[围栏报警] 围栏类型无效,SN: {}", handVo.getSn());
return;
}
boolean isInside = GeofenceUtils.isInsideAnyFence(
handVo.getLongitude(),
handVo.getLatitude(),
fenceList
);
Boolean isViolating = switch (fenceType) {
case INSIDE -> isInside;
case OUTSIDE -> !isInside;
default -> null;
};
if (isViolating != null) {
handleFenceAlarmLifecycle(isViolating, handVo, fenceType, fenceList, context);
}
}
/**
* 处理围栏报警生命周期
*/
private void handleFenceAlarmLifecycle(boolean isViolating, HandDataVo handVo,
FenceType fenceType, List<Geofence> fenceList, BatchContext context) {
boolean hasOngoingAlarm = handVo.getFenceAlarmId() != null;
LocalDateTime now = LocalDateTime.now();
if (isViolating) {
double distance = GeofenceUtils.fenceDistance(handVo, fenceType, fenceList);
double roundedDistance = Math.round(distance * 100.0) / 100.0;
if (!hasOngoingAlarm) {
// 触发新报警
FenceAlarmSaveReqVO newAlarm = new FenceAlarmSaveReqVO();
newAlarm.setDetectorId(handVo.getId());
newAlarm.setSn(handVo.getSn());
newAlarm.setType(fenceType.getType());
newAlarm.setTAlarmStart(now);
newAlarm.setPicX(handVo.getLongitude());
newAlarm.setPicY(handVo.getLatitude());
newAlarm.setDistance(roundedDistance);
newAlarm.setMaxDistance(roundedDistance);
newAlarm.setTenantId(handVo.getTenantId());
newAlarm.setCreator("system");
newAlarm.setCreateTime(now);
context.fenceAlarmsToCreate.add(newAlarm);
context.pendingFenceAlarmIds.put(handVo.getSn(), newAlarm);
handVo.setDistance(roundedDistance);
handVo.setMaxDistance(roundedDistance);
} else {
// 更新持续报警
handVo.setDistance(roundedDistance);
FenceAlarmDO alarmToUpdate = new FenceAlarmDO();
alarmToUpdate.setId(handVo.getFenceAlarmId());
alarmToUpdate.setDistance(roundedDistance);
if (handVo.getMaxDistance() == null || roundedDistance > handVo.getMaxDistance()) {
alarmToUpdate.setMaxDistance(roundedDistance);
handVo.setMaxDistance(roundedDistance);
}
alarmToUpdate.setUpdater("system");
alarmToUpdate.setUpdateTime(now);
context.fenceAlarmsToUpdate.add(alarmToUpdate);
}
handVo.setFenceStatus(HandAlarmType.ALARM.getType());
} else {
// 结束报警
if (hasOngoingAlarm) {
FenceAlarmDO alarmToEnd = new FenceAlarmDO();
alarmToEnd.setId(handVo.getFenceAlarmId());
alarmToEnd.setTAlarmEnd(now);
alarmToEnd.setStatus(EnableStatus.HANDLE.value());
alarmToEnd.setUpdater("system");
alarmToEnd.setUpdateTime(now);
context.fenceAlarmsToUpdate.add(alarmToEnd);
handVo.setFenceAlarmId(null);
handVo.setDistance(null);
handVo.setMaxDistance(null);
}
handVo.setFenceStatus(HandAlarmType.NORMAL.getType());
}
}
// ========== 工具方法 ==========
/**
* 获取报警规则
*/
private AlarmRuleDO getAlarmRule(HandDataVo handVo, Map<Long, List<AlarmRuleDO>> ruleMap) {
if (handVo.getValue() == null || ruleMap == null) {
return null;
}
double gasValue = handVo.getValue();
Long gasTypeId = handVo.getGasTypeId();
List<AlarmRuleDO> rules = ruleMap.get(gasTypeId);
// 兼容 Redis 反序列化的 String key
if (rules == null && gasTypeId != null) {
rules = ruleMap.get(gasTypeId.toString());
}
if (rules == null || rules.isEmpty()) {
return null;
}
AlarmRuleDO result = null;
for (AlarmRuleDO rule : rules) {
boolean inRange = (rule.getMin() == null || rule.getMin() <= gasValue)
&& (rule.getMax() == null || gasValue <= rule.getMax());
if (inRange) {
if (result == null || rule.getAlarmLevel() > result.getAlarmLevel()) {
result = rule;
}
}
}
return result;
}
/**
* 判断是否需要更新最大值
*/
private boolean shouldUpdateMaxValue(Double currentValue, Double maxValue, Integer direction) {
if (currentValue == null || maxValue == null || direction == null) {
return false;
}
return (MaxDirection.DOWN.value().equals(direction) && currentValue < maxValue)
|| (MaxDirection.UP.value().equals(direction) && currentValue > maxValue);
}
/**
* 发送报警消息异步
*/
private void sendAlarmMessage(HandDataVo handVo, String gasName, Double value,
boolean isAlarming, BatchContext context) {
String valueStr = (value != null && value % 1 == 0)
? String.valueOf(value.intValue())
: String.valueOf(value);
String statusText = isAlarming ? "报警" : "报警结束";
String msgContent = String.format("%s%s,%s气体浓度为%s",
handVo.getName(), statusText, gasName, valueStr);
// 记录报警消息日志
AlarmMessageLog log = new AlarmMessageLog();
log.setDetectorId(handVo.getId());
log.setHolderName(handVo.getName());
log.setSn(handVo.getSn());
log.setDeptId(handVo.getDeptId());
log.setTenantId(handVo.getTenantId());
log.setMessage(msgContent);
log.setRemark("系统自动触发报警推送");
try {
List<String> targetSns = handDetectorService.getSnListByDept(
handVo.getDeptId(),
handVo.getTenantId(),
handVo.getSn()
);
if (targetSns != null && !targetSns.isEmpty()) {
log.setPushSnList(String.join(",", targetSns));
// 异步推送 MQTT 消息
publishAlarmToMqtt(targetSns, msgContent);
}
} catch (Exception e) {
this.log.error("[报警推送] 准备推送数据失败", e);
}
context.alarmMessageLogs.add(log);
}
/**
* 异步推送 MQTT 报警消息
*/
private void publishAlarmToMqtt(List<String> targetSns, String message) {
alarmExecutor.execute(() -> {
Map<String, String> payload = Map.of("message", message);
try {
String json = OBJECT_MAPPER.writeValueAsString(payload);
for (String sn : targetSns) {
if (StringUtils.isBlank(sn)) continue;
try {
String topic = sn + "/zds_down";
client.publish(topic, json);
} catch (Exception e) {
log.error("[MQTT推送] 失败,SN: {}", sn, e);
}
}
} catch (JsonProcessingException e) {
log.error("[MQTT推送] JSON序列化失败", e);
}
});
}
/**
* 批量更新 Redis
*/
private void batchUpdateRedis(Map<String, HandDataVo> updates, Map<String, Long> snToTenantMap) {
// 按租户分组
Map<Long, Map<String, HandDataVo>> updatesByTenant = updates.entrySet().stream()
.filter(e -> snToTenantMap.containsKey(e.getKey()))
.collect(Collectors.groupingBy(
e -> snToTenantMap.get(e.getKey()),
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)
));
// 按租户批量更新
updatesByTenant.forEach((tenantId, deviceMap) -> {
try {
handDetectorService.batchUpdateRedisData(tenantId, deviceMap);
} catch (Exception e) {
log.error("[Redis更新] 失败,tenantId: {}", tenantId, e);
}
});
}
/**
* 创建原始日志对象
*/
private HandOriginalLog createOriginalLog(String sn, String payload, Long tenantId) {
HandOriginalLog log = new HandOriginalLog();
log.setSn(sn);
log.setPayload(payload);
log.setTenantId(tenantId);
log.setTs(new Timestamp(System.currentTimeMillis()));
return log;
}
/**
* 创建处理后的日志对象
*/
private TdengineDataVo createTdengineDataVo(HandDataVo handVo) {
TdengineDataVo vo = BeanUtils.toBean(handVo, TdengineDataVo.class);
vo.setTenantId(handVo.getTenantId());
vo.setTs(new Timestamp(System.currentTimeMillis()));
return vo;
}
// ========== 内部类:批处理上下文 ==========
/**
* 批处理上下文存储整个批次的数据
*/
private static class BatchContext {
// 基础数据
Map<String, Long> snToTenantMap = new HashMap<>();
Map<String, HandDataVo> snToDeviceMap = new HashMap<>();
Map<Long, Map<Long, List<AlarmRuleDO>>> tenantAlarmRules = new HashMap<>();
Map<Long, List<Geofence>> fenceCache = new HashMap<>();
// 待保存的日志
List<HandOriginalLog> originalLogs = new ArrayList<>();
List<TdengineDataVo> processedLogs = new ArrayList<>();
List<AlarmMessageLog> alarmMessageLogs = new ArrayList<>();
// 待保存的报警
List<HandAlarmSaveReqVO> gasAlarmsToCreate = new ArrayList<>();
List<HandAlarmDO> gasAlarmsToUpdate = new ArrayList<>();
List<FenceAlarmSaveReqVO> fenceAlarmsToCreate = new ArrayList<>();
List<FenceAlarmDO> fenceAlarmsToUpdate = new ArrayList<>();
// 待回填的ID(用于新创建的报警记录)
Map<String, HandAlarmSaveReqVO> pendingAlarmIds = new HashMap<>();
Map<String, FenceAlarmSaveReqVO> pendingFenceAlarmIds = new HashMap<>();
// 待更新的 Redis 数据
Map<String, HandDataVo> redisUpdates = new HashMap<>();
boolean isEmpty() {
return snToDeviceMap.isEmpty();
}
}
}

116
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/DeviceMessageProcessor.java

@ -27,6 +27,7 @@ import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
@Component
@ -39,6 +40,9 @@ public class DeviceMessageProcessor {
private TdengineBatchConfig<HandOriginalLog> tdengineBatchProcessor;
@Resource
private TdengineBatchConfig<TdengineDataVo> tdengineBatchConfig;
@Resource
private TdengineBatchConfig<AlarmMessageLog> alarmProcessor;
@Resource
private HandDetectorService handDetectorService;
@Resource
@ -54,9 +58,9 @@ public class DeviceMessageProcessor {
@Resource
private Client client;
@Resource(name = "mqttAlarmExecutor")
private TaskExecutor alarmExecutor;
public void process(String topic, String payload) {
log.debug("[设备上报] 开始处理 -> 主题: {}, 内容: {}", topic, payload);
@ -70,73 +74,52 @@ public class DeviceMessageProcessor {
// 保存原始日志
logSave(topic, payload, tenantId);
// topic 即为设备 sn,是锁的最佳 key
String deviceProcessLockKey = RedisKeyUtil.getDeviceProcessLockKey(tenantId, topic);
RLock lock = redissonClient.getLock(deviceProcessLockKey);
try {
// 尝试在 10 秒内获取锁,防止因锁等待导致 Kafka 消费者线程长时间阻塞
if (!lock.tryLock(10, TimeUnit.SECONDS)) {
log.warn("未能获取设备 {} 的处理锁,消息将被丢弃或重试。", topic);
// 抛出异常,外层逻辑可以捕获并发送到死信队列
throw new RuntimeException("获取设备锁超时: " + topic);
// 从 Redis 中获取设备信息
Object meterObj = redisUtil.hget(RedisKeyUtil.getTenantDeviceHashKey(tenantId), topic);
HandDataVo detector;
if (meterObj == null) { // 缓存未命中
// 从数据库查询 DO 对象
HandDetectorDO one = handDetectorService.getBySn(topic);
if (one == null) {
log.warn("[数据不一致] 在租户 {} 的设备列表中未找到 SN {} 的详细信息。", tenantId, topic);
return; // 直接返回,无需再操作
}
try {
// 从 Redis 中获取设备信息
Object meterObj = redisUtil.hget(RedisKeyUtil.getTenantDeviceHashKey(tenantId), topic);
HandDataVo detector;
if (meterObj == null) { // 缓存未命中
// 从数据库查询 DO 对象
HandDetectorDO one = handDetectorService.getBySn(topic);
if (one == null) {
log.warn("[数据不一致] 在租户 {} 的设备列表中未找到 SN {} 的详细信息。", tenantId, topic);
return; // 直接返回,无需再操作
}
detector = BeanUtils.toBean(one, HandDataVo.class);
} else { // 缓存命中
detector = BeanUtils.toBean(meterObj, HandDataVo.class);
}
detector = BeanUtils.toBean(one, HandDataVo.class);
} else { // 缓存命中
detector = BeanUtils.toBean(meterObj, HandDataVo.class);
}
if (EnableStatus.DISABLED.value().equals(detector.getEnableStatus())) {
log.info("未启用的手持探测器 sn: {}", topic);
return;
}
if (EnableStatus.DISABLED.value().equals(detector.getEnableStatus())) {
log.info("未启用的手持探测器 sn: {}", topic);
return;
}
// 数据解析与转换
HandDataVo handVo = dataConvert(topic, payload, detector);
// 数据解析与转换
HandDataVo handVo = dataConvert(topic, payload, detector);
// 获取气种的报警规则
Map<Long, List<AlarmRuleDO>> ruleMap = alarmRuleService.selectCacheListMap(tenantId);
AlarmRuleDO alarmRule = getAlarmRule(handVo, ruleMap);
// 获取气种的报警规则
Map<Long, List<AlarmRuleDO>> ruleMap = alarmRuleService.selectCacheListMap(tenantId);
AlarmRuleDO alarmRule = getAlarmRule(handVo, ruleMap);
// 气体报警逻辑处理
HandDataVo vo = gasHandAlarm(alarmRule, handVo);
// 气体报警保存
saveGasAlarm(vo, alarmRule);
// 气体报警逻辑处理
HandDataVo vo = gasHandAlarm(alarmRule, handVo);
// 气体报警保存
saveGasAlarm(vo, alarmRule);
//围栏报警逻辑
fanceAlarm(handVo);
//围栏报警逻辑(高并发情况下尚未优化)
fanceAlarm(handVo);
//电量报警逻辑
batteryAlarm(handVo);
//电量报警逻辑
//batteryAlarm(handVo);
// 无论是否发生告警,设备的状态(如电量、位置、最新值)都需要更新
handDetectorService.updateRedisData(tenantId, topic, handVo);
handLogSave(handVo);
// 无论是否发生告警,设备的状态(如电量、位置、最新值)都需要更新
handDetectorService.updateRedisData(tenantId, topic, handVo);
handLogSave(handVo);
} finally {
// 确保锁一定被释放
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("获取设备 {} 的锁时被中断", topic, e);
throw new RuntimeException("处理设备消息时线程被中断: " + topic, e);
}
}
private void handLogSave(HandDataVo handVo) {
TdengineDataVo bean = BeanUtils.toBean(handVo, TdengineDataVo.class);
bean.setTenantId(handVo.getTenantId());
@ -458,7 +441,6 @@ public class DeviceMessageProcessor {
return redisData;
}
String sn = redisData.getSn();
String gasName = alarmRule.getGasTypeName();
LocalDateTime now = LocalDateTime.now();
@ -539,7 +521,6 @@ public class DeviceMessageProcessor {
* @param isAlarming true=报警中, false=报警结束
*/
private void sendGroupMessage(HandDataVo redisData, String gasName, Double value, boolean isAlarming) {
if (null == redisData.getDeptId()) return;
// 1. 准备数据(在主线程做,只做一次)
String valueStr = (value != null && value % 1 == 0) ? String.valueOf(value.intValue()) : String.valueOf(value);
@ -558,12 +539,11 @@ public class DeviceMessageProcessor {
}
// 2. 查人(查同组所有设备的SN)
List<HandDetectorDO> listAll = handDetectorService.getListAll(redisData.getDeptId(), redisData.getTenantId(),redisData.getSn());
List<String> listAll = handDetectorService.getSnListByDept(redisData.getDeptId(), redisData.getTenantId(), redisData.getSn());
if (listAll == null || listAll.isEmpty()) return;
alarmExecutor.execute(() -> {
for (HandDetectorDO device : listAll) {
String deviceSn = device.getSn();
for (String deviceSn : listAll) {
if (deviceSn == null || deviceSn.isEmpty()) continue;
String topic = deviceSn + "/zds_down";
@ -574,6 +554,18 @@ public class DeviceMessageProcessor {
}
}
});
//报警推送消息
AlarmMessageLog alarmMessageLog = new AlarmMessageLog();
alarmMessageLog.setDetectorId(redisData.getId());
alarmMessageLog.setHolderName(redisData.getName());
alarmMessageLog.setSn(redisData.getSn());
alarmMessageLog.setDeptId(redisData.getDeptId());
alarmMessageLog.setTenantId(redisData.getTenantId());
alarmMessageLog.setMessage(msgContent);
alarmMessageLog.setPushSnList(StringUtils.join(listAll, ","));
alarmMessageLog.setMessage(msgContent);
alarmMessageLog.setRemark("系统自动触发报警推送");
alarmProcessor.addToBatch(alarmMessageLog);
}
/**

107
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/controller/admin/AlarmMessageController.java

@ -1,107 +0,0 @@
package cn.iocoder.yudao.module.hand.controller.admin;
import cn.iocoder.yudao.module.hand.dal.AlarmMessageDO;
import cn.iocoder.yudao.module.hand.service.AlarmMessageService;
import cn.iocoder.yudao.module.hand.vo.AlarmMessagePageReqVO;
import cn.iocoder.yudao.module.hand.vo.AlarmMessageRespVO;
import cn.iocoder.yudao.module.hand.vo.AlarmMessageSaveReqVO;
import org.springframework.web.bind.annotation.*;
import jakarta.annotation.Resource;
import org.springframework.validation.annotation.Validated;
import org.springframework.security.access.prepost.PreAuthorize;
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.Operation;
import jakarta.validation.constraints.*;
import jakarta.validation.*;
import jakarta.servlet.http.*;
import java.util.*;
import java.io.IOException;
import cn.iocoder.yudao.framework.common.pojo.PageParam;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
import cn.iocoder.yudao.framework.excel.core.util.ExcelUtils;
import cn.iocoder.yudao.framework.apilog.core.annotation.ApiAccessLog;
import static cn.iocoder.yudao.framework.apilog.core.enums.OperateTypeEnum.*;
@Tag(name = "管理后台 - GAS手持探测器推送")
@RestController
@RequestMapping("/gas/alarm-message")
@Validated
public class AlarmMessageController {
@Resource
private AlarmMessageService alarmMessageService;
@PostMapping("/create")
@Operation(summary = "创建GAS手持探测器推送")
@PreAuthorize("@ss.hasPermission('gas:alarm-message:create')")
public CommonResult<Long> createAlarmMessage(@Valid @RequestBody AlarmMessageSaveReqVO createReqVO) {
return success(alarmMessageService.createAlarmMessage(createReqVO));
}
@PutMapping("/update")
@Operation(summary = "更新GAS手持探测器推送")
@PreAuthorize("@ss.hasPermission('gas:alarm-message:update')")
public CommonResult<Boolean> updateAlarmMessage(@Valid @RequestBody AlarmMessageSaveReqVO updateReqVO) {
alarmMessageService.updateAlarmMessage(updateReqVO);
return success(true);
}
@DeleteMapping("/delete")
@Operation(summary = "删除GAS手持探测器推送")
@Parameter(name = "id", description = "编号", required = true)
@PreAuthorize("@ss.hasPermission('gas:alarm-message:delete')")
public CommonResult<Boolean> deleteAlarmMessage(@RequestParam("id") Long id) {
alarmMessageService.deleteAlarmMessage(id);
return success(true);
}
@DeleteMapping("/delete-list")
@Parameter(name = "ids", description = "编号", required = true)
@Operation(summary = "批量删除GAS手持探测器推送")
@PreAuthorize("@ss.hasPermission('gas:alarm-message:delete')")
public CommonResult<Boolean> deleteAlarmMessageList(@RequestParam("ids") List<Long> ids) {
alarmMessageService.deleteAlarmMessageListByIds(ids);
return success(true);
}
@GetMapping("/get")
@Operation(summary = "获得GAS手持探测器推送")
@Parameter(name = "id", description = "编号", required = true, example = "1024")
@PreAuthorize("@ss.hasPermission('gas:alarm-message:query')")
public CommonResult<AlarmMessageRespVO> getAlarmMessage(@RequestParam("id") Long id) {
AlarmMessageDO alarmMessage = alarmMessageService.getAlarmMessage(id);
return success(BeanUtils.toBean(alarmMessage, AlarmMessageRespVO.class));
}
@GetMapping("/page")
@Operation(summary = "获得GAS手持探测器推送分页")
@PreAuthorize("@ss.hasPermission('gas:alarm-message:query')")
public CommonResult<PageResult<AlarmMessageRespVO>> getAlarmMessagePage(@Valid AlarmMessagePageReqVO pageReqVO) {
PageResult<AlarmMessageDO> pageResult = alarmMessageService.getAlarmMessagePage(pageReqVO);
return success(BeanUtils.toBean(pageResult, AlarmMessageRespVO.class));
}
@GetMapping("/export-excel")
@Operation(summary = "导出GAS手持探测器推送 Excel")
@PreAuthorize("@ss.hasPermission('gas:alarm-message:export')")
@ApiAccessLog(operateType = EXPORT)
public void exportAlarmMessageExcel(@Valid AlarmMessagePageReqVO pageReqVO,
HttpServletResponse response) throws IOException {
pageReqVO.setPageSize(PageParam.PAGE_SIZE_NONE);
List<AlarmMessageDO> list = alarmMessageService.getAlarmMessagePage(pageReqVO).getList();
// 导出 Excel
ExcelUtils.write(response, "GAS手持探测器推送.xls", "数据", AlarmMessageRespVO.class,
BeanUtils.toBean(list, AlarmMessageRespVO.class));
}
}

7
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/controller/admin/HandTdengineController.java

@ -70,4 +70,11 @@ public class HandTdengineController {
return success(list);
}
@GetMapping("/alarmMessagePage")
@Operation(summary = "报警推送记录")
@PreAuthorize("@ss.hasPermission('gas:hand-td:alarmMessagePage')")
public CommonResult<PageResult<AlarmMessageLog>> alarmMessagePage(@Valid HandTdenginePageVO po) {
PageResult<AlarmMessageLog> list = tdengineService.getAlarmMessageLog(po);
return success(list);
}
}

33
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/mapper/AlarmMessageMapper.java

@ -1,33 +0,0 @@
package cn.iocoder.yudao.module.hand.mapper;
import java.util.*;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX;
import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX;
import cn.iocoder.yudao.module.hand.dal.AlarmMessageDO;
import cn.iocoder.yudao.module.hand.vo.AlarmMessagePageReqVO;
import org.apache.ibatis.annotations.Mapper;
/**
* GAS手持探测器推送 Mapper
*
* @author 超级管理员
*/
@Mapper
public interface AlarmMessageMapper extends BaseMapperX<AlarmMessageDO> {
default PageResult<AlarmMessageDO> selectPage(AlarmMessagePageReqVO reqVO) {
return selectPage(reqVO, new LambdaQueryWrapperX<AlarmMessageDO>()
.eqIfPresent(AlarmMessageDO::getDetectorId, reqVO.getDetectorId())
.likeIfPresent(AlarmMessageDO::getName, reqVO.getName())
.eqIfPresent(AlarmMessageDO::getSn, reqVO.getSn())
.eqIfPresent(AlarmMessageDO::getMessage, reqVO.getMessage())
.eqIfPresent(AlarmMessageDO::getPushSnList, reqVO.getPushSnList())
.eqIfPresent(AlarmMessageDO::getRemark, reqVO.getRemark())
.eqIfPresent(AlarmMessageDO::getDeptId, reqVO.getDeptId())
.betweenIfPresent(AlarmMessageDO::getCreateTime, reqVO.getCreateTime())
.orderByDesc(AlarmMessageDO::getId));
}
}

17
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/mapper/TdengineMapper.java

@ -1,10 +1,7 @@
package cn.iocoder.yudao.module.hand.mapper;
import cn.iocoder.yudao.module.hand.vo.HandOriginalLog;
import cn.iocoder.yudao.module.hand.vo.HandTdenginePageVO;
import cn.iocoder.yudao.module.hand.vo.HandTdenginePo;
import cn.iocoder.yudao.module.hand.vo.TdengineDataVo;
import cn.iocoder.yudao.module.hand.vo.*;
import com.baomidou.mybatisplus.annotation.InterceptorIgnore; // 确保导入这个注解
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.ibatis.annotations.Mapper;
@ -17,13 +14,12 @@ import java.util.List;
public interface TdengineMapper {
@InterceptorIgnore(tenantLine = "true")
void insertHandLogBatch(@Param("sn") String sn,@Param("tenantId")Long tenantId,
@Param("logList") List<HandOriginalLog> groupedLogs);
void insertHandLogBatch(@Param("logList") List<HandOriginalLog> logList);
@InterceptorIgnore(tenantLine = "true")
void saveDataLogBatch(@Param("sn")String sn,@Param("tenantId")Long tenantId,
@Param("dataVoList")List<TdengineDataVo> dataVoList);
void saveDataLogBatch(@Param("dataVoList") List<TdengineDataVo> dataVoList);
IPage<TdengineDataVo> selectPage(IPage<TdengineDataVo> page,@Param("vo") HandTdenginePageVO vo);
@ -32,4 +28,9 @@ public interface TdengineMapper {
@InterceptorIgnore(tenantLine = "true")
List<TdengineDataVo> HistoricalSn(@Param("po") HandTdenginePo po);
@InterceptorIgnore(tenantLine = "true")
void saveMeaagesLogList(List<AlarmMessageLog> list);
IPage<AlarmMessageLog> selectMessagePage(IPage<AlarmMessageLog> page,@Param("vo") HandTdenginePageVO vo);
}

67
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/AlarmMessageService.java

@ -1,67 +0,0 @@
package cn.iocoder.yudao.module.hand.service;
import java.util.*;
import cn.iocoder.yudao.module.hand.dal.AlarmMessageDO;
import cn.iocoder.yudao.module.hand.dal.HandDetectorDO;
import cn.iocoder.yudao.module.hand.vo.AlarmMessagePageReqVO;
import cn.iocoder.yudao.module.hand.vo.AlarmMessageSaveReqVO;
import cn.iocoder.yudao.module.hand.vo.HandDataVo;
import jakarta.validation.*;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.pojo.PageParam;
/**
* GAS手持探测器推送 Service 接口
*
* @author 超级管理员
*/
public interface AlarmMessageService {
/**
* 创建GAS手持探测器推送
*
* @param createReqVO 创建信息
* @return 编号
*/
Long createAlarmMessage(@Valid AlarmMessageSaveReqVO createReqVO);
/**
* 更新GAS手持探测器推送
*
* @param updateReqVO 更新信息
*/
void updateAlarmMessage(@Valid AlarmMessageSaveReqVO updateReqVO);
/**
* 删除GAS手持探测器推送
*
* @param id 编号
*/
void deleteAlarmMessage(Long id);
/**
* 批量删除GAS手持探测器推送
*
* @param ids 编号
*/
void deleteAlarmMessageListByIds(List<Long> ids);
/**
* 获得GAS手持探测器推送
*
* @param id 编号
* @return GAS手持探测器推送
*/
AlarmMessageDO getAlarmMessage(Long id);
/**
* 获得GAS手持探测器推送分页
*
* @param pageReqVO 分页查询
* @return GAS手持探测器推送分页
*/
PageResult<AlarmMessageDO> getAlarmMessagePage(AlarmMessagePageReqVO pageReqVO);
void createAlarmRecord(HandDataVo redisData, List<HandDetectorDO> listAll, String msgContent);
}

4
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/FenceAlarmService.java

@ -64,4 +64,8 @@ public interface FenceAlarmService {
void update( FenceAlarmDO updateReqVO);
void batchCreateFenceAlarm(List<FenceAlarmSaveReqVO> fenceAlarmsToCreate);
void batchUpdateById(List<FenceAlarmDO> fenceAlarmsToUpdate);
}

4
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandAlarmService.java

@ -63,4 +63,8 @@ public interface HandAlarmService {
int updateById(HandAlarmDO alarm);
void insertBatch(List<HandAlarmDO> doList);
void batchCreateHandAlarm(List<HandAlarmSaveReqVO> gasAlarmsToCreate);
void batchUpdateById(List<HandAlarmDO> gasAlarmsToUpdate);
}

5
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandDetectorService.java

@ -75,5 +75,8 @@ public interface HandDetectorService {
void dataMigrate(HandDetectorSaveReqVO updateReqVO);
List<HandDetectorDO> getListAll(Long deptId,Long tenantId, String sn);
List<String> getSnListByDept(Long deptId, Long tenantId, String excludeSn);
void batchUpdateRedisData(Long tenantId, Map<String, HandDataVo> deviceMap);
}

12
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/TdengineService.java

@ -1,10 +1,8 @@
package cn.iocoder.yudao.module.hand.service;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.module.hand.vo.HandOriginalLog;
import cn.iocoder.yudao.module.hand.vo.HandTdenginePageVO;
import cn.iocoder.yudao.module.hand.vo.HandTdenginePo;
import cn.iocoder.yudao.module.hand.vo.TdengineDataVo;
import cn.iocoder.yudao.module.hand.dal.HandDetectorDO;
import cn.iocoder.yudao.module.hand.vo.*;
import java.util.List;
@ -21,4 +19,10 @@ public interface TdengineService {
List<TdengineDataVo> HistoricalSn(HandTdenginePo po);
void createAlarmRecord(List<AlarmMessageLog> alarmMessageLogList);
PageResult<AlarmMessageLog> getAlarmMessageLog(HandTdenginePageVO pageReqVO);
}

117
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/AlarmMessageServiceImpl.java

@ -1,117 +0,0 @@
package cn.iocoder.yudao.module.hand.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.hand.dal.AlarmMessageDO;
import cn.iocoder.yudao.module.hand.dal.HandDetectorDO;
import cn.iocoder.yudao.module.hand.mapper.AlarmMessageMapper;
import cn.iocoder.yudao.module.hand.service.AlarmMessageService;
import cn.iocoder.yudao.module.hand.vo.AlarmMessagePageReqVO;
import cn.iocoder.yudao.module.hand.vo.AlarmMessageSaveReqVO;
import cn.iocoder.yudao.module.hand.vo.HandDataVo;
import org.springframework.stereotype.Service;
import jakarta.annotation.Resource;
import org.springframework.validation.annotation.Validated;
import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import java.util.stream.Collectors;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.pojo.PageParam;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.diffList;
import static cn.iocoder.yudao.module.hand.enums.ErrorCodeConstants.ALARM_MESSAGE_NOT_EXISTS;
/**
* GAS手持探测器推送 Service 实现类
*
* @author 超级管理员
*/
@Service
@Validated
public class AlarmMessageServiceImpl implements AlarmMessageService {
@Resource
private AlarmMessageMapper alarmMessageMapper;
@Override
public Long createAlarmMessage(AlarmMessageSaveReqVO createReqVO) {
// 插入
AlarmMessageDO alarmMessage = BeanUtils.toBean(createReqVO, AlarmMessageDO.class);
alarmMessageMapper.insert(alarmMessage);
// 返回
return alarmMessage.getId();
}
@Override
public void updateAlarmMessage(AlarmMessageSaveReqVO updateReqVO) {
// 校验存在
validateAlarmMessageExists(updateReqVO.getId());
// 更新
AlarmMessageDO updateObj = BeanUtils.toBean(updateReqVO, AlarmMessageDO.class);
alarmMessageMapper.updateById(updateObj);
}
@Override
public void deleteAlarmMessage(Long id) {
// 校验存在
validateAlarmMessageExists(id);
// 删除
alarmMessageMapper.deleteById(id);
}
@Override
public void deleteAlarmMessageListByIds(List<Long> ids) {
// 删除
alarmMessageMapper.deleteByIds(ids);
}
private void validateAlarmMessageExists(Long id) {
if (alarmMessageMapper.selectById(id) == null) {
throw exception(ALARM_MESSAGE_NOT_EXISTS);
}
}
@Override
public AlarmMessageDO getAlarmMessage(Long id) {
return alarmMessageMapper.selectById(id);
}
@Override
public PageResult<AlarmMessageDO> getAlarmMessagePage(AlarmMessagePageReqVO pageReqVO) {
return alarmMessageMapper.selectPage(pageReqVO);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void createAlarmRecord(HandDataVo redisData, List<HandDetectorDO> listAll, String msgContent) {
String pushSnListStr = "";
if (CollUtil.isNotEmpty(listAll)) {
pushSnListStr = listAll.stream()
.map(HandDetectorDO::getSn)
.filter(StrUtil::isNotBlank)
.collect(Collectors.joining(","));
}
// 2. 构建实体对象
AlarmMessageDO alarmDO = new AlarmMessageDO();
alarmDO.setDetectorId(redisData.getId());
alarmDO.setName(redisData.getName());
alarmDO.setSn(redisData.getSn());
alarmDO.setTenantId(redisData.getTenantId());
alarmDO.setDeptId(redisData.getDeptId());
alarmDO.setMessage(msgContent);
alarmDO.setPushSnList(pushSnListStr);
alarmDO.setRemark("系统自动触发报警推送");
// 3. 落库
alarmMessageMapper.insert(alarmDO);
}
}

15
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/FenceAlarmServiceImpl.java

@ -15,7 +15,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.pojo.PageParam;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils; // 假设您使用的是若依/yudao框架的工具类
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList;
@ -93,4 +93,17 @@ public class FenceAlarmServiceImpl implements FenceAlarmService {
fenceAlarmMapper.updateById(updateReqVO);
}
@Override
public void batchCreateFenceAlarm(List<FenceAlarmSaveReqVO> fenceAlarmsToCreate) {
List<FenceAlarmDO> fenceAlarmDOs = BeanUtils.toBean(fenceAlarmsToCreate, FenceAlarmDO.class);
fenceAlarmMapper.insertBatch(fenceAlarmDOs);
}
@Override
public void batchUpdateById(List<FenceAlarmDO> fenceAlarmsToUpdate) {
fenceAlarmMapper.updateBatch(fenceAlarmsToUpdate);
}
}

16
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandAlarmServiceImpl.java

@ -96,4 +96,20 @@ public class HandAlarmServiceImpl implements HandAlarmService {
handAlarmMapper.insertBatch(doList);
}
@Override
@Transactional(rollbackFor = Exception.class)
@TenantIgnore
public void batchCreateHandAlarm(List<HandAlarmSaveReqVO> gasAlarmsToCreate) {
List<HandAlarmDO> bean = BeanUtils.toBean(gasAlarmsToCreate, HandAlarmDO.class);
handAlarmMapper.insertBatch(bean);
}
@Override
@Transactional(rollbackFor = Exception.class)
@TenantIgnore
public void batchUpdateById(List<HandAlarmDO> gasAlarmsToUpdate) {
handAlarmMapper.updateBatch(gasAlarmsToUpdate);
}
}

49
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandDetectorServiceImpl.java

@ -255,6 +255,7 @@ public class HandDetectorServiceImpl implements HandDetectorService {
}
@Override
@TenantIgnore
public List<HandDetectorDO> listAll(QueryWrapper<HandDetectorDO> handDetectorDOQueryWrapper) {
return handDetectorMapper.selectList(handDetectorDOQueryWrapper);
@ -333,17 +334,55 @@ public class HandDetectorServiceImpl implements HandDetectorService {
@Override
@TenantIgnore
public List<HandDetectorDO> getListAll(Long deptId,Long tenantId,String sn) {
public List<String> getSnListByDept(Long deptId, Long tenantId, String excludeSn) {
QueryWrapper<HandDetectorDO> queryWrapper = new QueryWrapper<>();
// 【核心优化1】只查询 sn 字段,不查其他几十个字段,极大减少网络传输和内存占用
queryWrapper.select("sn");
if (deptId != null){
queryWrapper.eq("dept_id", deptId);
}
if (sn != null) {
queryWrapper.ne("sn", sn);
// 排除当前的 SN (excludeSn)
if (excludeSn != null) {
queryWrapper.ne("sn", excludeSn);
}
queryWrapper.eq("tenant_id", tenantId);
return handDetectorMapper.selectList(queryWrapper);
// 【核心优化2】使用 selectObjs
// selectObjs 直接返回第一列的值(List<Object>),性能最高
List<Object> result = handDetectorMapper.selectObjs(queryWrapper);
if (result == null) {
return Collections.emptyList();
}
return result.stream()
.filter(Objects::nonNull)
.map(Object::toString)
.collect(Collectors.toList());
}
@Override
public void batchUpdateRedisData(Long tenantId, Map<String, HandDataVo> deviceMap) {
if (tenantId == null || deviceMap == null || deviceMap.isEmpty()) {
return;
}
try {
String redisKey = RedisKeyUtil.getTenantDeviceHashKey(tenantId);
// 将 Map<String, HandDataVo> 转换为 Map<Object, Object>
Map<Object, Object> redisMap = new HashMap<>(deviceMap);
// 使用 hmset 批量更新
boolean success = redisUtil.hmset(redisKey, redisMap);
if (success) {
log.debug("[Redis批量更新] 成功,租户: {}, 设备数量: {}", tenantId, deviceMap.size());
} else {
log.error("[Redis批量更新] 失败,租户: {}, 设备数量: {}", tenantId, deviceMap.size());
}
} catch (Exception e) {
log.error("[Redis批量更新] 异常,租户: {}, 设备数量: {}", tenantId, deviceMap.size(), e);
}
}
}

89
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/TdengineServiceImpl.java

@ -1,20 +1,21 @@
package cn.iocoder.yudao.module.hand.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.security.core.LoginUser;
import cn.iocoder.yudao.module.hand.vo.AlarmMessageLog;
import cn.iocoder.yudao.module.hand.dal.HandDetectorDO;
import cn.iocoder.yudao.module.hand.mapper.TdengineMapper;
import cn.iocoder.yudao.module.hand.service.TdengineService;
import cn.iocoder.yudao.module.hand.vo.HandOriginalLog;
import cn.iocoder.yudao.module.hand.vo.HandTdenginePageVO;
import cn.iocoder.yudao.module.hand.vo.HandTdenginePo;
import cn.iocoder.yudao.module.hand.vo.TdengineDataVo;
import cn.iocoder.yudao.module.hand.vo.*;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import jakarta.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
import java.util.List;
@ -26,50 +27,50 @@ import static cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUti
@Service
@Validated
@DS("tdengine")
@Slf4j
public class TdengineServiceImpl implements TdengineService {
private static final Logger log = LoggerFactory.getLogger(TdengineServiceImpl.class);
@Resource
private TdengineMapper tdengineMapper;
@Override
public void saveHandLogBatch(List<HandOriginalLog> batchList) {
if (batchList.isEmpty()) {
if (batchList == null || batchList.isEmpty()) {
return;
}
// 1. 按设备SN分组
Map<String, List<HandOriginalLog>> groupedBySn = batchList.stream()
.collect(Collectors.groupingBy(HandOriginalLog::getSn));
// 遍历Map,每个Entry对应一个子表
for (Map.Entry<String, List<HandOriginalLog>> entry : groupedBySn.entrySet()) {
String sn = entry.getKey();
List<HandOriginalLog> logs = entry.getValue();
Long tenantId = logs.get(0).getTenantId();
tdengineMapper.insertHandLogBatch(sn, tenantId, logs);
int batchSize = 1000;
// 手动分片逻辑 (如果不引入 Guava)
for (int i = 0; i < batchList.size(); i += batchSize) {
int end = Math.min(i + batchSize, batchList.size());
List<HandOriginalLog> subList = batchList.subList(i, end);
try {
tdengineMapper.insertHandLogBatch(subList);
} catch (Exception e) {
log.error("批量保存原始日志失败", e);
// 可以在这里做简单的重试或死信处理
}
}
}
@Override
public void saveDataLogBatch(List<TdengineDataVo> batchList) {
if (batchList.isEmpty()) {
if (batchList == null || batchList.isEmpty()) {
return;
}
// 1. 按设备SN分组
Map<String, List<TdengineDataVo>> groupedBySn = batchList.stream()
.collect(Collectors.groupingBy(TdengineDataVo::getSn));
// 遍历Map,每个Entry对应一个子表
for (Map.Entry<String, List<TdengineDataVo>> entry : groupedBySn.entrySet()) {
String sn = entry.getKey();
List<TdengineDataVo> logs = entry.getValue();
Long tenantId = logs.get(0).getTenantId();
int batchSize = 1000;
tdengineMapper.saveDataLogBatch(sn, tenantId, logs);
for (int i = 0; i < batchList.size(); i += batchSize) {
int end = Math.min(i + batchSize, batchList.size());
List<TdengineDataVo> subList = batchList.subList(i, end);
try {
tdengineMapper.saveDataLogBatch(subList);
} catch (Exception e) {
log.error("批量保存设备数据日志失败, 本批次数量: {}", subList.size(), e);
}
}
}
@ -118,4 +119,32 @@ public class TdengineServiceImpl implements TdengineService {
}
return tdengineMapper.HistoricalSn(po);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void createAlarmRecord(List<AlarmMessageLog> alarmMessageLogList) {
// 3. 落库
tdengineMapper.saveMeaagesLogList(alarmMessageLogList);
}
@Override
public PageResult<AlarmMessageLog> getAlarmMessageLog(HandTdenginePageVO pageReqVO) {
IPage<AlarmMessageLog> page = new Page<>(pageReqVO.getPageNo(), pageReqVO.getPageSize());
LoginUser loginUser = getLoginUser();
if (loginUser != null){
pageReqVO.setTenantId(loginUser.getTenantId());
}
IPage<AlarmMessageLog> resultPage = tdengineMapper.selectMessagePage(page, pageReqVO);
List<AlarmMessageLog> doList = resultPage.getRecords();
if (doList == null || doList.isEmpty()) {
return PageResult.empty(resultPage.getTotal());
}
return new PageResult<>(doList, resultPage.getTotal());
}
}

8
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/util/RedisKeyUtil.java

@ -41,13 +41,7 @@ public class RedisKeyUtil {
return DEVICE_INFO_PREFIX + ":" + tenantId;
}
/**
* 获取处理锁 Key (加入租户维度更安全)
* 结构示例: lock:device:{tenantId}:{sn}
*/
public static String getDeviceProcessLockKey(Long tenantId, String sn) {
return LOCK_PREFIX + ":" + tenantId + ":" + sn;
}
public static String getDeviceTenantMappingKey() {
return DEVICE_TENANT_MAPPING_KEY;

4
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/util/RedisUtil.java

@ -187,6 +187,10 @@ public class RedisUtil {
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
public List<Object> hmget(String key, Collection<Object> fields) {
return redisTemplate.opsForHash().multiGet(key, fields);
}
/**
* 获取hashKey对应的所有键值
*

28
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/dal/AlarmMessageDO.java → cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageLog.java

@ -1,40 +1,28 @@
package cn.iocoder.yudao.module.hand.dal;
package cn.iocoder.yudao.module.hand.vo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import java.util.*;
import java.time.LocalDateTime;
import java.time.LocalDateTime;
import com.baomidou.mybatisplus.annotation.*;
import cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO;
import java.sql.Timestamp;
/**
* GAS手持探测器推送 DO
*
* @author 超级管理员
*/
@TableName("gas_alarm_message")
@KeySequence("gas_alarm_message_seq") // 用于 Oracle、PostgreSQL、Kingbase、DB2、H2 数据库的主键自增。如果是 MySQL 等数据库,可不写。
@Data
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AlarmMessageDO extends BaseDO {
public class AlarmMessageLog {
/**
* 主键ID
*/
@TableId
private Long id;
/**
* 手持表id
*/
private Long detectorId;
/**
* 持有人名称
*/
private String name;
private String holderName;
/**
* 设备编号
*/
@ -59,5 +47,9 @@ public class AlarmMessageDO extends BaseDO {
* 租户id
*/
private Long tenantId;
/**
* 更新时间
*/
private Timestamp ts;
}

41
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessagePageReqVO.java

@ -1,41 +0,0 @@
package cn.iocoder.yudao.module.hand.vo;
import lombok.*;
import java.util.*;
import io.swagger.v3.oas.annotations.media.Schema;
import cn.iocoder.yudao.framework.common.pojo.PageParam;
import org.springframework.format.annotation.DateTimeFormat;
import java.time.LocalDateTime;
import static cn.iocoder.yudao.framework.common.util.date.DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND;
@Schema(description = "管理后台 - GAS手持探测器推送分页 Request VO")
@Data
public class AlarmMessagePageReqVO extends PageParam {
@Schema(description = "手持表id", example = "10665")
private Long detectorId;
@Schema(description = "持有人名称", example = "王五")
private String name;
@Schema(description = "设备编号")
private String sn;
@Schema(description = "消息")
private String message;
@Schema(description = "推送设备sn,逗号分割")
private String pushSnList;
@Schema(description = "备注", example = "随便")
private String remark;
@Schema(description = "部门id", example = "12286")
private Long deptId;
@Schema(description = "创建时间")
@DateTimeFormat(pattern = FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND)
private LocalDateTime[] createTime;
}

51
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageRespVO.java

@ -1,51 +0,0 @@
package cn.iocoder.yudao.module.hand.vo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import java.util.*;
import org.springframework.format.annotation.DateTimeFormat;
import java.time.LocalDateTime;
import com.alibaba.excel.annotation.*;
@Schema(description = "管理后台 - GAS手持探测器推送 Response VO")
@Data
@ExcelIgnoreUnannotated
public class AlarmMessageRespVO {
@Schema(description = "主键ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "13733")
@ExcelProperty("主键ID")
private Long id;
@Schema(description = "手持表id", example = "10665")
@ExcelProperty("手持表id")
private Long detectorId;
@Schema(description = "持有人名称", example = "王五")
@ExcelProperty("持有人名称")
private String name;
@Schema(description = "设备编号")
@ExcelProperty("设备编号")
private String sn;
@Schema(description = "消息")
@ExcelProperty("消息")
private String message;
@Schema(description = "推送设备sn,逗号分割")
@ExcelProperty("推送设备sn,逗号分割")
private String pushSnList;
@Schema(description = "备注", example = "随便")
@ExcelProperty("备注")
private String remark;
@Schema(description = "部门id", example = "12286")
@ExcelProperty("部门id")
private Long deptId;
@Schema(description = "创建时间", requiredMode = Schema.RequiredMode.REQUIRED)
@ExcelProperty("创建时间")
private LocalDateTime createTime;
}

36
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmMessageSaveReqVO.java

@ -1,36 +0,0 @@
package cn.iocoder.yudao.module.hand.vo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import java.util.*;
import jakarta.validation.constraints.*;
@Schema(description = "管理后台 - GAS手持探测器推送新增/修改 Request VO")
@Data
public class AlarmMessageSaveReqVO {
@Schema(description = "主键ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "13733")
private Long id;
@Schema(description = "手持表id", example = "10665")
private Long detectorId;
@Schema(description = "持有人名称", example = "王五")
private String name;
@Schema(description = "设备编号")
private String sn;
@Schema(description = "消息")
private String message;
@Schema(description = "推送设备sn,逗号分割")
private String pushSnList;
@Schema(description = "备注", example = "随便")
private String remark;
@Schema(description = "部门id", example = "12286")
private Long deptId;
}

4
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/Geofence.java

@ -6,6 +6,10 @@ import java.util.Collections;
import java.util.List;
@Data
public class Geofence {
private Long id; // <--- 添加 ID 字段
private final List<FencePointVo> vertices;
private final double minX, minY, maxX, maxY; // 包围盒

10
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandTdenginePageVO.java

@ -11,6 +11,16 @@ public class HandTdenginePageVO extends PageParam {
private String sn;
/**
* 持有人名称
*/
private String holderName;
/**
* 推送设备sn逗号分割
*/
private String pushSnList;
private Timestamp startTime;
private Timestamp endTime;

2
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/TdengineDataVo.java

@ -36,4 +36,6 @@ public class TdengineDataVo {
@Schema(description = "租户id")
private Long tenantId;
}

12
cc-admin-master/yudao-module-hand/src/main/resources/mapper/AlarmMessageMapper.xml

@ -1,12 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.iocoder.yudao.module.hand.mapper.AlarmMessageMapper">
<!--
一般情况下,尽可能使用 Mapper 进行 CRUD 增删改查即可。
无法满足的场景,例如说多表关联查询,才使用 XML 编写 SQL。
代码生成器暂时只生成 Mapper XML 文件本身,更多推荐 MybatisX 快速开发插件来生成查询。
文档可见:https://www.iocoder.cn/MyBatis/x-plugins/
-->
</mapper>

97
cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml

@ -10,27 +10,35 @@
<insert id="insertHandLogBatch">
INSERT INTO hand_original_log_${sn}
USING hand_original_log
TAGS(#{sn}, #{tenantId})
(ts, payload)
VALUES
<foreach collection="logList" item="log" separator=",">
INSERT INTO
<foreach collection="logList" item="log" separator=" ">
<!-- 动态拼接子表名 -->
hand_original_log_${log.sn}
<!-- 指定超级表 -->
USING hand_original_log
<!-- 自动建表所需的 TAGS -->
TAGS(#{log.sn}, #{log.tenantId})
<!-- 插入字段 -->
(ts, payload)
VALUES
(#{log.ts}, #{log.payload})
</foreach>
</insert>
<insert id="saveDataLogBatch">
INSERT INTO device_data_log_#{sn}
USING device_data_log
TAGS(#{sn}, #{tenantId})
(ts, battery, `value`, longitude, latitude, `name`)
VALUES
<foreach collection="dataVoList" item="log" separator=",">
INSERT INTO
<foreach collection="dataVoList" item="log" separator=" ">
<!-- 1. 动态指定子表名 -->
device_data_log_${log.sn}
<!-- 2. 指定超级表 -->
USING device_data_log
<!-- 3. 指定标签 (用于自动建表) -->
TAGS(#{log.sn}, #{log.tenantId})
<!-- 4. 指定列名 -->
(ts, battery, `value`, longitude, latitude, `name`)
<!-- 5. 插入具体值 -->
VALUES
(#{log.ts}, #{log.battery}, #{log.value}, #{log.longitude}, #{log.latitude}, #{log.name})
</foreach>
</insert>
<select id="selectPage" resultType="cn.iocoder.yudao.module.hand.vo.TdengineDataVo">
@ -112,4 +120,63 @@
INTERVAL(1m)
ORDER BY ts ASC
</select>
<insert id="saveMeaagesLogList" parameterType="java.util.List">
INSERT INTO
<foreach collection="list" item="item" separator=" ">
<!-- 1. 动态拼接子表名 -->
`message_log_#{item.sn}`
<!-- 2. 指定超级表 -->
USING alarm_message_log
<!-- 3. 设置标签 (注意:deptId 已移到普通列) -->
TAGS (
#{item.sn},
#{item.detectorId},
#{item.tenantId}
)
<!-- 4. 指定插入的列名 (显式写出来更安全) -->
(ts, holder_name, message, push_sn_list, remark, dept_id)
<!-- 5. 设置值 -->
VALUES (
now, <!-- 使用当前时间 -->
#{item.holderName},
#{item.message},
#{item.pushSnList},
#{item.remark},
#{item.deptId} <!-- 对应 dept_id (普通列) -->
)
</foreach>
</insert>
<select id="selectMessagePage" resultType="cn.iocoder.yudao.module.hand.vo.AlarmMessageLog">
SELECT
ts, sn, detector_id, holder_name, message, push_sn_list, remark, dept_id, tenant_id
FROM
alarm_message_log
<where>
<if test="vo.sn != null and vo.sn != ''">
AND sn = #{vo.sn}
</if>
<if test="vo.tenantId != null">
AND tenant_id = #{vo.tenantId}
</if>
<if test="vo.startTime != null">
AND ts >= #{po.startTime}
</if>
<if test="vo.endTime != null">
AND ts &lt;= #{po.endTime}
</if>
<if test="vo.holderName != null and vo.holderName != ''">
<bind name="holderNamePattern" value="'%' + vo.holderName + '%'" />
AND holder_name LIKE #{holderNamePattern}
</if>
<if test="vo.pushSnList != null and vo.pushSnList != ''">
<!-- 使用 INSTR 函数:查找子串位置,大于0表示存在 -->
AND INSTR(push_sn_list, #{vo.pushSnList}) > 0
</if>
</where>
ORDER BY ts DESC
</select>
</mapper>

11
cc-admin-master/yudao-module-system/pom.xml

@ -23,7 +23,10 @@
<artifactId>yudao-module-infra</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>jakarta.mail</groupId>
<artifactId>jakarta.mail-api</artifactId>
</dependency>
<!-- 业务组件 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
@ -88,6 +91,12 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
<exclusions>
<exclusion>
<groupId>org.eclipse.angus</groupId>
<artifactId>jakarta.mail</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 三方云服务相关 -->

2
cc-admin-master/yudao-server/src/main/resources/application-dev.yaml

@ -235,7 +235,7 @@ mqtt:
topic: +/zds_up,+/zds_down
qos: 1,1
default:
publishQos: 1
publishQos: 0
offlineTime: 180 # 超过 180 秒无数据则判为数据超时
pool:
coreSize: 10

2
cc-admin-master/yudao-server/src/main/resources/application-local.yaml

@ -235,7 +235,7 @@ mqtt:
topic: +/zds_up,+/zds_down
qos: 1,1
default:
publishQos: 1
publishQos: 0
offlineTime: 180 # 超过 180 秒无数据则判为数据超时
pool:
coreSize: 10

2
cc-admin-master/yudao-server/src/main/resources/application-prod.yaml

@ -246,7 +246,7 @@ mqtt:
topic: +/zds_up,+/zds_down
qos: 1,1
default:
publishQos: 1
publishQos: 0
offlineTime: 180 # 超过 180 秒无数据则判为数据超时
pool:
coreSize: 10

2
cc-admin-master/yudao-server/src/main/resources/application.yaml

@ -136,6 +136,8 @@ spring:
consumer:
auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
max-poll-records: 300
fetch-max-wait: 3000 # 等待拉取消息的最大时间(毫秒)
properties:
spring.json.trusted.packages: '*'
group-id: consumer-${spring.application.name} # 消费者分组

Loading…
Cancel
Save