Selaa lähdekoodia

群聊多人音视频通知开发

wangqi49 1 kuukausi sitten
vanhempi
commit
e3d815aafc
16 muutettua tiedostoa jossa 388 lisäystä ja 30 poistoa
  1. 2 2
      resources/nacos-yaml/webchat-pay-service-dev.yaml
  2. 3 12
      resources/nacos-yaml/webchat-pgc-service-dev.yaml
  3. 3 11
      resources/nacos-yaml/webchat-ugc-service-dev.yaml
  4. 7 2
      webchat-client-chat/src/main/java/com/webchat/client/chat/controller/AccountController.java
  5. 2 1
      webchat-common/src/main/java/com/webchat/common/enums/ChatMessageTypeEnum.java
  6. 2 0
      webchat-common/src/main/java/com/webchat/common/enums/messagequeue/MessageBroadChannelEnum.java
  7. 4 0
      webchat-connect/src/main/java/com/webchat/connect/messagequeue/config/RedisConfig.java
  8. 32 0
      webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/redis/GroupVideoCallRedisQueueListener.java
  9. 26 0
      webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/rocketmq/GroupVideoRocketQueueConsumer.java
  10. 96 0
      webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/service/GroupVideoCallConsumeService.java
  11. 18 0
      webchat-connect/src/main/java/com/webchat/connect/service/AccountService.java
  12. 16 1
      webchat-connect/src/main/java/com/webchat/connect/websocket/config/WebSocketConnectServletConfig.java
  13. 144 0
      webchat-connect/src/main/java/com/webchat/connect/websocket/handler/GroupVideoWebSocketEndPointServletHandler.java
  14. 1 1
      webchat-connect/src/main/java/com/webchat/connect/websocket/handler/P2PVideoWebSocketEndPointServletHandler.java
  15. 29 0
      webchat-connect/src/main/java/com/webchat/connect/websocket/interceptor/GroupVideoWebSocketEndPointServletInterceptor.java
  16. 3 0
      webchat-domain/src/main/java/com/webchat/domain/vo/request/mess/VideoChatMessageRequestVO.java

+ 2 - 2
resources/nacos-yaml/webchat-pay-service-dev.yaml

@@ -1,7 +1,7 @@
+#---------------------------------数据库配置----------------------------------#
 spring:
-  # set mysql config
   datasource:
-    url: jdbc:mysql://127.0.0.1:3306/webchat_payment?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8&useSSL=false
+    url: jdbc:mysql://127.0.0.1:3306/webchat_payment?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2b8&allowPublicKeyRetrieval=true
     username: root
     password: 12345678
     driver-class-name: com.mysql.jdbc.Driver

+ 3 - 12
resources/nacos-yaml/webchat-pgc-service-dev.yaml

@@ -1,23 +1,14 @@
 #---------------------------------数据库配置----------------------------------#
 spring:
   datasource:
-    url: jdbc:mysql://127.0.0.1:3306/webchat_pgc?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8&&useSSL=false
+    url: jdbc:mysql://127.0.0.1:3306/webchat_pgc?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2b8&allowPublicKeyRetrieval=true
     username: root
     password: 12345678
     driver-class-name: com.mysql.jdbc.Driver
-    type: com.zaxxer.hikari.HikariDataSource
     hikari:
-      minimum-idle: 10
       maximum-pool-size: 50
-      auto-commit: false
-      idle-timeout: 30000
-      pool-name: DatebookHikariCP
-      max-lifetime: 1800000
-      connection-timeout: 30000
-      connection-test-query: SELECT 1
-      connection-init-sql: set names utf8mb4
-    jpa:
-      show-sql: true
+  jpa:
+    show-sql: true
   #---------------------------------redis----------------------------------#
   data:
     redis:

+ 3 - 11
resources/nacos-yaml/webchat-ugc-service-dev.yaml

@@ -1,20 +1,12 @@
+#---------------------------------数据库配置----------------------------------#
 spring:
   datasource:
-    url: jdbc:mysql://127.0.0.1:3306/webchat_ugc?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8&&useSSL=false
+    url: jdbc:mysql://127.0.0.1:3306/webchat_ugc?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2b8&allowPublicKeyRetrieval=true
     username: root
     password: 12345678
     driver-class-name: com.mysql.jdbc.Driver
