12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- package com.webchat.connect.websocket;
- import com.webchat.common.enums.RedisMessageChannelTopicEnum;
- import com.webchat.common.service.RedisMessageSender;
- import com.webchat.common.util.SpringContextUtil;
- import lombok.Data;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
- import javax.websocket.*;
- import javax.websocket.server.PathParam;
- import javax.websocket.server.ServerEndpoint;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- /**
- * OnOpen 表示有浏览器链接过来的时候被调用
- * OnClose 表示浏览器发出关闭请求的时候被调用
- * OnMessage 表示浏览器发消息的时候被调用
- * OnError 表示有错误发生,比如网络断开了等等
- */
- @Data
- @Slf4j
- @Component
- @ServerEndpoint("/connect-service/ws/chat/{userId}")
- public class ChatWebSocketEndPoint{
- /**
- * 在线人数
- */
- public static int onlineNumber = 0;
- /**
- * 以用户的Id作为key,WebSocket为对象保存起来
- */
- public static Map<String, ChatWebSocketEndPoint> clients = new ConcurrentHashMap<String, ChatWebSocketEndPoint>();
- /**
- * 会话
- */
- private Session session;
- /**
- * 用户ID
- */
- private String userId;
- /**
- * 建立连接
- *
- * @param session
- */
- @OnOpen
- public void onOpen(@PathParam("userId") String userId, Session session) {
- this.onlineNumber++;
- this.userId = userId;
- this.session = session;
- clients.put(userId, this);
- log.info("onOpen success. sessionId:{}, userId:{}, online user count:{}", session.getId(), userId, onlineNumber);
- }
- @OnError
- public void onError(Session session, Throwable error) {
- log.info("服务端发生了错误, error message:{}", error.getMessage());
- }
- /**
- * 连接关闭
- */
- @OnClose
- public void onClose() {
- this.onlineNumber--;
- this.clients.remove(userId);
- log.info("onClose success! online user count:P{}", this.onlineNumber);
- }
- /**
- * 收到客户端的消息
- *
- * @param message 消息
- * @param session 会话
- */
- @OnMessage
- public void onMessage(String message, Session session) {
- if ("ping".equals(message)) {
- // 心跳检测
- clients.get(userId).session.getAsyncRemote().sendText("ok");
- return;
- }
- /**
- * 基于redis发布订阅模式的分布式WebSocketSession共享问题解决方案
- * 发布消息事件到Chat Topic
- */
- RedisMessageSender redisMessageSender = SpringContextUtil.getBean(RedisMessageSender.class);
- redisMessageSender.sendMessage(RedisMessageChannelTopicEnum.CHAT.getChannel(), message);
- }
- }
|