第一次提交

master
lulicheng 10 months ago
commit 17efe4dc57

@ -0,0 +1,37 @@
# 三一西安实时驱动
#### 介绍
三一西安实时驱动数据,如机械臂数据等。
#### 软件架构
软件架构说明
#### 安装教程
1. xxxx
2. xxxx
3. xxxx
#### 使用说明
1. xxxx
2. xxxx
3. xxxx
#### 参与贡献
1. Fork 本仓库
2. 新建 Feat_xxx 分支
3. 提交代码
4. 新建 Pull Request
#### 特技
1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md
2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com)
3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目
4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目
5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help)
6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)

@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.7</version>
<relativePath/>
</parent>
<groupId>com.ipsplm</groupId>
<artifactId>sany-xian-realtime-drive</artifactId>
<version>1.0</version>
<name>sany-xian-realtime-drive</name>
<description>三一西安实时驱动</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--自定义通用模块-->
<dependency>
<groupId>com.ipsplm</groupId>
<artifactId>common-api</artifactId>
<version>1.0</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--数据库多数据源-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>2.5.6</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>3.0.6</version>
</dependency>
<!--spring整合kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,16 @@
package com.ipsplm;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.TimeZone;
@SpringBootApplication
public class SanyShenYangRealtimeDriveApplication {
public static void main(String[] args) {
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
SpringApplication.run(SanyShenYangRealtimeDriveApplication.class, args);
}
}

@ -0,0 +1,23 @@
package com.ipsplm.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* @Description
* @Author FanDongqiang
* @Date 2023/1/12 9:37
* @Version 1.0
*/
@Configuration
public class CrossConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**") // 允许跨域访问的路径
.allowedOriginPatterns("*") // 允许跨域访问的源
.allowedMethods("GET", "POST","PUT", "DELETE", "OPTIONS") // 允许请求方法
.allowCredentials(true) // 是否发送cookie
.maxAge(3600); // 预检间隔时间
}
}

@ -0,0 +1,19 @@
package com.ipsplm.config;
import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@MapperScan(basePackages = "com.ipsplm.dao")
public class MybatisPlusConfig {
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
return interceptor;
}
}

@ -0,0 +1,74 @@
package com.ipsplm.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.ipsplm.listener.RedisListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* @Description redis
* @Author FanDongqiang
* @Date 2023/2/3 13:36
* @Version 1.0
*/
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
@Autowired
private RedisListener redisListener;
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.registerModule(new JavaTimeModule());
objectMapper.registerModule((new SimpleModule()));
//有属性不能映射的时候不报错
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
//对象为空时不抛异常
objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL,
JsonTypeInfo.As.PROPERTY);
GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer(objectMapper);
// key采用String的序列化方式
template.setKeySerializer(new StringRedisSerializer());
// value序列化方式采用jackson
template.setValueSerializer(serializer);
// Hash的key也采用StringRedisSerializer的序列化方式
template.setHashKeySerializer(new StringRedisSerializer());
// Hash的value序列化方式采用jackson
template.setHashValueSerializer(serializer);
template.afterPropertiesSet();
return template;
}
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(redisListener, redisListener.getTopic());
return container;
}
}