-    type: com.zaxxer.hikari.HikariDataSource
     hikari:
-    minimum-idle: 10
-    maximum-pool-size: 50
-    auto-commit: false
-    idle-timeout: 30000
-    pool-name: DatebookHikariCP
-    max-lifetime: 1800000
-    connection-timeout: 30000
-    connection-test-query: SELECT 1
-    connection-init-sql: set names utf8mb4
+      maximum-pool-size: 50
   jpa:
     show-sql: true
   data:

+ 7 - 2
webchat-client-chat/src/main/java/com/webchat/client/chat/controller/AccountController.java

@@ -9,6 +9,7 @@ import com.webchat.common.helper.SessionHelper;
 import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
 import jakarta.servlet.http.HttpServletRequest;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
@@ -27,6 +28,9 @@ public class AccountController {
     @Autowired
     private HttpServletRequest request;
 
+    @Value("${oauth.server-url}")
+    private String logoutServer;
+
     @GetMapping("/current/info")
     public APIResponseBean<UserBaseResponseInfoVO> getCurrentUserInfo() {
         String userId = SessionHelper.getCurrentUserId();
@@ -39,9 +43,10 @@ public class AccountController {
      * @return
      */
     @GetMapping("/logout")
-    public APIResponseBean<Boolean> logout() {
+    public APIResponseBean<String> logout() {
         userSessionService.logout(request);
-        return APIResponseBeanUtil.success(true);
+        String ssoServerUrl = String.format(logoutServer, request.getHeader("origin-url"));
+        return APIResponseBeanUtil.success(ssoServerUrl);
     }
 
     /**

+ 2 - 1
webchat-common/src/main/java/com/webchat/common/enums/ChatMessageTypeEnum.java

@@ -16,7 +16,8 @@ public enum ChatMessageTypeEnum {
     APPLY(5, "申请添加好友"),
     WALLET_BALANCE(6, "钱包余额"),
     CHATTING_REFRESH(7, "刷新对话列表"),
-    VIDEO_OFFER(8, "音视频呼叫");
+    VIDEO_CALL(8, "音视频呼叫"),
+    GROUP_VIDEO_CALL(9, "群聊音视频呼叫");
 
     private Integer type;
     private String desc;

+ 2 - 0
webchat-common/src/main/java/com/webchat/common/enums/messagequeue/MessageBroadChannelEnum.java

@@ -18,6 +18,8 @@ public enum MessageBroadChannelEnum {
 
     QUEUE_VIDEO_SDP("queue_video_sdp", "音视频通话信令"),
 
+    QUEUE_GROUP_VIDEO_CALL("queue_group_video_call", "群聊多人音视频呼叫频道"),
+
     QUEUE_CHAT_ROBOT("queue_chat_robot", "机器人对话消息队列"),
 
     ;

+ 4 - 0
webchat-connect/src/main/java/com/webchat/connect/messagequeue/config/RedisConfig.java

@@ -4,6 +4,7 @@ import com.webchat.common.enums.messagequeue.MessageBroadChannelEnum;
 import com.webchat.connect.messagequeue.consumer.redis.ArticlePushRedisQueueListener;
 import com.webchat.connect.messagequeue.consumer.redis.ChatMessageRedisQueueListener;
 import com.webchat.connect.messagequeue.consumer.redis.ChatNotifyRedisQueueListener;
+import com.webchat.connect.messagequeue.consumer.redis.GroupVideoCallRedisQueueListener;
 import com.webchat.connect.messagequeue.consumer.redis.WebRtcSDPRedisQueueListener;
 import jakarta.annotation.Resource;
 import org.springframework.context.annotation.Bean;
@@ -23,6 +24,8 @@ public class RedisConfig {
     private ArticlePushRedisQueueListener articlePushRedisQueueListener;
     @Resource
     private WebRtcSDPRedisQueueListener webRtcSDPRedisQueueListener;
+    @Resource
+    private GroupVideoCallRedisQueueListener groupVideoCallRedisQueueListener;
 
     @Bean
     public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory) {
@@ -34,6 +37,7 @@ public class RedisConfig {
         container.addMessageListener(chatMessageRedisQueueListener, new ChannelTopic(MessageBroadChannelEnum.QUEUE_CHAT_MESSAGE.getChannel()));
         container.addMessageListener(articlePushRedisQueueListener, new ChannelTopic(MessageBroadChannelEnum.QUEUE_ARTICLE_PUSH_MESSAGE.getChannel()));
         container.addMessageListener(webRtcSDPRedisQueueListener, new ChannelTopic(MessageBroadChannelEnum.QUEUE_VIDEO_SDP.getChannel()));
+        container.addMessageListener(groupVideoCallRedisQueueListener, new ChannelTopic(MessageBroadChannelEnum.QUEUE_GROUP_VIDEO_CALL.getChannel()));
         return container;
     }
 }

+ 32 - 0
webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/redis/GroupVideoCallRedisQueueListener.java

@@ -0,0 +1,32 @@
+package com.webchat.connect.messagequeue.consumer.redis;
+
+import com.webchat.connect.messagequeue.consumer.service.GroupVideoCallConsumeService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+
+@Slf4j
+@Component
+public class GroupVideoCallRedisQueueListener implements MessageListener {
+
+
+    @Autowired
+    private GroupVideoCallConsumeService groupVideoCallConsumeService;
+
+    @Autowired
+    private RedisTemplate redisTemplate;
+
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+
+        String channel = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());
+        String messageStr = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());
+        log.info("ChatMessageNotifyQueueListener.onMessage =====> channel:{} messageStr:{}", channel, messageStr);
+
+        groupVideoCallConsumeService.consume(messageStr);
+    }
+}

+ 26 - 0
webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/rocketmq/GroupVideoRocketQueueConsumer.java

@@ -0,0 +1,26 @@
+package com.webchat.connect.messagequeue.consumer.rocketmq;
+
+import com.webchat.connect.messagequeue.consumer.service.GroupVideoCallConsumeService;
+import com.webchat.connect.messagequeue.consumer.service.WebRtcSDPConsumeService;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+//@RocketMQMessageListener(consumerGroup = "web_chat", topic = "queue_group_video_call", messageModel = MessageModel.BROADCASTING)
+public class GroupVideoRocketQueueConsumer implements RocketMQListener<String> {
+
+        @Autowired
+        private GroupVideoCallConsumeService groupVideoCallConsumeService;
+
+        /**
+         * 处理来自IM 对话相关消息
+         *
+         * @param message
+         */
+        @Override
+        public void onMessage(String message) {
+
+             groupVideoCallConsumeService.consume(message);
+        }
+}

+ 96 - 0
webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/service/GroupVideoCallConsumeService.java

@@ -0,0 +1,96 @@
+package com.webchat.connect.messagequeue.consumer.service;
+
+
+import com.google.common.collect.Sets;
+import com.webchat.common.constants.ConnectConstants;
+import com.webchat.common.enums.ChatMessageTypeEnum;
+import com.webchat.common.util.JsonUtil;
+import com.webchat.connect.service.AccountService;
+import com.webchat.connect.websocket.handler.ChatWebSocketEndPointServletHandler;
+import com.webchat.domain.vo.request.mess.MessageNotifyVO;
+import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.Assert;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * 处理群聊多人音视频呼叫消息
+ */
+@Service
+public class GroupVideoCallConsumeService {
+
+    @Autowired
+    private AccountService accountService;
+
+    /**
+     * 处理群聊多人音视频呼叫消息
+     *
+     * @param message
+     */
+    public void consume(String message) {
+
+        if (StringUtils.isBlank(message)) {
+            // TODO, 预警
+            return;
+        }
+        MessageNotifyVO messageBase = JsonUtil.fromJson(message, MessageNotifyVO.class);
+        Assert.notNull(messageBase, "群聊多人音视频呼叫消息反序列化失败!");
+        // 这里的senderId不是音视频的实际发起人,而是群聊id(消息代理人)
+        String groupId = messageBase.getSenderId();
+        String senderId = messageBase.getProxySenderId();
+        // 获取群聊下所有用户
+        Set<String> userIds = accountService.getGroupUserIds(groupId);
+        if (CollectionUtils.isEmpty(userIds)) {
+            // 群聊下没有任何用户
+            return;
+        }
+        Map<String, UserBaseResponseInfoVO> accountMap = accountService.batchGet(Sets.newHashSet(senderId, groupId));
+        if (accountMap == null) {
+            // TODO, 预警
+            return;
+        }
+
+        UserBaseResponseInfoVO group = accountMap.get(groupId);
+        UserBaseResponseInfoVO sender = accountMap.get(senderId);
+
+        /**
+         * 走client最外层对话ws推送音视频呼叫提醒
+         */
+        Set<String> bizCodes = ConnectConstants.ConnectBiz.getBizCode(ConnectConstants.BizEnum.INDEX);
+        for (String bizCode : bizCodes) {
+            for (String userId : userIds) {
+                if (ObjectUtils.equals(userId, senderId)) {
+                    continue;
+                }
+                /**
+                 * 群场景推送,类如用户多端登录,同时移动、PC在线,多端需要同时收到音视频邀请通知
+                 */
+                WebSocketSession session = ChatWebSocketEndPointServletHandler.getSession(bizCode, userId);
+                if (session == null || !session.isOpen()) {
+                    continue;
+                }
+                MessageNotifyVO videoCallMessage = new MessageNotifyVO();
+                videoCallMessage.setSender(group);
+                videoCallMessage.setProxySender(sender);
+                videoCallMessage.setType(ChatMessageTypeEnum.GROUP_VIDEO_CALL.getType());
+                try {
+                    session.sendMessage(new TextMessage(JsonUtil.toJsonString(videoCallMessage)));
+                } catch (IOException e) {
+                    // 这里不能跑出异常,否则会中断整个推送任务
+                    // TODO 处理策略:重拾一次/预警通知
+                }
+            }
+
+        }
+
+    }
+}

+ 18 - 0
webchat-connect/src/main/java/com/webchat/connect/service/AccountService.java

@@ -11,7 +11,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.web.bind.annotation.RequestBody;
 
+import java.util.Map;
 import java.util.Set;
 
 @Slf4j
@@ -22,6 +24,22 @@ public class AccountService {
     private UserServiceClient userServiceClient;
 
     /**
+     * 批量查询账号详情
+     *
+     * @param userIds
+     * @return
+     */
+    public Map<String, UserBaseResponseInfoVO> batchGet(Set<String> userIds) {
+        APIResponseBean<Map<String, UserBaseResponseInfoVO>> responseBean = userServiceClient.batchGet(userIds);
+        if (APIResponseBeanUtil.isOk(responseBean)) {
+            return responseBean.getData();
+        }
+        // TODO 建议做都低策略(两级缓存 local cache + redis)
+        return null;
+    }
+
+
+    /**
      * 获取账号详情,底层走Redis查询
      * @param account
      * @return

+ 16 - 1
webchat-connect/src/main/java/com/webchat/connect/websocket/config/WebSocketConnectServletConfig.java

@@ -1,8 +1,10 @@
 package com.webchat.connect.websocket.config;
 
 import com.webchat.connect.websocket.handler.ChatWebSocketEndPointServletHandler;
+import com.webchat.connect.websocket.handler.GroupVideoWebSocketEndPointServletHandler;
 import com.webchat.connect.websocket.handler.P2PVideoWebSocketEndPointServletHandler;
 import com.webchat.connect.websocket.interceptor.ChatWebSocketEndPointServletInterceptor;
+import com.webchat.connect.websocket.interceptor.GroupVideoWebSocketEndPointServletInterceptor;
 import com.webchat.connect.websocket.interceptor.VideoWebSocketEndPointServletInterceptor;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -18,6 +20,8 @@ public class WebSocketConnectServletConfig implements WebSocketConfigurer {
 
     private static final String P2P_VIDEO_WEBSOCKET_PATH = "/connect-service/ws/p2p/video/{bizCode}/{userId}";
 
+    private static final String GROUP_VIDEO_WEBSOCKET_PATH = "/connect-service/ws/group/video/{bizCode}/{groupId}/{userId}";
+
     @Override
     public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
         /**
@@ -31,7 +35,13 @@ public class WebSocketConnectServletConfig implements WebSocketConfigurer {
          */
         registry.addHandler(p2PVideoWebSocketEndPointServletHandler(), P2P_VIDEO_WEBSOCKET_PATH)
                 .setAllowedOrigins("*")
-                .addInterceptors(new VideoWebSocketEndPointServletInterceptor()); ;
+                .addInterceptors(new VideoWebSocketEndPointServletInterceptor());
+        /**
+         * 注册群聊多人音视频通话ws信令服务
+         */
+        registry.addHandler(groupVideoWebSocketEndPointServletHandler(), GROUP_VIDEO_WEBSOCKET_PATH)
+                .setAllowedOrigins("*")
+                .addInterceptors(new GroupVideoWebSocketEndPointServletInterceptor());
     }
 
     @Bean
@@ -43,4 +53,9 @@ public class WebSocketConnectServletConfig implements WebSocketConfigurer {
     public P2PVideoWebSocketEndPointServletHandler p2PVideoWebSocketEndPointServletHandler() {
         return new P2PVideoWebSocketEndPointServletHandler();
     }
+
+    @Bean
+    public GroupVideoWebSocketEndPointServletHandler groupVideoWebSocketEndPointServletHandler() {
+        return new GroupVideoWebSocketEndPointServletHandler();
+    }
 }

+ 144 - 0
webchat-connect/src/main/java/com/webchat/connect/websocket/handler/GroupVideoWebSocketEndPointServletHandler.java

@@ -0,0 +1,144 @@
+package com.webchat.connect.websocket.handler;
+
+import com.webchat.common.enums.AccountRelationTypeEnum;
+import com.webchat.common.enums.ChatMessageTypeEnum;
+import com.webchat.common.enums.VideoSDPMessageType;
+import com.webchat.common.enums.messagequeue.MessageBroadChannelEnum;
+import com.webchat.common.service.messagequeue.producer.MessageQueueProducer;
+import com.webchat.common.util.JsonUtil;
+import com.webchat.connect.service.AccountService;
+import com.webchat.domain.vo.request.mess.MessageNotifyVO;
+import com.webchat.domain.vo.request.mess.VideoChatMessageRequestVO;
+import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.TextWebSocketHandler;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+public class GroupVideoWebSocketEndPointServletHandler extends TextWebSocketHandler {
+
+    @Autowired
+    private MessageQueueProducer<Object, Long> messageQueueProducer;
+
+    @Autowired
+    private AccountService accountService;
+
+    /**
+     * Map<KEY1, Map<KEY2, WebSocketSession>>
+     *
+     * KEY1:用于区分场景,如:PC-APP、 PC-CHAT、WAP-APP ,详见:WebSocketBizCodeEnum
+     * KEY2:group id 群聊id,实现用户ws链接隔离(类似房间号)
+     * KEY3: 用户id
+     * WebSocketSession: 不同场景下用户的WebSocket Session对象 -- 全双工、有状态
+     */
+    public static Map<String, Map<String, Map<String, WebSocketSession>>> sessions = new ConcurrentHashMap<>();
+
+    /**
+     * 连接建立
+     *
+     * @param session
+     * @throws Exception
+     */
+    @Override
+    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
+        // 获取路径参数
+        Map<String, Object> attributes = session.getAttributes();
+        String bizCode = (String) attributes.get("bizCode");
+        String groupId = (String) attributes.get("groupId");
+        String userId = (String) attributes.get("userId");
+        Map<String, Map<String, WebSocketSession>> groupUserSessions = sessions.get(bizCode);
+        Map<String, WebSocketSession> userSessions;
+        if (groupUserSessions == null) {
+            // 服务重新启动,首次用户发起WS链接
+            groupUserSessions = new ConcurrentHashMap<>();
+            userSessions = new ConcurrentHashMap<>();
+            groupUserSessions.put(groupId, userSessions);
+        } else {
+            // 初始化处理首页群聊下多人音视频ws session结构
+            userSessions = groupUserSessions.get(userId);
+            if (userSessions == null) {
+                userSessions = new ConcurrentHashMap<>();
+                groupUserSessions.put(groupId, userSessions);
+            }
+        }
+        userSessions.put(userId, session);
+        log.info("Chat WebSocket connection ====> groupId:{} userId:{}, sessionId:{}", groupId, userId, session.getId());
+        // TODO 新用户加入群聊音视频,通知其他在线用户(创建已在线跟新上线用的webRTC链接)--- MESH
+
+    }
+
+    /**
+     * 收到消息
+     *
+     * @param session
+     * @param message
+     * @throws Exception
+     */
+    @Override
+    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
+        String payload = message.getPayload();
+        System.out.println("Chat WebSocket Connection Received message: " + payload);
+        if ("ping".equals(payload)) {
+            // 心跳检测
+            return;
+        }
+        VideoChatMessageRequestVO videoMessage = JsonUtil.fromJson(payload, VideoChatMessageRequestVO.class);
+        String groupId = videoMessage.getGroupId();
+        if (StringUtils.isBlank(groupId)) {
+            return;
+        }
+        if (VideoSDPMessageType.call.name().equals(videoMessage.getType())) {
+            /**
+             * 1. 给被呼叫人推送呼叫提醒,广播音视频呼叫信息给被呼人
+             */
+            MessageNotifyVO messageBase = new MessageNotifyVO();
+            // 这里角色反转,引入消息代理人机制(同理同群聊多人对话)
+            messageBase.setProxySenderId(videoMessage.getUserId());
+            messageBase.setSenderId(groupId);
+            messageBase.setType(ChatMessageTypeEnum.VIDEO_CALL.getType());
+            messageQueueProducer.broadSend(MessageBroadChannelEnum.QUEUE_GROUP_VIDEO_CALL, messageBase);
+            return;
+        }
+        /**
+         * 2. 信命推送
+         */
+        messageQueueProducer.broadSend(MessageBroadChannelEnum.QUEUE_VIDEO_SDP, videoMessage);
+    }
+
+    /**
+     * 断开连接
+     *
+     * @param session
+     * @param status
+     * @throws Exception
+     */
+    @Override
+    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
+        System.out.println("Chat WebSocket Connection closed: " + session.getId());
+    }
+
+    public static WebSocketSession getSession(String bizCode, String groupId, String userId) {
+        if (StringUtils.isBlank(bizCode) || StringUtils.isBlank(userId)) {
+            return null;
+        }
+        Map<String, Map<String, WebSocketSession>> groupUserSessions = sessions.get(bizCode);
+        if (MapUtils.isEmpty(groupUserSessions)) {
+            return null;
+        }
+        Map<String, WebSocketSession> userSessions = groupUserSessions.get(groupId);
+        if (MapUtils.isEmpty(userSessions)) {
+            return null;
+        }
+        return userSessions.get(userId);
+    }
+}

