diff --git a/APIService/pom.xml b/APIService/pom.xml
index a5b6355..bdae929 100644
--- a/APIService/pom.xml
+++ b/APIService/pom.xml
@@ -28,6 +28,12 @@
okhttps-fastjson
3.5.2
+
+
+ io.netty
+ netty-all
+ 4.1.33.Final
+
\ No newline at end of file
diff --git a/APIService/src/main/java/com/storeroom/common/NettyServer.java b/APIService/src/main/java/com/storeroom/common/NettyServer.java
new file mode 100644
index 0000000..1479677
--- /dev/null
+++ b/APIService/src/main/java/com/storeroom/common/NettyServer.java
@@ -0,0 +1,156 @@
+package com.storeroom.common;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.codec.serialization.ObjectEncoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+
+import org.slf4j.Logger;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import io.netty.channel.socket.SocketChannel;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class NettyServer {
+
+ private static final Logger log = LoggerFactory.getLogger(NettyServer.class);
+
+
+ private static final String WEBSOCKET_PROTOCOL="WebSocket";
+
+ /**
+ * 端口号
+ */
+ @Value("${webSocket.netty.port}")
+ private int port;
+
+ /**
+ * webSocket路径
+ */
+ @Value("${webSocket.netty.path}")
+ private String webSocketPath;
+
+ /**
+ * 在Netty心跳检测中配置 - 读空闲超时时间设置
+ */
+ @Value("${webSocket.netty.readerIdleTime}")
+ private long readerIdleTime;
+
+ /**
+ * 在Netty心跳检测中配置 - 写空闲超时时间设置
+ */
+ @Value("${webSocket.netty.writerIdleTime}")
+ private long writerIdleTime;
+
+ /**
+ * 在Netty心跳检测中配置 - 读写空闲超时时间设置
+ */
+ @Value("${webSocket.netty.allIdleTime}")
+ private long allIdleTime;
+
+ @Autowired
+ private WebSocketHandler webSocketHandler;
+
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workGroup;
+
+
+ /**
+ * 启动
+ * @throws InterruptedException
+ */
+ private void start() throws InterruptedException {
+ bossGroup = new NioEventLoopGroup();
+ workGroup = new NioEventLoopGroup();
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ // bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
+ bootstrap.group(bossGroup,workGroup);
+ // 设置NIO类型的channel
+ bootstrap.channel(NioServerSocketChannel.class);
+ // 设置监听端口
+ bootstrap.localAddress(new InetSocketAddress(port));
+ // 连接到达时会创建一个通道
+ bootstrap.childHandler(new ChannelInitializer() {
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ // 心跳检测(一般情况第一个设置,如果超时了,则会调用userEventTriggered方法,且会告诉你超时的类型)
+ ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.MINUTES));
+ // 流水线管理通道中的处理程序(Handler),用来处理业务
+ // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
+ ch.pipeline().addLast(new HttpServerCodec());
+ ch.pipeline().addLast(new ObjectEncoder());
+ // 以块的方式来写的处理器
+ ch.pipeline().addLast(new ChunkedWriteHandler());
+ /*
+ 说明:
+ 1、http数据在传输过程中是分段的,HttpObjectAggregator可以将多个段聚合
+ 2、这就是为什么,当浏览器发送大量数据时,就会发送多次http请求
+ */
+ ch.pipeline().addLast(new HttpObjectAggregator(8192));
+ /*
+ 说明:
+ 1、对应webSocket,它的数据是以帧(frame)的形式传递
+ 2、浏览器请求时 ws://localhost:58080/xxx 表示请求的uri
+ 3、核心功能是将http协议升级为ws协议,保持长连接
+ */
+ ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
+ // 自定义的handler,处理业务逻辑
+ ch.pipeline().addLast(webSocketHandler);
+ }
+ });
+ // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
+ ChannelFuture channelFuture = bootstrap.bind().sync();
+ log.info("Server started and listen on:{}",channelFuture.channel().localAddress());
+ // 对关闭通道进行监听
+ channelFuture.channel().closeFuture().sync();
+ }
+
+ /**
+ * 释放资源
+ * @throws InterruptedException
+ */
+ @PreDestroy
+ public void destroy() throws InterruptedException {
+ if(bossGroup != null){
+ bossGroup.shutdownGracefully().sync();
+ }
+ if(workGroup != null){
+ workGroup.shutdownGracefully().sync();
+ }
+ }
+
+ /**
+ * 初始化(新线程开启)
+ */
+ @PostConstruct()
+ public void init() {
+ //需要开启一个新的线程来执行netty server 服务器
+ new Thread(() -> {
+ try {
+ start();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }).start();
+ }
+
+
+}
diff --git a/APIService/src/main/java/com/storeroom/common/WebSocketHandler.java b/APIService/src/main/java/com/storeroom/common/WebSocketHandler.java
new file mode 100644
index 0000000..3d9ca25
--- /dev/null
+++ b/APIService/src/main/java/com/storeroom/common/WebSocketHandler.java
@@ -0,0 +1,118 @@
+package com.storeroom.common;
+
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import com.storeroom.config.NettyConfig;
+import io.netty.channel.*;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.AttributeKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+
+@Component
+@ChannelHandler.Sharable
+public class WebSocketHandler extends SimpleChannelInboundHandler{
+
+ private static final Logger log = LoggerFactory.getLogger(WebSocketHandler.class);
+
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ log.info("handlerAdded 被调用"+ctx.channel().id().asLongText());
+ // 添加到channelGroup 通道组
+ NettyConfig.getChannelGroup().add(ctx.channel());
+ }
+
+
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
+ // 获取用户ID,关联channel
+ JSONObject jsonObject = JSONUtil.parseObj(msg.text());
+ String uid = jsonObject.getStr("uid");
+ // 当用户ID已存入通道内,则不进行写入,只有第一次建立连接时才会存入,其他情况发送uid则为心跳需求
+ if(!NettyConfig.getUserChannelMap().containsKey(uid)){
+ log.info("服务器收到消息:{}",msg.text());
+ NettyConfig.getUserChannelMap().put(uid,ctx.channel());
+ // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
+ AttributeKey key = AttributeKey.valueOf("userId");
+ ctx.channel().attr(key).setIfAbsent(uid);
+ // 回复消息
+ ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器连接成功!"));
+ }else{
+ // 前端定时请求,保持心跳连接,避免服务端误删通道
+ ctx.channel().writeAndFlush(new TextWebSocketFrame("keep alive success!"));
+ }
+
+ }
+
+ /**
+ * 移除通道及关联用户
+ * @param ctx
+ * @throws Exception
+ */
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+ log.info("handlerRemoved 被调用"+ctx.channel().id().asLongText());
+ // 删除通道
+ NettyConfig.getChannelGroup().remove(ctx.channel());
+ removeUserId(ctx);
+ }
+
+ /**
+ * 异常处理
+ * @param ctx
+ * @param cause
+ * @throws Exception
+ */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ log.info("异常:{}",cause.getMessage());
+ // 删除通道
+ NettyConfig.getChannelGroup().remove(ctx.channel());
+ removeUserId(ctx);
+ ctx.close();
+ }
+
+ /**
+ * 心跳检测相关方法 - 会主动调用handlerRemoved
+ * @param ctx
+ * @param evt
+ * @throws Exception
+ */
+ @Override
+ public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
+ if(evt instanceof IdleStateEvent){
+ IdleStateEvent event = (IdleStateEvent)evt;
+ if(event.state() == IdleState.ALL_IDLE){
+ //清除超时会话
+ ChannelFuture writeAndFlush = ctx.writeAndFlush("you will close");
+ writeAndFlush.addListener(new ChannelFutureListener() {
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ ctx.channel().close();
+ }
+ });
+ }
+ }else{
+ super.userEventTriggered(ctx, evt);
+ }
+ }
+
+ /**
+ * 删除用户与channel的对应关系
+ * @param ctx
+ */
+ private void removeUserId(ChannelHandlerContext ctx){
+ AttributeKey key = AttributeKey.valueOf("userId");
+ String userId = ctx.channel().attr(key).get();
+ NettyConfig.getUserChannelMap().remove(userId);
+ log.info("删除用户与channel的对应关系,uid:{}",userId);
+ }
+
+}
diff --git a/APIService/src/main/java/com/storeroom/config/NettyConfig.java b/APIService/src/main/java/com/storeroom/config/NettyConfig.java
new file mode 100644
index 0000000..d6bc58f
--- /dev/null
+++ b/APIService/src/main/java/com/storeroom/config/NettyConfig.java
@@ -0,0 +1,43 @@
+package com.storeroom.config;
+
+import io.netty.channel.Channel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class NettyConfig {
+
+
+ /**
+ * 定义一个channel,管理所有channel
+ */
+ private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+
+ /**
+ * 存放用户信息,发送指定用户
+ */
+ private static ConcurrentHashMap userChannelMap = new ConcurrentHashMap<>();
+
+ private NettyConfig() {
+
+ }
+
+ /**
+ * 获取用户组
+ * @return /
+ */
+ public static ChannelGroup getChannelGroup(){
+ return channelGroup;
+ }
+
+ /**
+ * 获取用户channel map
+ * @return /
+ */
+ public static ConcurrentHashMap getUserChannelMap(){
+ return userChannelMap;
+ }
+}
diff --git a/APIService/src/main/java/com/storeroom/config/WebSocketConfig.java b/APIService/src/main/java/com/storeroom/config/WebSocketConfig.java
new file mode 100644
index 0000000..6c401ed
--- /dev/null
+++ b/APIService/src/main/java/com/storeroom/config/WebSocketConfig.java
@@ -0,0 +1,19 @@
+package com.storeroom.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Service;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+@Service
+@Configuration
+@EnableWebSocket
+public class WebSocketConfig {
+
+ @Bean
+ public ServerEndpointExporter serverEndpointExporter(){
+ return new ServerEndpointExporter();
+ }
+
+}
diff --git a/APIService/src/main/java/com/storeroom/controller/PushController.java b/APIService/src/main/java/com/storeroom/controller/PushController.java
new file mode 100644
index 0000000..6fc3c0b
--- /dev/null
+++ b/APIService/src/main/java/com/storeroom/controller/PushController.java
@@ -0,0 +1,35 @@
+package com.storeroom.controller;
+
+import com.storeroom.annotaion.rest.AnonymousGetMapping;
+import com.storeroom.annotaion.rest.AnonymousPostMapping;
+import com.storeroom.service.PushService;
+import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/push/")
+@RequiredArgsConstructor
+public class PushController {
+
+ private final PushService pushService;
+
+
+ @AnonymousPostMapping("pushAll")
+ public void pushAll(@RequestBody String msg) {
+ pushService.pushMsgToAll(msg);
+ }
+
+ @AnonymousPostMapping("pushOne")
+ public void pushMsgToOne(@RequestParam("userId") String userId, @RequestParam("msg") String msg) {
+ pushService.pushMsgToOne(userId, msg);
+ }
+
+ @AnonymousGetMapping("getConnectNum")
+ public int getConnectNum() {
+ return pushService.getConnectCount();
+ }
+
+}
diff --git a/APIService/src/main/java/com/storeroom/controller/TestApiServiceController.java b/APIService/src/main/java/com/storeroom/controller/TestApiServiceController.java
index b3db7e3..6fa7009 100644
--- a/APIService/src/main/java/com/storeroom/controller/TestApiServiceController.java
+++ b/APIService/src/main/java/com/storeroom/controller/TestApiServiceController.java
@@ -51,6 +51,7 @@ public class TestApiServiceController {
@ApiOperation("获取设备监控参数实时值")
@AnonymousPostMapping("getrealtimedata")
public ApiResponse