调整数据生成逻辑,可以但设备多绑定
This commit is contained in:
parent
1debbe7b1f
commit
1d232808e0
|
|
@ -14,7 +14,7 @@ import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy;
|
|||
*/
|
||||
public class MybatisPlusGenerator {
|
||||
|
||||
public static final String database = "jdbc:mysql://172.27.17.3:13306/fx_nsp";
|
||||
public static final String database = "jdbc:mysql://8.130.9.244:13306/fx_nsp";
|
||||
public static final String user = "user_fx";
|
||||
public static final String passwd = "user_fx";
|
||||
|
||||
|
|
@ -80,7 +80,7 @@ public class MybatisPlusGenerator {
|
|||
|
||||
public static void main(String[] args) {
|
||||
|
||||
new MybatisPlusGenerator().generator("surv_iot_manufacturer_config","surv_iot_manufacturer_info","surv_iot_protocol","surv_iot_virtual_device","surv_iot_virtual_device_group","surv_iot_virtual_device_module","surv_iot_virtual_device_net");//指标
|
||||
new MybatisPlusGenerator().generator("surv_device_deploy_relay","surv_device_deploy_relaygroup");//指标
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -214,4 +214,11 @@ public interface IotConstants {
|
|||
* 规则类型 预设
|
||||
*/
|
||||
String RULE_PRE = "rule_preset";
|
||||
|
||||
|
||||
/**
|
||||
* 蓝海虚拟设备协议
|
||||
*/
|
||||
String lhviot_standard = "lhviot_standard";
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,21 @@
|
|||
package com.lanhai.controller;
|
||||
|
||||
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
|
||||
import org.springframework.stereotype.Controller;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 设备继电器表 前端控制器
|
||||
* </p>
|
||||
*
|
||||
* @author ${author}
|
||||
* @since 2026-03-14
|
||||
*/
|
||||
@Controller
|
||||
@RequestMapping("/survDeviceDeployRelay")
|
||||
public class SurvDeviceDeployRelayController {
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
package com.lanhai.controller;
|
||||
|
||||
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
|
||||
import org.springframework.stereotype.Controller;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 继电器分组 前端控制器
|
||||
* </p>
|
||||
*
|
||||
* @author ${author}
|
||||
* @since 2026-03-14
|
||||
*/
|
||||
@Controller
|
||||
@RequestMapping("/survDeviceDeployRelaygroup")
|
||||
public class SurvDeviceDeployRelaygroupController {
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -85,6 +85,10 @@ public class SurvDeviceDeploy extends Model<SurvDeviceDeploy> {
|
|||
@TableField("DEVICE_IOT_URL")
|
||||
private String deviceIotUrl;
|
||||
|
||||
|
||||
@TableField("DEVICE_REVERSE_IOT_URL")
|
||||
private String deviceReverseIotUrl;
|
||||
|
||||
/**
|
||||
* 设备排序
|
||||
*/
|
||||
|
|
@ -217,6 +221,6 @@ public class SurvDeviceDeploy extends Model<SurvDeviceDeploy> {
|
|||
* 同设备编号的有多少id
|
||||
*/
|
||||
@TableField(exist = false)
|
||||
private List<String> deviceList;
|
||||
private List<SurvDeviceDeploy> deviceList;
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,220 @@
|
|||
package com.lanhai.entity;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType;
|
||||
import com.baomidou.mybatisplus.extension.activerecord.Model;
|
||||
import java.util.Date;
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableField;
|
||||
import java.io.Serializable;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 设备继电器表
|
||||
* </p>
|
||||
*
|
||||
* @author ${author}
|
||||
* @since 2026-03-14
|
||||
*/
|
||||
@Data
|
||||
@EqualsAndHashCode(callSuper = false)
|
||||
@Accessors(chain = true)
|
||||
public class SurvDeviceDeployRelay extends Model<SurvDeviceDeployRelay> {
|
||||
|
||||
private static final long serialVersionUID=1L;
|
||||
|
||||
/**
|
||||
* 主键
|
||||
*/
|
||||
@TableId(value = "ID", type = IdType.ID_WORKER_STR)
|
||||
private String id;
|
||||
|
||||
/**
|
||||
* 设备ID
|
||||
*/
|
||||
@TableField("DEPLOY_ID")
|
||||
private String deployId;
|
||||
|
||||
/**
|
||||
* 类别,1=普通
|
||||
*/
|
||||
@TableField("RELAY_CATE")
|
||||
private Integer relayCate;
|
||||
|
||||
/**
|
||||
* 1=风机,2=水泵,3=增氧机,4=湿帘,5=遮阳,6=开窗,7=保温,8=投食机
|
||||
*/
|
||||
@TableField("RELAY_TYPE")
|
||||
private Integer relayType;
|
||||
|
||||
/**
|
||||
* 继电器key/编号
|
||||
*/
|
||||
@TableField("RELAY_KEY")
|
||||
private String relayKey;
|
||||
|
||||
/**
|
||||
* 继电器名称
|
||||
*/
|
||||
@TableField("RELAY_NAME")
|
||||
private String relayName;
|
||||
|
||||
/**
|
||||
* 继电器备注
|
||||
*/
|
||||
@TableField("RELAY_NOTES")
|
||||
private String relayNotes;
|
||||
|
||||
/**
|
||||
* 继电器标记
|
||||
*/
|
||||
@TableField("RELAY_MARK")
|
||||
private String relayMark;
|
||||
|
||||
/**
|
||||
* 寄存器类型
|
||||
*/
|
||||
@TableField("REGISTER_TYPE")
|
||||
private Integer registerType;
|
||||
|
||||
/**
|
||||
* 寄存器序号
|
||||
*/
|
||||
@TableField("REGISTER_NUM")
|
||||
private Integer registerNum;
|
||||
|
||||
/**
|
||||
* 开指令
|
||||
*/
|
||||
@TableField("REGISTER_CMD_ON")
|
||||
private String registerCmdOn;
|
||||
|
||||
/**
|
||||
* 关指令
|
||||
*/
|
||||
@TableField("REGISTER_CMD_OFF")
|
||||
private String registerCmdOff;
|
||||
|
||||
/**
|
||||
* 停指令
|
||||
*/
|
||||
@TableField("REGISTER_CMD_STOP")
|
||||
private String registerCmdStop;
|
||||
|
||||
/**
|
||||
* 回执状态开
|
||||
*/
|
||||
@TableField("REGISTER_ON")
|
||||
private String registerOn;
|
||||
|
||||
/**
|
||||
* 回执状态关
|
||||
*/
|
||||
@TableField("REGISTER_OFF")
|
||||
private String registerOff;
|
||||
|
||||
/**
|
||||
* 回执状态停
|
||||
*/
|
||||
@TableField("REGISTER_STOP")
|
||||
private String registerStop;
|
||||
|
||||
/**
|
||||
* 实体对应字段
|
||||
*/
|
||||
@TableField("ENTITY_FIELD")
|
||||
private String entityField;
|
||||
|
||||
/**
|
||||
* 是否启用
|
||||
*/
|
||||
@TableField("IS_ENABLE")
|
||||
private Integer isEnable;
|
||||
|
||||
/**
|
||||
* 是否可远程可控
|
||||
*/
|
||||
@TableField("IS_ABLE_REMOTE")
|
||||
private Integer isAbleRemote;
|
||||
|
||||
/**
|
||||
* 分组ID
|
||||
*/
|
||||
@TableField("GROUP_ID")
|
||||
private String groupId;
|
||||
|
||||
/**
|
||||
* 分组CODE
|
||||
*/
|
||||
@TableField("GROUP_CODE")
|
||||
private String groupCode;
|
||||
|
||||
/**
|
||||
* 分组名称
|
||||
*/
|
||||
@TableField("GROUP_NAME")
|
||||
private String groupName;
|
||||
|
||||
/**
|
||||
* 排序号
|
||||
*/
|
||||
@TableField("SORT_NO")
|
||||
private Integer sortNo;
|
||||
|
||||
/**
|
||||
* 租户号
|
||||
*/
|
||||
@TableField("TENANT_ID")
|
||||
private String tenantId;
|
||||
|
||||
/**
|
||||
* 乐观锁
|
||||
*/
|
||||
@TableField("RE_VISION")
|
||||
private Integer reVision;
|
||||
|
||||
/**
|
||||
* 创建人
|
||||
*/
|
||||
@TableField("CREATE_BY")
|
||||
private String createBy;
|
||||
|
||||
/**
|
||||
* 创建时间
|
||||
*/
|
||||
@TableField("CREATE_TIME")
|
||||
private Date createTime;
|
||||
|
||||
/**
|
||||
* 更新人
|
||||
*/
|
||||
@TableField("UPDATE_BY")
|
||||
private String updateBy;
|
||||
|
||||
/**
|
||||
* 逻辑删除
|
||||
*/
|
||||
@TableField("IS_DEL")
|
||||
private Integer isDel;
|
||||
|
||||
/**
|
||||
* 更新时间
|
||||
*/
|
||||
@TableField("UPDATE_TIME")
|
||||
private Date updateTime;
|
||||
|
||||
/**
|
||||
* 乐观锁
|
||||
*/
|
||||
@TableField("REVISION")
|
||||
private Integer revision;
|
||||
|
||||
|
||||
@Override
|
||||
protected Serializable pkVal() {
|
||||
return this.id;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,118 @@
|
|||
package com.lanhai.entity;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType;
|
||||
import com.baomidou.mybatisplus.extension.activerecord.Model;
|
||||
import java.util.Date;
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableField;
|
||||
import java.io.Serializable;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 继电器分组
|
||||
* </p>
|
||||
*
|
||||
* @author ${author}
|
||||
* @since 2026-03-14
|
||||
*/
|
||||
@Data
|
||||
@EqualsAndHashCode(callSuper = false)
|
||||
@Accessors(chain = true)
|
||||
public class SurvDeviceDeployRelaygroup extends Model<SurvDeviceDeployRelaygroup> {
|
||||
|
||||
private static final long serialVersionUID=1L;
|
||||
|
||||
/**
|
||||
* 主键
|
||||
*/
|
||||
@TableId(value = "ID", type = IdType.ID_WORKER_STR)
|
||||
private String id;
|
||||
|
||||
/**
|
||||
* 设备id
|
||||
*/
|
||||
@TableField("DEPLOY_ID")
|
||||
private String deployId;
|
||||
|
||||
/**
|
||||
* 分组类型;button=按钮,switch=两开关,shifter=三开关
|
||||
*/
|
||||
@TableField("GROUP_TYPE")
|
||||
private String groupType;
|
||||
|
||||
/**
|
||||
* 分组名称
|
||||
*/
|
||||
@TableField("GROUP_NAME")
|
||||
private String groupName;
|
||||
|
||||
/**
|
||||
* 分组备注
|
||||
*/
|
||||
@TableField("GROUP_NOTES")
|
||||
private String groupNotes;
|
||||
|
||||
/**
|
||||
* 序号
|
||||
*/
|
||||
@TableField("SORT_NO")
|
||||
private Integer sortNo;
|
||||
|
||||
/**
|
||||
* 是否启用
|
||||
*/
|
||||
@TableField("IS_ENABLE")
|
||||
private Integer isEnable;
|
||||
|
||||
/**
|
||||
* 租户号
|
||||
*/
|
||||
@TableField("TENANT_ID")
|
||||
private String tenantId;
|
||||
|
||||
/**
|
||||
* 乐观锁
|
||||
*/
|
||||
@TableField("RE_VISION")
|
||||
private Integer reVision;
|
||||
|
||||
/**
|
||||
* 创建人
|
||||
*/
|
||||
@TableField("CREATE_BY")
|
||||
private String createBy;
|
||||
|
||||
/**
|
||||
* 创建时间
|
||||
*/
|
||||
@TableField("CREATE_TIME")
|
||||
private Date createTime;
|
||||
|
||||
/**
|
||||
* 更新人
|
||||
*/
|
||||
@TableField("UPDATE_BY")
|
||||
private String updateBy;
|
||||
|
||||
/**
|
||||
* 逻辑删除
|
||||
*/
|
||||
@TableField("IS_DEL")
|
||||
private Integer isDel;
|
||||
|
||||
/**
|
||||
* 更新时间
|
||||
*/
|
||||
@TableField("UPDATE_TIME")
|
||||
private Date updateTime;
|
||||
|
||||
|
||||
@Override
|
||||
protected Serializable pkVal() {
|
||||
return this.id;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
package com.lanhai.mapper;
|
||||
|
||||
import com.lanhai.entity.SurvDeviceDeployRelay;
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 设备继电器表 Mapper 接口
|
||||
* </p>
|
||||
*
|
||||
* @author ${author}
|
||||
* @since 2026-03-14
|
||||
*/
|
||||
public interface SurvDeviceDeployRelayMapper extends BaseMapper<SurvDeviceDeployRelay> {
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
package com.lanhai.mapper;
|
||||
|
||||
import com.lanhai.entity.SurvDeviceDeployRelaygroup;
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 继电器分组 Mapper 接口
|
||||
* </p>
|
||||
*
|
||||
* @author ${author}
|
||||
* @since 2026-03-14
|
||||
*/
|
||||
public interface SurvDeviceDeployRelaygroupMapper extends BaseMapper<SurvDeviceDeployRelaygroup> {
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,5 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||
<mapper namespace="com.lanhai.mapper.SurvDeviceDeployRelayMapper">
|
||||
|
||||
</mapper>
|
||||
|
|
@ -0,0 +1,5 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||
<mapper namespace="com.lanhai.mapper.SurvDeviceDeployRelaygroupMapper">
|
||||
|
||||
</mapper>
|
||||
|
|
@ -6,7 +6,10 @@ import com.lanhai.entity.SurvAlertRecord;
|
|||
import com.lanhai.enums.IotInerfaceTopicType;
|
||||
import com.lanhai.mapper.SurvAlertRecordMapper;
|
||||
import com.lanhai.mapper.SurvIotVirtualDeviceMapper;
|
||||
import com.lanhai.o.iot.lhiot.ResponseCmd;
|
||||
import com.lanhai.service.IScEquZhibiaoService;
|
||||
import com.lanhai.service.ISurvIotVirtualDeviceService;
|
||||
import com.lanhai.service.Impl.CommonServiceImpl;
|
||||
import com.lanhai.service.Impl.SurvIotVirtualDeviceServiceImpl;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
||||
|
|
@ -30,8 +33,9 @@ public class MyMqttCallback implements MqttCallbackExtended {
|
|||
//手动注入
|
||||
// private MqttConfig mqttConfig = SpringContextUtil.getBean(MqttConfig.class);
|
||||
|
||||
private SurvIotVirtualDeviceServiceImpl deviceService = SpringContextUtil.getBean("survIotVirtualDeviceServiceImpl", SurvIotVirtualDeviceServiceImpl.class);;
|
||||
private SurvIotVirtualDeviceServiceImpl deviceService = SpringContextUtil.getBean("survIotVirtualDeviceServiceImpl", SurvIotVirtualDeviceServiceImpl.class);
|
||||
|
||||
private CommonServiceImpl commonService = SpringContextUtil.getBean(CommonServiceImpl.class);
|
||||
|
||||
// private static RedisTemplate redisTemplate = SpringContextUtil.getBean("redisTemplate", RedisTemplate.class);
|
||||
|
||||
|
|
@ -114,7 +118,7 @@ public class MyMqttCallback implements MqttCallbackExtended {
|
|||
log.info("== MyMqttCallback ==> messageArrived 接收消息主题: {},接收消息内容: {}", topic, o1);
|
||||
try {
|
||||
if (topic.startsWith(IotInerfaceTopicType.DTU_TOPIC.getCode())) {
|
||||
//todo 处理消息逻辑,解析数据最终入库
|
||||
commonService.processMqttData(topic,o1);
|
||||
// deviceService.processPayLoad(topic,mqttMessage.getPayload());
|
||||
} else if (topic.startsWith(IotInerfaceTopicType.LH_IOT_TOPIC_DOWN.getCode())) {//蓝海虚拟设备下行逻辑
|
||||
//暂时无业务
|
||||
|
|
|
|||
|
|
@ -0,0 +1,16 @@
|
|||
package com.lanhai.o.iot.lhiot;
|
||||
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.Data;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@Data
|
||||
public class QueryCmd {
|
||||
@ApiModelProperty("指令")
|
||||
private QueryCmdDetail rw_prot;
|
||||
}
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
package com.lanhai.o.iot.lhiot;
|
||||
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.Data;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@Data
|
||||
public class QueryCmdDetail {
|
||||
|
||||
|
||||
private static final AtomicInteger counter = new AtomicInteger(0);
|
||||
private static final DateTimeFormatter FORMATTER =
|
||||
DateTimeFormatter.ofPattern("yyyyMMddHHmmss")
|
||||
.withZone(ZoneId.systemDefault());
|
||||
|
||||
|
||||
@ApiModelProperty("指令类型")
|
||||
private String dir = "down";
|
||||
|
||||
@ApiModelProperty("指令id")
|
||||
private String id = generate();
|
||||
|
||||
@ApiModelProperty("寄存器编号集合")
|
||||
private List<QueryCmdRegister> r_data;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 线程安全的高并发版本
|
||||
*/
|
||||
public static String generate() {
|
||||
// 1. 生成时间戳部分
|
||||
String timestamp = FORMATTER.format(Instant.now());
|
||||
|
||||
// 2. 生成4位随机数(使用AtomicInteger保证线程安全)
|
||||
int random = counter.incrementAndGet() % 10000;
|
||||
if (random < 0) {
|
||||
random = -random;
|
||||
}
|
||||
String randomStr = String.format("%04d", random);
|
||||
|
||||
return timestamp + randomStr;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
package com.lanhai.o.iot.lhiot;
|
||||
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class QueryCmdRegister {
|
||||
|
||||
@ApiModelProperty("寄存器编号")
|
||||
private String name;
|
||||
}
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
package com.lanhai.o.iot.lhiot;
|
||||
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
public class ResponseCmd {
|
||||
@ApiModelProperty("指令")
|
||||
private ResponseCmdDetail rw_prot;
|
||||
}
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
package com.lanhai.o.iot.lhiot;
|
||||
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class ResponseCmdData {
|
||||
|
||||
@ApiModelProperty("寄存器编号")
|
||||
private String name;
|
||||
|
||||
@ApiModelProperty("寄存器值")
|
||||
private String value;
|
||||
|
||||
@ApiModelProperty("是否报错")
|
||||
private String err;
|
||||
}
|
||||
|
|
@ -0,0 +1,18 @@
|
|||
package com.lanhai.o.iot.lhiot;
|
||||
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
public class ResponseCmdDetail {
|
||||
@ApiModelProperty("指令类型")
|
||||
private String dir;
|
||||
|
||||
@ApiModelProperty("指令id")
|
||||
private String id;
|
||||
|
||||
@ApiModelProperty("数据")
|
||||
private List<ResponseCmdData> r_data;
|
||||
}
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
package com.lanhai.service;
|
||||
|
||||
import com.lanhai.entity.SurvDeviceDeployRelay;
|
||||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 设备继电器表 服务类
|
||||
* </p>
|
||||
*
|
||||
* @author ${author}
|
||||
* @since 2026-03-14
|
||||
*/
|
||||
public interface ISurvDeviceDeployRelayService extends IService<SurvDeviceDeployRelay> {
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
package com.lanhai.service;
|
||||
|
||||
import com.lanhai.entity.SurvDeviceDeployRelaygroup;
|
||||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 继电器分组 服务类
|
||||
* </p>
|
||||
*
|
||||
* @author ${author}
|
||||
* @since 2026-03-14
|
||||
*/
|
||||
public interface ISurvDeviceDeployRelaygroupService extends IService<SurvDeviceDeployRelaygroup> {
|
||||
|
||||
}
|
||||
|
|
@ -14,5 +14,7 @@ public interface ISurvTransdataAirService extends IService<SurvTransdataAir> {
|
|||
|
||||
SurvTransdataAir getOneByDeviceCode(String deviceCode,String filterId);
|
||||
|
||||
SurvTransdataAir getOneByDeviceId(String deployId);
|
||||
|
||||
void saveBaowen(Map<String, String> baowenMap);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,31 +1,60 @@
|
|||
package com.lanhai.service.Impl;
|
||||
|
||||
import cn.hutool.core.bean.BeanUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.lanhai.constant.IotConstants;
|
||||
import com.lanhai.entity.SurvConfig;
|
||||
import com.lanhai.constant.PollutionConstants;
|
||||
import com.lanhai.entity.*;
|
||||
import com.lanhai.o.iot.lhiot.ResponseCmd;
|
||||
import com.lanhai.o.iot.pbs.WaterCommonTransVo;
|
||||
import com.lanhai.service.*;
|
||||
import com.lanhai.util.LhIotUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cache.annotation.Cacheable;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 处理业务的服务
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class CommonServiceImpl implements ICommonService {
|
||||
|
||||
|
||||
@Autowired
|
||||
@Lazy
|
||||
private SurvConfigServiceImpl configService;
|
||||
|
||||
@Autowired
|
||||
@Lazy
|
||||
private SurvDeviceDeployServiceImpl deviceDeployService;
|
||||
|
||||
@Autowired
|
||||
@Lazy
|
||||
private ScEquZhibiaoServiceImpl zhibiaoService;
|
||||
|
||||
@Autowired
|
||||
@Lazy
|
||||
private SurvStationInfoServiceImpl survStationInfoService;
|
||||
//
|
||||
// @Autowired
|
||||
// private ISurvTransdataAirService transdataAirService;
|
||||
// @Autowired
|
||||
// private ISurvTransdataLivestockwaterService transdataLivestockwaterService;
|
||||
// @Autowired
|
||||
// private ISurvTransdataOrientwaterService transdataOrientwaterService;
|
||||
@Autowired
|
||||
private ISurvTransdataLivestockwaterService transdataLivestockwaterService;
|
||||
@Autowired
|
||||
private ISurvHisdataLivestockwaterService hisdataLivestockwaterService;
|
||||
@Autowired
|
||||
private ISurvTransdataOrientwaterService transdataOrientwaterService;
|
||||
@Autowired
|
||||
private ISurvHisdataOrientwaterService hisdataOrientwaterService;
|
||||
|
||||
// @Autowired
|
||||
// private ISurvTransdataPestlightService transdataPestlightService;
|
||||
// @Autowired
|
||||
|
|
@ -74,6 +103,79 @@ public class CommonServiceImpl implements ICommonService {
|
|||
}
|
||||
|
||||
|
||||
public void processMqttData(String topic,String mqttMessage) {
|
||||
//1. 检查主题所属,确定如何执行后续逻辑
|
||||
List<SurvDeviceDeploy> deployList = deviceDeployService.lambdaQuery()
|
||||
.eq(SurvDeviceDeploy::getDeviceReverseIotUrl,topic)
|
||||
.list();
|
||||
;
|
||||
|
||||
if(!deployList.isEmpty()){
|
||||
SurvDeviceDeploy survDeviceDeploy = deployList.get(0);
|
||||
List<ScEquZhibiao> zhibiaoList = zhibiaoService.getListByEquid(survDeviceDeploy.getId());
|
||||
if(PollutionConstants.WATER_ORIENT.equals(survDeviceDeploy.getDeployType()) || PollutionConstants.WATER_LIVE.equals(survDeviceDeploy.getDeployType())){//面源数据
|
||||
WaterCommonTransVo waterCommonTransVo = null;
|
||||
switch (survDeviceDeploy.getProtocolCode()){
|
||||
case IotConstants.lhviot_standard:
|
||||
waterCommonTransVo = LhIotUtil.transData(zhibiaoList,mqttMessage);
|
||||
break;
|
||||
}
|
||||
if(waterCommonTransVo!=null){
|
||||
//遍历保存所有租户数据
|
||||
for (SurvDeviceDeploy deploy : deployList) {
|
||||
SurvStationInfo survStationInfo = survStationInfoService.getByCode(deploy.getStationCode());
|
||||
String stationName = survStationInfo!=null?survStationInfo.getStationName():"站点";
|
||||
if(PollutionConstants.WATER_ORIENT.equals(deploy.getDeployType())){
|
||||
SurvTransdataOrientwater orientwater = new SurvTransdataOrientwater();
|
||||
BeanUtil.copyProperties(waterCommonTransVo,orientwater);
|
||||
orientwater.setTenantId(deploy.getTenantId());
|
||||
orientwater.setDeployCode(deploy.getDeployCode());
|
||||
orientwater.setStationCode(deploy.getStationCode());
|
||||
orientwater.setStationName(stationName);
|
||||
orientwater.setDeviceName(deploy.getDeployDes());
|
||||
orientwater.setDeviceId(deploy.getId());
|
||||
orientwater.setCreatedBy("task");//创建人
|
||||
orientwater.setCreateTime(new Date());//创建时间
|
||||
|
||||
transdataOrientwaterService.save(orientwater);
|
||||
|
||||
//保存历史表
|
||||
SurvHisdataOrientwater hisOrientWater= new SurvHisdataOrientwater();
|
||||
hisOrientWater.setId(IdUtil.getSnowflakeNextIdStr());
|
||||
hisOrientWater.setTransDate(new Date());
|
||||
hisdataOrientwaterService.save(hisOrientWater);
|
||||
|
||||
} else if (PollutionConstants.WATER_LIVE.equals(deploy.getDeployType())) {
|
||||
SurvTransdataLivestockwater livestockwater = new SurvTransdataLivestockwater();
|
||||
BeanUtil.copyProperties(waterCommonTransVo,livestockwater);
|
||||
livestockwater.setTenantId(deploy.getTenantId());
|
||||
livestockwater.setDeployCode(deploy.getDeployCode());
|
||||
livestockwater.setStationCode(deploy.getStationCode());
|
||||
livestockwater.setStationName(stationName);
|
||||
livestockwater.setDeviceName(deploy.getDeployDes());
|
||||
livestockwater.setDeviceId(deploy.getId());
|
||||
livestockwater.setCreatedBy("task");//创建人
|
||||
livestockwater.setCreateTime(new Date());//创建时间
|
||||
|
||||
transdataLivestockwaterService.save(livestockwater);
|
||||
|
||||
//保存历史表
|
||||
SurvHisdataLivestockwater hisdataLivestockwater = new SurvHisdataLivestockwater();
|
||||
hisdataLivestockwater.setId(IdUtil.getSnowflakeNextIdStr());
|
||||
hisdataLivestockwater.setTransDate(new Date());
|
||||
hisdataLivestockwaterService.save(hisdataLivestockwater);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}else{
|
||||
log.error("主题:{}--消息:{},解析失败,任务中断。",topic,mqttMessage);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
//2. 检查有
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,20 @@
|
|||
package com.lanhai.service.Impl;
|
||||
|
||||
import com.lanhai.entity.SurvDeviceDeployRelay;
|
||||
import com.lanhai.mapper.SurvDeviceDeployRelayMapper;
|
||||
import com.lanhai.service.ISurvDeviceDeployRelayService;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 设备继电器表 服务实现类
|
||||
* </p>
|
||||
*
|
||||
* @author ${author}
|
||||
* @since 2026-03-14
|
||||
*/
|
||||
@Service
|
||||
public class SurvDeviceDeployRelayServiceImpl extends ServiceImpl<SurvDeviceDeployRelayMapper, SurvDeviceDeployRelay> implements ISurvDeviceDeployRelayService {
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
package com.lanhai.service.Impl;
|
||||
|
||||
import com.lanhai.entity.SurvDeviceDeployRelaygroup;
|
||||
import com.lanhai.mapper.SurvDeviceDeployRelaygroupMapper;
|
||||
import com.lanhai.service.ISurvDeviceDeployRelaygroupService;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* 继电器分组 服务实现类
|
||||
* </p>
|
||||
*
|
||||
* @author ${author}
|
||||
* @since 2026-03-14
|
||||
*/
|
||||
@Service
|
||||
public class SurvDeviceDeployRelaygroupServiceImpl extends ServiceImpl<SurvDeviceDeployRelaygroupMapper, SurvDeviceDeployRelaygroup> implements ISurvDeviceDeployRelaygroupService {
|
||||
|
||||
}
|
||||
|
|
@ -50,7 +50,7 @@ public class SurvHisdataSoilServiceImpl extends ServiceImpl<SurvHisdataSoilMappe
|
|||
if (config.getRules() != null && !config.getRules().isEmpty()) {
|
||||
String deployCode = deploy.getDeployCode();
|
||||
try {
|
||||
SurvTransdataSoil newestData = soilService.getOneByDeviceCode(deploy.getDeployCode(),null);
|
||||
SurvTransdataSoil newestData = soilService.getOneByDeviceId(deploy.getId());
|
||||
String orgId = null;
|
||||
if (newestData != null) {
|
||||
orgId = newestData.getId();
|
||||
|
|
@ -155,8 +155,9 @@ public class SurvHisdataSoilServiceImpl extends ServiceImpl<SurvHisdataSoilMappe
|
|||
newestData.setDataDateTime(nowTime);
|
||||
newestData.setRuleType(newestData.getRuleType());
|
||||
newestData.setDataGatherType(newestData.getDataGatherType());
|
||||
log.error("=====================更新五常数据1====================="+newestData.getDataDateTime());
|
||||
soilService.saveOrUpdate(newestData);
|
||||
|
||||
log.error("=====================更新五常数据2====================="+newestData.getDataDateTime());
|
||||
}catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
XxlJobHelper.log(e.getMessage());
|
||||
|
|
|
|||
|
|
@ -64,39 +64,39 @@ public class SurvIotVirtualDeviceServiceImpl extends ServiceImpl<SurvIotVirtualD
|
|||
}
|
||||
}
|
||||
//虚拟设备中需要订阅的主题
|
||||
List<SurvIotVirtualDevice> devices = lambdaQuery().nested(wrapper->
|
||||
wrapper.isNotNull(SurvIotVirtualDevice::getUpperUrl)
|
||||
.or()
|
||||
.isNotNull(SurvIotVirtualDevice::getLowerUrl)
|
||||
).list();
|
||||
if(!devices.isEmpty()){
|
||||
for (SurvIotVirtualDevice device : devices) {
|
||||
if(StringUtils.isNotBlank(device.getUpperUrl())){
|
||||
topics.add(device.getUpperUrl());
|
||||
}
|
||||
if(StringUtils.isNotBlank(device.getLowerUrl())){
|
||||
topics.add(device.getLowerUrl());
|
||||
}
|
||||
}
|
||||
}
|
||||
//物联网模块中的主题
|
||||
List<SurvIotVirtualDeviceNet> nets = netService.lambdaQuery()
|
||||
.nested(wrapper->
|
||||
wrapper.isNotNull(SurvIotVirtualDeviceNet::getNetUpTopic)
|
||||
.or()
|
||||
.isNotNull(SurvIotVirtualDeviceNet::getNetDownTopic)
|
||||
).list();
|
||||
;
|
||||
if(!nets.isEmpty()){
|
||||
for (SurvIotVirtualDeviceNet net : nets) {
|
||||
if(StringUtils.isNotBlank(net.getNetUpTopic())){
|
||||
topics.add(net.getNetUpTopic());
|
||||
}
|
||||
if(StringUtils.isNotBlank(net.getNetDownTopic())){
|
||||
topics.add(net.getNetDownTopic());
|
||||
}
|
||||
}
|
||||
}
|
||||
// List<SurvIotVirtualDevice> devices = lambdaQuery().nested(wrapper->
|
||||
// wrapper.isNotNull(SurvIotVirtualDevice::getUpperUrl)
|
||||
// .or()
|
||||
// .isNotNull(SurvIotVirtualDevice::getLowerUrl)
|
||||
// ).list();
|
||||
// if(!devices.isEmpty()){
|
||||
// for (SurvIotVirtualDevice device : devices) {
|
||||
// if(StringUtils.isNotBlank(device.getUpperUrl())){
|
||||
// topics.add(device.getUpperUrl());
|
||||
// }
|
||||
// if(StringUtils.isNotBlank(device.getLowerUrl())){
|
||||
// topics.add(device.getLowerUrl());
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// //物联网模块中的主题
|
||||
// List<SurvIotVirtualDeviceNet> nets = netService.lambdaQuery()
|
||||
// .nested(wrapper->
|
||||
// wrapper.isNotNull(SurvIotVirtualDeviceNet::getNetUpTopic)
|
||||
// .or()
|
||||
// .isNotNull(SurvIotVirtualDeviceNet::getNetDownTopic)
|
||||
// ).list();
|
||||
// ;
|
||||
// if(!nets.isEmpty()){
|
||||
// for (SurvIotVirtualDeviceNet net : nets) {
|
||||
// if(StringUtils.isNotBlank(net.getNetUpTopic())){
|
||||
// topics.add(net.getNetUpTopic());
|
||||
// }
|
||||
// if(StringUtils.isNotBlank(net.getNetDownTopic())){
|
||||
// topics.add(net.getNetDownTopic());
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
return topics;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -44,6 +44,13 @@ public class SurvTransdataAirServiceImpl extends ServiceImpl<SurvTransdataAirMap
|
|||
return getOne(queryWrapper);
|
||||
}
|
||||
|
||||
public SurvTransdataAir getOneByDeviceId(String deployId) {
|
||||
|
||||
QueryWrapper<SurvTransdataAir> queryWrapper = new QueryWrapper<SurvTransdataAir>();
|
||||
queryWrapper.eq("DEPLOY_ID",deployId).last("limit 1");
|
||||
return getOne(queryWrapper);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
@Override
|
||||
public void saveBaowen(Map<String, String> baowenMap) {
|
||||
|
|
|
|||
|
|
@ -50,18 +50,24 @@ public class SurvTransdataSoilServiceImpl extends ServiceImpl<SurvTransdataSoilM
|
|||
return getOne(queryWrapper);
|
||||
}
|
||||
|
||||
public SurvTransdataSoil getOneByDeviceId(String deployId) {
|
||||
QueryWrapper<SurvTransdataSoil> queryWrapper = new QueryWrapper<SurvTransdataSoil>();
|
||||
queryWrapper.eq("DEPLOY_ID",deployId).last("limit 1");
|
||||
return getOne(queryWrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
*辛普惠 土壤
|
||||
*/
|
||||
@Transactional
|
||||
@Override
|
||||
public void saveBaowen(Map<String, String> baowenMap, SurvDeviceDeploy deploy,Map<String,SurvDeviceDeploy> deviceDeployMap) {
|
||||
public void saveBaowen(Map<String, String> baowenMap, SurvDeviceDeploy survDeviceDeploy,Map<String,SurvDeviceDeploy> deviceDeployMap) {
|
||||
|
||||
String mn = baowenMap.get("mn");
|
||||
String DataTime = baowenMap.get("DataTime");
|
||||
// SurvDeviceDeploy deploy = deviceDeployService.getOneByCode(mn);
|
||||
//保存或者更新实时表
|
||||
SurvTransdataSoil soil = getOneByDeviceCode(mn,null);
|
||||
SurvTransdataSoil soil = getOneByDeviceId(survDeviceDeploy.getId());
|
||||
Date kDateTime = null;
|
||||
Date nowTime = new Date();
|
||||
if(soil == null){
|
||||
|
|
@ -88,8 +94,8 @@ public class SurvTransdataSoilServiceImpl extends ServiceImpl<SurvTransdataSoilM
|
|||
int soilCount = 0;
|
||||
String formatSoilTime = kDateTime!=null?DateUtil.format(kDateTime,"yyyy-MM-dd HH:mm:ss"):"";
|
||||
if(!formatSoilTime.equals(DataTime)){
|
||||
if(!deploy.getScEquZhibiaoList().isEmpty()){
|
||||
for (ScEquZhibiao scEquZhibiao : deploy.getScEquZhibiaoList()) {
|
||||
if(!survDeviceDeploy.getScEquZhibiaoList().isEmpty()){
|
||||
for (ScEquZhibiao scEquZhibiao : survDeviceDeploy.getScEquZhibiaoList()) {
|
||||
boolean isSuccess = TUtil.setFieldValue(soil,scEquZhibiao.getEntityField(),baowenMap.get(scEquZhibiao.getCode()));
|
||||
if(isSuccess){
|
||||
soilCount++;
|
||||
|
|
@ -103,9 +109,9 @@ public class SurvTransdataSoilServiceImpl extends ServiceImpl<SurvTransdataSoilM
|
|||
soil.setDataDateTime(DateUtil.parse(DataTime));//数据时间
|
||||
soil.setDataGatherType("realTime");//数据类型-realTime=实时,dayTime=日数据,month=月数据,year=年数据
|
||||
soil.setDeployCode(mn);//设备编号
|
||||
soil.setTenantId(deploy.getTenantId());
|
||||
soil.setTenantId(survDeviceDeploy.getTenantId());
|
||||
|
||||
SurvDeviceDeploy survDeviceDeploy = deviceDeployMap.get(deploy.getDeployCode()+"_"+ DeviceDeployEnum.SURV_SOIL.getType());
|
||||
// SurvDeviceDeploy survDeviceDeploy = deviceDeployMap.get(deploy.getDeployCode()+"_"+ DeviceDeployEnum.SURV_SOIL.getType());
|
||||
|
||||
|
||||
|
||||
|
|
@ -122,11 +128,11 @@ public class SurvTransdataSoilServiceImpl extends ServiceImpl<SurvTransdataSoilM
|
|||
}
|
||||
soil.setDeployType(survDeviceDeploy.getDeployType());
|
||||
}
|
||||
|
||||
log.error("111111111111111保存中================"+soil.getTenantId()+"=========="+soil.getDeployId());
|
||||
saveOrUpdate(soil);
|
||||
//更新设备的上次更新时间
|
||||
deviceDeployService.update(new LambdaUpdateWrapper<SurvDeviceDeploy>()
|
||||
.eq(SurvDeviceDeploy::getDeployCode, deploy.getDeployCode())
|
||||
.eq(SurvDeviceDeploy::getId, survDeviceDeploy.getId())
|
||||
.set(SurvDeviceDeploy::getLastsyncTime, nowTime));
|
||||
|
||||
//保存土壤历史表
|
||||
|
|
@ -146,7 +152,7 @@ public class SurvTransdataSoilServiceImpl extends ServiceImpl<SurvTransdataSoilM
|
|||
|
||||
|
||||
|
||||
SurvTransdataAir air = transdataAirService.getOneByDeviceCode(mn,null);
|
||||
SurvTransdataAir air = transdataAirService.getOneByDeviceId(survDeviceDeploy.getId());
|
||||
Date airkDateTime = null;
|
||||
if(air == null){
|
||||
air = new SurvTransdataAir();
|
||||
|
|
@ -167,8 +173,8 @@ public class SurvTransdataSoilServiceImpl extends ServiceImpl<SurvTransdataSoilM
|
|||
int airCount = 0;
|
||||
String formatAirTime = airkDateTime!=null?DateUtil.format(airkDateTime,"yyyy-MM-dd HH:mm:ss"):"";
|
||||
if(!formatAirTime.equals(DataTime)){//数据时间不同才进行入库
|
||||
if(!deploy.getScEquZhibiaoList().isEmpty()){
|
||||
for (ScEquZhibiao scEquZhibiao : deploy.getScEquZhibiaoList()) {
|
||||
if(!survDeviceDeploy.getScEquZhibiaoList().isEmpty()){
|
||||
for (ScEquZhibiao scEquZhibiao : survDeviceDeploy.getScEquZhibiaoList()) {
|
||||
// System.out.println(baowenMap.get(scEquZhibiao.getCode())+"============="+scEquZhibiao.getCode()+"=========airCheck=============="+scEquZhibiao.getEquId()+"==========="+scEquZhibiao.getEntityField());
|
||||
boolean isSuccess = TUtil.setFieldValue(air,scEquZhibiao.getEntityField(),baowenMap.get(scEquZhibiao.getCode()));
|
||||
if(isSuccess){
|
||||
|
|
@ -182,9 +188,9 @@ public class SurvTransdataSoilServiceImpl extends ServiceImpl<SurvTransdataSoilM
|
|||
air.setDataDateTime(DateUtil.parse(DataTime));//数据时间
|
||||
air.setDataGatherType("realTime");//数据类型-realTime=实时,dayTime=日数据,month=月数据,year=年数据
|
||||
air.setDeployCode(mn);//设备编号
|
||||
air.setTenantId(deploy.getTenantId());
|
||||
air.setTenantId(survDeviceDeploy.getTenantId());
|
||||
|
||||
SurvDeviceDeploy survDeviceDeploy = deviceDeployMap.get(deploy.getDeployCode()+"_"+ DeviceDeployEnum.SURV_AIR.getType());
|
||||
// SurvDeviceDeploy survDeviceDeploy = deviceDeployMap.get(deploy.getDeployCode()+"_"+ DeviceDeployEnum.SURV_AIR.getType());
|
||||
|
||||
if(survDeviceDeploy!=null){
|
||||
air.setDeployId(survDeviceDeploy.getId());
|
||||
|
|
@ -203,7 +209,7 @@ public class SurvTransdataSoilServiceImpl extends ServiceImpl<SurvTransdataSoilM
|
|||
transdataAirService.saveOrUpdate(air);
|
||||
//更新设备的上次更新时间
|
||||
deviceDeployService.update(new LambdaUpdateWrapper<SurvDeviceDeploy>()
|
||||
.eq(SurvDeviceDeploy::getDeployCode, deploy.getDeployCode())//只能根据设备号更新,因为存在单个设备同时对应土壤空气的情况
|
||||
.eq(SurvDeviceDeploy::getId, survDeviceDeploy.getId())//只能根据设备号更新,因为存在单个设备同时对应土壤空气的情况
|
||||
.set(SurvDeviceDeploy::getLastsyncTime, nowTime));
|
||||
|
||||
//保存历史表
|
||||
|
|
@ -220,9 +226,9 @@ public class SurvTransdataSoilServiceImpl extends ServiceImpl<SurvTransdataSoilM
|
|||
}
|
||||
|
||||
//处理报警
|
||||
if(deploy != null){
|
||||
if(survDeviceDeploy != null){
|
||||
//暂时不保存报文了
|
||||
alertRecordService.saveBaojingV2(baowenMap,deploy,deviceDeployMap);
|
||||
alertRecordService.saveBaojingV2(baowenMap,survDeviceDeploy,deviceDeployMap);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -232,7 +238,7 @@ public class SurvTransdataSoilServiceImpl extends ServiceImpl<SurvTransdataSoilM
|
|||
String DataTime = baowenMap.get("DataTime");
|
||||
// SurvDeviceDeploy deploy = deviceDeployService.getOneByCode(mn);
|
||||
//保存或者更新实时表
|
||||
SurvTransdataSoil soil = getOneByDeviceCode(mn,null);
|
||||
SurvTransdataSoil soil = getOneByDeviceId(deploy.getId());
|
||||
Date nowTime = new Date();
|
||||
Date kDateTime = null;
|
||||
if(soil == null){
|
||||
|
|
@ -311,7 +317,7 @@ public class SurvTransdataSoilServiceImpl extends ServiceImpl<SurvTransdataSoilM
|
|||
|
||||
|
||||
|
||||
SurvTransdataAir air = transdataAirService.getOneByDeviceCode(mn,null);
|
||||
SurvTransdataAir air = transdataAirService.getOneByDeviceId(deploy.getId());
|
||||
Date airkDateTime = null;
|
||||
if(air == null){
|
||||
air = new SurvTransdataAir();
|
||||
|
|
|
|||
|
|
@ -0,0 +1,106 @@
|
|||
package com.lanhai.task;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.lanhai.constant.IotConstants;
|
||||
import com.lanhai.constant.PollutionConstants;
|
||||
import com.lanhai.entity.SurvDeviceDeploy;
|
||||
import com.lanhai.o.iot.common.DeviceStatusVo;
|
||||
import com.lanhai.service.ISurvDeviceDeployService;
|
||||
import com.lanhai.util.LhIotUtil;
|
||||
import com.lanhai.util.XphUtils;
|
||||
import com.lanhai.util.YSUtils;
|
||||
import com.xxl.job.core.context.XxlJobHelper;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 多线程
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class MqttDeviceTask {
|
||||
|
||||
|
||||
@Autowired
|
||||
private ISurvDeviceDeployService deviceDeployService;
|
||||
|
||||
/**
|
||||
* mqtt设备采集任务
|
||||
*/
|
||||
@XxlJob("IotMqttDeviceGatherTask")
|
||||
public void TaskHandler() throws Exception{
|
||||
log.warn("========================mqtt设备采集任务启动=========================");
|
||||
XxlJobHelper.log("mqtt设备采集任务启动");
|
||||
try {
|
||||
//查询需要查询的设备
|
||||
int curPageSize = 15;
|
||||
Page<SurvDeviceDeploy> page = new Page<SurvDeviceDeploy>(1,curPageSize);
|
||||
LambdaQueryWrapper<SurvDeviceDeploy> wrapper = Wrappers.<SurvDeviceDeploy>lambdaQuery();
|
||||
//监测类设备,并且是mqtt协议
|
||||
wrapper.eq(SurvDeviceDeploy::getRunStatus,0)
|
||||
.eq(SurvDeviceDeploy::getDeployCate,"surv")
|
||||
.eq(SurvDeviceDeploy::getProtocolType, "mqtt");
|
||||
|
||||
IPage<SurvDeviceDeploy> devicelist = deviceDeployService.page(page,wrapper);
|
||||
|
||||
log.warn("需采集的MQTT设备数量:{}-共{}页",page.getTotal(),page.getPages());
|
||||
XxlJobHelper.log("需采集的MQTT设备数量:{}-共{}页",page.getTotal(),page.getPages());
|
||||
|
||||
|
||||
if (!devicelist.getRecords().isEmpty()) {
|
||||
List<DeviceStatusVo> dlist = new ArrayList<>();
|
||||
List<SurvDeviceDeploy> updList = new ArrayList<>();
|
||||
for (int i = 1; i <= devicelist.getPages(); i++) {
|
||||
log.warn("=====================x需采集的MQTT设备数量:page:{},共:{}页==========================",i,devicelist.getPages());
|
||||
XxlJobHelper.log("=====================x需采集的MQTT设备数量:page:{},共:{}页==========================",i,devicelist.getPages());
|
||||
if(i>1){//翻页
|
||||
page = new Page<SurvDeviceDeploy>(i,curPageSize);
|
||||
devicelist = deviceDeployService.page(page,wrapper);
|
||||
}
|
||||
for (SurvDeviceDeploy deploy : devicelist.getRecords()) {
|
||||
DeviceStatusVo dvo = new DeviceStatusVo();
|
||||
if (StringUtils.isNotBlank(deploy.getProtocolCode())) {
|
||||
try {
|
||||
switch (deploy.getDeployType()) {
|
||||
case PollutionConstants.CONTROL_CAB:
|
||||
if (IotConstants.lhviot_standard.equals(deploy.getProtocolCode())) {//蓝海设备协议
|
||||
LhIotUtil.DeviceQuery(deploy);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
XxlJobHelper.log("设备:{},执行Mqtt采集失败,跳过",deploy.getId());
|
||||
continue;
|
||||
}
|
||||
dlist.add(dvo);
|
||||
}
|
||||
}
|
||||
}
|
||||
}else{
|
||||
log.warn("=====未查询到需要采集的mqtt设备");
|
||||
}
|
||||
log.warn("======================MQTT设备采集任务结束=========================");
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
XxlJobHelper.log(e);
|
||||
XxlJobHelper.handleFail();
|
||||
} finally {
|
||||
XxlJobHelper.log("MQTT设备采集任务结束");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -78,17 +78,17 @@ public class MultithreadTask {
|
|||
if (!deploys.isEmpty()) {
|
||||
//获取各农场的设备,过滤重复
|
||||
Map<String, SurvDeviceDeploy> deviceMap = new HashMap<>();
|
||||
Map<String, List<String>> allDeviceMap = new HashMap<>();
|
||||
Map<String, List<SurvDeviceDeploy>> allDeviceMap = new HashMap<>();
|
||||
Map<String,SurvDeviceDeploy> deviceDeployMap = new HashMap<>();
|
||||
for (SurvDeviceDeploy singleDevice : deploys) {
|
||||
SurvDeviceDeploy checkDeploy = deviceMap.get(singleDevice.getDeployCode());
|
||||
Boolean checkResult = Boolean.TRUE;
|
||||
List<String> codeDeviceLsit =allDeviceMap.get(singleDevice.getDeployCode());
|
||||
List<SurvDeviceDeploy> codeDeviceLsit =allDeviceMap.get(singleDevice.getDeployCode());
|
||||
deviceDeployMap.put(singleDevice.getDeployCode() + "_" + singleDevice.getDeployType(),singleDevice);
|
||||
if(codeDeviceLsit==null){
|
||||
codeDeviceLsit = new ArrayList<>();
|
||||
}
|
||||
codeDeviceLsit.add(singleDevice.getId());
|
||||
codeDeviceLsit.add(singleDevice);
|
||||
allDeviceMap.put(singleDevice.getDeployCode(),codeDeviceLsit);
|
||||
if (checkDeploy != null) {
|
||||
log.error("发现重复设备,编号:{},已有类型:{},现类型:{}", singleDevice.getDeployCode(), checkDeploy.getDeployType(), singleDevice.getDeployType());
|
||||
|
|
|
|||
|
|
@ -0,0 +1,101 @@
|
|||
package com.lanhai.util;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.lanhai.entity.ScEquZhibiao;
|
||||
import com.lanhai.entity.SurvDeviceDeploy;
|
||||
import com.lanhai.entity.SurvDeviceDeployRelay;
|
||||
import com.lanhai.mqtt.MqttService;
|
||||
import com.lanhai.o.iot.lhiot.*;
|
||||
import com.lanhai.o.iot.pbs.WaterCommonTransVo;
|
||||
import com.lanhai.service.IScEquZhibiaoService;
|
||||
import com.lanhai.service.ISurvDeviceDeployRelayService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Slf4j
|
||||
public class LhIotUtil {
|
||||
static IScEquZhibiaoService scEquZhibiaoService = SpringContextUtil.getBean(IScEquZhibiaoService.class);
|
||||
static MqttService mqttService = SpringContextUtil.getBean(MqttService.class);
|
||||
/**
|
||||
* 设备发送查询指令
|
||||
*/
|
||||
|
||||
public static boolean DeviceQuery(SurvDeviceDeploy survDeviceDeploy){
|
||||
if(survDeviceDeploy!=null){
|
||||
//构造指令
|
||||
QueryCmd queryCmd = ConstructCmd(survDeviceDeploy);
|
||||
//发送指令,传入下行主题 iotUrl
|
||||
String cmdStr = JSONObject.toJSONString(queryCmd);
|
||||
mqttService.publish(survDeviceDeploy.getDeviceIotUrl(),cmdStr);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 指令构造器
|
||||
*/
|
||||
public static QueryCmd ConstructCmd(SurvDeviceDeploy survDeviceDeploy){
|
||||
QueryCmd queryCmd = new QueryCmd();
|
||||
//查询设备配置
|
||||
List<ScEquZhibiao> eleList = scEquZhibiaoService.lambdaQuery()
|
||||
.eq(ScEquZhibiao::getEquId,survDeviceDeploy.getId())
|
||||
.list();
|
||||
if(eleList!=null && !eleList.isEmpty()){
|
||||
List<QueryCmdRegister> registers = new ArrayList<>();
|
||||
for (ScEquZhibiao scEquZhibiao : eleList) {
|
||||
QueryCmdRegister queryCmdRegister = new QueryCmdRegister();
|
||||
queryCmdRegister.setName(scEquZhibiao.getCode());
|
||||
registers.add(queryCmdRegister);
|
||||
}
|
||||
QueryCmdDetail queryCmdDetail = new QueryCmdDetail();
|
||||
queryCmdDetail.setR_data(registers);
|
||||
queryCmd.setRw_prot(queryCmdDetail);
|
||||
}
|
||||
|
||||
|
||||
return queryCmd;
|
||||
}
|
||||
|
||||
public static WaterCommonTransVo transData(List<ScEquZhibiao> zhibiaoList, String mqttMessage) {
|
||||
WaterCommonTransVo waterCommonTransVo = new WaterCommonTransVo();
|
||||
ResponseCmd responseCmd = JSONObject.parseObject(mqttMessage,ResponseCmd.class);
|
||||
if(responseCmd!=null){
|
||||
if(responseCmd.getRw_prot()!=null){
|
||||
if("rsp".equals(responseCmd.getRw_prot().getDir())) {
|
||||
if (zhibiaoList != null && !zhibiaoList.isEmpty()) {
|
||||
Map<String, String> eleMap = new HashMap<>();
|
||||
for (ScEquZhibiao scEquZhibiao : zhibiaoList) {
|
||||
eleMap.put(scEquZhibiao.getCode(), scEquZhibiao.getEntityField());
|
||||
}
|
||||
|
||||
if (!responseCmd.getRw_prot().getR_data().isEmpty()) {
|
||||
for (ResponseCmdData rDatum : responseCmd.getRw_prot().getR_data()) {
|
||||
String entityName = eleMap.get(rDatum.getName());//字段名称
|
||||
if (StringUtils.isNotBlank(entityName)) {
|
||||
TUtil.setFieldValue(waterCommonTransVo, entityName, rDatum.getValue());
|
||||
} else {
|
||||
log.error("指令:{}----未匹配到监测项目:{}", mqttMessage, rDatum.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
log.warn("{}----指令解析结果=============={}",mqttMessage,JSONObject.toJSONString(waterCommonTransVo));
|
||||
}else{
|
||||
log.error("----非主动请求数据,跳过解析==============回执:{}",mqttMessage);
|
||||
}
|
||||
}else{
|
||||
log.error("------------解析失败,回执格式有误2-------------");
|
||||
}
|
||||
}else{
|
||||
log.error("xxxxxxxxxxxxx解析失败,回执格式有误1xxxxxxxxxxxxx");
|
||||
}
|
||||
return waterCommonTransVo;
|
||||
}
|
||||
}
|
||||
|
|
@ -29,6 +29,7 @@ import java.util.HashMap;
|
|||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 山东仁科工具类
|
||||
|
|
@ -46,10 +47,10 @@ public class SdrkUtils {
|
|||
|
||||
private static String domain = "http://www.0531yun.com";
|
||||
|
||||
public static boolean processSurvData(SurvDeviceDeploy deploy, Map<String, List<String>> allDeviceMap, Map<String, SurvDeviceDeploy> deviceDeployMap) {
|
||||
List<String> allDeployList = allDeviceMap.get(deploy.getDeployCode());
|
||||
public static boolean processSurvData(SurvDeviceDeploy deploy, Map<String, List<SurvDeviceDeploy>> allDeviceMap, Map<String, SurvDeviceDeploy> deviceDeployMap) {
|
||||
List<SurvDeviceDeploy> allDeployList = allDeviceMap.get(deploy.getDeployCode());
|
||||
deploy.setDeviceList(allDeployList);
|
||||
deploy.setScEquZhibiaoList(scEquZhibiaoService.getListByEquid(allDeployList));
|
||||
|
||||
log.error("========正在执行仁科=======,设备编号:{},类型:{}==========", deploy.getDeployCode(), deploy.getDeployType());
|
||||
RenkeDataRealTimePack renkeDataRealTimePack = getDeviceData(deploy);
|
||||
XxlJobHelper.log("仁科回执:"+JSONObject.toJSONString(renkeDataRealTimePack));
|
||||
|
|
@ -57,8 +58,12 @@ public class SdrkUtils {
|
|||
if(renkeDataRealTimePack!=null){
|
||||
if(renkeDataRealTimePack.getData()!=null){
|
||||
if(!renkeDataRealTimePack.getData().isEmpty()){
|
||||
saveData(deploy,deviceDeployMap,renkeDataRealTimePack.getData());
|
||||
b = true;
|
||||
//此处至少有一条数据,所以直接处理
|
||||
for (SurvDeviceDeploy survDeviceDeploy : deploy.getDeviceList()) {
|
||||
survDeviceDeploy.setScEquZhibiaoList(scEquZhibiaoService.getListByEquid(survDeviceDeploy.getId()));
|
||||
saveData(survDeviceDeploy,deviceDeployMap,renkeDataRealTimePack.getData());
|
||||
}
|
||||
b = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ import java.time.LocalDateTime;
|
|||
import java.time.ZoneOffset;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
public class XphUtils {
|
||||
|
|
@ -197,16 +198,16 @@ public class XphUtils {
|
|||
/**
|
||||
* 处理保存数据
|
||||
*/
|
||||
public static boolean processSurvData(SurvDeviceDeploy deploy,Map<String, List<String>> allDeviceMap,Map<String,SurvDeviceDeploy> deviceDeployMap){
|
||||
public static boolean processSurvData(SurvDeviceDeploy deploy,Map<String, List<SurvDeviceDeploy>> allDeviceMap,Map<String,SurvDeviceDeploy> deviceDeployMap){
|
||||
String deviceApiUrl = String.format(domain+"/screen/data?deviceId=%s", deploy.getDeployCode());
|
||||
// List<String> allDeployList = allDeviceMap.get(deploy.getFarmId() + "_"+deploy.getDeployCode());
|
||||
List<String> allDeployList = allDeviceMap.get(deploy.getDeployCode());
|
||||
List<SurvDeviceDeploy> allDeployList = allDeviceMap.get(deploy.getDeployCode());
|
||||
deploy.setDeviceList(allDeployList);
|
||||
//根据编号查询该设备在系统中有多少监测项
|
||||
// System.out.println(deploy.getDeployCode()+"==========firstCheck=========="+allDeployList);
|
||||
VOIotAccess voIotAccess = getAccess(deploy);
|
||||
String token = getXphToken(voIotAccess.getAppId(),voIotAccess.getAppSecret());
|
||||
deploy.setScEquZhibiaoList(scEquZhibiaoService.getListByEquid(allDeployList));
|
||||
|
||||
log.error("========正在执行=======设备编号:{},类型:{}==========",deploy.getDeployCode(), deploy.getDeployType());
|
||||
log.error("***设备请求数据:{}",deviceApiUrl);
|
||||
String dataResult = HttpUtil.createGet(deviceApiUrl).header("token",token).execute().body();
|
||||
|
|
@ -216,11 +217,16 @@ public class XphUtils {
|
|||
log.error("==========================执行中断============================");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (PollutionConstants.WATER_QULITY.equals(deploy.getDeployType())) {
|
||||
survTransdataSoilService.saveWaterBaowen(dataMap, deploy,deviceDeployMap);
|
||||
} else {
|
||||
survTransdataSoilService.saveBaowen(dataMap, deploy,deviceDeployMap);
|
||||
//此deviceList至少为1个长度,所以直接处理
|
||||
for (SurvDeviceDeploy survDeviceDeploy : deploy.getDeviceList()) {
|
||||
log.error("@@@@@@@@@@@@@设备号:{}共绑定有{}个@@@@@@@@@@@@@@@@",deploy.getDeployCode(),deploy.getDeviceList().size());
|
||||
survDeviceDeploy.setScEquZhibiaoList(scEquZhibiaoService.getListByEquid(survDeviceDeploy.getId()));
|
||||
log.error("xxxxxxxxxxxxx当前进行============={}================{}==========={}",survDeviceDeploy.getTenantId(),survDeviceDeploy.getId(),survDeviceDeploy.getScEquZhibiaoList().size());
|
||||
if (PollutionConstants.WATER_QULITY.equals(survDeviceDeploy.getDeployType())) {
|
||||
survTransdataSoilService.saveWaterBaowen(dataMap, survDeviceDeploy,deviceDeployMap);
|
||||
} else {
|
||||
survTransdataSoilService.saveBaowen(dataMap, survDeviceDeploy,deviceDeployMap);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue