Переглянути джерело

群聊多人音视频Mesh模式支持

wangqi49 1 місяць тому
батько
коміт
bbce12476e

+ 5 - 0
webchat-common/src/main/java/com/webchat/common/enums/RedisKeyEnum.java

@@ -287,6 +287,11 @@ public enum RedisKeyEnum {
      */
     WEB_CHAT_REDIS_QUEUE("WEB_CHAT_REDIS_QUEUE", -1L),
 
+    /**
+     * Redis 记录群聊房间在线用户
+     *
+     */
+    GROUP_VIDEO_ONLINE_USER_ZSET("GROUP_VIDEO_ONLINE_USER_ZSET", 24 * 60 * 60L),
     ;
 
 

+ 1 - 1
webchat-common/src/main/java/com/webchat/common/enums/VideoSDPMessageType.java

@@ -5,5 +5,5 @@ import lombok.Getter;
 @Getter
 public enum VideoSDPMessageType {
 
-    call, offer, answer, candidate, leave
+    online, offline, call, offer, answer, candidate, leave
 }

+ 2 - 0
webchat-common/src/main/java/com/webchat/common/enums/messagequeue/MessageBroadChannelEnum.java

@@ -20,6 +20,8 @@ public enum MessageBroadChannelEnum {
 
     QUEUE_GROUP_VIDEO_CALL("queue_group_video_call", "群聊多人音视频呼叫频道"),
 
+    QUEUE_GROUP_VIDEO_USER_CHANGE("queue_group_video_user_change", "群聊多人音视频用户上/下线频道"),
+
     QUEUE_CHAT_ROBOT("queue_chat_robot", "机器人对话消息队列"),
 
     ;

+ 4 - 0
webchat-connect/src/main/java/com/webchat/connect/messagequeue/config/RedisConfig.java

@@ -5,6 +5,7 @@ import com.webchat.connect.messagequeue.consumer.redis.ArticlePushRedisQueueList
 import com.webchat.connect.messagequeue.consumer.redis.ChatMessageRedisQueueListener;
 import com.webchat.connect.messagequeue.consumer.redis.ChatNotifyRedisQueueListener;
 import com.webchat.connect.messagequeue.consumer.redis.GroupVideoCallRedisQueueListener;
+import com.webchat.connect.messagequeue.consumer.redis.GroupVideoUserChangeRedisQueueListener;
 import com.webchat.connect.messagequeue.consumer.redis.WebRtcSDPRedisQueueListener;
 import jakarta.annotation.Resource;
 import org.springframework.context.annotation.Bean;
@@ -26,6 +27,8 @@ public class RedisConfig {
     private WebRtcSDPRedisQueueListener webRtcSDPRedisQueueListener;
     @Resource
     private GroupVideoCallRedisQueueListener groupVideoCallRedisQueueListener;
+    @Resource
+    private GroupVideoUserChangeRedisQueueListener groupVideoUserChangeRedisQueueListener;
 
     @Bean
     public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory) {
@@ -38,6 +41,7 @@ public class RedisConfig {
         container.addMessageListener(articlePushRedisQueueListener, new ChannelTopic(MessageBroadChannelEnum.QUEUE_ARTICLE_PUSH_MESSAGE.getChannel()));
         container.addMessageListener(webRtcSDPRedisQueueListener, new ChannelTopic(MessageBroadChannelEnum.QUEUE_VIDEO_SDP.getChannel()));
         container.addMessageListener(groupVideoCallRedisQueueListener, new ChannelTopic(MessageBroadChannelEnum.QUEUE_GROUP_VIDEO_CALL.getChannel()));
+        container.addMessageListener(groupVideoUserChangeRedisQueueListener, new ChannelTopic(MessageBroadChannelEnum.QUEUE_GROUP_VIDEO_USER_CHANGE.getChannel()));
         return container;
     }
 }

+ 31 - 0
webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/redis/GroupVideoUserChangeRedisQueueListener.java

@@ -0,0 +1,31 @@
+package com.webchat.connect.messagequeue.consumer.redis;
+
+import com.webchat.connect.messagequeue.consumer.service.GroupVideoUserChangeConsumeService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+
+@Slf4j
+@Component
+public class GroupVideoUserChangeRedisQueueListener implements MessageListener {
+
+    @Autowired
+    private RedisTemplate redisTemplate;
+
+    @Autowired
+    private GroupVideoUserChangeConsumeService groupVideoUserChangeConsumeService;
+
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+
+        String channel = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());
+        String messageStr = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());
+        log.info("GroupVideoUserChangeRedisQueueListener.onMessage =====> channel:{} messageStr:{}", channel, messageStr);
+
+        groupVideoUserChangeConsumeService.consume(messageStr);
+    }
+}

