From 1d232808e0a0902af4b56ef548bd794b59bcb488 Mon Sep 17 00:00:00 2001
From: zy <82248909@qq.com>
Date: Sat, 14 Mar 2026 18:58:48 +0800
Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E6=95=B0=E6=8D=AE=E7=94=9F?=
=?UTF-8?q?=E6=88=90=E9=80=BB=E8=BE=91=EF=BC=8C=E5=8F=AF=E4=BB=A5=E4=BD=86?=
=?UTF-8?q?=E8=AE=BE=E5=A4=87=E5=A4=9A=E7=BB=91=E5=AE=9A?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../java/com/lanhai/MybatisPlusGenerator.java | 4 +-
.../com/lanhai/constant/IotConstants.java | 7 +
.../SurvDeviceDeployRelayController.java | 21 ++
.../SurvDeviceDeployRelaygroupController.java | 21 ++
.../com/lanhai/entity/SurvDeviceDeploy.java | 6 +-
.../lanhai/entity/SurvDeviceDeployRelay.java | 220 ++++++++++++++++++
.../entity/SurvDeviceDeployRelaygroup.java | 118 ++++++++++
.../mapper/SurvDeviceDeployRelayMapper.java | 16 ++
.../SurvDeviceDeployRelaygroupMapper.java | 16 ++
.../xml/SurvDeviceDeployRelayMapper.xml | 5 +
.../xml/SurvDeviceDeployRelaygroupMapper.xml | 5 +
.../java/com/lanhai/mqtt/MyMqttCallback.java | 8 +-
.../java/com/lanhai/o/iot/lhiot/QueryCmd.java | 16 ++
.../lanhai/o/iot/lhiot/QueryCmdDetail.java | 49 ++++
.../lanhai/o/iot/lhiot/QueryCmdRegister.java | 11 +
.../com/lanhai/o/iot/lhiot/ResponseCmd.java | 12 +
.../lanhai/o/iot/lhiot/ResponseCmdData.java | 17 ++
.../lanhai/o/iot/lhiot/ResponseCmdDetail.java | 18 ++
.../ISurvDeviceDeployRelayService.java | 16 ++
.../ISurvDeviceDeployRelaygroupService.java | 16 ++
.../service/ISurvTransdataAirService.java | 2 +
.../service/Impl/CommonServiceImpl.java | 112 ++++++++-
.../SurvDeviceDeployRelayServiceImpl.java | 20 ++
...SurvDeviceDeployRelaygroupServiceImpl.java | 20 ++
.../Impl/SurvHisdataSoilServiceImpl.java | 5 +-
.../Impl/SurvIotVirtualDeviceServiceImpl.java | 66 +++---
.../Impl/SurvTransdataAirServiceImpl.java | 7 +
.../Impl/SurvTransdataSoilServiceImpl.java | 42 ++--
.../java/com/lanhai/task/MqttDeviceTask.java | 106 +++++++++
.../java/com/lanhai/task/MultithreadTask.java | 6 +-
src/main/java/com/lanhai/util/LhIotUtil.java | 101 ++++++++
src/main/java/com/lanhai/util/SdrkUtils.java | 15 +-
src/main/java/com/lanhai/util/XphUtils.java | 22 +-
33 files changed, 1047 insertions(+), 79 deletions(-)
create mode 100644 src/main/java/com/lanhai/controller/SurvDeviceDeployRelayController.java
create mode 100644 src/main/java/com/lanhai/controller/SurvDeviceDeployRelaygroupController.java
create mode 100644 src/main/java/com/lanhai/entity/SurvDeviceDeployRelay.java
create mode 100644 src/main/java/com/lanhai/entity/SurvDeviceDeployRelaygroup.java
create mode 100644 src/main/java/com/lanhai/mapper/SurvDeviceDeployRelayMapper.java
create mode 100644 src/main/java/com/lanhai/mapper/SurvDeviceDeployRelaygroupMapper.java
create mode 100644 src/main/java/com/lanhai/mapper/xml/SurvDeviceDeployRelayMapper.xml
create mode 100644 src/main/java/com/lanhai/mapper/xml/SurvDeviceDeployRelaygroupMapper.xml
create mode 100644 src/main/java/com/lanhai/o/iot/lhiot/QueryCmd.java
create mode 100644 src/main/java/com/lanhai/o/iot/lhiot/QueryCmdDetail.java
create mode 100644 src/main/java/com/lanhai/o/iot/lhiot/QueryCmdRegister.java
create mode 100644 src/main/java/com/lanhai/o/iot/lhiot/ResponseCmd.java
create mode 100644 src/main/java/com/lanhai/o/iot/lhiot/ResponseCmdData.java
create mode 100644 src/main/java/com/lanhai/o/iot/lhiot/ResponseCmdDetail.java
create mode 100644 src/main/java/com/lanhai/service/ISurvDeviceDeployRelayService.java
create mode 100644 src/main/java/com/lanhai/service/ISurvDeviceDeployRelaygroupService.java
create mode 100644 src/main/java/com/lanhai/service/Impl/SurvDeviceDeployRelayServiceImpl.java
create mode 100644 src/main/java/com/lanhai/service/Impl/SurvDeviceDeployRelaygroupServiceImpl.java
create mode 100644 src/main/java/com/lanhai/task/MqttDeviceTask.java
create mode 100644 src/main/java/com/lanhai/util/LhIotUtil.java
diff --git a/src/main/java/com/lanhai/MybatisPlusGenerator.java b/src/main/java/com/lanhai/MybatisPlusGenerator.java
index 0153f63..a23c331 100644
--- a/src/main/java/com/lanhai/MybatisPlusGenerator.java
+++ b/src/main/java/com/lanhai/MybatisPlusGenerator.java
@@ -14,7 +14,7 @@ import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy;
*/
public class MybatisPlusGenerator {
- public static final String database = "jdbc:mysql://172.27.17.3:13306/fx_nsp";
+ public static final String database = "jdbc:mysql://8.130.9.244:13306/fx_nsp";
public static final String user = "user_fx";
public static final String passwd = "user_fx";
@@ -80,7 +80,7 @@ public class MybatisPlusGenerator {
public static void main(String[] args) {
- new MybatisPlusGenerator().generator("surv_iot_manufacturer_config","surv_iot_manufacturer_info","surv_iot_protocol","surv_iot_virtual_device","surv_iot_virtual_device_group","surv_iot_virtual_device_module","surv_iot_virtual_device_net");//指标
+ new MybatisPlusGenerator().generator("surv_device_deploy_relay","surv_device_deploy_relaygroup");//指标
}
diff --git a/src/main/java/com/lanhai/constant/IotConstants.java b/src/main/java/com/lanhai/constant/IotConstants.java
index 7528ea2..8bc0b5a 100644
--- a/src/main/java/com/lanhai/constant/IotConstants.java
+++ b/src/main/java/com/lanhai/constant/IotConstants.java
@@ -214,4 +214,11 @@ public interface IotConstants {
* 规则类型 预设
*/
String RULE_PRE = "rule_preset";
+
+
+ /**
+ * 蓝海虚拟设备协议
+ */
+ String lhviot_standard = "lhviot_standard";
+
}
diff --git a/src/main/java/com/lanhai/controller/SurvDeviceDeployRelayController.java b/src/main/java/com/lanhai/controller/SurvDeviceDeployRelayController.java
new file mode 100644
index 0000000..4ac08dc
--- /dev/null
+++ b/src/main/java/com/lanhai/controller/SurvDeviceDeployRelayController.java
@@ -0,0 +1,21 @@
+package com.lanhai.controller;
+
+
+import org.springframework.web.bind.annotation.RequestMapping;
+
+import org.springframework.stereotype.Controller;
+
+/**
+ *
+ * 设备继电器表 前端控制器
+ *
+ *
+ * @author ${author}
+ * @since 2026-03-14
+ */
+@Controller
+@RequestMapping("/survDeviceDeployRelay")
+public class SurvDeviceDeployRelayController {
+
+}
+
diff --git a/src/main/java/com/lanhai/controller/SurvDeviceDeployRelaygroupController.java b/src/main/java/com/lanhai/controller/SurvDeviceDeployRelaygroupController.java
new file mode 100644
index 0000000..6facb34
--- /dev/null
+++ b/src/main/java/com/lanhai/controller/SurvDeviceDeployRelaygroupController.java
@@ -0,0 +1,21 @@
+package com.lanhai.controller;
+
+
+import org.springframework.web.bind.annotation.RequestMapping;
+
+import org.springframework.stereotype.Controller;
+
+/**
+ *
+ * 继电器分组 前端控制器
+ *
+ *
+ * @author ${author}
+ * @since 2026-03-14
+ */
+@Controller
+@RequestMapping("/survDeviceDeployRelaygroup")
+public class SurvDeviceDeployRelaygroupController {
+
+}
+
diff --git a/src/main/java/com/lanhai/entity/SurvDeviceDeploy.java b/src/main/java/com/lanhai/entity/SurvDeviceDeploy.java
index e8cd88e..1e24dcb 100644
--- a/src/main/java/com/lanhai/entity/SurvDeviceDeploy.java
+++ b/src/main/java/com/lanhai/entity/SurvDeviceDeploy.java
@@ -85,6 +85,10 @@ public class SurvDeviceDeploy extends Model {
@TableField("DEVICE_IOT_URL")
private String deviceIotUrl;
+
+ @TableField("DEVICE_REVERSE_IOT_URL")
+ private String deviceReverseIotUrl;
+
/**
* 设备排序
*/
@@ -217,6 +221,6 @@ public class SurvDeviceDeploy extends Model {
* 同设备编号的有多少id
*/
@TableField(exist = false)
- private List deviceList;
+ private List deviceList;
}
diff --git a/src/main/java/com/lanhai/entity/SurvDeviceDeployRelay.java b/src/main/java/com/lanhai/entity/SurvDeviceDeployRelay.java
new file mode 100644
index 0000000..882d52a
--- /dev/null
+++ b/src/main/java/com/lanhai/entity/SurvDeviceDeployRelay.java
@@ -0,0 +1,220 @@
+package com.lanhai.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.extension.activerecord.Model;
+import java.util.Date;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableField;
+import java.io.Serializable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.experimental.Accessors;
+
+/**
+ *
+ * 设备继电器表
+ *
+ *
+ * @author ${author}
+ * @since 2026-03-14
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Accessors(chain = true)
+public class SurvDeviceDeployRelay extends Model {
+
+ private static final long serialVersionUID=1L;
+
+ /**
+ * 主键
+ */
+ @TableId(value = "ID", type = IdType.ID_WORKER_STR)
+ private String id;
+
+ /**
+ * 设备ID
+ */
+ @TableField("DEPLOY_ID")
+ private String deployId;
+
+ /**
+ * 类别,1=普通
+ */
+ @TableField("RELAY_CATE")
+ private Integer relayCate;
+
+ /**
+ * 1=风机,2=水泵,3=增氧机,4=湿帘,5=遮阳,6=开窗,7=保温,8=投食机
+ */
+ @TableField("RELAY_TYPE")
+ private Integer relayType;
+
+ /**
+ * 继电器key/编号
+ */
+ @TableField("RELAY_KEY")
+ private String relayKey;
+
+ /**
+ * 继电器名称
+ */
+ @TableField("RELAY_NAME")
+ private String relayName;
+
+ /**
+ * 继电器备注
+ */
+ @TableField("RELAY_NOTES")
+ private String relayNotes;
+
+ /**
+ * 继电器标记
+ */
+ @TableField("RELAY_MARK")
+ private String relayMark;
+
+ /**
+ * 寄存器类型
+ */
+ @TableField("REGISTER_TYPE")
+ private Integer registerType;
+
+ /**
+ * 寄存器序号
+ */
+ @TableField("REGISTER_NUM")
+ private Integer registerNum;
+
+ /**
+ * 开指令
+ */
+ @TableField("REGISTER_CMD_ON")
+ private String registerCmdOn;
+
+ /**
+ * 关指令
+ */
+ @TableField("REGISTER_CMD_OFF")
+ private String registerCmdOff;
+
+ /**
+ * 停指令
+ */
+ @TableField("REGISTER_CMD_STOP")
+ private String registerCmdStop;
+
+ /**
+ * 回执状态开
+ */
+ @TableField("REGISTER_ON")
+ private String registerOn;
+
+ /**
+ * 回执状态关
+ */
+ @TableField("REGISTER_OFF")
+ private String registerOff;
+
+ /**
+ * 回执状态停
+ */
+ @TableField("REGISTER_STOP")
+ private String registerStop;
+
+ /**
+ * 实体对应字段
+ */
+ @TableField("ENTITY_FIELD")
+ private String entityField;
+
+ /**
+ * 是否启用
+ */
+ @TableField("IS_ENABLE")
+ private Integer isEnable;
+
+ /**
+ * 是否可远程可控
+ */
+ @TableField("IS_ABLE_REMOTE")
+ private Integer isAbleRemote;
+
+ /**
+ * 分组ID
+ */
+ @TableField("GROUP_ID")
+ private String groupId;
+
+ /**
+ * 分组CODE
+ */
+ @TableField("GROUP_CODE")
+ private String groupCode;
+
+ /**
+ * 分组名称
+ */
+ @TableField("GROUP_NAME")
+ private String groupName;
+
+ /**
+ * 排序号
+ */
+ @TableField("SORT_NO")
+ private Integer sortNo;
+
+ /**
+ * 租户号
+ */
+ @TableField("TENANT_ID")
+ private String tenantId;
+
+ /**
+ * 乐观锁
+ */
+ @TableField("RE_VISION")
+ private Integer reVision;
+
+ /**
+ * 创建人
+ */
+ @TableField("CREATE_BY")
+ private String createBy;
+
+ /**
+ * 创建时间
+ */
+ @TableField("CREATE_TIME")
+ private Date createTime;
+
+ /**
+ * 更新人
+ */
+ @TableField("UPDATE_BY")
+ private String updateBy;
+
+ /**
+ * 逻辑删除
+ */
+ @TableField("IS_DEL")
+ private Integer isDel;
+
+ /**
+ * 更新时间
+ */
+ @TableField("UPDATE_TIME")
+ private Date updateTime;
+
+ /**
+ * 乐观锁
+ */
+ @TableField("REVISION")
+ private Integer revision;
+
+
+ @Override
+ protected Serializable pkVal() {
+ return this.id;
+ }
+
+}
diff --git a/src/main/java/com/lanhai/entity/SurvDeviceDeployRelaygroup.java b/src/main/java/com/lanhai/entity/SurvDeviceDeployRelaygroup.java
new file mode 100644
index 0000000..09faf12
--- /dev/null
+++ b/src/main/java/com/lanhai/entity/SurvDeviceDeployRelaygroup.java
@@ -0,0 +1,118 @@
+package com.lanhai.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.extension.activerecord.Model;
+import java.util.Date;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableField;
+import java.io.Serializable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.experimental.Accessors;
+
+/**
+ *
+ * 继电器分组
+ *
+ *
+ * @author ${author}
+ * @since 2026-03-14
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Accessors(chain = true)
+public class SurvDeviceDeployRelaygroup extends Model {
+
+ private static final long serialVersionUID=1L;
+
+ /**
+ * 主键
+ */
+ @TableId(value = "ID", type = IdType.ID_WORKER_STR)
+ private String id;
+
+ /**
+ * 设备id
+ */
+ @TableField("DEPLOY_ID")
+ private String deployId;
+
+ /**
+ * 分组类型;button=按钮,switch=两开关,shifter=三开关
+ */
+ @TableField("GROUP_TYPE")
+ private String groupType;
+
+ /**
+ * 分组名称
+ */
+ @TableField("GROUP_NAME")
+ private String groupName;
+
+ /**
+ * 分组备注
+ */
+ @TableField("GROUP_NOTES")
+ private String groupNotes;
+
+ /**
+ * 序号
+ */
+ @TableField("SORT_NO")
+ private Integer sortNo;
+
+ /**
+ * 是否启用
+ */
+ @TableField("IS_ENABLE")
+ private Integer isEnable;
+
+ /**
+ * 租户号
+ */
+ @TableField("TENANT_ID")
+ private String tenantId;
+
+ /**
+ * 乐观锁
+ */
+ @TableField("RE_VISION")
+ private Integer reVision;
+
+ /**
+ * 创建人
+ */
+ @TableField("CREATE_BY")
+ private String createBy;
+
+ /**
+ * 创建时间
+ */
+ @TableField("CREATE_TIME")
+ private Date createTime;
+
+ /**
+ * 更新人
+ */
+ @TableField("UPDATE_BY")
+ private String updateBy;
+
+ /**
+ * 逻辑删除
+ */
+ @TableField("IS_DEL")
+ private Integer isDel;
+
+ /**
+ * 更新时间
+ */
+ @TableField("UPDATE_TIME")
+ private Date updateTime;
+
+
+ @Override
+ protected Serializable pkVal() {
+ return this.id;
+ }
+
+}
diff --git a/src/main/java/com/lanhai/mapper/SurvDeviceDeployRelayMapper.java b/src/main/java/com/lanhai/mapper/SurvDeviceDeployRelayMapper.java
new file mode 100644
index 0000000..63d5c60
--- /dev/null
+++ b/src/main/java/com/lanhai/mapper/SurvDeviceDeployRelayMapper.java
@@ -0,0 +1,16 @@
+package com.lanhai.mapper;
+
+import com.lanhai.entity.SurvDeviceDeployRelay;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+/**
+ *
+ * 设备继电器表 Mapper 接口
+ *
+ *
+ * @author ${author}
+ * @since 2026-03-14
+ */
+public interface SurvDeviceDeployRelayMapper extends BaseMapper {
+
+}
diff --git a/src/main/java/com/lanhai/mapper/SurvDeviceDeployRelaygroupMapper.java b/src/main/java/com/lanhai/mapper/SurvDeviceDeployRelaygroupMapper.java
new file mode 100644
index 0000000..214c4b1
--- /dev/null
+++ b/src/main/java/com/lanhai/mapper/SurvDeviceDeployRelaygroupMapper.java
@@ -0,0 +1,16 @@
+package com.lanhai.mapper;
+
+import com.lanhai.entity.SurvDeviceDeployRelaygroup;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+/**
+ *
+ * 继电器分组 Mapper 接口
+ *
+ *
+ * @author ${author}
+ * @since 2026-03-14
+ */
+public interface SurvDeviceDeployRelaygroupMapper extends BaseMapper {
+
+}
diff --git a/src/main/java/com/lanhai/mapper/xml/SurvDeviceDeployRelayMapper.xml b/src/main/java/com/lanhai/mapper/xml/SurvDeviceDeployRelayMapper.xml
new file mode 100644
index 0000000..4d7c1d0
--- /dev/null
+++ b/src/main/java/com/lanhai/mapper/xml/SurvDeviceDeployRelayMapper.xml
@@ -0,0 +1,5 @@
+
+
+
+
+
diff --git a/src/main/java/com/lanhai/mapper/xml/SurvDeviceDeployRelaygroupMapper.xml b/src/main/java/com/lanhai/mapper/xml/SurvDeviceDeployRelaygroupMapper.xml
new file mode 100644
index 0000000..3e4cbfd
--- /dev/null
+++ b/src/main/java/com/lanhai/mapper/xml/SurvDeviceDeployRelaygroupMapper.xml
@@ -0,0 +1,5 @@
+
+
+
+
+
diff --git a/src/main/java/com/lanhai/mqtt/MyMqttCallback.java b/src/main/java/com/lanhai/mqtt/MyMqttCallback.java
index df7909b..535d2c1 100644
--- a/src/main/java/com/lanhai/mqtt/MyMqttCallback.java
+++ b/src/main/java/com/lanhai/mqtt/MyMqttCallback.java
@@ -6,7 +6,10 @@ import com.lanhai.entity.SurvAlertRecord;
import com.lanhai.enums.IotInerfaceTopicType;
import com.lanhai.mapper.SurvAlertRecordMapper;
import com.lanhai.mapper.SurvIotVirtualDeviceMapper;
+import com.lanhai.o.iot.lhiot.ResponseCmd;
+import com.lanhai.service.IScEquZhibiaoService;
import com.lanhai.service.ISurvIotVirtualDeviceService;
+import com.lanhai.service.Impl.CommonServiceImpl;
import com.lanhai.service.Impl.SurvIotVirtualDeviceServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
@@ -30,8 +33,9 @@ public class MyMqttCallback implements MqttCallbackExtended {
//手动注入
// private MqttConfig mqttConfig = SpringContextUtil.getBean(MqttConfig.class);
- private SurvIotVirtualDeviceServiceImpl deviceService = SpringContextUtil.getBean("survIotVirtualDeviceServiceImpl", SurvIotVirtualDeviceServiceImpl.class);;
+ private SurvIotVirtualDeviceServiceImpl deviceService = SpringContextUtil.getBean("survIotVirtualDeviceServiceImpl", SurvIotVirtualDeviceServiceImpl.class);
+ private CommonServiceImpl commonService = SpringContextUtil.getBean(CommonServiceImpl.class);
// private static RedisTemplate redisTemplate = SpringContextUtil.getBean("redisTemplate", RedisTemplate.class);
@@ -114,7 +118,7 @@ public class MyMqttCallback implements MqttCallbackExtended {
log.info("== MyMqttCallback ==> messageArrived 接收消息主题: {},接收消息内容: {}", topic, o1);
try {
if (topic.startsWith(IotInerfaceTopicType.DTU_TOPIC.getCode())) {
- //todo 处理消息逻辑,解析数据最终入库
+ commonService.processMqttData(topic,o1);
// deviceService.processPayLoad(topic,mqttMessage.getPayload());
} else if (topic.startsWith(IotInerfaceTopicType.LH_IOT_TOPIC_DOWN.getCode())) {//蓝海虚拟设备下行逻辑
//暂时无业务
diff --git a/src/main/java/com/lanhai/o/iot/lhiot/QueryCmd.java b/src/main/java/com/lanhai/o/iot/lhiot/QueryCmd.java
new file mode 100644
index 0000000..929462f
--- /dev/null
+++ b/src/main/java/com/lanhai/o/iot/lhiot/QueryCmd.java
@@ -0,0 +1,16 @@
+package com.lanhai.o.iot.lhiot;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Data
+public class QueryCmd {
+ @ApiModelProperty("指令")
+ private QueryCmdDetail rw_prot;
+}
diff --git a/src/main/java/com/lanhai/o/iot/lhiot/QueryCmdDetail.java b/src/main/java/com/lanhai/o/iot/lhiot/QueryCmdDetail.java
new file mode 100644
index 0000000..0e3ed21
--- /dev/null
+++ b/src/main/java/com/lanhai/o/iot/lhiot/QueryCmdDetail.java
@@ -0,0 +1,49 @@
+package com.lanhai.o.iot.lhiot;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Data
+public class QueryCmdDetail {
+
+
+ private static final AtomicInteger counter = new AtomicInteger(0);
+ private static final DateTimeFormatter FORMATTER =
+ DateTimeFormatter.ofPattern("yyyyMMddHHmmss")
+ .withZone(ZoneId.systemDefault());
+
+
+ @ApiModelProperty("指令类型")
+ private String dir = "down";
+
+ @ApiModelProperty("指令id")
+ private String id = generate();
+
+ @ApiModelProperty("寄存器编号集合")
+ private List r_data;
+
+
+
+ /**
+ * 线程安全的高并发版本
+ */
+ public static String generate() {
+ // 1. 生成时间戳部分
+ String timestamp = FORMATTER.format(Instant.now());
+
+ // 2. 生成4位随机数(使用AtomicInteger保证线程安全)
+ int random = counter.incrementAndGet() % 10000;
+ if (random < 0) {
+ random = -random;
+ }
+ String randomStr = String.format("%04d", random);
+
+ return timestamp + randomStr;
+ }
+}
diff --git a/src/main/java/com/lanhai/o/iot/lhiot/QueryCmdRegister.java b/src/main/java/com/lanhai/o/iot/lhiot/QueryCmdRegister.java
new file mode 100644
index 0000000..e6af5bd
--- /dev/null
+++ b/src/main/java/com/lanhai/o/iot/lhiot/QueryCmdRegister.java
@@ -0,0 +1,11 @@
+package com.lanhai.o.iot.lhiot;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+@Data
+public class QueryCmdRegister {
+
+ @ApiModelProperty("寄存器编号")
+ private String name;
+}
diff --git a/src/main/java/com/lanhai/o/iot/lhiot/ResponseCmd.java b/src/main/java/com/lanhai/o/iot/lhiot/ResponseCmd.java
new file mode 100644
index 0000000..154a329
--- /dev/null
+++ b/src/main/java/com/lanhai/o/iot/lhiot/ResponseCmd.java
@@ -0,0 +1,12 @@
+package com.lanhai.o.iot.lhiot;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class ResponseCmd {
+ @ApiModelProperty("指令")
+ private ResponseCmdDetail rw_prot;
+}
diff --git a/src/main/java/com/lanhai/o/iot/lhiot/ResponseCmdData.java b/src/main/java/com/lanhai/o/iot/lhiot/ResponseCmdData.java
new file mode 100644
index 0000000..d22e60b
--- /dev/null
+++ b/src/main/java/com/lanhai/o/iot/lhiot/ResponseCmdData.java
@@ -0,0 +1,17 @@
+package com.lanhai.o.iot.lhiot;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+@Data
+public class ResponseCmdData {
+
+ @ApiModelProperty("寄存器编号")
+ private String name;
+
+ @ApiModelProperty("寄存器值")
+ private String value;
+
+ @ApiModelProperty("是否报错")
+ private String err;
+}
diff --git a/src/main/java/com/lanhai/o/iot/lhiot/ResponseCmdDetail.java b/src/main/java/com/lanhai/o/iot/lhiot/ResponseCmdDetail.java
new file mode 100644
index 0000000..ec3af3d
--- /dev/null
+++ b/src/main/java/com/lanhai/o/iot/lhiot/ResponseCmdDetail.java
@@ -0,0 +1,18 @@
+package com.lanhai.o.iot.lhiot;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class ResponseCmdDetail {
+ @ApiModelProperty("指令类型")
+ private String dir;
+
+ @ApiModelProperty("指令id")
+ private String id;
+
+ @ApiModelProperty("数据")
+ private List r_data;
+}
diff --git a/src/main/java/com/lanhai/service/ISurvDeviceDeployRelayService.java b/src/main/java/com/lanhai/service/ISurvDeviceDeployRelayService.java
new file mode 100644
index 0000000..9ed27eb
--- /dev/null
+++ b/src/main/java/com/lanhai/service/ISurvDeviceDeployRelayService.java
@@ -0,0 +1,16 @@
+package com.lanhai.service;
+
+import com.lanhai.entity.SurvDeviceDeployRelay;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/**
+ *
+ * 设备继电器表 服务类
+ *
+ *
+ * @author ${author}
+ * @since 2026-03-14
+ */
+public interface ISurvDeviceDeployRelayService extends IService {
+
+}
diff --git a/src/main/java/com/lanhai/service/ISurvDeviceDeployRelaygroupService.java b/src/main/java/com/lanhai/service/ISurvDeviceDeployRelaygroupService.java
new file mode 100644
index 0000000..e3272b9
--- /dev/null
+++ b/src/main/java/com/lanhai/service/ISurvDeviceDeployRelaygroupService.java
@@ -0,0 +1,16 @@
+package com.lanhai.service;
+
+import com.lanhai.entity.SurvDeviceDeployRelaygroup;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/**
+ *
+ * 继电器分组 服务类
+ *
+ *
+ * @author ${author}
+ * @since 2026-03-14
+ */
+public interface ISurvDeviceDeployRelaygroupService extends IService {
+
+}
diff --git a/src/main/java/com/lanhai/service/ISurvTransdataAirService.java b/src/main/java/com/lanhai/service/ISurvTransdataAirService.java
index dcd9e9d..b03b477 100644
--- a/src/main/java/com/lanhai/service/ISurvTransdataAirService.java
+++ b/src/main/java/com/lanhai/service/ISurvTransdataAirService.java
@@ -14,5 +14,7 @@ public interface ISurvTransdataAirService extends IService {
SurvTransdataAir getOneByDeviceCode(String deviceCode,String filterId);
+ SurvTransdataAir getOneByDeviceId(String deployId);
+
void saveBaowen(Map baowenMap);
}
diff --git a/src/main/java/com/lanhai/service/Impl/CommonServiceImpl.java b/src/main/java/com/lanhai/service/Impl/CommonServiceImpl.java
index 9658900..e7e3ad2 100644
--- a/src/main/java/com/lanhai/service/Impl/CommonServiceImpl.java
+++ b/src/main/java/com/lanhai/service/Impl/CommonServiceImpl.java
@@ -1,31 +1,60 @@
package com.lanhai.service.Impl;
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.util.IdUtil;
+import com.alibaba.fastjson.JSONObject;
import com.lanhai.constant.IotConstants;
-import com.lanhai.entity.SurvConfig;
+import com.lanhai.constant.PollutionConstants;
+import com.lanhai.entity.*;
+import com.lanhai.o.iot.lhiot.ResponseCmd;
+import com.lanhai.o.iot.pbs.WaterCommonTransVo;
import com.lanhai.service.*;
+import com.lanhai.util.LhIotUtil;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
+import java.util.Date;
+import java.util.List;
+
/**
* 处理业务的服务
*/
@Service
+@Slf4j
public class CommonServiceImpl implements ICommonService {
@Autowired
@Lazy
private SurvConfigServiceImpl configService;
+
+ @Autowired
+ @Lazy
+ private SurvDeviceDeployServiceImpl deviceDeployService;
+
+ @Autowired
+ @Lazy
+ private ScEquZhibiaoServiceImpl zhibiaoService;
+
+ @Autowired
+ @Lazy
+ private SurvStationInfoServiceImpl survStationInfoService;
//
// @Autowired
// private ISurvTransdataAirService transdataAirService;
-// @Autowired
-// private ISurvTransdataLivestockwaterService transdataLivestockwaterService;
-// @Autowired
-// private ISurvTransdataOrientwaterService transdataOrientwaterService;
+ @Autowired
+ private ISurvTransdataLivestockwaterService transdataLivestockwaterService;
+ @Autowired
+ private ISurvHisdataLivestockwaterService hisdataLivestockwaterService;
+ @Autowired
+ private ISurvTransdataOrientwaterService transdataOrientwaterService;
+ @Autowired
+ private ISurvHisdataOrientwaterService hisdataOrientwaterService;
+
// @Autowired
// private ISurvTransdataPestlightService transdataPestlightService;
// @Autowired
@@ -74,6 +103,79 @@ public class CommonServiceImpl implements ICommonService {
}
+ public void processMqttData(String topic,String mqttMessage) {
+ //1. 检查主题所属,确定如何执行后续逻辑
+ List deployList = deviceDeployService.lambdaQuery()
+ .eq(SurvDeviceDeploy::getDeviceReverseIotUrl,topic)
+ .list();
+ ;
+
+ if(!deployList.isEmpty()){
+ SurvDeviceDeploy survDeviceDeploy = deployList.get(0);
+ List zhibiaoList = zhibiaoService.getListByEquid(survDeviceDeploy.getId());
+ if(PollutionConstants.WATER_ORIENT.equals(survDeviceDeploy.getDeployType()) || PollutionConstants.WATER_LIVE.equals(survDeviceDeploy.getDeployType())){//面源数据
+ WaterCommonTransVo waterCommonTransVo = null;
+ switch (survDeviceDeploy.getProtocolCode()){
+ case IotConstants.lhviot_standard:
+ waterCommonTransVo = LhIotUtil.transData(zhibiaoList,mqttMessage);
+ break;
+ }
+ if(waterCommonTransVo!=null){
+ //遍历保存所有租户数据
+ for (SurvDeviceDeploy deploy : deployList) {
+ SurvStationInfo survStationInfo = survStationInfoService.getByCode(deploy.getStationCode());
+ String stationName = survStationInfo!=null?survStationInfo.getStationName():"站点";
+ if(PollutionConstants.WATER_ORIENT.equals(deploy.getDeployType())){
+ SurvTransdataOrientwater orientwater = new SurvTransdataOrientwater();
+ BeanUtil.copyProperties(waterCommonTransVo,orientwater);
+ orientwater.setTenantId(deploy.getTenantId());
+ orientwater.setDeployCode(deploy.getDeployCode());
+ orientwater.setStationCode(deploy.getStationCode());
+ orientwater.setStationName(stationName);
+ orientwater.setDeviceName(deploy.getDeployDes());
+ orientwater.setDeviceId(deploy.getId());
+ orientwater.setCreatedBy("task");//创建人
+ orientwater.setCreateTime(new Date());//创建时间
+
+ transdataOrientwaterService.save(orientwater);
+
+ //保存历史表
+ SurvHisdataOrientwater hisOrientWater= new SurvHisdataOrientwater();
+ hisOrientWater.setId(IdUtil.getSnowflakeNextIdStr());
+ hisOrientWater.setTransDate(new Date());
+ hisdataOrientwaterService.save(hisOrientWater);
+
+ } else if (PollutionConstants.WATER_LIVE.equals(deploy.getDeployType())) {
+ SurvTransdataLivestockwater livestockwater = new SurvTransdataLivestockwater();
+ BeanUtil.copyProperties(waterCommonTransVo,livestockwater);
+ livestockwater.setTenantId(deploy.getTenantId());
+ livestockwater.setDeployCode(deploy.getDeployCode());
+ livestockwater.setStationCode(deploy.getStationCode());
+ livestockwater.setStationName(stationName);
+ livestockwater.setDeviceName(deploy.getDeployDes());
+ livestockwater.setDeviceId(deploy.getId());
+ livestockwater.setCreatedBy("task");//创建人
+ livestockwater.setCreateTime(new Date());//创建时间
+
+ transdataLivestockwaterService.save(livestockwater);
+
+ //保存历史表
+ SurvHisdataLivestockwater hisdataLivestockwater = new SurvHisdataLivestockwater();
+ hisdataLivestockwater.setId(IdUtil.getSnowflakeNextIdStr());
+ hisdataLivestockwater.setTransDate(new Date());
+ hisdataLivestockwaterService.save(hisdataLivestockwater);
+ }
+ }
+ }else{
+ log.error("主题:{}--消息:{},解析失败,任务中断。",topic,mqttMessage);
+ }
+ }
+
+ }
+
+
+ //2. 检查有
+ }
}
diff --git a/src/main/java/com/lanhai/service/Impl/SurvDeviceDeployRelayServiceImpl.java b/src/main/java/com/lanhai/service/Impl/SurvDeviceDeployRelayServiceImpl.java
new file mode 100644
index 0000000..74b9255
--- /dev/null
+++ b/src/main/java/com/lanhai/service/Impl/SurvDeviceDeployRelayServiceImpl.java
@@ -0,0 +1,20 @@
+package com.lanhai.service.Impl;
+
+import com.lanhai.entity.SurvDeviceDeployRelay;
+import com.lanhai.mapper.SurvDeviceDeployRelayMapper;
+import com.lanhai.service.ISurvDeviceDeployRelayService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.springframework.stereotype.Service;
+
+/**
+ *
+ * 设备继电器表 服务实现类
+ *
+ *
+ * @author ${author}
+ * @since 2026-03-14
+ */
+@Service
+public class SurvDeviceDeployRelayServiceImpl extends ServiceImpl implements ISurvDeviceDeployRelayService {
+
+}
diff --git a/src/main/java/com/lanhai/service/Impl/SurvDeviceDeployRelaygroupServiceImpl.java b/src/main/java/com/lanhai/service/Impl/SurvDeviceDeployRelaygroupServiceImpl.java
new file mode 100644
index 0000000..4bb8679
--- /dev/null
+++ b/src/main/java/com/lanhai/service/Impl/SurvDeviceDeployRelaygroupServiceImpl.java
@@ -0,0 +1,20 @@
+package com.lanhai.service.Impl;
+
+import com.lanhai.entity.SurvDeviceDeployRelaygroup;
+import com.lanhai.mapper.SurvDeviceDeployRelaygroupMapper;
+import com.lanhai.service.ISurvDeviceDeployRelaygroupService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.springframework.stereotype.Service;
+
+/**
+ *
+ * 继电器分组 服务实现类
+ *
+ *
+ * @author ${author}
+ * @since 2026-03-14
+ */
+@Service
+public class SurvDeviceDeployRelaygroupServiceImpl extends ServiceImpl implements ISurvDeviceDeployRelaygroupService {
+
+}
diff --git a/src/main/java/com/lanhai/service/Impl/SurvHisdataSoilServiceImpl.java b/src/main/java/com/lanhai/service/Impl/SurvHisdataSoilServiceImpl.java
index 2e1f144..fc7f51f 100644
--- a/src/main/java/com/lanhai/service/Impl/SurvHisdataSoilServiceImpl.java
+++ b/src/main/java/com/lanhai/service/Impl/SurvHisdataSoilServiceImpl.java
@@ -50,7 +50,7 @@ public class SurvHisdataSoilServiceImpl extends ServiceImpl devices = lambdaQuery().nested(wrapper->
- wrapper.isNotNull(SurvIotVirtualDevice::getUpperUrl)
- .or()
- .isNotNull(SurvIotVirtualDevice::getLowerUrl)
- ).list();
- if(!devices.isEmpty()){
- for (SurvIotVirtualDevice device : devices) {
- if(StringUtils.isNotBlank(device.getUpperUrl())){
- topics.add(device.getUpperUrl());
- }
- if(StringUtils.isNotBlank(device.getLowerUrl())){
- topics.add(device.getLowerUrl());
- }
- }
- }
- //物联网模块中的主题
- List nets = netService.lambdaQuery()
- .nested(wrapper->
- wrapper.isNotNull(SurvIotVirtualDeviceNet::getNetUpTopic)
- .or()
- .isNotNull(SurvIotVirtualDeviceNet::getNetDownTopic)
- ).list();
- ;
- if(!nets.isEmpty()){
- for (SurvIotVirtualDeviceNet net : nets) {
- if(StringUtils.isNotBlank(net.getNetUpTopic())){
- topics.add(net.getNetUpTopic());
- }
- if(StringUtils.isNotBlank(net.getNetDownTopic())){
- topics.add(net.getNetDownTopic());
- }
- }
- }
+// List devices = lambdaQuery().nested(wrapper->
+// wrapper.isNotNull(SurvIotVirtualDevice::getUpperUrl)
+// .or()
+// .isNotNull(SurvIotVirtualDevice::getLowerUrl)
+// ).list();
+// if(!devices.isEmpty()){
+// for (SurvIotVirtualDevice device : devices) {
+// if(StringUtils.isNotBlank(device.getUpperUrl())){
+// topics.add(device.getUpperUrl());
+// }
+// if(StringUtils.isNotBlank(device.getLowerUrl())){
+// topics.add(device.getLowerUrl());
+// }
+// }
+// }
+// //物联网模块中的主题
+// List nets = netService.lambdaQuery()
+// .nested(wrapper->
+// wrapper.isNotNull(SurvIotVirtualDeviceNet::getNetUpTopic)
+// .or()
+// .isNotNull(SurvIotVirtualDeviceNet::getNetDownTopic)
+// ).list();
+// ;
+// if(!nets.isEmpty()){
+// for (SurvIotVirtualDeviceNet net : nets) {
+// if(StringUtils.isNotBlank(net.getNetUpTopic())){
+// topics.add(net.getNetUpTopic());
+// }
+// if(StringUtils.isNotBlank(net.getNetDownTopic())){
+// topics.add(net.getNetDownTopic());
+// }
+// }
+// }
return topics;
}
diff --git a/src/main/java/com/lanhai/service/Impl/SurvTransdataAirServiceImpl.java b/src/main/java/com/lanhai/service/Impl/SurvTransdataAirServiceImpl.java
index 30293aa..29d0331 100644
--- a/src/main/java/com/lanhai/service/Impl/SurvTransdataAirServiceImpl.java
+++ b/src/main/java/com/lanhai/service/Impl/SurvTransdataAirServiceImpl.java
@@ -44,6 +44,13 @@ public class SurvTransdataAirServiceImpl extends ServiceImpl queryWrapper = new QueryWrapper();
+ queryWrapper.eq("DEPLOY_ID",deployId).last("limit 1");
+ return getOne(queryWrapper);
+ }
+
@Transactional
@Override
public void saveBaowen(Map baowenMap) {
diff --git a/src/main/java/com/lanhai/service/Impl/SurvTransdataSoilServiceImpl.java b/src/main/java/com/lanhai/service/Impl/SurvTransdataSoilServiceImpl.java
index f66d0ab..97ebaa0 100644
--- a/src/main/java/com/lanhai/service/Impl/SurvTransdataSoilServiceImpl.java
+++ b/src/main/java/com/lanhai/service/Impl/SurvTransdataSoilServiceImpl.java
@@ -50,18 +50,24 @@ public class SurvTransdataSoilServiceImpl extends ServiceImpl queryWrapper = new QueryWrapper();
+ queryWrapper.eq("DEPLOY_ID",deployId).last("limit 1");
+ return getOne(queryWrapper);
+ }
+
/**
*辛普惠 土壤
*/
@Transactional
@Override
- public void saveBaowen(Map baowenMap, SurvDeviceDeploy deploy,Map deviceDeployMap) {
+ public void saveBaowen(Map baowenMap, SurvDeviceDeploy survDeviceDeploy,Map deviceDeployMap) {
String mn = baowenMap.get("mn");
String DataTime = baowenMap.get("DataTime");
// SurvDeviceDeploy deploy = deviceDeployService.getOneByCode(mn);
//保存或者更新实时表
- SurvTransdataSoil soil = getOneByDeviceCode(mn,null);
+ SurvTransdataSoil soil = getOneByDeviceId(survDeviceDeploy.getId());
Date kDateTime = null;
Date nowTime = new Date();
if(soil == null){
@@ -88,8 +94,8 @@ public class SurvTransdataSoilServiceImpl extends ServiceImpl()
- .eq(SurvDeviceDeploy::getDeployCode, deploy.getDeployCode())
+ .eq(SurvDeviceDeploy::getId, survDeviceDeploy.getId())
.set(SurvDeviceDeploy::getLastsyncTime, nowTime));
//保存土壤历史表
@@ -146,7 +152,7 @@ public class SurvTransdataSoilServiceImpl extends ServiceImpl()
- .eq(SurvDeviceDeploy::getDeployCode, deploy.getDeployCode())//只能根据设备号更新,因为存在单个设备同时对应土壤空气的情况
+ .eq(SurvDeviceDeploy::getId, survDeviceDeploy.getId())//只能根据设备号更新,因为存在单个设备同时对应土壤空气的情况
.set(SurvDeviceDeploy::getLastsyncTime, nowTime));
//保存历史表
@@ -220,9 +226,9 @@ public class SurvTransdataSoilServiceImpl extends ServiceImpl page = new Page(1,curPageSize);
+ LambdaQueryWrapper wrapper = Wrappers.lambdaQuery();
+ //监测类设备,并且是mqtt协议
+ wrapper.eq(SurvDeviceDeploy::getRunStatus,0)
+ .eq(SurvDeviceDeploy::getDeployCate,"surv")
+ .eq(SurvDeviceDeploy::getProtocolType, "mqtt");
+
+ IPage devicelist = deviceDeployService.page(page,wrapper);
+
+ log.warn("需采集的MQTT设备数量:{}-共{}页",page.getTotal(),page.getPages());
+ XxlJobHelper.log("需采集的MQTT设备数量:{}-共{}页",page.getTotal(),page.getPages());
+
+
+ if (!devicelist.getRecords().isEmpty()) {
+ List dlist = new ArrayList<>();
+ List updList = new ArrayList<>();
+ for (int i = 1; i <= devicelist.getPages(); i++) {
+ log.warn("=====================x需采集的MQTT设备数量:page:{},共:{}页==========================",i,devicelist.getPages());
+ XxlJobHelper.log("=====================x需采集的MQTT设备数量:page:{},共:{}页==========================",i,devicelist.getPages());
+ if(i>1){//翻页
+ page = new Page(i,curPageSize);
+ devicelist = deviceDeployService.page(page,wrapper);
+ }
+ for (SurvDeviceDeploy deploy : devicelist.getRecords()) {
+ DeviceStatusVo dvo = new DeviceStatusVo();
+ if (StringUtils.isNotBlank(deploy.getProtocolCode())) {
+ try {
+ switch (deploy.getDeployType()) {
+ case PollutionConstants.CONTROL_CAB:
+ if (IotConstants.lhviot_standard.equals(deploy.getProtocolCode())) {//蓝海设备协议
+ LhIotUtil.DeviceQuery(deploy);
+ }
+ break;
+ }
+ }catch (Exception e){
+ e.printStackTrace();
+ XxlJobHelper.log("设备:{},执行Mqtt采集失败,跳过",deploy.getId());
+ continue;
+ }
+ dlist.add(dvo);
+ }
+ }
+ }
+ }else{
+ log.warn("=====未查询到需要采集的mqtt设备");
+ }
+ log.warn("======================MQTT设备采集任务结束=========================");
+ } catch (Exception e) {
+ e.printStackTrace();
+ XxlJobHelper.log(e);
+ XxlJobHelper.handleFail();
+ } finally {
+ XxlJobHelper.log("MQTT设备采集任务结束");
+ }
+ }
+
+
+
+
+}
diff --git a/src/main/java/com/lanhai/task/MultithreadTask.java b/src/main/java/com/lanhai/task/MultithreadTask.java
index 38cb1e4..f206cda 100644
--- a/src/main/java/com/lanhai/task/MultithreadTask.java
+++ b/src/main/java/com/lanhai/task/MultithreadTask.java
@@ -78,17 +78,17 @@ public class MultithreadTask {
if (!deploys.isEmpty()) {
//获取各农场的设备,过滤重复
Map deviceMap = new HashMap<>();
- Map> allDeviceMap = new HashMap<>();
+ Map> allDeviceMap = new HashMap<>();
Map deviceDeployMap = new HashMap<>();
for (SurvDeviceDeploy singleDevice : deploys) {
SurvDeviceDeploy checkDeploy = deviceMap.get(singleDevice.getDeployCode());
Boolean checkResult = Boolean.TRUE;
- List codeDeviceLsit =allDeviceMap.get(singleDevice.getDeployCode());
+ List codeDeviceLsit =allDeviceMap.get(singleDevice.getDeployCode());
deviceDeployMap.put(singleDevice.getDeployCode() + "_" + singleDevice.getDeployType(),singleDevice);
if(codeDeviceLsit==null){
codeDeviceLsit = new ArrayList<>();
}
- codeDeviceLsit.add(singleDevice.getId());
+ codeDeviceLsit.add(singleDevice);
allDeviceMap.put(singleDevice.getDeployCode(),codeDeviceLsit);
if (checkDeploy != null) {
log.error("发现重复设备,编号:{},已有类型:{},现类型:{}", singleDevice.getDeployCode(), checkDeploy.getDeployType(), singleDevice.getDeployType());
diff --git a/src/main/java/com/lanhai/util/LhIotUtil.java b/src/main/java/com/lanhai/util/LhIotUtil.java
new file mode 100644
index 0000000..ae74272
--- /dev/null
+++ b/src/main/java/com/lanhai/util/LhIotUtil.java
@@ -0,0 +1,101 @@
+package com.lanhai.util;
+
+import com.alibaba.fastjson.JSONObject;
+import com.lanhai.entity.ScEquZhibiao;
+import com.lanhai.entity.SurvDeviceDeploy;
+import com.lanhai.entity.SurvDeviceDeployRelay;
+import com.lanhai.mqtt.MqttService;
+import com.lanhai.o.iot.lhiot.*;
+import com.lanhai.o.iot.pbs.WaterCommonTransVo;
+import com.lanhai.service.IScEquZhibiaoService;
+import com.lanhai.service.ISurvDeviceDeployRelayService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class LhIotUtil {
+ static IScEquZhibiaoService scEquZhibiaoService = SpringContextUtil.getBean(IScEquZhibiaoService.class);
+ static MqttService mqttService = SpringContextUtil.getBean(MqttService.class);
+ /**
+ * 设备发送查询指令
+ */
+
+ public static boolean DeviceQuery(SurvDeviceDeploy survDeviceDeploy){
+ if(survDeviceDeploy!=null){
+ //构造指令
+ QueryCmd queryCmd = ConstructCmd(survDeviceDeploy);
+ //发送指令,传入下行主题 iotUrl
+ String cmdStr = JSONObject.toJSONString(queryCmd);
+ mqttService.publish(survDeviceDeploy.getDeviceIotUrl(),cmdStr);
+ }
+ return true;
+ }
+
+
+ /**
+ * 指令构造器
+ */
+ public static QueryCmd ConstructCmd(SurvDeviceDeploy survDeviceDeploy){
+ QueryCmd queryCmd = new QueryCmd();
+ //查询设备配置
+ List eleList = scEquZhibiaoService.lambdaQuery()
+ .eq(ScEquZhibiao::getEquId,survDeviceDeploy.getId())
+ .list();
+ if(eleList!=null && !eleList.isEmpty()){
+ List registers = new ArrayList<>();
+ for (ScEquZhibiao scEquZhibiao : eleList) {
+ QueryCmdRegister queryCmdRegister = new QueryCmdRegister();
+ queryCmdRegister.setName(scEquZhibiao.getCode());
+ registers.add(queryCmdRegister);
+ }
+ QueryCmdDetail queryCmdDetail = new QueryCmdDetail();
+ queryCmdDetail.setR_data(registers);
+ queryCmd.setRw_prot(queryCmdDetail);
+ }
+
+
+ return queryCmd;
+ }
+
+ public static WaterCommonTransVo transData(List zhibiaoList, String mqttMessage) {
+ WaterCommonTransVo waterCommonTransVo = new WaterCommonTransVo();
+ ResponseCmd responseCmd = JSONObject.parseObject(mqttMessage,ResponseCmd.class);
+ if(responseCmd!=null){
+ if(responseCmd.getRw_prot()!=null){
+ if("rsp".equals(responseCmd.getRw_prot().getDir())) {
+ if (zhibiaoList != null && !zhibiaoList.isEmpty()) {
+ Map eleMap = new HashMap<>();
+ for (ScEquZhibiao scEquZhibiao : zhibiaoList) {
+ eleMap.put(scEquZhibiao.getCode(), scEquZhibiao.getEntityField());
+ }
+
+ if (!responseCmd.getRw_prot().getR_data().isEmpty()) {
+ for (ResponseCmdData rDatum : responseCmd.getRw_prot().getR_data()) {
+ String entityName = eleMap.get(rDatum.getName());//字段名称
+ if (StringUtils.isNotBlank(entityName)) {
+ TUtil.setFieldValue(waterCommonTransVo, entityName, rDatum.getValue());
+ } else {
+ log.error("指令:{}----未匹配到监测项目:{}", mqttMessage, rDatum.getName());
+ }
+ }
+ }
+ }
+ log.warn("{}----指令解析结果=============={}",mqttMessage,JSONObject.toJSONString(waterCommonTransVo));
+ }else{
+ log.error("----非主动请求数据,跳过解析==============回执:{}",mqttMessage);
+ }
+ }else{
+ log.error("------------解析失败,回执格式有误2-------------");
+ }
+ }else{
+ log.error("xxxxxxxxxxxxx解析失败,回执格式有误1xxxxxxxxxxxxx");
+ }
+ return waterCommonTransVo;
+ }
+}
diff --git a/src/main/java/com/lanhai/util/SdrkUtils.java b/src/main/java/com/lanhai/util/SdrkUtils.java
index 0b32706..5a1f1a7 100644
--- a/src/main/java/com/lanhai/util/SdrkUtils.java
+++ b/src/main/java/com/lanhai/util/SdrkUtils.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* 山东仁科工具类
@@ -46,10 +47,10 @@ public class SdrkUtils {
private static String domain = "http://www.0531yun.com";
- public static boolean processSurvData(SurvDeviceDeploy deploy, Map> allDeviceMap, Map deviceDeployMap) {
- List allDeployList = allDeviceMap.get(deploy.getDeployCode());
+ public static boolean processSurvData(SurvDeviceDeploy deploy, Map> allDeviceMap, Map deviceDeployMap) {
+ List allDeployList = allDeviceMap.get(deploy.getDeployCode());
deploy.setDeviceList(allDeployList);
- deploy.setScEquZhibiaoList(scEquZhibiaoService.getListByEquid(allDeployList));
+
log.error("========正在执行仁科=======,设备编号:{},类型:{}==========", deploy.getDeployCode(), deploy.getDeployType());
RenkeDataRealTimePack renkeDataRealTimePack = getDeviceData(deploy);
XxlJobHelper.log("仁科回执:"+JSONObject.toJSONString(renkeDataRealTimePack));
@@ -57,8 +58,12 @@ public class SdrkUtils {
if(renkeDataRealTimePack!=null){
if(renkeDataRealTimePack.getData()!=null){
if(!renkeDataRealTimePack.getData().isEmpty()){
- saveData(deploy,deviceDeployMap,renkeDataRealTimePack.getData());
- b = true;
+ //此处至少有一条数据,所以直接处理
+ for (SurvDeviceDeploy survDeviceDeploy : deploy.getDeviceList()) {
+ survDeviceDeploy.setScEquZhibiaoList(scEquZhibiaoService.getListByEquid(survDeviceDeploy.getId()));
+ saveData(survDeviceDeploy,deviceDeployMap,renkeDataRealTimePack.getData());
+ }
+ b = true;
}
}
}
diff --git a/src/main/java/com/lanhai/util/XphUtils.java b/src/main/java/com/lanhai/util/XphUtils.java
index c3b98ae..0aa72d3 100644
--- a/src/main/java/com/lanhai/util/XphUtils.java
+++ b/src/main/java/com/lanhai/util/XphUtils.java
@@ -29,6 +29,7 @@ import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
+import java.util.stream.Collectors;
@Slf4j
public class XphUtils {
@@ -197,16 +198,16 @@ public class XphUtils {
/**
* 处理保存数据
*/
- public static boolean processSurvData(SurvDeviceDeploy deploy,Map> allDeviceMap,Map deviceDeployMap){
+ public static boolean processSurvData(SurvDeviceDeploy deploy,Map> allDeviceMap,Map deviceDeployMap){
String deviceApiUrl = String.format(domain+"/screen/data?deviceId=%s", deploy.getDeployCode());
// List allDeployList = allDeviceMap.get(deploy.getFarmId() + "_"+deploy.getDeployCode());
- List allDeployList = allDeviceMap.get(deploy.getDeployCode());
+ List allDeployList = allDeviceMap.get(deploy.getDeployCode());
deploy.setDeviceList(allDeployList);
//根据编号查询该设备在系统中有多少监测项
// System.out.println(deploy.getDeployCode()+"==========firstCheck=========="+allDeployList);
VOIotAccess voIotAccess = getAccess(deploy);
String token = getXphToken(voIotAccess.getAppId(),voIotAccess.getAppSecret());
- deploy.setScEquZhibiaoList(scEquZhibiaoService.getListByEquid(allDeployList));
+
log.error("========正在执行=======设备编号:{},类型:{}==========",deploy.getDeployCode(), deploy.getDeployType());
log.error("***设备请求数据:{}",deviceApiUrl);
String dataResult = HttpUtil.createGet(deviceApiUrl).header("token",token).execute().body();
@@ -216,11 +217,16 @@ public class XphUtils {
log.error("==========================执行中断============================");
return false;
}
-
- if (PollutionConstants.WATER_QULITY.equals(deploy.getDeployType())) {
- survTransdataSoilService.saveWaterBaowen(dataMap, deploy,deviceDeployMap);
- } else {
- survTransdataSoilService.saveBaowen(dataMap, deploy,deviceDeployMap);
+ //此deviceList至少为1个长度,所以直接处理
+ for (SurvDeviceDeploy survDeviceDeploy : deploy.getDeviceList()) {
+ log.error("@@@@@@@@@@@@@设备号:{}共绑定有{}个@@@@@@@@@@@@@@@@",deploy.getDeployCode(),deploy.getDeviceList().size());
+ survDeviceDeploy.setScEquZhibiaoList(scEquZhibiaoService.getListByEquid(survDeviceDeploy.getId()));
+ log.error("xxxxxxxxxxxxx当前进行============={}================{}==========={}",survDeviceDeploy.getTenantId(),survDeviceDeploy.getId(),survDeviceDeploy.getScEquZhibiaoList().size());
+ if (PollutionConstants.WATER_QULITY.equals(survDeviceDeploy.getDeployType())) {
+ survTransdataSoilService.saveWaterBaowen(dataMap, survDeviceDeploy,deviceDeployMap);
+ } else {
+ survTransdataSoilService.saveBaowen(dataMap, survDeviceDeploy,deviceDeployMap);
+ }
}
return true;
}