@ -0,0 +1,76 @@
package com.ipsplm.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
@Configuration
public class SecondKafkaConfig {
/**
* kafka
*
* @return kafka
*/
@ConfigurationProperties(prefix = "spring.kafka.second")
@Bean
public KafkaProperties secondKafkaProperties() {
return new KafkaProperties();
}
// /**
// * 构建第二个kafka的生产者发送template
// *
// * @param secondKafkaProperties 第二个kafka配置
// * @return 第二个kafka的生产者发送template
// */
// @Bean
// public KafkaTemplate<String, String> secondKafkaTemplate(
// @Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) {
// return new KafkaTemplate<>(secondProducerFactory(secondKafkaProperties));
// }
/**
* kafka
*
* @param secondKafkaProperties kafka
* @return kafka
*/
@Bean("secondKafkaListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
secondKafkaListenerContainerFactory(@Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(secondConsumerFactory(secondKafkaProperties));
return factory;
}
/**
* kafka
*
* @param secondKafkaProperties kafka
* @return kafka
*/
private ConsumerFactory<? super Integer, ? super String> secondConsumerFactory(KafkaProperties secondKafkaProperties) {
return new DefaultKafkaConsumerFactory<>(secondKafkaProperties.buildConsumerProperties());
}
// /**
// * 新建第二个kafka的生产者工厂
// *
// * @param secondKafkaProperties 第二个kafka配置
// * @return 第二个kafka的生产者工厂
// */
// private DefaultKafkaProducerFactory<String, String> secondProducerFactory(KafkaProperties secondKafkaProperties) {
// return new DefaultKafkaProducerFactory<>(secondKafkaProperties.buildProducerProperties());
// }
}

@ -0,0 +1,25 @@
package com.ipsplm.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @Description WebSocket
* @Author FanDongqiang
* @Date 2023/5/6 11:03
* @Version 1.0
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig {
/**
* ServerEndpointExporter
* bean使@ServerEndpointWebsocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

@ -0,0 +1,9 @@
package com.ipsplm.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ipsplm.entity.iot.XaIotDeviceListSync;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface XaIotDeviceListSyncMapper extends BaseMapper<XaIotDeviceListSync> {
}

@ -0,0 +1,42 @@
package com.ipsplm.entity.common;
import com.baomidou.mybatisplus.annotation.TableField;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* @Description
* @Author FanDongqiang
* @Date 2023/2/20 10:38
* @Version 1.0
*/
@Data
public class BaseEntity implements Serializable {
private static final long serialVersionUID = 1L;
/**
*
*/
@TableField("creator")
private Long creator;
/**
*
*/
@TableField("gmt_created")
private Date gmtCreated;
/**
*
*/
@TableField("editor")
private Long editor;
/**
*
*/
@TableField("gmt_modified")
private Date gmtModified;
}

@ -0,0 +1,43 @@
package com.ipsplm.entity.iot;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serializable;
@Data
@TableName("xazz_iot_device_list_sync")
public class XaIotDeviceListSync implements Serializable {
//主键id
@TableId(value = "id",type = IdType.AUTO)
private Long id;
//设备编号
@TableField("device_num")
private String deviceNum;
//设备名称
@TableField("device_name")
private String deviceName;
//设备型号
@TableField("device_model")
private String deviceModel;
//公司名称
@TableField("company_name")
private String companyName;
//工作中心名称
@TableField("work_center_name")
private String workCenterName;
//班组名称
@TableField("product_line")
private String teamGroupName;
//一级工艺
@TableField("first_process")
private String firstProcess;
//二级工艺
@TableField("sec_process")
private String secProcess;
//websocket分组
@TableField("websocket_group_id")
private Integer websocketGroupId;
}

@ -0,0 +1,103 @@
package com.ipsplm.entity.iot;
import com.alibaba.fastjson.JSONArray;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.ipsplm.entity.common.BaseEntity;
import lombok.Data;
import java.util.Date;
@Data
@TableName("xazz_iot_device_real_info")
public class XaIotDeviceRealInfo extends BaseEntity {
/**
* ID
* iot_device_real_info.id
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
*
* iot_device_real_info.device_num
*/
@TableField("device_num")
private String deviceNum;
/**
* 1 2 3 4
* iot_device_real_info.device_status
*/
@TableField("device_status")
private Integer deviceStatus;
/**
*
* iot_device_real_info.device_time
*/
@TableField("device_time")
private Date deviceTime;
/**
*
* iot_device_real_info.device_timestamp
*/
@TableField("device_timestamp")
private String deviceTimestamp;
/**
*
* iot_device_real_info.device_time
*/
@TableField(exist = false)
private String deviceTimeStr;
/**
* 0 1
* iot_device_real_info.alarm
*/
@TableField("alarm")
private Integer alarm;
/**
* Json
* iot_device_real_info.alarm_msg
*/
@TableField("alarm_msg")
private String alarmMsg;
/**
* Json
* iot_device_real_info.point_position
*/
@TableField("point_position")
private String pointPosition;
/**
* Json
*/
@TableField(exist = false)
private JSONArray pointPositionArray;
/**
* kafka
* iot_device_real_info.topic
*/
@TableField("topic")
private String topic;
/**
* kafka
* iot_device_real_info.part
*/
@TableField("part")
private String part;
/**
* websocketID
*/
@TableField(exist = false)
private Integer websocketGroupId;
}

@ -0,0 +1,130 @@
package com.ipsplm.kafka;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ipsplm.entity.iot.*;
import com.ipsplm.service.IXaIotDeviceListSyncService;
import com.ipsplm.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* @Description kafka
* @Author FanDongqiang
* @Date 2023/4/26 10:19
* @Version 1.0
*/
@Component
@Slf4j
public class KafkaConsumerListener {
private IXaIotDeviceListSyncService iXaIotDeviceListSyncService;
@Autowired
private RedisUtils redisUtils;
// 设备编号List
private static Map<String, Integer> xaDeviceNumMap = new ConcurrentHashMap<>();
private final Map<String, Long> deviceTimestamps = new ConcurrentHashMap<>();
public KafkaConsumerListener(IXaIotDeviceListSyncService iXaIotDeviceListSyncService) {
this.iXaIotDeviceListSyncService = iXaIotDeviceListSyncService;
//设备IOT清单
List<XaIotDeviceListSync> xaDeviceList = this.iXaIotDeviceListSyncService.list(null);
xaDeviceList.forEach(dl -> {
xaDeviceNumMap.put(dl.getDeviceNum(), dl.getWebsocketGroupId());
});
String groupId = "shenyang_real_time_" + System.currentTimeMillis();
System.out.println("kafka消费者组号" + groupId + "*********");
System.setProperty("groupId", groupId);
}
@KafkaListener(topics = {"topic_kfk_iotproduction_xazz_1129"}, groupId = "${groupId}",containerFactory = "secondKafkaListenerContainerFactory")
public void handleMessage2(ConsumerRecord<String, String> record) {
// 当前时间
Date currentTime = new Date();
//消息时间戳
long messageTimestamp = record.timestamp();
// 设备实时数据
XaIotDeviceRealInfo xaIotDeviceRealInfo = new XaIotDeviceRealInfo();
if (record.value() != null) {
JSONObject jsonObject = JSON.parseObject(record.value());
if (jsonObject != null) {
// 设备编号
String deviceNum = jsonObject.getString("__assetId__");
if (deviceTimestamps.containsKey(deviceNum)) {
long previousTimestamp = deviceTimestamps.get(deviceNum);
if (messageTimestamp <= previousTimestamp) {
return ;
}
}
deviceTimestamps.put(deviceNum, messageTimestamp);
if (xaDeviceNumMap.containsKey(deviceNum)) {
//设备编号
xaIotDeviceRealInfo.setDeviceNum(deviceNum);
//kafka主题
xaIotDeviceRealInfo.setTopic(record.topic());
//设备状态
Integer status = jsonObject.getInteger("Status");
xaIotDeviceRealInfo.setDeviceStatus(status);
//设备告警状态
xaIotDeviceRealInfo.setAlarm(jsonObject.containsKey("Alarm") ? jsonObject.getInteger("Alarm") : null);
//设备告警信息
xaIotDeviceRealInfo.setAlarmMsg(jsonObject.containsKey("AlarmMsg") ? jsonObject.getString("AlarmMsg") : null);
//设备时间
xaIotDeviceRealInfo.setDeviceTime(DateUtil.date(messageTimestamp));
//设备时间戳
Date date = new Date(messageTimestamp);
xaIotDeviceRealInfo.setDeviceTimeStr(DateUtil.format(date, "yyyy-MM-dd HH:mm:ss SSS"));
xaIotDeviceRealInfo.setDeviceTimestamp(DateUtil.format(date, "yyyy-MM-dd HH:mm:ss SSS"));
//设备分区
xaIotDeviceRealInfo.setPart(String.valueOf(record.partition()));
//设备点位
Map<String, Object> positionMap = new HashMap<>();
if (jsonObject.getString("__thingName__").contains("AGV")) {
positionMap.put("AGVPoseX", jsonObject.containsKey("AGVPoseX") ? jsonObject.getDouble("AGVPoseX") : null);
positionMap.put("AGVPoseY", jsonObject.containsKey("AGVPoseY") ? jsonObject.getDouble("AGVPoseY") : null);
positionMap.put("AGVPoseTheta", jsonObject.containsKey("AGVPoseTheta") ? jsonObject.getDouble("AGVPoseTheta") : null);
//电流
positionMap.put("BatCurrent", jsonObject.containsKey("BatCurrent") ? jsonObject.getDouble("BatCurrent") : null);
//电压
positionMap.put("BatVoltage", jsonObject.containsKey("BatVoltage") ? jsonObject.getDouble("BatVoltage") : null);
//功能状态
positionMap.put("BatPowerSupplyStatus", jsonObject.containsKey("BatPowerSupplyStatus") ? jsonObject.getDouble("BatPowerSupplyStatus") : null);
//电池电量
positionMap.put("BatteryCapacity", jsonObject.containsKey("BatteryCapacity") ? jsonObject.getDouble("BatteryCapacity") : null);
//总行驶里程
positionMap.put("TotalMileage", jsonObject.containsKey("TotalMileage") ? jsonObject.getDouble("TotalMileage") : null);
//日行驶里程
positionMap.put("DayMileage", jsonObject.containsKey("DayMileage") ? jsonObject.getDouble("DayMileage") : null);
//工作任务状态
positionMap.put("TaskStatus", jsonObject.containsKey("TaskStatus") ? jsonObject.getDouble("TaskStatus") : null);
} else {
String[] keys = {"J1", "J2", "J3", "J4", "J5", "J6", "E1","J7","J8","J9","FixName","WeldDetect"};
for (String key : keys) {
if (jsonObject.containsKey(key)) {
positionMap.put(key, jsonObject.get(key));
}
}
}
String position = JSON.toJSONString(positionMap);
xaIotDeviceRealInfo.setPointPosition(position);
//设备websocket分组
xaIotDeviceRealInfo.setWebsocketGroupId(xaDeviceNumMap.get(deviceNum));
xaIotDeviceRealInfo.setGmtModified(currentTime);
xaIotDeviceRealInfo.setGmtCreated(currentTime);
}
}
}
if (xaIotDeviceRealInfo.getDeviceNum() != null && !xaIotDeviceRealInfo.getDeviceNum().isEmpty()) {
String deviceInfo = JSON.toJSONString(xaIotDeviceRealInfo);
redisUtils.set("XaKafka:" + xaIotDeviceRealInfo.getDeviceNum() + ":" + xaIotDeviceRealInfo.getPart(), deviceInfo);
}
}
}

@ -0,0 +1,69 @@
package com.ipsplm.listener;
import com.alibaba.fastjson.JSON;
import com.ipsplm.utils.RedisUtils;
import com.ipsplm.websocket.*;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.stereotype.Component;
/**
* @Description Redis
* @Author FanDongqiang
* @Date 2023/5/4 11:32
* @Version 1.0
*/
@Component
@Slf4j
@Getter
public class RedisListener implements MessageListener {
private final PatternTopic topic = new PatternTopic("__keyevent@0__:set");
@Autowired
@Lazy
private RedisUtils redisUtils;
@Override
public void onMessage(Message message, byte[] pattern) {
String msg = redisUtils.get(new String(message.getBody())).toString();
int websocketGroupId = JSON.parseObject(msg).getInteger("websocketGroupId");
if (websocketGroupId != 0) {
sendInfoToWebSocketHandler(websocketGroupId, msg);
}
}
private void sendInfoToWebSocketHandler(int websocketGroupId, String message) {
switch (websocketGroupId) {
case 1:
WebSocketHandler1.sendInfo(message);
break;
case 2:
WebSocketHandler2.sendInfo(message);
break;
case 3:
WebSocketHandler3.sendInfo(message);
break;
case 4:
WebSocketHandler4.sendInfo(message);
break;
case 5:
WebSocketHandler5.sendInfo(message);
break;
case 6:
WebSocketHandler6.sendInfo(message);
break;
case 7:
WebSocketHandler7.sendInfo(message);
break;
case 8:
WebSocketHandler8.sendInfo(message);
break;
default:
log.warn("Invalid websocketGroupId: {}", websocketGroupId);
}
}
}

@ -0,0 +1,7 @@
package com.ipsplm.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.ipsplm.entity.iot.XaIotDeviceListSync;
public interface IXaIotDeviceListSyncService extends IService<XaIotDeviceListSync> {
}

@ -0,0 +1,11 @@
package com.ipsplm.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ipsplm.dao.XaIotDeviceListSyncMapper;
import com.ipsplm.entity.iot.XaIotDeviceListSync;
import com.ipsplm.service.IXaIotDeviceListSyncService;
import org.springframework.stereotype.Service;
@Service
public class XaIotDeviceListSyncServiceImpl extends ServiceImpl<XaIotDeviceListSyncMapper, XaIotDeviceListSync> implements IXaIotDeviceListSyncService {
}

@ -0,0 +1,27 @@
package com.ipsplm.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
/**
* @Description JSON
* @Author FanDongqiang
* @Date 2023/2/17 10:42
* @Version 1.0
*/
public class JSONUtils {
/**
* Json
*
* @param jsonStr
* @return
*/
public static boolean isJSONValidate(String jsonStr){
try {
JSON.parse(jsonStr);
return true;
} catch (JSONException e) {
return false;
}
}
}

@ -0,0 +1,79 @@
package com.ipsplm.websocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description WebSocket
* @Author FanDongqiang
* @Date 2023/5/8 9:17
* @Version 1.0
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/test-token1")
public class WebSocketHandler1 {
private static ConcurrentHashMap<String, WebSocketHandler1> webSocketMap = new ConcurrentHashMap<>();
private final ReentrantLock reentrantLock = new ReentrantLock();
private Session session;
@OnOpen
public void handleOpen(Session session, @PathParam("token") String token) {
this.session = session;
// if (StringUtils.isEmpty(token)) {
// sendInfo("[websocket ERROR] 客户端Token错误连接失败");
// }
webSocketMap.put(session.getId(), this);
log.info("[websocket]客户端创建连接session ID = {} ", session.getId());
log.info("webSocketMap内容" + webSocketMap);
}
@OnClose
public void handleClose(Session session) {
if (webSocketMap.containsKey(session.getId())) {
webSocketMap.remove(session.getId());
}
log.info("[websocket]客户端断开websocket连接session ID = {} ", session.getId());
}
@OnError
public void handleError(Session session, Throwable throwable) {
log.info("[websocket]出现错误session ID = {} ", session.getId());
log.info("[websocket]出现错误throwable = {} ", throwable);
}
// 发送数据到客户端
public static void sendInfo(String message) {
for (String item : webSocketMap.keySet()) {
webSocketMap.get(item).sendMessage(message);
}
}
public void sendMessage(String message) {
try {
//加锁
reentrantLock.lock();
if (this.session != null && this.session.isOpen()) {
this.session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//解锁
reentrantLock.unlock();
}
}
}

@ -0,0 +1,79 @@
package com.ipsplm.websocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description WebSocket
* @Author FanDongqiang
* @Date 2023/5/8 9:17
* @Version 1.0
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/test-token2")
public class WebSocketHandler2 {
private static ConcurrentHashMap<String, WebSocketHandler2> webSocketMap = new ConcurrentHashMap<>();
private final ReentrantLock reentrantLock = new ReentrantLock();
private Session session;
@OnOpen
public void handleOpen(Session session, @PathParam("token") String token) {
this.session = session;
// if (StringUtils.isEmpty(token)) {
// sendInfo("[websocket ERROR] 客户端Token错误连接失败");
// }
webSocketMap.put(session.getId(), this);
log.info("[websocket]客户端创建连接session ID = {} ", session.getId());
log.info("webSocketMap内容" + webSocketMap);
}
@OnClose
public void handleClose(Session session) {
if (webSocketMap.containsKey(session.getId())) {
webSocketMap.remove(session.getId());
}
log.info("[websocket]客户端断开websocket连接session ID = {} ", session.getId());
}
@OnError
public void handleError(Session session, Throwable throwable) {
log.info("[websocket]出现错误session ID = {} ", session.getId());
log.info("[websocket]出现错误throwable = {} ", throwable);
}
// 发送数据到客户端
public static void sendInfo(String message) {
for (String item : webSocketMap.keySet()) {
webSocketMap.get(item).sendMessage(message);
}
}
public void sendMessage(String message) {
try {
//加锁
reentrantLock.lock();
if (this.session != null && this.session.isOpen()) {
this.session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//解锁
reentrantLock.unlock();
}
}
}

@ -0,0 +1,79 @@
package com.ipsplm.websocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description WebSocket
* @Author FanDongqiang
* @Date 2023/5/8 9:17
* @Version 1.0
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/test-token3")
public class WebSocketHandler3 {
private static ConcurrentHashMap<String, WebSocketHandler3> webSocketMap = new ConcurrentHashMap<>();
private final ReentrantLock reentrantLock = new ReentrantLock();
private Session session;
@OnOpen
public void handleOpen(Session session, @PathParam("token") String token) {
this.session = session;
// if (StringUtils.isEmpty(token)) {
// sendInfo("[websocket ERROR] 客户端Token错误连接失败");
// }
webSocketMap.put(session.getId(), this);
log.info("[websocket]客户端创建连接session ID = {} ", session.getId());
log.info("webSocketMap内容" + webSocketMap);
}
@OnClose
public void handleClose(Session session) {
if (webSocketMap.containsKey(session.getId())) {
webSocketMap.remove(session.getId());
}
log.info("[websocket]客户端断开websocket连接session ID = {} ", session.getId());
}
@OnError
public void handleError(Session session, Throwable throwable) {
log.info("[websocket]出现错误session ID = {} ", session.getId());
log.info("[websocket]出现错误throwable = {} ", throwable);
}
// 发送数据到客户端
public static void sendInfo(String message) {
for (String item : webSocketMap.keySet()) {
webSocketMap.get(item).sendMessage(message);
}
}
public void sendMessage(String message) {
try {
//加锁
reentrantLock.lock();
if (this.session != null && this.session.isOpen()) {
this.session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//解锁
reentrantLock.unlock();
}
}
}

@ -0,0 +1,79 @@
package com.ipsplm.websocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description WebSocket
* @Author FanDongqiang
* @Date 2023/5/8 9:17
* @Version 1.0
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/test-token4")
public class WebSocketHandler4 {
private static ConcurrentHashMap<String, WebSocketHandler4> webSocketMap = new ConcurrentHashMap<>();
private final ReentrantLock reentrantLock = new ReentrantLock();
private Session session;
@OnOpen
public void handleOpen(Session session, @PathParam("token") String token) {
this.session = session;
// if (StringUtils.isEmpty(token)) {
// sendInfo("[websocket ERROR] 客户端Token错误连接失败");
// }
webSocketMap.put(session.getId(), this);
log.info("[websocket]客户端创建连接session ID = {} ", session.getId());
log.info("webSocketMap内容" + webSocketMap);
}
@OnClose
public void handleClose(Session session) {
if (webSocketMap.containsKey(session.getId())) {
webSocketMap.remove(session.getId());
}
log.info("[websocket]客户端断开websocket连接session ID = {} ", session.getId());
}
@OnError
public void handleError(Session session, Throwable throwable) {
log.info("[websocket]出现错误session ID = {} ", session.getId());
log.info("[websocket]出现错误throwable = {} ", throwable);
}
// 发送数据到客户端
public static void sendInfo(String message) {
for (String item : webSocketMap.keySet()) {
webSocketMap.get(item).sendMessage(message);
}
}
public void sendMessage(String message) {
try {
//加锁
reentrantLock.lock();
if (this.session != null && this.session.isOpen()) {
this.session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//解锁
reentrantLock.unlock();
}
}
}

@ -0,0 +1,79 @@
package com.ipsplm.websocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description WebSocket
* @Author FanDongqiang
* @Date 2023/5/8 9:17
* @Version 1.0
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/test-token5")
public class WebSocketHandler5 {
private static ConcurrentHashMap<String, WebSocketHandler5> webSocketMap = new ConcurrentHashMap<>();
private final ReentrantLock reentrantLock = new ReentrantLock();
private Session session;
@OnOpen
public void handleOpen(Session session, @PathParam("token") String token) {
this.session = session;
// if (StringUtils.isEmpty(token)) {
// sendInfo("[websocket ERROR] 客户端Token错误连接失败");
// }
webSocketMap.put(session.getId(), this);
log.info("[websocket]客户端创建连接session ID = {} ", session.getId());
log.info("webSocketMap内容" + webSocketMap);
}
@OnClose
public void handleClose(Session session) {
if (webSocketMap.containsKey(session.getId())) {
webSocketMap.remove(session.getId());
}
log.info("[websocket]客户端断开websocket连接session ID = {} ", session.getId());
}
@OnError
public void handleError(Session session, Throwable throwable) {
log.info("[websocket]出现错误session ID = {} ", session.getId());
log.info("[websocket]出现错误throwable = {} ", throwable);
}
// 发送数据到客户端
public static void sendInfo(String message) {
for (String item : webSocketMap.keySet()) {
webSocketMap.get(item).sendMessage(message);
}
}
public void sendMessage(String message) {
try {
//加锁
reentrantLock.lock();
if (this.session != null && this.session.isOpen()) {
this.session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//解锁
reentrantLock.unlock();
}
}
}

@ -0,0 +1,79 @@
package com.ipsplm.websocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description WebSocket(
* @Author FanDongqiang
* @Date 2023/5/8 9:17
* @Version 1.0
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/test-token6")
public class WebSocketHandler6 {
private static ConcurrentHashMap<String, WebSocketHandler6> webSocketMap = new ConcurrentHashMap<>();
private final ReentrantLock reentrantLock = new ReentrantLock();
private Session session;
@OnOpen
public void handleOpen(Session session, @PathParam("token") String token) {
this.session = session;
// if (StringUtils.isEmpty(token)) {
// sendInfo("[websocket ERROR] 客户端Token错误连接失败");
// }
webSocketMap.put(session.getId(), this);
log.info("[websocket]客户端创建连接session ID = {} ", session.getId());
log.info("webSocketMap内容" + webSocketMap);
}
@OnClose
public void handleClose(Session session) {
if (webSocketMap.containsKey(session.getId())) {
webSocketMap.remove(session.getId());
}
log.info("[websocket]客户端断开websocket连接session ID = {} ", session.getId());
}
@OnError
public void handleError(Session session, Throwable throwable) {
log.info("[websocket]出现错误session ID = {} ", session.getId());
log.info("[websocket]出现错误throwable = {} ", throwable);
}
// 发送数据到客户端
public static void sendInfo(String message) {
for (String item : webSocketMap.keySet()) {
webSocketMap.get(item).sendMessage(message);
}
}
public void sendMessage(String message) {
try {
//加锁
reentrantLock.lock();
if (this.session != null && this.session.isOpen()) {
this.session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//解锁
reentrantLock.unlock();
}
}
}

@ -0,0 +1,79 @@
package com.ipsplm.websocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description WebSocket(
* @Author FanDongqiang
* @Date 2023/5/8 9:17
* @Version 1.0
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/test-token7")
public class WebSocketHandler7 {
private static ConcurrentHashMap<String, WebSocketHandler7> webSocketMap = new ConcurrentHashMap<>();
private final ReentrantLock reentrantLock = new ReentrantLock();
private Session session;
@OnOpen
public void handleOpen(Session session, @PathParam("token") String token) {
this.session = session;
// if (StringUtils.isEmpty(token)) {
// sendInfo("[websocket ERROR] 客户端Token错误连接失败");
// }
webSocketMap.put(session.getId(), this);
log.info("[websocket]客户端创建连接session ID = {} ", session.getId());
log.info("webSocketMap内容" + webSocketMap);
}
@OnClose
public void handleClose(Session session) {
if (webSocketMap.containsKey(session.getId())) {
webSocketMap.remove(session.getId());
}
log.info("[websocket]客户端断开websocket连接session ID = {} ", session.getId());
}
@OnError
public void handleError(Session session, Throwable throwable) {
log.info("[websocket]出现错误session ID = {} ", session.getId());
log.info("[websocket]出现错误throwable = {} ", throwable);
}
// 发送数据到客户端
public static void sendInfo(String message) {
for (String item : webSocketMap.keySet()) {
webSocketMap.get(item).sendMessage(message);
}
}
public void sendMessage(String message) {
try {
//加锁
reentrantLock.lock();
if (this.session != null && this.session.isOpen()) {
this.session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//解锁
reentrantLock.unlock();
}
}
}

@ -0,0 +1,79 @@
package com.ipsplm.websocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description WebSocket(
* @Author FanDongqiang
* @Date 2023/5/8 9:17
* @Version 1.0
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/test-token8")
public class WebSocketHandler8 {
private static ConcurrentHashMap<String, WebSocketHandler8> webSocketMap = new ConcurrentHashMap<>();
private final ReentrantLock reentrantLock = new ReentrantLock();
private Session session;
@OnOpen
public void handleOpen(Session session, @PathParam("token") String token) {
this.session = session;
// if (StringUtils.isEmpty(token)) {
// sendInfo("[websocket ERROR] 客户端Token错误连接失败");
// }
webSocketMap.put(session.getId(), this);
log.info("[websocket]客户端创建连接session ID = {} ", session.getId());
log.info("webSocketMap内容" + webSocketMap);
}
@OnClose
public void handleClose(Session session) {
if (webSocketMap.containsKey(session.getId())) {
webSocketMap.remove(session.getId());
}
log.info("[websocket]客户端断开websocket连接session ID = {} ", session.getId());
}
@OnError
public void handleError(Session session, Throwable throwable) {
log.info("[websocket]出现错误session ID = {} ", session.getId());
log.info("[websocket]出现错误throwable = {} ", throwable);
}
// 发送数据到客户端
public static void sendInfo(String message) {
for (String item : webSocketMap.keySet()) {
webSocketMap.get(item).sendMessage(message);
}
}
public void sendMessage(String message) {
try {
//加锁
reentrantLock.lock();
if (this.session != null && this.session.isOpen()) {
this.session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//解锁
reentrantLock.unlock();
}
}
}

@ -0,0 +1,65 @@
server:
port: 8088
servlet:
context-path: /sany-shenyang-realtime-drive
spring:
application:
name: sany-shenyang-realtime-drive
datasource:
dynamic:
primary: shenyang
datasource:
shenyang:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://10.31.12.195:3306/sany?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&allowMultiQueries=true
username: root
password: SySany!0307#
type: com.zaxxer.hikari.HikariDataSource
hikari:
minimum-idle: 5
maximum-pool-size: 15
connection-test-query: SELECT 1
max-lifetime: 1800000
connection-timeout: 30000
pool-name: DatebookHikariCP
redis:
# 地址
host: 127.0.0.1
# 端口默认为6379
port: 6379
# 数据库索引
database: 0
# 密码
password: 123456
# 连接超时时间
timeout: 10000
lettuce:
pool:
# 连接池中的最小空闲连接
min-idle: 1000
# 连接池中的最大空闲连接
max-idle: -1
# 连接池的最大数据库连接数
max-active: -1
#连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1ms
kafka:
second:
bootstrap-servers: 10.31.11.64:9092,10.31.11.91:9092,10.31.11.93:9092
consumer:
auto-offset-reset: latest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错
missing-topics-fatal: false
# 配置20个消费者线程
concurrency: 20
mybatis-plus:
mapper-locations: classpath:/mapper/**/*.xml

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="log.path" value="logs/" />
<appender name="logbackAll" class="ch.qos.logback.core.rolling.RollingFileAppender">
<encoder>
<pattern>%date [%level] [%thread] %logger{80} [%file : %line] %msg%n</pattern>
</encoder>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%date{yyyy-MM-dd HH:mm:ss.Asia/Shanghai} %-5level [%thread] %logger{56}.%method:%L -%msg%n</pattern>
</layout>
</appender>
<appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
<pattern>%date{yyyy-MM-dd HH:mm:ss.Asia/Shanghai} [%highlight(%level)] [%boldMagenta(%thread)] [%cyan(%file) : %line] %msg%n</pattern>
</pattern>
</encoder>
</appender>
<!-- kafka 日志关闭 -->
<logger name="org.apache.kafka" level="OFF"/>
<logger name="io.netty" level="OFF" />
<logger name="org.springframework" level="INFO" additivity="false">
<appender-ref ref="Console"/>
<appender-ref ref="logbackAll"/>
</logger>
<logger name="com.ipsplm.dao" level="info" />
<root level="INFO">
<appender-ref ref="Console"/>
<appender-ref ref="logbackAll"/>
</root>
</configuration>
Loading…
Cancel
Save