diff --git a/src/main/java/com/ipsplm/entity/iot/XaMeterList.java b/src/main/java/com/ipsplm/entity/iot/XaMeterList.java index d78aa3a..feeb8f1 100644 --- a/src/main/java/com/ipsplm/entity/iot/XaMeterList.java +++ b/src/main/java/com/ipsplm/entity/iot/XaMeterList.java @@ -50,10 +50,10 @@ public class XaMeterList implements Serializable { private String groupName; //所属产线名称 - @TableField("production_line_name") - private String productionLineName; + @TableField("subordinate_name") + private String subordinateName; //所属产线编号 - @TableField("production_line_code") - private String productionLineCode; + @TableField("subordinate_code") + private String subordinateCode; } diff --git a/src/main/java/com/ipsplm/entity/iot/XaMeterRealInfo.java b/src/main/java/com/ipsplm/entity/iot/XaMeterRealInfo.java index f8d46ed..2bcc6f4 100644 --- a/src/main/java/com/ipsplm/entity/iot/XaMeterRealInfo.java +++ b/src/main/java/com/ipsplm/entity/iot/XaMeterRealInfo.java @@ -45,4 +45,10 @@ public class XaMeterRealInfo implements Serializable { //kafka分区 @TableField("part") private String part; + + /** + * websocket分组ID + */ + @TableField(exist = false) + private Integer websocketGroupId; } diff --git a/src/main/java/com/ipsplm/kafka/KafkaConsumerListener.java b/src/main/java/com/ipsplm/kafka/KafkaConsumerListener.java index cf31816..ef81dd6 100644 --- a/src/main/java/com/ipsplm/kafka/KafkaConsumerListener.java +++ b/src/main/java/com/ipsplm/kafka/KafkaConsumerListener.java @@ -140,6 +140,7 @@ public class KafkaConsumerListener { xaMeterRealInfo.setPart(String.valueOf(record.partition())); xaMeterRealInfo.setTopic(record.topic()); xaMeterRealInfo.setMeterTime(DateUtil.date(messageTimestamp)); + xaMeterRealInfo.setWebsocketGroupId(xaMeterNumMap.get(deviceNum)); } } } diff --git a/src/main/java/com/ipsplm/listener/RedisListener.java b/src/main/java/com/ipsplm/listener/RedisListener.java new file mode 100644 index 0000000..c2e42c9 --- /dev/null +++ b/src/main/java/com/ipsplm/listener/RedisListener.java @@ -0,0 +1,71 @@ +package com.ipsplm.listener; + + +import com.alibaba.fastjson.JSON; +import com.ipsplm.utils.RedisUtils; +import com.ipsplm.websocket.*; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.stereotype.Component; + +/** + * @Description Redis监听器 + * @Author FanDongqiang + * @Date 2023/5/4 11:32 + * @Version 1.0 + */ +@Component +@Slf4j +@Getter +public class RedisListener implements MessageListener { + private final PatternTopic topic = new PatternTopic("__keyevent@0__:set"); + + @Autowired + @Lazy + private RedisUtils redisUtils; + + @Override + public void onMessage(Message message, byte[] pattern) { + String msg = redisUtils.get(new String(message.getBody())).toString(); + int websocketGroupId = JSON.parseObject(msg).getInteger("websocketGroupId"); + if (websocketGroupId != 0) { + sendInfoToWebSocketHandler(websocketGroupId, msg); + } + } + + private void sendInfoToWebSocketHandler(int websocketGroupId, String message) { + switch (websocketGroupId) { + case 1: + WebSocketHandler1.sendInfo(message); + break; + case 2: + WebSocketHandler2.sendInfo(message); + break; + case 3: + WebSocketHandler3.sendInfo(message); + break; + case 4: + WebSocketHandler4.sendInfo(message); + break; + case 5: + WebSocketHandler5.sendInfo(message); + break; + case 6: + WebSocketHandler6.sendInfo(message); + break; + case 7: + WebSocketHandler7.sendInfo(message); + break; + case 8: + WebSocketHandler8.sendInfo(message); + break; + default: + log.warn("Invalid websocketGroupId: {}", websocketGroupId); + } + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 2eab58e..af60f1f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -26,7 +26,7 @@ spring: redis: # 地址 - host: 10.31.12.195 + host: 127.0.0.1 # 端口,默认为6379 port: 6379 # 数据库索引