Kafka监听西安能耗仪表数据

master
lulicheng 10 months ago
parent 0cdc3595e5
commit 02359ecdd5

@ -0,0 +1,9 @@
package com.ipsplm.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ipsplm.entity.iot.XaMeterList;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface XaMeterListMapper extends BaseMapper<XaMeterList> {
}

@ -0,0 +1,59 @@
package com.ipsplm.entity.iot;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.io.Serializable;
/**
* @Description 西
* @Author lulicheng
* @Date 2024-06-13 14:16
* @Version 1.0
*/
@Data
@TableName("xazz_energy_meter")
public class XaMeterList implements Serializable {
//主键ID
@TableId(value = "id",type = IdType.AUTO)
private Long id;
//仪表编号
@TableField("meter_num")
private String meterNum;
//仪表名称
@TableField("meter_name")
private String meterName;
//仪表类型名称
@TableField("meter_type_name")
private String meterTypeName;
//仪表等级
@TableField("meter_level")
private Integer meterLevel;
//安装位置
@TableField("meter_position")
private String meterPosition;
//部门祖籍
@TableField("organization")
private String organization;
//所属组织机构名称
@TableField("group_name")
private String groupName;
//所属产线名称
@TableField("production_line_name")
private String productionLineName;
//所属产线编号
@TableField("production_line_code")
private String productionLineCode;
}

@ -0,0 +1,48 @@
package com.ipsplm.entity.iot;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* @Description 西
* @Author lulicheng
* @Date 2024-06-13 14:22
* @Version 1.0
*/
@Data
@TableName("xazz_meter_real_info")
public class XaMeterRealInfo implements Serializable {
//主键ID
@TableId(value = "id",type = IdType.AUTO)
private Long id;
//仪表编号
@TableField("meter_num")
private String meterNum;
//总视在功率W
@TableField("sabc")
private Float sabc;
//当前总有功电能
@TableField("epp")
private Float epp;
//仪表时间
@TableField("meter_time")
private Date meterTime;
//kafka主题
@TableField("topic")
private String topic;
//kafka分区
@TableField("part")
private String part;
}

@ -3,8 +3,11 @@ package com.ipsplm.kafka;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ipsplm.entity.iot.*;
import com.ipsplm.service.IXaIotDeviceListSyncService;
import com.ipsplm.service.IXaMeterListService;
import com.ipsplm.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -25,21 +28,30 @@ import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class KafkaConsumerListener {
private IXaIotDeviceListSyncService iXaIotDeviceListSyncService;
private IXaMeterListService iXaMeterListService;
@Autowired
private RedisUtils redisUtils;
// 设备编号List
private static Map<String, Integer> xaDeviceNumMap = new ConcurrentHashMap<>();
//仪表编号List
private static Map<String,Integer> xaMeterNumMap = new ConcurrentHashMap<>();
private final Map<String, Long> deviceTimestamps = new ConcurrentHashMap<>();
public KafkaConsumerListener(IXaIotDeviceListSyncService iXaIotDeviceListSyncService) {
public KafkaConsumerListener(IXaIotDeviceListSyncService iXaIotDeviceListSyncService, IXaMeterListService iXaMeterListService) {
this.iXaIotDeviceListSyncService = iXaIotDeviceListSyncService;
this.iXaMeterListService = iXaMeterListService;
//设备IOT清单
List<XaIotDeviceListSync> xaDeviceList = this.iXaIotDeviceListSyncService.list(null);
//能耗仪表清单
List<XaMeterList> xaMeterList = this.iXaMeterListService.list(null);
xaDeviceList.forEach(dl -> {
xaDeviceNumMap.put(dl.getDeviceNum(), dl.getWebsocketGroupId());
});
xaMeterList.forEach(ml ->{
xaMeterNumMap.put(ml.getMeterNum(),9);
});
String groupId = "shenyang_real_time_" + System.currentTimeMillis();
System.out.println("kafka消费者组号" + groupId + "*********");
System.setProperty("groupId", groupId);
@ -53,6 +65,8 @@ public class KafkaConsumerListener {
long messageTimestamp = record.timestamp();
// 设备实时数据
XaIotDeviceRealInfo xaIotDeviceRealInfo = new XaIotDeviceRealInfo();
//仪表实时数据
XaMeterRealInfo xaMeterRealInfo = new XaMeterRealInfo();
if (record.value() != null) {
JSONObject jsonObject = JSON.parseObject(record.value());
if (jsonObject != null) {
@ -119,12 +133,22 @@ public class KafkaConsumerListener {
xaIotDeviceRealInfo.setWebsocketGroupId(xaDeviceNumMap.get(deviceNum));
xaIotDeviceRealInfo.setGmtModified(currentTime);
xaIotDeviceRealInfo.setGmtCreated(currentTime);
}else if(xaMeterNumMap.containsKey(deviceNum)){
xaMeterRealInfo.setMeterNum(deviceNum);
xaMeterRealInfo.setSabc(jsonObject.containsKey("Sabc") ? jsonObject.getFloat("Sabc") : null);
xaMeterRealInfo.setEpp(jsonObject.containsKey("EPP") ? jsonObject.getFloat("EPP") : null);
xaMeterRealInfo.setPart(String.valueOf(record.partition()));
xaMeterRealInfo.setTopic(record.topic());
xaMeterRealInfo.setMeterTime(DateUtil.date(messageTimestamp));
}
}
}
if (xaIotDeviceRealInfo.getDeviceNum() != null && !xaIotDeviceRealInfo.getDeviceNum().isEmpty()) {
String deviceInfo = JSON.toJSONString(xaIotDeviceRealInfo);
redisUtils.set("XaKafka:" + xaIotDeviceRealInfo.getDeviceNum() + ":" + xaIotDeviceRealInfo.getPart(), deviceInfo);
}else if(xaMeterRealInfo.getMeterNum() != null && !xaMeterRealInfo.getMeterNum().isEmpty()){
String meterInfo = JSON.toJSONString(xaMeterRealInfo);
redisUtils.set("XaMeterKafka:" + xaMeterRealInfo.getMeterNum() + ":" + xaMeterRealInfo.getPart(), meterInfo);
}
}
}

@ -0,0 +1,7 @@
package com.ipsplm.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.ipsplm.entity.iot.XaMeterList;
public interface IXaMeterListService extends IService<XaMeterList> {
}

@ -0,0 +1,11 @@
package com.ipsplm.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ipsplm.dao.XaMeterListMapper;
import com.ipsplm.entity.iot.XaMeterList;
import com.ipsplm.service.IXaMeterListService;
import org.springframework.stereotype.Service;
@Service
public class XaMeterListServiceImpl extends ServiceImpl<XaMeterListMapper, XaMeterList> implements IXaMeterListService {
}
Loading…
Cancel
Save