Browse Source

手持表没有部门不推送报警

master
wangwei_123 1 week ago
parent
commit
9130271cde
  1. 50
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java
  2. 17
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java
  3. 84
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java
  4. 6
      cc-admin-master/yudao-server/src/main/resources/application-dev.yaml
  5. 2
      cc-admin-master/yudao-server/src/main/resources/application-local.yaml
  6. 2
      cc-admin-master/yudao-server/src/main/resources/application-prod.yaml

50
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java

@ -1,12 +1,12 @@
package cn.iocoder.yudao.module.mqtt.kafka; package cn.iocoder.yudao.module.mqtt.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
@ -14,28 +14,9 @@ import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff; import org.springframework.util.backoff.FixedBackOff;
@Configuration @Configuration
@Slf4j
public class KafkaConfig { public class KafkaConfig {
/**
* 配置一个通用的错误处理器.
* 它会在反序列化失败或监听器方法抛出异常时生效.
* @param operations KafkaTemplate 实例
* @return a DefaultErrorHandler
*/
@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<Object, Object> operations) {
// 1. 创建 DeadLetterPublishingRecoverer,它负责将失败的消息推送到 DLT
// 这里我们为所有失败的消息都指定了一个通用的 DLT 主题
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(operations,
(rec, ex) -> new TopicPartition(rec.topic() + ".DLT", rec.partition()));
// 根据业务需求选择合适的策略。
FixedBackOff backOff = new FixedBackOff(1000L, 2L);
// 3. 创建并返回 DefaultErrorHandler
// 当重试耗尽后,会调用 recoverer 将消息送入 DLT
return new DefaultErrorHandler(recoverer, backOff);
}
@Bean("batchFactory") @Bean("batchFactory")
public KafkaListenerContainerFactory<?> batchFactory( public KafkaListenerContainerFactory<?> batchFactory(
@ -49,26 +30,31 @@ public class KafkaConfig {
// 1. 开启批量监听 // 1. 开启批量监听
factory.setBatchListener(true); factory.setBatchListener(true);
// 2. 并发数 (按分区数调整)
// 2. 并发数(按分区数调整)
factory.setConcurrency(6); factory.setConcurrency(6);
// 3. 批量手动 ACK (配合 enable-auto-commit: false)
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
factory.getContainerProperties().setPollTimeout(3000); factory.getContainerProperties().setPollTimeout(3000);
// 4. 错误处理与死信队列 // 4. 错误处理与死信队列
// 定义死信发送器
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
kafkaTemplate,
(record, ex) -> { (record, ex) -> {
// 策略:原 Topic + ".DLT" (例如 zds_up -> zds_up.DLT)
// 这样比写死 KafkaTopicType.DEAD_LETTER_TOPIC 更灵活,不同业务互不干扰
return new TopicPartition(record.topic() + ".DLT", record.partition());
log.warn("[DLT] 消息进入死信队列, topic={}, partition={}, offset={}, error={}",
record.topic(), record.partition(), record.offset(),
ex.getMessage());
return new TopicPartition(record.topic() + ".DLT", 0);
}); });
// 定义回退策略:间隔 1秒,重试 2次
// 注意:Batch 模式下,如果重试失败,整批数据都会进死信
DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
// 配置给工厂
// 【问题4 FIX】批量场景下重试次数设为 0,失败立即进 DLT
// 避免整批阻塞消费线程,由 DLT 消费者做单条重试
// 如果业务可以接受短暂阻塞,可改为 FixedBackOff(1000L, 1L) 最多重试1次
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
recoverer,
new FixedBackOff(0L, 0L)
);
factory.setCommonErrorHandler(errorHandler); factory.setCommonErrorHandler(errorHandler);
return factory; return factory;

17
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java

@ -54,7 +54,7 @@ public class BatchDeviceMessageProcessor {
private FenceAlarmService fenceAlarmService; private FenceAlarmService fenceAlarmService;
@Resource @Resource
private KafkaTemplate<String, String> kafkaTemplate; private KafkaTemplate<String, String> kafkaTemplate;
private List<ConsumerRecord<String, String>> records;
/** /**
@ -94,7 +94,6 @@ public class BatchDeviceMessageProcessor {
} }
private BatchContext prepareBatchContext(List<ConsumerRecord<String, String>> records) { private BatchContext prepareBatchContext(List<ConsumerRecord<String, String>> records) {
this.records = records;
BatchContext context = new BatchContext(); BatchContext context = new BatchContext();
// 1. 提取所有有效的 SNs // 1. 提取所有有效的 SNs
@ -116,7 +115,6 @@ public class BatchDeviceMessageProcessor {
.filter(sn -> !context.snToTenantMap.containsKey(sn)) .filter(sn -> !context.snToTenantMap.containsKey(sn))
.toList(); .toList();
if (CollectionUtils.isNotEmpty(missingSns)) { if (CollectionUtils.isNotEmpty(missingSns)) {
// 2.1 查库判断设备是否真的不存在(可能只是 Redis 中丢失了映射)
QueryWrapper<HandDetectorDO> query = new QueryWrapper<>(); QueryWrapper<HandDetectorDO> query = new QueryWrapper<>();
query.in("sn", missingSns); query.in("sn", missingSns);
List<HandDetectorDO> dbDetectors = handDetectorService.listAll(query); List<HandDetectorDO> dbDetectors = handDetectorService.listAll(query);
@ -126,7 +124,6 @@ public class BatchDeviceMessageProcessor {
existingDbSns.add(dbDetector.getSn()); existingDbSns.add(dbDetector.getSn());
if (dbDetector.getTenantId() != null) { if (dbDetector.getTenantId() != null) {
context.snToTenantMap.put(dbDetector.getSn(), Long.valueOf(dbDetector.getTenantId())); context.snToTenantMap.put(dbDetector.getSn(), Long.valueOf(dbDetector.getTenantId()));
// 顺便回填 Redis,防止下次继续穿透查库
try { try {
redisUtil.hset(RedisKeyUtil.getDeviceTenantMappingKey(), dbDetector.getSn(), dbDetector.getTenantId()); redisUtil.hset(RedisKeyUtil.getDeviceTenantMappingKey(), dbDetector.getSn(), dbDetector.getTenantId());
} catch (Exception e) { } catch (Exception e) {
@ -159,12 +156,8 @@ public class BatchDeviceMessageProcessor {
} }
try { try {
// 保存到数据库。注意: 这里使用 save 单条保存,如果项目中存在 saveBatch(newDetectors) 则推荐替换成批量保存以提升性能。
handDetectorService.saveList(newDetectors); handDetectorService.saveList(newDetectors);
log.info("[自动注册] 自动保存未知设备 {} 个,默认租户ID为 1", newDetectors.size()); log.info("[自动注册] 自动保存未知设备 {} 个,默认租户ID为 1", newDetectors.size());
// 将新注册的设备信息回填进上下文并更新 Redis,这样后续步骤就能流畅处理了
for (HandDetectorDO nd : newDetectors) { for (HandDetectorDO nd : newDetectors) {
context.snToTenantMap.put(nd.getSn(), 1L); context.snToTenantMap.put(nd.getSn(), 1L);
try { try {
@ -719,8 +712,8 @@ public class BatchDeviceMessageProcessor {
newAlarm.setAlarmType(AlarmType.BATTERY.getType()); newAlarm.setAlarmType(AlarmType.BATTERY.getType());
newAlarm.setTAlarmStart(now); newAlarm.setTAlarmStart(now);
newAlarm.setVAlarmFirst((double) batteryPercentage); newAlarm.setVAlarmFirst((double) batteryPercentage);
newAlarm.setPicX(handVo.getLatitude());
newAlarm.setPicY(handVo.getLongitude());
newAlarm.setPicX(handVo.getLongitude());
newAlarm.setPicY(handVo.getLatitude());
newAlarm.setTenantId(handVo.getTenantId()); newAlarm.setTenantId(handVo.getTenantId());
newAlarm.setCreator("system"); newAlarm.setCreator("system");
newAlarm.setCreateTime(now); newAlarm.setCreateTime(now);
@ -917,6 +910,10 @@ public class BatchDeviceMessageProcessor {
*/ */
private void sendAlarmMessage(HandDataVo handVo, String gasName, Double value, private void sendAlarmMessage(HandDataVo handVo, String gasName, Double value,
boolean isAlarming, BatchContext context) { boolean isAlarming, BatchContext context) {
if (null == handVo.getDeptId()){
log.error("[sendAlarmMessage][deviceSn({}) 发送报警消息时,部门ID为空]", handVo.getSn());
return;
}
String valueStr = (value != null && value % 1 == 0) String valueStr = (value != null && value % 1 == 0)
? String.valueOf(value.intValue()) ? String.valueOf(value.intValue())

84
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java

@ -6,15 +6,15 @@ import cn.iocoder.yudao.module.hand.service.TdengineService;
import cn.iocoder.yudao.module.hand.vo.AlarmDispatchEvent; import cn.iocoder.yudao.module.hand.vo.AlarmDispatchEvent;
import cn.iocoder.yudao.module.hand.vo.AlarmMessageLog; import cn.iocoder.yudao.module.hand.vo.AlarmMessageLog;
import cn.iocoder.yudao.module.mqtt.mqtt.Client; import cn.iocoder.yudao.module.mqtt.mqtt.Client;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.RateLimiter;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.sql.Timestamp;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -22,7 +22,12 @@ import java.util.Map;
@Component @Component
public class HandAlarmMessageProcess { public class HandAlarmMessageProcess {
private final RateLimiter mqttRateLimiter = RateLimiter.create(3000.0);
/**
* 问题6 FIX限流速率从配置文件读取不再硬编码
* 配置项示例mqtt.alarm.rate-limit=3000.0
*/
private final RateLimiter mqttRateLimiter;
@Resource @Resource
private Client mqttClient; private Client mqttClient;
@Resource @Resource
@ -30,6 +35,11 @@ public class HandAlarmMessageProcess {
@Resource @Resource
private TdengineService tdengineService; private TdengineService tdengineService;
public HandAlarmMessageProcess(
@Value("${mqtt.alarm.rate-limit:3000.0}") double rateLimit) {
this.mqttRateLimiter = RateLimiter.create(rateLimit);
}
public void processSingle(String jsonValue) { public void processSingle(String jsonValue) {
// 1. 解析 Kafka 消息 // 1. 解析 Kafka 消息
AlarmDispatchEvent event = JsonUtils.parseObject(jsonValue, AlarmDispatchEvent.class); AlarmDispatchEvent event = JsonUtils.parseObject(jsonValue, AlarmDispatchEvent.class);
@ -39,7 +49,7 @@ public class HandAlarmMessageProcess {
} }
try { try {
// 2. 【查库】根据部门/租户/源设备SN,查询需要推送的目标设备列表
// 2. 根据部门/租户/源设备SN,查询需要推送的目标设备列表
List<String> targetSns = handDetectorService.getSnListByDept( List<String> targetSns = handDetectorService.getSnListByDept(
event.getDeptId(), event.getDeptId(),
event.getTenantId(), event.getTenantId(),
@ -50,23 +60,28 @@ public class HandAlarmMessageProcess {
log.info("[报警推送] 无需推送,目标列表为空. SourceSN: {}", event.getSourceSn()); log.info("[报警推送] 无需推送,目标列表为空. SourceSN: {}", event.getSourceSn());
return; return;
} }
// 3. 执行推送逻辑
this.publishAlarmToMqtt(targetSns, event.getMsgContent());
// 记录报警消息日志
AlarmMessageLog alarmMessageLog = new AlarmMessageLog();
alarmMessageLog.setDetectorId(event.getId());
alarmMessageLog.setHolderName(event.getUserName());
alarmMessageLog.setSn(event.getSourceSn());
alarmMessageLog.setDeptId(event.getDeptId());
alarmMessageLog.setTenantId(event.getTenantId());
alarmMessageLog.setMessage(event.getMsgContent());
alarmMessageLog.setRemark("系统自动触发报警推送");
alarmMessageLog.setPushSnList(StringUtils.join(targetSns, ","));
ArrayList<AlarmMessageLog> objects = new ArrayList<>();
objects.add(alarmMessageLog);
tdengineService.createAlarmRecord(objects);
// 3. 执行推送,收集实际成功推送的 SN 列表
// 【问题3 FIX】由 publishAlarmToMqtt 返回实际推送成功的列表,日志只记录成功项
List<String> succeededSns = publishAlarmToMqtt(targetSns, event.getMsgContent());
// 4. 记录报警消息日志(仅记录实际推送成功的 SN)
if (!succeededSns.isEmpty()) {
AlarmMessageLog alarmMessageLog = new AlarmMessageLog();
alarmMessageLog.setDetectorId(event.getId());
alarmMessageLog.setHolderName(event.getUserName());
alarmMessageLog.setSn(event.getSourceSn());
alarmMessageLog.setDeptId(event.getDeptId());
alarmMessageLog.setTenantId(event.getTenantId());
alarmMessageLog.setMessage(event.getMsgContent());
alarmMessageLog.setRemark("系统自动触发报警推送");
alarmMessageLog.setPushSnList(StringUtils.join(succeededSns, ","));
alarmMessageLog.setTs(new Timestamp(System.currentTimeMillis()));
tdengineService.createAlarmRecord(List.of(alarmMessageLog));
}
} catch (Exception e) { } catch (Exception e) {
log.error("[报警推送] 处理异常 SourceSN: {}", event.getSourceSn(), e); log.error("[报警推送] 处理异常 SourceSN: {}", event.getSourceSn(), e);
@ -74,30 +89,35 @@ public class HandAlarmMessageProcess {
} }
/** /**
* 执行 MQTT 推送 (包含限流)
* 执行 MQTT 推送含限流
*
*/ */
private void publishAlarmToMqtt(List<String> targetSns, String message) {
if (message == null) {
return;
private List<String> publishAlarmToMqtt(List<String> targetSns, String message) {
List<String> succeededSns = new ArrayList<>();
// 【问题5 FIX】同时检查 null 和 blank,避免推送空消息给设备
if (StringUtils.isBlank(message)) {
log.warn("[MQTT推送] 消息内容为空,跳过推送");
return succeededSns;
} }
// 构造 MQTT 消息体
Map<String, String> payload = Map.of("message", message);
String jsonPayload = JsonUtils.toJsonString(payload);
for (String sn : targetSns) { for (String sn : targetSns) {
if (StringUtils.isBlank(sn)) continue; if (StringUtils.isBlank(sn)) continue;
mqttRateLimiter.acquire(); mqttRateLimiter.acquire();
try { try {
// 【BUG-2 FIX】JSON 构造移至 try 块内,确保异常不会绕过限流语义
String topic = sn + "/zds_down"; String topic = sn + "/zds_down";
// 发送
String jsonPayload = JsonUtils.toJsonString(Map.of("message", message));
mqttClient.publish(topic, jsonPayload); mqttClient.publish(topic, jsonPayload);
succeededSns.add(sn);
} catch (Exception e) { } catch (Exception e) {
// 5. 【隔离】单个设备推送失败,不要影响列表里的下一个设备
// 单个设备推送失败,隔离异常不影响后续设备
log.error("[MQTT推送] 单个发送失败,SN: {}", sn, e); log.error("[MQTT推送] 单个发送失败,SN: {}", sn, e);
} }
} }
return succeededSns;
} }
} }

6
cc-admin-master/yudao-server/src/main/resources/application-dev.yaml

@ -48,7 +48,7 @@ spring:
primary: master primary: master
datasource: datasource:
master: master:
url: jdbc:mysql://192.168.0.180:3306/hand_alarm?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true&rewriteBatchedStatements=true # MySQL Connector/J 8.X 连接的示例
url: jdbc:mysql://192.168.0.180:3306/hand_alarm_dev?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true&rewriteBatchedStatements=true # MySQL Connector/J 8.X 连接的示例
username: root username: root
password: Gsking164411 password: Gsking164411
driver-class-name: com.mysql.cj.jdbc.Driver # MySQL Connector/J 8.X 连接的示例 driver-class-name: com.mysql.cj.jdbc.Driver # MySQL Connector/J 8.X 连接的示例
@ -226,7 +226,7 @@ mqtt:
username: root username: root
password: roomasd111 password: roomasd111
client: client:
id: cc-admin-qg-dev44
id: cc-admin-qg-dev
connectionTimeout: 10 connectionTimeout: 10
keepAliveInterval: 60 keepAliveInterval: 60
cleanSession: true cleanSession: true
@ -241,3 +241,5 @@ mqtt:
coreSize: 10 coreSize: 10
maxSize: 20 maxSize: 20
queueSize: 100 queueSize: 100
alarm:
rate-limit: 3000.0 # 报警 MQTT 推送限流速率(每秒最大请求数)

2
cc-admin-master/yudao-server/src/main/resources/application-local.yaml

@ -241,3 +241,5 @@ mqtt:
coreSize: 10 coreSize: 10
maxSize: 20 maxSize: 20
queueSize: 100 queueSize: 100
alarm:
rate-limit: 3000.0 # 报警 MQTT 推送限流速率(每秒最大请求数)

2
cc-admin-master/yudao-server/src/main/resources/application-prod.yaml

@ -252,3 +252,5 @@ mqtt:
coreSize: 10 coreSize: 10
maxSize: 20 maxSize: 20
queueSize: 100 queueSize: 100
alarm:
rate-limit: 3000.0 # 报警 MQTT 推送限流速率(每秒最大请求数)
Loading…
Cancel
Save