第一次提交

master
lulicheng 8 months ago
commit f6249e1753

@ -0,0 +1,37 @@
# 三一华睿实时驱动
#### 介绍
三一华睿实时驱动数据,如机械臂数据等。
#### 软件架构
软件架构说明
#### 安装教程
1. xxxx
2. xxxx
3. xxxx
#### 使用说明
1. xxxx
2. xxxx
3. xxxx
#### 参与贡献
1. Fork 本仓库
2. 新建 Feat_xxx 分支
3. 提交代码
4. 新建 Pull Request
#### 特技
1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md
2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com)
3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目
4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目
5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help)
6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)

@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.7</version>
<relativePath/>
</parent>
<groupId>com.ipsplm</groupId>
<artifactId>sany-huarui-realtime-drive</artifactId>
<version>2.0</version>
<name>sany-huarui-realtime-drive</name>
<description>三一华睿实时驱动</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--自定义通用模块-->
<dependency>
<groupId>com.ipsplm</groupId>
<artifactId>common-api</artifactId>
<version>1.0</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--数据库多数据源-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>2.5.6</version>
</dependency>
<!--集成redis 底层还是 2.2.4的redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
<version>1.4.1.RELEASE</version>
</dependency>
<!--spring整合kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,16 @@
package com.ipsplm;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.TimeZone;
@SpringBootApplication
public class SanyHuaRuiRealtimeDriverApplication {
public static void main(String[] args) {
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
SpringApplication.run(SanyHuaRuiRealtimeDriverApplication.class, args);
}
}

@ -0,0 +1,23 @@
package com.ipsplm.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* @Description
* @Author FanDongqiang
* @Date 2023/1/12 9:37
* @Version 1.0
*/
@Configuration
public class CrossConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**") // 允许跨域访问的路径
.allowedOriginPatterns("*") // 允许跨域访问的源
.allowedMethods("GET", "POST","PUT", "DELETE", "OPTIONS") // 允许请求方法
.allowCredentials(true) // 是否发送cookie
.maxAge(3600); // 预检间隔时间
}
}

@ -0,0 +1,54 @@
package com.ipsplm.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
@Configuration
public class KafkaConfig{
/**
* kafka
* Primary
*
* @return kafka
*/
@Primary
@ConfigurationProperties(prefix = "spring.kafka")
@Bean
public KafkaProperties kafkaProperties() {
return new KafkaProperties();
}
/**
* kafka
*
* @param firstKafkaProperties kafka
* @return kafka
*/
@Bean("kafkaListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
firstKafkaListenerContainerFactory(@Autowired @Qualifier("kafkaProperties") KafkaProperties kafkaProperties) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(firstConsumerFactory(kafkaProperties));
return factory;
}
/**
* kafka
*
* @param firstKafkaProperties kafka
* @return kafka
*/
private ConsumerFactory<? super Integer, ? super String> firstConsumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
}

@ -0,0 +1,46 @@
package com.ipsplm.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
/**
* @Description kafka
* @Author FanDongqiang
* @Date 2023/4/26 10:06
* @Version 1.0
*/
@Configuration
@Slf4j
public class KafkaTopicConfig implements InitializingBean {
@Override
public void afterPropertiesSet() {
Properties properties = new Properties();
// kafka集群
// properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.26.19.61:9092,10.26.19.62:9092,10.26.19.64:9092");
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.5.23.15:9092,10.5.23.31:9092,10.5.23.32:9092");
// key序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// value序列化
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 消费者组编号,不同的组提交偏移不同
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "3");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// List<String> topicList = kafkaConsumer.listTopics().keySet().stream()
// .filter(t -> !t.contains("meter") && t.startsWith("SANY.pdev.sany.mdc.")).collect(Collectors.toList());
// String topics = StringUtils.join(topicList, ",");
// System.setProperty("topics", topics);
System.setProperty("topics","topic_kafka_huarui");
kafkaConsumer.close();
}
}

@ -0,0 +1,19 @@
package com.ipsplm.config;
import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@MapperScan(basePackages = "com.ipsplm.dao")
public class MybatisPlusConfig {
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
return interceptor;
}
}