+ 1 - 1
webchat-connect/src/main/java/com/webchat/connect/websocket/handler/P2PVideoWebSocketEndPointServletHandler.java

@@ -90,7 +90,7 @@ public class P2PVideoWebSocketEndPointServletHandler extends TextWebSocketHandle
             MessageNotifyVO messageBase = new MessageNotifyVO();
             messageBase.setSender(sender);
             messageBase.setReceiverId(videoMessage.getTargetUserId());
-            messageBase.setType(ChatMessageTypeEnum.VIDEO_OFFER.getType());
+            messageBase.setType(ChatMessageTypeEnum.VIDEO_CALL.getType());
             /**
              * 广播音视频呼叫信息给被呼人
              */

+ 29 - 0
webchat-connect/src/main/java/com/webchat/connect/websocket/interceptor/GroupVideoWebSocketEndPointServletInterceptor.java

@@ -0,0 +1,29 @@
+package com.webchat.connect.websocket.interceptor;
+
+import org.springframework.http.server.ServerHttpRequest;
+import org.springframework.http.server.ServerHttpResponse;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.server.HandshakeInterceptor;
+
+import java.util.Map;
+
+public class GroupVideoWebSocketEndPointServletInterceptor implements HandshakeInterceptor {
+
+    @Override
+    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
+        String path = request.getURI().getPath();
+        String bizCode = path.split("/")[5];
+        String groupId = path.split("/")[6];
+        String userId = path.split("/")[7];
+        // 将bizCode、userId存储到session属性中
+        attributes.put("bizCode", bizCode);
+        attributes.put("groupId", groupId);
+        attributes.put("userId", userId);
+        return true;
+    }
+
+    @Override
+    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
+        // 在这里处理握手完成后的逻辑
+    }
+}

+ 3 - 0
webchat-domain/src/main/java/com/webchat/domain/vo/request/mess/VideoChatMessageRequestVO.java

@@ -13,6 +13,9 @@ public class VideoChatMessageRequestVO {
 
     private String targetUserId;
 
+    /**
+     * 群聊id支持群聊多人音视频
+     */
     private String groupId;
 
     private String type;