From e4051528058f21bb52770dc9430c09b5f4e36e4c Mon Sep 17 00:00:00 2001 From: zy <82248909@qq.com> Date: Mon, 30 Mar 2026 18:37:49 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=8B=89=E5=8F=96=E7=BB=A7?= =?UTF-8?q?=E7=94=B5=E5=99=A8=E6=97=B6=EF=BC=8C=E5=90=8C=E6=97=B6=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2Mqtt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/IotCommonP3ServiceImpl.java | 5 +- .../impl/SurvIotVirtualDeviceServiceImpl.java | 6 +- .../java/org/jeecg/modules/mqtt/DTOMqtt.java | 13 -- .../org/jeecg/modules/mqtt/LhMqttMsg.java | 34 --- .../org/jeecg/modules/mqtt/MqttConfig.java | 71 ------ .../org/jeecg/modules/mqtt/MqttService.java | 45 ---- .../jeecg/modules/mqtt/MyMqttCallback.java | 161 -------------- .../org/jeecg/modules/mqtt/MyMqttClient.java | 209 ------------------ .../jeecg/modules/mqtt/MyMqttController.java | 37 ---- .../jeecg/modules/mqtt/config/MqttConfig.java | 148 +++++++++++++ .../modules/mqtt/config/MqttProperties.java | 52 +++++ .../mqtt/controller/MqttController.java | 112 ++++++++++ .../mqtt/handler/MqttCallbackHandler.java | 63 ++++++ .../modules/mqtt/impl/MqttServiceImpl.java | 49 ---- .../mqtt/listener/MqttReconnectListener.java | 178 +++++++++++++++ .../mqtt/service/MessageProcessor.java | 176 +++++++++++++++ .../modules/mqtt/service/MqttService.java | 176 +++++++++++++++ 17 files changed, 911 insertions(+), 624 deletions(-) delete mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/DTOMqtt.java delete mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/LhMqttMsg.java delete mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MqttConfig.java delete mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MqttService.java delete mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MyMqttCallback.java delete mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MyMqttClient.java delete mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MyMqttController.java create mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/config/MqttConfig.java create mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/config/MqttProperties.java create mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/controller/MqttController.java create mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/handler/MqttCallbackHandler.java delete mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/impl/MqttServiceImpl.java create mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/listener/MqttReconnectListener.java create mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/service/MessageProcessor.java create mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/service/MqttService.java diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/service/impl/IotCommonP3ServiceImpl.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/service/impl/IotCommonP3ServiceImpl.java index 7240cec..1b076c8 100644 --- a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/service/impl/IotCommonP3ServiceImpl.java +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/service/impl/IotCommonP3ServiceImpl.java @@ -18,7 +18,8 @@ import org.jeecg.common.vo.*; import org.jeecg.common.vo.statistic.DTOIotSummray; import org.jeecg.modules.appmana.service.IScEquZhibiaoService; import org.jeecg.modules.appmana.utils.Iotutils; -import org.jeecg.modules.mqtt.MqttService; + +import org.jeecg.modules.mqtt.service.MqttService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; @@ -739,7 +740,7 @@ public class IotCommonP3ServiceImpl { if(deploy!=null && variables!=null && !variables.isEmpty()){ QueryCmd queryCmd = LhIotUtil.ConstructCmd(variables); String cmdStr = JSONObject.toJSONString(queryCmd); - mqttService.publish(deploy.getDeviceIotUrl(),cmdStr); + mqttService.sendMessage(deploy.getDeviceIotUrl(),cmdStr); } return result; } diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/service/impl/SurvIotVirtualDeviceServiceImpl.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/service/impl/SurvIotVirtualDeviceServiceImpl.java index fb1ea75..0cd5b63 100644 --- a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/service/impl/SurvIotVirtualDeviceServiceImpl.java +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/service/impl/SurvIotVirtualDeviceServiceImpl.java @@ -29,7 +29,7 @@ import org.jeecg.modules.appmana.mapper.IOTStatisticMapper; import org.jeecg.modules.appmana.mapper.SurvIotVirtualDeviceMapper; import org.jeecg.modules.appmana.service.ISurvIotVirtualDeviceService; import org.jeecg.modules.appmana.utils.CommonUtils; -import org.jeecg.modules.mqtt.MqttService; +import org.jeecg.modules.mqtt.service.MqttService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; @@ -690,9 +690,9 @@ public class SurvIotVirtualDeviceServiceImpl extends ServiceImpl MQTT connect exception, connect time = {}", i); - log.error("=========mqtt连接异常:"+e.getMessage()); - try { - Thread.sleep(2000); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - } - } - }else{ - log.warn("-------------非正式环境跳过Mqtt订阅---------"); - } - return myMqttClient; - } - -} diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MqttService.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MqttService.java deleted file mode 100644 index efd0670..0000000 --- a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MqttService.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.jeecg.modules.mqtt; - - -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; - -@ConditionalOnProperty( - prefix = "iot.mq", - name = "enabled", - havingValue = "true", - matchIfMissing = false // 默认注册,除非显式设置为 false -) -public interface MqttService { - - /** - * 添加订阅主题 - * - * @param topic 主题名称 - */ - void addTopic(String topic); - - /** - * 取消订阅主题 - * - * @param topic 主题名称 - */ - void removeTopic(String topic); - - /** - * 发布主题消息内容 - * - * @param msgContent - * @param topic - */ - void publish(String topic,String msgContent); - - /** - * 发布主题消息内容 - * - * @param msgContent - * @param topic - */ - void publish(String topic,byte[] msgContent); - -} - diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MyMqttCallback.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MyMqttCallback.java deleted file mode 100644 index 02eb6f9..0000000 --- a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MyMqttCallback.java +++ /dev/null @@ -1,161 +0,0 @@ -package org.jeecg.modules.mqtt; - -import com.alibaba.fastjson.JSONObject; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.jeecg.common.iot.enums.IotInerfaceTopicType; -import org.jeecg.common.iot.up.DeviceActionVo; -import org.jeecg.common.util.SpringContextUtils; -import org.jeecg.modules.appmana.service.ISurvIotVirtualDeviceService; -import org.springframework.data.redis.core.RedisTemplate; - -import java.util.List; -import java.util.Random; -import java.util.stream.Collectors; - -@Slf4j -public class MyMqttCallback implements MqttCallbackExtended { - - //手动注入 - private MqttConfig mqttConfig = SpringContextUtils.getBean(MqttConfig.class); - - private ISurvIotVirtualDeviceService deviceService = SpringContextUtils.getBean(ISurvIotVirtualDeviceService.class); - - private static RedisTemplate redisTemplate = SpringContextUtils.getBean("redisTemplate", RedisTemplate.class); - - - private MyMqttClient myMqttClient; - - public MyMqttCallback(MyMqttClient myMqttClient) { - this.myMqttClient = myMqttClient; - } - - /** - * MQTT Broker连接成功时被调用的方法。在该方法中可以执行 订阅系统约定的主题(推荐使用)。 - * 如果 MQTT Broker断开连接之后又重新连接成功时,主题也需要再次订阅,将重新订阅主题放在连接成功后的回调方法中比较合理。 - * - * @param reconnect - * @param serverURI MQTT Broker的url - */ - @Override - public void connectComplete(boolean reconnect, String serverURI) { - String connectMode = reconnect ? "重连" : "直连"; - log.info("== MyMqttCallback ==> MQTT 连接成功,连接方式:{},serverURI:{}", connectMode, serverURI); - //订阅主题 - //查询所有需要订阅的mqtt主题 - List urlList = deviceService.getAllMqttTopic(); - //去重 - urlList = urlList.stream() - .distinct() - .collect(Collectors.toList()); - for (String url : urlList) {//共享主题语法 - myMqttClient.subscribe(url,1); - } - log.info("== MyMqttCallback ==> 连接方式:{},订阅主题成功,topic:{}", connectMode, String.join(",", urlList)); - } - - - /** - * 丢失连接,可在这里做重连 - * 只会调用一次 - * - * @param throwable - */ - @Override - public void connectionLost(Throwable throwable) { - log.error("== MyMqttCallback ==> connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage()); - long reconnectTimes = 1; - while (true) { - try { - if (MyMqttClient.getClient().isConnected()) { - //判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete(方法里面) 看你们自己选择 - log.warn("== MyMqttCallback ==> mqtt reconnect success end 重新连接 重新订阅成功"); - return; - } - reconnectTimes += 1; - log.warn("== MyMqttCallback ==> mqtt reconnect times = {} try again... mqtt重新连接时间 {}", reconnectTimes, reconnectTimes); - MyMqttClient.getClient().reconnect(); - } catch (MqttException e) { - log.error("== MyMqttCallback ==> mqtt断连异常", e); - } - try { - Thread.sleep(5000); - } catch (InterruptedException e1) { - } - } - } - - /** - * 接收到消息(subscribe订阅的主题消息)时被调用的方法 - * - * @param topic - * @param mqttMessage - * @throws Exception 后得到的消息会执行到这里面 - */ - @Override - public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { - String o1 = new String(mqttMessage.getPayload()); - log.info("== MyMqttCallback ==> messageArrived 接收消息主题: {},接收消息内容: {}", topic, o1); - try { - if (topic.startsWith(IotInerfaceTopicType.DTU_TOPIC.getCode())) { - log.warn("fe-check=======================" + mqttMessage.getPayload()); - //识别主题,确认设备 todo 处理消息逻辑 - } else if (topic.startsWith(IotInerfaceTopicType.LH_IOT_TOPIC_DOWN.getCode())) {//蓝海虚拟设备下行逻辑 - //暂时无业务 - } else if (topic.contains(IotInerfaceTopicType.LH_IOT_TOPIC_UP.getCode())) {//蓝海虚拟设备上行逻辑,处理控制 - // 尝试获取锁,等待3秒,重试间隔100ms - try { -// Boolean lock = RedisDistributedLock2.getLock("mqtt_receive_logic:__"+topic+"_"+o1.hashCode(),2); -// if(lock){ -// System.out.println("锁测试" + o1 + " on topic: " + topic + " - " + lock); -// } else { -// System.out.println("锁测试" + o1 + " on topic: " + topic+ " - " + lock); -// return; -// } - log.warn("蓝海-check=======================" + mqttMessage.getPayload()); - //解析数据获取请求id - String message = o1; - DeviceActionVo deviceQueryActionVo = JSONObject.toJavaObject(JSONObject.parseObject(message), DeviceActionVo.class); - //设备响应处理,查询、控制 - deviceService.processIotAction(deviceQueryActionVo); - - }catch (Exception e){ - log.error("======处理蓝海主题设备报错================"+e.getMessage()); - } - } - }catch (Exception e){ - e.printStackTrace(); - } - } - - /** - * 消息发送(publish)完成时被调用的方法 - * - * @param iMqttDeliveryToken - */ - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - log.info("== MyMqttCallback ==> deliveryComplete 消息发送完成,Complete= {}", iMqttDeliveryToken.isComplete()); - } - - - // 指定最小和最大范围 - public static void randomSleep(int minMs, int maxMs) { - try { - Random random = new Random(); - if (minMs >= maxMs) { - throw new IllegalArgumentException("最小值必须小于最大值"); - } - int sleepTime = minMs + random.nextInt(maxMs - minMs + 1); - System.out.println("随机休眠: " + sleepTime + "ms"); - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - -} - diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MyMqttClient.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MyMqttClient.java deleted file mode 100644 index f85bf23..0000000 --- a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MyMqttClient.java +++ /dev/null @@ -1,209 +0,0 @@ -package org.jeecg.modules.mqtt; - -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.*; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; - -@Slf4j -public class MyMqttClient { - - /** - * MQTT Broker 基本连接参数,用户名、密码为非必选参数 - */ - private String host; - private String username; - private String password; - private String clientId; - private int timeout; - private int keepalive; - private boolean clearSession; - - /** - * MQTT 客户端 - */ - private static MqttClient client; - - public static MqttClient getClient() { - return client; - } - - public static void setClient(MqttClient client) { - MyMqttClient.client = client; - } - - public MyMqttClient(String host, String username, String password, String clientId, int timeOut, int keepAlive, boolean clearSession) { - this.host = host; - this.username = username; - this.password = password; - this.clientId = clientId; - this.timeout = timeOut; - this.keepalive = keepAlive; - this.clearSession = clearSession; - } - - /** - * 设置 MQTT Broker 基本连接参数 - * - * @param username - * @param password - * @param timeout - * @param keepalive - * @return - */ - public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) { - MqttConnectOptions options = new MqttConnectOptions(); - options.setUserName(username); - options.setPassword(password.toCharArray()); - options.setConnectionTimeout(timeout); - options.setKeepAliveInterval(keepalive); - options.setCleanSession(clearSession); - options.setAutomaticReconnect(true); - return options; - } - - /** - * 连接 MQTT Broker,得到 MqttClient连接对象 - */ - public void connect() throws MqttException { - if (client == null) { - client = new MqttClient(host, clientId, new MemoryPersistence()); - // 设置回调 - client.setCallback(new MyMqttCallback(MyMqttClient.this)); - } - // 连接参数 - MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive); - if (!client.isConnected()) { - client.connect(mqttConnectOptions); - } else { - client.disconnect(); - client.connect(mqttConnectOptions); - } - log.info("== MyMqttClient ==> MQTT connect success");//未发生异常,则连接成功 - } - - /** - * 发布,默认qos为0,非持久化 - * - * @param pushMessage - * @param topic - */ - public void publish(String pushMessage, String topic) { - publish(pushMessage, topic, 2, false); - } - - /** - * 发布,默认qos为0,非持久化 - * - * @param pushMessage - * @param topic - */ - public void publish(byte[] pushMessage, String topic) { - publish(pushMessage, topic, 2, false);//至少一次 - } - - /** - * 发布消息 - * - * @param pushMessage - * @param topic - * @param qos - * @param retained:留存 - */ - public void publish(String pushMessage, String topic, int qos, boolean retained) { - MqttMessage message = new MqttMessage(); - message.setPayload(pushMessage.getBytes()); - message.setQos(qos); - message.setRetained(retained); - MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic); - if (null == mqttTopic) { - log.error("== MyMqttClient ==> topic is not exist"); - } - MqttDeliveryToken token;//Delivery:配送 - synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充 - try { - token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件 - token.waitForCompletion(3000L); - } catch (MqttPersistenceException e) { - e.printStackTrace(); - } catch (MqttException e) { - e.printStackTrace(); - } - } - } - - - /** - * 发布消息 - * - * @param pushMessage - * @param topic - * @param qos - * @param retained:留存 - */ - public void publish(byte[] pushMessage, String topic, int qos, boolean retained) { - MqttMessage message = new MqttMessage(); - message.setPayload(pushMessage); - message.setQos(qos); - message.setRetained(retained); - MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic); - if (null == mqttTopic) { - log.error("== byte MyMqttClient ==> topic is not exist"); - } - MqttDeliveryToken token;//Delivery:配送 - synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充 - try { - token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件 - token.waitForCompletion(1000L); - } catch (MqttPersistenceException e) { - e.printStackTrace(); - } catch (MqttException e) { - e.printStackTrace(); - } - } - } - - /** - * 订阅某个主题,qos默认为0 - * - * @param topic - */ - public void subscribe(String topic) { - subscribe(topic, 2); - } - - /** - * 订阅某个主题 - * - * @param topic - * @param qos - */ - public void subscribe(String topic, int qos) { - try { - MyMqttClient.getClient().subscribe(topic, qos); - } catch (MqttException e) { - e.printStackTrace(); - } - log.info("== MyMqttClient ==> 订阅主题成功:topic = {}, qos = {}", topic, qos); - } - - - /** - * 取消订阅主题 - * - * @param topic 主题名称 - */ - public void cleanTopic(String topic) { - if (client != null && client.isConnected()) { - try { - client.unsubscribe(topic); - } catch (MqttException e) { - e.printStackTrace(); - } - } else { - log.error("== MyMqttClient ==> 取消订阅失败!"); - } - log.info("== MyMqttClient ==> 取消订阅主题成功:topic = {}", topic); - } - -} - diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MyMqttController.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MyMqttController.java deleted file mode 100644 index 258e6d3..0000000 --- a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/MyMqttController.java +++ /dev/null @@ -1,37 +0,0 @@ -package org.jeecg.modules.mqtt; - -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -@RestController -@RequestMapping("/mqtt") -@Api(value = "MyMqttController", tags = {"MQTT相关操作接口"}) -public class MyMqttController { - @Autowired(required = false) - private MqttService mqttService; - - @GetMapping("/addTopic") - @ApiOperation(value = "添加订阅主题接口") - public void addTopic(String topic) { - mqttService.addTopic(topic); - } - - @GetMapping("/removeTopic") - @ApiOperation(value = "取消订阅主题接口") - public void removeTopic(String topic) { - mqttService.removeTopic(topic); - } - - @PostMapping("/removeTopic") - @ApiOperation(value = "发布主题消息内容接口") - public void removeTopic(String msgContent, String topic) { - mqttService.publish(msgContent, topic); - } - -} - diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/config/MqttConfig.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/config/MqttConfig.java new file mode 100644 index 0000000..dd69286 --- /dev/null +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/config/MqttConfig.java @@ -0,0 +1,148 @@ +package org.jeecg.modules.mqtt.config; + +import org.jeecg.modules.appmana.service.ISurvIotVirtualDeviceService; +import org.jeecg.modules.mqtt.handler.MqttCallbackHandler; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.PreDestroy; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +@Slf4j +@Configuration +@RequiredArgsConstructor +@ConditionalOnProperty(prefix = "mqtt", name = "enabled", havingValue = "true") +public class MqttConfig { + + private final MqttProperties mqttProperties; + private final MqttCallbackHandler mqttCallbackHandler; // 这里注入 + private MqttClient mqttClient; + private final ISurvIotVirtualDeviceService deviceService; + + + @Bean + public MqttClient mqttClient() { + try { + log.info("========== 初始化 MQTT 客户端 =========="); + + // 1. 创建客户端 + mqttClient = new MqttClient( + mqttProperties.getBroker().getUrl(), + mqttProperties.getClient().getId(), + new MemoryPersistence() + ); + + // 2. ⭐ 将 MqttClient 注入到 CallbackHandler + mqttCallbackHandler.setMqttClient(mqttClient); + + // 3. 设置回调 + mqttClient.setCallback(mqttCallbackHandler); + + // 4. 配置连接选项 + MqttConnectOptions options = createConnectOptions(); + + // 5. 连接 + mqttClient.connect(options); + log.info("MQTT 连接成功!"); + + // 6. 订阅主题 + subscribeTopics(); + + return mqttClient; + + } catch (MqttException e) { + log.error("MQTT 连接失败", e); + throw new RuntimeException("MQTT 连接失败", e); + } + } + + private void subscribeTopics() { + try { +// // 订阅接收主题 +// if (mqttProperties.getTopics().getReceive() != null) { +// mqttClient.subscribe( +// mqttProperties.getTopics().getReceive(), +// mqttProperties.getQos() +// ); +// log.info("订阅主题: {}", mqttProperties.getTopics().getReceive()); +// } +// +// // 订阅状态主题 +// if (mqttProperties.getTopics().getStatus() != null) { +// mqttClient.subscribe( +// mqttProperties.getTopics().getStatus(), +// mqttProperties.getQos() +// ); +// log.info("订阅主题: {}", mqttProperties.getTopics().getStatus()); +// } +// +// // 订阅设备通配符主题 +// if (mqttProperties.getTopics().getDevice() != null) { +// mqttClient.subscribe( +// mqttProperties.getTopics().getDevice(), +// mqttProperties.getQos() +// ); +// log.info("订阅通配符主题: {}", mqttProperties.getTopics().getDevice()); +// } + + //查询所有需要订阅的mqtt主题 +// List urlList = deviceService.getAllMqttTopic(); + List urlList = new ArrayList<>(); + if(!urlList.isEmpty()) { + //去重 + urlList = urlList.stream() + .distinct() + .collect(Collectors.toList()); + for (String url : urlList) {//共享主题语法 + mqttClient.subscribe( + url, + mqttProperties.getQos() + ); + } + } + log.error("== MyMqttCallback ==> ,订阅主题成功,topic:{}", String.join(",", urlList)); + + } catch (MqttException e) { + log.error("订阅主题失败", e); + } + } + + private MqttConnectOptions createConnectOptions() { + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(mqttProperties.getClient().getCleanSession()); + options.setConnectionTimeout(mqttProperties.getConnection().getTimeout()); + options.setKeepAliveInterval(mqttProperties.getConnection().getKeepAlive()); + + if (mqttProperties.getConnection().getAutomaticReconnect() != null) { + options.setAutomaticReconnect(mqttProperties.getConnection().getAutomaticReconnect()); + } + + if (mqttProperties.getAuth().getUsername() != null) { + options.setUserName(mqttProperties.getAuth().getUsername()); + options.setPassword(mqttProperties.getAuth().getPassword().toCharArray()); + } + + return options; + } + + @PreDestroy + public void destroy() { + if (mqttClient != null && mqttClient.isConnected()) { + try { + mqttClient.disconnect(); + log.info("MQTT 客户端已断开连接"); + } catch (MqttException e) { + log.error("断开 MQTT 连接失败", e); + } + } + } +} \ No newline at end of file diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/config/MqttProperties.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/config/MqttProperties.java new file mode 100644 index 0000000..56095a2 --- /dev/null +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/config/MqttProperties.java @@ -0,0 +1,52 @@ +package org.jeecg.modules.mqtt.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Data +@Component +@ConfigurationProperties(prefix = "mqtt") +public class MqttProperties { + private Boolean enabled; + private Broker broker = new Broker(); + private Client client = new Client(); + private Connection connection = new Connection(); + private Auth auth = new Auth(); + private Topics topics = new Topics(); + private Integer qos; + private Boolean retained; + + @Data + public static class Broker { + private String url; + } + + @Data + public static class Client { + private String id; + private Boolean cleanSession; + } + + @Data + public static class Connection { + private Integer timeout; + private Integer keepAlive; + private Integer maxReconnectDelay; + private Boolean automaticReconnect; + } + + @Data + public static class Auth { + private String username; + private String password; + } + + @Data + public static class Topics { + private String send; + private String receive; + private String status; + private String device; + } +} \ No newline at end of file diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/controller/MqttController.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/controller/MqttController.java new file mode 100644 index 0000000..2488183 --- /dev/null +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/controller/MqttController.java @@ -0,0 +1,112 @@ +package org.jeecg.modules.mqtt.controller; + +import org.jeecg.modules.mqtt.service.MqttService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +import java.util.HashMap; +import java.util.Map; + +@Slf4j +@RestController +@RequestMapping("/api/mqtt") +@RequiredArgsConstructor +public class MqttController { + @Autowired(required = false) + private final MqttService mqttService; + + @PostMapping("/publish") + public ResponseEntity> publish(@RequestBody Map request) { + String topic = (String) request.get("topic"); + String message = (String) request.get("message"); + + boolean success = mqttService.sendMessage(topic, message); + + Map response = new HashMap<>(); + response.put("success", success); + response.put("topic", topic); + response.put("message", message); + + return ResponseEntity.ok(response); + } + + @PostMapping("/publish/json") + public ResponseEntity> publishJson(@RequestBody Map request) { + String topic = (String) request.get("topic"); + Object data = request.get("data"); + + boolean success = mqttService.sendJsonMessage(topic, data); + + Map response = new HashMap<>(); + response.put("success", success); + response.put("topic", topic); + + return ResponseEntity.ok(response); + } + + @PostMapping("/device/command") + public ResponseEntity> sendDeviceCommand(@RequestBody Map request) { + String deviceId = (String) request.get("deviceId"); + String command = (String) request.get("command"); + Object params = request.get("params"); + + boolean success = mqttService.sendDeviceCommand(deviceId, command, params); + + Map response = new HashMap<>(); + response.put("success", success); + response.put("deviceId", deviceId); + response.put("command", command); + + return ResponseEntity.ok(response); + } + + @PostMapping("/subscribe") + public ResponseEntity> subscribe(@RequestBody Map request) { + String topic = (String) request.get("topic"); + Integer qos = (Integer) request.getOrDefault("qos", 1); + + boolean success = mqttService.subscribe(topic, qos); + + Map response = new HashMap<>(); + response.put("success", success); + response.put("topic", topic); + + return ResponseEntity.ok(response); + } + + @PostMapping("/unsubscribe") + public ResponseEntity> unsubscribe(@RequestBody Map request) { + String topic = (String) request.get("topic"); + + boolean success = mqttService.unsubscribe(topic); + + Map response = new HashMap<>(); + response.put("success", success); + response.put("topic", topic); + + return ResponseEntity.ok(response); + } + + @GetMapping("/status") + public ResponseEntity> status() { + Map status = new HashMap<>(); + status.put("connected", mqttService.isConnected()); + status.put("clientId", mqttService.getClientId()); + + return ResponseEntity.ok(status); + } + + @PostMapping("/reconnect") + public ResponseEntity> reconnect() { + boolean success = mqttService.reconnect(); + + Map response = new HashMap<>(); + response.put("success", success); + response.put("message", success ? "重连成功" : "重连失败"); + + return ResponseEntity.ok(response); + } +} \ No newline at end of file diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/handler/MqttCallbackHandler.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/handler/MqttCallbackHandler.java new file mode 100644 index 0000000..66521d9 --- /dev/null +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/handler/MqttCallbackHandler.java @@ -0,0 +1,63 @@ +package org.jeecg.modules.mqtt.handler; + +import org.jeecg.modules.mqtt.service.MessageProcessor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +@Slf4j +@Component +public class MqttCallbackHandler implements MqttCallback { + + private final MessageProcessor messageProcessor; + private MqttClient mqttClient; // 改为 Setter 注入 + + public MqttCallbackHandler(MessageProcessor messageProcessor) { + this.messageProcessor = messageProcessor; + } + + /** + * 通过 Setter 注入 MqttClient,避免循环依赖 + */ + public void setMqttClient(MqttClient mqttClient) { + this.mqttClient = mqttClient; + log.info("MqttClient 已注入到 CallbackHandler"); + } + + @PostConstruct + public void init() { + log.info("MQTT 回调处理器已初始化"); + } + + @Override + public void connectionLost(Throwable cause) { + log.error("MQTT 连接丢失,原因: {}", cause.getMessage(), cause); + + if (mqttClient != null && !mqttClient.isConnected()) { + log.info("等待客户端自动重连..."); + } + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + String payload = new String(message.getPayload()); + log.info("收到消息 - 主题: {}, QoS: {}", topic, message.getQos()); + + // 异步处理消息 + messageProcessor.processAsync(topic, payload); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + try { + log.debug("消息发送完成 - Message ID: {}", token.getMessageId()); + } catch (Exception e) { + log.error("处理消息发送完成回调失败", e); + } + } +} \ No newline at end of file diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/impl/MqttServiceImpl.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/impl/MqttServiceImpl.java deleted file mode 100644 index 1d06609..0000000 --- a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/impl/MqttServiceImpl.java +++ /dev/null @@ -1,49 +0,0 @@ -package org.jeecg.modules.mqtt.impl; - -import org.jeecg.modules.mqtt.MqttService; -import org.jeecg.modules.mqtt.MyMqttClient; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Service; - -@ConditionalOnProperty( - prefix = "iot.mq", - name = "enabled", - havingValue = "true", - matchIfMissing = false // 默认注册,除非显式设置为 false -) -@Service -public class MqttServiceImpl implements MqttService { - - @Autowired - private MyMqttClient myMqttClient; - - @Override - public void addTopic(String topic) { - myMqttClient.subscribe(topic); - } - - @Override - public void removeTopic(String topic) { - myMqttClient.cleanTopic(topic); - } - - @Override - public void publish(String topic,String msgContent) { - //MyXxxMqttMsg 转Json -// LhMqttMsg myXxxMqttMsg = new LhMqttMsg(); -// myXxxMqttMsg.setContent(msgContent); -// myXxxMqttMsg.setTimestamp(System.currentTimeMillis()); -// // TODO Md5值 -// myXxxMqttMsg.setMd5(UUID.randomUUID().toString()); -// String msgJson = JSON.toJSONString(myXxxMqttMsg); - - //发布消息 - myMqttClient.publish(msgContent, topic); - } - - @Override - public void publish(String topic,byte[] msgContent) { - myMqttClient.publish(msgContent, topic); - } -} diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/listener/MqttReconnectListener.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/listener/MqttReconnectListener.java new file mode 100644 index 0000000..5405874 --- /dev/null +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/listener/MqttReconnectListener.java @@ -0,0 +1,178 @@ +package org.jeecg.modules.mqtt.listener; + +import org.jeecg.modules.mqtt.config.MqttProperties; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.concurrent.atomic.AtomicBoolean; + +@Slf4j +@Component +@EnableScheduling +@RequiredArgsConstructor +@ConditionalOnProperty(prefix = "mqtt", name = "enabled", havingValue = "true") +public class MqttReconnectListener { + + private final MqttClient mqttClient; + private final MqttProperties mqttProperties; + + private AtomicBoolean isReconnecting = new AtomicBoolean(false); + private int reconnectCount = 0; + private static final int MAX_RECONNECT_COUNT = 5; + private static final long RECONNECT_INTERVAL = 30000; // 30秒 + + @PostConstruct + public void init() { + log.info("MQTT 重连监听器已启动"); + } + + @EventListener(ContextRefreshedEvent.class) + public void onApplicationReady() { + log.info("应用启动完成,MQTT 连接状态: {}", mqttClient.isConnected()); + } + + /** + * 定时检查连接状态并重连 + */ + @Scheduled(fixedDelay = 10000) // 每10秒检查一次 + public void checkAndReconnect() { + // 如果已经连接,重置重连计数 + if (mqttClient.isConnected()) { + if (reconnectCount > 0) { + log.info("MQTT 连接已恢复,重置重连计数器"); + reconnectCount = 0; + isReconnecting.set(false); + } + return; + } + + // 如果正在重连,跳过 + if (isReconnecting.get()) { + return; + } + + // 检查重连次数 + if (reconnectCount >= MAX_RECONNECT_COUNT) { + log.error("MQTT 重连失败次数已达上限 ({}),停止重连", MAX_RECONNECT_COUNT); + return; + } + + // 执行重连 + attemptReconnect(); + } + + @Async + public void attemptReconnect() { + if (!isReconnecting.compareAndSet(false, true)) { + return; + } + + try { + reconnectCount++; + log.info("尝试重连 MQTT (第 {}/{} 次)...", reconnectCount, MAX_RECONNECT_COUNT); + + // 等待一段时间再重连 + Thread.sleep(RECONNECT_INTERVAL); + + // 重新连接 + if (!mqttClient.isConnected()) { + MqttConnectOptions options = createConnectOptions(); + mqttClient.connect(options); + log.info("MQTT 重连成功!"); + + // 重连成功后重新订阅主题 + resubscribeTopics(); + + // 重置重连计数 + reconnectCount = 0; + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("重连被中断"); + } catch (MqttException e) { + log.error("MQTT 重连失败", e); + + // 如果重连次数达到上限,发送告警 + if (reconnectCount >= MAX_RECONNECT_COUNT) { + sendReconnectAlert(); + } + } finally { + isReconnecting.set(false); + } + } + + private MqttConnectOptions createConnectOptions() { + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(mqttProperties.getClient().getCleanSession()); + options.setConnectionTimeout(mqttProperties.getConnection().getTimeout()); + options.setKeepAliveInterval(mqttProperties.getConnection().getKeepAlive()); + + if (mqttProperties.getAuth().getUsername() != null) { + options.setUserName(mqttProperties.getAuth().getUsername()); + options.setPassword(mqttProperties.getAuth().getPassword().toCharArray()); + } + + return options; + } + + private void resubscribeTopics() { + try { + log.info("重新订阅主题..."); + + if (mqttProperties.getTopics().getReceive() != null) { + mqttClient.subscribe(mqttProperties.getTopics().getReceive(), + mqttProperties.getQos()); + log.info("重新订阅: {}", mqttProperties.getTopics().getReceive()); + } + + if (mqttProperties.getTopics().getStatus() != null) { + mqttClient.subscribe(mqttProperties.getTopics().getStatus(), + mqttProperties.getQos()); + log.info("重新订阅: {}", mqttProperties.getTopics().getStatus()); + } + + if (mqttProperties.getTopics().getDevice() != null) { + mqttClient.subscribe(mqttProperties.getTopics().getDevice(), + mqttProperties.getQos()); + log.info("重新订阅: {}", mqttProperties.getTopics().getDevice()); + } + + // 发送上线消息 + if (mqttProperties.getTopics().getStatus() != null) { + mqttClient.publish( + mqttProperties.getTopics().getStatus(), + "online".getBytes(), + mqttProperties.getQos(), + true + ); + log.info("已发送上线状态"); + } + + } catch (MqttException e) { + log.error("重新订阅主题失败", e); + } + } + + private void sendReconnectAlert() { + log.error("========== MQTT 重连告警 =========="); + log.error("MQTT 服务器: {}", mqttProperties.getBroker().getUrl()); + log.error("重连失败次数: {}", MAX_RECONNECT_COUNT); + log.error("请检查 MQTT 服务器状态和网络连接"); + log.error("=================================="); + + // 这里可以添加告警通知,如发送邮件、短信等 + // emailService.sendAlert("MQTT连接失败"); + } +} \ No newline at end of file diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/service/MessageProcessor.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/service/MessageProcessor.java new file mode 100644 index 0000000..29b14a9 --- /dev/null +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/service/MessageProcessor.java @@ -0,0 +1,176 @@ +package org.jeecg.modules.mqtt.service; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.jeecg.common.iot.enums.IotInerfaceTopicType; +import org.jeecg.modules.appmana.service.impl.CommonServiceImpl; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +@Service +@RequiredArgsConstructor +public class MessageProcessor { + + private final ObjectMapper objectMapper; + private final Map handlers = new ConcurrentHashMap<>(); + @Autowired + private CommonServiceImpl commonService ; + /** + * 同步处理消息 + */ + public void process(String topic, String payload) { + log.debug("处理消息 - 主题: {}, 负载: {}", topic, payload); + + try { + // 根据主题选择处理器 + MessageHandler handler = findHandler(topic); + if (handler != null) { + handler.handle(topic, payload); + } else { + handleDefault(topic, payload); + } + } catch (Exception e) { + log.error("处理消息失败", e); + } + } + + /** + * 异步处理消息 + */ + @Async + public void processAsync(String topic, String payload) { + process(topic, payload); + } + + /** + * 处理设备控制消息 + */ + private void handleDeviceControl(String topic, String payload) { + try { + JsonNode json = objectMapper.readTree(payload); + String deviceId = extractDeviceId(topic); + String command = json.get("command").asText(); + + log.info("设备控制 - deviceId: {}, command: {}, 参数: {}", + deviceId, command, json.get("params")); + + // 处理设备控制逻辑 + switch (command) { + case "turn_on": + log.info("开启设备: {}", deviceId); + // 开启设备业务逻辑 + break; + case "turn_off": + log.info("关闭设备: {}", deviceId); + // 关闭设备业务逻辑 + break; + case "reboot": + log.info("重启设备: {}", deviceId); + // 重启设备业务逻辑 + break; + case "update_config": + log.info("更新设备配置: {}", deviceId); + // 更新设备配置业务逻辑 + break; + default: + log.warn("未知命令: {}", command); + } + + } catch (Exception e) { + log.error("处理设备控制消息失败", e); + } + } + + /** + * 处理默认消息 + */ + private void handleDefault(String topic, String payload) { + log.info("默认处理 - 主题: {}, 消息: {}", topic, payload); + // 可以根据需要存储到数据库或转发到其他系统 + // 例如: saveToDatabase(topic, payload); + } + + /** + * 从主题中提取设备ID + * 例如: lanhai/device/DEV001/command -> DEV001 + * lanhai/device/DEV001/status -> DEV001 + */ + private String extractDeviceId(String topic) { + String[] parts = topic.split("/"); + if (parts.length >= 3 && "device".equals(parts[1])) { + return parts[2]; + } + return null; + } + + /** + * 查找处理器 + */ + private MessageHandler findHandler(String topic) { + // 设备上报回执主题 + if (topic.startsWith(IotInerfaceTopicType.DTU_TOPIC.getCode())) { + return this::handleDeviceRep; + } + +// // 设备状态主题 +// if (topic.contains("/device/") && topic.endsWith("/status")) { +// return this::handleDeviceStatus; +// } +// +// // 自定义处理器 +// for (Map.Entry entry : handlers.entrySet()) { +// if (topic.matches(entry.getKey())) { +// return entry.getValue(); +// } +// } + + return null; + } + + /** + * 处理设备状态上报 + */ + private void handleDeviceStatus(String topic, String payload) { + try { + JsonNode json = objectMapper.readTree(payload); + String deviceId = extractDeviceId(topic); + String status = json.get("status").asText(); + + log.info("设备状态上报 - deviceId: {}, status: {}, 数据: {}", + deviceId, status, payload); + + // 处理设备状态更新 + // 例如: updateDeviceStatus(deviceId, status, json); + + } catch (Exception e) { + log.error("处理设备状态消息失败", e); + } + } + + /** + * 注册自定义处理器 + */ + public void registerHandler(String topicPattern, MessageHandler handler) { + handlers.put(topicPattern, handler); + log.info("注册消息处理器: {}", topicPattern); + } + + @FunctionalInterface + public interface MessageHandler { + void handle(String topic, String payload); + } + + + /** + * 处理设备监测回执消息 + */ + private void handleDeviceRep(String topic, String payload) { + } +} \ No newline at end of file diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/service/MqttService.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/service/MqttService.java new file mode 100644 index 0000000..85839c6 --- /dev/null +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/mqtt/service/MqttService.java @@ -0,0 +1,176 @@ +package org.jeecg.modules.mqtt.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.jeecg.modules.mqtt.config.MqttProperties; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Service; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +@Slf4j +@Service +@RequiredArgsConstructor +public class MqttService { + + private final MqttClient mqttClient; + private final MqttProperties mqttProperties; + private final ObjectMapper objectMapper; + + /** + * 发送消息 + */ + public boolean sendMessage(String topic, String payload) { + return sendMessage(topic, payload, mqttProperties.getQos(), mqttProperties.getRetained()); + } + + /** + * 发送消息(自定义QoS) + */ + public boolean sendMessage(String topic, String payload, int qos, boolean retained) { + try { + if (!mqttClient.isConnected()) { + log.warn("MQTT 未连接,消息发送失败: {}", payload); + return false; + } + + MqttMessage message = new MqttMessage(payload.getBytes()); + message.setQos(qos); + message.setRetained(retained); + message.setId((int) (System.currentTimeMillis() % 100000)); + + mqttClient.publish(topic, message); + log.debug("消息发送成功 - 主题: {}, QoS: {}, 消息: {}", topic, qos, + payload.length() > 100 ? payload.substring(0, 100) + "..." : payload); + return true; + + } catch (MqttException e) { + log.error("消息发送失败", e); + return false; + } + } + + /** + * 发送 JSON 消息 + */ + public boolean sendJsonMessage(String topic, Object data) { + try { + String json = objectMapper.writeValueAsString(data); + return sendMessage(topic, json); + } catch (JsonProcessingException e) { + log.error("JSON 序列化失败", e); + return false; + } + } + + /** + * 异步发送消息 + */ + public CompletableFuture sendMessageAsync(String topic, String payload) { + return CompletableFuture.supplyAsync(() -> sendMessage(topic, payload)); + } + + /** + * 发送设备控制指令 + */ + public boolean sendDeviceCommand(String deviceId, String command, Object params) { + String topic = String.format("lanhai/device/%s/command", deviceId); + + try { + Map message = new HashMap<>(); + message.put("command", command); + message.put("params", params); + message.put("timestamp", System.currentTimeMillis()); + message.put("requestId", UUID.randomUUID().toString()); + + return sendJsonMessage(topic, message); + } catch (Exception e) { + log.error("发送设备指令失败", e); + return false; + } + } + + /** + * 订阅主题 + */ + public boolean subscribe(String topic, int qos) { + try { + mqttClient.subscribe(topic, qos); + log.info("订阅主题: {}", topic); + return true; + } catch (MqttException e) { + log.error("订阅失败", e); + return false; + } + } + + /** + * 取消订阅 + */ + public boolean unsubscribe(String topic) { + try { + mqttClient.unsubscribe(topic); + log.info("取消订阅: {}", topic); + return true; + } catch (MqttException e) { + log.error("取消订阅失败", e); + return false; + } + } + + /** + * 检查连接状态 + */ + public boolean isConnected() { + return mqttClient.isConnected(); + } + + /** + * 获取客户端ID + */ + public String getClientId() { + return mqttProperties.getClient().getId(); + } + + /** + * 手动重连 + */ + public boolean reconnect() { + try { + if (!mqttClient.isConnected()) { + mqttClient.reconnect(); + log.info("手动重连成功"); + return true; + } + return false; + } catch (MqttException e) { + log.error("手动重连失败", e); + return false; + } + } + + /** + * 断开连接 + */ + public void disconnect() { + try { + if (mqttClient.isConnected()) { + // 发送离线状态 + if (mqttProperties.getTopics().getStatus() != null) { + sendMessage(mqttProperties.getTopics().getStatus(), "offline", 1, true); + } + mqttClient.disconnect(); + log.info("MQTT 连接已断开"); + } + } catch (MqttException e) { + log.error("断开连接失败", e); + } + } +} \ No newline at end of file