+ 29 - 0
webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/rocketmq/GroupVideoUserChangeRocketQueueConsumer.java

@@ -0,0 +1,29 @@
+package com.webchat.connect.messagequeue.consumer.rocketmq;
+
+import com.webchat.connect.messagequeue.consumer.service.GroupVideoUserChangeConsumeService;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * 当前对话消息队列,因为需要WebSocket或SSE服务端主动推送,为了解决分布式websocketsession及ssemetter共享问题,这里必须广播模式
+ */
+//@Component
+//@RocketMQMessageListener(consumerGroup = "web_chat",
+//                         topic = "queue_group_video_user_change",
+//                         messageModel = MessageModel.BROADCASTING)
+public class GroupVideoUserChangeRocketQueueConsumer implements RocketMQListener<String> {
+    //
+    //@Autowired
+    //private GroupVideoUserChangeConsumeService groupVideoUserChangeConsumeService;
+    //
+    /**
+     * 处理来自IM 对话相关消息
+     *
+     * @param message
+     */
+    @Override
+    public void onMessage(String message) {
+
+//        groupVideoUserChangeConsumeService.handleChatMessage(message);
+    }
+}

+ 85 - 0
webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/service/GroupVideoUserChangeConsumeService.java

@@ -0,0 +1,85 @@
+package com.webchat.connect.messagequeue.consumer.service;
+
+
+import com.webchat.common.constants.ConnectConstants;
+import com.webchat.common.enums.VideoSDPMessageType;
+import com.webchat.common.util.JsonUtil;
+import com.webchat.connect.service.AccountService;
+import com.webchat.connect.service.GroupVideoBizService;
+import com.webchat.connect.websocket.handler.GroupVideoWebSocketEndPointServletHandler;
+import com.webchat.domain.vo.request.mess.VideoChatMessageRequestVO;
+import com.webchat.domain.vo.request.mess.VideoUserChangeMessageVO;
+import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
+import com.webchat.domain.vo.response.mess.VideoChatMessageResponseVO;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+@Slf4j
+@Service
+public class GroupVideoUserChangeConsumeService {
+
+
+    @Autowired
+    private GroupVideoBizService groupVideoBizService;
+    @Autowired
+    private AccountService accountService;
+
+    /**
+     * 处理群聊多人音视频通话,用户上线下通知
+     *
+     * 1、获取当前群组音视频房间所有在线用户
+     * 2、推送当前变更用户给所有在线成员
+     *
+     * @param messageJson
+     */
+    public void consume(String messageJson) {
+
+        log.info("群聊多人音视频用户上/下线消息:{}", messageJson);
+
+        VideoUserChangeMessageVO message = JsonUtil.fromJson(messageJson, VideoUserChangeMessageVO.class);
+        String groupId = message.getGroupId();
+        String eventUserId = message.getEventUserId();
+        // 1、获取当前群组音视频房间所有在线用户
+        Set<String> onlineUsers = groupVideoBizService.getOnlineUserIds(message.getGroupId());
+        if (CollectionUtils.isEmpty(onlineUsers)) {
+            return;
+        }
+        Map<String, UserBaseResponseInfoVO> onlineUserMaps = accountService.batchGet(onlineUsers);
+        Map<String, String> onlineUsernames = onlineUserMaps.entrySet().stream()
+                .filter(entry -> entry.getValue() != null) // 确保UserBaseResponseInfoVO对象不为null
+                .collect(Collectors.toMap(
+                        Map.Entry::getKey,
+                        entry -> entry.getValue().getUserName()
+                ));
+        VideoChatMessageResponseVO chatMessage = new VideoChatMessageResponseVO();
+        chatMessage.setUserId(eventUserId);
+        chatMessage.setOnlineUserIds(onlineUsers);
+        chatMessage.setOnlineUsernames(onlineUsernames);
+        chatMessage.setType(ObjectUtils.equals(message.getIsOnline(), true) ?
+                VideoSDPMessageType.online.name() : VideoSDPMessageType.offline.name());
+        String pushMessage = JsonUtil.toJsonString(chatMessage);
+        // 2、推送当前变更用户给所有在线成员
+        for (String onlineUser : onlineUsers) {
+            WebSocketSession session = GroupVideoWebSocketEndPointServletHandler.getSession(groupId, onlineUser);
+            if (session == null || !session.isOpen()) {
+                continue;
+            }
+            try {
+                session.sendMessage(new TextMessage(pushMessage));
+            } catch (IOException e) {
+                // TODO 策略:例如:重拾
+            }
+        }
+    }
+}

