Kaynağa Gözat

基于netty,websocket server端实现

wangqi49 3 gün önce
ebeveyn
işleme
68b558d136

+ 7 - 0
webchat-connect/pom.xml

@@ -40,6 +40,13 @@
             <version>1.1</version>
             <scope>provided</scope>
         </dependency>
+
+        <!-- netty 网络应用程序框架 -->
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>4.1.115.Final</version>
+        </dependency>
     </dependencies>
 
     <build>

+ 15 - 0
webchat-connect/src/main/java/com/webchat/connect/netty/common/NettySocketConstants.java

@@ -0,0 +1,15 @@
+package com.webchat.connect.netty.common;
+
+
+public class NettySocketConstants {
+
+
+    /**
+     * 一对一 长链接服务端path
+     */
+    public static final String CHAT_WEBSOCKET_PATH = "/netty-service/ws/chat";
+
+    public static final String QUERY_USER_ID = "userId";
+
+    public static final String QUERY_BIZ_CODE = "bizCode";
+}

+ 69 - 0
webchat-connect/src/main/java/com/webchat/connect/netty/config/NettyWebSocketServerConfig.java

@@ -0,0 +1,69 @@
+package com.webchat.connect.netty.config;
+
+
+import com.webchat.connect.netty.handler.NettyWebSocketChannelInitializer;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import jakarta.annotation.PreDestroy;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.event.ContextClosedEvent;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
+
+@Slf4j
+//@Configuration
+public class NettyWebSocketServerConfig {
+
+    @Value("${netty.websocket.server.port:9999}")
+    private Integer port;
+
+    private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
+
+    private final EventLoopGroup workerGroup = new NioEventLoopGroup(10);
+
+    /**
+     * 服务端链接管道
+     */
+    private Channel serverChannel;
+
+    @EventListener(ContextRefreshedEvent.class)
+    public void start() throws InterruptedException {
+
+        ServerBootstrap bootstrap = new ServerBootstrap();
+        bootstrap.group(bossGroup, workerGroup)
+                 .channel(NioServerSocketChannel.class)
+                 .handler(new LoggingHandler(LogLevel.INFO))
+                 // 业务自定义handler链
+                 .childHandler(new NettyWebSocketChannelInitializer())
+                 .option(ChannelOption.SO_BACKLOG, 1024)
+                 .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+        ChannelFuture future = bootstrap.bind(port).sync();
+        serverChannel = future.channel();
+        log.info("Netty WebSocket Server started on port {}", port);
+    }
+
+    @PreDestroy
+    @EventListener(ContextClosedEvent.class)
+    public void stop() {
+        if (serverChannel == null) {
+            return;
+        }
+        serverChannel.close();
+        if (!bossGroup.isShutdown()) {
+            bossGroup.shutdownGracefully();
+        }
+        if (!workerGroup.isShutdown()) {
+            workerGroup.shutdownGracefully();
+        }
+    }
+}

+ 34 - 0
webchat-connect/src/main/java/com/webchat/connect/netty/handler/NettyWebSocketChannelInitializer.java

@@ -0,0 +1,34 @@
+package com.webchat.connect.netty.handler;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+
+import static com.webchat.connect.netty.common.NettySocketConstants.CHAT_WEBSOCKET_PATH;
+
+public class NettyWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+    @Override
+    protected void initChannel(SocketChannel socketChannel) throws Exception {
+
+        ChannelPipeline pipeline = socketChannel.pipeline();
+
+        // 设置http请求编解码处理器,使用默认的
+        pipeline.addLast(new HttpServerCodec());
+
+        // 合并http请求,设置允许请求数据大小,默认64kb
+        pipeline.addLast(new HttpObjectAggregator(65536 * 5));
+
+        // 请求处理handler
+        pipeline.addLast(new NettyWebSocketRequestHandler());
+
+        // websocket 协议处理
+        pipeline.addLast(new WebSocketServerProtocolHandler(CHAT_WEBSOCKET_PATH));
+
+        // 自定义消息处理handler
+        pipeline.addLast(new NettyWebSocketMessageHandler());
+    }
+}

+ 75 - 0
webchat-connect/src/main/java/com/webchat/connect/netty/handler/NettyWebSocketMessageHandler.java

@@ -0,0 +1,75 @@
+package com.webchat.connect.netty.handler;
+
+import com.webchat.connect.netty.common.NettySocketConstants;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+@Slf4j
+public class NettyWebSocketMessageHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
+
+    public static Map<String, Map<String, Channel>> channels = new ConcurrentHashMap<>();
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
+        String message = textWebSocketFrame.text();
+        if (StringUtils.equals(message, "ping")) {
+            return;
+        }
+            log.info("Netty Chat WebSocket Connection Received message: {}", message);
+        // TODO
+        ctx.writeAndFlush(new TextWebSocketFrame("ok"));
+    }
+
+    /**
+     * 链接创建
+     *
+     * @param ctx
+     * @throws Exception
+     */
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+
+        super.channelActive(ctx);
+        log.info("Netty Chat WebSocket Connection Active!");
+    }
+
+
+    /**
+     * 链接断开
+     *
+     * @param ctx
+     * @throws Exception
+     */
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        super.channelInactive(ctx);
+        log.info("Netty Chat WebSocket Connection Inactive!");
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        super.exceptionCaught(ctx, cause);
+        log.info("Netty Chat WebSocket Connection Exp!", cause);
+    }
+
+    private String getConnectionUserId(ChannelHandlerContext ctx) {
+
+        Object userIdObj = ctx.channel().attr(AttributeKey.valueOf(NettySocketConstants.QUERY_USER_ID)).get();
+        return userIdObj == null ? null : userIdObj.toString();
+    }
+
+    private String getConnectionBizCode(ChannelHandlerContext ctx) {
+
+        Object bizCodeObj = ctx.channel().attr(AttributeKey.valueOf(NettySocketConstants.QUERY_BIZ_CODE)).get();
+        return bizCodeObj == null ? null : bizCodeObj.toString();
+    }
+}

+ 41 - 0
webchat-connect/src/main/java/com/webchat/connect/netty/handler/NettyWebSocketRequestHandler.java

@@ -0,0 +1,41 @@
+package com.webchat.connect.netty.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.Map;
+
+import static com.webchat.connect.netty.common.NettySocketConstants.CHAT_WEBSOCKET_PATH;
+import static com.webchat.connect.netty.common.NettySocketConstants.QUERY_BIZ_CODE;
+import static com.webchat.connect.netty.common.NettySocketConstants.QUERY_USER_ID;
+
+
+@Slf4j
+public class NettyWebSocketRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
+
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
+
+        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.uri());
+        Map<String, List<String>> parameters = queryStringDecoder.parameters();
+
+        // 从请求中获取query参数(业务参数)
+        String userId = parameters.getOrDefault(QUERY_USER_ID, List.of("")).get(0);
+        String bizCode = parameters.getOrDefault(QUERY_BIZ_CODE, List.of("")).get(0);
+        // 保存到handler链式处理上下文
+        ctx.channel().attr(AttributeKey.valueOf(QUERY_USER_ID)).set(userId);
+        ctx.channel().attr(AttributeKey.valueOf(QUERY_BIZ_CODE)).set(bizCode);
+
+        // 处理技巧
+        request.setUri(CHAT_WEBSOCKET_PATH);
+
+        // 很关键,重新修改后的request需要传给后续调用链
+        ctx.fireChannelRead(request.retain());
+    }
+}