From 3d3e03513a79ea718dc411239a2166ef179d540f Mon Sep 17 00:00:00 2001 From: zy <82248909@qq.com> Date: Sat, 28 Mar 2026 16:17:43 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=20=E5=B0=81=E8=A3=85=E6=94=B9=E7=89=88?= =?UTF-8?q?=EF=BC=8C=E8=A7=A3=E5=86=B3=E5=AE=9A=E6=97=B6=E5=99=A8=E8=A7=A6?= =?UTF-8?q?=E5=8F=91=E6=97=B6=E8=B0=83=E7=94=A8mqtt=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/com/lanhai/mqtt/DTOMqtt.java | 13 -- src/main/java/com/lanhai/mqtt/LhMqttMsg.java | 34 --- src/main/java/com/lanhai/mqtt/MqttConfig.java | 80 ------- .../java/com/lanhai/mqtt/MqttService.java | 45 ---- .../java/com/lanhai/mqtt/MyMqttCallback.java | 158 ------------- .../java/com/lanhai/mqtt/MyMqttClient.java | 209 ------------------ .../com/lanhai/mqtt/MyMqttController.java | 37 ---- .../com/lanhai/mqtt/config/MqttConfig.java | 147 ++++++++++++ .../lanhai/mqtt/config/MqttProperties.java | 52 +++++ .../mqtt/controller/MqttController.java | 111 ++++++++++ .../mqtt/handler/MqttCallbackHandler.java | 63 ++++++ .../com/lanhai/mqtt/impl/MqttServiceImpl.java | 49 ---- .../mqtt/listener/MqttReconnectListener.java | 178 +++++++++++++++ .../lanhai/mqtt/service/MessageProcessor.java | 177 +++++++++++++++ .../com/lanhai/mqtt/service/MqttService.java | 176 +++++++++++++++ .../lanhai/o/iot/pbs/WaterCommonTransVo.java | 3 +- .../service/Impl/CommonServiceImpl.java | 29 ++- .../Impl/SurvIotVirtualDeviceServiceImpl.java | 6 +- .../java/com/lanhai/task/MqttDeviceTask.java | 11 +- src/main/java/com/lanhai/util/DataUtil.java | 2 +- src/main/java/com/lanhai/util/LhIotUtil.java | 86 +++---- src/main/resources/application-prod.yml | 30 ++- 22 files changed, 1012 insertions(+), 684 deletions(-) delete mode 100644 src/main/java/com/lanhai/mqtt/DTOMqtt.java delete mode 100644 src/main/java/com/lanhai/mqtt/LhMqttMsg.java delete mode 100644 src/main/java/com/lanhai/mqtt/MqttConfig.java delete mode 100644 src/main/java/com/lanhai/mqtt/MqttService.java delete mode 100644 src/main/java/com/lanhai/mqtt/MyMqttCallback.java delete mode 100644 src/main/java/com/lanhai/mqtt/MyMqttClient.java delete mode 100644 src/main/java/com/lanhai/mqtt/MyMqttController.java create mode 100644 src/main/java/com/lanhai/mqtt/config/MqttConfig.java create mode 100644 src/main/java/com/lanhai/mqtt/config/MqttProperties.java create mode 100644 src/main/java/com/lanhai/mqtt/controller/MqttController.java create mode 100644 src/main/java/com/lanhai/mqtt/handler/MqttCallbackHandler.java delete mode 100644 src/main/java/com/lanhai/mqtt/impl/MqttServiceImpl.java create mode 100644 src/main/java/com/lanhai/mqtt/listener/MqttReconnectListener.java create mode 100644 src/main/java/com/lanhai/mqtt/service/MessageProcessor.java create mode 100644 src/main/java/com/lanhai/mqtt/service/MqttService.java diff --git a/src/main/java/com/lanhai/mqtt/DTOMqtt.java b/src/main/java/com/lanhai/mqtt/DTOMqtt.java deleted file mode 100644 index 5e812d1..0000000 --- a/src/main/java/com/lanhai/mqtt/DTOMqtt.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.lanhai.mqtt; - -import io.swagger.annotations.ApiModelProperty; -import lombok.Data; - - -@Data -public class DTOMqtt { - @ApiModelProperty("主题") - private String topic; - @ApiModelProperty("发送的内容") - private String message; -} diff --git a/src/main/java/com/lanhai/mqtt/LhMqttMsg.java b/src/main/java/com/lanhai/mqtt/LhMqttMsg.java deleted file mode 100644 index b44da5b..0000000 --- a/src/main/java/com/lanhai/mqtt/LhMqttMsg.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.lanhai.mqtt; - -import lombok.Data; - -import java.io.Serializable; - -@Data -public class LhMqttMsg implements Serializable { - - private static final long serialVersionUID = -8303548938481407659L; - - /** - * MD5值:MD5_lower(content + timestamp) - */ - private String md5; - - /** - * 消息内容 - */ - private String content = ""; - - /** - * 消息内容 - */ - private byte[] byteContent; - - /** - * 时间戳 - */ - private Long timestamp; - - -} - diff --git a/src/main/java/com/lanhai/mqtt/MqttConfig.java b/src/main/java/com/lanhai/mqtt/MqttConfig.java deleted file mode 100644 index 619b3a6..0000000 --- a/src/main/java/com/lanhai/mqtt/MqttConfig.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.lanhai.mqtt; - -import cn.hutool.core.lang.UUID; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.AutoConfigureOrder; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.DependsOn; -import org.springframework.core.env.Environment; - -@Slf4j -@Configuration -@ConditionalOnProperty( - prefix = "iot.mq", - name = "enabled", - havingValue = "true", - matchIfMissing = true // 默认注册,除非显式设置为 false -) -@DependsOn({ - "dataSource", // 数据源 - "sqlSessionFactory", // SQL 会话工厂 - "sqlSessionTemplate", // SQL 会话模板 - "transactionManager", // 事务管理器 - "paginationInterceptor", // 分页拦截器 -}) -public class MqttConfig { - - @Value("${mqtt.host}") - public String host; - @Value("${mqtt.username}") - public String username; - @Value("${mqtt.password}") - public String password; -// @Value("${mqtt.clientId}") - public String clientId = "FxGather-" + UUID.fastUUID().toString().replaceAll("-","").substring(0,5); - @Value("${mqtt.timeout}") - public int timeOut; - @Value("${mqtt.keepalive}") - public int keepAlive; - - @Value("${mqtt.clearSession}") - public boolean clearSession; - - @Autowired - private Environment env; - - @Bean//注入Spring - public MyMqttClient myMqttClient() { - MyMqttClient myMqttClient = new MyMqttClient(host, username, password, clientId, timeOut, keepAlive, clearSession); -// String activeProfile = env.getActiveProfiles()[0]; - //正式环境才连接mqtt -// if(activeProfile.contains("prod")){ - log.warn("-------------正式环境,MQTT启动初始化---------"); - for (int i = 0; i < 10; i++) { - try { - myMqttClient.connect(); - // 这里可以订阅主题,推荐放到 MqttCallbackExtended.connectComplete方法中 - //myMqttClient.subscribe("ABC", 1); - return myMqttClient; - } catch (MqttException e) { - log.error("== MqttConfig ==> MQTT connect exception, connect time = {}", i); - log.error("=========mqtt连接异常:"+e.getMessage()); - try { - Thread.sleep(2000); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - } - } -// }else{ -// log.warn("-------------非正式环境跳过Mqtt订阅---------"); -// } - return myMqttClient; - } - -} diff --git a/src/main/java/com/lanhai/mqtt/MqttService.java b/src/main/java/com/lanhai/mqtt/MqttService.java deleted file mode 100644 index 613087b..0000000 --- a/src/main/java/com/lanhai/mqtt/MqttService.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.lanhai.mqtt; - - -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; - -@ConditionalOnProperty( - prefix = "iot.mq", - name = "enabled", - havingValue = "true", - matchIfMissing = false // 默认注册,除非显式设置为 false -) -public interface MqttService { - - /** - * 添加订阅主题 - * - * @param topic 主题名称 - */ - void addTopic(String topic); - - /** - * 取消订阅主题 - * - * @param topic 主题名称 - */ - void removeTopic(String topic); - - /** - * 发布主题消息内容 - * - * @param msgContent - * @param topic - */ - void publish(String topic,String msgContent); - - /** - * 发布主题消息内容 - * - * @param msgContent - * @param topic - */ - void publish(String topic,byte[] msgContent); - -} - diff --git a/src/main/java/com/lanhai/mqtt/MyMqttCallback.java b/src/main/java/com/lanhai/mqtt/MyMqttCallback.java deleted file mode 100644 index 535d2c1..0000000 --- a/src/main/java/com/lanhai/mqtt/MyMqttCallback.java +++ /dev/null @@ -1,158 +0,0 @@ -package com.lanhai.mqtt; - -import com.alibaba.fastjson.JSONObject; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.lanhai.entity.SurvAlertRecord; -import com.lanhai.enums.IotInerfaceTopicType; -import com.lanhai.mapper.SurvAlertRecordMapper; -import com.lanhai.mapper.SurvIotVirtualDeviceMapper; -import com.lanhai.o.iot.lhiot.ResponseCmd; -import com.lanhai.service.IScEquZhibiaoService; -import com.lanhai.service.ISurvIotVirtualDeviceService; -import com.lanhai.service.Impl.CommonServiceImpl; -import com.lanhai.service.Impl.SurvIotVirtualDeviceServiceImpl; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import com.lanhai.util.SpringContextUtil; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Lazy; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Component; - -import java.sql.Wrapper; -import java.util.List; -import java.util.Random; -import java.util.stream.Collectors; - -@Slf4j -public class MyMqttCallback implements MqttCallbackExtended { - - //手动注入 -// private MqttConfig mqttConfig = SpringContextUtil.getBean(MqttConfig.class); - - private SurvIotVirtualDeviceServiceImpl deviceService = SpringContextUtil.getBean("survIotVirtualDeviceServiceImpl", SurvIotVirtualDeviceServiceImpl.class); - - private CommonServiceImpl commonService = SpringContextUtil.getBean(CommonServiceImpl.class); - -// private static RedisTemplate redisTemplate = SpringContextUtil.getBean("redisTemplate", RedisTemplate.class); - - - private MyMqttClient myMqttClient; - - public MyMqttCallback(MyMqttClient myMqttClient) { - this.myMqttClient = myMqttClient; - } - - /** - * MQTT Broker连接成功时被调用的方法。在该方法中可以执行 订阅系统约定的主题(推荐使用)。 - * 如果 MQTT Broker断开连接之后又重新连接成功时,主题也需要再次订阅,将重新订阅主题放在连接成功后的回调方法中比较合理。 - * - * @param reconnect - * @param serverURI MQTT Broker的url - */ - @Override - public void connectComplete(boolean reconnect, String serverURI) { - try { - String connectMode = reconnect ? "重连" : "直连"; - log.info("== MyMqttCallback ==> MQTT 连接成功,连接方式:{},serverURI:{}", connectMode, serverURI); - //订阅主题 - //查询所有需要订阅的mqtt主题 - List urlList = deviceService.getAllMqttTopic(); - //去重 - urlList = urlList.stream() - .distinct() - .collect(Collectors.toList()); - for (String url : urlList) {//共享主题语法 - myMqttClient.subscribe(url, 1); - } - log.info("== MyMqttCallback ==> 连接方式:{},订阅主题成功,topic:{}", connectMode, String.join(",", urlList)); - }catch (Exception e){ - e.printStackTrace(); - } - } - - - /** - * 丢失连接,可在这里做重连 - * 只会调用一次 - * - * @param throwable - */ - @Override - public void connectionLost(Throwable throwable) { - log.error("== MyMqttCallback ==> connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage()); - long reconnectTimes = 1; - while (true) { - try { - if (MyMqttClient.getClient().isConnected()) { - //判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete(方法里面) 看你们自己选择 - log.warn("== MyMqttCallback ==> mqtt reconnect success end 重新连接 重新订阅成功"); - return; - } - reconnectTimes += 1; - log.warn("== MyMqttCallback ==> mqtt reconnect times = {} try again... mqtt重新连接时间 {}", reconnectTimes, reconnectTimes); - MyMqttClient.getClient().reconnect(); - } catch (MqttException e) { - log.error("== MyMqttCallback ==> mqtt断连异常", e); - } - try { - Thread.sleep(5000); - } catch (InterruptedException e1) { - } - } - } - - /** - * 接收到消息(subscribe订阅的主题消息)时被调用的方法 - * - * @param topic - * @param mqttMessage - * @throws Exception 后得到的消息会执行到这里面 - */ - @Override - public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { - String o1 = new String(mqttMessage.getPayload()); - log.info("== MyMqttCallback ==> messageArrived 接收消息主题: {},接收消息内容: {}", topic, o1); - try { - if (topic.startsWith(IotInerfaceTopicType.DTU_TOPIC.getCode())) { - commonService.processMqttData(topic,o1); -// deviceService.processPayLoad(topic,mqttMessage.getPayload()); - } else if (topic.startsWith(IotInerfaceTopicType.LH_IOT_TOPIC_DOWN.getCode())) {//蓝海虚拟设备下行逻辑 - //暂时无业务 - } - }catch (Exception e){ - e.printStackTrace(); - } - } - - /** - * 消息发送(publish)完成时被调用的方法 - * - * @param iMqttDeliveryToken - */ - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - log.info("== MyMqttCallback ==> deliveryComplete 消息发送完成,Complete= {}", iMqttDeliveryToken.isComplete()); - } - - - // 指定最小和最大范围 - public static void randomSleep(int minMs, int maxMs) { - try { - Random random = new Random(); - if (minMs >= maxMs) { - throw new IllegalArgumentException("最小值必须小于最大值"); - } - int sleepTime = minMs + random.nextInt(maxMs - minMs + 1); - System.out.println("随机休眠: " + sleepTime + "ms"); - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - -} - diff --git a/src/main/java/com/lanhai/mqtt/MyMqttClient.java b/src/main/java/com/lanhai/mqtt/MyMqttClient.java deleted file mode 100644 index 30be926..0000000 --- a/src/main/java/com/lanhai/mqtt/MyMqttClient.java +++ /dev/null @@ -1,209 +0,0 @@ -package com.lanhai.mqtt; - -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.*; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; - -@Slf4j -public class MyMqttClient { - - /** - * MQTT Broker 基本连接参数,用户名、密码为非必选参数 - */ - private String host; - private String username; - private String password; - private String clientId; - private int timeout; - private int keepalive; - private boolean clearSession; - - /** - * MQTT 客户端 - */ - private static MqttClient client; - - public static MqttClient getClient() { - return client; - } - - public static void setClient(MqttClient client) { - MyMqttClient.client = client; - } - - public MyMqttClient(String host, String username, String password, String clientId, int timeOut, int keepAlive, boolean clearSession) { - this.host = host; - this.username = username; - this.password = password; - this.clientId = clientId; - this.timeout = timeOut; - this.keepalive = keepAlive; - this.clearSession = clearSession; - } - - /** - * 设置 MQTT Broker 基本连接参数 - * - * @param username - * @param password - * @param timeout - * @param keepalive - * @return - */ - public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) { - MqttConnectOptions options = new MqttConnectOptions(); - options.setUserName(username); - options.setPassword(password.toCharArray()); - options.setConnectionTimeout(timeout); - options.setKeepAliveInterval(keepalive); - options.setCleanSession(clearSession); - options.setAutomaticReconnect(true); - return options; - } - - /** - * 连接 MQTT Broker,得到 MqttClient连接对象 - */ - public void connect() throws MqttException { - if (client == null) { - client = new MqttClient(host, clientId, new MemoryPersistence()); - // 设置回调 - client.setCallback(new MyMqttCallback(MyMqttClient.this)); - } - // 连接参数 - MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive); - if (!client.isConnected()) { - client.connect(mqttConnectOptions); - } else { - client.disconnect(); - client.connect(mqttConnectOptions); - } - log.info("== MyMqttClient ==> MQTT connect success");//未发生异常,则连接成功 - } - - /** - * 发布,默认qos为0,非持久化 - * - * @param pushMessage - * @param topic - */ - public void publish(String pushMessage, String topic) { - publish(pushMessage, topic, 2, false); - } - - /** - * 发布,默认qos为0,非持久化 - * - * @param pushMessage - * @param topic - */ - public void publish(byte[] pushMessage, String topic) { - publish(pushMessage, topic, 2, false);//至少一次 - } - - /** - * 发布消息 - * - * @param pushMessage - * @param topic - * @param qos - * @param retained:留存 - */ - public void publish(String pushMessage, String topic, int qos, boolean retained) { - MqttMessage message = new MqttMessage(); - message.setPayload(pushMessage.getBytes()); - message.setQos(qos); - message.setRetained(retained); - MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic); - if (null == mqttTopic) { - log.error("== MyMqttClient ==> topic is not exist"); - } - MqttDeliveryToken token;//Delivery:配送 - synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充 - try { - token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件 - token.waitForCompletion(3000L); - } catch (MqttPersistenceException e) { - e.printStackTrace(); - } catch (MqttException e) { - e.printStackTrace(); - } - } - } - - - /** - * 发布消息 - * - * @param pushMessage - * @param topic - * @param qos - * @param retained:留存 - */ - public void publish(byte[] pushMessage, String topic, int qos, boolean retained) { - MqttMessage message = new MqttMessage(); - message.setPayload(pushMessage); - message.setQos(qos); - message.setRetained(retained); - MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic); - if (null == mqttTopic) { - log.error("== byte MyMqttClient ==> topic is not exist"); - } - MqttDeliveryToken token;//Delivery:配送 - synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充 - try { - token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件 - token.waitForCompletion(1000L); - } catch (MqttPersistenceException e) { - e.printStackTrace(); - } catch (MqttException e) { - e.printStackTrace(); - } - } - } - - /** - * 订阅某个主题,qos默认为0 - * - * @param topic - */ - public void subscribe(String topic) { - subscribe(topic, 2); - } - - /** - * 订阅某个主题 - * - * @param topic - * @param qos - */ - public void subscribe(String topic, int qos) { - try { - MyMqttClient.getClient().subscribe(topic, qos); - } catch (MqttException e) { - e.printStackTrace(); - } - log.info("== MyMqttClient ==> 订阅主题成功:topic = {}, qos = {}", topic, qos); - } - - - /** - * 取消订阅主题 - * - * @param topic 主题名称 - */ - public void cleanTopic(String topic) { - if (client != null && client.isConnected()) { - try { - client.unsubscribe(topic); - } catch (MqttException e) { - e.printStackTrace(); - } - } else { - log.error("== MyMqttClient ==> 取消订阅失败!"); - } - log.info("== MyMqttClient ==> 取消订阅主题成功:topic = {}", topic); - } - -} - diff --git a/src/main/java/com/lanhai/mqtt/MyMqttController.java b/src/main/java/com/lanhai/mqtt/MyMqttController.java deleted file mode 100644 index 2d54498..0000000 --- a/src/main/java/com/lanhai/mqtt/MyMqttController.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.lanhai.mqtt; - -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -@RestController -@RequestMapping("/mqtt") -@Api(value = "MyMqttController", tags = {"MQTT相关操作接口"}) -public class MyMqttController { - @Autowired(required = false) - private MqttService mqttService; - - @GetMapping("/addTopic") - @ApiOperation(value = "添加订阅主题接口") - public void addTopic(String topic) { - mqttService.addTopic(topic); - } - - @GetMapping("/removeTopic") - @ApiOperation(value = "取消订阅主题接口") - public void removeTopic(String topic) { - mqttService.removeTopic(topic); - } - - @PostMapping("/removeTopic") - @ApiOperation(value = "发布主题消息内容接口") - public void removeTopic(String msgContent, String topic) { - mqttService.publish(msgContent, topic); - } - -} - diff --git a/src/main/java/com/lanhai/mqtt/config/MqttConfig.java b/src/main/java/com/lanhai/mqtt/config/MqttConfig.java new file mode 100644 index 0000000..6b515cc --- /dev/null +++ b/src/main/java/com/lanhai/mqtt/config/MqttConfig.java @@ -0,0 +1,147 @@ +package com.lanhai.mqtt.config; + +import com.lanhai.mqtt.handler.MqttCallbackHandler; +import com.lanhai.service.ISurvIotVirtualDeviceService; +import com.lanhai.service.Impl.SurvIotVirtualDeviceServiceImpl; +import com.lanhai.util.SpringContextUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.PreDestroy; +import java.util.List; +import java.util.stream.Collectors; + +@Slf4j +@Configuration +@RequiredArgsConstructor +@ConditionalOnProperty(prefix = "mqtt", name = "enabled", havingValue = "true") +public class MqttConfig { + + private final MqttProperties mqttProperties; + private final MqttCallbackHandler mqttCallbackHandler; // 这里注入 + private MqttClient mqttClient; + private final ISurvIotVirtualDeviceService deviceService; + + + @Bean + public MqttClient mqttClient() { + try { + log.info("========== 初始化 MQTT 客户端 =========="); + + // 1. 创建客户端 + mqttClient = new MqttClient( + mqttProperties.getBroker().getUrl(), + mqttProperties.getClient().getId(), + new MemoryPersistence() + ); + + // 2. ⭐ 将 MqttClient 注入到 CallbackHandler + mqttCallbackHandler.setMqttClient(mqttClient); + + // 3. 设置回调 + mqttClient.setCallback(mqttCallbackHandler); + + // 4. 配置连接选项 + MqttConnectOptions options = createConnectOptions(); + + // 5. 连接 + mqttClient.connect(options); + log.info("MQTT 连接成功!"); + + // 6. 订阅主题 + subscribeTopics(); + + return mqttClient; + + } catch (MqttException e) { + log.error("MQTT 连接失败", e); + throw new RuntimeException("MQTT 连接失败", e); + } + } + + private void subscribeTopics() { + try { +// // 订阅接收主题 +// if (mqttProperties.getTopics().getReceive() != null) { +// mqttClient.subscribe( +// mqttProperties.getTopics().getReceive(), +// mqttProperties.getQos() +// ); +// log.info("订阅主题: {}", mqttProperties.getTopics().getReceive()); +// } +// +// // 订阅状态主题 +// if (mqttProperties.getTopics().getStatus() != null) { +// mqttClient.subscribe( +// mqttProperties.getTopics().getStatus(), +// mqttProperties.getQos() +// ); +// log.info("订阅主题: {}", mqttProperties.getTopics().getStatus()); +// } +// +// // 订阅设备通配符主题 +// if (mqttProperties.getTopics().getDevice() != null) { +// mqttClient.subscribe( +// mqttProperties.getTopics().getDevice(), +// mqttProperties.getQos() +// ); +// log.info("订阅通配符主题: {}", mqttProperties.getTopics().getDevice()); +// } + + //查询所有需要订阅的mqtt主题 + List urlList = deviceService.getAllMqttTopic(); + //去重 + urlList = urlList.stream() + .distinct() + .collect(Collectors.toList()); + for (String url : urlList) {//共享主题语法 + mqttClient.subscribe( + url, + mqttProperties.getQos() + ); + } + log.error("== MyMqttCallback ==> ,订阅主题成功,topic:{}", String.join(",", urlList)); + + } catch (MqttException e) { + log.error("订阅主题失败", e); + } + } + + private MqttConnectOptions createConnectOptions() { + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(mqttProperties.getClient().getCleanSession()); + options.setConnectionTimeout(mqttProperties.getConnection().getTimeout()); + options.setKeepAliveInterval(mqttProperties.getConnection().getKeepAlive()); + + if (mqttProperties.getConnection().getAutomaticReconnect() != null) { + options.setAutomaticReconnect(mqttProperties.getConnection().getAutomaticReconnect()); + } + + if (mqttProperties.getAuth().getUsername() != null) { + options.setUserName(mqttProperties.getAuth().getUsername()); + options.setPassword(mqttProperties.getAuth().getPassword().toCharArray()); + } + + return options; + } + + @PreDestroy + public void destroy() { + if (mqttClient != null && mqttClient.isConnected()) { + try { + mqttClient.disconnect(); + log.info("MQTT 客户端已断开连接"); + } catch (MqttException e) { + log.error("断开 MQTT 连接失败", e); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/com/lanhai/mqtt/config/MqttProperties.java b/src/main/java/com/lanhai/mqtt/config/MqttProperties.java new file mode 100644 index 0000000..3eb3c0b --- /dev/null +++ b/src/main/java/com/lanhai/mqtt/config/MqttProperties.java @@ -0,0 +1,52 @@ +package com.lanhai.mqtt.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Data +@Component +@ConfigurationProperties(prefix = "mqtt") +public class MqttProperties { + private Boolean enabled; + private Broker broker = new Broker(); + private Client client = new Client(); + private Connection connection = new Connection(); + private Auth auth = new Auth(); + private Topics topics = new Topics(); + private Integer qos; + private Boolean retained; + + @Data + public static class Broker { + private String url; + } + + @Data + public static class Client { + private String id; + private Boolean cleanSession; + } + + @Data + public static class Connection { + private Integer timeout; + private Integer keepAlive; + private Integer maxReconnectDelay; + private Boolean automaticReconnect; + } + + @Data + public static class Auth { + private String username; + private String password; + } + + @Data + public static class Topics { + private String send; + private String receive; + private String status; + private String device; + } +} \ No newline at end of file diff --git a/src/main/java/com/lanhai/mqtt/controller/MqttController.java b/src/main/java/com/lanhai/mqtt/controller/MqttController.java new file mode 100644 index 0000000..8d21f29 --- /dev/null +++ b/src/main/java/com/lanhai/mqtt/controller/MqttController.java @@ -0,0 +1,111 @@ +package com.lanhai.mqtt.controller; + +import com.lanhai.mqtt.service.MqttService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +import java.util.HashMap; +import java.util.Map; + +@Slf4j +@RestController +@RequestMapping("/api/mqtt") +@RequiredArgsConstructor +public class MqttController { + + private final MqttService mqttService; + + @PostMapping("/publish") + public ResponseEntity> publish(@RequestBody Map request) { + String topic = (String) request.get("topic"); + String message = (String) request.get("message"); + + boolean success = mqttService.sendMessage(topic, message); + + Map response = new HashMap<>(); + response.put("success", success); + response.put("topic", topic); + response.put("message", message); + + return ResponseEntity.ok(response); + } + + @PostMapping("/publish/json") + public ResponseEntity> publishJson(@RequestBody Map request) { + String topic = (String) request.get("topic"); + Object data = request.get("data"); + + boolean success = mqttService.sendJsonMessage(topic, data); + + Map response = new HashMap<>(); + response.put("success", success); + response.put("topic", topic); + + return ResponseEntity.ok(response); + } + + @PostMapping("/device/command") + public ResponseEntity> sendDeviceCommand(@RequestBody Map request) { + String deviceId = (String) request.get("deviceId"); + String command = (String) request.get("command"); + Object params = request.get("params"); + + boolean success = mqttService.sendDeviceCommand(deviceId, command, params); + + Map response = new HashMap<>(); + response.put("success", success); + response.put("deviceId", deviceId); + response.put("command", command); + + return ResponseEntity.ok(response); + } + + @PostMapping("/subscribe") + public ResponseEntity> subscribe(@RequestBody Map request) { + String topic = (String) request.get("topic"); + Integer qos = (Integer) request.getOrDefault("qos", 1); + + boolean success = mqttService.subscribe(topic, qos); + + Map response = new HashMap<>(); + response.put("success", success); + response.put("topic", topic); + + return ResponseEntity.ok(response); + } + + @PostMapping("/unsubscribe") + public ResponseEntity> unsubscribe(@RequestBody Map request) { + String topic = (String) request.get("topic"); + + boolean success = mqttService.unsubscribe(topic); + + Map response = new HashMap<>(); + response.put("success", success); + response.put("topic", topic); + + return ResponseEntity.ok(response); + } + + @GetMapping("/status") + public ResponseEntity> status() { + Map status = new HashMap<>(); + status.put("connected", mqttService.isConnected()); + status.put("clientId", mqttService.getClientId()); + + return ResponseEntity.ok(status); + } + + @PostMapping("/reconnect") + public ResponseEntity> reconnect() { + boolean success = mqttService.reconnect(); + + Map response = new HashMap<>(); + response.put("success", success); + response.put("message", success ? "重连成功" : "重连失败"); + + return ResponseEntity.ok(response); + } +} \ No newline at end of file diff --git a/src/main/java/com/lanhai/mqtt/handler/MqttCallbackHandler.java b/src/main/java/com/lanhai/mqtt/handler/MqttCallbackHandler.java new file mode 100644 index 0000000..5725119 --- /dev/null +++ b/src/main/java/com/lanhai/mqtt/handler/MqttCallbackHandler.java @@ -0,0 +1,63 @@ +package com.lanhai.mqtt.handler; + +import com.lanhai.mqtt.service.MessageProcessor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +@Slf4j +@Component +public class MqttCallbackHandler implements MqttCallback { + + private final MessageProcessor messageProcessor; + private MqttClient mqttClient; // 改为 Setter 注入 + + public MqttCallbackHandler(MessageProcessor messageProcessor) { + this.messageProcessor = messageProcessor; + } + + /** + * 通过 Setter 注入 MqttClient,避免循环依赖 + */ + public void setMqttClient(MqttClient mqttClient) { + this.mqttClient = mqttClient; + log.info("MqttClient 已注入到 CallbackHandler"); + } + + @PostConstruct + public void init() { + log.info("MQTT 回调处理器已初始化"); + } + + @Override + public void connectionLost(Throwable cause) { + log.error("MQTT 连接丢失,原因: {}", cause.getMessage(), cause); + + if (mqttClient != null && !mqttClient.isConnected()) { + log.info("等待客户端自动重连..."); + } + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + String payload = new String(message.getPayload()); + log.info("收到消息 - 主题: {}, QoS: {}", topic, message.getQos()); + + // 异步处理消息 + messageProcessor.processAsync(topic, payload); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + try { + log.debug("消息发送完成 - Message ID: {}", token.getMessageId()); + } catch (Exception e) { + log.error("处理消息发送完成回调失败", e); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/lanhai/mqtt/impl/MqttServiceImpl.java b/src/main/java/com/lanhai/mqtt/impl/MqttServiceImpl.java deleted file mode 100644 index 425bc67..0000000 --- a/src/main/java/com/lanhai/mqtt/impl/MqttServiceImpl.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.lanhai.mqtt.impl; - -import com.lanhai.mqtt.MqttService; -import com.lanhai.mqtt.MyMqttClient; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Service; - -@ConditionalOnProperty( - prefix = "iot.mq", - name = "enabled", - havingValue = "true", - matchIfMissing = false // 默认注册,除非显式设置为 false -) -@Service -public class MqttServiceImpl implements MqttService { - - @Autowired - private MyMqttClient myMqttClient; - - @Override - public void addTopic(String topic) { - myMqttClient.subscribe(topic); - } - - @Override - public void removeTopic(String topic) { - myMqttClient.cleanTopic(topic); - } - - @Override - public void publish(String topic,String msgContent) { - //MyXxxMqttMsg 转Json -// LhMqttMsg myXxxMqttMsg = new LhMqttMsg(); -// myXxxMqttMsg.setContent(msgContent); -// myXxxMqttMsg.setTimestamp(System.currentTimeMillis()); -// // TODO Md5值 -// myXxxMqttMsg.setMd5(UUID.randomUUID().toString()); -// String msgJson = JSON.toJSONString(myXxxMqttMsg); - - //发布消息 - myMqttClient.publish(msgContent, topic); - } - - @Override - public void publish(String topic,byte[] msgContent) { - myMqttClient.publish(msgContent, topic); - } -} diff --git a/src/main/java/com/lanhai/mqtt/listener/MqttReconnectListener.java b/src/main/java/com/lanhai/mqtt/listener/MqttReconnectListener.java new file mode 100644 index 0000000..46e0bbf --- /dev/null +++ b/src/main/java/com/lanhai/mqtt/listener/MqttReconnectListener.java @@ -0,0 +1,178 @@ +package com.lanhai.mqtt.listener; + +import com.lanhai.mqtt.config.MqttProperties; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.concurrent.atomic.AtomicBoolean; + +@Slf4j +@Component +@EnableScheduling +@RequiredArgsConstructor +@ConditionalOnProperty(prefix = "mqtt", name = "enabled", havingValue = "true") +public class MqttReconnectListener { + + private final MqttClient mqttClient; + private final MqttProperties mqttProperties; + + private AtomicBoolean isReconnecting = new AtomicBoolean(false); + private int reconnectCount = 0; + private static final int MAX_RECONNECT_COUNT = 5; + private static final long RECONNECT_INTERVAL = 30000; // 30秒 + + @PostConstruct + public void init() { + log.info("MQTT 重连监听器已启动"); + } + + @EventListener(ContextRefreshedEvent.class) + public void onApplicationReady() { + log.info("应用启动完成,MQTT 连接状态: {}", mqttClient.isConnected()); + } + + /** + * 定时检查连接状态并重连 + */ + @Scheduled(fixedDelay = 10000) // 每10秒检查一次 + public void checkAndReconnect() { + // 如果已经连接,重置重连计数 + if (mqttClient.isConnected()) { + if (reconnectCount > 0) { + log.info("MQTT 连接已恢复,重置重连计数器"); + reconnectCount = 0; + isReconnecting.set(false); + } + return; + } + + // 如果正在重连,跳过 + if (isReconnecting.get()) { + return; + } + + // 检查重连次数 + if (reconnectCount >= MAX_RECONNECT_COUNT) { + log.error("MQTT 重连失败次数已达上限 ({}),停止重连", MAX_RECONNECT_COUNT); + return; + } + + // 执行重连 + attemptReconnect(); + } + + @Async + public void attemptReconnect() { + if (!isReconnecting.compareAndSet(false, true)) { + return; + } + + try { + reconnectCount++; + log.info("尝试重连 MQTT (第 {}/{} 次)...", reconnectCount, MAX_RECONNECT_COUNT); + + // 等待一段时间再重连 + Thread.sleep(RECONNECT_INTERVAL); + + // 重新连接 + if (!mqttClient.isConnected()) { + MqttConnectOptions options = createConnectOptions(); + mqttClient.connect(options); + log.info("MQTT 重连成功!"); + + // 重连成功后重新订阅主题 + resubscribeTopics(); + + // 重置重连计数 + reconnectCount = 0; + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("重连被中断"); + } catch (MqttException e) { + log.error("MQTT 重连失败", e); + + // 如果重连次数达到上限,发送告警 + if (reconnectCount >= MAX_RECONNECT_COUNT) { + sendReconnectAlert(); + } + } finally { + isReconnecting.set(false); + } + } + + private MqttConnectOptions createConnectOptions() { + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(mqttProperties.getClient().getCleanSession()); + options.setConnectionTimeout(mqttProperties.getConnection().getTimeout()); + options.setKeepAliveInterval(mqttProperties.getConnection().getKeepAlive()); + + if (mqttProperties.getAuth().getUsername() != null) { + options.setUserName(mqttProperties.getAuth().getUsername()); + options.setPassword(mqttProperties.getAuth().getPassword().toCharArray()); + } + + return options; + } + + private void resubscribeTopics() { + try { + log.info("重新订阅主题..."); + + if (mqttProperties.getTopics().getReceive() != null) { + mqttClient.subscribe(mqttProperties.getTopics().getReceive(), + mqttProperties.getQos()); + log.info("重新订阅: {}", mqttProperties.getTopics().getReceive()); + } + + if (mqttProperties.getTopics().getStatus() != null) { + mqttClient.subscribe(mqttProperties.getTopics().getStatus(), + mqttProperties.getQos()); + log.info("重新订阅: {}", mqttProperties.getTopics().getStatus()); + } + + if (mqttProperties.getTopics().getDevice() != null) { + mqttClient.subscribe(mqttProperties.getTopics().getDevice(), + mqttProperties.getQos()); + log.info("重新订阅: {}", mqttProperties.getTopics().getDevice()); + } + + // 发送上线消息 + if (mqttProperties.getTopics().getStatus() != null) { + mqttClient.publish( + mqttProperties.getTopics().getStatus(), + "online".getBytes(), + mqttProperties.getQos(), + true + ); + log.info("已发送上线状态"); + } + + } catch (MqttException e) { + log.error("重新订阅主题失败", e); + } + } + + private void sendReconnectAlert() { + log.error("========== MQTT 重连告警 =========="); + log.error("MQTT 服务器: {}", mqttProperties.getBroker().getUrl()); + log.error("重连失败次数: {}", MAX_RECONNECT_COUNT); + log.error("请检查 MQTT 服务器状态和网络连接"); + log.error("=================================="); + + // 这里可以添加告警通知,如发送邮件、短信等 + // emailService.sendAlert("MQTT连接失败"); + } +} \ No newline at end of file diff --git a/src/main/java/com/lanhai/mqtt/service/MessageProcessor.java b/src/main/java/com/lanhai/mqtt/service/MessageProcessor.java new file mode 100644 index 0000000..1d3c19d --- /dev/null +++ b/src/main/java/com/lanhai/mqtt/service/MessageProcessor.java @@ -0,0 +1,177 @@ +package com.lanhai.mqtt.service; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.lanhai.enums.IotInerfaceTopicType; +import com.lanhai.service.Impl.CommonServiceImpl; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +@Service +@RequiredArgsConstructor +public class MessageProcessor { + + private final ObjectMapper objectMapper; + private final Map handlers = new ConcurrentHashMap<>(); + @Autowired + private CommonServiceImpl commonService ; + /** + * 同步处理消息 + */ + public void process(String topic, String payload) { + log.debug("处理消息 - 主题: {}, 负载: {}", topic, payload); + + try { + // 根据主题选择处理器 + MessageHandler handler = findHandler(topic); + if (handler != null) { + handler.handle(topic, payload); + } else { + handleDefault(topic, payload); + } + } catch (Exception e) { + log.error("处理消息失败", e); + } + } + + /** + * 异步处理消息 + */ + @Async + public void processAsync(String topic, String payload) { + process(topic, payload); + } + + /** + * 处理设备控制消息 + */ + private void handleDeviceControl(String topic, String payload) { + try { + JsonNode json = objectMapper.readTree(payload); + String deviceId = extractDeviceId(topic); + String command = json.get("command").asText(); + + log.info("设备控制 - deviceId: {}, command: {}, 参数: {}", + deviceId, command, json.get("params")); + + // 处理设备控制逻辑 + switch (command) { + case "turn_on": + log.info("开启设备: {}", deviceId); + // 开启设备业务逻辑 + break; + case "turn_off": + log.info("关闭设备: {}", deviceId); + // 关闭设备业务逻辑 + break; + case "reboot": + log.info("重启设备: {}", deviceId); + // 重启设备业务逻辑 + break; + case "update_config": + log.info("更新设备配置: {}", deviceId); + // 更新设备配置业务逻辑 + break; + default: + log.warn("未知命令: {}", command); + } + + } catch (Exception e) { + log.error("处理设备控制消息失败", e); + } + } + + /** + * 处理默认消息 + */ + private void handleDefault(String topic, String payload) { + log.info("默认处理 - 主题: {}, 消息: {}", topic, payload); + // 可以根据需要存储到数据库或转发到其他系统 + // 例如: saveToDatabase(topic, payload); + } + + /** + * 从主题中提取设备ID + * 例如: lanhai/device/DEV001/command -> DEV001 + * lanhai/device/DEV001/status -> DEV001 + */ + private String extractDeviceId(String topic) { + String[] parts = topic.split("/"); + if (parts.length >= 3 && "device".equals(parts[1])) { + return parts[2]; + } + return null; + } + + /** + * 查找处理器 + */ + private MessageHandler findHandler(String topic) { + // 设备上报回执主题 + if (topic.startsWith(IotInerfaceTopicType.DTU_TOPIC.getCode())) { + return this::handleDeviceRep; + } + +// // 设备状态主题 +// if (topic.contains("/device/") && topic.endsWith("/status")) { +// return this::handleDeviceStatus; +// } +// +// // 自定义处理器 +// for (Map.Entry entry : handlers.entrySet()) { +// if (topic.matches(entry.getKey())) { +// return entry.getValue(); +// } +// } + + return null; + } + + /** + * 处理设备状态上报 + */ + private void handleDeviceStatus(String topic, String payload) { + try { + JsonNode json = objectMapper.readTree(payload); + String deviceId = extractDeviceId(topic); + String status = json.get("status").asText(); + + log.info("设备状态上报 - deviceId: {}, status: {}, 数据: {}", + deviceId, status, payload); + + // 处理设备状态更新 + // 例如: updateDeviceStatus(deviceId, status, json); + + } catch (Exception e) { + log.error("处理设备状态消息失败", e); + } + } + + /** + * 注册自定义处理器 + */ + public void registerHandler(String topicPattern, MessageHandler handler) { + handlers.put(topicPattern, handler); + log.info("注册消息处理器: {}", topicPattern); + } + + @FunctionalInterface + public interface MessageHandler { + void handle(String topic, String payload); + } + + + /** + * 处理设备监测回执消息 + */ + private void handleDeviceRep(String topic, String payload) { + commonService.processMqttData(topic,payload); + } +} \ No newline at end of file diff --git a/src/main/java/com/lanhai/mqtt/service/MqttService.java b/src/main/java/com/lanhai/mqtt/service/MqttService.java new file mode 100644 index 0000000..ee8ad34 --- /dev/null +++ b/src/main/java/com/lanhai/mqtt/service/MqttService.java @@ -0,0 +1,176 @@ +package com.lanhai.mqtt.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.lanhai.mqtt.config.MqttProperties; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Service; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +@Slf4j +@Service +@RequiredArgsConstructor +public class MqttService { + + private final MqttClient mqttClient; + private final MqttProperties mqttProperties; + private final ObjectMapper objectMapper; + + /** + * 发送消息 + */ + public boolean sendMessage(String topic, String payload) { + return sendMessage(topic, payload, mqttProperties.getQos(), mqttProperties.getRetained()); + } + + /** + * 发送消息(自定义QoS) + */ + public boolean sendMessage(String topic, String payload, int qos, boolean retained) { + try { + if (!mqttClient.isConnected()) { + log.warn("MQTT 未连接,消息发送失败: {}", payload); + return false; + } + + MqttMessage message = new MqttMessage(payload.getBytes()); + message.setQos(qos); + message.setRetained(retained); + message.setId((int) (System.currentTimeMillis() % 100000)); + + mqttClient.publish(topic, message); + log.debug("消息发送成功 - 主题: {}, QoS: {}, 消息: {}", topic, qos, + payload.length() > 100 ? payload.substring(0, 100) + "..." : payload); + return true; + + } catch (MqttException e) { + log.error("消息发送失败", e); + return false; + } + } + + /** + * 发送 JSON 消息 + */ + public boolean sendJsonMessage(String topic, Object data) { + try { + String json = objectMapper.writeValueAsString(data); + return sendMessage(topic, json); + } catch (JsonProcessingException e) { + log.error("JSON 序列化失败", e); + return false; + } + } + + /** + * 异步发送消息 + */ + public CompletableFuture sendMessageAsync(String topic, String payload) { + return CompletableFuture.supplyAsync(() -> sendMessage(topic, payload)); + } + + /** + * 发送设备控制指令 + */ + public boolean sendDeviceCommand(String deviceId, String command, Object params) { + String topic = String.format("lanhai/device/%s/command", deviceId); + + try { + Map message = new HashMap<>(); + message.put("command", command); + message.put("params", params); + message.put("timestamp", System.currentTimeMillis()); + message.put("requestId", UUID.randomUUID().toString()); + + return sendJsonMessage(topic, message); + } catch (Exception e) { + log.error("发送设备指令失败", e); + return false; + } + } + + /** + * 订阅主题 + */ + public boolean subscribe(String topic, int qos) { + try { + mqttClient.subscribe(topic, qos); + log.info("订阅主题: {}", topic); + return true; + } catch (MqttException e) { + log.error("订阅失败", e); + return false; + } + } + + /** + * 取消订阅 + */ + public boolean unsubscribe(String topic) { + try { + mqttClient.unsubscribe(topic); + log.info("取消订阅: {}", topic); + return true; + } catch (MqttException e) { + log.error("取消订阅失败", e); + return false; + } + } + + /** + * 检查连接状态 + */ + public boolean isConnected() { + return mqttClient.isConnected(); + } + + /** + * 获取客户端ID + */ + public String getClientId() { + return mqttProperties.getClient().getId(); + } + + /** + * 手动重连 + */ + public boolean reconnect() { + try { + if (!mqttClient.isConnected()) { + mqttClient.reconnect(); + log.info("手动重连成功"); + return true; + } + return false; + } catch (MqttException e) { + log.error("手动重连失败", e); + return false; + } + } + + /** + * 断开连接 + */ + public void disconnect() { + try { + if (mqttClient.isConnected()) { + // 发送离线状态 + if (mqttProperties.getTopics().getStatus() != null) { + sendMessage(mqttProperties.getTopics().getStatus(), "offline", 1, true); + } + mqttClient.disconnect(); + log.info("MQTT 连接已断开"); + } + } catch (MqttException e) { + log.error("断开连接失败", e); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/lanhai/o/iot/pbs/WaterCommonTransVo.java b/src/main/java/com/lanhai/o/iot/pbs/WaterCommonTransVo.java index 2825c62..48bc41f 100644 --- a/src/main/java/com/lanhai/o/iot/pbs/WaterCommonTransVo.java +++ b/src/main/java/com/lanhai/o/iot/pbs/WaterCommonTransVo.java @@ -2,6 +2,7 @@ package com.lanhai.o.iot.pbs; import lombok.Data; +import java.time.LocalDateTime; import java.util.Date; /** @@ -67,7 +68,7 @@ public class WaterCommonTransVo { /** * 数据更新时间 */ - private Date dataDateTime; + private LocalDateTime dataDateTime; /** * 数据获取类型;realTime=实时,dayTime=日数据,month=月数据,year=年数据 diff --git a/src/main/java/com/lanhai/service/Impl/CommonServiceImpl.java b/src/main/java/com/lanhai/service/Impl/CommonServiceImpl.java index b1aeb99..79262d8 100644 --- a/src/main/java/com/lanhai/service/Impl/CommonServiceImpl.java +++ b/src/main/java/com/lanhai/service/Impl/CommonServiceImpl.java @@ -2,7 +2,8 @@ package com.lanhai.service.Impl; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.util.IdUtil; -import com.alibaba.fastjson.JSONObject; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; import com.lanhai.constant.IotConstants; import com.lanhai.constant.PollutionConstants; import com.lanhai.entity.*; @@ -10,6 +11,7 @@ import com.lanhai.o.iot.lhiot.ResponseCmd; import com.lanhai.o.iot.pbs.WaterCommonTransVo; import com.lanhai.service.*; import com.lanhai.util.LhIotUtil; +import com.xxl.job.core.context.XxlJobHelper; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -44,6 +46,7 @@ public class CommonServiceImpl implements ICommonService { @Autowired @Lazy private SurvStationInfoServiceImpl survStationInfoService; + // // @Autowired // private ISurvTransdataAirService transdataAirService; @@ -110,8 +113,10 @@ public class CommonServiceImpl implements ICommonService { .eq(SurvDeviceDeploy::getDeviceReverseIotUrl,topic) .list(); ; - + log.error("===================主题:{},设备数量:{}==============================",topic,deployList.size()); + XxlJobHelper.log("===================主题:{},设备数量:{}==============================",topic,deployList.size()); if(!deployList.isEmpty()){ + SurvDeviceDeploy survDeviceDeploy = deployList.get(0); List zhibiaoList = zhibiaoService.getListByEquid(survDeviceDeploy.getId()); if(PollutionConstants.WATER_ORIENT.equals(survDeviceDeploy.getDeployType()) || PollutionConstants.WATER_LIVE.equals(survDeviceDeploy.getDeployType())){//面源数据 @@ -121,13 +126,23 @@ public class CommonServiceImpl implements ICommonService { waterCommonTransVo = LhIotUtil.transData(zhibiaoList,mqttMessage); break; } + log.error("----------------------查询到设备:{},回执:{}----------------------------",survDeviceDeploy.getId()+"_"+survDeviceDeploy.getDeployDes(), JSONUtil.toJsonStr(waterCommonTransVo)); + XxlJobHelper.log("----------------------查询到设备:{},回执:{}----------------------------",survDeviceDeploy.getId()+"_"+survDeviceDeploy.getDeployDes(), JSONUtil.toJsonStr(waterCommonTransVo)); if(waterCommonTransVo!=null){ LocalDateTime nowTime =LocalDateTime.now(); //遍历保存所有租户数据 for (SurvDeviceDeploy deploy : deployList) { + log.error("###########################保存数据中-----设备:{}#####################################",deploy.getDeployCode()+"__"+deploy.getDeployDes()); + XxlJobHelper.log("###########################保存数据中-----设备:{}#####################################",deploy.getDeployCode()+"__"+deploy.getDeployDes()); SurvStationInfo survStationInfo = survStationInfoService.getByCode(deploy.getStationCode()); String stationName = survStationInfo!=null?survStationInfo.getStationName():"站点"; if(PollutionConstants.WATER_ORIENT.equals(deploy.getDeployType())){ + log.error("^^^^^^^^^^^^^^^^面源数据保存:{}^^^^^^^^^^^^^^^^^",deploy.getDeployCode()+"__"+deploy.getDeployDes()); + XxlJobHelper.log("^^^^^^^^^^^^^^^^面源数据保存:{}^^^^^^^^^^^^^^^^^",deploy.getDeployCode()+"__"+deploy.getDeployDes()); + SurvTransdataOrientwater oldData = transdataOrientwaterService.getOneByDeviceId(deploy.getId()); + if(oldData!=null){ + transdataOrientwaterService.removeById(oldData.getId()); + } SurvTransdataOrientwater orientwater = new SurvTransdataOrientwater(); BeanUtil.copyProperties(waterCommonTransVo,orientwater); orientwater.setTenantId(deploy.getTenantId()); @@ -138,6 +153,7 @@ public class CommonServiceImpl implements ICommonService { orientwater.setDeviceId(deploy.getId()); orientwater.setCreatedBy("task");//创建人 orientwater.setCreateTime(nowTime);//创建时间 + orientwater.setStationId(survStationInfo.getId()); orientwater.setDataType(deploy.getDeploySecondaryType()); transdataOrientwaterService.save(orientwater); @@ -149,6 +165,12 @@ public class CommonServiceImpl implements ICommonService { hisdataOrientwaterService.save(hisOrientWater); } else if (PollutionConstants.WATER_LIVE.equals(deploy.getDeployType())) { + log.error("^^^^^^^^^^^^^^^^畜禽数据保存:{}^^^^^^^^^^^^^^^^^",deploy.getDeployCode()+"__"+deploy.getDeployDes()); + XxlJobHelper.log("^^^^^^^^^^^^^^^^畜禽数据保存:{}^^^^^^^^^^^^^^^^^",deploy.getDeployCode()+"__"+deploy.getDeployDes()); + SurvTransdataLivestockwater oldData = transdataLivestockwaterService.getOneByDeviceId(deploy.getId()); + if(oldData!=null){ + transdataLivestockwaterService.removeById(oldData.getId()); + } SurvTransdataLivestockwater livestockwater = new SurvTransdataLivestockwater(); BeanUtil.copyProperties(waterCommonTransVo,livestockwater); livestockwater.setTenantId(deploy.getTenantId()); @@ -160,7 +182,7 @@ public class CommonServiceImpl implements ICommonService { livestockwater.setCreatedBy("task");//创建人 livestockwater.setCreateTime(nowTime);//创建时间 livestockwater.setDataType(deploy.getDeploySecondaryType()); - + livestockwater.setStationId(survStationInfo.getId()); transdataLivestockwaterService.save(livestockwater); //保存历史表 @@ -175,6 +197,7 @@ public class CommonServiceImpl implements ICommonService { } }else{ log.error("主题:{}--消息:{},解析失败,任务中断。",topic,mqttMessage); + XxlJobHelper.log("主题:{}--消息:{},解析失败,任务中断。",topic,mqttMessage); } } diff --git a/src/main/java/com/lanhai/service/Impl/SurvIotVirtualDeviceServiceImpl.java b/src/main/java/com/lanhai/service/Impl/SurvIotVirtualDeviceServiceImpl.java index 2f4bae9..83fccbd 100644 --- a/src/main/java/com/lanhai/service/Impl/SurvIotVirtualDeviceServiceImpl.java +++ b/src/main/java/com/lanhai/service/Impl/SurvIotVirtualDeviceServiceImpl.java @@ -56,12 +56,12 @@ public class SurvIotVirtualDeviceServiceImpl extends ServiceImpl devicelist = deviceDeployService.page(page,wrapper); - log.warn("需采集的MQTT设备数量:{}-共{}页",page.getTotal(),page.getPages()); + log.error("需采集的MQTT设备数量:{}-共{}页",page.getTotal(),page.getPages()); XxlJobHelper.log("需采集的MQTT设备数量:{}-共{}页",page.getTotal(),page.getPages()); @@ -61,7 +62,7 @@ public class MqttDeviceTask { List dlist = new ArrayList<>(); List updList = new ArrayList<>(); for (int i = 1; i <= devicelist.getPages(); i++) { - log.warn("=====================x需采集的MQTT设备数量:page:{},共:{}页==========================",i,devicelist.getPages()); + log.error("=====================x需采集的MQTT设备数量:page:{},共:{}页==========================",i,devicelist.getPages()); XxlJobHelper.log("=====================x需采集的MQTT设备数量:page:{},共:{}页==========================",i,devicelist.getPages()); if(i>1){//翻页 page = new Page(i,curPageSize); @@ -72,7 +73,9 @@ public class MqttDeviceTask { if (StringUtils.isNotBlank(deploy.getProtocolCode())) { try { switch (deploy.getDeployType()) { - case PollutionConstants.CONTROL_CAB: + case PollutionConstants.WATER_ORIENT: + case PollutionConstants.WATER_LIVE: + XxlJobHelper.log("<=======面源畜禽设备========>"); if (IotConstants.lhviot_standard.equals(deploy.getProtocolCode())) {//蓝海设备协议 LhIotUtil.DeviceQuery(deploy); } diff --git a/src/main/java/com/lanhai/util/DataUtil.java b/src/main/java/com/lanhai/util/DataUtil.java index ef3323c..11cdb5f 100644 --- a/src/main/java/com/lanhai/util/DataUtil.java +++ b/src/main/java/com/lanhai/util/DataUtil.java @@ -280,7 +280,7 @@ public class DataUtil{ if(StringUtils.isNotBlank(str) && deploy!=null && deploy.getDeviceConfig() !=null){ - waterCommonTransVo.setDataDateTime(new Date()); + waterCommonTransVo.setDataDateTime(LocalDateTime.now()); waterCommonTransVo.setDataGatherType("realTime"); waterCommonTransVo.setDeployCode(deploy.getDeployCode()); diff --git a/src/main/java/com/lanhai/util/LhIotUtil.java b/src/main/java/com/lanhai/util/LhIotUtil.java index ae74272..7de5ea8 100644 --- a/src/main/java/com/lanhai/util/LhIotUtil.java +++ b/src/main/java/com/lanhai/util/LhIotUtil.java @@ -1,18 +1,21 @@ package com.lanhai.util; import com.alibaba.fastjson.JSONObject; +import com.lanhai.constant.IotConstants; import com.lanhai.entity.ScEquZhibiao; import com.lanhai.entity.SurvDeviceDeploy; import com.lanhai.entity.SurvDeviceDeployRelay; -import com.lanhai.mqtt.MqttService; +import com.lanhai.mqtt.service.MqttService; import com.lanhai.o.iot.lhiot.*; import com.lanhai.o.iot.pbs.WaterCommonTransVo; import com.lanhai.service.IScEquZhibiaoService; import com.lanhai.service.ISurvDeviceDeployRelayService; +import com.xxl.job.core.context.XxlJobHelper; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -32,7 +35,9 @@ public class LhIotUtil { QueryCmd queryCmd = ConstructCmd(survDeviceDeploy); //发送指令,传入下行主题 iotUrl String cmdStr = JSONObject.toJSONString(queryCmd); - mqttService.publish(survDeviceDeploy.getDeviceIotUrl(),cmdStr); + log.error("=================发送查询指令========{}=================",cmdStr); + XxlJobHelper.log("=================发送查询指令========{}=================",cmdStr); + mqttService.sendMessage(survDeviceDeploy.getDeviceIotUrl(),cmdStr); } return true; } @@ -44,58 +49,61 @@ public class LhIotUtil { public static QueryCmd ConstructCmd(SurvDeviceDeploy survDeviceDeploy){ QueryCmd queryCmd = new QueryCmd(); //查询设备配置 - List eleList = scEquZhibiaoService.lambdaQuery() - .eq(ScEquZhibiao::getEquId,survDeviceDeploy.getId()) - .list(); - if(eleList!=null && !eleList.isEmpty()){ - List registers = new ArrayList<>(); - for (ScEquZhibiao scEquZhibiao : eleList) { - QueryCmdRegister queryCmdRegister = new QueryCmdRegister(); - queryCmdRegister.setName(scEquZhibiao.getCode()); - registers.add(queryCmdRegister); - } - QueryCmdDetail queryCmdDetail = new QueryCmdDetail(); - queryCmdDetail.setR_data(registers); - queryCmd.setRw_prot(queryCmdDetail); - } + List eleList = scEquZhibiaoService.lambdaQuery() + .eq(ScEquZhibiao::getEquId,survDeviceDeploy.getId()) + .list(); + if(eleList!=null && !eleList.isEmpty()){ + List registers = new ArrayList<>(); + for (ScEquZhibiao scEquZhibiao : eleList) { + QueryCmdRegister queryCmdRegister = new QueryCmdRegister(); + queryCmdRegister.setName(scEquZhibiao.getCode()); + registers.add(queryCmdRegister); + } + QueryCmdDetail queryCmdDetail = new QueryCmdDetail(); + queryCmdDetail.setR_data(registers); + queryCmd.setRw_prot(queryCmdDetail); + } return queryCmd; } public static WaterCommonTransVo transData(List zhibiaoList, String mqttMessage) { - WaterCommonTransVo waterCommonTransVo = new WaterCommonTransVo(); - ResponseCmd responseCmd = JSONObject.parseObject(mqttMessage,ResponseCmd.class); - if(responseCmd!=null){ - if(responseCmd.getRw_prot()!=null){ - if("rsp".equals(responseCmd.getRw_prot().getDir())) { - if (zhibiaoList != null && !zhibiaoList.isEmpty()) { - Map eleMap = new HashMap<>(); - for (ScEquZhibiao scEquZhibiao : zhibiaoList) { - eleMap.put(scEquZhibiao.getCode(), scEquZhibiao.getEntityField()); - } + WaterCommonTransVo waterCommonTransVo = new WaterCommonTransVo(); + ResponseCmd responseCmd = JSONObject.parseObject(mqttMessage,ResponseCmd.class); + if(responseCmd!=null){ + if(responseCmd.getRw_prot()!=null){ + if("rsp".equals(responseCmd.getRw_prot().getDir())) { + if (zhibiaoList != null && !zhibiaoList.isEmpty()) { + Map eleMap = new HashMap<>(); + for (ScEquZhibiao scEquZhibiao : zhibiaoList) { + eleMap.put(scEquZhibiao.getCode(), scEquZhibiao.getEntityField()); + } - if (!responseCmd.getRw_prot().getR_data().isEmpty()) { - for (ResponseCmdData rDatum : responseCmd.getRw_prot().getR_data()) { - String entityName = eleMap.get(rDatum.getName());//字段名称 - if (StringUtils.isNotBlank(entityName)) { - TUtil.setFieldValue(waterCommonTransVo, entityName, rDatum.getValue()); - } else { - log.error("指令:{}----未匹配到监测项目:{}", mqttMessage, rDatum.getName()); - } + if (!responseCmd.getRw_prot().getR_data().isEmpty()) { + //因为数据中不包含时间,所以使用当前时间 + waterCommonTransVo.setDataDateTime(LocalDateTime.now()); + waterCommonTransVo.setDataGatherType(IotConstants.MARK_REALDATA); + for (ResponseCmdData rDatum : responseCmd.getRw_prot().getR_data()) { + String entityName = eleMap.get(rDatum.getName());//字段名称 + if (StringUtils.isNotBlank(entityName)) { + TUtil.setFieldValue(waterCommonTransVo, entityName, rDatum.getValue()); + } else { + log.error("指令:{}----未匹配到监测项目:{}", mqttMessage, rDatum.getName()); } } } - log.warn("{}----指令解析结果=============={}",mqttMessage,JSONObject.toJSONString(waterCommonTransVo)); - }else{ - log.error("----非主动请求数据,跳过解析==============回执:{}",mqttMessage); } + log.warn("{}-----------指令解析结果=============={}",mqttMessage,JSONObject.toJSONString(waterCommonTransVo)); }else{ - log.error("------------解析失败,回执格式有误2-------------"); + log.error("----非主动请求数据,跳过解析==============回执:{}",mqttMessage); } }else{ - log.error("xxxxxxxxxxxxx解析失败,回执格式有误1xxxxxxxxxxxxx"); + log.error("------------解析失败,回执格式有误2-------------"); } + }else{ + log.error("xxxxxxxxxxxxx解析失败,回执格式有误1xxxxxxxxxxxxx"); + } return waterCommonTransVo; } } diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index 4e09cc6..f4a78dc 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -77,15 +77,29 @@ xxl: port: 10004 logpath: /logs/xxl-job/jobhandler logretentiondays: 30 + # 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息) mqtt: - host: tcp://mqtt.ilhzn.cn:10007 - userName: lhzn.mqtt - passWord: lhzn.2025 + enabled: true + broker: + url: tcp://mqtt.ilhzn.cn:10007 + # 多个broker配置: tcp://broker1:1883,tcp://broker2:1883 + client: + id: lanhai-mqtt-client-${random.uuid} + clean-session: false + connection: + timeout: 30 + keep-alive: 60 + max-reconnect-delay: 128000 # 最大重连延迟(ms) + automatic-reconnect: true + auth: + username: lhzn.mqtt + password: lhzn.2025 + topics: + send: "lanhai/mqtt/send" + receive: "lanhai/mqtt/receive" + status: "lanhai/mqtt/status" + device: "lanhai/device/+/command" qos: 1 - clientId: fxgather-client-${random.uuid} #ClientId_local必须唯一。 - timeout: 10 # 超时时间 - keepalive: 30 # 保持连接时间 - clearSession: false # 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息) - + retained: false