+ 12 - 1
webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/service/WebRtcSDPConsumeService.java

@@ -3,9 +3,11 @@ package com.webchat.connect.messagequeue.consumer.service;
 
 import com.webchat.common.constants.ConnectConstants;
 import com.webchat.common.util.JsonUtil;
+import com.webchat.connect.websocket.handler.GroupVideoWebSocketEndPointServletHandler;
 import com.webchat.connect.websocket.handler.P2PVideoWebSocketEndPointServletHandler;
 import com.webchat.domain.vo.request.mess.VideoChatMessageRequestVO;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Service;
 import org.springframework.web.socket.TextMessage;
 import org.springframework.web.socket.WebSocketSession;
@@ -25,11 +27,13 @@ public class WebRtcSDPConsumeService {
         if (chatMessage == null) {
             return;
         }
+        // 如果是来自P2P一对一音视频信令广播消息:groupId为空,只有群聊视频视频场景下groupId有值
+        String groupId = chatMessage.getGroupId();
         String receiverId = chatMessage.getTargetUserId();
         // 校验用户ws 连接session是否在当前机器Hash表P2PVideoWebSocketEndPointServletHandler
         Set<String> bizCodes = ConnectConstants.ConnectBiz.getBizCode(ConnectConstants.BizEnum.CHAT);
         for (String bizCode: bizCodes) {
-            WebSocketSession session = P2PVideoWebSocketEndPointServletHandler.getSession(bizCode, receiverId);
+            WebSocketSession session = this.getSession(bizCode, groupId, receiverId);
             if (session == null || !session.isOpen()) {
                 continue;
             }
@@ -40,4 +44,11 @@ public class WebRtcSDPConsumeService {
             }
         }
     }
+
+    private WebSocketSession getSession(String bizCode, String groupId, String userId) {
+        if (StringUtils.isNotBlank(groupId)) {
+            return GroupVideoWebSocketEndPointServletHandler.getSession(bizCode, groupId, userId);
+        }
+        return P2PVideoWebSocketEndPointServletHandler.getSession(bizCode, userId);
+    }
 }

+ 56 - 0
webchat-connect/src/main/java/com/webchat/connect/service/GroupVideoBizService.java

