123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- package com.webchat.connect.websocket.handler;
- import com.webchat.common.enums.RoleCodeEnum;
- import com.webchat.common.enums.messagequeue.MessageBroadChannelEnum;
- import com.webchat.common.service.messagequeue.producer.MessageQueueProducer;
- import com.webchat.common.util.JsonUtil;
- import com.webchat.connect.service.AccountService;
- import com.webchat.domain.vo.request.mess.ChatMessageRequestVO;
- import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.collections.CollectionUtils;
- import org.apache.commons.collections.MapUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.socket.CloseStatus;
- import org.springframework.web.socket.TextMessage;
- import org.springframework.web.socket.WebSocketSession;
- import org.springframework.web.socket.handler.TextWebSocketHandler;
- import java.util.Collections;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.ConcurrentHashMap;
- @Slf4j
- public class ChatWebSocketEndPointServletHandler extends TextWebSocketHandler {
- @Autowired
- private MessageQueueProducer<Object, Long> messageQueueProducer;
- @Autowired
- private AccountService accountService;
- /**
- * Map<KEY1, Map<KEY2, WebSocketSession>>
- *
- * KEY1:用于区分场景,如:PC-APP、 PC-CHAT、WAP-APP ,详见:WebSocketBizCodeEnum
- * KEY2: 用户
- * WebSocketSession: 不同场景下用户的WebSocket Session对象 -- 全双工、有状态
- */
- public static Map<String, Map<String, WebSocketSession>> sessions = new ConcurrentHashMap<>();
- /**
- * 连接建立
- *
- * @param session
- * @throws Exception
- */
- @Override
- public void afterConnectionEstablished(WebSocketSession session) throws Exception {
- // 获取路径参数
- Map<String, Object> attributes = session.getAttributes();
- String bizCode = (String) attributes.get("bizCode");
- String userId = (String) attributes.get("userId");
- Map<String, WebSocketSession> userSessions = sessions.get(bizCode);
- if (userSessions == null) {
- userSessions = new ConcurrentHashMap<>();
- sessions.put(bizCode, userSessions);
- }
- userSessions.put(userId, session);
- log.info("Chat WebSocket connection ====> userId:{}, sessionId:{}", userId, session.getId());
- }
- /**
- * 收到消息
- *
- * @param session
- * @param message
- * @throws Exception
- */
- @Override
- protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
- String payload = message.getPayload();
- System.out.println("Chat WebSocket Connection Received message: " + payload);
- if ("ping".equals(payload)) {
- // 心跳检测
- return;
- }
- ChatMessageRequestVO chatMessage = JsonUtil.fromJson(payload, ChatMessageRequestVO.class);
- if (chatMessage == null) {
- return;
- }
- // 获取对话消息接受人
- String receiverId = chatMessage.getReceiverId();
- UserBaseResponseInfoVO accountInfo = accountService.accountInfo(receiverId);
- if (accountInfo == null) {
- return;
- }
- if (RoleCodeEnum.ROBOT.getCode().equals(accountInfo.getRoleCode()) ||
- RoleCodeEnum.AIBOT.getCode().equals(accountInfo.getRoleCode())) {
- /**
- * 机器人AGENT对话,走AGENT工作流处理
- */
- messageQueueProducer.broadSend(MessageBroadChannelEnum.QUEUE_CHAT_ROBOT, chatMessage);
- } else {
- /**
- * 对话消息,走MQ广播到所有消费者(底层支持RocketMQ、Redis等)
- */
- messageQueueProducer.broadSend(MessageBroadChannelEnum.QUEUE_CHAT_MESSAGE, chatMessage);
- }
- }
- /**
- * 断开连接
- *
- * @param session
- * @param status
- * @throws Exception
- */
- @Override
- public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
- System.out.println("Chat WebSocket Connection closed: " + session.getId());
- }
- public static WebSocketSession getSession(String bizCode, String userId) {
- if (StringUtils.isBlank(bizCode) || StringUtils.isBlank(userId)) {
- return null;
- }
- Map<String, WebSocketSession> userSessions = sessions.get(bizCode);
- if (MapUtils.isEmpty(userSessions)) {
- return null;
- }
- return userSessions.get(userId);
- }
- public static Map<String, WebSocketSession> getSessions(String bizCode, Set<String> userIds) {
- if (StringUtils.isBlank(bizCode) || CollectionUtils.isEmpty(userIds)) {
- return Collections.emptyMap();
- }
- Map<String, WebSocketSession> userSessions = sessions.get(bizCode);
- if (MapUtils.isEmpty(userSessions)) {
- return Collections.emptyMap();
- }
- Map<String, WebSocketSession> userSessionMap = new ConcurrentHashMap<>();
- userIds.forEach(uid -> {
- WebSocketSession ws = userSessions.get(uid);
- if (ws != null) {
- userSessionMap.put(uid, ws);
- }
- });
- return userSessionMap;
- }
- }
|