mqtt 封装改版,解决定时器触发时调用mqtt问题

This commit is contained in:
zy 2026-03-28 16:17:43 +08:00
parent cfb359f4e3
commit 3d3e03513a
22 changed files with 1012 additions and 684 deletions

View File

@ -1,13 +0,0 @@
package com.lanhai.mqtt;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@Data
public class DTOMqtt {
@ApiModelProperty("主题")
private String topic;
@ApiModelProperty("发送的内容")
private String message;
}

View File

@ -1,34 +0,0 @@
package com.lanhai.mqtt;
import lombok.Data;
import java.io.Serializable;
@Data
public class LhMqttMsg implements Serializable {
private static final long serialVersionUID = -8303548938481407659L;
/**
* MD5值MD5_lower(content + timestamp)
*/
private String md5;
/**
* 消息内容
*/
private String content = "";
/**
* 消息内容
*/
private byte[] byteContent;
/**
* 时间戳
*/
private Long timestamp;
}

View File

@ -1,80 +0,0 @@
package com.lanhai.mqtt;
import cn.hutool.core.lang.UUID;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfigureOrder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.env.Environment;
@Slf4j
@Configuration
@ConditionalOnProperty(
prefix = "iot.mq",
name = "enabled",
havingValue = "true",
matchIfMissing = true // 默认注册除非显式设置为 false
)
@DependsOn({
"dataSource", // 数据源
"sqlSessionFactory", // SQL 会话工厂
"sqlSessionTemplate", // SQL 会话模板
"transactionManager", // 事务管理器
"paginationInterceptor", // 分页拦截器
})
public class MqttConfig {
@Value("${mqtt.host}")
public String host;
@Value("${mqtt.username}")
public String username;
@Value("${mqtt.password}")
public String password;
// @Value("${mqtt.clientId}")
public String clientId = "FxGather-" + UUID.fastUUID().toString().replaceAll("-","").substring(0,5);
@Value("${mqtt.timeout}")
public int timeOut;
@Value("${mqtt.keepalive}")
public int keepAlive;
@Value("${mqtt.clearSession}")
public boolean clearSession;
@Autowired
private Environment env;
@Bean//注入Spring
public MyMqttClient myMqttClient() {
MyMqttClient myMqttClient = new MyMqttClient(host, username, password, clientId, timeOut, keepAlive, clearSession);
// String activeProfile = env.getActiveProfiles()[0];
//正式环境才连接mqtt
// if(activeProfile.contains("prod")){
log.warn("-------------正式环境,MQTT启动初始化---------");
for (int i = 0; i < 10; i++) {
try {
myMqttClient.connect();
// 这里可以订阅主题推荐放到 MqttCallbackExtended.connectComplete方法中
//myMqttClient.subscribe("ABC", 1);
return myMqttClient;
} catch (MqttException e) {
log.error("== MqttConfig ==> MQTT connect exception, connect time = {}", i);
log.error("=========mqtt连接异常"+e.getMessage());
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
// }else{
// log.warn("-------------非正式环境跳过Mqtt订阅---------");
// }
return myMqttClient;
}
}

View File

@ -1,45 +0,0 @@
package com.lanhai.mqtt;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@ConditionalOnProperty(
prefix = "iot.mq",
name = "enabled",
havingValue = "true",
matchIfMissing = false // 默认注册除非显式设置为 false
)
public interface MqttService {
/**
* 添加订阅主题
*
* @param topic 主题名称
*/
void addTopic(String topic);
/**
* 取消订阅主题
*
* @param topic 主题名称
*/
void removeTopic(String topic);
/**
* 发布主题消息内容
*
* @param msgContent
* @param topic
*/
void publish(String topic,String msgContent);
/**
* 发布主题消息内容
*
* @param msgContent
* @param topic
*/
void publish(String topic,byte[] msgContent);
}

View File

@ -1,158 +0,0 @@
package com.lanhai.mqtt;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.lanhai.entity.SurvAlertRecord;
import com.lanhai.enums.IotInerfaceTopicType;
import com.lanhai.mapper.SurvAlertRecordMapper;
import com.lanhai.mapper.SurvIotVirtualDeviceMapper;
import com.lanhai.o.iot.lhiot.ResponseCmd;
import com.lanhai.service.IScEquZhibiaoService;
import com.lanhai.service.ISurvIotVirtualDeviceService;
import com.lanhai.service.Impl.CommonServiceImpl;
import com.lanhai.service.Impl.SurvIotVirtualDeviceServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import com.lanhai.util.SpringContextUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.sql.Wrapper;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
@Slf4j
public class MyMqttCallback implements MqttCallbackExtended {
//手动注入
// private MqttConfig mqttConfig = SpringContextUtil.getBean(MqttConfig.class);
private SurvIotVirtualDeviceServiceImpl deviceService = SpringContextUtil.getBean("survIotVirtualDeviceServiceImpl", SurvIotVirtualDeviceServiceImpl.class);
private CommonServiceImpl commonService = SpringContextUtil.getBean(CommonServiceImpl.class);
// private static RedisTemplate redisTemplate = SpringContextUtil.getBean("redisTemplate", RedisTemplate.class);
private MyMqttClient myMqttClient;
public MyMqttCallback(MyMqttClient myMqttClient) {
this.myMqttClient = myMqttClient;
}
/**
* MQTT Broker连接成功时被调用的方法在该方法中可以执行 订阅系统约定的主题推荐使用
* 如果 MQTT Broker断开连接之后又重新连接成功时主题也需要再次订阅将重新订阅主题放在连接成功后的回调方法中比较合理
*
* @param reconnect
* @param serverURI MQTT Broker的url
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
try {
String connectMode = reconnect ? "重连" : "直连";
log.info("== MyMqttCallback ==> MQTT 连接成功,连接方式:{}serverURI{}", connectMode, serverURI);
//订阅主题
//查询所有需要订阅的mqtt主题
List<String> urlList = deviceService.getAllMqttTopic();
//去重
urlList = urlList.stream()
.distinct()
.collect(Collectors.toList());
for (String url : urlList) {//共享主题语法
myMqttClient.subscribe(url, 1);
}
log.info("== MyMqttCallback ==> 连接方式:{}订阅主题成功topic{}", connectMode, String.join(",", urlList));
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 丢失连接可在这里做重连
* 只会调用一次
*
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
log.error("== MyMqttCallback ==> connectionLost 连接断开5S之后尝试重连: {}", throwable.getMessage());
long reconnectTimes = 1;
while (true) {
try {
if (MyMqttClient.getClient().isConnected()) {
//判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete方法里面 看你们自己选择
log.warn("== MyMqttCallback ==> mqtt reconnect success end 重新连接 重新订阅成功");
return;
}
reconnectTimes += 1;
log.warn("== MyMqttCallback ==> mqtt reconnect times = {} try again... mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);
MyMqttClient.getClient().reconnect();
} catch (MqttException e) {
log.error("== MyMqttCallback ==> mqtt断连异常", e);
}
try {
Thread.sleep(5000);
} catch (InterruptedException e1) {
}
}
}
/**
* 接收到消息subscribe订阅的主题消息时被调用的方法
*
* @param topic
* @param mqttMessage
* @throws Exception 后得到的消息会执行到这里面
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
String o1 = new String(mqttMessage.getPayload());
log.info("== MyMqttCallback ==> messageArrived 接收消息主题: {},接收消息内容: {}", topic, o1);
try {
if (topic.startsWith(IotInerfaceTopicType.DTU_TOPIC.getCode())) {
commonService.processMqttData(topic,o1);
// deviceService.processPayLoad(topic,mqttMessage.getPayload());
} else if (topic.startsWith(IotInerfaceTopicType.LH_IOT_TOPIC_DOWN.getCode())) {//蓝海虚拟设备下行逻辑
//暂时无业务
}
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 消息发送publish完成时被调用的方法
*
* @param iMqttDeliveryToken
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("== MyMqttCallback ==> deliveryComplete 消息发送完成Complete= {}", iMqttDeliveryToken.isComplete());
}
// 指定最小和最大范围
public static void randomSleep(int minMs, int maxMs) {
try {
Random random = new Random();
if (minMs >= maxMs) {
throw new IllegalArgumentException("最小值必须小于最大值");
}
int sleepTime = minMs + random.nextInt(maxMs - minMs + 1);
System.out.println("随机休眠: " + sleepTime + "ms");
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

View File

@ -1,209 +0,0 @@
package com.lanhai.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
@Slf4j
public class MyMqttClient {
/**
* MQTT Broker 基本连接参数用户名密码为非必选参数
*/
private String host;
private String username;
private String password;
private String clientId;
private int timeout;
private int keepalive;
private boolean clearSession;
/**
* MQTT 客户端
*/
private static MqttClient client;
public static MqttClient getClient() {
return client;
}
public static void setClient(MqttClient client) {
MyMqttClient.client = client;
}
public MyMqttClient(String host, String username, String password, String clientId, int timeOut, int keepAlive, boolean clearSession) {
this.host = host;
this.username = username;
this.password = password;
this.clientId = clientId;
this.timeout = timeOut;
this.keepalive = keepAlive;
this.clearSession = clearSession;
}
/**
* 设置 MQTT Broker 基本连接参数
*
* @param username
* @param password
* @param timeout
* @param keepalive
* @return
*/
public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
options.setCleanSession(clearSession);
options.setAutomaticReconnect(true);
return options;
}
/**
* 连接 MQTT Broker得到 MqttClient连接对象
*/
public void connect() throws MqttException {
if (client == null) {
client = new MqttClient(host, clientId, new MemoryPersistence());
// 设置回调
client.setCallback(new MyMqttCallback(MyMqttClient.this));
}
// 连接参数
MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);
if (!client.isConnected()) {
client.connect(mqttConnectOptions);
} else {
client.disconnect();
client.connect(mqttConnectOptions);
}
log.info("== MyMqttClient ==> MQTT connect success");//未发生异常则连接成功
}
/**
* 发布默认qos为0非持久化
*
* @param pushMessage
* @param topic
*/
public void publish(String pushMessage, String topic) {
publish(pushMessage, topic, 2, false);
}
/**
* 发布默认qos为0非持久化
*
* @param pushMessage
* @param topic
*/
public void publish(byte[] pushMessage, String topic) {
publish(pushMessage, topic, 2, false);//至少一次
}
/**
* 发布消息
*
* @param pushMessage
* @param topic
* @param qos
* @param retained:留存
*/
public void publish(String pushMessage, String topic, int qos, boolean retained) {
MqttMessage message = new MqttMessage();
message.setPayload(pushMessage.getBytes());
message.setQos(qos);
message.setRetained(retained);
MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic);
if (null == mqttTopic) {
log.error("== MyMqttClient ==> topic is not exist");
}
MqttDeliveryToken token;//Delivery:配送
synchronized (this) {//注意这里一定要同步否则在多线程publish的情况下线程会发生死锁分析见文章最后补充
try {
token = mqttTopic.publish(message);//也是发送到执行队列中等待执行线程执行将消息发送到消息中间件
token.waitForCompletion(3000L);
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* 发布消息
*
* @param pushMessage
* @param topic
* @param qos
* @param retained:留存
*/
public void publish(byte[] pushMessage, String topic, int qos, boolean retained) {
MqttMessage message = new MqttMessage();
message.setPayload(pushMessage);
message.setQos(qos);
message.setRetained(retained);
MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic);
if (null == mqttTopic) {
log.error("== byte MyMqttClient ==> topic is not exist");
}
MqttDeliveryToken token;//Delivery:配送
synchronized (this) {//注意这里一定要同步否则在多线程publish的情况下线程会发生死锁分析见文章最后补充
try {
token = mqttTopic.publish(message);//也是发送到执行队列中等待执行线程执行将消息发送到消息中间件
token.waitForCompletion(1000L);
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* 订阅某个主题qos默认为0
*
* @param topic
*/
public void subscribe(String topic) {
subscribe(topic, 2);
}
/**
* 订阅某个主题
*
* @param topic
* @param qos
*/
public void subscribe(String topic, int qos) {
try {
MyMqttClient.getClient().subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
log.info("== MyMqttClient ==> 订阅主题成功topic = {} qos = {}", topic, qos);
}
/**
* 取消订阅主题
*
* @param topic 主题名称
*/
public void cleanTopic(String topic) {
if (client != null && client.isConnected()) {
try {
client.unsubscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
} else {
log.error("== MyMqttClient ==> 取消订阅失败!");
}
log.info("== MyMqttClient ==> 取消订阅主题成功topic = {}", topic);
}
}

View File

@ -1,37 +0,0 @@
package com.lanhai.mqtt;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/mqtt")
@Api(value = "MyMqttController", tags = {"MQTT相关操作接口"})
public class MyMqttController {
@Autowired(required = false)
private MqttService mqttService;
@GetMapping("/addTopic")
@ApiOperation(value = "添加订阅主题接口")
public void addTopic(String topic) {
mqttService.addTopic(topic);
}
@GetMapping("/removeTopic")
@ApiOperation(value = "取消订阅主题接口")
public void removeTopic(String topic) {
mqttService.removeTopic(topic);
}
@PostMapping("/removeTopic")
@ApiOperation(value = "发布主题消息内容接口")
public void removeTopic(String msgContent, String topic) {
mqttService.publish(msgContent, topic);
}
}

View File

@ -0,0 +1,147 @@
package com.lanhai.mqtt.config;
import com.lanhai.mqtt.handler.MqttCallbackHandler;
import com.lanhai.service.ISurvIotVirtualDeviceService;
import com.lanhai.service.Impl.SurvIotVirtualDeviceServiceImpl;
import com.lanhai.util.SpringContextUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@Configuration
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "mqtt", name = "enabled", havingValue = "true")
public class MqttConfig {
private final MqttProperties mqttProperties;
private final MqttCallbackHandler mqttCallbackHandler; // 这里注入
private MqttClient mqttClient;
private final ISurvIotVirtualDeviceService deviceService;
@Bean
public MqttClient mqttClient() {
try {
log.info("========== 初始化 MQTT 客户端 ==========");
// 1. 创建客户端
mqttClient = new MqttClient(
mqttProperties.getBroker().getUrl(),
mqttProperties.getClient().getId(),
new MemoryPersistence()
);
// 2. MqttClient 注入到 CallbackHandler
mqttCallbackHandler.setMqttClient(mqttClient);
// 3. 设置回调
mqttClient.setCallback(mqttCallbackHandler);
// 4. 配置连接选项
MqttConnectOptions options = createConnectOptions();
// 5. 连接
mqttClient.connect(options);
log.info("MQTT 连接成功!");
// 6. 订阅主题
subscribeTopics();
return mqttClient;
} catch (MqttException e) {
log.error("MQTT 连接失败", e);
throw new RuntimeException("MQTT 连接失败", e);
}
}
private void subscribeTopics() {
try {
// // 订阅接收主题
// if (mqttProperties.getTopics().getReceive() != null) {
// mqttClient.subscribe(
// mqttProperties.getTopics().getReceive(),
// mqttProperties.getQos()
// );
// log.info("订阅主题: {}", mqttProperties.getTopics().getReceive());
// }
//
// // 订阅状态主题
// if (mqttProperties.getTopics().getStatus() != null) {
// mqttClient.subscribe(
// mqttProperties.getTopics().getStatus(),
// mqttProperties.getQos()
// );
// log.info("订阅主题: {}", mqttProperties.getTopics().getStatus());
// }
//
// // 订阅设备通配符主题
// if (mqttProperties.getTopics().getDevice() != null) {
// mqttClient.subscribe(
// mqttProperties.getTopics().getDevice(),
// mqttProperties.getQos()
// );
// log.info("订阅通配符主题: {}", mqttProperties.getTopics().getDevice());
// }
//查询所有需要订阅的mqtt主题
List<String> urlList = deviceService.getAllMqttTopic();
//去重
urlList = urlList.stream()
.distinct()
.collect(Collectors.toList());
for (String url : urlList) {//共享主题语法
mqttClient.subscribe(
url,
mqttProperties.getQos()
);
}
log.error("== MyMqttCallback ==> 订阅主题成功topic{}", String.join(",", urlList));
} catch (MqttException e) {
log.error("订阅主题失败", e);
}
}
private MqttConnectOptions createConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(mqttProperties.getClient().getCleanSession());
options.setConnectionTimeout(mqttProperties.getConnection().getTimeout());
options.setKeepAliveInterval(mqttProperties.getConnection().getKeepAlive());
if (mqttProperties.getConnection().getAutomaticReconnect() != null) {
options.setAutomaticReconnect(mqttProperties.getConnection().getAutomaticReconnect());
}
if (mqttProperties.getAuth().getUsername() != null) {
options.setUserName(mqttProperties.getAuth().getUsername());
options.setPassword(mqttProperties.getAuth().getPassword().toCharArray());
}
return options;
}
@PreDestroy
public void destroy() {
if (mqttClient != null && mqttClient.isConnected()) {
try {
mqttClient.disconnect();
log.info("MQTT 客户端已断开连接");
} catch (MqttException e) {
log.error("断开 MQTT 连接失败", e);
}
}
}
}

View File

@ -0,0 +1,52 @@
package com.lanhai.mqtt.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
private Boolean enabled;
private Broker broker = new Broker();
private Client client = new Client();
private Connection connection = new Connection();
private Auth auth = new Auth();
private Topics topics = new Topics();
private Integer qos;
private Boolean retained;
@Data
public static class Broker {
private String url;
}
@Data
public static class Client {
private String id;
private Boolean cleanSession;
}
@Data
public static class Connection {
private Integer timeout;
private Integer keepAlive;
private Integer maxReconnectDelay;
private Boolean automaticReconnect;
}
@Data
public static class Auth {
private String username;
private String password;
}
@Data
public static class Topics {
private String send;
private String receive;
private String status;
private String device;
}
}

View File

@ -0,0 +1,111 @@
package com.lanhai.mqtt.controller;
import com.lanhai.mqtt.service.MqttService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@RestController
@RequestMapping("/api/mqtt")
@RequiredArgsConstructor
public class MqttController {
private final MqttService mqttService;
@PostMapping("/publish")
public ResponseEntity<Map<String, Object>> publish(@RequestBody Map<String, Object> request) {
String topic = (String) request.get("topic");
String message = (String) request.get("message");
boolean success = mqttService.sendMessage(topic, message);
Map<String, Object> response = new HashMap<>();
response.put("success", success);
response.put("topic", topic);
response.put("message", message);
return ResponseEntity.ok(response);
}
@PostMapping("/publish/json")
public ResponseEntity<Map<String, Object>> publishJson(@RequestBody Map<String, Object> request) {
String topic = (String) request.get("topic");
Object data = request.get("data");
boolean success = mqttService.sendJsonMessage(topic, data);
Map<String, Object> response = new HashMap<>();
response.put("success", success);
response.put("topic", topic);
return ResponseEntity.ok(response);
}
@PostMapping("/device/command")
public ResponseEntity<Map<String, Object>> sendDeviceCommand(@RequestBody Map<String, Object> request) {
String deviceId = (String) request.get("deviceId");
String command = (String) request.get("command");
Object params = request.get("params");
boolean success = mqttService.sendDeviceCommand(deviceId, command, params);
Map<String, Object> response = new HashMap<>();
response.put("success", success);
response.put("deviceId", deviceId);
response.put("command", command);
return ResponseEntity.ok(response);
}
@PostMapping("/subscribe")
public ResponseEntity<Map<String, Object>> subscribe(@RequestBody Map<String, Object> request) {
String topic = (String) request.get("topic");
Integer qos = (Integer) request.getOrDefault("qos", 1);
boolean success = mqttService.subscribe(topic, qos);
Map<String, Object> response = new HashMap<>();
response.put("success", success);
response.put("topic", topic);
return ResponseEntity.ok(response);
}
@PostMapping("/unsubscribe")
public ResponseEntity<Map<String, Object>> unsubscribe(@RequestBody Map<String, Object> request) {
String topic = (String) request.get("topic");
boolean success = mqttService.unsubscribe(topic);
Map<String, Object> response = new HashMap<>();
response.put("success", success);
response.put("topic", topic);
return ResponseEntity.ok(response);
}
@GetMapping("/status")
public ResponseEntity<Map<String, Object>> status() {
Map<String, Object> status = new HashMap<>();
status.put("connected", mqttService.isConnected());
status.put("clientId", mqttService.getClientId());
return ResponseEntity.ok(status);
}
@PostMapping("/reconnect")
public ResponseEntity<Map<String, Object>> reconnect() {
boolean success = mqttService.reconnect();
Map<String, Object> response = new HashMap<>();
response.put("success", success);
response.put("message", success ? "重连成功" : "重连失败");
return ResponseEntity.ok(response);
}
}

View File

@ -0,0 +1,63 @@
package com.lanhai.mqtt.handler;
import com.lanhai.mqtt.service.MessageProcessor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class MqttCallbackHandler implements MqttCallback {
private final MessageProcessor messageProcessor;
private MqttClient mqttClient; // 改为 Setter 注入
public MqttCallbackHandler(MessageProcessor messageProcessor) {
this.messageProcessor = messageProcessor;
}
/**
* 通过 Setter 注入 MqttClient避免循环依赖
*/
public void setMqttClient(MqttClient mqttClient) {
this.mqttClient = mqttClient;
log.info("MqttClient 已注入到 CallbackHandler");
}
@PostConstruct
public void init() {
log.info("MQTT 回调处理器已初始化");
}
@Override
public void connectionLost(Throwable cause) {
log.error("MQTT 连接丢失,原因: {}", cause.getMessage(), cause);
if (mqttClient != null && !mqttClient.isConnected()) {
log.info("等待客户端自动重连...");
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload());
log.info("收到消息 - 主题: {}, QoS: {}", topic, message.getQos());
// 异步处理消息
messageProcessor.processAsync(topic, payload);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try {
log.debug("消息发送完成 - Message ID: {}", token.getMessageId());
} catch (Exception e) {
log.error("处理消息发送完成回调失败", e);
}
}
}

View File

@ -1,49 +0,0 @@
package com.lanhai.mqtt.impl;
import com.lanhai.mqtt.MqttService;
import com.lanhai.mqtt.MyMqttClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
@ConditionalOnProperty(
prefix = "iot.mq",
name = "enabled",
havingValue = "true",
matchIfMissing = false // 默认注册除非显式设置为 false
)
@Service
public class MqttServiceImpl implements MqttService {
@Autowired
private MyMqttClient myMqttClient;
@Override
public void addTopic(String topic) {
myMqttClient.subscribe(topic);
}
@Override
public void removeTopic(String topic) {
myMqttClient.cleanTopic(topic);
}
@Override
public void publish(String topic,String msgContent) {
//MyXxxMqttMsg 转Json
// LhMqttMsg myXxxMqttMsg = new LhMqttMsg();
// myXxxMqttMsg.setContent(msgContent);
// myXxxMqttMsg.setTimestamp(System.currentTimeMillis());
// // TODO Md5值
// myXxxMqttMsg.setMd5(UUID.randomUUID().toString());
// String msgJson = JSON.toJSONString(myXxxMqttMsg);
//发布消息
myMqttClient.publish(msgContent, topic);
}
@Override
public void publish(String topic,byte[] msgContent) {
myMqttClient.publish(msgContent, topic);
}
}

View File

@ -0,0 +1,178 @@
package com.lanhai.mqtt.listener;
import com.lanhai.mqtt.config.MqttProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Component
@EnableScheduling
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "mqtt", name = "enabled", havingValue = "true")
public class MqttReconnectListener {
private final MqttClient mqttClient;
private final MqttProperties mqttProperties;
private AtomicBoolean isReconnecting = new AtomicBoolean(false);
private int reconnectCount = 0;
private static final int MAX_RECONNECT_COUNT = 5;
private static final long RECONNECT_INTERVAL = 30000; // 30秒
@PostConstruct
public void init() {
log.info("MQTT 重连监听器已启动");
}
@EventListener(ContextRefreshedEvent.class)
public void onApplicationReady() {
log.info("应用启动完成MQTT 连接状态: {}", mqttClient.isConnected());
}
/**
* 定时检查连接状态并重连
*/
@Scheduled(fixedDelay = 10000) // 每10秒检查一次
public void checkAndReconnect() {
// 如果已经连接重置重连计数
if (mqttClient.isConnected()) {
if (reconnectCount > 0) {
log.info("MQTT 连接已恢复,重置重连计数器");
reconnectCount = 0;
isReconnecting.set(false);
}
return;
}
// 如果正在重连跳过
if (isReconnecting.get()) {
return;
}
// 检查重连次数
if (reconnectCount >= MAX_RECONNECT_COUNT) {
log.error("MQTT 重连失败次数已达上限 ({}),停止重连", MAX_RECONNECT_COUNT);
return;
}
// 执行重连
attemptReconnect();
}
@Async
public void attemptReconnect() {
if (!isReconnecting.compareAndSet(false, true)) {
return;
}
try {
reconnectCount++;
log.info("尝试重连 MQTT (第 {}/{} 次)...", reconnectCount, MAX_RECONNECT_COUNT);
// 等待一段时间再重连
Thread.sleep(RECONNECT_INTERVAL);
// 重新连接
if (!mqttClient.isConnected()) {
MqttConnectOptions options = createConnectOptions();
mqttClient.connect(options);
log.info("MQTT 重连成功!");
// 重连成功后重新订阅主题
resubscribeTopics();
// 重置重连计数
reconnectCount = 0;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("重连被中断");
} catch (MqttException e) {
log.error("MQTT 重连失败", e);
// 如果重连次数达到上限发送告警
if (reconnectCount >= MAX_RECONNECT_COUNT) {
sendReconnectAlert();
}
} finally {
isReconnecting.set(false);
}
}
private MqttConnectOptions createConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(mqttProperties.getClient().getCleanSession());
options.setConnectionTimeout(mqttProperties.getConnection().getTimeout());
options.setKeepAliveInterval(mqttProperties.getConnection().getKeepAlive());
if (mqttProperties.getAuth().getUsername() != null) {
options.setUserName(mqttProperties.getAuth().getUsername());
options.setPassword(mqttProperties.getAuth().getPassword().toCharArray());
}
return options;
}
private void resubscribeTopics() {
try {
log.info("重新订阅主题...");
if (mqttProperties.getTopics().getReceive() != null) {
mqttClient.subscribe(mqttProperties.getTopics().getReceive(),
mqttProperties.getQos());
log.info("重新订阅: {}", mqttProperties.getTopics().getReceive());
}
if (mqttProperties.getTopics().getStatus() != null) {
mqttClient.subscribe(mqttProperties.getTopics().getStatus(),
mqttProperties.getQos());
log.info("重新订阅: {}", mqttProperties.getTopics().getStatus());
}
if (mqttProperties.getTopics().getDevice() != null) {
mqttClient.subscribe(mqttProperties.getTopics().getDevice(),
mqttProperties.getQos());
log.info("重新订阅: {}", mqttProperties.getTopics().getDevice());
}
// 发送上线消息
if (mqttProperties.getTopics().getStatus() != null) {
mqttClient.publish(
mqttProperties.getTopics().getStatus(),
"online".getBytes(),
mqttProperties.getQos(),
true
);
log.info("已发送上线状态");
}
} catch (MqttException e) {
log.error("重新订阅主题失败", e);
}
}
private void sendReconnectAlert() {
log.error("========== MQTT 重连告警 ==========");
log.error("MQTT 服务器: {}", mqttProperties.getBroker().getUrl());
log.error("重连失败次数: {}", MAX_RECONNECT_COUNT);
log.error("请检查 MQTT 服务器状态和网络连接");
log.error("==================================");
// 这里可以添加告警通知如发送邮件短信等
// emailService.sendAlert("MQTT连接失败");
}
}

View File

@ -0,0 +1,177 @@
package com.lanhai.mqtt.service;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanhai.enums.IotInerfaceTopicType;
import com.lanhai.service.Impl.CommonServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageProcessor {
private final ObjectMapper objectMapper;
private final Map<String, MessageHandler> handlers = new ConcurrentHashMap<>();
@Autowired
private CommonServiceImpl commonService ;
/**
* 同步处理消息
*/
public void process(String topic, String payload) {
log.debug("处理消息 - 主题: {}, 负载: {}", topic, payload);
try {
// 根据主题选择处理器
MessageHandler handler = findHandler(topic);
if (handler != null) {
handler.handle(topic, payload);
} else {
handleDefault(topic, payload);
}
} catch (Exception e) {
log.error("处理消息失败", e);
}
}
/**
* 异步处理消息
*/
@Async
public void processAsync(String topic, String payload) {
process(topic, payload);
}
/**
* 处理设备控制消息
*/
private void handleDeviceControl(String topic, String payload) {
try {
JsonNode json = objectMapper.readTree(payload);
String deviceId = extractDeviceId(topic);
String command = json.get("command").asText();
log.info("设备控制 - deviceId: {}, command: {}, 参数: {}",
deviceId, command, json.get("params"));
// 处理设备控制逻辑
switch (command) {
case "turn_on":
log.info("开启设备: {}", deviceId);
// 开启设备业务逻辑
break;
case "turn_off":
log.info("关闭设备: {}", deviceId);
// 关闭设备业务逻辑
break;
case "reboot":
log.info("重启设备: {}", deviceId);
// 重启设备业务逻辑
break;
case "update_config":
log.info("更新设备配置: {}", deviceId);
// 更新设备配置业务逻辑
break;
default:
log.warn("未知命令: {}", command);
}
} catch (Exception e) {
log.error("处理设备控制消息失败", e);
}
}
/**
* 处理默认消息
*/
private void handleDefault(String topic, String payload) {
log.info("默认处理 - 主题: {}, 消息: {}", topic, payload);
// 可以根据需要存储到数据库或转发到其他系统
// 例如: saveToDatabase(topic, payload);
}
/**
* 从主题中提取设备ID
* 例如: lanhai/device/DEV001/command -> DEV001
* lanhai/device/DEV001/status -> DEV001
*/
private String extractDeviceId(String topic) {
String[] parts = topic.split("/");
if (parts.length >= 3 && "device".equals(parts[1])) {
return parts[2];
}
return null;
}
/**
* 查找处理器
*/
private MessageHandler findHandler(String topic) {
// 设备上报回执主题
if (topic.startsWith(IotInerfaceTopicType.DTU_TOPIC.getCode())) {
return this::handleDeviceRep;
}
// // 设备状态主题
// if (topic.contains("/device/") && topic.endsWith("/status")) {
// return this::handleDeviceStatus;
// }
//
// // 自定义处理器
// for (Map.Entry<String, MessageHandler> entry : handlers.entrySet()) {
// if (topic.matches(entry.getKey())) {
// return entry.getValue();
// }
// }
return null;
}
/**
* 处理设备状态上报
*/
private void handleDeviceStatus(String topic, String payload) {
try {
JsonNode json = objectMapper.readTree(payload);
String deviceId = extractDeviceId(topic);
String status = json.get("status").asText();
log.info("设备状态上报 - deviceId: {}, status: {}, 数据: {}",
deviceId, status, payload);
// 处理设备状态更新
// 例如: updateDeviceStatus(deviceId, status, json);
} catch (Exception e) {
log.error("处理设备状态消息失败", e);
}
}
/**
* 注册自定义处理器
*/
public void registerHandler(String topicPattern, MessageHandler handler) {
handlers.put(topicPattern, handler);
log.info("注册消息处理器: {}", topicPattern);
}
@FunctionalInterface
public interface MessageHandler {
void handle(String topic, String payload);
}
/**
* 处理设备监测回执消息
*/
private void handleDeviceRep(String topic, String payload) {
commonService.processMqttData(topic,payload);
}
}

View File

@ -0,0 +1,176 @@
package com.lanhai.mqtt.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanhai.mqtt.config.MqttProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@Slf4j
@Service
@RequiredArgsConstructor
public class MqttService {
private final MqttClient mqttClient;
private final MqttProperties mqttProperties;
private final ObjectMapper objectMapper;
/**
* 发送消息
*/
public boolean sendMessage(String topic, String payload) {
return sendMessage(topic, payload, mqttProperties.getQos(), mqttProperties.getRetained());
}
/**
* 发送消息自定义QoS
*/
public boolean sendMessage(String topic, String payload, int qos, boolean retained) {
try {
if (!mqttClient.isConnected()) {
log.warn("MQTT 未连接,消息发送失败: {}", payload);
return false;
}
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(qos);
message.setRetained(retained);
message.setId((int) (System.currentTimeMillis() % 100000));
mqttClient.publish(topic, message);
log.debug("消息发送成功 - 主题: {}, QoS: {}, 消息: {}", topic, qos,
payload.length() > 100 ? payload.substring(0, 100) + "..." : payload);
return true;
} catch (MqttException e) {
log.error("消息发送失败", e);
return false;
}
}
/**
* 发送 JSON 消息
*/
public boolean sendJsonMessage(String topic, Object data) {
try {
String json = objectMapper.writeValueAsString(data);
return sendMessage(topic, json);
} catch (JsonProcessingException e) {
log.error("JSON 序列化失败", e);
return false;
}
}
/**
* 异步发送消息
*/
public CompletableFuture<Boolean> sendMessageAsync(String topic, String payload) {
return CompletableFuture.supplyAsync(() -> sendMessage(topic, payload));
}
/**
* 发送设备控制指令
*/
public boolean sendDeviceCommand(String deviceId, String command, Object params) {
String topic = String.format("lanhai/device/%s/command", deviceId);
try {
Map<String, Object> message = new HashMap<>();
message.put("command", command);
message.put("params", params);
message.put("timestamp", System.currentTimeMillis());
message.put("requestId", UUID.randomUUID().toString());
return sendJsonMessage(topic, message);
} catch (Exception e) {
log.error("发送设备指令失败", e);
return false;
}
}
/**
* 订阅主题
*/
public boolean subscribe(String topic, int qos) {
try {
mqttClient.subscribe(topic, qos);
log.info("订阅主题: {}", topic);
return true;
} catch (MqttException e) {
log.error("订阅失败", e);
return false;
}
}
/**
* 取消订阅
*/
public boolean unsubscribe(String topic) {
try {
mqttClient.unsubscribe(topic);
log.info("取消订阅: {}", topic);
return true;
} catch (MqttException e) {
log.error("取消订阅失败", e);
return false;
}
}
/**
* 检查连接状态
*/
public boolean isConnected() {
return mqttClient.isConnected();
}
/**
* 获取客户端ID
*/
public String getClientId() {
return mqttProperties.getClient().getId();
}
/**
* 手动重连
*/
public boolean reconnect() {
try {
if (!mqttClient.isConnected()) {
mqttClient.reconnect();
log.info("手动重连成功");
return true;
}
return false;
} catch (MqttException e) {
log.error("手动重连失败", e);
return false;
}
}
/**
* 断开连接
*/
public void disconnect() {
try {
if (mqttClient.isConnected()) {
// 发送离线状态
if (mqttProperties.getTopics().getStatus() != null) {
sendMessage(mqttProperties.getTopics().getStatus(), "offline", 1, true);
}
mqttClient.disconnect();
log.info("MQTT 连接已断开");
}
} catch (MqttException e) {
log.error("断开连接失败", e);
}
}
}

View File

@ -2,6 +2,7 @@ package com.lanhai.o.iot.pbs;
import lombok.Data; import lombok.Data;
import java.time.LocalDateTime;
import java.util.Date; import java.util.Date;
/** /**
@ -67,7 +68,7 @@ public class WaterCommonTransVo {
/** /**
* 数据更新时间 * 数据更新时间
*/ */
private Date dataDateTime; private LocalDateTime dataDateTime;
/** /**
* 数据获取类型;realTime=实时dayTime=日数据month=月数据year=年数据 * 数据获取类型;realTime=实时dayTime=日数据month=月数据year=年数据

View File

@ -2,7 +2,8 @@ package com.lanhai.service.Impl;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSONObject; import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.lanhai.constant.IotConstants; import com.lanhai.constant.IotConstants;
import com.lanhai.constant.PollutionConstants; import com.lanhai.constant.PollutionConstants;
import com.lanhai.entity.*; import com.lanhai.entity.*;
@ -10,6 +11,7 @@ import com.lanhai.o.iot.lhiot.ResponseCmd;
import com.lanhai.o.iot.pbs.WaterCommonTransVo; import com.lanhai.o.iot.pbs.WaterCommonTransVo;
import com.lanhai.service.*; import com.lanhai.service.*;
import com.lanhai.util.LhIotUtil; import com.lanhai.util.LhIotUtil;
import com.xxl.job.core.context.XxlJobHelper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -44,6 +46,7 @@ public class CommonServiceImpl implements ICommonService {
@Autowired @Autowired
@Lazy @Lazy
private SurvStationInfoServiceImpl survStationInfoService; private SurvStationInfoServiceImpl survStationInfoService;
// //
// @Autowired // @Autowired
// private ISurvTransdataAirService transdataAirService; // private ISurvTransdataAirService transdataAirService;
@ -110,8 +113,10 @@ public class CommonServiceImpl implements ICommonService {
.eq(SurvDeviceDeploy::getDeviceReverseIotUrl,topic) .eq(SurvDeviceDeploy::getDeviceReverseIotUrl,topic)
.list(); .list();
; ;
log.error("===================主题:{},设备数量:{}==============================",topic,deployList.size());
XxlJobHelper.log("===================主题:{},设备数量:{}==============================",topic,deployList.size());
if(!deployList.isEmpty()){ if(!deployList.isEmpty()){
SurvDeviceDeploy survDeviceDeploy = deployList.get(0); SurvDeviceDeploy survDeviceDeploy = deployList.get(0);
List<ScEquZhibiao> zhibiaoList = zhibiaoService.getListByEquid(survDeviceDeploy.getId()); List<ScEquZhibiao> zhibiaoList = zhibiaoService.getListByEquid(survDeviceDeploy.getId());
if(PollutionConstants.WATER_ORIENT.equals(survDeviceDeploy.getDeployType()) || PollutionConstants.WATER_LIVE.equals(survDeviceDeploy.getDeployType())){//面源数据 if(PollutionConstants.WATER_ORIENT.equals(survDeviceDeploy.getDeployType()) || PollutionConstants.WATER_LIVE.equals(survDeviceDeploy.getDeployType())){//面源数据
@ -121,13 +126,23 @@ public class CommonServiceImpl implements ICommonService {
waterCommonTransVo = LhIotUtil.transData(zhibiaoList,mqttMessage); waterCommonTransVo = LhIotUtil.transData(zhibiaoList,mqttMessage);
break; break;
} }
log.error("----------------------查询到设备:{},回执:{}----------------------------",survDeviceDeploy.getId()+"_"+survDeviceDeploy.getDeployDes(), JSONUtil.toJsonStr(waterCommonTransVo));
XxlJobHelper.log("----------------------查询到设备:{},回执:{}----------------------------",survDeviceDeploy.getId()+"_"+survDeviceDeploy.getDeployDes(), JSONUtil.toJsonStr(waterCommonTransVo));
if(waterCommonTransVo!=null){ if(waterCommonTransVo!=null){
LocalDateTime nowTime =LocalDateTime.now(); LocalDateTime nowTime =LocalDateTime.now();
//遍历保存所有租户数据 //遍历保存所有租户数据
for (SurvDeviceDeploy deploy : deployList) { for (SurvDeviceDeploy deploy : deployList) {
log.error("###########################保存数据中-----设备:{}#####################################",deploy.getDeployCode()+"__"+deploy.getDeployDes());
XxlJobHelper.log("###########################保存数据中-----设备:{}#####################################",deploy.getDeployCode()+"__"+deploy.getDeployDes());
SurvStationInfo survStationInfo = survStationInfoService.getByCode(deploy.getStationCode()); SurvStationInfo survStationInfo = survStationInfoService.getByCode(deploy.getStationCode());
String stationName = survStationInfo!=null?survStationInfo.getStationName():"站点"; String stationName = survStationInfo!=null?survStationInfo.getStationName():"站点";
if(PollutionConstants.WATER_ORIENT.equals(deploy.getDeployType())){ if(PollutionConstants.WATER_ORIENT.equals(deploy.getDeployType())){
log.error("^^^^^^^^^^^^^^^^面源数据保存:{}^^^^^^^^^^^^^^^^^",deploy.getDeployCode()+"__"+deploy.getDeployDes());
XxlJobHelper.log("^^^^^^^^^^^^^^^^面源数据保存:{}^^^^^^^^^^^^^^^^^",deploy.getDeployCode()+"__"+deploy.getDeployDes());
SurvTransdataOrientwater oldData = transdataOrientwaterService.getOneByDeviceId(deploy.getId());
if(oldData!=null){
transdataOrientwaterService.removeById(oldData.getId());
}
SurvTransdataOrientwater orientwater = new SurvTransdataOrientwater(); SurvTransdataOrientwater orientwater = new SurvTransdataOrientwater();
BeanUtil.copyProperties(waterCommonTransVo,orientwater); BeanUtil.copyProperties(waterCommonTransVo,orientwater);
orientwater.setTenantId(deploy.getTenantId()); orientwater.setTenantId(deploy.getTenantId());
@ -138,6 +153,7 @@ public class CommonServiceImpl implements ICommonService {
orientwater.setDeviceId(deploy.getId()); orientwater.setDeviceId(deploy.getId());
orientwater.setCreatedBy("task");//创建人 orientwater.setCreatedBy("task");//创建人
orientwater.setCreateTime(nowTime);//创建时间 orientwater.setCreateTime(nowTime);//创建时间
orientwater.setStationId(survStationInfo.getId());
orientwater.setDataType(deploy.getDeploySecondaryType()); orientwater.setDataType(deploy.getDeploySecondaryType());
transdataOrientwaterService.save(orientwater); transdataOrientwaterService.save(orientwater);
@ -149,6 +165,12 @@ public class CommonServiceImpl implements ICommonService {
hisdataOrientwaterService.save(hisOrientWater); hisdataOrientwaterService.save(hisOrientWater);
} else if (PollutionConstants.WATER_LIVE.equals(deploy.getDeployType())) { } else if (PollutionConstants.WATER_LIVE.equals(deploy.getDeployType())) {
log.error("^^^^^^^^^^^^^^^^畜禽数据保存:{}^^^^^^^^^^^^^^^^^",deploy.getDeployCode()+"__"+deploy.getDeployDes());
XxlJobHelper.log("^^^^^^^^^^^^^^^^畜禽数据保存:{}^^^^^^^^^^^^^^^^^",deploy.getDeployCode()+"__"+deploy.getDeployDes());
SurvTransdataLivestockwater oldData = transdataLivestockwaterService.getOneByDeviceId(deploy.getId());
if(oldData!=null){
transdataLivestockwaterService.removeById(oldData.getId());
}
SurvTransdataLivestockwater livestockwater = new SurvTransdataLivestockwater(); SurvTransdataLivestockwater livestockwater = new SurvTransdataLivestockwater();
BeanUtil.copyProperties(waterCommonTransVo,livestockwater); BeanUtil.copyProperties(waterCommonTransVo,livestockwater);
livestockwater.setTenantId(deploy.getTenantId()); livestockwater.setTenantId(deploy.getTenantId());
@ -160,7 +182,7 @@ public class CommonServiceImpl implements ICommonService {
livestockwater.setCreatedBy("task");//创建人 livestockwater.setCreatedBy("task");//创建人
livestockwater.setCreateTime(nowTime);//创建时间 livestockwater.setCreateTime(nowTime);//创建时间
livestockwater.setDataType(deploy.getDeploySecondaryType()); livestockwater.setDataType(deploy.getDeploySecondaryType());
livestockwater.setStationId(survStationInfo.getId());
transdataLivestockwaterService.save(livestockwater); transdataLivestockwaterService.save(livestockwater);
//保存历史表 //保存历史表
@ -175,6 +197,7 @@ public class CommonServiceImpl implements ICommonService {
} }
}else{ }else{
log.error("主题:{}--消息:{},解析失败,任务中断。",topic,mqttMessage); log.error("主题:{}--消息:{},解析失败,任务中断。",topic,mqttMessage);
XxlJobHelper.log("主题:{}--消息:{},解析失败,任务中断。",topic,mqttMessage);
} }
} }

View File

@ -56,12 +56,12 @@ public class SurvIotVirtualDeviceServiceImpl extends ServiceImpl<SurvIotVirtualD
.list(); .list();
if(!deploys.isEmpty()){ if(!deploys.isEmpty()){
for (SurvDeviceDeploy deploy : deploys) { for (SurvDeviceDeploy deploy : deploys) {
if(StringUtils.isNotBlank(deploy.getDeviceUrl())){
topics.add(deploy.getDeviceUrl());
}
if(StringUtils.isNotBlank(deploy.getDeviceIotUrl())){ if(StringUtils.isNotBlank(deploy.getDeviceIotUrl())){
topics.add(deploy.getDeviceIotUrl()); topics.add(deploy.getDeviceIotUrl());
} }
if(StringUtils.isNotBlank(deploy.getDeviceReverseIotUrl())){
topics.add(deploy.getDeviceReverseIotUrl());
}
} }
} }
//虚拟设备中需要订阅的主题 //虚拟设备中需要订阅的主题

View File

@ -34,12 +34,13 @@ public class MqttDeviceTask {
@Autowired @Autowired
private ISurvDeviceDeployService deviceDeployService; private ISurvDeviceDeployService deviceDeployService;
/** /**
* mqtt设备采集任务 * mqtt设备采集任务
*/ */
@XxlJob("IotMqttDeviceGatherTask") @XxlJob("IotMqttDeviceGatherTask")
public void TaskHandler() throws Exception{ public void TaskHandler() throws Exception{
log.warn("========================mqtt设备采集任务启动========================="); log.error("========================mqtt设备采集任务启动=========================");
XxlJobHelper.log("mqtt设备采集任务启动"); XxlJobHelper.log("mqtt设备采集任务启动");
try { try {
//查询需要查询的设备 //查询需要查询的设备
@ -53,7 +54,7 @@ public class MqttDeviceTask {
IPage<SurvDeviceDeploy> devicelist = deviceDeployService.page(page,wrapper); IPage<SurvDeviceDeploy> devicelist = deviceDeployService.page(page,wrapper);
log.warn("需采集的MQTT设备数量:{}-共{}页",page.getTotal(),page.getPages()); log.error("需采集的MQTT设备数量:{}-共{}页",page.getTotal(),page.getPages());
XxlJobHelper.log("需采集的MQTT设备数量:{}-共{}页",page.getTotal(),page.getPages()); XxlJobHelper.log("需采集的MQTT设备数量:{}-共{}页",page.getTotal(),page.getPages());
@ -61,7 +62,7 @@ public class MqttDeviceTask {
List<DeviceStatusVo> dlist = new ArrayList<>(); List<DeviceStatusVo> dlist = new ArrayList<>();
List<SurvDeviceDeploy> updList = new ArrayList<>(); List<SurvDeviceDeploy> updList = new ArrayList<>();
for (int i = 1; i <= devicelist.getPages(); i++) { for (int i = 1; i <= devicelist.getPages(); i++) {
log.warn("=====================x需采集的MQTT设备数量page:{},共:{}页==========================",i,devicelist.getPages()); log.error("=====================x需采集的MQTT设备数量page:{},共:{}页==========================",i,devicelist.getPages());
XxlJobHelper.log("=====================x需采集的MQTT设备数量page:{},共:{}页==========================",i,devicelist.getPages()); XxlJobHelper.log("=====================x需采集的MQTT设备数量page:{},共:{}页==========================",i,devicelist.getPages());
if(i>1){//翻页 if(i>1){//翻页
page = new Page<SurvDeviceDeploy>(i,curPageSize); page = new Page<SurvDeviceDeploy>(i,curPageSize);
@ -72,7 +73,9 @@ public class MqttDeviceTask {
if (StringUtils.isNotBlank(deploy.getProtocolCode())) { if (StringUtils.isNotBlank(deploy.getProtocolCode())) {
try { try {
switch (deploy.getDeployType()) { switch (deploy.getDeployType()) {
case PollutionConstants.CONTROL_CAB: case PollutionConstants.WATER_ORIENT:
case PollutionConstants.WATER_LIVE:
XxlJobHelper.log("<=======面源畜禽设备========>");
if (IotConstants.lhviot_standard.equals(deploy.getProtocolCode())) {//蓝海设备协议 if (IotConstants.lhviot_standard.equals(deploy.getProtocolCode())) {//蓝海设备协议
LhIotUtil.DeviceQuery(deploy); LhIotUtil.DeviceQuery(deploy);
} }

View File

@ -280,7 +280,7 @@ public class DataUtil{
if(StringUtils.isNotBlank(str) && deploy!=null && deploy.getDeviceConfig() !=null){ if(StringUtils.isNotBlank(str) && deploy!=null && deploy.getDeviceConfig() !=null){
waterCommonTransVo.setDataDateTime(new Date()); waterCommonTransVo.setDataDateTime(LocalDateTime.now());
waterCommonTransVo.setDataGatherType("realTime"); waterCommonTransVo.setDataGatherType("realTime");
waterCommonTransVo.setDeployCode(deploy.getDeployCode()); waterCommonTransVo.setDeployCode(deploy.getDeployCode());

View File

@ -1,18 +1,21 @@
package com.lanhai.util; package com.lanhai.util;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.lanhai.constant.IotConstants;
import com.lanhai.entity.ScEquZhibiao; import com.lanhai.entity.ScEquZhibiao;
import com.lanhai.entity.SurvDeviceDeploy; import com.lanhai.entity.SurvDeviceDeploy;
import com.lanhai.entity.SurvDeviceDeployRelay; import com.lanhai.entity.SurvDeviceDeployRelay;
import com.lanhai.mqtt.MqttService; import com.lanhai.mqtt.service.MqttService;
import com.lanhai.o.iot.lhiot.*; import com.lanhai.o.iot.lhiot.*;
import com.lanhai.o.iot.pbs.WaterCommonTransVo; import com.lanhai.o.iot.pbs.WaterCommonTransVo;
import com.lanhai.service.IScEquZhibiaoService; import com.lanhai.service.IScEquZhibiaoService;
import com.lanhai.service.ISurvDeviceDeployRelayService; import com.lanhai.service.ISurvDeviceDeployRelayService;
import com.xxl.job.core.context.XxlJobHelper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -32,7 +35,9 @@ public class LhIotUtil {
QueryCmd queryCmd = ConstructCmd(survDeviceDeploy); QueryCmd queryCmd = ConstructCmd(survDeviceDeploy);
//发送指令传入下行主题 iotUrl //发送指令传入下行主题 iotUrl
String cmdStr = JSONObject.toJSONString(queryCmd); String cmdStr = JSONObject.toJSONString(queryCmd);
mqttService.publish(survDeviceDeploy.getDeviceIotUrl(),cmdStr); log.error("=================发送查询指令========{}=================",cmdStr);
XxlJobHelper.log("=================发送查询指令========{}=================",cmdStr);
mqttService.sendMessage(survDeviceDeploy.getDeviceIotUrl(),cmdStr);
} }
return true; return true;
} }
@ -44,58 +49,61 @@ public class LhIotUtil {
public static QueryCmd ConstructCmd(SurvDeviceDeploy survDeviceDeploy){ public static QueryCmd ConstructCmd(SurvDeviceDeploy survDeviceDeploy){
QueryCmd queryCmd = new QueryCmd(); QueryCmd queryCmd = new QueryCmd();
//查询设备配置 //查询设备配置
List<ScEquZhibiao> eleList = scEquZhibiaoService.lambdaQuery() List<ScEquZhibiao> eleList = scEquZhibiaoService.lambdaQuery()
.eq(ScEquZhibiao::getEquId,survDeviceDeploy.getId()) .eq(ScEquZhibiao::getEquId,survDeviceDeploy.getId())
.list(); .list();
if(eleList!=null && !eleList.isEmpty()){ if(eleList!=null && !eleList.isEmpty()){
List<QueryCmdRegister> registers = new ArrayList<>(); List<QueryCmdRegister> registers = new ArrayList<>();
for (ScEquZhibiao scEquZhibiao : eleList) { for (ScEquZhibiao scEquZhibiao : eleList) {
QueryCmdRegister queryCmdRegister = new QueryCmdRegister(); QueryCmdRegister queryCmdRegister = new QueryCmdRegister();
queryCmdRegister.setName(scEquZhibiao.getCode()); queryCmdRegister.setName(scEquZhibiao.getCode());
registers.add(queryCmdRegister); registers.add(queryCmdRegister);
} }
QueryCmdDetail queryCmdDetail = new QueryCmdDetail(); QueryCmdDetail queryCmdDetail = new QueryCmdDetail();
queryCmdDetail.setR_data(registers); queryCmdDetail.setR_data(registers);
queryCmd.setRw_prot(queryCmdDetail); queryCmd.setRw_prot(queryCmdDetail);
} }
return queryCmd; return queryCmd;
} }
public static WaterCommonTransVo transData(List<ScEquZhibiao> zhibiaoList, String mqttMessage) { public static WaterCommonTransVo transData(List<ScEquZhibiao> zhibiaoList, String mqttMessage) {
WaterCommonTransVo waterCommonTransVo = new WaterCommonTransVo(); WaterCommonTransVo waterCommonTransVo = new WaterCommonTransVo();
ResponseCmd responseCmd = JSONObject.parseObject(mqttMessage,ResponseCmd.class); ResponseCmd responseCmd = JSONObject.parseObject(mqttMessage,ResponseCmd.class);
if(responseCmd!=null){ if(responseCmd!=null){
if(responseCmd.getRw_prot()!=null){ if(responseCmd.getRw_prot()!=null){
if("rsp".equals(responseCmd.getRw_prot().getDir())) { if("rsp".equals(responseCmd.getRw_prot().getDir())) {
if (zhibiaoList != null && !zhibiaoList.isEmpty()) { if (zhibiaoList != null && !zhibiaoList.isEmpty()) {
Map<String, String> eleMap = new HashMap<>(); Map<String, String> eleMap = new HashMap<>();
for (ScEquZhibiao scEquZhibiao : zhibiaoList) { for (ScEquZhibiao scEquZhibiao : zhibiaoList) {
eleMap.put(scEquZhibiao.getCode(), scEquZhibiao.getEntityField()); eleMap.put(scEquZhibiao.getCode(), scEquZhibiao.getEntityField());
} }
if (!responseCmd.getRw_prot().getR_data().isEmpty()) { if (!responseCmd.getRw_prot().getR_data().isEmpty()) {
for (ResponseCmdData rDatum : responseCmd.getRw_prot().getR_data()) { //因为数据中不包含时间所以使用当前时间
String entityName = eleMap.get(rDatum.getName());//字段名称 waterCommonTransVo.setDataDateTime(LocalDateTime.now());
if (StringUtils.isNotBlank(entityName)) { waterCommonTransVo.setDataGatherType(IotConstants.MARK_REALDATA);
TUtil.setFieldValue(waterCommonTransVo, entityName, rDatum.getValue()); for (ResponseCmdData rDatum : responseCmd.getRw_prot().getR_data()) {
} else { String entityName = eleMap.get(rDatum.getName());//字段名称
log.error("指令:{}----未匹配到监测项目:{}", mqttMessage, rDatum.getName()); if (StringUtils.isNotBlank(entityName)) {
} TUtil.setFieldValue(waterCommonTransVo, entityName, rDatum.getValue());
} else {
log.error("指令:{}----未匹配到监测项目:{}", mqttMessage, rDatum.getName());
} }
} }
} }
log.warn("{}----指令解析结果=============={}",mqttMessage,JSONObject.toJSONString(waterCommonTransVo));
}else{
log.error("----非主动请求数据,跳过解析==============回执:{}",mqttMessage);
} }
log.warn("{}-----------指令解析结果=============={}",mqttMessage,JSONObject.toJSONString(waterCommonTransVo));
}else{ }else{
log.error("------------解析失败回执格式有误2-------------"); log.error("----非主动请求数据,跳过解析==============回执:{}",mqttMessage);
} }
}else{ }else{
log.error("xxxxxxxxxxxxx解析失败回执格式有误1xxxxxxxxxxxxx"); log.error("------------解析失败回执格式有误2-------------");
} }
}else{
log.error("xxxxxxxxxxxxx解析失败回执格式有误1xxxxxxxxxxxxx");
}
return waterCommonTransVo; return waterCommonTransVo;
} }
} }

View File

@ -77,15 +77,29 @@ xxl:
port: 10004 port: 10004
logpath: /logs/xxl-job/jobhandler logpath: /logs/xxl-job/jobhandler
logretentiondays: 30 logretentiondays: 30
# 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)
mqtt: mqtt:
host: tcp://mqtt.ilhzn.cn:10007 enabled: true
userName: lhzn.mqtt broker:
passWord: lhzn.2025 url: tcp://mqtt.ilhzn.cn:10007
# 多个broker配置: tcp://broker1:1883,tcp://broker2:1883
client:
id: lanhai-mqtt-client-${random.uuid}
clean-session: false
connection:
timeout: 30
keep-alive: 60
max-reconnect-delay: 128000 # 最大重连延迟(ms)
automatic-reconnect: true
auth:
username: lhzn.mqtt
password: lhzn.2025
topics:
send: "lanhai/mqtt/send"
receive: "lanhai/mqtt/receive"
status: "lanhai/mqtt/status"
device: "lanhai/device/+/command"
qos: 1 qos: 1
clientId: fxgather-client-${random.uuid} #ClientId_local必须唯一。 retained: false
timeout: 10 # 超时时间
keepalive: 30 # 保持连接时间
clearSession: false # 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)