From c7e837c0e7fcf150adcbfe91b6cace30e7d10338 Mon Sep 17 00:00:00 2001 From: zy <82248909@qq.com> Date: Fri, 19 Dec 2025 19:19:07 +0800 Subject: [PATCH] =?UTF-8?q?'=E6=9B=B4=E6=96=B0mqtt=E6=94=AF=E6=8C=81'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jeecg/modules/appmana/mqtt/DTOMqtt.java | 13 ++ .../jeecg/modules/appmana/mqtt/LhMqttMsg.java | 34 +++ .../modules/appmana/mqtt/MqttConfig.java | 71 ++++++ .../modules/appmana/mqtt/MqttService.java | 45 ++++ .../modules/appmana/mqtt/MyMqttCallback.java | 170 ++++++++++++++ .../modules/appmana/mqtt/MyMqttClient.java | 209 ++++++++++++++++++ .../appmana/mqtt/MyMqttController.java | 37 ++++ .../appmana/mqtt/impl/MqttServiceImpl.java | 52 +++++ 8 files changed, 631 insertions(+) create mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/DTOMqtt.java create mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/LhMqttMsg.java create mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MqttConfig.java create mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MqttService.java create mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MyMqttCallback.java create mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MyMqttClient.java create mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MyMqttController.java create mode 100644 zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/impl/MqttServiceImpl.java diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/DTOMqtt.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/DTOMqtt.java new file mode 100644 index 0000000..674d454 --- /dev/null +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/DTOMqtt.java @@ -0,0 +1,13 @@ +package org.jeecg.modules.mqtt; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + + +@Data +public class DTOMqtt { + @ApiModelProperty("主题") + private String topic; + @ApiModelProperty("发送的内容") + private String message; +} diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/LhMqttMsg.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/LhMqttMsg.java new file mode 100644 index 0000000..4d4452f --- /dev/null +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/LhMqttMsg.java @@ -0,0 +1,34 @@ +package org.jeecg.modules.mqtt; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class LhMqttMsg implements Serializable { + + private static final long serialVersionUID = -8303548938481407659L; + + /** + * MD5值:MD5_lower(content + timestamp) + */ + private String md5; + + /** + * 消息内容 + */ + private String content = ""; + + /** + * 消息内容 + */ + private byte[] byteContent; + + /** + * 时间戳 + */ + private Long timestamp; + + +} + diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MqttConfig.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MqttConfig.java new file mode 100644 index 0000000..88a827e --- /dev/null +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MqttConfig.java @@ -0,0 +1,71 @@ +package org.jeecg.modules.mqtt; + +import cn.hutool.core.lang.UUID; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; + +@Slf4j +@Configuration +@ConditionalOnProperty( + prefix = "iot.mq", + name = "enabled", + havingValue = "true", + matchIfMissing = false // 默认注册,除非显式设置为 false +) +public class MqttConfig { + + @Value("${mqtt.host}") + public String host; + @Value("${mqtt.username}") + public String username; + @Value("${mqtt.password}") + public String password; +// @Value("${mqtt.clientId}") + public String clientId = "jppApp-" + UUID.fastUUID().toString().replaceAll("-","").substring(0,5); + @Value("${mqtt.timeout}") + public int timeOut; + @Value("${mqtt.keepalive}") + public int keepAlive; + + @Value("${mqtt.clearSession}") + public boolean clearSession; + + @Autowired + private Environment env; + + @Bean//注入Spring + public MyMqttClient myMqttClient() { + MyMqttClient myMqttClient = new MyMqttClient(host, username, password, clientId, timeOut, keepAlive, clearSession); + String activeProfile = env.getActiveProfiles()[0]; + //正式环境才连接mqtt + if(activeProfile.contains("prod")){ + log.warn("-------------正式环境,MQTT启动初始化---------"); + for (int i = 0; i < 10; i++) { + try { + myMqttClient.connect(); + // 这里可以订阅主题,推荐放到 MqttCallbackExtended.connectComplete方法中 + //myMqttClient.subscribe("ABC", 1); + return myMqttClient; + } catch (MqttException e) { + log.error("== MqttConfig ==> MQTT connect exception, connect time = {}", i); + log.error("=========mqtt连接异常:"+e.getMessage()); + try { + Thread.sleep(2000); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } + }else{ + log.warn("-------------非正式环境跳过Mqtt订阅---------"); + } + return myMqttClient; + } + +} diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MqttService.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MqttService.java new file mode 100644 index 0000000..efd0670 --- /dev/null +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MqttService.java @@ -0,0 +1,45 @@ +package org.jeecg.modules.mqtt; + + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; + +@ConditionalOnProperty( + prefix = "iot.mq", + name = "enabled", + havingValue = "true", + matchIfMissing = false // 默认注册,除非显式设置为 false +) +public interface MqttService { + + /** + * 添加订阅主题 + * + * @param topic 主题名称 + */ + void addTopic(String topic); + + /** + * 取消订阅主题 + * + * @param topic 主题名称 + */ + void removeTopic(String topic); + + /** + * 发布主题消息内容 + * + * @param msgContent + * @param topic + */ + void publish(String topic,String msgContent); + + /** + * 发布主题消息内容 + * + * @param msgContent + * @param topic + */ + void publish(String topic,byte[] msgContent); + +} + diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MyMqttCallback.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MyMqttCallback.java new file mode 100644 index 0000000..f616caf --- /dev/null +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MyMqttCallback.java @@ -0,0 +1,170 @@ +package org.jeecg.modules.mqtt; + +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.jeecg.common.util.SpringContextUtils; +import org.jeecg.modules.config.RedisDistributedLock2; +import org.jeecg.modules.iot.enums.IotInerfaceTopicType; +import org.jeecg.modules.iot.o.dto.DTOMqttMessage; +import org.jeecg.modules.iot.o.mqtt.up.DeviceActionVo; +import org.jeecg.modules.iot.service.IFIotVirtualDeviceService; +import org.jeecg.modules.rocketmq.service.RocketMqService; +import org.springframework.data.redis.core.RedisTemplate; + +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; + +@Slf4j +public class MyMqttCallback implements MqttCallbackExtended { + + //手动注入 + private MqttConfig mqttConfig = SpringContextUtils.getBean(MqttConfig.class); + + private IFIotVirtualDeviceService deviceService = SpringContextUtils.getBean(IFIotVirtualDeviceService.class); + + private static RedisTemplate redisTemplate = SpringContextUtils.getBean("redisTemplate", RedisTemplate.class); + + private static RocketMqService rocketMqService = SpringContextUtils.getBean(RocketMqService.class); + + private MyMqttClient myMqttClient; + + public MyMqttCallback(MyMqttClient myMqttClient) { + this.myMqttClient = myMqttClient; + } + + /** + * MQTT Broker连接成功时被调用的方法。在该方法中可以执行 订阅系统约定的主题(推荐使用)。 + * 如果 MQTT Broker断开连接之后又重新连接成功时,主题也需要再次订阅,将重新订阅主题放在连接成功后的回调方法中比较合理。 + * + * @param reconnect + * @param serverURI MQTT Broker的url + */ + @Override + public void connectComplete(boolean reconnect, String serverURI) { + String connectMode = reconnect ? "重连" : "直连"; + log.info("== MyMqttCallback ==> MQTT 连接成功,连接方式:{},serverURI:{}", connectMode, serverURI); + //订阅主题 + //查询所有需要订阅的mqtt主题 + List urlList = deviceService.getAllMqttTopic(); + //去重 + urlList = urlList.stream() + .distinct() + .collect(Collectors.toList()); + for (String url : urlList) {//共享主题语法 + myMqttClient.subscribe(url,1); + } + log.info("== MyMqttCallback ==> 连接方式:{},订阅主题成功,topic:{}", connectMode, String.join(",", urlList)); + } + + + /** + * 丢失连接,可在这里做重连 + * 只会调用一次 + * + * @param throwable + */ + @Override + public void connectionLost(Throwable throwable) { + log.error("== MyMqttCallback ==> connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage()); + long reconnectTimes = 1; + while (true) { + try { + if (MyMqttClient.getClient().isConnected()) { + //判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete(方法里面) 看你们自己选择 + log.warn("== MyMqttCallback ==> mqtt reconnect success end 重新连接 重新订阅成功"); + return; + } + reconnectTimes += 1; + log.warn("== MyMqttCallback ==> mqtt reconnect times = {} try again... mqtt重新连接时间 {}", reconnectTimes, reconnectTimes); + MyMqttClient.getClient().reconnect(); + } catch (MqttException e) { + log.error("== MyMqttCallback ==> mqtt断连异常", e); + } + try { + Thread.sleep(5000); + } catch (InterruptedException e1) { + } + } + } + + /** + * 接收到消息(subscribe订阅的主题消息)时被调用的方法 + * + * @param topic + * @param mqttMessage + * @throws Exception 后得到的消息会执行到这里面 + */ + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { + String o1 = new String(mqttMessage.getPayload()); + log.info("== MyMqttCallback ==> messageArrived 接收消息主题: {},接收消息内容: {}", topic, o1); + try { + if (topic.startsWith(IotInerfaceTopicType.DTU_TOPIC.getCode())) { + log.warn("fe-check=======================" + mqttMessage.getPayload()); + //识别主题,确认设备 + deviceService.processInterface(topic, mqttMessage.getPayload()); + } else if (topic.startsWith(IotInerfaceTopicType.LH_IOT_TOPIC_UP.getCode())) {//蓝海虚拟设备上行逻辑 + //暂时无业务 + } else if (topic.contains(IotInerfaceTopicType.LH_IOT_TOPIC_DOWN.getCode())) {//蓝海虚拟设备下行逻辑,处理控制 + // 尝试获取锁,等待3秒,重试间隔100ms + try { +// Boolean lock = RedisDistributedLock2.getLock("mqtt_receive_logic:__"+topic+"_"+o1.hashCode(),2); +// if(lock){ +// System.out.println("锁测试" + o1 + " on topic: " + topic + " - " + lock); +// } else { +// System.out.println("锁测试" + o1 + " on topic: " + topic+ " - " + lock); +// return; +// } + log.warn("蓝海-check=======================" + mqttMessage.getPayload()); + //解析数据获取请求id + String message = o1; + DeviceActionVo deviceQueryActionVo = JSONObject.toJavaObject(JSONObject.parseObject(message), DeviceActionVo.class); + //设备响应处理,查询、控制 + deviceService.processIotAction(deviceQueryActionVo); + //MQ模式 +// DTOMqttMessage dtoMqttMessage = new DTOMqttMessage(); +// dtoMqttMessage.setTopic(topic); +// dtoMqttMessage.setMessage(message); +// rocketMqService.sendMqttMessage(dtoMqttMessage); + }catch (Exception e){ + log.error("======处理蓝海主题设备报错================"+e.getMessage()); + } + } + }catch (Exception e){ + e.printStackTrace(); + } + } + + /** + * 消息发送(publish)完成时被调用的方法 + * + * @param iMqttDeliveryToken + */ + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + log.info("== MyMqttCallback ==> deliveryComplete 消息发送完成,Complete= {}", iMqttDeliveryToken.isComplete()); + } + + + // 指定最小和最大范围 + public static void randomSleep(int minMs, int maxMs) { + try { + Random random = new Random(); + if (minMs >= maxMs) { + throw new IllegalArgumentException("最小值必须小于最大值"); + } + int sleepTime = minMs + random.nextInt(maxMs - minMs + 1); + System.out.println("随机休眠: " + sleepTime + "ms"); + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + +} + diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MyMqttClient.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MyMqttClient.java new file mode 100644 index 0000000..f85bf23 --- /dev/null +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MyMqttClient.java @@ -0,0 +1,209 @@ +package org.jeecg.modules.mqtt; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +@Slf4j +public class MyMqttClient { + + /** + * MQTT Broker 基本连接参数,用户名、密码为非必选参数 + */ + private String host; + private String username; + private String password; + private String clientId; + private int timeout; + private int keepalive; + private boolean clearSession; + + /** + * MQTT 客户端 + */ + private static MqttClient client; + + public static MqttClient getClient() { + return client; + } + + public static void setClient(MqttClient client) { + MyMqttClient.client = client; + } + + public MyMqttClient(String host, String username, String password, String clientId, int timeOut, int keepAlive, boolean clearSession) { + this.host = host; + this.username = username; + this.password = password; + this.clientId = clientId; + this.timeout = timeOut; + this.keepalive = keepAlive; + this.clearSession = clearSession; + } + + /** + * 设置 MQTT Broker 基本连接参数 + * + * @param username + * @param password + * @param timeout + * @param keepalive + * @return + */ + public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) { + MqttConnectOptions options = new MqttConnectOptions(); + options.setUserName(username); + options.setPassword(password.toCharArray()); + options.setConnectionTimeout(timeout); + options.setKeepAliveInterval(keepalive); + options.setCleanSession(clearSession); + options.setAutomaticReconnect(true); + return options; + } + + /** + * 连接 MQTT Broker,得到 MqttClient连接对象 + */ + public void connect() throws MqttException { + if (client == null) { + client = new MqttClient(host, clientId, new MemoryPersistence()); + // 设置回调 + client.setCallback(new MyMqttCallback(MyMqttClient.this)); + } + // 连接参数 + MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive); + if (!client.isConnected()) { + client.connect(mqttConnectOptions); + } else { + client.disconnect(); + client.connect(mqttConnectOptions); + } + log.info("== MyMqttClient ==> MQTT connect success");//未发生异常,则连接成功 + } + + /** + * 发布,默认qos为0,非持久化 + * + * @param pushMessage + * @param topic + */ + public void publish(String pushMessage, String topic) { + publish(pushMessage, topic, 2, false); + } + + /** + * 发布,默认qos为0,非持久化 + * + * @param pushMessage + * @param topic + */ + public void publish(byte[] pushMessage, String topic) { + publish(pushMessage, topic, 2, false);//至少一次 + } + + /** + * 发布消息 + * + * @param pushMessage + * @param topic + * @param qos + * @param retained:留存 + */ + public void publish(String pushMessage, String topic, int qos, boolean retained) { + MqttMessage message = new MqttMessage(); + message.setPayload(pushMessage.getBytes()); + message.setQos(qos); + message.setRetained(retained); + MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic); + if (null == mqttTopic) { + log.error("== MyMqttClient ==> topic is not exist"); + } + MqttDeliveryToken token;//Delivery:配送 + synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充 + try { + token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件 + token.waitForCompletion(3000L); + } catch (MqttPersistenceException e) { + e.printStackTrace(); + } catch (MqttException e) { + e.printStackTrace(); + } + } + } + + + /** + * 发布消息 + * + * @param pushMessage + * @param topic + * @param qos + * @param retained:留存 + */ + public void publish(byte[] pushMessage, String topic, int qos, boolean retained) { + MqttMessage message = new MqttMessage(); + message.setPayload(pushMessage); + message.setQos(qos); + message.setRetained(retained); + MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic); + if (null == mqttTopic) { + log.error("== byte MyMqttClient ==> topic is not exist"); + } + MqttDeliveryToken token;//Delivery:配送 + synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充 + try { + token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件 + token.waitForCompletion(1000L); + } catch (MqttPersistenceException e) { + e.printStackTrace(); + } catch (MqttException e) { + e.printStackTrace(); + } + } + } + + /** + * 订阅某个主题,qos默认为0 + * + * @param topic + */ + public void subscribe(String topic) { + subscribe(topic, 2); + } + + /** + * 订阅某个主题 + * + * @param topic + * @param qos + */ + public void subscribe(String topic, int qos) { + try { + MyMqttClient.getClient().subscribe(topic, qos); + } catch (MqttException e) { + e.printStackTrace(); + } + log.info("== MyMqttClient ==> 订阅主题成功:topic = {}, qos = {}", topic, qos); + } + + + /** + * 取消订阅主题 + * + * @param topic 主题名称 + */ + public void cleanTopic(String topic) { + if (client != null && client.isConnected()) { + try { + client.unsubscribe(topic); + } catch (MqttException e) { + e.printStackTrace(); + } + } else { + log.error("== MyMqttClient ==> 取消订阅失败!"); + } + log.info("== MyMqttClient ==> 取消订阅主题成功:topic = {}", topic); + } + +} + diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MyMqttController.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MyMqttController.java new file mode 100644 index 0000000..258e6d3 --- /dev/null +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/MyMqttController.java @@ -0,0 +1,37 @@ +package org.jeecg.modules.mqtt; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/mqtt") +@Api(value = "MyMqttController", tags = {"MQTT相关操作接口"}) +public class MyMqttController { + @Autowired(required = false) + private MqttService mqttService; + + @GetMapping("/addTopic") + @ApiOperation(value = "添加订阅主题接口") + public void addTopic(String topic) { + mqttService.addTopic(topic); + } + + @GetMapping("/removeTopic") + @ApiOperation(value = "取消订阅主题接口") + public void removeTopic(String topic) { + mqttService.removeTopic(topic); + } + + @PostMapping("/removeTopic") + @ApiOperation(value = "发布主题消息内容接口") + public void removeTopic(String msgContent, String topic) { + mqttService.publish(msgContent, topic); + } + +} + diff --git a/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/impl/MqttServiceImpl.java b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/impl/MqttServiceImpl.java new file mode 100644 index 0000000..cd42003 --- /dev/null +++ b/zh-module-applet/zh-applet-admin/src/main/java/org/jeecg/modules/appmana/mqtt/impl/MqttServiceImpl.java @@ -0,0 +1,52 @@ +package org.jeecg.modules.mqtt.impl; + +import cn.hutool.core.lang.UUID; +import com.alibaba.fastjson.JSON; +import org.jeecg.modules.mqtt.MqttService; +import org.jeecg.modules.mqtt.MyMqttClient; +import org.jeecg.modules.mqtt.LhMqttMsg; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +@ConditionalOnProperty( + prefix = "iot.mq", + name = "enabled", + havingValue = "true", + matchIfMissing = false // 默认注册,除非显式设置为 false +) +@Service +public class MqttServiceImpl implements MqttService { + + @Autowired + private MyMqttClient myMqttClient; + + @Override + public void addTopic(String topic) { + myMqttClient.subscribe(topic); + } + + @Override + public void removeTopic(String topic) { + myMqttClient.cleanTopic(topic); + } + + @Override + public void publish(String topic,String msgContent) { + //MyXxxMqttMsg 转Json +// LhMqttMsg myXxxMqttMsg = new LhMqttMsg(); +// myXxxMqttMsg.setContent(msgContent); +// myXxxMqttMsg.setTimestamp(System.currentTimeMillis()); +// // TODO Md5值 +// myXxxMqttMsg.setMd5(UUID.randomUUID().toString()); +// String msgJson = JSON.toJSONString(myXxxMqttMsg); + + //发布消息 + myMqttClient.publish(msgContent, topic); + } + + @Override + public void publish(String topic,byte[] msgContent) { + myMqttClient.publish(msgContent, topic); + } +}