增加拉取继电器时,同时查询Mqtt

This commit is contained in:
zy 2026-03-30 18:37:49 +08:00
parent 3a1a541144
commit e405152805
17 changed files with 911 additions and 624 deletions

View File

@ -18,7 +18,8 @@ import org.jeecg.common.vo.*;
import org.jeecg.common.vo.statistic.DTOIotSummray;
import org.jeecg.modules.appmana.service.IScEquZhibiaoService;
import org.jeecg.modules.appmana.utils.Iotutils;
import org.jeecg.modules.mqtt.MqttService;
import org.jeecg.modules.mqtt.service.MqttService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@ -739,7 +740,7 @@ public class IotCommonP3ServiceImpl {
if(deploy!=null && variables!=null && !variables.isEmpty()){
QueryCmd queryCmd = LhIotUtil.ConstructCmd(variables);
String cmdStr = JSONObject.toJSONString(queryCmd);
mqttService.publish(deploy.getDeviceIotUrl(),cmdStr);
mqttService.sendMessage(deploy.getDeviceIotUrl(),cmdStr);
}
return result;
}

View File

@ -29,7 +29,7 @@ import org.jeecg.modules.appmana.mapper.IOTStatisticMapper;
import org.jeecg.modules.appmana.mapper.SurvIotVirtualDeviceMapper;
import org.jeecg.modules.appmana.service.ISurvIotVirtualDeviceService;
import org.jeecg.modules.appmana.utils.CommonUtils;
import org.jeecg.modules.mqtt.MqttService;
import org.jeecg.modules.mqtt.service.MqttService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@ -690,9 +690,9 @@ public class SurvIotVirtualDeviceServiceImpl extends ServiceImpl<SurvIotVirtualD
String lockerId = TopicConstant.MQTT_CMD_LOCKER + cmdId;
if(StringUtils.isNotBlank(content)){
mqttCustomerClient.publish(topic,content);
mqttCustomerClient.sendMessage(topic,content);
}else if(msgContent!=null){
mqttCustomerClient.publish(topic,msgContent);
mqttCustomerClient.sendMessage(topic,new String(msgContent));
}
}catch (Exception e){

View File

@ -1,13 +0,0 @@
package org.jeecg.modules.mqtt;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@Data
public class DTOMqtt {
@ApiModelProperty("主题")
private String topic;
@ApiModelProperty("发送的内容")
private String message;
}

View File

@ -1,34 +0,0 @@
package org.jeecg.modules.mqtt;
import lombok.Data;
import java.io.Serializable;
@Data
public class LhMqttMsg implements Serializable {
private static final long serialVersionUID = -8303548938481407659L;
/**
* MD5值MD5_lower(content + timestamp)
*/
private String md5;
/**
* 消息内容
*/
private String content = "";
/**
* 消息内容
*/
private byte[] byteContent;
/**
* 时间戳
*/
private Long timestamp;
}

View File

@ -1,71 +0,0 @@
package org.jeecg.modules.mqtt;
import cn.hutool.core.lang.UUID;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
@Slf4j
@Configuration
@ConditionalOnProperty(
prefix = "iot.mq",
name = "enabled",
havingValue = "true",
matchIfMissing = true // 默认注册除非显式设置为 false
)
public class MqttConfig {
@Value("${mqtt.host}")
public String host;
@Value("${mqtt.username}")
public String username;
@Value("${mqtt.password}")
public String password;
// @Value("${mqtt.clientId}")
public String clientId = "fxApp-" + UUID.fastUUID().toString().replaceAll("-","").substring(0,5);
@Value("${mqtt.timeout}")
public int timeOut;
@Value("${mqtt.keepalive}")
public int keepAlive;
@Value("${mqtt.clearSession}")
public boolean clearSession;
@Autowired
private Environment env;
@Bean//注入Spring
public MyMqttClient myMqttClient() {
MyMqttClient myMqttClient = new MyMqttClient(host, username, password, clientId, timeOut, keepAlive, clearSession);
String profiles = env.getProperty("runtime.active");
//正式环境才连接mqtt
if(profiles.contains("prod")){
log.warn("-------------正式环境,MQTT启动初始化---------");
for (int i = 0; i < 10; i++) {
try {
myMqttClient.connect();
// 这里可以订阅主题推荐放到 MqttCallbackExtended.connectComplete方法中
//myMqttClient.subscribe("ABC", 1);
return myMqttClient;
} catch (MqttException e) {
log.error("== MqttConfig ==> MQTT connect exception, connect time = {}", i);
log.error("=========mqtt连接异常"+e.getMessage());
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}else{
log.warn("-------------非正式环境跳过Mqtt订阅---------");
}
return myMqttClient;
}
}

View File

@ -1,45 +0,0 @@
package org.jeecg.modules.mqtt;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@ConditionalOnProperty(
prefix = "iot.mq",
name = "enabled",
havingValue = "true",
matchIfMissing = false // 默认注册除非显式设置为 false
)
public interface MqttService {
/**
* 添加订阅主题
*
* @param topic 主题名称
*/
void addTopic(String topic);
/**
* 取消订阅主题
*
* @param topic 主题名称
*/
void removeTopic(String topic);
/**
* 发布主题消息内容
*
* @param msgContent
* @param topic
*/
void publish(String topic,String msgContent);
/**
* 发布主题消息内容
*
* @param msgContent
* @param topic
*/
void publish(String topic,byte[] msgContent);
}

View File

@ -1,161 +0,0 @@
package org.jeecg.modules.mqtt;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jeecg.common.iot.enums.IotInerfaceTopicType;
import org.jeecg.common.iot.up.DeviceActionVo;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.appmana.service.ISurvIotVirtualDeviceService;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
@Slf4j
public class MyMqttCallback implements MqttCallbackExtended {
//手动注入
private MqttConfig mqttConfig = SpringContextUtils.getBean(MqttConfig.class);
private ISurvIotVirtualDeviceService deviceService = SpringContextUtils.getBean(ISurvIotVirtualDeviceService.class);
private static RedisTemplate redisTemplate = SpringContextUtils.getBean("redisTemplate", RedisTemplate.class);
private MyMqttClient myMqttClient;
public MyMqttCallback(MyMqttClient myMqttClient) {
this.myMqttClient = myMqttClient;
}
/**
* MQTT Broker连接成功时被调用的方法在该方法中可以执行 订阅系统约定的主题推荐使用
* 如果 MQTT Broker断开连接之后又重新连接成功时主题也需要再次订阅将重新订阅主题放在连接成功后的回调方法中比较合理
*
* @param reconnect
* @param serverURI MQTT Broker的url
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
String connectMode = reconnect ? "重连" : "直连";
log.info("== MyMqttCallback ==> MQTT 连接成功,连接方式:{}serverURI{}", connectMode, serverURI);
//订阅主题
//查询所有需要订阅的mqtt主题
List<String> urlList = deviceService.getAllMqttTopic();
//去重
urlList = urlList.stream()
.distinct()
.collect(Collectors.toList());
for (String url : urlList) {//共享主题语法
myMqttClient.subscribe(url,1);
}
log.info("== MyMqttCallback ==> 连接方式:{}订阅主题成功topic{}", connectMode, String.join(",", urlList));
}
/**
* 丢失连接可在这里做重连
* 只会调用一次
*
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
log.error("== MyMqttCallback ==> connectionLost 连接断开5S之后尝试重连: {}", throwable.getMessage());
long reconnectTimes = 1;
while (true) {
try {
if (MyMqttClient.getClient().isConnected()) {
//判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete方法里面 看你们自己选择
log.warn("== MyMqttCallback ==> mqtt reconnect success end 重新连接 重新订阅成功");
return;
}
reconnectTimes += 1;
log.warn("== MyMqttCallback ==> mqtt reconnect times = {} try again... mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);
MyMqttClient.getClient().reconnect();
} catch (MqttException e) {
log.error("== MyMqttCallback ==> mqtt断连异常", e);
}
try {
Thread.sleep(5000);
} catch (InterruptedException e1) {
}
}
}
/**
* 接收到消息subscribe订阅的主题消息时被调用的方法
*
* @param topic
* @param mqttMessage
* @throws Exception 后得到的消息会执行到这里面
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
String o1 = new String(mqttMessage.getPayload());
log.info("== MyMqttCallback ==> messageArrived 接收消息主题: {},接收消息内容: {}", topic, o1);
try {
if (topic.startsWith(IotInerfaceTopicType.DTU_TOPIC.getCode())) {
log.warn("fe-check=======================" + mqttMessage.getPayload());
//识别主题确认设备 todo 处理消息逻辑
} else if (topic.startsWith(IotInerfaceTopicType.LH_IOT_TOPIC_DOWN.getCode())) {//蓝海虚拟设备下行逻辑
//暂时无业务
} else if (topic.contains(IotInerfaceTopicType.LH_IOT_TOPIC_UP.getCode())) {//蓝海虚拟设备上行逻辑处理控制
// 尝试获取锁等待3秒重试间隔100ms
try {
// Boolean lock = RedisDistributedLock2.getLock("mqtt_receive_logic:__"+topic+"_"+o1.hashCode(),2);
// if(lock){
// System.out.println("锁测试" + o1 + " on topic: " + topic + " - " + lock);
// } else {
// System.out.println("锁测试" + o1 + " on topic: " + topic+ " - " + lock);
// return;
// }
log.warn("蓝海-check=======================" + mqttMessage.getPayload());
//解析数据获取请求id
String message = o1;
DeviceActionVo deviceQueryActionVo = JSONObject.toJavaObject(JSONObject.parseObject(message), DeviceActionVo.class);
//设备响应处理查询控制
deviceService.processIotAction(deviceQueryActionVo);
}catch (Exception e){
log.error("======处理蓝海主题设备报错================"+e.getMessage());
}
}
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 消息发送publish完成时被调用的方法
*
* @param iMqttDeliveryToken
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("== MyMqttCallback ==> deliveryComplete 消息发送完成Complete= {}", iMqttDeliveryToken.isComplete());
}
// 指定最小和最大范围
public static void randomSleep(int minMs, int maxMs) {
try {
Random random = new Random();
if (minMs >= maxMs) {
throw new IllegalArgumentException("最小值必须小于最大值");
}
int sleepTime = minMs + random.nextInt(maxMs - minMs + 1);
System.out.println("随机休眠: " + sleepTime + "ms");
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

View File

@ -1,209 +0,0 @@
package org.jeecg.modules.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
@Slf4j
public class MyMqttClient {
/**
* MQTT Broker 基本连接参数用户名密码为非必选参数
*/
private String host;
private String username;
private String password;
private String clientId;
private int timeout;
private int keepalive;
private boolean clearSession;
/**
* MQTT 客户端
*/
private static MqttClient client;
public static MqttClient getClient() {
return client;
}
public static void setClient(MqttClient client) {
MyMqttClient.client = client;
}
public MyMqttClient(String host, String username, String password, String clientId, int timeOut, int keepAlive, boolean clearSession) {
this.host = host;
this.username = username;
this.password = password;
this.clientId = clientId;
this.timeout = timeOut;
this.keepalive = keepAlive;
this.clearSession = clearSession;
}
/**
* 设置 MQTT Broker 基本连接参数
*
* @param username
* @param password
* @param timeout
* @param keepalive
* @return
*/
public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
options.setCleanSession(clearSession);
options.setAutomaticReconnect(true);
return options;
}
/**
* 连接 MQTT Broker得到 MqttClient连接对象
*/
public void connect() throws MqttException {
if (client == null) {
client = new MqttClient(host, clientId, new MemoryPersistence());
// 设置回调
client.setCallback(new MyMqttCallback(MyMqttClient.this));
}
// 连接参数
MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);
if (!client.isConnected()) {
client.connect(mqttConnectOptions);
} else {
client.disconnect();
client.connect(mqttConnectOptions);
}
log.info("== MyMqttClient ==> MQTT connect success");//未发生异常则连接成功
}
/**
* 发布默认qos为0非持久化
*
* @param pushMessage
* @param topic
*/
public void publish(String pushMessage, String topic) {
publish(pushMessage, topic, 2, false);
}
/**
* 发布默认qos为0非持久化
*
* @param pushMessage
* @param topic
*/
public void publish(byte[] pushMessage, String topic) {
publish(pushMessage, topic, 2, false);//至少一次
}
/**
* 发布消息
*
* @param pushMessage
* @param topic
* @param qos
* @param retained:留存
*/
public void publish(String pushMessage, String topic, int qos, boolean retained) {
MqttMessage message = new MqttMessage();
message.setPayload(pushMessage.getBytes());
message.setQos(qos);
message.setRetained(retained);
MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic);
if (null == mqttTopic) {
log.error("== MyMqttClient ==> topic is not exist");
}
MqttDeliveryToken token;//Delivery:配送
synchronized (this) {//注意这里一定要同步否则在多线程publish的情况下线程会发生死锁分析见文章最后补充
try {
token = mqttTopic.publish(message);//也是发送到执行队列中等待执行线程执行将消息发送到消息中间件
token.waitForCompletion(3000L);
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* 发布消息
*
* @param pushMessage
* @param topic
* @param qos
* @param retained:留存
*/
public void publish(byte[] pushMessage, String topic, int qos, boolean retained) {
MqttMessage message = new MqttMessage();
message.setPayload(pushMessage);
message.setQos(qos);
message.setRetained(retained);
MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic);
if (null == mqttTopic) {
log.error("== byte MyMqttClient ==> topic is not exist");
}
MqttDeliveryToken token;//Delivery:配送
synchronized (this) {//注意这里一定要同步否则在多线程publish的情况下线程会发生死锁分析见文章最后补充
try {
token = mqttTopic.publish(message);//也是发送到执行队列中等待执行线程执行将消息发送到消息中间件
token.waitForCompletion(1000L);
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* 订阅某个主题qos默认为0
*
* @param topic
*/
public void subscribe(String topic) {
subscribe(topic, 2);
}
/**
* 订阅某个主题
*
* @param topic
* @param qos
*/
public void subscribe(String topic, int qos) {
try {
MyMqttClient.getClient().subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
log.info("== MyMqttClient ==> 订阅主题成功topic = {} qos = {}", topic, qos);
}
/**
* 取消订阅主题
*
* @param topic 主题名称
*/
public void cleanTopic(String topic) {
if (client != null && client.isConnected()) {
try {
client.unsubscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
} else {
log.error("== MyMqttClient ==> 取消订阅失败!");
}
log.info("== MyMqttClient ==> 取消订阅主题成功topic = {}", topic);
}
}

View File

@ -1,37 +0,0 @@
package org.jeecg.modules.mqtt;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/mqtt")
@Api(value = "MyMqttController", tags = {"MQTT相关操作接口"})
public class MyMqttController {
@Autowired(required = false)
private MqttService mqttService;
@GetMapping("/addTopic")
@ApiOperation(value = "添加订阅主题接口")
public void addTopic(String topic) {
mqttService.addTopic(topic);
}
@GetMapping("/removeTopic")
@ApiOperation(value = "取消订阅主题接口")
public void removeTopic(String topic) {
mqttService.removeTopic(topic);
}
@PostMapping("/removeTopic")
@ApiOperation(value = "发布主题消息内容接口")
public void removeTopic(String msgContent, String topic) {
mqttService.publish(msgContent, topic);
}
}

View File

@ -0,0 +1,148 @@
package org.jeecg.modules.mqtt.config;
import org.jeecg.modules.appmana.service.ISurvIotVirtualDeviceService;
import org.jeecg.modules.mqtt.handler.MqttCallbackHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@Configuration
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "mqtt", name = "enabled", havingValue = "true")
public class MqttConfig {
private final MqttProperties mqttProperties;
private final MqttCallbackHandler mqttCallbackHandler; // 这里注入
private MqttClient mqttClient;
private final ISurvIotVirtualDeviceService deviceService;
@Bean
public MqttClient mqttClient() {
try {
log.info("========== 初始化 MQTT 客户端 ==========");
// 1. 创建客户端
mqttClient = new MqttClient(
mqttProperties.getBroker().getUrl(),
mqttProperties.getClient().getId(),
new MemoryPersistence()
);
// 2. MqttClient 注入到 CallbackHandler
mqttCallbackHandler.setMqttClient(mqttClient);
// 3. 设置回调
mqttClient.setCallback(mqttCallbackHandler);
// 4. 配置连接选项
MqttConnectOptions options = createConnectOptions();
// 5. 连接
mqttClient.connect(options);
log.info("MQTT 连接成功!");
// 6. 订阅主题
subscribeTopics();
return mqttClient;
} catch (MqttException e) {
log.error("MQTT 连接失败", e);
throw new RuntimeException("MQTT 连接失败", e);
}
}
private void subscribeTopics() {
try {
// // 订阅接收主题
// if (mqttProperties.getTopics().getReceive() != null) {
// mqttClient.subscribe(
// mqttProperties.getTopics().getReceive(),
// mqttProperties.getQos()
// );
// log.info("订阅主题: {}", mqttProperties.getTopics().getReceive());
// }
//
// // 订阅状态主题
// if (mqttProperties.getTopics().getStatus() != null) {
// mqttClient.subscribe(
// mqttProperties.getTopics().getStatus(),
// mqttProperties.getQos()
// );
// log.info("订阅主题: {}", mqttProperties.getTopics().getStatus());
// }
//
// // 订阅设备通配符主题
// if (mqttProperties.getTopics().getDevice() != null) {
// mqttClient.subscribe(
// mqttProperties.getTopics().getDevice(),
// mqttProperties.getQos()
// );
// log.info("订阅通配符主题: {}", mqttProperties.getTopics().getDevice());
// }
//查询所有需要订阅的mqtt主题
// List<String> urlList = deviceService.getAllMqttTopic();
List<String> urlList = new ArrayList<>();
if(!urlList.isEmpty()) {
//去重
urlList = urlList.stream()
.distinct()
.collect(Collectors.toList());
for (String url : urlList) {//共享主题语法
mqttClient.subscribe(
url,
mqttProperties.getQos()
);
}
}
log.error("== MyMqttCallback ==> 订阅主题成功topic{}", String.join(",", urlList));
} catch (MqttException e) {
log.error("订阅主题失败", e);
}
}
private MqttConnectOptions createConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(mqttProperties.getClient().getCleanSession());
options.setConnectionTimeout(mqttProperties.getConnection().getTimeout());
options.setKeepAliveInterval(mqttProperties.getConnection().getKeepAlive());
if (mqttProperties.getConnection().getAutomaticReconnect() != null) {
options.setAutomaticReconnect(mqttProperties.getConnection().getAutomaticReconnect());
}
if (mqttProperties.getAuth().getUsername() != null) {
options.setUserName(mqttProperties.getAuth().getUsername());
options.setPassword(mqttProperties.getAuth().getPassword().toCharArray());
}
return options;
}
@PreDestroy
public void destroy() {
if (mqttClient != null && mqttClient.isConnected()) {
try {
mqttClient.disconnect();
log.info("MQTT 客户端已断开连接");
} catch (MqttException e) {
log.error("断开 MQTT 连接失败", e);
}
}
}
}

View File

@ -0,0 +1,52 @@
package org.jeecg.modules.mqtt.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
private Boolean enabled;
private Broker broker = new Broker();
private Client client = new Client();
private Connection connection = new Connection();
private Auth auth = new Auth();
private Topics topics = new Topics();
private Integer qos;
private Boolean retained;
@Data
public static class Broker {
private String url;
}
@Data
public static class Client {
private String id;
private Boolean cleanSession;
}
@Data
public static class Connection {
private Integer timeout;
private Integer keepAlive;
private Integer maxReconnectDelay;
private Boolean automaticReconnect;
}
@Data
public static class Auth {
private String username;
private String password;
}
@Data
public static class Topics {
private String send;
private String receive;
private String status;
private String device;
}
}

View File

@ -0,0 +1,112 @@
package org.jeecg.modules.mqtt.controller;
import org.jeecg.modules.mqtt.service.MqttService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@RestController
@RequestMapping("/api/mqtt")
@RequiredArgsConstructor
public class MqttController {
@Autowired(required = false)
private final MqttService mqttService;
@PostMapping("/publish")
public ResponseEntity<Map<String, Object>> publish(@RequestBody Map<String, Object> request) {
String topic = (String) request.get("topic");
String message = (String) request.get("message");
boolean success = mqttService.sendMessage(topic, message);
Map<String, Object> response = new HashMap<>();
response.put("success", success);
response.put("topic", topic);
response.put("message", message);
return ResponseEntity.ok(response);
}
@PostMapping("/publish/json")
public ResponseEntity<Map<String, Object>> publishJson(@RequestBody Map<String, Object> request) {
String topic = (String) request.get("topic");
Object data = request.get("data");
boolean success = mqttService.sendJsonMessage(topic, data);
Map<String, Object> response = new HashMap<>();
response.put("success", success);
response.put("topic", topic);
return ResponseEntity.ok(response);
}
@PostMapping("/device/command")
public ResponseEntity<Map<String, Object>> sendDeviceCommand(@RequestBody Map<String, Object> request) {
String deviceId = (String) request.get("deviceId");
String command = (String) request.get("command");
Object params = request.get("params");
boolean success = mqttService.sendDeviceCommand(deviceId, command, params);
Map<String, Object> response = new HashMap<>();
response.put("success", success);
response.put("deviceId", deviceId);
response.put("command", command);
return ResponseEntity.ok(response);
}
@PostMapping("/subscribe")
public ResponseEntity<Map<String, Object>> subscribe(@RequestBody Map<String, Object> request) {
String topic = (String) request.get("topic");
Integer qos = (Integer) request.getOrDefault("qos", 1);
boolean success = mqttService.subscribe(topic, qos);
Map<String, Object> response = new HashMap<>();
response.put("success", success);
response.put("topic", topic);
return ResponseEntity.ok(response);
}
@PostMapping("/unsubscribe")
public ResponseEntity<Map<String, Object>> unsubscribe(@RequestBody Map<String, Object> request) {
String topic = (String) request.get("topic");
boolean success = mqttService.unsubscribe(topic);
Map<String, Object> response = new HashMap<>();
response.put("success", success);
response.put("topic", topic);
return ResponseEntity.ok(response);
}
@GetMapping("/status")
public ResponseEntity<Map<String, Object>> status() {
Map<String, Object> status = new HashMap<>();
status.put("connected", mqttService.isConnected());
status.put("clientId", mqttService.getClientId());
return ResponseEntity.ok(status);
}
@PostMapping("/reconnect")
public ResponseEntity<Map<String, Object>> reconnect() {
boolean success = mqttService.reconnect();
Map<String, Object> response = new HashMap<>();
response.put("success", success);
response.put("message", success ? "重连成功" : "重连失败");
return ResponseEntity.ok(response);
}
}

View File

@ -0,0 +1,63 @@
package org.jeecg.modules.mqtt.handler;
import org.jeecg.modules.mqtt.service.MessageProcessor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class MqttCallbackHandler implements MqttCallback {
private final MessageProcessor messageProcessor;
private MqttClient mqttClient; // 改为 Setter 注入
public MqttCallbackHandler(MessageProcessor messageProcessor) {
this.messageProcessor = messageProcessor;
}
/**
* 通过 Setter 注入 MqttClient避免循环依赖
*/
public void setMqttClient(MqttClient mqttClient) {
this.mqttClient = mqttClient;
log.info("MqttClient 已注入到 CallbackHandler");
}
@PostConstruct
public void init() {
log.info("MQTT 回调处理器已初始化");
}
@Override
public void connectionLost(Throwable cause) {
log.error("MQTT 连接丢失,原因: {}", cause.getMessage(), cause);
if (mqttClient != null && !mqttClient.isConnected()) {
log.info("等待客户端自动重连...");
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload());
log.info("收到消息 - 主题: {}, QoS: {}", topic, message.getQos());
// 异步处理消息
messageProcessor.processAsync(topic, payload);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try {
log.debug("消息发送完成 - Message ID: {}", token.getMessageId());
} catch (Exception e) {
log.error("处理消息发送完成回调失败", e);
}
}
}

View File

@ -1,49 +0,0 @@
package org.jeecg.modules.mqtt.impl;
import org.jeecg.modules.mqtt.MqttService;
import org.jeecg.modules.mqtt.MyMqttClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
@ConditionalOnProperty(
prefix = "iot.mq",
name = "enabled",
havingValue = "true",
matchIfMissing = false // 默认注册除非显式设置为 false
)
@Service
public class MqttServiceImpl implements MqttService {
@Autowired
private MyMqttClient myMqttClient;
@Override
public void addTopic(String topic) {
myMqttClient.subscribe(topic);
}
@Override
public void removeTopic(String topic) {
myMqttClient.cleanTopic(topic);
}
@Override
public void publish(String topic,String msgContent) {
//MyXxxMqttMsg 转Json
// LhMqttMsg myXxxMqttMsg = new LhMqttMsg();
// myXxxMqttMsg.setContent(msgContent);
// myXxxMqttMsg.setTimestamp(System.currentTimeMillis());
// // TODO Md5值
// myXxxMqttMsg.setMd5(UUID.randomUUID().toString());
// String msgJson = JSON.toJSONString(myXxxMqttMsg);
//发布消息
myMqttClient.publish(msgContent, topic);
}
@Override
public void publish(String topic,byte[] msgContent) {
myMqttClient.publish(msgContent, topic);
}
}

View File

@ -0,0 +1,178 @@
package org.jeecg.modules.mqtt.listener;
import org.jeecg.modules.mqtt.config.MqttProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Component
@EnableScheduling
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "mqtt", name = "enabled", havingValue = "true")
public class MqttReconnectListener {
private final MqttClient mqttClient;
private final MqttProperties mqttProperties;
private AtomicBoolean isReconnecting = new AtomicBoolean(false);
private int reconnectCount = 0;
private static final int MAX_RECONNECT_COUNT = 5;
private static final long RECONNECT_INTERVAL = 30000; // 30秒
@PostConstruct
public void init() {
log.info("MQTT 重连监听器已启动");
}
@EventListener(ContextRefreshedEvent.class)
public void onApplicationReady() {
log.info("应用启动完成MQTT 连接状态: {}", mqttClient.isConnected());
}
/**
* 定时检查连接状态并重连
*/
@Scheduled(fixedDelay = 10000) // 每10秒检查一次
public void checkAndReconnect() {
// 如果已经连接重置重连计数
if (mqttClient.isConnected()) {
if (reconnectCount > 0) {
log.info("MQTT 连接已恢复,重置重连计数器");
reconnectCount = 0;
isReconnecting.set(false);
}
return;
}
// 如果正在重连跳过
if (isReconnecting.get()) {
return;
}
// 检查重连次数
if (reconnectCount >= MAX_RECONNECT_COUNT) {
log.error("MQTT 重连失败次数已达上限 ({}),停止重连", MAX_RECONNECT_COUNT);
return;
}
// 执行重连
attemptReconnect();
}
@Async
public void attemptReconnect() {
if (!isReconnecting.compareAndSet(false, true)) {
return;
}
try {
reconnectCount++;
log.info("尝试重连 MQTT (第 {}/{} 次)...", reconnectCount, MAX_RECONNECT_COUNT);
// 等待一段时间再重连
Thread.sleep(RECONNECT_INTERVAL);
// 重新连接
if (!mqttClient.isConnected()) {
MqttConnectOptions options = createConnectOptions();
mqttClient.connect(options);
log.info("MQTT 重连成功!");
// 重连成功后重新订阅主题
resubscribeTopics();
// 重置重连计数
reconnectCount = 0;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("重连被中断");
} catch (MqttException e) {
log.error("MQTT 重连失败", e);
// 如果重连次数达到上限发送告警
if (reconnectCount >= MAX_RECONNECT_COUNT) {
sendReconnectAlert();
}
} finally {
isReconnecting.set(false);
}
}
private MqttConnectOptions createConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(mqttProperties.getClient().getCleanSession());
options.setConnectionTimeout(mqttProperties.getConnection().getTimeout());
options.setKeepAliveInterval(mqttProperties.getConnection().getKeepAlive());
if (mqttProperties.getAuth().getUsername() != null) {
options.setUserName(mqttProperties.getAuth().getUsername());
options.setPassword(mqttProperties.getAuth().getPassword().toCharArray());
}
return options;
}
private void resubscribeTopics() {
try {
log.info("重新订阅主题...");
if (mqttProperties.getTopics().getReceive() != null) {
mqttClient.subscribe(mqttProperties.getTopics().getReceive(),
mqttProperties.getQos());
log.info("重新订阅: {}", mqttProperties.getTopics().getReceive());
}
if (mqttProperties.getTopics().getStatus() != null) {
mqttClient.subscribe(mqttProperties.getTopics().getStatus(),
mqttProperties.getQos());
log.info("重新订阅: {}", mqttProperties.getTopics().getStatus());
}
if (mqttProperties.getTopics().getDevice() != null) {
mqttClient.subscribe(mqttProperties.getTopics().getDevice(),
mqttProperties.getQos());
log.info("重新订阅: {}", mqttProperties.getTopics().getDevice());
}
// 发送上线消息
if (mqttProperties.getTopics().getStatus() != null) {
mqttClient.publish(
mqttProperties.getTopics().getStatus(),
"online".getBytes(),
mqttProperties.getQos(),
true
);
log.info("已发送上线状态");
}
} catch (MqttException e) {
log.error("重新订阅主题失败", e);
}
}
private void sendReconnectAlert() {
log.error("========== MQTT 重连告警 ==========");
log.error("MQTT 服务器: {}", mqttProperties.getBroker().getUrl());
log.error("重连失败次数: {}", MAX_RECONNECT_COUNT);
log.error("请检查 MQTT 服务器状态和网络连接");
log.error("==================================");
// 这里可以添加告警通知如发送邮件短信等
// emailService.sendAlert("MQTT连接失败");
}
}

View File

@ -0,0 +1,176 @@
package org.jeecg.modules.mqtt.service;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.iot.enums.IotInerfaceTopicType;
import org.jeecg.modules.appmana.service.impl.CommonServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageProcessor {
private final ObjectMapper objectMapper;
private final Map<String, MessageHandler> handlers = new ConcurrentHashMap<>();
@Autowired
private CommonServiceImpl commonService ;
/**
* 同步处理消息
*/
public void process(String topic, String payload) {
log.debug("处理消息 - 主题: {}, 负载: {}", topic, payload);
try {
// 根据主题选择处理器
MessageHandler handler = findHandler(topic);
if (handler != null) {
handler.handle(topic, payload);
} else {
handleDefault(topic, payload);
}
} catch (Exception e) {
log.error("处理消息失败", e);
}
}
/**
* 异步处理消息
*/
@Async
public void processAsync(String topic, String payload) {
process(topic, payload);
}
/**
* 处理设备控制消息
*/
private void handleDeviceControl(String topic, String payload) {
try {
JsonNode json = objectMapper.readTree(payload);
String deviceId = extractDeviceId(topic);
String command = json.get("command").asText();
log.info("设备控制 - deviceId: {}, command: {}, 参数: {}",
deviceId, command, json.get("params"));
// 处理设备控制逻辑
switch (command) {
case "turn_on":
log.info("开启设备: {}", deviceId);
// 开启设备业务逻辑
break;
case "turn_off":
log.info("关闭设备: {}", deviceId);
// 关闭设备业务逻辑
break;
case "reboot":
log.info("重启设备: {}", deviceId);
// 重启设备业务逻辑
break;
case "update_config":
log.info("更新设备配置: {}", deviceId);
// 更新设备配置业务逻辑
break;
default:
log.warn("未知命令: {}", command);
}
} catch (Exception e) {
log.error("处理设备控制消息失败", e);
}
}
/**
* 处理默认消息
*/
private void handleDefault(String topic, String payload) {
log.info("默认处理 - 主题: {}, 消息: {}", topic, payload);
// 可以根据需要存储到数据库或转发到其他系统
// 例如: saveToDatabase(topic, payload);
}
/**
* 从主题中提取设备ID
* 例如: lanhai/device/DEV001/command -> DEV001
* lanhai/device/DEV001/status -> DEV001
*/
private String extractDeviceId(String topic) {
String[] parts = topic.split("/");
if (parts.length >= 3 && "device".equals(parts[1])) {
return parts[2];
}
return null;
}
/**
* 查找处理器
*/
private MessageHandler findHandler(String topic) {
// 设备上报回执主题
if (topic.startsWith(IotInerfaceTopicType.DTU_TOPIC.getCode())) {
return this::handleDeviceRep;
}
// // 设备状态主题
// if (topic.contains("/device/") && topic.endsWith("/status")) {
// return this::handleDeviceStatus;
// }
//
// // 自定义处理器
// for (Map.Entry<String, MessageHandler> entry : handlers.entrySet()) {
// if (topic.matches(entry.getKey())) {
// return entry.getValue();
// }
// }
return null;
}
/**
* 处理设备状态上报
*/
private void handleDeviceStatus(String topic, String payload) {
try {
JsonNode json = objectMapper.readTree(payload);
String deviceId = extractDeviceId(topic);
String status = json.get("status").asText();
log.info("设备状态上报 - deviceId: {}, status: {}, 数据: {}",
deviceId, status, payload);
// 处理设备状态更新
// 例如: updateDeviceStatus(deviceId, status, json);
} catch (Exception e) {
log.error("处理设备状态消息失败", e);
}
}
/**
* 注册自定义处理器
*/
public void registerHandler(String topicPattern, MessageHandler handler) {
handlers.put(topicPattern, handler);
log.info("注册消息处理器: {}", topicPattern);
}
@FunctionalInterface
public interface MessageHandler {
void handle(String topic, String payload);
}
/**
* 处理设备监测回执消息
*/
private void handleDeviceRep(String topic, String payload) {
}
}

View File

@ -0,0 +1,176 @@
package org.jeecg.modules.mqtt.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.jeecg.modules.mqtt.config.MqttProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@Slf4j
@Service
@RequiredArgsConstructor
public class MqttService {
private final MqttClient mqttClient;
private final MqttProperties mqttProperties;
private final ObjectMapper objectMapper;
/**
* 发送消息
*/
public boolean sendMessage(String topic, String payload) {
return sendMessage(topic, payload, mqttProperties.getQos(), mqttProperties.getRetained());
}
/**
* 发送消息自定义QoS
*/
public boolean sendMessage(String topic, String payload, int qos, boolean retained) {
try {
if (!mqttClient.isConnected()) {
log.warn("MQTT 未连接,消息发送失败: {}", payload);
return false;
}
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(qos);
message.setRetained(retained);
message.setId((int) (System.currentTimeMillis() % 100000));
mqttClient.publish(topic, message);
log.debug("消息发送成功 - 主题: {}, QoS: {}, 消息: {}", topic, qos,
payload.length() > 100 ? payload.substring(0, 100) + "..." : payload);
return true;
} catch (MqttException e) {
log.error("消息发送失败", e);
return false;
}
}
/**
* 发送 JSON 消息
*/
public boolean sendJsonMessage(String topic, Object data) {
try {
String json = objectMapper.writeValueAsString(data);
return sendMessage(topic, json);
} catch (JsonProcessingException e) {
log.error("JSON 序列化失败", e);
return false;
}
}
/**
* 异步发送消息
*/
public CompletableFuture<Boolean> sendMessageAsync(String topic, String payload) {
return CompletableFuture.supplyAsync(() -> sendMessage(topic, payload));
}
/**
* 发送设备控制指令
*/
public boolean sendDeviceCommand(String deviceId, String command, Object params) {
String topic = String.format("lanhai/device/%s/command", deviceId);
try {
Map<String, Object> message = new HashMap<>();
message.put("command", command);
message.put("params", params);
message.put("timestamp", System.currentTimeMillis());
message.put("requestId", UUID.randomUUID().toString());
return sendJsonMessage(topic, message);
} catch (Exception e) {
log.error("发送设备指令失败", e);
return false;
}
}
/**
* 订阅主题
*/
public boolean subscribe(String topic, int qos) {
try {
mqttClient.subscribe(topic, qos);
log.info("订阅主题: {}", topic);
return true;
} catch (MqttException e) {
log.error("订阅失败", e);
return false;
}
}
/**
* 取消订阅
*/
public boolean unsubscribe(String topic) {
try {
mqttClient.unsubscribe(topic);
log.info("取消订阅: {}", topic);
return true;
} catch (MqttException e) {
log.error("取消订阅失败", e);
return false;
}
}
/**
* 检查连接状态
*/
public boolean isConnected() {
return mqttClient.isConnected();
}
/**
* 获取客户端ID
*/
public String getClientId() {
return mqttProperties.getClient().getId();
}
/**
* 手动重连
*/
public boolean reconnect() {
try {
if (!mqttClient.isConnected()) {
mqttClient.reconnect();
log.info("手动重连成功");
return true;
}
return false;
} catch (MqttException e) {
log.error("手动重连失败", e);
return false;
}
}
/**
* 断开连接
*/
public void disconnect() {
try {
if (mqttClient.isConnected()) {
// 发送离线状态
if (mqttProperties.getTopics().getStatus() != null) {
sendMessage(mqttProperties.getTopics().getStatus(), "offline", 1, true);
}
mqttClient.disconnect();
log.info("MQTT 连接已断开");
}
} catch (MqttException e) {
log.error("断开连接失败", e);
}
}
}