Browse Source

手持表报警逻辑处理

master
wangwei_123 20 hours ago
parent
commit
599f82eeba
  1. 2
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java
  2. 10
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java
  3. 45
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java
  4. 2
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java
  5. 82
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java

2
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java

@ -48,7 +48,7 @@ public class KafkaConfig {
factory.setBatchListener(true);
// 【并发设置】根据你的分区数设置,比如你有 3 个分区就设为 3
factory.setConcurrency(3);
factory.setConcurrency(6);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
factory.getContainerProperties().setPollTimeout(3000);

10
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java

@ -24,24 +24,16 @@ import static cn.iocoder.yudao.module.mqtt.kafka.KafkaTopicType.DEAD_LETTER_TOPI
@Slf4j
@Service // <--- 添加这个注解!
public class KafkaMessageConsumer {
private final TaskExecutor taskExecutor;
private final DeviceMessageProcessor deviceMessageProcessor;
private final BatchDeviceMessageProcessor batchDeviceMessageProcessor;
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaMessageConsumer(DeviceMessageProcessor deviceMessageProcessor,
@Qualifier("mqttExecutor") TaskExecutor taskExecutor,
BatchDeviceMessageProcessor batchDeviceMessageProcessor,
KafkaTemplate<String, String> kafkaTemplate) {
BatchDeviceMessageProcessor batchDeviceMessageProcessor) {
this.deviceMessageProcessor = deviceMessageProcessor;
this.taskExecutor = taskExecutor;
this.batchDeviceMessageProcessor = batchDeviceMessageProcessor;
this.kafkaTemplate = kafkaTemplate;
}
// 使用你定义的 batchFactory

45
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java

@ -21,6 +21,22 @@ import java.util.concurrent.BlockingQueue;
@Component
public class Client {
private final OnMessageCallback onMessageCallback; // 1. 注入我们新的 Kafka 生产者回调
// 【修改点2】定义全局通用的发送回调,避免每次发送都创建对象,减少GC
private final IMqttActionListener globalPublishCallback = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
// 高并发下,成功通常不打印日志,否则磁盘IO会爆炸
// log.debug("发送成功: {}", asyncActionToken.getTopics());
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
// 只有失败才打印日志
log.error("MQTT异步发送失败, Topic: {}",
asyncActionToken.getTopics() != null ? asyncActionToken.getTopics()[0] : "null",
exception);
}
};
// --- 1. 配置注入 (保持不变) ---
@Value("${mqtt.enable:false}")
private Boolean enable;
@ -44,27 +60,9 @@ public class Client {
private int[] subscribeQos;
@Value("${mqtt.default.publishQos:0}")
private int publishQos;
//使用异步客户端接口
private IMqttAsyncClient mqttClient;
// 【修改点2】定义全局通用的发送回调,避免每次发送都创建对象,减少GC
private final IMqttActionListener globalPublishCallback = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
// 高并发下,成功通常不打印日志,否则磁盘IO会爆炸
// log.debug("发送成功: {}", asyncActionToken.getTopics());
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
// 只有失败才打印日志
log.error("MQTT异步发送失败, Topic: {}",
asyncActionToken.getTopics() != null ? asyncActionToken.getTopics()[0] : "null",
exception);
}
};
public Client(OnMessageCallback onMessageCallback) {
this.onMessageCallback = onMessageCallback;
}
@ -118,9 +116,12 @@ public class Client {
options.setKeepAliveInterval(keepAliveInterval);
options.setCleanSession(cleanSession);
options.setAutomaticReconnect(true);
options.setCleanSession(false);
// 建议设置:超时和保活时间
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
//对于 10w QPS,必须拉满到 MQTT 协议上限 (65535)
//对于 10w QPS,必须拉满到 MQTT 协议上限 (65535)
options.setMaxInflight(65535);
return options;
}
@ -138,6 +139,7 @@ public class Client {
public void onSuccess(IMqttToken asyncActionToken) {
log.info("订阅成功");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
log.error("订阅失败", exception);
@ -150,9 +152,6 @@ public class Client {
// --- 发布方法 ---
/**
* 兼容旧代码的重载方法
*/
public void publish(String topic, String payload) {
publishAsync(topic, payload);
}

2
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java

@ -33,7 +33,7 @@ public class ThreadPoolConfig {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(5000); // 队列大一点,应对瞬间并发
executor.setQueueCapacity(10000); // 队列大一点,应对瞬间并发
executor.setThreadNamePrefix("alarm-push-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();

82
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java

@ -14,6 +14,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@ -64,6 +65,9 @@ public class BatchDeviceMessageProcessor {
@Resource(name = "mqttAlarmExecutor")
private TaskExecutor alarmExecutor;
private final RateLimiter mqttRateLimiter = RateLimiter.create(50.0);
/**
* 批量处理 Kafka 消息的主入口
*/
@ -218,13 +222,37 @@ public class BatchDeviceMessageProcessor {
alarmProcessor.addToBatch(context.alarmMessageLogs);
log.debug("[批量持久化] 报警消息: {} 条", context.alarmMessageLogs.size());
}
// 4. 批量创建气体报警
if (!context.gasAlarmsToCreate.isEmpty()) {
// 2. 批量保存
// MybatisPlus 会把生成的 ID 填入 doList 的对象中
handAlarmService.batchCreateHandAlarm(context.gasAlarmsToCreate);
log.debug("[批量持久化] 新增气体报警: {} 条", context.gasAlarmsToCreate.size());
}
// 3. 原有的回填 Redis 逻辑 (现在 alarmVo.getId() 有值了)
context.pendingAlarmIds.forEach((sn, alarmVo) -> {
// 1. 确保数据库确实生成了 ID
if (alarmVo.getId() != null) {
// 2. 找到准备更新到 Redis 的设备对象
HandDataVo redisVo = context.redisUpdates.get(sn);
// 3. 如果 Redis 更新队列里没有,尝试从全局缓存取
if (redisVo == null) {
redisVo = context.snToDeviceMap.get(sn);
if (redisVo != null) {
context.redisUpdates.put(sn, redisVo);
}
}
// 4. 执行回填
if (redisVo != null) {
redisVo.setAlarmId(alarmVo.getId());
// log.info("SN: {} 回填气体报警ID: {}", sn, alarmVo.getId());
}
} else {
log.error("SN: {} 气体报警保存后 ID 仍为空,请检查 Service 层事务或 ID 生成策略", sn);
}
});
}
// 5. 批量更新气体报警
if (!context.gasAlarmsToUpdate.isEmpty()) {
handAlarmService.batchUpdateById(context.gasAlarmsToUpdate);
@ -235,6 +263,23 @@ public class BatchDeviceMessageProcessor {
if (!context.fenceAlarmsToCreate.isEmpty()) {
fenceAlarmService.batchCreateFenceAlarm(context.fenceAlarmsToCreate);
log.debug("[批量持久化] 新增围栏报警: {} 条", context.fenceAlarmsToCreate.size());
// ★★★【核心修改:围栏报警 ID 回填】★★★
context.pendingFenceAlarmIds.forEach((sn, fenceAlarmVo) -> {
if (fenceAlarmVo.getId() != null) {
HandDataVo redisVo = context.redisUpdates.get(sn);
if (redisVo == null) {
redisVo = context.snToDeviceMap.get(sn);
if (redisVo != null) context.redisUpdates.put(sn, redisVo);
}
if (redisVo != null) {
// 假设 HandDataVo 有 setFenceAlarmId 方法,或者业务逻辑复用 setAlarmId
// 请根据你的 HandDataVo 实际字段修改下面这行:
redisVo.setFenceAlarmId(fenceAlarmVo.getId());
}
}
});
}
// 7. 批量更新围栏报警
@ -444,6 +489,7 @@ public class BatchDeviceMessageProcessor {
return;
}
LocalDateTime now = LocalDateTime.now();
// 处理离线报警恢复
@ -455,6 +501,8 @@ public class BatchDeviceMessageProcessor {
alarmToEnd.setId(handVo.getAlarmId());
alarmToEnd.setTAlarmEnd(now);
alarmToEnd.setStatus(EnableStatus.HANDLE.value());
alarmToEnd.setRemark("系统已自动处理报警");
context.gasAlarmsToUpdate.add(alarmToEnd);
handVo.setAlarmId(null);
@ -474,6 +522,12 @@ public class BatchDeviceMessageProcessor {
// 发送报警结束消息
sendAlarmMessage(handVo, alarmRule.getGasTypeName(), handVo.getValue(), false, context);
handVo.setLastPushValue(null);
HandAlarmDO alarmToEnd = new HandAlarmDO();
alarmToEnd.setId(handVo.getAlarmId());
alarmToEnd.setTAlarmEnd(now);
alarmToEnd.setStatus(EnableStatus.HANDLE.value());
alarmToEnd.setRemark("系统已自动处理报警");
context.gasAlarmsToUpdate.add(alarmToEnd);
return;
}
@ -495,17 +549,20 @@ public class BatchDeviceMessageProcessor {
handVo.setMaxAlarmLevel(newLevel);
// 创建新报警
HandAlarmSaveReqVO newAlarm = new HandAlarmSaveReqVO();
HandAlarmDO newAlarm = new HandAlarmDO();
newAlarm.setDetectorId(handVo.getId());
newAlarm.setSn(handVo.getSn());
newAlarm.setName(handVo.getName());
newAlarm.setVAlarmFirst(handVo.getFirstValue());
newAlarm.setVAlarmMaximum(handVo.getFirstValue());
newAlarm.setGasType(handVo.getGasChemical());
newAlarm.setDeptId(handVo.getDeptId());
newAlarm.setPicX(handVo.getLongitude());
newAlarm.setPicY(handVo.getLatitude());
newAlarm.setAlarmType(AlarmType.GAS.getType());
newAlarm.setAlarmLevel(newLevel);
newAlarm.setTAlarmStart(now);
newAlarm.setTenantId(Math.toIntExact(handVo.getTenantId()));
newAlarm.setTenantId(handVo.getTenantId());
newAlarm.setUnit(handVo.getUnit());
newAlarm.setCreator("system");
newAlarm.setCreateTime(now);
@ -570,7 +627,7 @@ public class BatchDeviceMessageProcessor {
// 从正常变为报警
if (shouldAlarm && !isCurrentlyAlarming) {
HandAlarmSaveReqVO newAlarm = new HandAlarmSaveReqVO();
HandAlarmDO newAlarm = new HandAlarmDO();
newAlarm.setDetectorId(handVo.getId());
newAlarm.setSn(handVo.getSn());
newAlarm.setAlarmType(AlarmType.BATTERY.getType());
@ -578,7 +635,7 @@ public class BatchDeviceMessageProcessor {
newAlarm.setVAlarmFirst((double) batteryPercentage);
newAlarm.setPicX(handVo.getLatitude());
newAlarm.setPicY(handVo.getLongitude());
newAlarm.setTenantId(Math.toIntExact(handVo.getTenantId()));
newAlarm.setTenantId(handVo.getTenantId());
newAlarm.setCreator("system");
newAlarm.setCreateTime(now);
@ -655,7 +712,7 @@ public class BatchDeviceMessageProcessor {
if (!hasOngoingAlarm) {
// 触发新报警
FenceAlarmSaveReqVO newAlarm = new FenceAlarmSaveReqVO();
FenceAlarmDO newAlarm = new FenceAlarmDO();
newAlarm.setDetectorId(handVo.getId());
newAlarm.setSn(handVo.getSn());
newAlarm.setType(fenceType.getType());
@ -823,6 +880,9 @@ public class BatchDeviceMessageProcessor {
for (String sn : targetSns) {
if (StringUtils.isBlank(sn)) continue;
// 限流
mqttRateLimiter.acquire();
try {
String topic = sn + "/zds_down";
client.publish(topic, json);
@ -898,14 +958,14 @@ public class BatchDeviceMessageProcessor {
List<AlarmMessageLog> alarmMessageLogs = new ArrayList<>();
// 待保存的报警
List<HandAlarmSaveReqVO> gasAlarmsToCreate = new ArrayList<>();
List<HandAlarmDO> gasAlarmsToCreate = new ArrayList<>();
List<HandAlarmDO> gasAlarmsToUpdate = new ArrayList<>();
List<FenceAlarmSaveReqVO> fenceAlarmsToCreate = new ArrayList<>();
List<FenceAlarmDO> fenceAlarmsToCreate = new ArrayList<>();
List<FenceAlarmDO> fenceAlarmsToUpdate = new ArrayList<>();
// 待回填的ID(用于新创建的报警记录)
Map<String, HandAlarmSaveReqVO> pendingAlarmIds = new HashMap<>();
Map<String, FenceAlarmSaveReqVO> pendingFenceAlarmIds = new HashMap<>();
Map<String, HandAlarmDO> pendingAlarmIds = new HashMap<>();
Map<String, FenceAlarmDO> pendingFenceAlarmIds = new HashMap<>();
// 待更新的 Redis 数据
Map<String, HandDataVo> redisUpdates = new HashMap<>();

Loading…
Cancel
Save