commit f6249e175374e48a7f0dc79b7f15cc2fd6766e63 Author: lulicheng Date: Fri Aug 9 13:21:46 2024 +0800 第一次提交 diff --git a/README.md b/README.md new file mode 100644 index 0000000..0606089 --- /dev/null +++ b/README.md @@ -0,0 +1,37 @@ +# 三一华睿实时驱动 + +#### 介绍 +三一华睿实时驱动数据,如机械臂数据等。 + +#### 软件架构 +软件架构说明 + + +#### 安装教程 + +1. xxxx +2. xxxx +3. xxxx + +#### 使用说明 + +1. xxxx +2. xxxx +3. xxxx + +#### 参与贡献 + +1. Fork 本仓库 +2. 新建 Feat_xxx 分支 +3. 提交代码 +4. 新建 Pull Request + + +#### 特技 + +1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md +2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com) +3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目 +4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目 +5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help) +6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/) diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..be0df42 --- /dev/null +++ b/pom.xml @@ -0,0 +1,78 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.6.7 + + + com.ipsplm + sany-huarui-realtime-drive + 2.0 + sany-huarui-realtime-drive + 三一华睿实时驱动 + + + org.springframework.boot + spring-boot-starter-web + + + org.projectlombok + lombok + + + com.alibaba + fastjson + 1.2.76 + + + mysql + mysql-connector-java + + + + com.ipsplm + common-api + 1.0 + + + org.springframework.boot + spring-boot-starter-security + + + + + + com.baomidou + dynamic-datasource-spring-boot-starter + 2.5.6 + + + + org.springframework.boot + spring-boot-starter-redis + 1.4.1.RELEASE + + + + org.springframework.kafka + spring-kafka + + + + org.springframework.boot + spring-boot-starter-websocket + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/src/main/java/com/ipsplm/SanyHuaRuiRealtimeDriverApplication.java b/src/main/java/com/ipsplm/SanyHuaRuiRealtimeDriverApplication.java new file mode 100644 index 0000000..4177799 --- /dev/null +++ b/src/main/java/com/ipsplm/SanyHuaRuiRealtimeDriverApplication.java @@ -0,0 +1,16 @@ +package com.ipsplm; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +import java.util.TimeZone; + +@SpringBootApplication +public class SanyHuaRuiRealtimeDriverApplication { + + public static void main(String[] args) { + TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai")); + SpringApplication.run(SanyHuaRuiRealtimeDriverApplication.class, args); + } + +} diff --git a/src/main/java/com/ipsplm/config/CrossConfig.java b/src/main/java/com/ipsplm/config/CrossConfig.java new file mode 100644 index 0000000..3242b48 --- /dev/null +++ b/src/main/java/com/ipsplm/config/CrossConfig.java @@ -0,0 +1,23 @@ +package com.ipsplm.config; + +import org.springframework.context.annotation.Configuration; +import org.springframework.web.servlet.config.annotation.CorsRegistry; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; + +/** + * @Description 跨域配置 + * @Author FanDongqiang + * @Date 2023/1/12 9:37 + * @Version 1.0 + */ +@Configuration +public class CrossConfig implements WebMvcConfigurer { + @Override + public void addCorsMappings(CorsRegistry registry) { + registry.addMapping("/**") // 允许跨域访问的路径 + .allowedOriginPatterns("*") // 允许跨域访问的源 + .allowedMethods("GET", "POST","PUT", "DELETE", "OPTIONS") // 允许请求方法 + .allowCredentials(true) // 是否发送cookie + .maxAge(3600); // 预检间隔时间 + } +} diff --git a/src/main/java/com/ipsplm/config/KafkaConfig.java b/src/main/java/com/ipsplm/config/KafkaConfig.java new file mode 100644 index 0000000..49ec659 --- /dev/null +++ b/src/main/java/com/ipsplm/config/KafkaConfig.java @@ -0,0 +1,54 @@ +package com.ipsplm.config; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +@Configuration +public class KafkaConfig{ + /** + * 读取kafka配置 + * Primary注解表示默认以这个为准 + * + * @return 第一个kafka配置 + */ + @Primary + @ConfigurationProperties(prefix = "spring.kafka") + @Bean + public KafkaProperties kafkaProperties() { + return new KafkaProperties(); + } + + /** + * 构建第一个kafka的消费者监听容器工厂 + * + * @param firstKafkaProperties 第一个kafka配置 + * @return 第一个kafka的消费者监听容器工厂 + */ + @Bean("kafkaListenerContainerFactory") + public KafkaListenerContainerFactory> + firstKafkaListenerContainerFactory(@Autowired @Qualifier("kafkaProperties") KafkaProperties kafkaProperties) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(firstConsumerFactory(kafkaProperties)); + return factory; + } + + /** + * 新建第一个kafka的消费者工厂 + * + * @param firstKafkaProperties 第一个kafka配置 + * @return 第一个kafka的消费者工厂 + */ + private ConsumerFactory firstConsumerFactory(KafkaProperties kafkaProperties) { + return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()); + } +} diff --git a/src/main/java/com/ipsplm/config/KafkaTopicConfig.java b/src/main/java/com/ipsplm/config/KafkaTopicConfig.java new file mode 100644 index 0000000..1e44f10 --- /dev/null +++ b/src/main/java/com/ipsplm/config/KafkaTopicConfig.java @@ -0,0 +1,46 @@ +package com.ipsplm.config; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.annotation.Configuration; + +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; + +/** + * @Description kafka主题配置类 + * @Author FanDongqiang + * @Date 2023/4/26 10:06 + * @Version 1.0 + */ +@Configuration +@Slf4j +public class KafkaTopicConfig implements InitializingBean { + @Override + public void afterPropertiesSet() { + Properties properties = new Properties(); + // kafka集群 +// properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.26.19.61:9092,10.26.19.62:9092,10.26.19.64:9092"); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.5.23.15:9092,10.5.23.31:9092,10.5.23.32:9092"); + // key序列化 + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + // value序列化 + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + // 消费者组编号,不同的组提交偏移不同 + properties.put(ConsumerConfig.GROUP_ID_CONFIG, "3"); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + KafkaConsumer kafkaConsumer = new KafkaConsumer(properties); +// List topicList = kafkaConsumer.listTopics().keySet().stream() +// .filter(t -> !t.contains("meter") && t.startsWith("SANY.pdev.sany.mdc.")).collect(Collectors.toList()); +// String topics = StringUtils.join(topicList, ","); +// System.setProperty("topics", topics); + System.setProperty("topics","topic_kafka_huarui"); + kafkaConsumer.close(); + } +} diff --git a/src/main/java/com/ipsplm/config/MybatisPlusConfig.java b/src/main/java/com/ipsplm/config/MybatisPlusConfig.java new file mode 100644 index 0000000..5c7232b --- /dev/null +++ b/src/main/java/com/ipsplm/config/MybatisPlusConfig.java @@ -0,0 +1,19 @@ +package com.ipsplm.config; + +import com.baomidou.mybatisplus.annotation.DbType; +import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; +import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor; +import org.mybatis.spring.annotation.MapperScan; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@MapperScan(basePackages = "com.ipsplm.dao") +public class MybatisPlusConfig { + @Bean + public MybatisPlusInterceptor mybatisPlusInterceptor() { + MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor(); + interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL)); + return interceptor; + } +} diff --git a/src/main/java/com/ipsplm/config/RedisConfig.java b/src/main/java/com/ipsplm/config/RedisConfig.java new file mode 100644 index 0000000..b253335 --- /dev/null +++ b/src/main/java/com/ipsplm/config/RedisConfig.java @@ -0,0 +1,74 @@ +package com.ipsplm.config; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.ipsplm.listener.RedisListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cache.annotation.CachingConfigurerSupport; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +/** + * @Description redis配置 + * @Author FanDongqiang + * @Date 2023/2/3 13:36 + * @Version 1.0 + */ +@Configuration +public class RedisConfig extends CachingConfigurerSupport { + @Autowired + private RedisListener redisListener; + + @Bean + public RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) { + RedisTemplate template = new RedisTemplate<>(); + template.setConnectionFactory(connectionFactory); + + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + objectMapper.registerModule(new JavaTimeModule()); + objectMapper.registerModule((new SimpleModule())); + //有属性不能映射的时候不报错 + objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + //对象为空时不抛异常 + objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); + objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); + objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, + JsonTypeInfo.As.PROPERTY); + + GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer(objectMapper); + + // key采用String的序列化方式 + template.setKeySerializer(new StringRedisSerializer()); + // value序列化方式采用jackson + template.setValueSerializer(serializer); + + // Hash的key也采用StringRedisSerializer的序列化方式 + template.setHashKeySerializer(new StringRedisSerializer()); + // Hash的value序列化方式采用jackson + template.setHashValueSerializer(serializer); + + template.afterPropertiesSet(); + return template; + } + + @Bean + RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + container.addMessageListener(redisListener, redisListener.getTopic()); + return container; + } +} diff --git a/src/main/java/com/ipsplm/config/WebSocketConfig.java b/src/main/java/com/ipsplm/config/WebSocketConfig.java new file mode 100644 index 0000000..8773eec --- /dev/null +++ b/src/main/java/com/ipsplm/config/WebSocketConfig.java @@ -0,0 +1,25 @@ +package com.ipsplm.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +/** + * @Description WebSocket配置类 + * @Author FanDongqiang + * @Date 2023/5/6 11:03 + * @Version 1.0 + */ +@Configuration +@EnableWebSocket +public class WebSocketConfig { + /** + * 注入ServerEndpointExporter, + * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/src/main/java/com/ipsplm/dao/IotDeviceListMapper.java b/src/main/java/com/ipsplm/dao/IotDeviceListMapper.java new file mode 100644 index 0000000..0cdcfba --- /dev/null +++ b/src/main/java/com/ipsplm/dao/IotDeviceListMapper.java @@ -0,0 +1,16 @@ +package com.ipsplm.dao; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.ipsplm.entity.iot.IotDeviceList; +import org.apache.ibatis.annotations.Mapper; + +/** + * @Description IOT设备清单表Mapper + * @Author FanDongqiang + * @Date 2023/2/16 10:19 + * @Version 1.0 + */ +@Mapper +public interface IotDeviceListMapper extends BaseMapper { + +} diff --git a/src/main/java/com/ipsplm/dao/IotDeviceListNewMapper.java b/src/main/java/com/ipsplm/dao/IotDeviceListNewMapper.java new file mode 100644 index 0000000..6eb49dd --- /dev/null +++ b/src/main/java/com/ipsplm/dao/IotDeviceListNewMapper.java @@ -0,0 +1,17 @@ +package com.ipsplm.dao; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.ipsplm.entity.iot.IotDeviceList; +import com.ipsplm.entity.iot.IotDeviceListNew; +import org.apache.ibatis.annotations.Mapper; + +/** + * @Description IOT设备清单表Mapper + * @Author FanDongqiang + * @Date 2023/2/16 10:19 + * @Version 1.0 + */ +@Mapper +public interface IotDeviceListNewMapper extends BaseMapper { + +} diff --git a/src/main/java/com/ipsplm/entity/common/BaseEntity.java b/src/main/java/com/ipsplm/entity/common/BaseEntity.java new file mode 100644 index 0000000..54dcdbb --- /dev/null +++ b/src/main/java/com/ipsplm/entity/common/BaseEntity.java @@ -0,0 +1,42 @@ +package com.ipsplm.entity.common; + +import com.baomidou.mybatisplus.annotation.TableField; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * @Description 基础实体类 + * @Author FanDongqiang + * @Date 2023/2/20 10:38 + * @Version 1.0 + */ +@Data +public class BaseEntity implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * 创建人 + */ + @TableField("creator") + private Long creator; + + /** + * 创建时间 + */ + @TableField("gmt_created") + private Date gmtCreated; + + /** + * 更新人 + */ + @TableField("editor") + private Long editor; + + /** + * 更新时间 + */ + @TableField("gmt_modified") + private Date gmtModified; +} diff --git a/src/main/java/com/ipsplm/entity/iot/IotDeviceList.java b/src/main/java/com/ipsplm/entity/iot/IotDeviceList.java new file mode 100644 index 0000000..e16f000 --- /dev/null +++ b/src/main/java/com/ipsplm/entity/iot/IotDeviceList.java @@ -0,0 +1,63 @@ +package com.ipsplm.entity.iot; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serializable; + +/** + * @Description IOT设备清单表 + * @Author FanDongqiang + * @Date 2023/2/16 10:15 + * @Version 1.0 + */ +@TableName("iot_device_list") +@Data +public class IotDeviceList implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * 主键ID + * iot_device_list.id + */ + @TableId(value = "id", type = IdType.AUTO) + private Integer id; + + /** + * 设备编号 + * iot_device_list.device_num + */ + @TableField("device_num") + private String deviceNum; + + /** + * 设备名称 + * iot_device_list.device_name + */ + @TableField("device_name") + private String deviceName; + + /** + * 设备类型 + * iot_device_list.device_type + */ + @TableField("device_type") + private String deviceType; + + /** + * 设备区域 + * iot_device_list.device_area + */ + @TableField("device_area") + private String deviceArea; + + /** + * websocket分组ID + * iot_device_list.websocket_group_id + */ + @TableField("websocket_group_id") + private Integer websocketGroupId; +} diff --git a/src/main/java/com/ipsplm/entity/iot/IotDeviceListNew.java b/src/main/java/com/ipsplm/entity/iot/IotDeviceListNew.java new file mode 100644 index 0000000..262813a --- /dev/null +++ b/src/main/java/com/ipsplm/entity/iot/IotDeviceListNew.java @@ -0,0 +1,47 @@ +package com.ipsplm.entity.iot; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serializable; + +@Data +@TableName("iot_device_list_new") +public class IotDeviceListNew implements Serializable { + private static final long serialVersionUID = 1L; + //主键 + @TableId(value = "id", type = IdType.AUTO) + private Integer id; + + //设备编号 + @TableField("device_num") + private String deviceNum; + + //设备名称 + @TableField("device_name") + private String deviceName; + + //子公司 + @TableField("associate_company") + private String associateCompany; + + //工厂 + @TableField("factory") + private String factory; + + //工作重心 + @TableField("work_center") + private String workCenter; + + //班组 + @TableField("work_group") + private String workGroup; + + //websocket分组ID + @TableField("websocket_group_id") + private Integer websocketGroupId; + +} diff --git a/src/main/java/com/ipsplm/entity/iot/IotDeviceRealInfo.java b/src/main/java/com/ipsplm/entity/iot/IotDeviceRealInfo.java new file mode 100644 index 0000000..9283e30 --- /dev/null +++ b/src/main/java/com/ipsplm/entity/iot/IotDeviceRealInfo.java @@ -0,0 +1,103 @@ +package com.ipsplm.entity.iot; + +import com.alibaba.fastjson.JSONArray; +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.ipsplm.entity.common.BaseEntity; +import lombok.Data; + +import java.util.Date; + +/** + * @Description IOT设备实时信息表 + * @Author FanDongqiang + * @Date 2023/2/20 10:33 + * @Version 1.0 + */ +@TableName("iot_device_real_info") +@Data +public class IotDeviceRealInfo extends BaseEntity { + /** + * 主键ID + * iot_device_real_info.id + */ + @TableId(value = "id", type = IdType.AUTO) + private Long id; + + /** + * 设备编号 + * iot_device_real_info.device_num + */ + @TableField("device_num") + private String deviceNum; + + /** + * 设备状态(1、作业 2、待机 3、故障 4、关机) + * iot_device_real_info.device_status + */ + @TableField("device_status") + private Integer deviceStatus; + + /** + * 设备时间 + * iot_device_real_info.device_time + */ + @TableField("device_time") + private Date deviceTime; + + /** + * 设备时间 + * iot_device_real_info.device_time + */ + @TableField(exist = false) + private String deviceTimeStr; + + /** + * 报警(0、正常 1、报警) + * iot_device_real_info.alarm + */ + @TableField("alarm") + private String alarm; + + /** + * 报警信息Json字符串 + * iot_device_real_info.alarm_msg + */ + @TableField("alarm_msg") + private String alarmMsg; + + /** + * 设备点位Json字符串 + * iot_device_real_info.point_position + */ + @TableField("point_position") + private String pointPosition; + + /** + * 设备点位Json数组 + */ + @TableField(exist = false) + private JSONArray pointPositionArray; + + /** + * kafka主题 + * iot_device_real_info.topic + */ + @TableField("topic") + private String topic; + + /** + * kafka分区 + * iot_device_real_info.part + */ + @TableField("part") + private String part; + + /** + * websocket分组ID + */ + @TableField(exist = false) + private Integer websocketGroupId; +} diff --git a/src/main/java/com/ipsplm/kafka/ConsumerRedisListener.java b/src/main/java/com/ipsplm/kafka/ConsumerRedisListener.java new file mode 100644 index 0000000..b4df718 --- /dev/null +++ b/src/main/java/com/ipsplm/kafka/ConsumerRedisListener.java @@ -0,0 +1,119 @@ +package com.ipsplm.kafka; + +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.ipsplm.entity.iot.IotDeviceListNew; +import com.ipsplm.entity.iot.IotDeviceRealInfo; +import com.ipsplm.service.IIotDeviceListNewService; +import com.ipsplm.utils.JSONUtils; +import com.ipsplm.utils.RedisUtils; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +import java.util.*; + +/** + * @Description kafka消费者监听 + * @Author FanDongqiang + * @Date 2023/4/26 10:19 + * @Version 1.0 + */ +@Component +@Slf4j +public class ConsumerRedisListener { + private IIotDeviceListNewService iIotDeviceListNewService; + + @Autowired + private RedisUtils redisUtils; + + // 设备编号List + private static Map deviceNumMap = new HashMap<>(); + + public ConsumerRedisListener(IIotDeviceListNewService iIotDeviceListNewService) { + this.iIotDeviceListNewService = iIotDeviceListNewService; + //IOT设备清单(新) + List deviceListNew = this.iIotDeviceListNewService.list(); + deviceListNew.forEach(dln -> { + deviceNumMap.put(dln.getDeviceNum(), dln.getWebsocketGroupId()); + }); + String groupId = "shenyang_real_time_" + System.currentTimeMillis(); + System.out.println("kafka消费者组号:" + groupId + "*********"); + System.setProperty("groupId", groupId); + } + + /** + * 处理单条消息 + * + * @param record 消费记录 + */ + @KafkaListener(topics = {"topic_kafka_huarui"}, groupId = "${groupId}", containerFactory = "kafkaListenerContainerFactory") + public void handleMessage(ConsumerRecord record) { + // 当前时间 + Date currentTime = new Date(); + // 设备实时数据 + IotDeviceRealInfo iotDeviceRealInfo = new IotDeviceRealInfo(); + // 设备互联数据 + String deviceInfo = record.value(); + // 消息时间戳 + long timestamp = record.timestamp(); + if (StrUtil.isNotBlank(deviceInfo) && JSONUtils.isJSONValidate(deviceInfo)) { + JSONObject deviceInfoJson = JSON.parseObject(deviceInfo); + // 设备编号 + String deviceNum = deviceInfoJson.getString("__assetId__"); + if (deviceNumMap.keySet().contains(deviceNum)) { + //设备编号 + iotDeviceRealInfo.setDeviceNum(deviceNum); + iotDeviceRealInfo.setDeviceStatus(deviceInfoJson.containsKey("Status") ? deviceInfoJson.getInteger("Status") : deviceInfoJson.getInteger("status")); + iotDeviceRealInfo.setDeviceTime(DateUtil.date(timestamp)); + iotDeviceRealInfo.setDeviceTimeStr(DateUtil.format(DateUtil.date(timestamp), "yyyy-MM-dd HH:mm:ss")); + if (deviceInfoJson.containsKey("alarm")) { + iotDeviceRealInfo.setAlarm(String.valueOf(deviceInfoJson.getInteger("alarm"))); + } + if (deviceInfoJson.containsKey("AlarmMsg")) { + iotDeviceRealInfo.setAlarmMsg(deviceInfoJson.getString("AlarmMsg")); + } + Map positionMap = new HashMap<>(); + if (deviceInfoJson.getString("__thingName__").contains("AGV")) { + positionMap.put("AGVPoseX", deviceInfoJson.containsKey("AGVPoseX") ? deviceInfoJson.getDouble("AGVPoseX") : null); + positionMap.put("AGVPoseY", deviceInfoJson.containsKey("AGVPoseY") ? deviceInfoJson.getDouble("AGVPoseY") : null); + positionMap.put("AGVPoseTheta", deviceInfoJson.containsKey("AGVPoseTheta") ? deviceInfoJson.getDouble("AGVPoseTheta") : null); + //电流 + positionMap.put("BatCurrent", deviceInfoJson.containsKey("BatCurrent") ? deviceInfoJson.getDouble("BatCurrent") : null); + //电压 + positionMap.put("BatVoltage", deviceInfoJson.containsKey("BatVoltage") ? deviceInfoJson.getDouble("BatVoltage") : null); + //功能状态 + positionMap.put("BatPowerSupplyStatus", deviceInfoJson.containsKey("BatPowerSupplyStatus") ? deviceInfoJson.getDouble("BatPowerSupplyStatus") : null); + //电池电量 + positionMap.put("BatteryCapacity", deviceInfoJson.containsKey("BatteryCapacity") ? deviceInfoJson.getDouble("BatteryCapacity") : null); + //总行驶里程 + positionMap.put("TotalMileage", deviceInfoJson.containsKey("TotalMileage") ? deviceInfoJson.getDouble("TotalMileage") : null); + //日行驶里程 + positionMap.put("DayMileage", deviceInfoJson.containsKey("DayMileage") ? deviceInfoJson.getDouble("DayMileage") : null); + //工作任务状态 + positionMap.put("TaskStatus", deviceInfoJson.containsKey("TaskStatus") ? deviceInfoJson.getDouble("TaskStatus") : null); + } else { + String[] keys = {"J1", "J2", "J3", "J4", "J5", "J6", "E1","J7","J8","J9","FixName","WeldDetect"}; + for (String key : keys) { + if (deviceInfoJson.containsKey(key)) { + positionMap.put(key, deviceInfoJson.get(key)); + } + } + } + iotDeviceRealInfo.setPointPosition(JSON.toJSONString(positionMap)); + iotDeviceRealInfo.setTopic(record.topic()); + iotDeviceRealInfo.setPart(String.valueOf(record.partition())); + iotDeviceRealInfo.setWebsocketGroupId(deviceNumMap.get(deviceNum)); + iotDeviceRealInfo.setGmtModified(currentTime); + iotDeviceRealInfo.setGmtCreated(currentTime); + } + } + if (iotDeviceRealInfo.getDeviceNum() != null) { + redisUtils.set("kafka:" + iotDeviceRealInfo.getDeviceNum() + ":" + iotDeviceRealInfo.getPart(), JSON.toJSONString(iotDeviceRealInfo)); + } + } +} diff --git a/src/main/java/com/ipsplm/listener/RedisListener.java b/src/main/java/com/ipsplm/listener/RedisListener.java new file mode 100644 index 0000000..a82cdcf --- /dev/null +++ b/src/main/java/com/ipsplm/listener/RedisListener.java @@ -0,0 +1,59 @@ +package com.ipsplm.listener; + +import com.alibaba.fastjson.JSON; +import com.ipsplm.entity.iot.IotDeviceRealInfo; +import com.ipsplm.utils.RedisUtils; +import com.ipsplm.websocket.*; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.stereotype.Component; + +/** + * @Description Redis监听器 + * @Author FanDongqiang + * @Date 2023/5/4 11:32 + * @Version 1.0 + */ +@Component +@Slf4j +@Getter +public class RedisListener implements MessageListener { + private final PatternTopic topic = new PatternTopic("__keyevent@0__:set"); + + @Autowired + @Lazy + private RedisUtils redisUtils; + + private String msg; + + private int websocketGroupId; + + @Override + public void onMessage(Message message, byte[] pattern) { + msg = redisUtils.get(new String(message.getBody())).toString(); + websocketGroupId = JSON.parseObject(msg, IotDeviceRealInfo.class).getWebsocketGroupId(); + if (websocketGroupId == 1) { + WebSocketHandler1.sendInfo(msg); + } + if (websocketGroupId == 2) { + WebSocketHandler2.sendInfo(msg); + } + if (websocketGroupId == 3) { + WebSocketHandler3.sendInfo(msg); + } + if (websocketGroupId == 4) { + WebSocketHandler4.sendInfo(msg); + } + if (websocketGroupId == 5) { + WebSocketHandler5.sendInfo(msg); + } + if (websocketGroupId == 6) { + WebSocketHandler6.sendInfo(msg); + } + } +} diff --git a/src/main/java/com/ipsplm/service/IIotDeviceListNewService.java b/src/main/java/com/ipsplm/service/IIotDeviceListNewService.java new file mode 100644 index 0000000..fddf056 --- /dev/null +++ b/src/main/java/com/ipsplm/service/IIotDeviceListNewService.java @@ -0,0 +1,13 @@ +package com.ipsplm.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.ipsplm.entity.iot.IotDeviceListNew; + +/** + * @Description IOT设备清单表Service + * @Author FanDongqiang + * @Date 2023/2/16 10:21 + * @Version 1.0 + */ +public interface IIotDeviceListNewService extends IService { +} diff --git a/src/main/java/com/ipsplm/service/IIotDeviceListService.java b/src/main/java/com/ipsplm/service/IIotDeviceListService.java new file mode 100644 index 0000000..1ae55fa --- /dev/null +++ b/src/main/java/com/ipsplm/service/IIotDeviceListService.java @@ -0,0 +1,13 @@ +package com.ipsplm.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.ipsplm.entity.iot.IotDeviceList; + +/** + * @Description IOT设备清单表Service + * @Author FanDongqiang + * @Date 2023/2/16 10:21 + * @Version 1.0 + */ +public interface IIotDeviceListService extends IService { +} diff --git a/src/main/java/com/ipsplm/service/impl/IotDeviceListNewServiceImpl.java b/src/main/java/com/ipsplm/service/impl/IotDeviceListNewServiceImpl.java new file mode 100644 index 0000000..a9ca68a --- /dev/null +++ b/src/main/java/com/ipsplm/service/impl/IotDeviceListNewServiceImpl.java @@ -0,0 +1,17 @@ +package com.ipsplm.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.ipsplm.dao.IotDeviceListNewMapper; +import com.ipsplm.entity.iot.IotDeviceListNew; +import com.ipsplm.service.IIotDeviceListNewService; +import org.springframework.stereotype.Service; + +/** + * @Description IOT设备清单表Service实现类 + * @Author FanDongqiang + * @Date 2023/2/16 10:21 + * @Version 1.0 + */ +@Service +public class IotDeviceListNewServiceImpl extends ServiceImpl implements IIotDeviceListNewService { +} diff --git a/src/main/java/com/ipsplm/service/impl/IotDeviceListServiceImpl.java b/src/main/java/com/ipsplm/service/impl/IotDeviceListServiceImpl.java new file mode 100644 index 0000000..3426f5d --- /dev/null +++ b/src/main/java/com/ipsplm/service/impl/IotDeviceListServiceImpl.java @@ -0,0 +1,17 @@ +package com.ipsplm.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.ipsplm.dao.IotDeviceListMapper; +import com.ipsplm.entity.iot.IotDeviceList; +import com.ipsplm.service.IIotDeviceListService; +import org.springframework.stereotype.Service; + +/** + * @Description IOT设备清单表Service实现类 + * @Author FanDongqiang + * @Date 2023/2/16 10:21 + * @Version 1.0 + */ +@Service +public class IotDeviceListServiceImpl extends ServiceImpl implements IIotDeviceListService { +} diff --git a/src/main/java/com/ipsplm/utils/JSONUtils.java b/src/main/java/com/ipsplm/utils/JSONUtils.java new file mode 100644 index 0000000..fc74c52 --- /dev/null +++ b/src/main/java/com/ipsplm/utils/JSONUtils.java @@ -0,0 +1,27 @@ +package com.ipsplm.utils; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONException; + +/** + * @Description JSON工具类 + * @Author FanDongqiang + * @Date 2023/2/17 10:42 + * @Version 1.0 + */ +public class JSONUtils { + /** + * 是否合法Json + * + * @param jsonStr 待解析字符串 + * @return + */ + public static boolean isJSONValidate(String jsonStr){ + try { + JSON.parse(jsonStr); + return true; + } catch (JSONException e) { + return false; + } + } +} diff --git a/src/main/java/com/ipsplm/websocket/WebSocketHandler1.java b/src/main/java/com/ipsplm/websocket/WebSocketHandler1.java new file mode 100644 index 0000000..68ef14d --- /dev/null +++ b/src/main/java/com/ipsplm/websocket/WebSocketHandler1.java @@ -0,0 +1,79 @@ +package com.ipsplm.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @Description WebSocket处理类 + * @Author FanDongqiang + * @Date 2023/5/8 9:17 + * @Version 1.0 + */ +@Component +@Slf4j +@ServerEndpoint("/websocket/test-token1") +public class WebSocketHandler1 { + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + + private final ReentrantLock reentrantLock = new ReentrantLock(); + + private Session session; + + @OnOpen + public void handleOpen(Session session, @PathParam("token") String token) { + this.session = session; + if (StringUtils.isEmpty(token)) { + sendInfo("[websocket ERROR] 客户端Token错误,连接失败"); + } + webSocketMap.put(session.getId(), this); + log.info("[websocket]客户端创建连接,session ID = {} ", session.getId()); + log.info("webSocketMap内容:" + webSocketMap); + } + + @OnClose + public void handleClose(Session session) { + if (webSocketMap.containsKey(session.getId())) { + webSocketMap.remove(session.getId()); + } + log.info("[websocket]客户端断开websocket连接,session ID = {} ", session.getId()); + } + + @OnError + public void handleError(Session session, Throwable throwable) { + log.info("[websocket]出现错误,session ID = {} ", session.getId()); + log.info("[websocket]出现错误,throwable = {} ", throwable); + } + + // 发送数据到客户端 + public static void sendInfo(String message) { + for (String item : webSocketMap.keySet()) { + webSocketMap.get(item).sendMessage(message); + } + } + + public void sendMessage(String message) { + try { + //加锁 + reentrantLock.lock(); + if (this.session != null && this.session.isOpen()) { + this.session.getBasicRemote().sendText(message); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + //解锁 + reentrantLock.unlock(); + } + } +} diff --git a/src/main/java/com/ipsplm/websocket/WebSocketHandler2.java b/src/main/java/com/ipsplm/websocket/WebSocketHandler2.java new file mode 100644 index 0000000..742a072 --- /dev/null +++ b/src/main/java/com/ipsplm/websocket/WebSocketHandler2.java @@ -0,0 +1,79 @@ +package com.ipsplm.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @Description WebSocket处理类 + * @Author FanDongqiang + * @Date 2023/5/8 9:17 + * @Version 1.0 + */ +@Component +@Slf4j +@ServerEndpoint("/websocket/test-token2") +public class WebSocketHandler2 { + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + + private final ReentrantLock reentrantLock = new ReentrantLock(); + + private Session session; + + @OnOpen + public void handleOpen(Session session, @PathParam("token") String token) { + this.session = session; + if (StringUtils.isEmpty(token)) { + sendInfo("[websocket ERROR] 客户端Token错误,连接失败"); + } + webSocketMap.put(session.getId(), this); + log.info("[websocket]客户端创建连接,session ID = {} ", session.getId()); + log.info("webSocketMap内容:" + webSocketMap); + } + + @OnClose + public void handleClose(Session session) { + if (webSocketMap.containsKey(session.getId())) { + webSocketMap.remove(session.getId()); + } + log.info("[websocket]客户端断开websocket连接,session ID = {} ", session.getId()); + } + + @OnError + public void handleError(Session session, Throwable throwable) { + log.info("[websocket]出现错误,session ID = {} ", session.getId()); + log.info("[websocket]出现错误,throwable = {} ", throwable); + } + + // 发送数据到客户端 + public static void sendInfo(String message) { + for (String item : webSocketMap.keySet()) { + webSocketMap.get(item).sendMessage(message); + } + } + + public void sendMessage(String message) { + try { + //加锁 + reentrantLock.lock(); + if (this.session != null && this.session.isOpen()) { + this.session.getBasicRemote().sendText(message); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + //解锁 + reentrantLock.unlock(); + } + } +} diff --git a/src/main/java/com/ipsplm/websocket/WebSocketHandler3.java b/src/main/java/com/ipsplm/websocket/WebSocketHandler3.java new file mode 100644 index 0000000..598f1fa --- /dev/null +++ b/src/main/java/com/ipsplm/websocket/WebSocketHandler3.java @@ -0,0 +1,79 @@ +package com.ipsplm.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @Description WebSocket处理类 + * @Author FanDongqiang + * @Date 2023/5/8 9:17 + * @Version 1.0 + */ +@Component +@Slf4j +@ServerEndpoint("/websocket/test-token3") +public class WebSocketHandler3 { + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + + private final ReentrantLock reentrantLock = new ReentrantLock(); + + private Session session; + + @OnOpen + public void handleOpen(Session session, @PathParam("token") String token) { + this.session = session; + if (StringUtils.isEmpty(token)) { + sendInfo("[websocket ERROR] 客户端Token错误,连接失败"); + } + webSocketMap.put(session.getId(), this); + log.info("[websocket]客户端创建连接,session ID = {} ", session.getId()); + log.info("webSocketMap内容:" + webSocketMap); + } + + @OnClose + public void handleClose(Session session) { + if (webSocketMap.containsKey(session.getId())) { + webSocketMap.remove(session.getId()); + } + log.info("[websocket]客户端断开websocket连接,session ID = {} ", session.getId()); + } + + @OnError + public void handleError(Session session, Throwable throwable) { + log.info("[websocket]出现错误,session ID = {} ", session.getId()); + log.info("[websocket]出现错误,throwable = {} ", throwable); + } + + // 发送数据到客户端 + public static void sendInfo(String message) { + for (String item : webSocketMap.keySet()) { + webSocketMap.get(item).sendMessage(message); + } + } + + public void sendMessage(String message) { + try { + //加锁 + reentrantLock.lock(); + if (this.session != null && this.session.isOpen()) { + this.session.getBasicRemote().sendText(message); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + //解锁 + reentrantLock.unlock(); + } + } +} diff --git a/src/main/java/com/ipsplm/websocket/WebSocketHandler4.java b/src/main/java/com/ipsplm/websocket/WebSocketHandler4.java new file mode 100644 index 0000000..4ac1c92 --- /dev/null +++ b/src/main/java/com/ipsplm/websocket/WebSocketHandler4.java @@ -0,0 +1,79 @@ +package com.ipsplm.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @Description WebSocket处理类 + * @Author FanDongqiang + * @Date 2023/5/8 9:17 + * @Version 1.0 + */ +@Component +@Slf4j +@ServerEndpoint("/websocket/test-token4") +public class WebSocketHandler4 { + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + + private final ReentrantLock reentrantLock = new ReentrantLock(); + + private Session session; + + @OnOpen + public void handleOpen(Session session, @PathParam("token") String token) { + this.session = session; + if (StringUtils.isEmpty(token)) { + sendInfo("[websocket ERROR] 客户端Token错误,连接失败"); + } + webSocketMap.put(session.getId(), this); + log.info("[websocket]客户端创建连接,session ID = {} ", session.getId()); + log.info("webSocketMap内容:" + webSocketMap); + } + + @OnClose + public void handleClose(Session session) { + if (webSocketMap.containsKey(session.getId())) { + webSocketMap.remove(session.getId()); + } + log.info("[websocket]客户端断开websocket连接,session ID = {} ", session.getId()); + } + + @OnError + public void handleError(Session session, Throwable throwable) { + log.info("[websocket]出现错误,session ID = {} ", session.getId()); + log.info("[websocket]出现错误,throwable = {} ", throwable); + } + + // 发送数据到客户端 + public static void sendInfo(String message) { + for (String item : webSocketMap.keySet()) { + webSocketMap.get(item).sendMessage(message); + } + } + + public void sendMessage(String message) { + try { + //加锁 + reentrantLock.lock(); + if (this.session != null && this.session.isOpen()) { + this.session.getBasicRemote().sendText(message); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + //解锁 + reentrantLock.unlock(); + } + } +} diff --git a/src/main/java/com/ipsplm/websocket/WebSocketHandler5.java b/src/main/java/com/ipsplm/websocket/WebSocketHandler5.java new file mode 100644 index 0000000..c373a38 --- /dev/null +++ b/src/main/java/com/ipsplm/websocket/WebSocketHandler5.java @@ -0,0 +1,79 @@ +package com.ipsplm.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @Description WebSocket处理类 + * @Author FanDongqiang + * @Date 2023/5/8 9:17 + * @Version 1.0 + */ +@Component +@Slf4j +@ServerEndpoint("/websocket/test-token5") +public class WebSocketHandler5 { + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + + private final ReentrantLock reentrantLock = new ReentrantLock(); + + private Session session; + + @OnOpen + public void handleOpen(Session session, @PathParam("token") String token) { + this.session = session; + if (StringUtils.isEmpty(token)) { + sendInfo("[websocket ERROR] 客户端Token错误,连接失败"); + } + webSocketMap.put(session.getId(), this); + log.info("[websocket]客户端创建连接,session ID = {} ", session.getId()); + log.info("webSocketMap内容:" + webSocketMap); + } + + @OnClose + public void handleClose(Session session) { + if (webSocketMap.containsKey(session.getId())) { + webSocketMap.remove(session.getId()); + } + log.info("[websocket]客户端断开websocket连接,session ID = {} ", session.getId()); + } + + @OnError + public void handleError(Session session, Throwable throwable) { + log.info("[websocket]出现错误,session ID = {} ", session.getId()); + log.info("[websocket]出现错误,throwable = {} ", throwable); + } + + // 发送数据到客户端 + public static void sendInfo(String message) { + for (String item : webSocketMap.keySet()) { + webSocketMap.get(item).sendMessage(message); + } + } + + public void sendMessage(String message) { + try { + //加锁 + reentrantLock.lock(); + if (this.session != null && this.session.isOpen()) { + this.session.getBasicRemote().sendText(message); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + //解锁 + reentrantLock.unlock(); + } + } +} diff --git a/src/main/java/com/ipsplm/websocket/WebSocketHandler6.java b/src/main/java/com/ipsplm/websocket/WebSocketHandler6.java new file mode 100644 index 0000000..009a01f --- /dev/null +++ b/src/main/java/com/ipsplm/websocket/WebSocketHandler6.java @@ -0,0 +1,79 @@ +package com.ipsplm.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @Description WebSocket处理类 + * @Author FanDongqiang + * @Date 2023/5/8 9:17 + * @Version 1.0 + */ +@Component +@Slf4j +@ServerEndpoint("/websocket/test-token6") +public class WebSocketHandler6 { + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + + private final ReentrantLock reentrantLock = new ReentrantLock(); + + private Session session; + + @OnOpen + public void handleOpen(Session session, @PathParam("token") String token) { + this.session = session; + if (StringUtils.isEmpty(token)) { + sendInfo("[websocket ERROR] 客户端Token错误,连接失败"); + } + webSocketMap.put(session.getId(), this); + log.info("[websocket]客户端创建连接,session ID = {} ", session.getId()); + log.info("webSocketMap内容:" + webSocketMap); + } + + @OnClose + public void handleClose(Session session) { + if (webSocketMap.containsKey(session.getId())) { + webSocketMap.remove(session.getId()); + } + log.info("[websocket]客户端断开websocket连接,session ID = {} ", session.getId()); + } + + @OnError + public void handleError(Session session, Throwable throwable) { + log.info("[websocket]出现错误,session ID = {} ", session.getId()); + log.info("[websocket]出现错误,throwable = {} ", throwable); + } + + // 发送数据到客户端 + public static void sendInfo(String message) { + for (String item : webSocketMap.keySet()) { + webSocketMap.get(item).sendMessage(message); + } + } + + public void sendMessage(String message) { + try { + //加锁 + reentrantLock.lock(); + if (this.session != null && this.session.isOpen()) { + this.session.getBasicRemote().sendText(message); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + //解锁 + reentrantLock.unlock(); + } + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..81d1280 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,64 @@ +server: + port: 8091 + servlet: + context-path: /sany-huarui-realtime-drive + +spring: + application: + name: sany-huarui-realtime-drive + datasource: + dynamic: + primary: huarui + datasource: + huarui: + driver-class-name: com.mysql.jdbc.Driver + url: jdbc:mysql://127.0.0.1:3306/sany?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&allowMultiQueries=true + username: root + password: HrSany!0307# + type: com.zaxxer.hikari.HikariDataSource + hikari: + minimum-idle: 5 + maximum-pool-size: 15 + connection-test-query: SELECT 1 + max-lifetime: 1800000 + connection-timeout: 30000 + pool-name: DatebookHikariCP + + redis: + # 地址 + host: localhost + # 端口,默认为6379 + port: 6379 + # 数据库索引 + database: 0 + # 密码 + password: 123456 + # 连接超时时间 + timeout: 10000 + lettuce: + pool: + # 连接池中的最小空闲连接 + min-idle: 1000 + # 连接池中的最大空闲连接 + max-idle: -1 + # 连接池的最大数据库连接数 + max-active: -1 + #连接池最大阻塞等待时间(使用负值表示没有限制) + max-wait: -1ms + + kafka: + bootstrap-servers: 10.5.23.15:9092,10.5.23.31:9092,10.5.23.32:9092 + consumer: + auto-offset-reset: latest + enable-auto-commit: false + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + listener: + # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错 + missing-topics-fatal: false + # 配置20个消费者线程 + concurrency: 20 + + +mybatis-plus: + mapper-locations: classpath:/mapper/**/*.xml \ No newline at end of file diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml new file mode 100644 index 0000000..2d10e4a --- /dev/null +++ b/src/main/resources/logback-spring.xml @@ -0,0 +1,31 @@ + + + + + + %date [%level] [%thread] %logger{80} [%file : %line] %msg%n + + + %date{yyyy-MM-dd HH:mm:ss.Asia/Shanghai} %-5level [%thread] %logger{56}.%method:%L -%msg%n + + + + + + %date{yyyy-MM-dd HH:mm:ss.Asia/Shanghai} [%highlight(%level)] [%boldMagenta(%thread)] [%cyan(%file) : %line] %msg%n + + + + + + + + + + + + + + + + \ No newline at end of file