Browse Source

websoket

master
刘力 3 years ago
parent
commit
271a1ad947
  1. 6
      APIService/pom.xml
  2. 156
      APIService/src/main/java/com/storeroom/common/NettyServer.java
  3. 118
      APIService/src/main/java/com/storeroom/common/WebSocketHandler.java
  4. 43
      APIService/src/main/java/com/storeroom/config/NettyConfig.java
  5. 19
      APIService/src/main/java/com/storeroom/config/WebSocketConfig.java
  6. 35
      APIService/src/main/java/com/storeroom/controller/PushController.java
  7. 1
      APIService/src/main/java/com/storeroom/controller/TestApiServiceController.java
  8. 93
      APIService/src/main/java/com/storeroom/controller/WebSoketService.java
  9. 25
      APIService/src/main/java/com/storeroom/service/PushService.java
  10. 51
      APIService/src/main/java/com/storeroom/service/impl/ApiServiceImpl.java
  11. 46
      APIService/src/main/java/com/storeroom/service/impl/PushServiceImpl.java
  12. 2
      common/pom.xml
  13. 6
      pom.xml
  14. 6
      system/pom.xml
  15. 9
      system/src/main/resources/application.yml

6
APIService/pom.xml

@ -28,6 +28,12 @@
<artifactId>okhttps-fastjson</artifactId>
<version>3.5.2</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.33.Final</version>
</dependency>
</dependencies>
</project>

156
APIService/src/main/java/com/storeroom/common/NettyServer.java