@@ -0,0 +1,56 @@
+package com.webchat.connect.service;
+
+import com.webchat.common.enums.RedisKeyEnum;
+import com.webchat.common.service.RedisService;
+import com.webchat.common.util.DateUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Set;
+
+@Service
+public class GroupVideoBizService {
+
+
+    @Autowired
+    private RedisService redisService;
+
+    /**
+     * 群聊多人音视频房间,上线新用户
+     *
+     * Sorted Set
+     * @param groupId
+     * @param userId
+     */
+    public void online(String groupId, String userId) {
+        String key = getOnlineUsersRedisKey(groupId);
+        redisService.zadd(key, userId, DateUtils.getCurrentTimeMillis(),
+                RedisKeyEnum.GROUP_VIDEO_ONLINE_USER_ZSET.getExpireTime());
+    }
+
+    /**
+     * 群聊多人音视频房间,上线新用户
+     *
+     * @param groupId
+     * @param userId
+     */
+    public void offline(String groupId, String userId) {
+        String key = getOnlineUsersRedisKey(groupId);
+        redisService.zremove(key, userId);
+    }
+
+    /**
+     * 获取群聊音视频通话所有在线用户在线
+     *
+     * @param groupId
+     * @return
+     */
+    public Set<String> getOnlineUserIds(String groupId) {
+        String key = getOnlineUsersRedisKey(groupId);
+        return redisService.zrangeByScore(key, 0, Long.MAX_VALUE);
+    }
+
+    private String getOnlineUsersRedisKey(String groupId) {
+        return RedisKeyEnum.GROUP_VIDEO_ONLINE_USER_ZSET.getKey(groupId);
+    }
+}

+ 52 - 20
webchat-connect/src/main/java/com/webchat/connect/websocket/handler/GroupVideoWebSocketEndPointServletHandler.java

@@ -1,17 +1,15 @@
 package com.webchat.connect.websocket.handler;
 
-import com.webchat.common.enums.AccountRelationTypeEnum;
 import com.webchat.common.enums.ChatMessageTypeEnum;
 import com.webchat.common.enums.VideoSDPMessageType;
 import com.webchat.common.enums.messagequeue.MessageBroadChannelEnum;
 import com.webchat.common.service.messagequeue.producer.MessageQueueProducer;
 import com.webchat.common.util.JsonUtil;
-import com.webchat.connect.service.AccountService;
+import com.webchat.connect.service.GroupVideoBizService;
 import com.webchat.domain.vo.request.mess.MessageNotifyVO;
 import com.webchat.domain.vo.request.mess.VideoChatMessageRequestVO;
-import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
+import com.webchat.domain.vo.request.mess.VideoUserChangeMessageVO;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -21,7 +19,6 @@ import org.springframework.web.socket.WebSocketSession;
 import org.springframework.web.socket.handler.TextWebSocketHandler;
 
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 @Slf4j
@@ -31,7 +28,7 @@ public class GroupVideoWebSocketEndPointServletHandler extends TextWebSocketHand
     private MessageQueueProducer<Object, Long> messageQueueProducer;
 
     @Autowired
-    private AccountService accountService;
+    private GroupVideoBizService groupVideoBizService;
 
     /**
      * Map<KEY1, Map<KEY2, WebSocketSession>>
@@ -63,18 +60,38 @@ public class GroupVideoWebSocketEndPointServletHandler extends TextWebSocketHand
             groupUserSessions = new ConcurrentHashMap<>();
             userSessions = new ConcurrentHashMap<>();
             groupUserSessions.put(groupId, userSessions);
+            sessions.put(bizCode, groupUserSessions);
         } else {
             // 初始化处理首页群聊下多人音视频ws session结构
-            userSessions = groupUserSessions.get(userId);
-            if (userSessions == null) {
-                userSessions = new ConcurrentHashMap<>();
-                groupUserSessions.put(groupId, userSessions);
-            }
+            userSessions = groupUserSessions.get(groupId);
         }
         userSessions.put(userId, session);
         log.info("Chat WebSocket connection ====> groupId:{} userId:{}, sessionId:{}", groupId, userId, session.getId());
-        // TODO 新用户加入群聊音视频,通知其他在线用户(创建已在线跟新上线用的webRTC链接)--- MESH
+        // 记录当前在线用户到redis
+        groupVideoBizService.online(groupId, userId);
+        // 新用户加入群聊音视频,通知其他在线用户(创建已在线跟新上线用的webRTC链接)--- MESH
+        this.doNotifyUserChangeEventForCurrOnlineUsers(groupId, userId, true);
+    }
 
+
+    /**
+     * 断开连接
+     *
+     * @param session
+     * @param status
+     * @throws Exception
+     */
+    @Override
+    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
+        System.out.println("Chat WebSocket Connection closed: " + session.getId());
+        Map<String, Object> attributes = session.getAttributes();
+        String bizCode = (String) attributes.get("bizCode");
+        String groupId = (String) attributes.get("groupId");
+        String userId = (String) attributes.get("userId");
+        // 剔除当前离线线用户
+        groupVideoBizService.offline(groupId, userId);
+        // 通知群聊音视频其他在线用户有人下线
+        this.doNotifyUserChangeEventForCurrOnlineUsers(groupId, userId, false);
     }
 
     /**
@@ -116,15 +133,15 @@ public class GroupVideoWebSocketEndPointServletHandler extends TextWebSocketHand
     }
 
     /**
-     * 断开连接
-     *
-     * @param session
-     * @param status
-     * @throws Exception
+     * 通知当前群聊多人音视频在线用户:有新用户加入音视频群聊/离线
      */
-    @Override
-    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
-        System.out.println("Chat WebSocket Connection closed: " + session.getId());
+    private void doNotifyUserChangeEventForCurrOnlineUsers(String groupId, String eventUser, boolean isOnline) {
+
+        VideoUserChangeMessageVO message = new VideoUserChangeMessageVO();
+        message.setEventUserId(eventUser);
+        message.setGroupId(groupId);
+        message.setIsOnline(isOnline);
+        messageQueueProducer.broadSend(MessageBroadChannelEnum.QUEUE_GROUP_VIDEO_USER_CHANGE, message);
     }
 
     public static WebSocketSession getSession(String bizCode, String groupId, String userId) {
@@ -141,4 +158,19 @@ public class GroupVideoWebSocketEndPointServletHandler extends TextWebSocketHand
         }
         return userSessions.get(userId);
     }
+
+    public static WebSocketSession getSession(String groupId, String userId) {
+        if (MapUtils.isEmpty(sessions)) {
+            return null;
+        }
+        for (Map.Entry<String, Map<String, Map<String, WebSocketSession>>> session : sessions.entrySet()) {
+            Map<String, Map<String, WebSocketSession>> groupUserSessions = session.getValue();
+            Map<String, WebSocketSession> userSessions = groupUserSessions.get(groupId);
+            WebSocketSession userSession = userSessions.get(userId);
+            if (userSession != null) {
+                return userSession;
+            }
+        }
+        return null;
+    }
 }

+ 16 - 0
webchat-domain/src/main/java/com/webchat/domain/vo/request/mess/VideoUserChangeMessageVO.java

@@ -0,0 +1,16 @@
+package com.webchat.domain.vo.request.mess;
+
+import lombok.Data;
+
+@Data
+public class VideoUserChangeMessageVO {
+
+    private String eventUserId;
+
+    private String groupId;
+
+    /**
+     * 用户上/下线 true:上线 false:下线
+     */
+    private Boolean isOnline;
+}

+ 32 - 0
webchat-domain/src/main/java/com/webchat/domain/vo/response/mess/VideoChatMessageResponseVO.java

@@ -0,0 +1,32 @@
+package com.webchat.domain.vo.response.mess;
+
+
+import lombok.Data;
+
+import java.util.Map;
+import java.util.Set;
+
+@Data
+public class VideoChatMessageResponseVO {
+
+    private String userId;
+
+    private String targetUserId;
+
+    private Set<String> onlineUserIds;
+
+    private Map<String, String> onlineUsernames;
+
+    /**
+     * 群聊id支持群聊多人音视频
+     */
+    private String groupId;
+
+    private String type;
+
+    private Object candidate;
+
+    private Object answer;
+
+    private Object offer;
+}