From 3ed2417133a443332159b180398ec6d90f8690c6 Mon Sep 17 00:00:00 2001 From: lulicheng Date: Fri, 9 Aug 2024 13:48:16 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AC=AC=E4=B8=80=E6=AC=A1=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 37 +++ pom.xml | 77 ++++++ .../SanyShenYangRealtimeDriveApplication.java | 16 ++ .../java/com/ipsplm/config/CrossConfig.java | 23 ++ .../com/ipsplm/config/FirstKafkaConfig.java | 80 +++++++ .../com/ipsplm/config/MybatisPlusConfig.java | 19 ++ .../java/com/ipsplm/config/RedisConfig.java | 74 ++++++ .../com/ipsplm/config/SecondKafkaConfig.java | 76 ++++++ .../com/ipsplm/config/WebSocketConfig.java | 25 ++ .../com/ipsplm/dao/IotDeviceListMapper.java | 16 ++ .../com/ipsplm/dao/XaIotDeviceListMapper.java | 9 + .../ipsplm/dao/XaIotDeviceListSyncMapper.java | 9 + .../com/ipsplm/entity/common/BaseEntity.java | 42 ++++ .../com/ipsplm/entity/iot/IotDeviceList.java | 161 +++++++++++++ .../ipsplm/entity/iot/IotDeviceRealInfo.java | 110 +++++++++ .../ipsplm/entity/iot/XaIotDeviceList.java | 45 ++++ .../entity/iot/XaIotDeviceListSync.java | 43 ++++ .../entity/iot/XaIotDeviceRealInfo.java | 110 +++++++++ .../ipsplm/kafka/KafkaConsumerListener.java | 220 ++++++++++++++++++ .../com/ipsplm/listener/RedisListener.java | 71 ++++++ .../ipsplm/service/IIotDeviceListService.java | 13 ++ .../service/IXaIotDeviceListService.java | 7 + .../service/IXaIotDeviceListSyncService.java | 7 + .../impl/IotDeviceListServiceImpl.java | 17 ++ .../impl/XaIotDeviceListServiceImpl.java | 11 + .../impl/XaIotDeviceListSyncServiceImpl.java | 11 + src/main/java/com/ipsplm/utils/JSONUtils.java | 27 +++ .../ipsplm/websocket/WebSocketHandler1.java | 79 +++++++ .../ipsplm/websocket/WebSocketHandler2.java | 79 +++++++ .../ipsplm/websocket/WebSocketHandler3.java | 79 +++++++ .../ipsplm/websocket/WebSocketHandler4.java | 79 +++++++ .../ipsplm/websocket/WebSocketHandler5.java | 79 +++++++ .../ipsplm/websocket/WebSocketHandler6.java | 79 +++++++ .../ipsplm/websocket/WebSocketHandler7.java | 79 +++++++ .../ipsplm/websocket/WebSocketHandler8.java | 79 +++++++ src/main/resources/application.yml | 77 ++++++ src/main/resources/logback-spring.xml | 31 +++ 37 files changed, 2096 insertions(+) create mode 100644 README.md create mode 100644 pom.xml create mode 100644 src/main/java/com/ipsplm/SanyShenYangRealtimeDriveApplication.java create mode 100644 src/main/java/com/ipsplm/config/CrossConfig.java create mode 100644 src/main/java/com/ipsplm/config/FirstKafkaConfig.java create mode 100644 src/main/java/com/ipsplm/config/MybatisPlusConfig.java create mode 100644 src/main/java/com/ipsplm/config/RedisConfig.java create mode 100644 src/main/java/com/ipsplm/config/SecondKafkaConfig.java create mode 100644 src/main/java/com/ipsplm/config/WebSocketConfig.java create mode 100644 src/main/java/com/ipsplm/dao/IotDeviceListMapper.java create mode 100644 src/main/java/com/ipsplm/dao/XaIotDeviceListMapper.java create mode 100644 src/main/java/com/ipsplm/dao/XaIotDeviceListSyncMapper.java create mode 100644 src/main/java/com/ipsplm/entity/common/BaseEntity.java create mode 100644 src/main/java/com/ipsplm/entity/iot/IotDeviceList.java create mode 100644 src/main/java/com/ipsplm/entity/iot/IotDeviceRealInfo.java create mode 100644 src/main/java/com/ipsplm/entity/iot/XaIotDeviceList.java create mode 100644 src/main/java/com/ipsplm/entity/iot/XaIotDeviceListSync.java create mode 100644 src/main/java/com/ipsplm/entity/iot/XaIotDeviceRealInfo.java create mode 100644 src/main/java/com/ipsplm/kafka/KafkaConsumerListener.java create mode 100644 src/main/java/com/ipsplm/listener/RedisListener.java create mode 100644 src/main/java/com/ipsplm/service/IIotDeviceListService.java create mode 100644 src/main/java/com/ipsplm/service/IXaIotDeviceListService.java create mode 100644 src/main/java/com/ipsplm/service/IXaIotDeviceListSyncService.java create mode 100644 src/main/java/com/ipsplm/service/impl/IotDeviceListServiceImpl.java create mode 100644 src/main/java/com/ipsplm/service/impl/XaIotDeviceListServiceImpl.java create mode 100644 src/main/java/com/ipsplm/service/impl/XaIotDeviceListSyncServiceImpl.java create mode 100644 src/main/java/com/ipsplm/utils/JSONUtils.java create mode 100644 src/main/java/com/ipsplm/websocket/WebSocketHandler1.java create mode 100644 src/main/java/com/ipsplm/websocket/WebSocketHandler2.java create mode 100644 src/main/java/com/ipsplm/websocket/WebSocketHandler3.java create mode 100644 src/main/java/com/ipsplm/websocket/WebSocketHandler4.java create mode 100644 src/main/java/com/ipsplm/websocket/WebSocketHandler5.java create mode 100644 src/main/java/com/ipsplm/websocket/WebSocketHandler6.java create mode 100644 src/main/java/com/ipsplm/websocket/WebSocketHandler7.java create mode 100644 src/main/java/com/ipsplm/websocket/WebSocketHandler8.java create mode 100644 src/main/resources/application.yml create mode 100644 src/main/resources/logback-spring.xml diff --git a/README.md b/README.md new file mode 100644 index 0000000..c0bfe21 --- /dev/null +++ b/README.md @@ -0,0 +1,37 @@ +# 三一沈阳实时驱动 + +#### 介绍 +三一华睿实时驱动数据,如机械臂数据等。 + +#### 软件架构 +软件架构说明 + + +#### 安装教程 + +1. xxxx +2. xxxx +3. xxxx + +#### 使用说明 + +1. xxxx +2. xxxx +3. xxxx + +#### 参与贡献 + +1. Fork 本仓库 +2. 新建 Feat_xxx 分支 +3. 提交代码 +4. 新建 Pull Request + + +#### 特技 + +1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md +2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com) +3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目 +4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目 +5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help) +6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/) diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..ed76690 --- /dev/null +++ b/pom.xml @@ -0,0 +1,77 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.6.7 + + + com.ipsplm + sany-shenyang-realtime-drive + 1.0 + sany-shenyang-realtime-drive + 三一沈阳实时驱动 + + + org.springframework.boot + spring-boot-starter-web + + + org.projectlombok + lombok + + + com.alibaba + fastjson + 1.2.76 + + + mysql + mysql-connector-java + + + + com.ipsplm + common-api + 1.0 + + + org.springframework.boot + spring-boot-starter-security + + + + + + com.baomidou + dynamic-datasource-spring-boot-starter + 2.5.6 + + + org.springframework.boot + spring-boot-starter-data-redis + 3.0.6 + + + + org.springframework.kafka + spring-kafka + + + + org.springframework.boot + spring-boot-starter-websocket + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/src/main/java/com/ipsplm/SanyShenYangRealtimeDriveApplication.java b/src/main/java/com/ipsplm/SanyShenYangRealtimeDriveApplication.java new file mode 100644 index 0000000..72e08d2 --- /dev/null +++ b/src/main/java/com/ipsplm/SanyShenYangRealtimeDriveApplication.java @@ -0,0 +1,16 @@ +package com.ipsplm; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +import java.util.TimeZone; + +@SpringBootApplication +public class SanyShenYangRealtimeDriveApplication { + + public static void main(String[] args) { + TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai")); + SpringApplication.run(SanyShenYangRealtimeDriveApplication.class, args); + } + +} diff --git a/src/main/java/com/ipsplm/config/CrossConfig.java b/src/main/java/com/ipsplm/config/CrossConfig.java new file mode 100644 index 0000000..3242b48 --- /dev/null +++ b/src/main/java/com/ipsplm/config/CrossConfig.java @@ -0,0 +1,23 @@ +package com.ipsplm.config; + +import org.springframework.context.annotation.Configuration; +import org.springframework.web.servlet.config.annotation.CorsRegistry; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; + +/** + * @Description 跨域配置 + * @Author FanDongqiang + * @Date 2023/1/12 9:37 + * @Version 1.0 + */ +@Configuration +public class CrossConfig implements WebMvcConfigurer { + @Override + public void addCorsMappings(CorsRegistry registry) { + registry.addMapping("/**") // 允许跨域访问的路径 + .allowedOriginPatterns("*") // 允许跨域访问的源 + .allowedMethods("GET", "POST","PUT", "DELETE", "OPTIONS") // 允许请求方法 + .allowCredentials(true) // 是否发送cookie + .maxAge(3600); // 预检间隔时间 + } +} diff --git a/src/main/java/com/ipsplm/config/FirstKafkaConfig.java b/src/main/java/com/ipsplm/config/FirstKafkaConfig.java new file mode 100644 index 0000000..d44ede2 --- /dev/null +++ b/src/main/java/com/ipsplm/config/FirstKafkaConfig.java @@ -0,0 +1,80 @@ +package com.ipsplm.config; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; + +@Configuration +public class FirstKafkaConfig { + /** + * 读取第一个kafka配置 + * Primary注解表示默认以这个为准 + * + * @return 第一个kafka配置 + */ + @Primary + @ConfigurationProperties(prefix = "spring.kafka.first") + @Bean + public KafkaProperties firstKafkaProperties() { + return new KafkaProperties(); + } + +// /** +// * 构建第一个kafka的生产者发送template +// * +// * @param firstKafkaProperties 第一个kafka配置 +// * @return 第一个kafka的生产者发送template +// */ +// @Primary +// @Bean +// public KafkaTemplate firstKafkaTemplate( +// @Autowired @Qualifier("firstKafkaProperties") KafkaProperties firstKafkaProperties) { +// return new KafkaTemplate<>(firstProducerFactory(firstKafkaProperties)); +// } + + /** + * 构建第一个kafka的消费者监听容器工厂 + * + * @param firstKafkaProperties 第一个kafka配置 + * @return 第一个kafka的消费者监听容器工厂 + */ + @Bean("firstKafkaListenerContainerFactory") + public KafkaListenerContainerFactory> + firstKafkaListenerContainerFactory(@Autowired @Qualifier("firstKafkaProperties") KafkaProperties firstKafkaProperties) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(firstConsumerFactory(firstKafkaProperties)); + return factory; + } + + /** + * 新建第一个kafka的消费者工厂 + * + * @param firstKafkaProperties 第一个kafka配置 + * @return 第一个kafka的消费者工厂 + */ + private ConsumerFactory firstConsumerFactory(KafkaProperties firstKafkaProperties) { + return new DefaultKafkaConsumerFactory<>(firstKafkaProperties.buildConsumerProperties()); + } + +// /** +// * 新建第一个kafka的生产者工厂 +// * +// * @param firstKafkaProperties 第一个kafka配置 +// * @return 第一个kafka的生产者工厂 +// */ +// private DefaultKafkaProducerFactory firstProducerFactory(KafkaProperties firstKafkaProperties) { +// return new DefaultKafkaProducerFactory<>(firstKafkaProperties.buildProducerProperties()); +// } +} diff --git a/src/main/java/com/ipsplm/config/MybatisPlusConfig.java b/src/main/java/com/ipsplm/config/MybatisPlusConfig.java new file mode 100644 index 0000000..5c7232b --- /dev/null +++ b/src/main/java/com/ipsplm/config/MybatisPlusConfig.java @@ -0,0 +1,19 @@ +package com.ipsplm.config; + +import com.baomidou.mybatisplus.annotation.DbType; +import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; +import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor; +import org.mybatis.spring.annotation.MapperScan; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@MapperScan(basePackages = "com.ipsplm.dao") +public class MybatisPlusConfig { + @Bean + public MybatisPlusInterceptor mybatisPlusInterceptor() { + MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor(); + interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL)); + return interceptor; + } +} diff --git a/src/main/java/com/ipsplm/config/RedisConfig.java b/src/main/java/com/ipsplm/config/RedisConfig.java new file mode 100644 index 0000000..b253335 --- /dev/null +++ b/src/main/java/com/ipsplm/config/RedisConfig.java @@ -0,0 +1,74 @@ +package com.ipsplm.config; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.ipsplm.listener.RedisListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cache.annotation.CachingConfigurerSupport; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +/** + * @Description redis配置 + * @Author FanDongqiang + * @Date 2023/2/3 13:36 + * @Version 1.0 + */ +@Configuration +public class RedisConfig extends CachingConfigurerSupport { + @Autowired + private RedisListener redisListener; + + @Bean + public RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) { + RedisTemplate template = new RedisTemplate<>(); + template.setConnectionFactory(connectionFactory); + + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + objectMapper.registerModule(new JavaTimeModule()); + objectMapper.registerModule((new SimpleModule())); + //有属性不能映射的时候不报错 + objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + //对象为空时不抛异常 + objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); + objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); + objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, + JsonTypeInfo.As.PROPERTY); + + GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer(objectMapper); + + // key采用String的序列化方式 + template.setKeySerializer(new StringRedisSerializer()); + // value序列化方式采用jackson + template.setValueSerializer(serializer); + + // Hash的key也采用StringRedisSerializer的序列化方式 + template.setHashKeySerializer(new StringRedisSerializer()); + // Hash的value序列化方式采用jackson + template.setHashValueSerializer(serializer); + + template.afterPropertiesSet(); + return template; + } + + @Bean + RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + container.addMessageListener(redisListener, redisListener.getTopic()); + return container; + } +} diff --git a/src/main/java/com/ipsplm/config/SecondKafkaConfig.java b/src/main/java/com/ipsplm/config/SecondKafkaConfig.java new file mode 100644 index 0000000..f407a9e --- /dev/null +++ b/src/main/java/com/ipsplm/config/SecondKafkaConfig.java @@ -0,0 +1,76 @@ +package com.ipsplm.config; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; + +@Configuration +public class SecondKafkaConfig { + /** + * 读取第二个kafka配置 + * + * @return 第二个kafka配置 + */ + @ConfigurationProperties(prefix = "spring.kafka.second") + @Bean + public KafkaProperties secondKafkaProperties() { + return new KafkaProperties(); + } + +// /** +// * 构建第二个kafka的生产者发送template +// * +// * @param secondKafkaProperties 第二个kafka配置 +// * @return 第二个kafka的生产者发送template +// */ +// @Bean +// public KafkaTemplate secondKafkaTemplate( +// @Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) { +// return new KafkaTemplate<>(secondProducerFactory(secondKafkaProperties)); +// } + + /** + * 构建第二个kafka的消费者监听容器工厂 + * + * @param secondKafkaProperties 第二个kafka配置 + * @return 第二个kafka的消费者监听容器工厂 + */ + @Bean("secondKafkaListenerContainerFactory") + public KafkaListenerContainerFactory> + secondKafkaListenerContainerFactory(@Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(secondConsumerFactory(secondKafkaProperties)); + return factory; + } + + /** + * 新建第二个kafka的消费者工厂 + * + * @param secondKafkaProperties 第二个kafka配置 + * @return 第二个kafka的消费者工厂 + */ + private ConsumerFactory secondConsumerFactory(KafkaProperties secondKafkaProperties) { + return new DefaultKafkaConsumerFactory<>(secondKafkaProperties.buildConsumerProperties()); + } + +// /** +// * 新建第二个kafka的生产者工厂 +// * +// * @param secondKafkaProperties 第二个kafka配置 +// * @return 第二个kafka的生产者工厂 +// */ +// private DefaultKafkaProducerFactory secondProducerFactory(KafkaProperties secondKafkaProperties) { +// return new DefaultKafkaProducerFactory<>(secondKafkaProperties.buildProducerProperties()); +// } +} diff --git a/src/main/java/com/ipsplm/config/WebSocketConfig.java b/src/main/java/com/ipsplm/config/WebSocketConfig.java new file mode 100644 index 0000000..8773eec --- /dev/null +++ b/src/main/java/com/ipsplm/config/WebSocketConfig.java @@ -0,0 +1,25 @@ +package com.ipsplm.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +/** + * @Description WebSocket配置类 + * @Author FanDongqiang + * @Date 2023/5/6 11:03 + * @Version 1.0 + */ +@Configuration +@EnableWebSocket +public class WebSocketConfig { + /** + * 注入ServerEndpointExporter, + * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/src/main/java/com/ipsplm/dao/IotDeviceListMapper.java b/src/main/java/com/ipsplm/dao/IotDeviceListMapper.java new file mode 100644 index 0000000..0cdcfba --- /dev/null +++ b/src/main/java/com/ipsplm/dao/IotDeviceListMapper.java @@ -0,0 +1,16 @@ +package com.ipsplm.dao; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.ipsplm.entity.iot.IotDeviceList; +import org.apache.ibatis.annotations.Mapper; + +/** + * @Description IOT设备清单表Mapper + * @Author FanDongqiang + * @Date 2023/2/16 10:19 + * @Version 1.0 + */ +@Mapper +public interface IotDeviceListMapper extends BaseMapper { + +} diff --git a/src/main/java/com/ipsplm/dao/XaIotDeviceListMapper.java b/src/main/java/com/ipsplm/dao/XaIotDeviceListMapper.java new file mode 100644 index 0000000..28ffc0c --- /dev/null +++ b/src/main/java/com/ipsplm/dao/XaIotDeviceListMapper.java @@ -0,0 +1,9 @@ +package com.ipsplm.dao; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.ipsplm.entity.iot.XaIotDeviceList; +import org.apache.ibatis.annotations.Mapper; + +@Mapper +public interface XaIotDeviceListMapper extends BaseMapper { +} diff --git a/src/main/java/com/ipsplm/dao/XaIotDeviceListSyncMapper.java b/src/main/java/com/ipsplm/dao/XaIotDeviceListSyncMapper.java new file mode 100644 index 0000000..fc606ad --- /dev/null +++ b/src/main/java/com/ipsplm/dao/XaIotDeviceListSyncMapper.java @@ -0,0 +1,9 @@ +package com.ipsplm.dao; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.ipsplm.entity.iot.XaIotDeviceListSync; +import org.apache.ibatis.annotations.Mapper; + +@Mapper +public interface XaIotDeviceListSyncMapper extends BaseMapper { +} diff --git a/src/main/java/com/ipsplm/entity/common/BaseEntity.java b/src/main/java/com/ipsplm/entity/common/BaseEntity.java new file mode 100644 index 0000000..54dcdbb --- /dev/null +++ b/src/main/java/com/ipsplm/entity/common/BaseEntity.java @@ -0,0 +1,42 @@ +package com.ipsplm.entity.common; + +import com.baomidou.mybatisplus.annotation.TableField; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * @Description 基础实体类 + * @Author FanDongqiang + * @Date 2023/2/20 10:38 + * @Version 1.0 + */ +@Data +public class BaseEntity implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * 创建人 + */ + @TableField("creator") + private Long creator; + + /** + * 创建时间 + */ + @TableField("gmt_created") + private Date gmtCreated; + + /** + * 更新人 + */ + @TableField("editor") + private Long editor; + + /** + * 更新时间 + */ + @TableField("gmt_modified") + private Date gmtModified; +} diff --git a/src/main/java/com/ipsplm/entity/iot/IotDeviceList.java b/src/main/java/com/ipsplm/entity/iot/IotDeviceList.java new file mode 100644 index 0000000..07aeee4 --- /dev/null +++ b/src/main/java/com/ipsplm/entity/iot/IotDeviceList.java @@ -0,0 +1,161 @@ +package com.ipsplm.entity.iot; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serializable; + +/** + * @Description IOT设备清单表 + * @Author FanDongqiang + * @Date 2023/2/16 10:15 + * @Version 1.0 + */ +@TableName("iot_device_list") +@Data +public class IotDeviceList implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * 主键ID + * iot_device_list.id + */ + @TableId(value = "id", type = IdType.AUTO) + private Long id; + + /** + * 工厂 + * iot_device_list.factory + */ + @TableField("factory") + private String factory; + + /** + * 厂房代码 + * iot_device_list.workshop + */ + @TableField("workshop") + private String workshop; + + /** + * 设备编号 + * iot_device_list.device_num + */ + @TableField("device_num") + private String deviceNum; + + /** + * 厂房描述 + * iot_device_list.workshop_desc + */ + @TableField("workshop_desc") + private String workshopDesc; + + /** + * 设备型号 + * iot_device_list.device_model + */ + @TableField("device_model") + private String deviceModel; + + /** + * 设备名称 + * iot_device_list.device_name + */ + @TableField("device_name") + private String deviceName; + + /** + * 设备位置 + * iot_device_list.device_position + */ + @TableField("device_position") + private String devicePosition; + + /** + * 设备等级 + * iot_device_list.device_grade + */ + @TableField("device_grade") + private String deviceGrade; + + /** + * 使用状态 + * iot_device_list.use_state + */ + @TableField("use_state") + private String useState; + + /** + * 工作中心 + * iot_device_list.work_center + */ + @TableField("work_center") + private String workCenter; + + /** + * 管理工作中心 + * iot_device_list.manage_work_center + */ + @TableField("manage_work_center") + private String manageWorkCenter; + + /** + * 设备操作员名称 + * iot_device_list.operator + */ + @TableField("operator") + private String operator; + + /** + * 班组 + * iot_device_list.team_group + */ + @TableField("team_group") + private String teamGroup; + + /** + * 制造厂商 + * iot_device_list.manufacturer + */ + @TableField("manufacturer") + private String manufacturer; + + /** + * 工位编号 + * iot_device_list.station + */ + @TableField("station") + private String station; + + /** + * 备件包编号 + * iot_device_list.spares_kit_num + */ + @TableField("spares_kit_num") + private String sparesKitNum; + + /** + * 备件包描述 + * iot_device_list.spares_kit_desc + */ + @TableField("spares_kit_desc") + private String sparesKitDesc; + + /** + * 设备类型 + * iot_device_list.device_type + */ + @TableField("device_type") + private String deviceType; + + /** + * websocket分组ID + * iot_device_list.websocket_group_id + */ + @TableField("websocket_group_id") + private Integer websocketGroupId; +} diff --git a/src/main/java/com/ipsplm/entity/iot/IotDeviceRealInfo.java b/src/main/java/com/ipsplm/entity/iot/IotDeviceRealInfo.java new file mode 100644 index 0000000..4ec328a --- /dev/null +++ b/src/main/java/com/ipsplm/entity/iot/IotDeviceRealInfo.java @@ -0,0 +1,110 @@ +package com.ipsplm.entity.iot; + +import com.alibaba.fastjson.JSONArray; +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.ipsplm.entity.common.BaseEntity; +import lombok.Data; + +import java.util.Date; + +/** + * @Description IOT设备实时信息表 + * @Author FanDongqiang + * @Date 2023/2/20 10:33 + * @Version 1.0 + */ +@TableName("iot_device_real_info") +@Data +public class IotDeviceRealInfo extends BaseEntity { + /** + * 主键ID + * iot_device_real_info.id + */ + @TableId(value = "id", type = IdType.AUTO) + private Long id; + + /** + * 设备编号 + * iot_device_real_info.device_num + */ + @TableField("device_num") + private String deviceNum; + + /** + * 设备状态(1、作业 2、待机 3、故障 4、关机) + * iot_device_real_info.device_status + */ + @TableField("device_status") + private Integer deviceStatus; + + /** + * 设备时间 + * iot_device_real_info.device_time + */ + @TableField("device_time") + private Date deviceTime; + + /** + * 设备时间戳毫秒 + * iot_device_real_info.device_timestamp + */ + @TableField("device_timestamp") + private String deviceTimestamp; + + /** + * 设备时间 + * iot_device_real_info.device_time + */ + @TableField(exist = false) + private String deviceTimeStr; + + /** + * 报警(0、正常 1、报警) + * iot_device_real_info.alarm + */ + @TableField("alarm") + private Integer alarm; + + /** + * 报警信息Json字符串 + * iot_device_real_info.alarm_msg + */ + @TableField("alarm_msg") + private String alarmMsg; + + /** + * 设备点位Json字符串 + * iot_device_real_info.point_position + */ + @TableField("point_position") + private String pointPosition; + + /** + * 设备点位Json数组 + */ + @TableField(exist = false) + private JSONArray pointPositionArray; + + /** + * kafka主题 + * iot_device_real_info.topic + */ + @TableField("topic") + private String topic; + + /** + * kafka分区 + * iot_device_real_info.part + */ + @TableField("part") + private String part; + + /** + * websocket分组ID + */ + @TableField(exist = false) + private Integer websocketGroupId; +} diff --git a/src/main/java/com/ipsplm/entity/iot/XaIotDeviceList.java b/src/main/java/com/ipsplm/entity/iot/XaIotDeviceList.java new file mode 100644 index 0000000..f5ae353 --- /dev/null +++ b/src/main/java/com/ipsplm/entity/iot/XaIotDeviceList.java @@ -0,0 +1,45 @@ +package com.ipsplm.entity.iot; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serializable; + +@Data +@TableName("xazz_iot_device_list") +public class XaIotDeviceList implements Serializable { + //主键id + @TableId(value = "id",type = IdType.AUTO) + private Long id; + //设备编号 + @TableField("device_num") + private String deviceNum; + //设备名称 + @TableField("device_name") + private String deviceName; + //设备型号 + @TableField("device_model") + private String deviceModel; + //公司名称 + @TableField("company_name") + private String companyName; + //工作中心名称 + @TableField("work_center_name") + private String workCenterName; + //班组名称 + @TableField("product_line") + private String teamGroupName; + //一级工艺 + @TableField("first_process") + private String firstProcess; + //二级工艺 + @TableField("sec_process") + private String secProcess; + //websocket分组 + @TableField("websocket_group_id") + private Integer websocketGroupId; + +} diff --git a/src/main/java/com/ipsplm/entity/iot/XaIotDeviceListSync.java b/src/main/java/com/ipsplm/entity/iot/XaIotDeviceListSync.java new file mode 100644 index 0000000..ed9263e --- /dev/null +++ b/src/main/java/com/ipsplm/entity/iot/XaIotDeviceListSync.java @@ -0,0 +1,43 @@ +package com.ipsplm.entity.iot; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serializable; +@Data +@TableName("xazz_iot_device_list_sync") +public class XaIotDeviceListSync implements Serializable { + //主键id + @TableId(value = "id",type = IdType.AUTO) + private Long id; + //设备编号 + @TableField("device_num") + private String deviceNum; + //设备名称 + @TableField("device_name") + private String deviceName; + //设备型号 + @TableField("device_model") + private String deviceModel; + //公司名称 + @TableField("company_name") + private String companyName; + //工作中心名称 + @TableField("work_center_name") + private String workCenterName; + //班组名称 + @TableField("product_line") + private String teamGroupName; + //一级工艺 + @TableField("first_process") + private String firstProcess; + //二级工艺 + @TableField("sec_process") + private String secProcess; + //websocket分组 + @TableField("websocket_group_id") + private Integer websocketGroupId; +} diff --git a/src/main/java/com/ipsplm/entity/iot/XaIotDeviceRealInfo.java b/src/main/java/com/ipsplm/entity/iot/XaIotDeviceRealInfo.java new file mode 100644 index 0000000..74eaed8 --- /dev/null +++ b/src/main/java/com/ipsplm/entity/iot/XaIotDeviceRealInfo.java @@ -0,0 +1,110 @@ +package com.ipsplm.entity.iot; + +import com.alibaba.fastjson.JSONArray; +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.ipsplm.entity.common.BaseEntity; +import lombok.Data; + +import java.util.Date; +@Data +@TableName("xazz_iot_device_real_info") +public class XaIotDeviceRealInfo extends BaseEntity { + /** + * 主键ID + * iot_device_real_info.id + */ + @TableId(value = "id", type = IdType.AUTO) + private Long id; + + /** + * 设备编号 + * iot_device_real_info.device_num + */ + @TableField("device_num") + private String deviceNum; + + /** + * 设备模式 + * iot_device_real_info.mode + */ + @TableField("mode") + private String mode; + + /** + * 设备状态(1、作业 2、待机 3、故障 4、关机) + * iot_device_real_info.device_status + */ + @TableField("device_status") + private Integer deviceStatus; + + /** + * 设备时间 + * iot_device_real_info.device_time + */ + @TableField("device_time") + private Date deviceTime; + + /** + * 设备时间戳毫秒 + * iot_device_real_info.device_timestamp + */ + @TableField("device_timestamp") + private String deviceTimestamp; + + /** + * 设备时间 + * iot_device_real_info.device_time + */ + @TableField(exist = false) + private String deviceTimeStr; + + /** + * 报警(0、正常 1、报警) + * iot_device_real_info.alarm + */ + @TableField("alarm") + private Integer alarm; + + /** + * 报警信息Json字符串 + * iot_device_real_info.alarm_msg + */ + @TableField("alarm_msg") + private String alarmMsg; + + /** + * 设备点位Json字符串 + * iot_device_real_info.point_position + */ + @TableField("point_position") + private String pointPosition; + + /** + * 设备点位Json数组 + */ + @TableField(exist = false) + private JSONArray pointPositionArray; + + /** + * kafka主题 + * iot_device_real_info.topic + */ + @TableField("topic") + private String topic; + + /** + * kafka分区 + * iot_device_real_info.part + */ + @TableField("part") + private String part; + + /** + * websocket分组ID + */ + @TableField(exist = false) + private Integer websocketGroupId; +} diff --git a/src/main/java/com/ipsplm/kafka/KafkaConsumerListener.java b/src/main/java/com/ipsplm/kafka/KafkaConsumerListener.java new file mode 100644 index 0000000..f406295 --- /dev/null +++ b/src/main/java/com/ipsplm/kafka/KafkaConsumerListener.java @@ -0,0 +1,220 @@ +package com.ipsplm.kafka; + +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.ipsplm.entity.iot.*; +import com.ipsplm.service.IIotDeviceListService; +import com.ipsplm.service.IXaIotDeviceListService; +import com.ipsplm.service.IXaIotDeviceListSyncService; +import com.ipsplm.utils.JSONUtils; +import com.ipsplm.utils.RedisUtils; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @Description kafka消费者监听 + * @Author FanDongqiang + * @Date 2023/4/26 10:19 + * @Version 1.0 + */ +@Component +@Slf4j +public class KafkaConsumerListener { + private IIotDeviceListService iIotDeviceListService; + private IXaIotDeviceListService iXaIotDeviceListService; + private IXaIotDeviceListSyncService iXaIotDeviceListSyncService; + + @Autowired + private RedisUtils redisUtils; + + // 设备编号List + private static Map deviceNumMap = new ConcurrentHashMap<>(); + private static Map xaDeviceNumMap = new ConcurrentHashMap<>(); + + private final Map deviceTimestamps = new ConcurrentHashMap<>(); + public KafkaConsumerListener(IIotDeviceListService iIotDeviceListService, IXaIotDeviceListService iXaIotDeviceListService, IXaIotDeviceListSyncService iXaIotDeviceListSyncService) { + this.iIotDeviceListService = iIotDeviceListService; + this.iXaIotDeviceListService = iXaIotDeviceListService; + this.iXaIotDeviceListSyncService = iXaIotDeviceListSyncService; + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.ne("websocket_group_id", 0); + // IOT设备清单 + List deviceList = this.iIotDeviceListService.list(queryWrapper); + + deviceList.forEach(dl -> { + deviceNumMap.put(dl.getDeviceNum(), dl.getWebsocketGroupId()); + }); +// List xaDeviceList = this.iXaIotDeviceListService.list(null); + List xaDeviceList = this.iXaIotDeviceListSyncService.list(null); + xaDeviceList.forEach(dl -> { + xaDeviceNumMap.put(dl.getDeviceNum(), dl.getWebsocketGroupId()); + }); + String groupId = "shenyang_real_time_" + System.currentTimeMillis(); + System.out.println("kafka消费者组号:" + groupId + "*********"); + System.setProperty("groupId", groupId); + } + + /** + * 处理单条消息 + * + * @param record 消费记录 + */ + @KafkaListener(topics = {"topic_kfk_syzz_0925", "topic_kfk_syzz_0925_meter"}, groupId = "${groupId}",containerFactory = "firstKafkaListenerContainerFactory") + public void handleMessage(ConsumerRecord record) { + // 当前时间 + Date currentTime = new Date(); + // 设备实时数据 + IotDeviceRealInfo iotDeviceRealInfo = null; + // 设备互联数据 + String deviceInfo = record.value(); + // 消息时间戳 + long timestamp = record.timestamp(); + if (StrUtil.isNotBlank(deviceInfo) && JSONUtils.isJSONValidate(deviceInfo)) { + JSONObject deviceInfoJson = JSON.parseObject(deviceInfo); + // 设备编号 + String deviceNum = StrUtil.isNotBlank(deviceInfoJson.getString("clientUUID")) ? deviceInfoJson.getString("clientUUID") : deviceInfoJson.getString("ClientUUID"); + if (deviceNumMap.keySet().contains(deviceNum)) { + JSONArray reportedArray = deviceInfoJson.getJSONArray("reported"); + if (!reportedArray.isEmpty()) { + // 设备状态 + Integer deviceStatus = null; + iotDeviceRealInfo = new IotDeviceRealInfo(); + iotDeviceRealInfo.setDeviceNum(deviceNum); + iotDeviceRealInfo.setWebsocketGroupId(deviceNumMap.get(deviceNum)); + iotDeviceRealInfo.setPointPosition(JSONUtil.toJsonStr(reportedArray)); + iotDeviceRealInfo.setPointPositionArray(reportedArray); + iotDeviceRealInfo.setGmtModified(currentTime); + iotDeviceRealInfo.setGmtCreated(currentTime); + iotDeviceRealInfo.setTopic(record.topic()); + iotDeviceRealInfo.setPart(String.valueOf(record.partition())); + // 设备时间 = 消息时间 + iotDeviceRealInfo.setDeviceTime(DateUtil.date(timestamp)); + Date date = new Date(timestamp); + iotDeviceRealInfo.setDeviceTimeStr(DateUtil.format(date, "yyyy-MM-dd HH:mm:ss SSS")); + iotDeviceRealInfo.setDeviceTimestamp(DateUtil.format(date, "yyyy-MM-dd HH:mm:ss SSS")); + for (Iterator iterator = reportedArray.iterator(); iterator.hasNext(); ) { + JSONObject jsonObject = (JSONObject) iterator.next(); + boolean isStatus = jsonObject.keySet().contains("Status"); + if (isStatus) { + deviceStatus = jsonObject.getInteger("Status"); + if (deviceStatus != null) { + iotDeviceRealInfo.setDeviceStatus(deviceStatus); + } + } + boolean isAlarmMsg = jsonObject.keySet().contains("AlarmMsg"); + if (isAlarmMsg) { + Object alarmMsg = jsonObject.get("AlarmMsg"); + if (alarmMsg != null) { + iotDeviceRealInfo.setAlarmMsg(JSONUtil.toJsonStr(alarmMsg)); + } + } + boolean isAlarm = jsonObject.keySet().contains("Alarm"); + if (isAlarm) { + iotDeviceRealInfo.setAlarm(jsonObject.getInteger("Alarm")); + } + } + } + } + } + if (iotDeviceRealInfo != null) { + String deviceRealInfotring = JSON.toJSONString(iotDeviceRealInfo); + redisUtils.set("kafka:" + iotDeviceRealInfo.getDeviceNum() + ":" + iotDeviceRealInfo.getPart(), deviceRealInfotring); + } + } + + @KafkaListener(topics = {"topic_kfk_iotproduction_xazz_1129"}, groupId = "${groupId}",containerFactory = "secondKafkaListenerContainerFactory") + public void handleMessage2(ConsumerRecord record) { + // 当前时间 + Date currentTime = new Date(); + //消息时间戳 + long messageTimestamp = record.timestamp(); + // 设备实时数据 + XaIotDeviceRealInfo xaIotDeviceRealInfo = new XaIotDeviceRealInfo(); + if (record.value() != null) { + JSONObject jsonObject = JSON.parseObject(record.value()); + if (jsonObject != null) { + // 设备编号 + String deviceNum = jsonObject.getString("__assetId__"); + if (deviceTimestamps.containsKey(deviceNum)) { + long previousTimestamp = deviceTimestamps.get(deviceNum); + if (messageTimestamp <= previousTimestamp) { + return ; + } + } + deviceTimestamps.put(deviceNum, messageTimestamp); + if (xaDeviceNumMap.containsKey(deviceNum)) { + //设备编号 + xaIotDeviceRealInfo.setDeviceNum(deviceNum); + //kafka主题 + xaIotDeviceRealInfo.setTopic(record.topic()); + //设备状态 + Integer status = jsonObject.getInteger("Status"); + xaIotDeviceRealInfo.setDeviceStatus(status); + //设备告警状态 + xaIotDeviceRealInfo.setAlarm(jsonObject.containsKey("Alarm") ? jsonObject.getInteger("Alarm") : null); + //设备告警信息 + xaIotDeviceRealInfo.setAlarmMsg(jsonObject.containsKey("AlarmMsg") ? jsonObject.getString("AlarmMsg") : null); + //设备模式 + xaIotDeviceRealInfo.setMode(jsonObject.containsKey("Mode") ? jsonObject.getString("Mode") : null); + //设备时间 + xaIotDeviceRealInfo.setDeviceTime(DateUtil.date(messageTimestamp)); + //设备时间戳 + Date date = new Date(messageTimestamp); + xaIotDeviceRealInfo.setDeviceTimeStr(DateUtil.format(date, "yyyy-MM-dd HH:mm:ss SSS")); + xaIotDeviceRealInfo.setDeviceTimestamp(DateUtil.format(date, "yyyy-MM-dd HH:mm:ss SSS")); + //设备分区 + xaIotDeviceRealInfo.setPart(String.valueOf(record.partition())); + //设备点位 + Map positionMap = new HashMap<>(); + if (jsonObject.getString("__thingName__").contains("AGV")) { + positionMap.put("AGVPoseX", jsonObject.containsKey("AGVPoseX") ? jsonObject.getDouble("AGVPoseX") : null); + positionMap.put("AGVPoseY", jsonObject.containsKey("AGVPoseY") ? jsonObject.getDouble("AGVPoseY") : null); + positionMap.put("AGVPoseTheta", jsonObject.containsKey("AGVPoseTheta") ? jsonObject.getDouble("AGVPoseTheta") : null); + //电流 + positionMap.put("BatCurrent", jsonObject.containsKey("BatCurrent") ? jsonObject.getDouble("BatCurrent") : null); + //电压 + positionMap.put("BatVoltage", jsonObject.containsKey("BatVoltage") ? jsonObject.getDouble("BatVoltage") : null); + //功能状态 + positionMap.put("BatPowerSupplyStatus", jsonObject.containsKey("BatPowerSupplyStatus") ? jsonObject.getDouble("BatPowerSupplyStatus") : null); + //电池电量 + positionMap.put("BatteryCapacity", jsonObject.containsKey("BatteryCapacity") ? jsonObject.getDouble("BatteryCapacity") : null); + //总行驶里程 + positionMap.put("TotalMileage", jsonObject.containsKey("TotalMileage") ? jsonObject.getDouble("TotalMileage") : null); + //日行驶里程 + positionMap.put("DayMileage", jsonObject.containsKey("DayMileage") ? jsonObject.getDouble("DayMileage") : null); + //工作任务状态 + positionMap.put("TaskStatus", jsonObject.containsKey("TaskStatus") ? jsonObject.getDouble("TaskStatus") : null); + } else { + String[] keys = {"J1", "J2", "J3", "J4", "J5", "J6", "E1","J7","J8","J9","FixName","WeldDetect"}; + for (String key : keys) { + if (jsonObject.containsKey(key)) { + positionMap.put(key, jsonObject.get(key)); + } + } + } + String position = JSON.toJSONString(positionMap); + xaIotDeviceRealInfo.setPointPosition(position); + //设备websocket分组 + xaIotDeviceRealInfo.setWebsocketGroupId(xaDeviceNumMap.get(deviceNum)); + xaIotDeviceRealInfo.setGmtModified(currentTime); + xaIotDeviceRealInfo.setGmtCreated(currentTime); + } + } + } + if (xaIotDeviceRealInfo.getDeviceNum() != null && !xaIotDeviceRealInfo.getDeviceNum().isEmpty()) { + String deviceInfo = JSON.toJSONString(xaIotDeviceRealInfo); + redisUtils.set("XaKafka:" + xaIotDeviceRealInfo.getDeviceNum() + ":" + xaIotDeviceRealInfo.getPart(), deviceInfo); + } + } +} diff --git a/src/main/java/com/ipsplm/listener/RedisListener.java b/src/main/java/com/ipsplm/listener/RedisListener.java new file mode 100644 index 0000000..c88d040 --- /dev/null +++ b/src/main/java/com/ipsplm/listener/RedisListener.java @@ -0,0 +1,71 @@ +package com.ipsplm.listener; + + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.ipsplm.entity.iot.IotDeviceRealInfo; +import com.ipsplm.utils.RedisUtils; +import com.ipsplm.websocket.*; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.stereotype.Component; + +/** + * @Description Redis监听器 + * @Author FanDongqiang + * @Date 2023/5/4 11:32 + * @Version 1.0 + */ +@Component +@Slf4j +@Getter +public class RedisListener implements MessageListener { + private final PatternTopic topic = new PatternTopic("__keyevent@0__:set"); + + @Autowired + @Lazy + private RedisUtils redisUtils; + @Override + public void onMessage(Message message, byte[] pattern) { + String msg = redisUtils.get(new String(message.getBody())).toString(); + int websocketGroupId = JSON.parseObject(msg).getInteger("websocketGroupId"); + if (websocketGroupId != 0) { + sendInfoToWebSocketHandler(websocketGroupId, msg); + } + } + private void sendInfoToWebSocketHandler(int websocketGroupId, String message) { + switch (websocketGroupId) { + case 1: + WebSocketHandler1.sendInfo(message); + break; + case 2: + WebSocketHandler2.sendInfo(message); + break; + case 3: + WebSocketHandler3.sendInfo(message); + break; + case 4: + WebSocketHandler4.sendInfo(message); + break; + case 5: + WebSocketHandler5.sendInfo(message); + break; + case 6: + WebSocketHandler6.sendInfo(message); + break; + case 7: + WebSocketHandler7.sendInfo(message); + break; + case 8: + WebSocketHandler8.sendInfo(message); + break; + default: + log.warn("Invalid websocketGroupId: {}", websocketGroupId); + } + } +} diff --git a/src/main/java/com/ipsplm/service/IIotDeviceListService.java b/src/main/java/com/ipsplm/service/IIotDeviceListService.java new file mode 100644 index 0000000..1ae55fa --- /dev/null +++ b/src/main/java/com/ipsplm/service/IIotDeviceListService.java @@ -0,0 +1,13 @@ +package com.ipsplm.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.ipsplm.entity.iot.IotDeviceList; + +/** + * @Description IOT设备清单表Service + * @Author FanDongqiang + * @Date 2023/2/16 10:21 + * @Version 1.0 + */ +public interface IIotDeviceListService extends IService { +} diff --git a/src/main/java/com/ipsplm/service/IXaIotDeviceListService.java b/src/main/java/com/ipsplm/service/IXaIotDeviceListService.java new file mode 100644 index 0000000..625445a --- /dev/null +++ b/src/main/java/com/ipsplm/service/IXaIotDeviceListService.java @@ -0,0 +1,7 @@ +package com.ipsplm.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.ipsplm.entity.iot.XaIotDeviceList; + +public interface IXaIotDeviceListService extends IService { +} diff --git a/src/main/java/com/ipsplm/service/IXaIotDeviceListSyncService.java b/src/main/java/com/ipsplm/service/IXaIotDeviceListSyncService.java new file mode 100644 index 0000000..1935310 --- /dev/null +++ b/src/main/java/com/ipsplm/service/IXaIotDeviceListSyncService.java @@ -0,0 +1,7 @@ +package com.ipsplm.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.ipsplm.entity.iot.XaIotDeviceListSync; + +public interface IXaIotDeviceListSyncService extends IService { +} diff --git a/src/main/java/com/ipsplm/service/impl/IotDeviceListServiceImpl.java b/src/main/java/com/ipsplm/service/impl/IotDeviceListServiceImpl.java new file mode 100644 index 0000000..3426f5d --- /dev/null +++ b/src/main/java/com/ipsplm/service/impl/IotDeviceListServiceImpl.java @@ -0,0 +1,17 @@ +package com.ipsplm.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.ipsplm.dao.IotDeviceListMapper; +import com.ipsplm.entity.iot.IotDeviceList; +import com.ipsplm.service.IIotDeviceListService; +import org.springframework.stereotype.Service; + +/** + * @Description IOT设备清单表Service实现类 + * @Author FanDongqiang + * @Date 2023/2/16 10:21 + * @Version 1.0 + */ +@Service +public class IotDeviceListServiceImpl extends ServiceImpl implements IIotDeviceListService { +} diff --git a/src/main/java/com/ipsplm/service/impl/XaIotDeviceListServiceImpl.java b/src/main/java/com/ipsplm/service/impl/XaIotDeviceListServiceImpl.java new file mode 100644 index 0000000..f1b04c5 --- /dev/null +++ b/src/main/java/com/ipsplm/service/impl/XaIotDeviceListServiceImpl.java @@ -0,0 +1,11 @@ +package com.ipsplm.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.ipsplm.dao.XaIotDeviceListMapper; +import com.ipsplm.entity.iot.XaIotDeviceList; +import com.ipsplm.service.IXaIotDeviceListService; +import org.springframework.stereotype.Service; + +@Service +public class XaIotDeviceListServiceImpl extends ServiceImpl implements IXaIotDeviceListService { +} diff --git a/src/main/java/com/ipsplm/service/impl/XaIotDeviceListSyncServiceImpl.java b/src/main/java/com/ipsplm/service/impl/XaIotDeviceListSyncServiceImpl.java new file mode 100644 index 0000000..1d71156 --- /dev/null +++ b/src/main/java/com/ipsplm/service/impl/XaIotDeviceListSyncServiceImpl.java @@ -0,0 +1,11 @@ +package com.ipsplm.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.ipsplm.dao.XaIotDeviceListSyncMapper; +import com.ipsplm.entity.iot.XaIotDeviceListSync; +import com.ipsplm.service.IXaIotDeviceListSyncService; +import org.springframework.stereotype.Service; + +@Service +public class XaIotDeviceListSyncServiceImpl extends ServiceImpl implements IXaIotDeviceListSyncService { +} diff --git a/src/main/java/com/ipsplm/utils/JSONUtils.java b/src/main/java/com/ipsplm/utils/JSONUtils.java new file mode 100644 index 0000000..fc74c52 --- /dev/null +++ b/src/main/java/com/ipsplm/utils/JSONUtils.java @@ -0,0 +1,27 @@ +package com.ipsplm.utils; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONException; + +/** + * @Description JSON工具类 + * @Author FanDongqiang + * @Date 2023/2/17 10:42 + * @Version 1.0 + */ +public class JSONUtils { + /** + * 是否合法Json + * + * @param jsonStr 待解析字符串 + * @return + */ + public static boolean isJSONValidate(String jsonStr){ + try { + JSON.parse(jsonStr); + return true; + } catch (JSONException e) { + return false; + } + } +} diff --git a/src/main/java/com/ipsplm/websocket/WebSocketHandler1.java b/src/main/java/com/ipsplm/websocket/WebSocketHandler1.java new file mode 100644 index 0000000..ceb012a --- /dev/null +++ b/src/main/java/com/ipsplm/websocket/WebSocketHandler1.java @@ -0,0 +1,79 @@ +package com.ipsplm.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @Description WebSocket处理类 + * @Author FanDongqiang + * @Date 2023/5/8 9:17 + * @Version 1.0 + */ +@Component +@Slf4j +@ServerEndpoint("/websocket/test-token1") +public class WebSocketHandler1 { + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + + private final ReentrantLock reentrantLock = new ReentrantLock(); + + private Session session; + + @OnOpen + public void handleOpen(Session session, @PathParam("token") String token) { + this.session = session; +// if (StringUtils.isEmpty(token)) { +// sendInfo("[websocket ERROR] 客户端Token错误,连接失败"); +// } + webSocketMap.put(session.getId(), this); + log.info("[websocket]客户端创建连接,session ID = {} ", session.getId()); + log.info("webSocketMap内容:" + webSocketMap); + } + + @OnClose + public void handleClose(Session session) { + if (webSocketMap.containsKey(session.getId())) { + webSocketMap.remove(session.getId()); + } + log.info("[websocket]客户端断开websocket连接,session ID = {} ", session.getId()); + } + + @OnError + public void handleError(Session session, Throwable throwable) { + log.info("[websocket]出现错误,session ID = {} ", session.getId()); + log.info("[websocket]出现错误,throwable = {} ", throwable); + } + + // 发送数据到客户端 + public static void sendInfo(String message) { + for (String item : webSocketMap.keySet()) { + webSocketMap.get(item).sendMessage(message); + } + } + + public void sendMessage(String message) { + try { + //加锁 + reentrantLock.lock(); + if (this.session != null && this.session.isOpen()) { + this.session.getBasicRemote().sendText(message); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + //解锁 + reentrantLock.unlock(); + } + } +} diff --git a/src/main/java/com/ipsplm/websocket/WebSocketHandler2.java b/src/main/java/com/ipsplm/websocket/WebSocketHandler2.java new file mode 100644 index 0000000..0c8b622 --- /dev/null +++ b/src/main/java/com/ipsplm/websocket/WebSocketHandler2.java @@ -0,0 +1,79 @@ +package com.ipsplm.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @Description WebSocket处理类 + * @Author FanDongqiang + * @Date 2023/5/8 9:17 + * @Version 1.0 + */ +@Component +@Slf4j +@ServerEndpoint("/websocket/test-token2") +public class WebSocketHandler2 { + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + + private final ReentrantLock reentrantLock = new ReentrantLock(); + + private Session session; + + @OnOpen + public void handleOpen(Session session, @PathParam("token") String token) { + this.session = session; +// if (StringUtils.isEmpty(token)) { +// sendInfo("[websocket ERROR] 客户端Token错误,连接失败"); +// } + webSocketMap.put(session.getId(), this); + log.info("[websocket]客户端创建连接,session ID = {} ", session.getId()); + log.info("webSocketMap内容:" + webSocketMap); + } + + @OnClose + public void handleClose(Session session) { + if (webSocketMap.containsKey(session.getId())) { + webSocketMap.remove(session.getId()); + } + log.info("[websocket]客户端断开websocket连接,session ID = {} ", session.getId()); + } + + @OnError + public void handleError(Session session, Throwable throwable) { + log.info("[websocket]出现错误,session ID = {} ", session.getId()); + log.info("[websocket]出现错误,throwable = {} ", throwable); + } + + // 发送数据到客户端 + public static void sendInfo(String message) { + for (String item : webSocketMap.keySet()) { + webSocketMap.get(item).sendMessage(message); + } + } + + public void sendMessage(String message) { + try { + //加锁 + reentrantLock.lock(); + if (this.session != null && this.session.isOpen()) { + this.session.getBasicRemote().sendText(message); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + //解锁 + reentrantLock.unlock(); + } + } +} diff --git a/src/main/java/com/ipsplm/websocket/WebSocketHandler3.java b/src/main/java/com/ipsplm/websocket/WebSocketHandler3.java new file mode 100644 index 0000000..6b3ede8 --- /dev/null +++ b/src/main/java/com/ipsplm/websocket/WebSocketHandler3.java @@ -0,0 +1,79 @@ +package com.ipsplm.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @Description WebSocket处理类 + * @Author FanDongqiang + * @Date 2023/5/8 9:17 + * @Version 1.0 + */ +@Component +@Slf4j +@ServerEndpoint("/websocket/test-token3") +public class WebSocketHandler3 { + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + + private final ReentrantLock reentrantLock = new ReentrantLock(); + + private Session session; + + @OnOpen + public void handleOpen(Session session, @PathParam("token") String token) { + this.session = session; +// if (StringUtils.isEmpty(token)) { +// sendInfo("[websocket ERROR] 客户端Token错误,连接失败"); +// } + webSocketMap.put(session.getId(), this); + log.info("[websocket]客户端创建连接,session ID = {} ", session.getId()); + log.info("webSocketMap内容:" + webSocketMap); + } + + @OnClose + public void handleClose(Session session) { + if (webSocketMap.containsKey(session.getId())) { + webSocketMap.remove(session.getId()); + } + log.info("[websocket]客户端断开websocket连接,session ID = {} ", session.getId()); + } + + @OnError + public void handleError(Session session, Throwable throwable) { + log.info("[websocket]出现错误,session ID = {} ", session.getId()); + log.info("[websocket]出现错误,throwable = {} ", throwable); + } + + // 发送数据到客户端 + public static void sendInfo(String message) { + for (String item : webSocketMap.keySet()) { + webSocketMap.get(item).sendMessage(message); + } + } + + public void sendMessage(String message) { + try { + //加锁 + reentrantLock.lock(); + if (this.session != null && this.session.isOpen()) { + this.session.getBasicRemote().sendText(message); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + //解锁 + reentrantLock.unlock(); + } + } +} diff --git a/src/main/java/com/ipsplm/websocket/WebSocketHandler4.java b/src/main/java/com/ipsplm/websocket/WebSocketHandler4.java new file mode 100644 index 0000000..843e6a2 --- /dev/null +++ b/src/main/java/com/ipsplm/websocket/WebSocketHandler4.java @@ -0,0 +1,79 @@ +package com.ipsplm.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @Description WebSocket处理类 + * @Author FanDongqiang + * @Date 2023/5/8 9:17 + * @Version 1.0 + */ +@Component +@Slf4j +@ServerEndpoint("/websocket/test-token4") +public class WebSocketHandler4 { + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + + private final ReentrantLock reentrantLock = new ReentrantLock(); + + private Session session; + + @OnOpen + public void handleOpen(Session session, @PathParam("token") String token) { + this.session = session; +// if (StringUtils.isEmpty(token)) { +// sendInfo("[websocket ERROR] 客户端Token错误,连接失败"); +// } + webSocketMap.put(session.getId(), this); + log.info("[websocket]客户端创建连接,session ID = {} ", session.getId()); + log.info("webSocketMap内容:" + webSocketMap); + } + + @OnClose + public void handleClose(Session session) { + if (webSocketMap.containsKey(session.getId())) { + webSocketMap.remove(session.getId()); + } + log.info("[websocket]客户端断开websocket连接,session ID = {} ", session.getId()); + } + + @OnError + public void handleError(Session session, Throwable throwable) { + log.info("[websocket]出现错误,session ID = {} ", session.getId()); + log.info("[websocket]出现错误,throwable = {} ", throwable); + } + + // 发送数据到客户端 + public static void sendInfo(String message) { + for (String item : webSocketMap.keySet()) { + webSocketMap.get(item).sendMessage(message); + } + } + + public void sendMessage(String message) { + try { + //加锁 + reentrantLock.lock(); + if (this.session != null && this.session.isOpen()) { + this.session.getBasicRemote().sendText(message); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + //解锁 + reentrantLock.unlock(); + } + } +} diff --git a/src/main/java/com/ipsplm/websocket/WebSocketHandler5.java b/src/main/java/com/ipsplm/websocket/WebSocketHandler5.java new file mode 100644 index 0000000..d685c64 --- /dev/null +++ b/src/main/java/com/ipsplm/websocket/WebSocketHandler5.java @@ -0,0 +1,79 @@ +package com.ipsplm.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @Description WebSocket处理类 + * @Author FanDongqiang + * @Date 2023/5/8 9:17 + * @Version 1.0 + */ +@Component +@Slf4j +@ServerEndpoint("/websocket/test-token5") +public class WebSocketHandler5 { + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + + private final ReentrantLock reentrantLock = new ReentrantLock(); + + private Session session; + + @OnOpen + public void handleOpen(Session session, @PathParam("token") String token) { + this.session = session; +// if (StringUtils.isEmpty(token)) { +// sendInfo("[websocket ERROR] 客户端Token错误,连接失败"); +// } + webSocketMap.put(session.getId(), this); + log.info("[websocket]客户端创建连接,session ID = {} ", session.getId()); + log.info("webSocketMap内容:" + webSocketMap); + } + + @OnClose + public void handleClose(Session session) { + if (webSocketMap.containsKey(session.getId())) { + webSocketMap.remove(session.getId()); + } + log.info("[websocket]客户端断开websocket连接,session ID = {} ", session.getId()); + } + + @OnError + public void handleError(Session session, Throwable throwable) { + log.info("[websocket]出现错误,session ID = {} ", session.getId()); + log.info("[websocket]出现错误,throwable = {} ", throwable); + } + + // 发送数据到客户端 + public static void sendInfo(String message) { + for (String item : webSocketMap.keySet()) { + webSocketMap.get(item).sendMessage(message); + } + } + + public void sendMessage(String message) { + try { + //加锁 + reentrantLock.lock(); + if (this.session != null && this.session.isOpen()) { + this.session.getBasicRemote().sendText(message); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + //解锁 + reentrantLock.unlock(); + } + } +} diff --git a/src/main/java/com/ipsplm/websocket/WebSocketHandler6.java b/src/main/java/com/ipsplm/websocket/WebSocketHandler6.java new file mode 100644 index 0000000..ca9c1e4 --- /dev/null +++ b/src/main/java/com/ipsplm/websocket/WebSocketHandler6.java @@ -0,0 +1,79 @@ +package com.ipsplm.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @Description WebSocket处理类(单两个设备) + * @Author FanDongqiang + * @Date 2023/5/8 9:17 + * @Version 1.0 + */ +@Component +@Slf4j +@ServerEndpoint("/websocket/test-token6") +public class WebSocketHandler6 { + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + + private final ReentrantLock reentrantLock = new ReentrantLock(); + + private Session session; + + @OnOpen + public void handleOpen(Session session, @PathParam("token") String token) { + this.session = session; +// if (StringUtils.isEmpty(token)) { +// sendInfo("[websocket ERROR] 客户端Token错误,连接失败"); +// } + webSocketMap.put(session.getId(), this); + log.info("[websocket]客户端创建连接,session ID = {} ", session.getId()); + log.info("webSocketMap内容:" + webSocketMap); + } + + @OnClose + public void handleClose(Session session) { + if (webSocketMap.containsKey(session.getId())) { + webSocketMap.remove(session.getId()); + } + log.info("[websocket]客户端断开websocket连接,session ID = {} ", session.getId()); + } + + @OnError + public void handleError(Session session, Throwable throwable) { + log.info("[websocket]出现错误,session ID = {} ", session.getId()); + log.info("[websocket]出现错误,throwable = {} ", throwable); + } + + // 发送数据到客户端 + public static void sendInfo(String message) { + for (String item : webSocketMap.keySet()) { + webSocketMap.get(item).sendMessage(message); + } + } + + public void sendMessage(String message) { + try { + //加锁 + reentrantLock.lock(); + if (this.session != null && this.session.isOpen()) { + this.session.getBasicRemote().sendText(message); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + //解锁 + reentrantLock.unlock(); + } + } +} diff --git a/src/main/java/com/ipsplm/websocket/WebSocketHandler7.java b/src/main/java/com/ipsplm/websocket/WebSocketHandler7.java new file mode 100644 index 0000000..a3dea78 --- /dev/null +++ b/src/main/java/com/ipsplm/websocket/WebSocketHandler7.java @@ -0,0 +1,79 @@ +package com.ipsplm.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @Description WebSocket处理类(单两个设备) + * @Author FanDongqiang + * @Date 2023/5/8 9:17 + * @Version 1.0 + */ +@Component +@Slf4j +@ServerEndpoint("/websocket/test-token7") +public class WebSocketHandler7 { + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + + private final ReentrantLock reentrantLock = new ReentrantLock(); + + private Session session; + + @OnOpen + public void handleOpen(Session session, @PathParam("token") String token) { + this.session = session; +// if (StringUtils.isEmpty(token)) { +// sendInfo("[websocket ERROR] 客户端Token错误,连接失败"); +// } + webSocketMap.put(session.getId(), this); + log.info("[websocket]客户端创建连接,session ID = {} ", session.getId()); + log.info("webSocketMap内容:" + webSocketMap); + } + + @OnClose + public void handleClose(Session session) { + if (webSocketMap.containsKey(session.getId())) { + webSocketMap.remove(session.getId()); + } + log.info("[websocket]客户端断开websocket连接,session ID = {} ", session.getId()); + } + + @OnError + public void handleError(Session session, Throwable throwable) { + log.info("[websocket]出现错误,session ID = {} ", session.getId()); + log.info("[websocket]出现错误,throwable = {} ", throwable); + } + + // 发送数据到客户端 + public static void sendInfo(String message) { + for (String item : webSocketMap.keySet()) { + webSocketMap.get(item).sendMessage(message); + } + } + + public void sendMessage(String message) { + try { + //加锁 + reentrantLock.lock(); + if (this.session != null && this.session.isOpen()) { + this.session.getBasicRemote().sendText(message); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + //解锁 + reentrantLock.unlock(); + } + } +} diff --git a/src/main/java/com/ipsplm/websocket/WebSocketHandler8.java b/src/main/java/com/ipsplm/websocket/WebSocketHandler8.java new file mode 100644 index 0000000..f5e6a5e --- /dev/null +++ b/src/main/java/com/ipsplm/websocket/WebSocketHandler8.java @@ -0,0 +1,79 @@ +package com.ipsplm.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @Description WebSocket处理类(单两个设备) + * @Author FanDongqiang + * @Date 2023/5/8 9:17 + * @Version 1.0 + */ +@Component +@Slf4j +@ServerEndpoint("/websocket/test-token8") +public class WebSocketHandler8 { + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + + private final ReentrantLock reentrantLock = new ReentrantLock(); + + private Session session; + + @OnOpen + public void handleOpen(Session session, @PathParam("token") String token) { + this.session = session; +// if (StringUtils.isEmpty(token)) { +// sendInfo("[websocket ERROR] 客户端Token错误,连接失败"); +// } + webSocketMap.put(session.getId(), this); + log.info("[websocket]客户端创建连接,session ID = {} ", session.getId()); + log.info("webSocketMap内容:" + webSocketMap); + } + + @OnClose + public void handleClose(Session session) { + if (webSocketMap.containsKey(session.getId())) { + webSocketMap.remove(session.getId()); + } + log.info("[websocket]客户端断开websocket连接,session ID = {} ", session.getId()); + } + + @OnError + public void handleError(Session session, Throwable throwable) { + log.info("[websocket]出现错误,session ID = {} ", session.getId()); + log.info("[websocket]出现错误,throwable = {} ", throwable); + } + + // 发送数据到客户端 + public static void sendInfo(String message) { + for (String item : webSocketMap.keySet()) { + webSocketMap.get(item).sendMessage(message); + } + } + + public void sendMessage(String message) { + try { + //加锁 + reentrantLock.lock(); + if (this.session != null && this.session.isOpen()) { + this.session.getBasicRemote().sendText(message); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + //解锁 + reentrantLock.unlock(); + } + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..24d5162 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,77 @@ +server: + port: 8088 + servlet: + context-path: /sany-shenyang-realtime-drive + +spring: + application: + name: sany-shenyang-realtime-drive + datasource: + dynamic: + primary: shenyang + datasource: + shenyang: + driver-class-name: com.mysql.jdbc.Driver + url: jdbc:mysql://127.0.0.1:3306/sany?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&allowMultiQueries=true + username: root + password: SySany!0307# + type: com.zaxxer.hikari.HikariDataSource + hikari: + minimum-idle: 5 + maximum-pool-size: 15 + connection-test-query: SELECT 1 + max-lifetime: 1800000 + connection-timeout: 30000 + pool-name: DatebookHikariCP + + redis: + # 地址 + host: localhost + # 端口,默认为6379 + port: 6379 + # 数据库索引 + database: 0 + # 密码 + password: SySany!318# + # 连接超时时间 + timeout: 10000 + lettuce: + pool: + # 连接池中的最小空闲连接 + min-idle: 1000 + # 连接池中的最大空闲连接 + max-idle: -1 + # 连接池的最大数据库连接数 + max-active: -1 + #连接池最大阻塞等待时间(使用负值表示没有限制) + max-wait: -1ms + + kafka: + first: + bootstrap-servers: 10.0.18.202:9092,10.0.18.203:9092,10.0.18.204:9092 + consumer: + auto-offset-reset: latest + enable-auto-commit: false + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + listener: + # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错 + missing-topics-fatal: false + # 配置20个消费者线程 + concurrency: 20 + second: + bootstrap-servers: 10.31.11.64:9092,10.31.11.91:9092,10.31.11.93:9092 + consumer: + auto-offset-reset: latest + enable-auto-commit: false + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + listener: + # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错 + missing-topics-fatal: false + # 配置20个消费者线程 + concurrency: 20 + + +mybatis-plus: + mapper-locations: classpath:/mapper/**/*.xml \ No newline at end of file diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml new file mode 100644 index 0000000..2d10e4a --- /dev/null +++ b/src/main/resources/logback-spring.xml @@ -0,0 +1,31 @@ + + + + + + %date [%level] [%thread] %logger{80} [%file : %line] %msg%n + + + %date{yyyy-MM-dd HH:mm:ss.Asia/Shanghai} %-5level [%thread] %logger{56}.%method:%L -%msg%n + + + + + + %date{yyyy-MM-dd HH:mm:ss.Asia/Shanghai} [%highlight(%level)] [%boldMagenta(%thread)] [%cyan(%file) : %line] %msg%n + + + + + + + + + + + + + + + + \ No newline at end of file