From afd23cdf12727de5e5056e7705a68cc4194a3c17 Mon Sep 17 00:00:00 2001 From: lulicheng Date: Thu, 15 Aug 2024 13:21:50 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E4=BC=9A=E4=B8=8E=E5=89=8D?= =?UTF-8?q?=E7=AB=AF=E6=96=AD=E5=BC=80Websocket=E8=BF=9E=E6=8E=A5=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/ipsplm/ws/SimulationWebsocket.java | 41 ++++++++++++++++++- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/ipsplm/ws/SimulationWebsocket.java b/src/main/java/com/ipsplm/ws/SimulationWebsocket.java index 50dfe00..d3cb7f4 100644 --- a/src/main/java/com/ipsplm/ws/SimulationWebsocket.java +++ b/src/main/java/com/ipsplm/ws/SimulationWebsocket.java @@ -7,6 +7,7 @@ import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.ServerEndpoint; +import java.nio.ByteBuffer; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -20,8 +21,10 @@ public class SimulationWebsocket { private static ISimulationService simulationService; - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - private boolean isSimulationRunning = false; + private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private volatile boolean isSimulationRunning = false; + private ScheduledExecutorService heartbeatScheduler = Executors.newScheduledThreadPool(1); + @Autowired public void setService(ISimulationService simulationService) { @@ -36,6 +39,10 @@ public class SimulationWebsocket { public void OnOpen(Session session) { this.session = session; webSocketSet.add(this); + if(!heartbeatScheduler.isShutdown()){ + heartbeatScheduler = Executors.newScheduledThreadPool(1); + } + startHeartbeat(); log.info("【websocket消息】有新的连接, 总数:{}", webSocketSet.size()); } @@ -46,6 +53,9 @@ public class SimulationWebsocket { //发送开始仿真的信息 String result = simulationService.getPlantData(message); if ("Simulation Started".equals(result)) { + if(scheduler.isShutdown()){ + scheduler = Executors.newScheduledThreadPool(1); + } if (!isSimulationRunning) { isSimulationRunning = true; log.info("【仿真开始】"); @@ -59,10 +69,12 @@ public class SimulationWebsocket { if ("1".equals(data) || "1.00".equals(data)) { sendMessage("ending"); isSimulationRunning = false; + scheduler.shutdown(); } } } catch (Exception e) { isSimulationRunning = false; + scheduler.shutdown(); } }, 0, 10, TimeUnit.SECONDS); } @@ -84,7 +96,9 @@ public class SimulationWebsocket { @OnClose public void onClose() { webSocketSet.remove(this); + stopHeartbeat(); log.info("【websocket消息】连接断开, 总数:{}", webSocketSet.size()); + } @@ -101,4 +115,27 @@ public class SimulationWebsocket { } } } + + private void startHeartbeat() { + heartbeatScheduler.scheduleAtFixedRate(() -> { + try { + if (session.isOpen()) { + sendMessage("ping"); + } + } catch (Exception e) { + log.error("【websocket心跳】发送心跳失败", e); + } + }, 0, 10, TimeUnit.SECONDS); + } + + private void stopHeartbeat() { + heartbeatScheduler.shutdown(); + try { + if (!heartbeatScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + heartbeatScheduler.shutdownNow(); + } + } catch (InterruptedException e) { + heartbeatScheduler.shutdownNow(); + } + } }