ChatWebSocketEndPointServletHandler.java 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package com.webchat.connect.websocket.handler;
  2. import com.webchat.common.enums.RoleCodeEnum;
  3. import com.webchat.common.enums.messagequeue.MessageBroadChannelEnum;
  4. import com.webchat.common.service.messagequeue.producer.MessageQueueProducer;
  5. import com.webchat.common.util.JsonUtil;
  6. import com.webchat.connect.service.AccountService;
  7. import com.webchat.domain.vo.request.mess.ChatMessageRequestVO;
  8. import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.apache.commons.collections.CollectionUtils;
  11. import org.apache.commons.collections.MapUtils;
  12. import org.apache.commons.lang3.StringUtils;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.web.socket.CloseStatus;
  15. import org.springframework.web.socket.TextMessage;
  16. import org.springframework.web.socket.WebSocketSession;
  17. import org.springframework.web.socket.handler.TextWebSocketHandler;
  18. import java.util.Collections;
  19. import java.util.Map;
  20. import java.util.Set;
  21. import java.util.concurrent.ConcurrentHashMap;
  22. @Slf4j
  23. public class ChatWebSocketEndPointServletHandler extends TextWebSocketHandler {
  24. @Autowired
  25. private MessageQueueProducer<Object, Long> messageQueueProducer;
  26. @Autowired
  27. private AccountService accountService;
  28. /**
  29. * Map<KEY1, Map<KEY2, WebSocketSession>>
  30. *
  31. * KEY1:用于区分场景,如:PC-APP、 PC-CHAT、WAP-APP ,详见:WebSocketBizCodeEnum
  32. * KEY2: 用户
  33. * WebSocketSession: 不同场景下用户的WebSocket Session对象 -- 全双工、有状态
  34. */
  35. public static Map<String, Map<String, WebSocketSession>> sessions = new ConcurrentHashMap<>();
  36. /**
  37. * 连接建立
  38. *
  39. * @param session
  40. * @throws Exception
  41. */
  42. @Override
  43. public void afterConnectionEstablished(WebSocketSession session) throws Exception {
  44. // 获取路径参数
  45. Map<String, Object> attributes = session.getAttributes();
  46. String bizCode = (String) attributes.get("bizCode");
  47. String userId = (String) attributes.get("userId");
  48. Map<String, WebSocketSession> userSessions = sessions.get(bizCode);
  49. if (userSessions == null) {
  50. userSessions = new ConcurrentHashMap<>();
  51. sessions.put(bizCode, userSessions);
  52. }
  53. userSessions.put(userId, session);
  54. log.info("Chat WebSocket connection ====> userId:{}, sessionId:{}", userId, session.getId());
  55. }
  56. /**
  57. * 收到消息
  58. *
  59. * @param session
  60. * @param message
  61. * @throws Exception
  62. */
  63. @Override
  64. protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
  65. String payload = message.getPayload();
  66. System.out.println("Chat WebSocket Connection Received message: " + payload);
  67. if ("ping".equals(payload)) {
  68. // 心跳检测
  69. return;
  70. }
  71. ChatMessageRequestVO chatMessage = JsonUtil.fromJson(payload, ChatMessageRequestVO.class);
  72. if (chatMessage == null) {
  73. return;
  74. }
  75. // 获取对话消息接受人
  76. String receiverId = chatMessage.getReceiverId();
  77. UserBaseResponseInfoVO accountInfo = accountService.accountInfo(receiverId);
  78. if (accountInfo == null) {
  79. return;
  80. }
  81. if (RoleCodeEnum.ROBOT.getCode().equals(accountInfo.getRoleCode()) ||
  82. RoleCodeEnum.AIBOT.getCode().equals(accountInfo.getRoleCode())) {
  83. /**
  84. * 机器人AGENT对话,走AGENT工作流处理
  85. */
  86. messageQueueProducer.broadSend(MessageBroadChannelEnum.QUEUE_CHAT_ROBOT, chatMessage);
  87. } else {
  88. /**
  89. * 对话消息,走MQ广播到所有消费者(底层支持RocketMQ、Redis等)
  90. */
  91. messageQueueProducer.broadSend(MessageBroadChannelEnum.QUEUE_CHAT_MESSAGE, chatMessage);
  92. }
  93. }
  94. /**
  95. * 断开连接
  96. *
  97. * @param session
  98. * @param status
  99. * @throws Exception
  100. */
  101. @Override
  102. public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
  103. System.out.println("Chat WebSocket Connection closed: " + session.getId());
  104. }
  105. public static WebSocketSession getSession(String bizCode, String userId) {
  106. if (StringUtils.isBlank(bizCode) || StringUtils.isBlank(userId)) {
  107. return null;
  108. }
  109. Map<String, WebSocketSession> userSessions = sessions.get(bizCode);
  110. if (MapUtils.isEmpty(userSessions)) {
  111. return null;
  112. }
  113. return userSessions.get(userId);
  114. }
  115. public static Map<String, WebSocketSession> getSessions(String bizCode, Set<String> userIds) {
  116. if (StringUtils.isBlank(bizCode) || CollectionUtils.isEmpty(userIds)) {
  117. return Collections.emptyMap();
  118. }
  119. Map<String, WebSocketSession> userSessions = sessions.get(bizCode);
  120. if (MapUtils.isEmpty(userSessions)) {
  121. return Collections.emptyMap();
  122. }
  123. Map<String, WebSocketSession> userSessionMap = new ConcurrentHashMap<>();
  124. userIds.forEach(uid -> {
  125. WebSocketSession ws = userSessions.get(uid);
  126. if (ws != null) {
  127. userSessionMap.put(uid, ws);
  128. }
  129. });
  130. return userSessionMap;
  131. }
  132. }