@ -0,0 +1,74 @@
package com.ipsplm.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.ipsplm.listener.RedisListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* @Description redis
* @Author FanDongqiang
* @Date 2023/2/3 13:36
* @Version 1.0
*/
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
@Autowired
private RedisListener redisListener;
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.registerModule(new JavaTimeModule());
objectMapper.registerModule((new SimpleModule()));
//有属性不能映射的时候不报错
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
//对象为空时不抛异常
objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL,
JsonTypeInfo.As.PROPERTY);
GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer(objectMapper);
// key采用String的序列化方式
template.setKeySerializer(new StringRedisSerializer());
// value序列化方式采用jackson
template.setValueSerializer(serializer);
// Hash的key也采用StringRedisSerializer的序列化方式
template.setHashKeySerializer(new StringRedisSerializer());
// Hash的value序列化方式采用jackson
template.setHashValueSerializer(serializer);
template.afterPropertiesSet();
return template;
}
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(redisListener, redisListener.getTopic());
return container;
}
}

@ -0,0 +1,25 @@
package com.ipsplm.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @Description WebSocket
* @Author FanDongqiang
* @Date 2023/5/6 11:03
* @Version 1.0
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig {
/**
* ServerEndpointExporter
* bean使@ServerEndpointWebsocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

@ -0,0 +1,16 @@
package com.ipsplm.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ipsplm.entity.iot.IotDeviceList;
import org.apache.ibatis.annotations.Mapper;
/**
* @Description IOTMapper
* @Author FanDongqiang
* @Date 2023/2/16 10:19
* @Version 1.0
*/
@Mapper
public interface IotDeviceListMapper extends BaseMapper<IotDeviceList> {
}

@ -0,0 +1,17 @@
package com.ipsplm.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ipsplm.entity.iot.IotDeviceList;
import com.ipsplm.entity.iot.IotDeviceListNew;
import org.apache.ibatis.annotations.Mapper;
/**
* @Description IOTMapper
* @Author FanDongqiang
* @Date 2023/2/16 10:19
* @Version 1.0
*/
@Mapper
public interface IotDeviceListNewMapper extends BaseMapper<IotDeviceListNew> {
}

@ -0,0 +1,42 @@
package com.ipsplm.entity.common;
import com.baomidou.mybatisplus.annotation.TableField;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* @Description
* @Author FanDongqiang
* @Date 2023/2/20 10:38
* @Version 1.0
*/
@Data
public class BaseEntity implements Serializable {
private static final long serialVersionUID = 1L;
/**
*
*/
@TableField("creator")
private Long creator;
/**
*
*/
@TableField("gmt_created")
private Date gmtCreated;
/**
*
*/
@TableField("editor")
private Long editor;
/**
*
*/
@TableField("gmt_modified")
private Date gmtModified;
}

@ -0,0 +1,63 @@
package com.ipsplm.entity.iot;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serializable;
/**
* @Description IOT
* @Author FanDongqiang
* @Date 2023/2/16 10:15
* @Version 1.0
*/
@TableName("iot_device_list")
@Data
public class IotDeviceList implements Serializable {
private static final long serialVersionUID = 1L;
/**
* ID
* iot_device_list.id
*/
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
*
* iot_device_list.device_num
*/
@TableField("device_num")
private String deviceNum;
/**
*
* iot_device_list.device_name
*/
@TableField("device_name")
private String deviceName;
/**
*
* iot_device_list.device_type
*/
@TableField("device_type")
private String deviceType;
/**
*
* iot_device_list.device_area
*/
@TableField("device_area")
private String deviceArea;
/**
* websocketID
* iot_device_list.websocket_group_id
*/
@TableField("websocket_group_id")
private Integer websocketGroupId;
}

@ -0,0 +1,47 @@
package com.ipsplm.entity.iot;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serializable;
@Data
@TableName("iot_device_list_new")
public class IotDeviceListNew implements Serializable {
private static final long serialVersionUID = 1L;
//主键
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
//设备编号
@TableField("device_num")
private String deviceNum;
//设备名称
@TableField("device_name")
private String deviceName;
//子公司
@TableField("associate_company")
private String associateCompany;
//工厂
@TableField("factory")
private String factory;
//工作重心
@TableField("work_center")
private String workCenter;
//班组
@TableField("work_group")
private String workGroup;
//websocket分组ID
@TableField("websocket_group_id")
private Integer websocketGroupId;
}

@ -0,0 +1,103 @@
package com.ipsplm.entity.iot;
import com.alibaba.fastjson.JSONArray;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.ipsplm.entity.common.BaseEntity;
import lombok.Data;
import java.util.Date;
/**
* @Description IOT
* @Author FanDongqiang
* @Date 2023/2/20 10:33
* @Version 1.0
*/
@TableName("iot_device_real_info")
@Data
public class IotDeviceRealInfo extends BaseEntity {
/**
* ID
* iot_device_real_info.id
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
*
* iot_device_real_info.device_num
*/
@TableField("device_num")
private String deviceNum;
/**
* 1 2 3 4
* iot_device_real_info.device_status
*/
@TableField("device_status")
private Integer deviceStatus;
/**
*
* iot_device_real_info.device_time
*/
@TableField("device_time")
private Date deviceTime;
/**
*
* iot_device_real_info.device_time
*/
@TableField(exist = false)
private String deviceTimeStr;
/**
* 0 1
* iot_device_real_info.alarm
*/
@TableField("alarm")
private String alarm;
/**
* Json
* iot_device_real_info.alarm_msg
*/
@TableField("alarm_msg")
private String alarmMsg;
/**
* Json
* iot_device_real_info.point_position
*/
@TableField("point_position")
private String pointPosition;
/**
* Json
*/
@TableField(exist = false)
private JSONArray pointPositionArray;
/**
* kafka
* iot_device_real_info.topic
*/
@TableField("topic")
private String topic;
/**
* kafka
* iot_device_real_info.part
*/
@TableField("part")
private String part;
/**
* websocketID
*/
@TableField(exist = false)
private Integer websocketGroupId;
}

@ -0,0 +1,119 @@
package com.ipsplm.kafka;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ipsplm.entity.iot.IotDeviceListNew;
import com.ipsplm.entity.iot.IotDeviceRealInfo;
import com.ipsplm.service.IIotDeviceListNewService;
import com.ipsplm.utils.JSONUtils;
import com.ipsplm.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.*;
/**
* @Description kafka
* @Author FanDongqiang
* @Date 2023/4/26 10:19
* @Version 1.0
*/
@Component
@Slf4j
public class ConsumerRedisListener {
private IIotDeviceListNewService iIotDeviceListNewService;
@Autowired
private RedisUtils redisUtils;
// 设备编号List
private static Map<String, Integer> deviceNumMap = new HashMap<>();
public ConsumerRedisListener(IIotDeviceListNewService iIotDeviceListNewService) {
this.iIotDeviceListNewService = iIotDeviceListNewService;
//IOT设备清单(新)
List<IotDeviceListNew> deviceListNew = this.iIotDeviceListNewService.list();
deviceListNew.forEach(dln -> {
deviceNumMap.put(dln.getDeviceNum(), dln.getWebsocketGroupId());
});
String groupId = "shenyang_real_time_" + System.currentTimeMillis();
System.out.println("kafka消费者组号" + groupId + "*********");
System.setProperty("groupId", groupId);
}
/**
*
*
* @param record
*/
@KafkaListener(topics = {"topic_kafka_huarui"}, groupId = "${groupId}", containerFactory = "kafkaListenerContainerFactory")
public void handleMessage(ConsumerRecord<String, String> record) {
// 当前时间
Date currentTime = new Date();
// 设备实时数据
IotDeviceRealInfo iotDeviceRealInfo = new IotDeviceRealInfo();
// 设备互联数据
String deviceInfo = record.value();
// 消息时间戳
long timestamp = record.timestamp();
if (StrUtil.isNotBlank(deviceInfo) && JSONUtils.isJSONValidate(deviceInfo)) {
JSONObject deviceInfoJson = JSON.parseObject(deviceInfo);
// 设备编号
String deviceNum = deviceInfoJson.getString("__assetId__");
if (deviceNumMap.keySet().contains(deviceNum)) {
//设备编号
iotDeviceRealInfo.setDeviceNum(deviceNum);
iotDeviceRealInfo.setDeviceStatus(deviceInfoJson.containsKey("Status") ? deviceInfoJson.getInteger("Status") : deviceInfoJson.getInteger("status"));
iotDeviceRealInfo.setDeviceTime(DateUtil.date(timestamp));
iotDeviceRealInfo.setDeviceTimeStr(DateUtil.format(DateUtil.date(timestamp), "yyyy-MM-dd HH:mm:ss"));
if (deviceInfoJson.containsKey("alarm")) {
iotDeviceRealInfo.setAlarm(String.valueOf(deviceInfoJson.getInteger("alarm")));
}
if (deviceInfoJson.containsKey("AlarmMsg")) {
iotDeviceRealInfo.setAlarmMsg(deviceInfoJson.getString("AlarmMsg"));
}
Map<String, Object> positionMap = new HashMap<>();
if (deviceInfoJson.getString("__thingName__").contains("AGV")) {
positionMap.put("AGVPoseX", deviceInfoJson.containsKey("AGVPoseX") ? deviceInfoJson.getDouble("AGVPoseX") : null);
positionMap.put("AGVPoseY", deviceInfoJson.containsKey("AGVPoseY") ? deviceInfoJson.getDouble("AGVPoseY") : null);
positionMap.put("AGVPoseTheta", deviceInfoJson.containsKey("AGVPoseTheta") ? deviceInfoJson.getDouble("AGVPoseTheta") : null);
//电流
positionMap.put("BatCurrent", deviceInfoJson.containsKey("BatCurrent") ? deviceInfoJson.getDouble("BatCurrent") : null);
//电压
positionMap.put("BatVoltage", deviceInfoJson.containsKey("BatVoltage") ? deviceInfoJson.getDouble("BatVoltage") : null);
//功能状态
positionMap.put("BatPowerSupplyStatus", deviceInfoJson.containsKey("BatPowerSupplyStatus") ? deviceInfoJson.getDouble("BatPowerSupplyStatus") : null);
//电池电量
positionMap.put("BatteryCapacity", deviceInfoJson.containsKey("BatteryCapacity") ? deviceInfoJson.getDouble("BatteryCapacity") : null);
//总行驶里程
positionMap.put("TotalMileage", deviceInfoJson.containsKey("TotalMileage") ? deviceInfoJson.getDouble("TotalMileage") : null);
//日行驶里程
positionMap.put("DayMileage", deviceInfoJson.containsKey("DayMileage") ? deviceInfoJson.getDouble("DayMileage") : null);
//工作任务状态
positionMap.put("TaskStatus", deviceInfoJson.containsKey("TaskStatus") ? deviceInfoJson.getDouble("TaskStatus") : null);
} else {
String[] keys = {"J1", "J2", "J3", "J4", "J5", "J6", "E1","J7","J8","J9","FixName","WeldDetect"};
for (String key : keys) {
if (deviceInfoJson.containsKey(key)) {
positionMap.put(key, deviceInfoJson.get(key));
}
}
}
iotDeviceRealInfo.setPointPosition(JSON.toJSONString(positionMap));
iotDeviceRealInfo.setTopic(record.topic());
iotDeviceRealInfo.setPart(String.valueOf(record.partition()));
iotDeviceRealInfo.setWebsocketGroupId(deviceNumMap.get(deviceNum));
iotDeviceRealInfo.setGmtModified(currentTime);
iotDeviceRealInfo.setGmtCreated(currentTime);
}
}
if (iotDeviceRealInfo.getDeviceNum() != null) {
redisUtils.set("kafka:" + iotDeviceRealInfo.getDeviceNum() + ":" + iotDeviceRealInfo.getPart(), JSON.toJSONString(iotDeviceRealInfo));
}
}
}

@ -0,0 +1,59 @@
package com.ipsplm.listener;
import com.alibaba.fastjson.JSON;
import com.ipsplm.entity.iot.IotDeviceRealInfo;
import com.ipsplm.utils.RedisUtils;
import com.ipsplm.websocket.*;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.stereotype.Component;
/**
* @Description Redis
* @Author FanDongqiang
* @Date 2023/5/4 11:32
* @Version 1.0
*/
@Component
@Slf4j
@Getter
public class RedisListener implements MessageListener {
private final PatternTopic topic = new PatternTopic("__keyevent@0__:set");
@Autowired
@Lazy
private RedisUtils redisUtils;
private String msg;
private int websocketGroupId;
@Override
public void onMessage(Message message, byte[] pattern) {
msg = redisUtils.get(new String(message.getBody())).toString();
websocketGroupId = JSON.parseObject(msg, IotDeviceRealInfo.class).getWebsocketGroupId();
if (websocketGroupId == 1) {
WebSocketHandler1.sendInfo(msg);
}
if (websocketGroupId == 2) {
WebSocketHandler2.sendInfo(msg);
}
if (websocketGroupId == 3) {
WebSocketHandler3.sendInfo(msg);
}
if (websocketGroupId == 4) {
WebSocketHandler4.sendInfo(msg);
}
if (websocketGroupId == 5) {
WebSocketHandler5.sendInfo(msg);
}
if (websocketGroupId == 6) {
WebSocketHandler6.sendInfo(msg);
}
}
}

@ -0,0 +1,13 @@
package com.ipsplm.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.ipsplm.entity.iot.IotDeviceListNew;
/**
* @Description IOTService
* @Author FanDongqiang
* @Date 2023/2/16 10:21
* @Version 1.0
*/
public interface IIotDeviceListNewService extends IService<IotDeviceListNew> {
}

@ -0,0 +1,13 @@
package com.ipsplm.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.ipsplm.entity.iot.IotDeviceList;
/**
* @Description IOTService
* @Author FanDongqiang
* @Date 2023/2/16 10:21
* @Version 1.0
*/
public interface IIotDeviceListService extends IService<IotDeviceList> {
}

@ -0,0 +1,17 @@
package com.ipsplm.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ipsplm.dao.IotDeviceListNewMapper;
import com.ipsplm.entity.iot.IotDeviceListNew;
import com.ipsplm.service.IIotDeviceListNewService;
import org.springframework.stereotype.Service;
/**
* @Description IOTService
* @Author FanDongqiang
* @Date 2023/2/16 10:21
* @Version 1.0
*/
@Service
public class IotDeviceListNewServiceImpl extends ServiceImpl<IotDeviceListNewMapper, IotDeviceListNew> implements IIotDeviceListNewService {
}

@ -0,0 +1,17 @@
package com.ipsplm.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ipsplm.dao.IotDeviceListMapper;
import com.ipsplm.entity.iot.IotDeviceList;
import com.ipsplm.service.IIotDeviceListService;
import org.springframework.stereotype.Service;
/**
* @Description IOTService
* @Author FanDongqiang
* @Date 2023/2/16 10:21
* @Version 1.0
*/
@Service
public class IotDeviceListServiceImpl extends ServiceImpl<IotDeviceListMapper, IotDeviceList> implements IIotDeviceListService {
}

@ -0,0 +1,27 @@
package com.ipsplm.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
/**
* @Description JSON
* @Author FanDongqiang
* @Date 2023/2/17 10:42
* @Version 1.0
*/
public class JSONUtils {
/**
* Json
*
* @param jsonStr
* @return
*/
public static boolean isJSONValidate(String jsonStr){
try {
JSON.parse(jsonStr);
return true;
} catch (JSONException e) {
return false;
}
}
}

@ -0,0 +1,79 @@
package com.ipsplm.websocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description WebSocket
* @Author FanDongqiang
* @Date 2023/5/8 9:17
* @Version 1.0
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/test-token1")
public class WebSocketHandler1 {
private static ConcurrentHashMap<String, WebSocketHandler1> webSocketMap = new ConcurrentHashMap<>();
private final ReentrantLock reentrantLock = new ReentrantLock();
private Session session;
@OnOpen
public void handleOpen(Session session, @PathParam("token") String token) {
this.session = session;
if (StringUtils.isEmpty(token)) {
sendInfo("[websocket ERROR] 客户端Token错误连接失败");
}
webSocketMap.put(session.getId(), this);
log.info("[websocket]客户端创建连接session ID = {} ", session.getId());
log.info("webSocketMap内容" + webSocketMap);
}
@OnClose
public void handleClose(Session session) {
if (webSocketMap.containsKey(session.getId())) {
webSocketMap.remove(session.getId());
}
log.info("[websocket]客户端断开websocket连接session ID = {} ", session.getId());
}
@OnError
public void handleError(Session session, Throwable throwable) {
log.info("[websocket]出现错误session ID = {} ", session.getId());
log.info("[websocket]出现错误throwable = {} ", throwable);
}
// 发送数据到客户端
public static void sendInfo(String message) {
for (String item : webSocketMap.keySet()) {
webSocketMap.get(item).sendMessage(message);
}
}
public void sendMessage(String message) {
try {
//加锁
reentrantLock.lock();
if (this.session != null && this.session.isOpen()) {
this.session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//解锁
reentrantLock.unlock();
}
}
}

@ -0,0 +1,79 @@
package com.ipsplm.websocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description WebSocket
* @Author FanDongqiang
* @Date 2023/5/8 9:17
* @Version 1.0
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/test-token2")
public class WebSocketHandler2 {
private static ConcurrentHashMap<String, WebSocketHandler2> webSocketMap = new ConcurrentHashMap<>();
private final ReentrantLock reentrantLock = new ReentrantLock();
private Session session;
@OnOpen
public void handleOpen(Session session, @PathParam("token") String token) {
this.session = session;
if (StringUtils.isEmpty(token)) {
sendInfo("[websocket ERROR] 客户端Token错误连接失败");
}
webSocketMap.put(session.getId(), this);
log.info("[websocket]客户端创建连接session ID = {} ", session.getId());
log.info("webSocketMap内容" + webSocketMap);
}
@OnClose
public void handleClose(Session session) {
if (webSocketMap.containsKey(session.getId())) {
webSocketMap.remove(session.getId());
}
log.info("[websocket]客户端断开websocket连接session ID = {} ", session.getId());
}
@OnError
public void handleError(Session session, Throwable throwable) {
log.info("[websocket]出现错误session ID = {} ", session.getId());
log.info("[websocket]出现错误throwable = {} ", throwable);
}
// 发送数据到客户端
public static void sendInfo(String message) {
for (String item : webSocketMap.keySet()) {
webSocketMap.get(item).sendMessage(message);
}
}
public void sendMessage(String message) {
try {
//加锁
reentrantLock.lock();
if (this.session != null && this.session.isOpen()) {
this.session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//解锁
reentrantLock.unlock();
}
}
}

@ -0,0 +1,79 @@
package com.ipsplm.websocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description WebSocket
* @Author FanDongqiang
* @Date 2023/5/8 9:17
* @Version 1.0
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/test-token3")
public class WebSocketHandler3 {
private static ConcurrentHashMap<String, WebSocketHandler3> webSocketMap = new ConcurrentHashMap<>();
private final ReentrantLock reentrantLock = new ReentrantLock();
private Session session;
@OnOpen
public void handleOpen(Session session, @PathParam("token") String token) {
this.session = session;
if (StringUtils.isEmpty(token)) {
sendInfo("[websocket ERROR] 客户端Token错误连接失败");
}
webSocketMap.put(session.getId(), this);
log.info("[websocket]客户端创建连接session ID = {} ", session.getId());
log.info("webSocketMap内容" + webSocketMap);
}
@OnClose
public void handleClose(Session session) {
if (webSocketMap.containsKey(session.getId())) {
webSocketMap.remove(session.getId());
}
log.info("[websocket]客户端断开websocket连接session ID = {} ", session.getId());
}
@OnError
public void handleError(Session session, Throwable throwable) {
log.info("[websocket]出现错误session ID = {} ", session.getId());
log.info("[websocket]出现错误throwable = {} ", throwable);
}
// 发送数据到客户端
public static void sendInfo(String message) {
for (String item : webSocketMap.keySet()) {
webSocketMap.get(item).sendMessage(message);
}
}
public void sendMessage(String message) {
try {
//加锁
reentrantLock.lock();
if (this.session != null && this.session.isOpen()) {
this.session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//解锁
reentrantLock.unlock();
}
}
}

@ -0,0 +1,79 @@
package com.ipsplm.websocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description WebSocket
* @Author FanDongqiang
* @Date 2023/5/8 9:17
* @Version 1.0
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/test-token4")
public class WebSocketHandler4 {
private static ConcurrentHashMap<String, WebSocketHandler4> webSocketMap = new ConcurrentHashMap<>();
private final ReentrantLock reentrantLock = new ReentrantLock();
private Session session;
@OnOpen
public void handleOpen(Session session, @PathParam("token") String token) {
this.session = session;
if (StringUtils.isEmpty(token)) {
sendInfo("[websocket ERROR] 客户端Token错误连接失败");
}
webSocketMap.put(session.getId(), this);
log.info("[websocket]客户端创建连接session ID = {} ", session.getId());
log.info("webSocketMap内容" + webSocketMap);
}
@OnClose
public void handleClose(Session session) {
if (webSocketMap.containsKey(session.getId())) {
webSocketMap.remove(session.getId());
}
log.info("[websocket]客户端断开websocket连接session ID = {} ", session.getId());
}
@OnError
public void handleError(Session session, Throwable throwable) {
log.info("[websocket]出现错误session ID = {} ", session.getId());
log.info("[websocket]出现错误throwable = {} ", throwable);
}
// 发送数据到客户端
public static void sendInfo(String message) {
for (String item : webSocketMap.keySet()) {
webSocketMap.get(item).sendMessage(message);
}
}
public void sendMessage(String message) {
try {
//加锁
reentrantLock.lock();
if (this.session != null && this.session.isOpen()) {
this.session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//解锁
reentrantLock.unlock();
}
}
}

@ -0,0 +1,79 @@
package com.ipsplm.websocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description WebSocket
* @Author FanDongqiang
* @Date 2023/5/8 9:17
* @Version 1.0
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/test-token5")
public class WebSocketHandler5 {
private static ConcurrentHashMap<String, WebSocketHandler5> webSocketMap = new ConcurrentHashMap<>();
private final ReentrantLock reentrantLock = new ReentrantLock();
private Session session;
@OnOpen
public void handleOpen(Session session, @PathParam("token") String token) {
this.session = session;
if (StringUtils.isEmpty(token)) {
sendInfo("[websocket ERROR] 客户端Token错误连接失败");
}
webSocketMap.put(session.getId(), this);
log.info("[websocket]客户端创建连接session ID = {} ", session.getId());
log.info("webSocketMap内容" + webSocketMap);
}
@OnClose
public void handleClose(Session session) {
if (webSocketMap.containsKey(session.getId())) {
webSocketMap.remove(session.getId());
}
log.info("[websocket]客户端断开websocket连接session ID = {} ", session.getId());
}
@OnError
public void handleError(Session session, Throwable throwable) {
log.info("[websocket]出现错误session ID = {} ", session.getId());
log.info("[websocket]出现错误throwable = {} ", throwable);
}
// 发送数据到客户端
public static void sendInfo(String message) {
for (String item : webSocketMap.keySet()) {
webSocketMap.get(item).sendMessage(message);
}
}
public void sendMessage(String message) {
try {
//加锁
reentrantLock.lock();
if (this.session != null && this.session.isOpen()) {
this.session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//解锁
reentrantLock.unlock();
}
}
}

@ -0,0 +1,79 @@
package com.ipsplm.websocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description WebSocket
* @Author FanDongqiang
* @Date 2023/5/8 9:17
* @Version 1.0
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/test-token6")
public class WebSocketHandler6 {
private static ConcurrentHashMap<String, WebSocketHandler6> webSocketMap = new ConcurrentHashMap<>();
private final ReentrantLock reentrantLock = new ReentrantLock();
private Session session;
@OnOpen
public void handleOpen(Session session, @PathParam("token") String token) {
this.session = session;
if (StringUtils.isEmpty(token)) {
sendInfo("[websocket ERROR] 客户端Token错误连接失败");
}
webSocketMap.put(session.getId(), this);
log.info("[websocket]客户端创建连接session ID = {} ", session.getId());
log.info("webSocketMap内容" + webSocketMap);
}
@OnClose
public void handleClose(Session session) {
if (webSocketMap.containsKey(session.getId())) {
webSocketMap.remove(session.getId());
}
log.info("[websocket]客户端断开websocket连接session ID = {} ", session.getId());
}
@OnError
public void handleError(Session session, Throwable throwable) {
log.info("[websocket]出现错误session ID = {} ", session.getId());
log.info("[websocket]出现错误throwable = {} ", throwable);
}
// 发送数据到客户端
public static void sendInfo(String message) {
for (String item : webSocketMap.keySet()) {
webSocketMap.get(item).sendMessage(message);
}
}
public void sendMessage(String message) {
try {
//加锁
reentrantLock.lock();
if (this.session != null && this.session.isOpen()) {
this.session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//解锁
reentrantLock.unlock();
}
}
}

@ -0,0 +1,64 @@
server:
port: 8091
servlet:
context-path: /sany-huarui-realtime-drive
spring:
application:
name: sany-huarui-realtime-drive
datasource:
dynamic:
primary: huarui
datasource:
huarui:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/sany?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&allowMultiQueries=true
username: root
password: HrSany!0307#
type: com.zaxxer.hikari.HikariDataSource
hikari:
minimum-idle: 5
maximum-pool-size: 15
connection-test-query: SELECT 1
max-lifetime: 1800000
connection-timeout: 30000
pool-name: DatebookHikariCP
redis:
# 地址
host: localhost
# 端口默认为6379
port: 6379
# 数据库索引
database: 0
# 密码
password: 123456
# 连接超时时间
timeout: 10000
lettuce:
pool:
# 连接池中的最小空闲连接
min-idle: 1000
# 连接池中的最大空闲连接
max-idle: -1
# 连接池的最大数据库连接数
max-active: -1
#连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1ms
kafka:
bootstrap-servers: 10.5.23.15:9092,10.5.23.31:9092,10.5.23.32:9092
consumer:
auto-offset-reset: latest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错
missing-topics-fatal: false
# 配置20个消费者线程
concurrency: 20
mybatis-plus:
mapper-locations: classpath:/mapper/**/*.xml

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="log.path" value="logs/" />
<appender name="logbackAll" class="ch.qos.logback.core.rolling.RollingFileAppender">
<encoder>
<pattern>%date [%level] [%thread] %logger{80} [%file : %line] %msg%n</pattern>
</encoder>
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%date{yyyy-MM-dd HH:mm:ss.Asia/Shanghai} %-5level [%thread] %logger{56}.%method:%L -%msg%n</pattern>
</layout>
</appender>
<appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
<pattern>%date{yyyy-MM-dd HH:mm:ss.Asia/Shanghai} [%highlight(%level)] [%boldMagenta(%thread)] [%cyan(%file) : %line] %msg%n</pattern>
</pattern>
</encoder>
</appender>
<!-- kafka 日志关闭 -->
<logger name="org.apache.kafka" level="OFF"/>
<logger name="io.netty" level="OFF" />
<logger name="org.springframework" level="INFO" additivity="false">
<appender-ref ref="Console"/>
<appender-ref ref="logbackAll"/>
</logger>
<logger name="com.ipsplm.dao" level="info" />
<root level="INFO">
<appender-ref ref="Console"/>
<appender-ref ref="logbackAll"/>
</root>
</configuration>
Loading…
Cancel
Save