ChatWebSocketEndPoint.java 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package com.webchat.connect.websocket;
  2. import com.webchat.common.enums.RedisMessageChannelTopicEnum;
  3. import com.webchat.common.service.RedisMessageSender;
  4. import com.webchat.common.util.SpringContextUtil;
  5. import lombok.Data;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.springframework.stereotype.Component;
  8. import javax.websocket.*;
  9. import javax.websocket.server.PathParam;
  10. import javax.websocket.server.ServerEndpoint;
  11. import java.util.Map;
  12. import java.util.concurrent.ConcurrentHashMap;
  13. /**
  14. * OnOpen 表示有浏览器链接过来的时候被调用
  15. * OnClose 表示浏览器发出关闭请求的时候被调用
  16. * OnMessage 表示浏览器发消息的时候被调用
  17. * OnError 表示有错误发生,比如网络断开了等等
  18. */
  19. @Data
  20. @Slf4j
  21. @Component
  22. @ServerEndpoint("/connect-service/ws/chat/{userId}")
  23. public class ChatWebSocketEndPoint{
  24. /**
  25. * 在线人数
  26. */
  27. public static int onlineNumber = 0;
  28. /**
  29. * 以用户的Id作为key,WebSocket为对象保存起来
  30. */
  31. public static Map<String, ChatWebSocketEndPoint> clients = new ConcurrentHashMap<String, ChatWebSocketEndPoint>();
  32. /**
  33. * 会话
  34. */
  35. private Session session;
  36. /**
  37. * 用户ID
  38. */
  39. private String userId;
  40. /**
  41. * 建立连接
  42. *
  43. * @param session
  44. */
  45. @OnOpen
  46. public void onOpen(@PathParam("userId") String userId, Session session) {
  47. this.onlineNumber++;
  48. this.userId = userId;
  49. this.session = session;
  50. clients.put(userId, this);
  51. log.info("onOpen success. sessionId:{}, userId:{}, online user count:{}", session.getId(), userId, onlineNumber);
  52. }
  53. @OnError
  54. public void onError(Session session, Throwable error) {
  55. log.info("服务端发生了错误, error message:{}", error.getMessage());
  56. }
  57. /**
  58. * 连接关闭
  59. */
  60. @OnClose
  61. public void onClose() {
  62. this.onlineNumber--;
  63. this.clients.remove(userId);
  64. log.info("onClose success! online user count:P{}", this.onlineNumber);
  65. }
  66. /**
  67. * 收到客户端的消息
  68. *
  69. * @param message 消息
  70. * @param session 会话
  71. */
  72. @OnMessage
  73. public void onMessage(String message, Session session) {
  74. if ("ping".equals(message)) {
  75. // 心跳检测
  76. clients.get(userId).session.getAsyncRemote().sendText("ok");
  77. return;
  78. }
  79. /**
  80. * 基于redis发布订阅模式的分布式WebSocketSession共享问题解决方案
  81. * 发布消息事件到Chat Topic
  82. */
  83. RedisMessageSender redisMessageSender = SpringContextUtil.getBean(RedisMessageSender.class);
  84. redisMessageSender.sendMessage(RedisMessageChannelTopicEnum.CHAT.getChannel(), message);
  85. }
  86. }