Kafka监听西安能耗仪表数据

master
lulicheng 10 months ago
parent 02359ecdd5
commit 161072fa4f

@ -50,10 +50,10 @@ public class XaMeterList implements Serializable {
private String groupName;
//所属产线名称
@TableField("production_line_name")
private String productionLineName;
@TableField("subordinate_name")
private String subordinateName;
//所属产线编号
@TableField("production_line_code")
private String productionLineCode;
@TableField("subordinate_code")
private String subordinateCode;
}

@ -45,4 +45,10 @@ public class XaMeterRealInfo implements Serializable {
//kafka分区
@TableField("part")
private String part;
/**
* websocketID
*/
@TableField(exist = false)
private Integer websocketGroupId;
}

@ -140,6 +140,7 @@ public class KafkaConsumerListener {
xaMeterRealInfo.setPart(String.valueOf(record.partition()));
xaMeterRealInfo.setTopic(record.topic());
xaMeterRealInfo.setMeterTime(DateUtil.date(messageTimestamp));
xaMeterRealInfo.setWebsocketGroupId(xaMeterNumMap.get(deviceNum));
}
}
}

@ -0,0 +1,71 @@
package com.ipsplm.listener;
import com.alibaba.fastjson.JSON;
import com.ipsplm.utils.RedisUtils;
import com.ipsplm.websocket.*;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.stereotype.Component;
/**
* @Description Redis
* @Author FanDongqiang
* @Date 2023/5/4 11:32
* @Version 1.0
*/
@Component
@Slf4j
@Getter
public class RedisListener implements MessageListener {
private final PatternTopic topic = new PatternTopic("__keyevent@0__:set");
@Autowired
@Lazy
private RedisUtils redisUtils;
@Override
public void onMessage(Message message, byte[] pattern) {
String msg = redisUtils.get(new String(message.getBody())).toString();
int websocketGroupId = JSON.parseObject(msg).getInteger("websocketGroupId");
if (websocketGroupId != 0) {
sendInfoToWebSocketHandler(websocketGroupId, msg);
}
}
private void sendInfoToWebSocketHandler(int websocketGroupId, String message) {
switch (websocketGroupId) {
case 1:
WebSocketHandler1.sendInfo(message);
break;
case 2:
WebSocketHandler2.sendInfo(message);
break;
case 3:
WebSocketHandler3.sendInfo(message);
break;
case 4:
WebSocketHandler4.sendInfo(message);
break;
case 5:
WebSocketHandler5.sendInfo(message);
break;
case 6:
WebSocketHandler6.sendInfo(message);
break;
case 7:
WebSocketHandler7.sendInfo(message);
break;
case 8:
WebSocketHandler8.sendInfo(message);
break;
default:
log.warn("Invalid websocketGroupId: {}", websocketGroupId);
}
}
}

@ -26,7 +26,7 @@ spring:
redis:
# 地址
host: 10.31.12.195
host: 127.0.0.1
# 端口默认为6379
port: 6379
# 数据库索引

Loading…
Cancel
Save