@ -0,0 +1,156 @@
package com.storeroom.common;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.slf4j.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import io.netty.channel.socket.SocketChannel;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
@Component
public class NettyServer {
private static final Logger log = LoggerFactory.getLogger(NettyServer.class);
private static final String WEBSOCKET_PROTOCOL="WebSocket";
/**
* 端口号
*/
@Value("${webSocket.netty.port}")
private int port;
/**
* webSocket路径
*/
@Value("${webSocket.netty.path}")
private String webSocketPath;
/**
* 在Netty心跳检测中配置 - 读空闲超时时间设置
*/
@Value("${webSocket.netty.readerIdleTime}")
private long readerIdleTime;
/**
* 在Netty心跳检测中配置 - 写空闲超时时间设置
*/
@Value("${webSocket.netty.writerIdleTime}")
private long writerIdleTime;
/**
* 在Netty心跳检测中配置 - 读写空闲超时时间设置
*/
@Value("${webSocket.netty.allIdleTime}")
private long allIdleTime;
@Autowired
private WebSocketHandler webSocketHandler;
private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;
/**
* 启动
* @throws InterruptedException
*/
private void start() throws InterruptedException {
bossGroup = new NioEventLoopGroup();
workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
// bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
bootstrap.group(bossGroup,workGroup);
// 设置NIO类型的channel
bootstrap.channel(NioServerSocketChannel.class);
// 设置监听端口
bootstrap.localAddress(new InetSocketAddress(port));
// 连接到达时会创建一个通道
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 心跳检测(一般情况第一个设置,如果超时了,则会调用userEventTriggered方法,且会告诉你超时的类型)
ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.MINUTES));
// 流水线管理通道中的处理程序Handler用来处理业务
// webSocket协议本身是基于http协议的所以这边也要使用http编解码器
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new ObjectEncoder());
// 以块的方式来写的处理器
ch.pipeline().addLast(new ChunkedWriteHandler());
/*
说明
1http数据在传输过程中是分段的HttpObjectAggregator可以将多个段聚合
2这就是为什么当浏览器发送大量数据时就会发送多次http请求
*/
ch.pipeline().addLast(new HttpObjectAggregator(8192));
/*
说明
1对应webSocket它的数据是以帧frame的形式传递
2浏览器请求时 ws://localhost:58080/xxx 表示请求的uri
3核心功能是将http协议升级为ws协议保持长连接
*/
ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
// 自定义的handler处理业务逻辑
ch.pipeline().addLast(webSocketHandler);
}
});
// 配置完成开始绑定server通过调用sync同步方法阻塞直到绑定成功
ChannelFuture channelFuture = bootstrap.bind().sync();
log.info("Server started and listen on:{}",channelFuture.channel().localAddress());
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}
/**
* 释放资源
* @throws InterruptedException
*/
@PreDestroy
public void destroy() throws InterruptedException {
if(bossGroup != null){
bossGroup.shutdownGracefully().sync();
}
if(workGroup != null){
workGroup.shutdownGracefully().sync();
}
}
/**
* 初始化(新线程开启)
*/
@PostConstruct()
public void init() {
//需要开启一个新的线程来执行netty server 服务器
new Thread(() -> {
try {
start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

118
APIService/src/main/java/com/storeroom/common/WebSocketHandler.java

@ -0,0 +1,118 @@
package com.storeroom.common;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.storeroom.config.NettyConfig;
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
private static final Logger log = LoggerFactory.getLogger(WebSocketHandler.class);
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("handlerAdded 被调用"+ctx.channel().id().asLongText());
// 添加到channelGroup 通道组
NettyConfig.getChannelGroup().add(ctx.channel());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// 获取用户ID,关联channel
JSONObject jsonObject = JSONUtil.parseObj(msg.text());
String uid = jsonObject.getStr("uid");
// 当用户ID已存入通道内,则不进行写入,只有第一次建立连接时才会存入,其他情况发送uid则为心跳需求
if(!NettyConfig.getUserChannelMap().containsKey(uid)){
log.info("服务器收到消息:{}",msg.text());
NettyConfig.getUserChannelMap().put(uid,ctx.channel());
// 将用户ID作为自定义属性加入到channel中方便随时channel中获取用户ID
AttributeKey<String> key = AttributeKey.valueOf("userId");
ctx.channel().attr(key).setIfAbsent(uid);
// 回复消息
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器连接成功!"));
}else{
// 前端定时请求,保持心跳连接,避免服务端误删通道
ctx.channel().writeAndFlush(new TextWebSocketFrame("keep alive success!"));
}
}
/**
* 移除通道及关联用户
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info("handlerRemoved 被调用"+ctx.channel().id().asLongText());
// 删除通道
NettyConfig.getChannelGroup().remove(ctx.channel());
removeUserId(ctx);
}
/**
* 异常处理
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("异常:{}",cause.getMessage());
// 删除通道
NettyConfig.getChannelGroup().remove(ctx.channel());
removeUserId(ctx);
ctx.close();
}
/**
* 心跳检测相关方法 - 会主动调用handlerRemoved
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if(event.state() == IdleState.ALL_IDLE){
//清除超时会话
ChannelFuture writeAndFlush = ctx.writeAndFlush("you will close");
writeAndFlush.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
ctx.channel().close();
}
});
}
}else{
super.userEventTriggered(ctx, evt);
}
}
/**
* 删除用户与channel的对应关系
* @param ctx
*/
private void removeUserId(ChannelHandlerContext ctx){
AttributeKey<String> key = AttributeKey.valueOf("userId");
String userId = ctx.channel().attr(key).get();
NettyConfig.getUserChannelMap().remove(userId);
log.info("删除用户与channel的对应关系,uid:{}",userId);
}
}

43
APIService/src/main/java/com/storeroom/config/NettyConfig.java

@ -0,0 +1,43 @@
package com.storeroom.config;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.ConcurrentHashMap;
public class NettyConfig {
/**
* 定义一个channel管理所有channel
*/
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 存放用户信息发送指定用户
*/
private static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap<>();
private NettyConfig() {
}
/**
* 获取用户组
* @return /
*/
public static ChannelGroup getChannelGroup(){
return channelGroup;
}
/**
* 获取用户channel map
* @return /
*/
public static ConcurrentHashMap<String,Channel> getUserChannelMap(){
return userChannelMap;
}
}

19
APIService/src/main/java/com/storeroom/config/WebSocketConfig.java

@ -0,0 +1,19 @@
package com.storeroom.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Service
@Configuration
@EnableWebSocket
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}

35
APIService/src/main/java/com/storeroom/controller/PushController.java

@ -0,0 +1,35 @@
package com.storeroom.controller;
import com.storeroom.annotaion.rest.AnonymousGetMapping;
import com.storeroom.annotaion.rest.AnonymousPostMapping;
import com.storeroom.service.PushService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/push/")
@RequiredArgsConstructor
public class PushController {
private final PushService pushService;
@AnonymousPostMapping("pushAll")
public void pushAll(@RequestBody String msg) {
pushService.pushMsgToAll(msg);
}
@AnonymousPostMapping("pushOne")
public void pushMsgToOne(@RequestParam("userId") String userId, @RequestParam("msg") String msg) {
pushService.pushMsgToOne(userId, msg);
}
@AnonymousGetMapping("getConnectNum")
public int getConnectNum() {
return pushService.getConnectCount();
}
}

1
APIService/src/main/java/com/storeroom/controller/TestApiServiceController.java

@ -51,6 +51,7 @@ public class TestApiServiceController {
@ApiOperation("获取设备监控参数实时值")
@AnonymousPostMapping("getrealtimedata")
public ApiResponse<Object> getrealtimedata(@RequestBody Set<String> deviceIds) {
;
return ApiResponse.success(apiService.getRealTimeData(deviceIds));
}
}

93
APIService/src/main/java/com/storeroom/controller/WebSoketService.java

@ -0,0 +1,93 @@
package com.storeroom.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
@ServerEndpoint(value = "/webSocket/{name}")
public class WebSoketService {
/**
* 与某个客户端的连接对话需要通过它来给客户端发送消息
*/
private Session session;
/**
* 标识当前连接客户端的用户名
*/
private String name;
/**
* 用于存所有的连接服务的客户端这个对象存储是安全的
*/
private static ConcurrentHashMap<String, WebSoketService> webSocketSet = new ConcurrentHashMap<>();
@OnOpen
public void OnOpen(Session session, @PathParam(value = "name") String name) {
this.session = session;
this.name = name;
// name是用来表示唯一客户端如果需要指定发送需要指定发送通过name来区分
webSocketSet.put(name, this);
log.info("[WebSocket] 连接成功,当前连接人数为:={}", webSocketSet.size());
}
@OnClose
public void OnClose() {
webSocketSet.remove(this.name);
log.info("[WebSocket] 退出成功,当前连接人数为:={}", webSocketSet.size());
}
@OnMessage
public void OnMessage(String message) {
log.info("[WebSocket] 收到消息:{}", message);
//判断是否需要指定发送具体规则自定义
if (message.indexOf("TOUSER") == 0) {
String name = message.substring(message.indexOf("TOUSER") + 6, message.indexOf(";"));
AppointSending(name, message.substring(message.indexOf(";") + 1, message.length()));
} else {
GroupSending(message);
}
}
/**
* 群发
*
* @param message
*/
public void GroupSending(String message) {
for (String name : webSocketSet.keySet()) {
try {
webSocketSet.get(name).session.getBasicRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 指定发送
*
* @param name
* @param message
*/
public void AppointSending(String name, String message) {
try {
webSocketSet.get(name).session.getBasicRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}

25
APIService/src/main/java/com/storeroom/service/PushService.java

@ -0,0 +1,25 @@
package com.storeroom.service;
public interface PushService {
/**
* 推送给指定用户
* @param userId 用户ID
* @param msg 消息信息
*/
void pushMsgToOne(String userId,String msg);
/**
* 推送给所有用户
* @param msg 消息信息
*/
void pushMsgToAll(String msg);
/**
* 获取当前连接数
* @return 连接数
*/
int getConnectCount();
}

51
APIService/src/main/java/com/storeroom/service/impl/ApiServiceImpl.java

@ -11,21 +11,31 @@ import com.storeroom.service.dto.GetCurAlarmDto;
import com.storeroom.service.dto.RealTimeDataDto;
import com.storeroom.utils.FastjsonUtils;
import com.storeroom.utils.HttpUtils;
import com.storeroom.utils.RedisUtils;
import com.storeroom.utils.StringUtils;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import okhttp3.*;
import org.apache.http.HttpResponse;
import org.apache.http.util.EntityUtils;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Service("ApiServiceImpl")
@RequiredArgsConstructor
@CacheConfig(cacheNames = "thirdApi")
public class ApiServiceImpl implements ApiService {
String access_token = "";
private final RedisUtils redisUtils;
@SneakyThrows
@Override
public String getToken() {
@ -43,21 +53,26 @@ public class ApiServiceImpl implements ApiService {
access_token = Data.get("access_token").toString();
}
});
System.out.println(access_token);
Object token = redisUtils.get("api_token");
if (!ObjectUtils.isEmpty(token)) {
redisUtils.del("api_token");
}
redisUtils.set("api_token", access_token, 30, TimeUnit.MINUTES);
return access_token;
} else {
//throw new BaseException("访问失败" + response.getStatusLine().getStatusCode() + "");
return null;
throw new BaseException("获取第三方token接口失败,接口异常");
}
}
@SneakyThrows
@Override
public List<DeviceAllDto> getDeviceAll() {
HashMap<String, String> map = new HashMap<>();
HashMap<String, String> body = new HashMap<>();
String token = getToken();
String token = findApiToken();
map.put("Content-type", "application/json");
map.put("Authorization", "Bearer" + token + "");
body.put("keys", "");
@ -96,7 +111,7 @@ public class ApiServiceImpl implements ApiService {
@Override
public Object getAllDeviceAlarm() {
HashMap<String, String> map = new HashMap<>();
String token = getToken();
String token = findApiToken();
map.put("Content-type", "application/json");
map.put("Authorization", "Bearer" + token + "");
HttpResponse response = HttpUtils.doGet("http://jiton.8800.org:800", "/Api/Third/GetCurAlarm", "Get", map, null);
@ -132,10 +147,25 @@ public class ApiServiceImpl implements ApiService {
}
}
/**
* 查询第三方token是否存在
* @return
*/
private String findApiToken() {
Object token = redisUtils.get("api_token");
String APiToken = "";
if (ObjectUtils.isEmpty(token)) {
APiToken = getToken();
return APiToken;
}
APiToken = token.toString();
return APiToken;
}
@SneakyThrows
@Override
public Object getRealTimeData(Set<String> deviceIds) {
String token = getToken();
String token = findApiToken();
JSONObject json = new JSONObject();
json.put("device_ids", deviceIds);
RequestBody body = RequestBody.create(
@ -148,7 +178,7 @@ public class ApiServiceImpl implements ApiService {
Call call = client.newCall(request);
Response response = call.execute();
Map<String, Object> resultMap;
List<RealTimeDataDto> list =new ArrayList<>();
List<RealTimeDataDto> list = new ArrayList<>();
if (!ObjectUtils.isEmpty(response)) {
String strResult = response.body().string();
resultMap = FastjsonUtils.toJavaMap(strResult);
@ -156,7 +186,7 @@ public class ApiServiceImpl implements ApiService {
if (k.equals("Data") && !ObjectUtils.isEmpty(v)) {
JSONArray jsonArray = (JSONArray) v;
for (int i= 0 ;i<jsonArray.size();i++){
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
RealTimeDataDto realTimeDataDto = new RealTimeDataDto();
realTimeDataDto.setProperty_id(jsonObject.get("property_id") == null ? "" : jsonObject.get("property_id").toString());
@ -171,9 +201,10 @@ public class ApiServiceImpl implements ApiService {
}
});
return list;
} else {
throw new BaseException("第三方接口连接异常");
}
return list;
}
}

46
APIService/src/main/java/com/storeroom/service/impl/PushServiceImpl.java

@ -0,0 +1,46 @@
package com.storeroom.service.impl;
import com.storeroom.config.NettyConfig;
import com.storeroom.service.PushService;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class PushServiceImpl implements PushService {
/**
* 推送指定用户
* @param userId 用户ID
* @param msg 消息信息
*/
@Override
public void pushMsgToOne(String userId, String msg) {
ConcurrentHashMap<String, Channel> userChannelMap = NettyConfig.getUserChannelMap();
Channel channel = userChannelMap.get(userId);
channel.writeAndFlush(new TextWebSocketFrame(msg));
}
/**
* 推送给所有用户
* @param msg 消息信息
*/
@Override
public void pushMsgToAll(String msg) {
NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
}
/**
* 获取当前连接数
* @return /
*/
@Override
public int getConnectCount() {
return NettyConfig.getChannelGroup().size();
}
}

2
common/pom.xml

@ -38,6 +38,8 @@
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>com.belerweb</groupId>
<artifactId>pinyin4j</artifactId>

6
pom.xml

@ -229,6 +229,12 @@
<version>1.21</version>
</dependency>
<!-- Spring boot websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>javax.persistence-api</artifactId>

6
system/pom.xml

@ -68,11 +68,7 @@
</dependency>
<!-- Spring boot websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- quartz -->
<dependency>

9
system/src/main/resources/application.yml

@ -80,4 +80,13 @@ user-cache:
hand-held:
ip: 47.98.148.152:8057
webSocket:
netty:
port: 7071
path: /wsServer
readerIdleTime: 30 #读空闲超时时间设置(Netty心跳检测配置)
writerIdleTime: 30 #写空闲超时时间设置(Netty心跳检测配置)
allIdleTime: 30 #读写空闲超时时间设置(Netty心跳检测配置)
Loading…
Cancel
Save