Browse Source

优化mqtt发送线程数

master
wangwei_123 4 days ago
parent
commit
41d495920f
  1. 35
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java
  2. 4
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java
  3. 2
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java

35
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java

@ -20,23 +20,7 @@ import java.util.concurrent.BlockingQueue;
@Slf4j
@Component
public class Client {
private final OnMessageCallback onMessageCallback; // 1. 注入我们新的 Kafka 生产者回调
// 【修改点2】定义全局通用的发送回调,避免每次发送都创建对象,减少GC
private final IMqttActionListener globalPublishCallback = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
// 高并发下,成功通常不打印日志,否则磁盘IO会爆炸
// log.debug("发送成功: {}", asyncActionToken.getTopics());
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
// 只有失败才打印日志
log.error("MQTT异步发送失败, Topic: {}",
asyncActionToken.getTopics() != null ? asyncActionToken.getTopics()[0] : "null",
exception);
}
};
// --- 1. 配置注入 (保持不变) ---
@Value("${mqtt.enable:false}")
private Boolean enable;
@ -63,6 +47,24 @@ public class Client {
//使用异步客户端接口
private IMqttAsyncClient mqttClient;
private final OnMessageCallback onMessageCallback; // 1. 注入我们新的 Kafka 生产者回调
private final IMqttActionListener globalPublishCallback = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
// log.debug("发送成功: {}", asyncActionToken.getTopics());
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
// 只有失败才打印日志
log.error("MQTT异步发送失败, Topic: {}",
asyncActionToken.getTopics() != null ? asyncActionToken.getTopics()[0] : "null",
exception);
}
};
public Client(OnMessageCallback onMessageCallback) {
this.onMessageCallback = onMessageCallback;
}
@ -170,7 +172,6 @@ public class Client {
public void publishAsync(String topic, byte[] payload, int qos, boolean retained) {
if (mqttClient == null || !mqttClient.isConnected()) {
// 降级:只打印日志,不抛异常,避免中断业务循环
// 在高并发场景下,这里可以考虑加一个计数器,每N次打印一次日志
log.warn("MQTT未连接,丢弃消息: {}", topic);
return;
}

4
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java

@ -14,11 +14,11 @@ public class ThreadPoolConfig {
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:CPU核心数
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
// 最大线程数:CPU核心数 * 4
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4);
// 队列大小
executor.setQueueCapacity(2000);
executor.setQueueCapacity(50000);
// 线程名称前缀
executor.setThreadNamePrefix("mqtt-executor-");
// 拒绝策略:由调用者线程处理

2
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java

@ -712,7 +712,7 @@ public class BatchDeviceMessageProcessor {
}
/**
* 处理围栏报警生命周期
* 处理围栏报警生命周期·
*/
private void handleFenceAlarmLifecycle(boolean isViolating, HandDataVo handVo,
FenceType fenceType, List<Geofence> fenceList, BatchContext context) {

Loading…
Cancel
Save