Browse Source

手持表增加mqtt批处理发送,报警推送改成kafka发送

master
wangwei_123 5 days ago
parent
commit
9098977b29
  1. 25
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java
  2. 26
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java
  3. 17
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaTopicType.java
  4. 46
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/OnMessageCallback.java
  5. 10
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java
  6. 102
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java
  7. 82
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java
  8. 2
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/dal/FenceAlarmDO.java
  9. 2
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/FenceAlarmService.java
  10. 2
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandAlarmService.java
  11. 5
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/FenceAlarmServiceImpl.java
  12. 5
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandAlarmServiceImpl.java
  13. 17
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmDispatchEvent.java
  14. 1
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandDataVo.java
  15. 1
      cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandTdenginePageVO.java
  16. 9
      cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml
  17. 36
      cc-admin-master/yudao-server/src/main/resources/application.yaml

25
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java

@ -38,24 +38,37 @@ public class KafkaConfig {
}
@Bean("batchFactory")
public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory<String, String> consumerFactory,
KafkaTemplate<String, String> kafkaTemplate) {
public KafkaListenerContainerFactory<?> batchFactory(
ConsumerFactory<String, String> consumerFactory,
KafkaTemplate<String, String> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 【核心设置】开启批量监听!没有这就还是单条处理
// 1. 开启批量监听
factory.setBatchListener(true);
// 【并发设置】根据你的分区数设置,比如你有 3 个分区就设为 3
// 2. 并发数 (按分区数调整)
factory.setConcurrency(6);
// 3. 批量手动 ACK (配合 enable-auto-commit: false)
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
factory.getContainerProperties().setPollTimeout(3000);
// 创建一个 DeadLetterPublishingRecoverer,它知道如何将失败消息发送到 DLT
// 4. 错误处理与死信队列
// 定义死信发送器
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(KafkaTopicType.DEAD_LETTER_TOPIC.getValue(), record.partition()));
(record, ex) -> {
// 策略:原 Topic + ".DLT" (例如 zds_up -> zds_up.DLT)
// 这样比写死 KafkaTopicType.DEAD_LETTER_TOPIC 更灵活,不同业务互不干扰
return new TopicPartition(record.topic() + ".DLT", record.partition());
});
// 定义回退策略:间隔 1秒,重试 2次
// 注意:Batch 模式下,如果重试失败,整批数据都会进死信
DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
// 配置给工厂
factory.setCommonErrorHandler(errorHandler);
return factory;

26
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java

@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.mqtt.kafka;
import cn.iocoder.yudao.module.mqtt.processor.BatchDeviceMessageProcessor;
import cn.iocoder.yudao.module.mqtt.processor.DeviceMessageProcessor;
import cn.iocoder.yudao.module.mqtt.processor.HandAlarmMessageProcess;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Qualifier;
@ -19,20 +20,19 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static cn.iocoder.yudao.module.mqtt.kafka.KafkaTopicType.DEAD_LETTER_TOPIC;
@Slf4j
@Service // <--- 添加这个注解!
public class KafkaMessageConsumer {
private final DeviceMessageProcessor deviceMessageProcessor;
private final HandAlarmMessageProcess handAlarmMessageProcess;
private final BatchDeviceMessageProcessor batchDeviceMessageProcessor;
public KafkaMessageConsumer(DeviceMessageProcessor deviceMessageProcessor,
public KafkaMessageConsumer(HandAlarmMessageProcess handAlarmMessageProcess,
BatchDeviceMessageProcessor batchDeviceMessageProcessor) {
this.deviceMessageProcessor = deviceMessageProcessor;
this.handAlarmMessageProcess = handAlarmMessageProcess;
this.batchDeviceMessageProcessor = batchDeviceMessageProcessor;
}
@ -57,6 +57,24 @@ public class KafkaMessageConsumer {
throw e;
}
}
@KafkaListener(topics = "zds_up_alarm", containerFactory = "batchFactory")
public void HandAlarmMessage(List<ConsumerRecord<String, String>> records) {
if (records.isEmpty()) {
return;
}
log.info(">>> 开始逐条处理 Batch,共 {} 条", records.size());
// 遍历 List,一条条处理
for (ConsumerRecord<String, String> record : records) {
try {
handAlarmMessageProcess.processSingle(record.value());
} catch (Exception e) {
log.error("单条消息处理失败,已跳过。Key: {}, Value: {}", record.key(), record.value(), e);
}
}
log.info("Batch 处理完毕");
}
}

17
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaTopicType.java

@ -9,34 +9,25 @@ import java.util.stream.Collectors;
/**
* Kafka 主题枚举类用于统一管理和引用 Topic 名称
*/
@Getter // Lombok 注解,自动为所有字段生成 public 的 getter 方法,例如 getValue()
@Getter
public enum KafkaTopicType {
// 1. 常量名和值的含义保持一致,更清晰
DEVICE_STATUS_UP("zds_up"),
DEAD_LETTER_TOPIC("zds_up_dlt");
ALARM_TOPIC("zds_up_alarm");
// 2. 添加了 final 字段
private final String value;
// 3. 【核心修复】添加了私有构造函数,用于初始化 final 字段
KafkaTopicType(String value) {
this.value = value;
}
// --- 以下是更高效和健壮的 from 方法实现 ---
// 4. 使用静态 Map 缓存,提高查找性能,避免每次调用都遍历
private static final Map<String, KafkaTopicType> LOOKUP_MAP = Arrays.stream(values())
.collect(Collectors.toMap(KafkaTopicType::getValue, Function.identity()));
/**
* 根据字符串值查找对应的枚举常量.
*
* @param value topic 字符串值
* @return 对应的枚举常量
* @throws IllegalArgumentException 如果找不到匹配的常量
*/
public static KafkaTopicType from(String value) {
return LOOKUP_MAP.get(value);

46
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/OnMessageCallback.java

@ -9,6 +9,7 @@ import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.task.TaskExecutor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@ -19,12 +20,14 @@ import java.util.regex.Pattern;
@Slf4j
@Component
public class OnMessageCallback implements MqttCallbackExtended {
private final Pattern topicPattern = Pattern.compile("([^/]+)/?(.+)?");
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@Resource
private ApplicationEventPublisher eventPublisher;
@Resource(name = "mqttExecutor")
private TaskExecutor kafkaForwardExecutor;
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("MQTT 连接成功。是否为重连: {}, 服务器 URI: {}", reconnect, serverURI);
@ -38,29 +41,34 @@ public class OnMessageCallback implements MqttCallbackExtended {
}
@Override
public void messageArrived(String MqttTopic, MqttMessage message) {
String payload = new String(message.getPayload());
List<String> groups = ReUtil.getAllGroups(topicPattern, MqttTopic, false);
public void messageArrived(String mqttTopic, MqttMessage message) {
if (groups.get(1) == null) {
log.warn("无法从topic {} 中获取消息发送者ID", MqttTopic);
return;
}
String sn = groups.get(0);
String topic = groups.get(1);
KafkaTopicType from = KafkaTopicType.from(groups.get(1));
if (null == from) {
log.warn("发送的topic无效{}", topic);
return;
}
// 【关键】使用注入的线程池异步处理
kafkaForwardExecutor.execute(() -> processAndSend(mqttTopic, message));
}
private void processAndSend(String mqttTopic, MqttMessage message) {
try {
kafkaTemplate.send(topic, sn, payload);
log.debug("成功将消息转发到 Kafka Topic [{}]: payload=[{}]", topic, payload);
int slashIndex = mqttTopic.indexOf('/');
if (slashIndex == -1) return;
String sn = mqttTopic.substring(0, slashIndex);
String suffix = mqttTopic.substring(slashIndex + 1);
// 2. 校验 Topic
KafkaTopicType from = KafkaTopicType.from(suffix);
if (from == null) return;
// 3. 转发 Kafka (异步 IO)
// 配合 application.yml 的 linger.ms,底层会自动批量发送
String payload = new String(message.getPayload());
kafkaTemplate.send(suffix, sn, payload);
} catch (Exception e) {
log.error("转发消息到 Kafka Topic [{}] 时发生错误!", topic, e);
// 生产环境建议降低日志级别或采样打印,防止日志把磁盘打满
log.error("转发异常 Topic: {}", mqttTopic, e);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// no-op

10
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java

@ -15,8 +15,8 @@ public class ThreadPoolConfig {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:CPU核心数
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
// 最大线程数:CPU核心数 * 2
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
// 最大线程数:CPU核心数 * 4
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4);
// 队列大小
executor.setQueueCapacity(2000);
// 线程名称前缀
@ -31,8 +31,10 @@ public class ThreadPoolConfig {
@Bean("mqttAlarmExecutor")
public TaskExecutor mqttAlarmExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
// 核心线程数:CPU核心数
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
// 最大线程数:CPU核心数 * 2
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
executor.setQueueCapacity(10000); // 队列大一点,应对瞬间并发
executor.setThreadNamePrefix("alarm-push-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

102
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java

@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.mqtt.processor;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.hand.dal.*;
import cn.iocoder.yudao.module.hand.enums.*;
@ -9,6 +10,7 @@ import cn.iocoder.yudao.module.hand.service.*;
import cn.iocoder.yudao.module.hand.util.*;
import cn.iocoder.yudao.module.hand.vo.*;
import cn.iocoder.yudao.module.mqtt.config.TdengineBatchConfig;
import cn.iocoder.yudao.module.mqtt.kafka.KafkaTopicType;
import cn.iocoder.yudao.module.mqtt.mqtt.Client;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
@ -20,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.core.task.TaskExecutor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.sql.Timestamp;
@ -61,11 +64,7 @@ public class BatchDeviceMessageProcessor {
@Resource
private FenceAlarmService fenceAlarmService;
@Resource
private Client client;
@Resource(name = "mqttAlarmExecutor")
private TaskExecutor alarmExecutor;
private final RateLimiter mqttRateLimiter = RateLimiter.create(50.0);
private KafkaTemplate<String, String> kafkaTemplate;
/**
@ -445,10 +444,17 @@ public class BatchDeviceMessageProcessor {
*/
private HandDataVo parseAndConvertData(String sn, String payload, HandDataVo device) {
try {
// 1. 【防御性编程】处理双重序列化问题
// 如果 payload 首尾有引号,说明是 "{\"a\":1}" 格式,需要先解开一层
if (payload.startsWith("\"") && payload.endsWith("\"")) {
// 去掉首尾引号,并把转义字符 \" 变成 "
payload = payload.substring(1, payload.length() - 1).replace("\\\"", "\"");
}
JSONObject json = JSONUtil.parseObj(payload);
Double gasValue = json.getDouble("gas", null);
String loc = json.getStr("loc");
Double gasValue = json.getDouble("gas");
String loc = json.getStr("loc"); // Hutool 会把数组转成字符串 "[115, 39, 0]"
String battery = json.getStr("battery");
device.setValue(gasValue);
@ -459,21 +465,29 @@ public class BatchDeviceMessageProcessor {
// 解析位置信息
if (StringUtils.isNotBlank(loc)) {
String coords = loc.substring(1, loc.length() - 1);
// 去掉首尾的中括号
String coords = loc.replace("[", "").replace("]", "");
String[] parts = coords.split(",");
if (parts.length == 3) {
if (parts.length >= 2) {
// 2. 【关键修正】使用 trim() 去除空格
// Double.parseDouble 实际上可以处理空格,但 trim 一下更安全
Map<String, Double> converted = CoordinateTransferUtils.wgs84ToGcj02(
Double.parseDouble(parts[0]),
Double.parseDouble(parts[1])
Double.parseDouble(parts[0].trim()),
Double.parseDouble(parts[1].trim())
);
device.setLongitude(converted.get("lon"));
device.setLatitude(converted.get("lat"));
device.setGpsType(Integer.parseInt(parts[2]));
if (parts.length >= 3) {
// 3. 【关键修正】Integer.parseInt 必须 trim(),否则 " 0" 会报错
device.setGpsType(Integer.parseInt(parts[2].trim()));
}
}
}
} catch (Exception e) {
// 打印完整的 payload 以便排查
log.error("[数据转换] 解析失败,SN: {}, payload: {}", sn, payload, e);
}
@ -838,63 +852,37 @@ public class BatchDeviceMessageProcessor {
handVo.getName(), statusText, gasName, valueStr);
// 记录报警消息日志
AlarmMessageLog log = new AlarmMessageLog();
log.setDetectorId(handVo.getId());
log.setHolderName(handVo.getName());
log.setSn(handVo.getSn());
log.setDeptId(handVo.getDeptId());
log.setTenantId(handVo.getTenantId());
log.setMessage(msgContent);
log.setRemark("系统自动触发报警推送");
AlarmMessageLog alarmMessageLog = new AlarmMessageLog();
alarmMessageLog.setDetectorId(handVo.getId());
alarmMessageLog.setHolderName(handVo.getName());
alarmMessageLog.setSn(handVo.getSn());
alarmMessageLog.setDeptId(handVo.getDeptId());
alarmMessageLog.setTenantId(handVo.getTenantId());
alarmMessageLog.setMessage(msgContent);
alarmMessageLog.setRemark("系统自动触发报警推送");
try {
List<String> targetSns = handDetectorService.getSnListByDept(
AlarmDispatchEvent event = new AlarmDispatchEvent(
handVo.getDeptId(),
handVo.getTenantId(),
handVo.getSn()
);
handVo.getSn(),
msgContent
);
if (targetSns != null && !targetSns.isEmpty()) {
log.setPushSnList(String.join(",", targetSns));
// 异步推送 MQTT 消息
publishAlarmToMqtt(targetSns, msgContent);
}
// 异步发送 Kafka,耗时极短 (<1ms)
// 使用 sourceSn 作为 Key,保证同一个部门/设备的报警排队有序处理
String key = handVo.getSn();
String json = JsonUtils.toJsonString(event);
kafkaTemplate.send(KafkaTopicType.ALARM_TOPIC.getValue(), key, json);
} catch (Exception e) {
this.log.error("[报警推送] 准备推送数据失败", e);
log.error("[报警推送] 准备推送数据失败", e);
}
context.alarmMessageLogs.add(log);
context.alarmMessageLogs.add(alarmMessageLog);
}
/**
* 异步推送 MQTT 报警消息
*/
private void publishAlarmToMqtt(List<String> targetSns, String message) {
alarmExecutor.execute(() -> {
Map<String, String> payload = Map.of("message", message);
try {
String json = OBJECT_MAPPER.writeValueAsString(payload);
for (String sn : targetSns) {
if (StringUtils.isBlank(sn)) continue;
// 限流
mqttRateLimiter.acquire();
try {
String topic = sn + "/zds_down";
client.publish(topic, json);
} catch (Exception e) {
log.error("[MQTT推送] 失败,SN: {}", sn, e);
}
}
} catch (JsonProcessingException e) {
log.error("[MQTT推送] JSON序列化失败", e);
}
});
}
/**
* 批量更新 Redis

82
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java

@ -0,0 +1,82 @@
package cn.iocoder.yudao.module.mqtt.processor;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.hand.service.HandDetectorService;
import cn.iocoder.yudao.module.hand.vo.AlarmDispatchEvent;
import cn.iocoder.yudao.module.mqtt.mqtt.Client;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.util.concurrent.RateLimiter;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class HandAlarmMessageProcess {
@Resource
private Client mqttClient;
@Resource
private HandDetectorService handDetectorService;
private final RateLimiter mqttRateLimiter = RateLimiter.create(3000.0);
public void processSingle(String jsonValue) {
// 1. 解析 Kafka 消息
AlarmDispatchEvent event = JsonUtils.parseObject(jsonValue, AlarmDispatchEvent.class);
if (event == null) {
log.warn("[报警推送] 消息解析为空,忽略: {}", jsonValue);
return;
}
try {
// 2. 【查库】根据部门/租户/源设备SN,查询需要推送的目标设备列表
List<String> targetSns = handDetectorService.getSnListByDept(
event.getDeptId(),
event.getTenantId(),
event.getSourceSn()
);
if (targetSns == null || targetSns.isEmpty()) {
log.info("[报警推送] 无需推送,目标列表为空. SourceSN: {}", event.getSourceSn());
return;
}
// 3. 执行推送逻辑
this.publishAlarmToMqtt(targetSns, event.getMsgContent());
} catch (Exception e) {
log.error("[报警推送] 处理异常 SourceSN: {}", event.getSourceSn(), e);
}
}
/**
* 执行 MQTT 推送 (包含限流)
*/
private void publishAlarmToMqtt(List<String> targetSns, String message) {
if (message != null) {
return;
}
// 构造 MQTT 消息体
Map<String, String> payload = Map.of("message", message);
String jsonPayload = JsonUtils.toJsonString(payload);
for (String sn : targetSns) {
if (StringUtils.isBlank(sn)) continue;
mqttRateLimiter.acquire();
try {
String topic = sn + "/zds_down";
// 发送
mqttClient.publish(topic, jsonPayload);
} catch (Exception e) {
// 5. 【隔离】单个设备推送失败,不要影响列表里的下一个设备
log.error("[MQTT推送] 单个发送失败,SN: {}", sn, e);
}
}
}
}

2
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/dal/FenceAlarmDO.java

@ -84,7 +84,7 @@ public class FenceAlarmDO extends BaseDO {
/**
* 租户id
*/
private Integer tenantId;
private Long tenantId;
/**
* 部门id

2
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/FenceAlarmService.java

@ -64,7 +64,7 @@ public interface FenceAlarmService {
void update( FenceAlarmDO updateReqVO);
void batchCreateFenceAlarm(List<FenceAlarmSaveReqVO> fenceAlarmsToCreate);
void batchCreateFenceAlarm(List<FenceAlarmDO> fenceAlarmsToCreate);
void batchUpdateById(List<FenceAlarmDO> fenceAlarmsToUpdate);

2
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandAlarmService.java

@ -64,7 +64,7 @@ public interface HandAlarmService {
void insertBatch(List<HandAlarmDO> doList);
void batchCreateHandAlarm(List<HandAlarmSaveReqVO> gasAlarmsToCreate);
void batchCreateHandAlarm(List<HandAlarmDO> gasAlarmsToCreate);
void batchUpdateById(List<HandAlarmDO> gasAlarmsToUpdate);
}

5
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/FenceAlarmServiceImpl.java

@ -94,10 +94,9 @@ public class FenceAlarmServiceImpl implements FenceAlarmService {
}
@Override
public void batchCreateFenceAlarm(List<FenceAlarmSaveReqVO> fenceAlarmsToCreate) {
public void batchCreateFenceAlarm(List<FenceAlarmDO> fenceAlarmsToCreate) {
List<FenceAlarmDO> fenceAlarmDOs = BeanUtils.toBean(fenceAlarmsToCreate, FenceAlarmDO.class);
fenceAlarmMapper.insertBatch(fenceAlarmDOs);
fenceAlarmMapper.insertBatch(fenceAlarmsToCreate);
}
@Override

5
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandAlarmServiceImpl.java

@ -99,10 +99,9 @@ public class HandAlarmServiceImpl implements HandAlarmService {
@Override
@Transactional(rollbackFor = Exception.class)
@TenantIgnore
public void batchCreateHandAlarm(List<HandAlarmSaveReqVO> gasAlarmsToCreate) {
public void batchCreateHandAlarm(List<HandAlarmDO> gasAlarmsToCreate) {
List<HandAlarmDO> bean = BeanUtils.toBean(gasAlarmsToCreate, HandAlarmDO.class);
handAlarmMapper.insertBatch(bean);
handAlarmMapper.insertBatch(gasAlarmsToCreate);
}
@Override

17
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmDispatchEvent.java

@ -0,0 +1,17 @@
package cn.iocoder.yudao.module.hand.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AlarmDispatchEvent {
private Long deptId;
private Long tenantId;
private String sourceSn;
// 消息内容
private String msgContent;
}

1
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandDataVo.java

@ -112,4 +112,5 @@ public class HandDataVo {
@Schema(description = "最后推送数据")
private Double lastPushValue;
}

1
cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandTdenginePageVO.java

@ -27,5 +27,6 @@ public class HandTdenginePageVO extends PageParam {
private Long tenantId;
private Long deptId;
}

9
cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml

@ -162,14 +162,17 @@
AND tenant_id = #{vo.tenantId}
</if>
<if test="vo.startTime != null">
AND ts >= #{po.startTime}
AND ts >= #{vo.startTime}
</if>
<if test="vo.endTime != null">
AND ts &lt;= #{po.endTime}
AND ts &lt;= #{vo.endTime}
</if>
<if test="vo.deptId != null">
AND dept_id; = #{vo.deptId}
</if>
<if test="vo.holderName != null and vo.holderName != ''">
<bind name="holderNamePattern" value="'%' + vo.holderName + '%'" />
AND holder_name LIKE #{holderNamePattern}
AND holder_name LIKE #{vo.holderNamePattern}
</if>
<if test="vo.pushSnList != null and vo.pushSnList != ''">
<!-- 使用 INSTR 函数:查找子串位置,大于0表示存在 -->

36
cc-admin-master/yudao-server/src/main/resources/application.yaml

@ -125,25 +125,31 @@ rocketmq:
group: ${spring.application.name}_PRODUCER # 生产者分组
spring:
# Kafka 配置项,对应 KafkaProperties 配置类
kafka:
# Kafka Producer 配置项
producer:
acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
retries: 3 # 发送失败时,重试发送的次数
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化
# Kafka Consumer 配置项
consumer:
auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
max-poll-records: 300
fetch-max-wait: 3000 # 等待拉取消息的最大时间(毫秒)
acks: 1
retries: 3
batch-size: 16384
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
spring.json.trusted.packages: '*'
group-id: consumer-${spring.application.name} # 消费者分组
# Kafka Consumer Listener 监听器配置
linger.ms: 10
buffer.memory: 33554432
consumer:
enable-auto-commit: false
auto-offset-reset: earliest
max-poll-records: 1000
fetch-max-wait: 3000
# 【修正3】改为 StringDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: consumer-${spring.application.name}
listener:
missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错
missing-topics-fatal: false
--- #################### 芋道相关配置 ####################
yudao:

Loading…
Cancel
Save