From 02359ecdd5ab2966b8fe4a0ef265258f04a13adf Mon Sep 17 00:00:00 2001 From: lulicheng Date: Thu, 13 Jun 2024 15:17:39 +0800 Subject: [PATCH] =?UTF-8?q?Kafka=E7=9B=91=E5=90=AC=E8=A5=BF=E5=AE=89?= =?UTF-8?q?=E8=83=BD=E8=80=97=E4=BB=AA=E8=A1=A8=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/ipsplm/dao/XaMeterListMapper.java | 9 +++ .../com/ipsplm/entity/iot/XaMeterList.java | 59 +++++++++++++++++++ .../ipsplm/entity/iot/XaMeterRealInfo.java | 48 +++++++++++++++ .../ipsplm/kafka/KafkaConsumerListener.java | 26 +++++++- .../ipsplm/service/IXaMeterListService.java | 7 +++ .../service/impl/XaMeterListServiceImpl.java | 11 ++++ 6 files changed, 159 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/ipsplm/dao/XaMeterListMapper.java create mode 100644 src/main/java/com/ipsplm/entity/iot/XaMeterList.java create mode 100644 src/main/java/com/ipsplm/entity/iot/XaMeterRealInfo.java create mode 100644 src/main/java/com/ipsplm/service/IXaMeterListService.java create mode 100644 src/main/java/com/ipsplm/service/impl/XaMeterListServiceImpl.java diff --git a/src/main/java/com/ipsplm/dao/XaMeterListMapper.java b/src/main/java/com/ipsplm/dao/XaMeterListMapper.java new file mode 100644 index 0000000..cb0655f --- /dev/null +++ b/src/main/java/com/ipsplm/dao/XaMeterListMapper.java @@ -0,0 +1,9 @@ +package com.ipsplm.dao; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.ipsplm.entity.iot.XaMeterList; +import org.apache.ibatis.annotations.Mapper; + +@Mapper +public interface XaMeterListMapper extends BaseMapper { +} diff --git a/src/main/java/com/ipsplm/entity/iot/XaMeterList.java b/src/main/java/com/ipsplm/entity/iot/XaMeterList.java new file mode 100644 index 0000000..d78aa3a --- /dev/null +++ b/src/main/java/com/ipsplm/entity/iot/XaMeterList.java @@ -0,0 +1,59 @@ +package com.ipsplm.entity.iot; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.io.Serializable; +/** + * @Description 西安能耗仪表 + * @Author lulicheng + * @Date 2024-06-13 14:16 + * @Version 1.0 + */ +@Data +@TableName("xazz_energy_meter") +public class XaMeterList implements Serializable { + //主键ID + @TableId(value = "id",type = IdType.AUTO) + private Long id; + + //仪表编号 + @TableField("meter_num") + private String meterNum; + + //仪表名称 + @TableField("meter_name") + private String meterName; + + //仪表类型名称 + @TableField("meter_type_name") + private String meterTypeName; + + //仪表等级 + @TableField("meter_level") + private Integer meterLevel; + + //安装位置 + @TableField("meter_position") + private String meterPosition; + + //部门祖籍 + @TableField("organization") + private String organization; + + //所属组织机构名称 + @TableField("group_name") + private String groupName; + + //所属产线名称 + @TableField("production_line_name") + private String productionLineName; + + //所属产线编号 + @TableField("production_line_code") + private String productionLineCode; +} diff --git a/src/main/java/com/ipsplm/entity/iot/XaMeterRealInfo.java b/src/main/java/com/ipsplm/entity/iot/XaMeterRealInfo.java new file mode 100644 index 0000000..f8d46ed --- /dev/null +++ b/src/main/java/com/ipsplm/entity/iot/XaMeterRealInfo.java @@ -0,0 +1,48 @@ +package com.ipsplm.entity.iot; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * @Description 西安仪表实时数据 + * @Author lulicheng + * @Date 2024-06-13 14:22 + * @Version 1.0 + */ +@Data +@TableName("xazz_meter_real_info") +public class XaMeterRealInfo implements Serializable { + //主键ID + @TableId(value = "id",type = IdType.AUTO) + private Long id; + + //仪表编号 + @TableField("meter_num") + private String meterNum; + + //总视在功率(W) + @TableField("sabc") + private Float sabc; + + //当前总有功电能 + @TableField("epp") + private Float epp; + + //仪表时间 + @TableField("meter_time") + private Date meterTime; + + //kafka主题 + @TableField("topic") + private String topic; + + //kafka分区 + @TableField("part") + private String part; +} diff --git a/src/main/java/com/ipsplm/kafka/KafkaConsumerListener.java b/src/main/java/com/ipsplm/kafka/KafkaConsumerListener.java index a2ce600..cf31816 100644 --- a/src/main/java/com/ipsplm/kafka/KafkaConsumerListener.java +++ b/src/main/java/com/ipsplm/kafka/KafkaConsumerListener.java @@ -3,8 +3,11 @@ package com.ipsplm.kafka; import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.ipsplm.entity.iot.*; import com.ipsplm.service.IXaIotDeviceListSyncService; +import com.ipsplm.service.IXaMeterListService; import com.ipsplm.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -25,21 +28,30 @@ import java.util.concurrent.ConcurrentHashMap; @Slf4j public class KafkaConsumerListener { private IXaIotDeviceListSyncService iXaIotDeviceListSyncService; + private IXaMeterListService iXaMeterListService; @Autowired private RedisUtils redisUtils; // 设备编号List private static Map xaDeviceNumMap = new ConcurrentHashMap<>(); + //仪表编号List + private static Map xaMeterNumMap = new ConcurrentHashMap<>(); private final Map deviceTimestamps = new ConcurrentHashMap<>(); - public KafkaConsumerListener(IXaIotDeviceListSyncService iXaIotDeviceListSyncService) { + public KafkaConsumerListener(IXaIotDeviceListSyncService iXaIotDeviceListSyncService, IXaMeterListService iXaMeterListService) { this.iXaIotDeviceListSyncService = iXaIotDeviceListSyncService; + this.iXaMeterListService = iXaMeterListService; //设备IOT清单 List xaDeviceList = this.iXaIotDeviceListSyncService.list(null); + //能耗仪表清单 + List xaMeterList = this.iXaMeterListService.list(null); xaDeviceList.forEach(dl -> { xaDeviceNumMap.put(dl.getDeviceNum(), dl.getWebsocketGroupId()); }); + xaMeterList.forEach(ml ->{ + xaMeterNumMap.put(ml.getMeterNum(),9); + }); String groupId = "shenyang_real_time_" + System.currentTimeMillis(); System.out.println("kafka消费者组号:" + groupId + "*********"); System.setProperty("groupId", groupId); @@ -53,6 +65,8 @@ public class KafkaConsumerListener { long messageTimestamp = record.timestamp(); // 设备实时数据 XaIotDeviceRealInfo xaIotDeviceRealInfo = new XaIotDeviceRealInfo(); + //仪表实时数据 + XaMeterRealInfo xaMeterRealInfo = new XaMeterRealInfo(); if (record.value() != null) { JSONObject jsonObject = JSON.parseObject(record.value()); if (jsonObject != null) { @@ -119,12 +133,22 @@ public class KafkaConsumerListener { xaIotDeviceRealInfo.setWebsocketGroupId(xaDeviceNumMap.get(deviceNum)); xaIotDeviceRealInfo.setGmtModified(currentTime); xaIotDeviceRealInfo.setGmtCreated(currentTime); + }else if(xaMeterNumMap.containsKey(deviceNum)){ + xaMeterRealInfo.setMeterNum(deviceNum); + xaMeterRealInfo.setSabc(jsonObject.containsKey("Sabc") ? jsonObject.getFloat("Sabc") : null); + xaMeterRealInfo.setEpp(jsonObject.containsKey("EPP") ? jsonObject.getFloat("EPP") : null); + xaMeterRealInfo.setPart(String.valueOf(record.partition())); + xaMeterRealInfo.setTopic(record.topic()); + xaMeterRealInfo.setMeterTime(DateUtil.date(messageTimestamp)); } } } if (xaIotDeviceRealInfo.getDeviceNum() != null && !xaIotDeviceRealInfo.getDeviceNum().isEmpty()) { String deviceInfo = JSON.toJSONString(xaIotDeviceRealInfo); redisUtils.set("XaKafka:" + xaIotDeviceRealInfo.getDeviceNum() + ":" + xaIotDeviceRealInfo.getPart(), deviceInfo); + }else if(xaMeterRealInfo.getMeterNum() != null && !xaMeterRealInfo.getMeterNum().isEmpty()){ + String meterInfo = JSON.toJSONString(xaMeterRealInfo); + redisUtils.set("XaMeterKafka:" + xaMeterRealInfo.getMeterNum() + ":" + xaMeterRealInfo.getPart(), meterInfo); } } } diff --git a/src/main/java/com/ipsplm/service/IXaMeterListService.java b/src/main/java/com/ipsplm/service/IXaMeterListService.java new file mode 100644 index 0000000..1bf2d70 --- /dev/null +++ b/src/main/java/com/ipsplm/service/IXaMeterListService.java @@ -0,0 +1,7 @@ +package com.ipsplm.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.ipsplm.entity.iot.XaMeterList; + +public interface IXaMeterListService extends IService { +} diff --git a/src/main/java/com/ipsplm/service/impl/XaMeterListServiceImpl.java b/src/main/java/com/ipsplm/service/impl/XaMeterListServiceImpl.java new file mode 100644 index 0000000..68d6a06 --- /dev/null +++ b/src/main/java/com/ipsplm/service/impl/XaMeterListServiceImpl.java @@ -0,0 +1,11 @@ +package com.ipsplm.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.ipsplm.dao.XaMeterListMapper; +import com.ipsplm.entity.iot.XaMeterList; +import com.ipsplm.service.IXaMeterListService; +import org.springframework.stereotype.Service; + +@Service +public class XaMeterListServiceImpl extends ServiceImpl implements IXaMeterListService { +}