Selaa lähdekoodia

支持一对一音视频通话

wangqi49 1 kuukausi sitten
vanhempi
commit
4e7d84c8b6

+ 2 - 1
webchat-common/src/main/java/com/webchat/common/enums/ChatMessageTypeEnum.java

@@ -15,7 +15,8 @@ public enum ChatMessageTypeEnum {
     PUBLIC_ACCOUNT_ARTICLE(4, "公众号推文"),
     APPLY(5, "申请添加好友"),
     WALLET_BALANCE(6, "钱包余额"),
-    CHATTING_REFRESH(7, "刷新对话列表");
+    CHATTING_REFRESH(7, "刷新对话列表"),
+    VIDEO_OFFER(8, "音视频呼叫");
 
     private Integer type;
     private String desc;

+ 9 - 0
webchat-common/src/main/java/com/webchat/common/enums/VideoSDPMessageType.java

@@ -0,0 +1,9 @@
+package com.webchat.common.enums;
+
+import lombok.Getter;
+
+@Getter
+public enum VideoSDPMessageType {
+
+    call, offer, answer, candidate, leave
+}

+ 2 - 0
webchat-common/src/main/java/com/webchat/common/enums/messagequeue/MessageBroadChannelEnum.java

@@ -16,6 +16,8 @@ public enum MessageBroadChannelEnum {
 
     QUEUE_CHAT_NOTIFY("queue_chat_notify", "聊天消息通知"),
 
+    QUEUE_VIDEO_SDP("queue_video_sdp", "音视频通话信令"),
+
     QUEUE_CHAT_ROBOT("queue_chat_robot", "机器人对话消息队列"),
 
     ;

+ 4 - 0
webchat-connect/src/main/java/com/webchat/connect/messagequeue/config/RedisConfig.java

@@ -4,6 +4,7 @@ import com.webchat.common.enums.messagequeue.MessageBroadChannelEnum;
 import com.webchat.connect.messagequeue.consumer.redis.ArticlePushRedisQueueListener;
 import com.webchat.connect.messagequeue.consumer.redis.ChatMessageRedisQueueListener;
 import com.webchat.connect.messagequeue.consumer.redis.ChatNotifyRedisQueueListener;
+import com.webchat.connect.messagequeue.consumer.redis.WebRtcSDPRedisQueueListener;
 import jakarta.annotation.Resource;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -20,6 +21,8 @@ public class RedisConfig {
     private ChatMessageRedisQueueListener chatMessageRedisQueueListener;
     @Resource
     private ArticlePushRedisQueueListener articlePushRedisQueueListener;
+    @Resource
+    private WebRtcSDPRedisQueueListener webRtcSDPRedisQueueListener;
 
     @Bean
     public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory) {
@@ -30,6 +33,7 @@ public class RedisConfig {
         container.addMessageListener(notifyQueueListener, new ChannelTopic(MessageBroadChannelEnum.QUEUE_CHAT_NOTIFY.getChannel()));
         container.addMessageListener(chatMessageRedisQueueListener, new ChannelTopic(MessageBroadChannelEnum.QUEUE_CHAT_MESSAGE.getChannel()));
         container.addMessageListener(articlePushRedisQueueListener, new ChannelTopic(MessageBroadChannelEnum.QUEUE_ARTICLE_PUSH_MESSAGE.getChannel()));
+        container.addMessageListener(webRtcSDPRedisQueueListener, new ChannelTopic(MessageBroadChannelEnum.QUEUE_VIDEO_SDP.getChannel()));
         return container;
     }
 }

+ 29 - 0
webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/redis/WebRtcSDPRedisQueueListener.java

@@ -0,0 +1,29 @@
+package com.webchat.connect.messagequeue.consumer.redis;
+
+import com.webchat.connect.messagequeue.consumer.service.WebRtcSDPConsumeService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class WebRtcSDPRedisQueueListener implements MessageListener {
+
+    @Autowired
+    private RedisTemplate redisTemplate;
+
+    @Autowired
+    private WebRtcSDPConsumeService webRtcSDPConsumeService;
+
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        String channel = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());
+        String messageStr = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());
+        log.info("ChatMessageRedisQueueListener.onMessage =====> channel:{} messageStr:{}", channel, messageStr);
+
+        webRtcSDPConsumeService.consume(messageStr);
+    }
+}

+ 28 - 0
webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/rocketmq/WebRtcSDPRocketQueueConsumer.java

@@ -0,0 +1,28 @@
+package com.webchat.connect.messagequeue.consumer.rocketmq;
+
+import com.webchat.connect.messagequeue.consumer.service.ChatNotifyConsumeService;
+import com.webchat.connect.messagequeue.consumer.service.WebRtcSDPConsumeService;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+//@RocketMQMessageListener(consumerGroup = "web_chat", topic = "queue_video_sdp", messageModel = MessageModel.BROADCASTING)
+public class WebRtcSDPRocketQueueConsumer implements RocketMQListener<String> {
+
+        @Autowired
+        private WebRtcSDPConsumeService webRtcSDPConsumeService;
+
+        /**
+         * 处理来自IM 对话相关消息
+         *
+         * @param message
+         */
+        @Override
+        public void onMessage(String message) {
+
+            webRtcSDPConsumeService.consume(message);
+        }
+}

+ 43 - 0
webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/service/WebRtcSDPConsumeService.java

@@ -0,0 +1,43 @@
+package com.webchat.connect.messagequeue.consumer.service;
+
+
+import com.webchat.common.constants.ConnectConstants;
+import com.webchat.common.util.JsonUtil;
+import com.webchat.connect.websocket.handler.P2PVideoWebSocketEndPointServletHandler;
+import com.webchat.domain.vo.request.mess.VideoChatMessageRequestVO;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.Set;
+
+@Slf4j
+@Service
+public class WebRtcSDPConsumeService {
+
+
+    public void consume(String message) {
+
+        log.info("音视频信令消息消费 =====> message:{}", message);
+        VideoChatMessageRequestVO chatMessage = JsonUtil.fromJson(message, VideoChatMessageRequestVO.class);
+        if (chatMessage == null) {
+            return;
+        }
+        String receiverId = chatMessage.getTargetUserId();
+        // 校验用户ws 连接session是否在当前机器Hash表P2PVideoWebSocketEndPointServletHandler
+        Set<String> bizCodes = ConnectConstants.ConnectBiz.getBizCode(ConnectConstants.BizEnum.CHAT);
+        for (String bizCode: bizCodes) {
+            WebSocketSession session = P2PVideoWebSocketEndPointServletHandler.getSession(bizCode, receiverId);
+            if (session == null || !session.isOpen()) {
+                continue;
+            }
+            try {
+                session.sendMessage(new TextMessage(message));
+            } catch (IOException e) {
+                log.error("音视频信令消息推送异常!", e);
+            }
+        }
+    }
+}

+ 19 - 1
webchat-connect/src/main/java/com/webchat/connect/websocket/config/WebSocketConnectServletConfig.java

@@ -1,7 +1,9 @@
 package com.webchat.connect.websocket.config;
 
 import com.webchat.connect.websocket.handler.ChatWebSocketEndPointServletHandler;
+import com.webchat.connect.websocket.handler.P2PVideoWebSocketEndPointServletHandler;
 import com.webchat.connect.websocket.interceptor.ChatWebSocketEndPointServletInterceptor;
+import com.webchat.connect.websocket.interceptor.VideoWebSocketEndPointServletInterceptor;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.web.socket.config.annotation.EnableWebSocket;
@@ -14,15 +16,31 @@ public class WebSocketConnectServletConfig implements WebSocketConfigurer {
 
     private static final String CHAT_WEBSOCKET_PATH = "/connect-service/ws/chat/{bizCode}/{userId}";
 
+    private static final String P2P_VIDEO_WEBSOCKET_PATH = "/connect-service/ws/p2p/video/{bizCode}/{userId}";
+
     @Override
     public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+        /**
+         * 注册对话场景WS端点服务
+         */
         registry.addHandler(chatWebSocketEndPointServletHandler(), CHAT_WEBSOCKET_PATH)
                 .setAllowedOrigins("*")
-                .addInterceptors(new ChatWebSocketEndPointServletInterceptor()); ;
+                .addInterceptors(new ChatWebSocketEndPointServletInterceptor());
+        /**
+         * 注册点对点音视频通话ws信令服务
+         */
+        registry.addHandler(p2PVideoWebSocketEndPointServletHandler(), P2P_VIDEO_WEBSOCKET_PATH)
+                .setAllowedOrigins("*")
+                .addInterceptors(new VideoWebSocketEndPointServletInterceptor()); ;
     }
 
     @Bean
     public ChatWebSocketEndPointServletHandler chatWebSocketEndPointServletHandler() {
         return new ChatWebSocketEndPointServletHandler();
     }
+
+    @Bean
+    public P2PVideoWebSocketEndPointServletHandler p2PVideoWebSocketEndPointServletHandler() {
+        return new P2PVideoWebSocketEndPointServletHandler();
+    }
 }

+ 128 - 0
webchat-connect/src/main/java/com/webchat/connect/websocket/handler/P2PVideoWebSocketEndPointServletHandler.java

@@ -0,0 +1,128 @@
+package com.webchat.connect.websocket.handler;
+
+import com.webchat.common.enums.ChatMessageTypeEnum;
+import com.webchat.common.enums.RedisMessageChannelTopicEnum;
+import com.webchat.common.enums.RoleCodeEnum;
+import com.webchat.common.enums.VideoSDPMessageType;
+import com.webchat.common.enums.messagequeue.MessageBroadChannelEnum;
+import com.webchat.common.service.messagequeue.producer.MessageQueueProducer;
+import com.webchat.common.util.JsonUtil;
+import com.webchat.connect.service.AccountService;
+import com.webchat.domain.vo.request.mess.ChatMessageRequestVO;
+import com.webchat.domain.vo.request.mess.MessageNotifyVO;
+import com.webchat.domain.vo.request.mess.VideoChatMessageRequestVO;
+import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.TextWebSocketHandler;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+public class P2PVideoWebSocketEndPointServletHandler extends TextWebSocketHandler {
+
+    @Autowired
+    private MessageQueueProducer<Object, Long> messageQueueProducer;
+
+    @Autowired
+    private AccountService accountService;
+
+    /**
+     * Map<KEY1, Map<KEY2, WebSocketSession>>
+     *
+     * KEY1:用于区分场景,如:PC-APP、 PC-CHAT、WAP-APP ,详见:WebSocketBizCodeEnum
+     * KEY2: 用户
+     * WebSocketSession: 不同场景下用户的WebSocket Session对象 -- 全双工、有状态
+     */
+    public static Map<String, Map<String, WebSocketSession>> sessions = new ConcurrentHashMap<>();
+
+    /**
+     * 连接建立
+     *
+     * @param session
+     * @throws Exception
+     */
+    @Override
+    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
+        // 获取路径参数
+        Map<String, Object> attributes = session.getAttributes();
+        String bizCode = (String) attributes.get("bizCode");
+        String userId = (String) attributes.get("userId");
+        Map<String, WebSocketSession> userSessions = sessions.get(bizCode);
+        if (userSessions == null) {
+            userSessions = new ConcurrentHashMap<>();
+            sessions.put(bizCode, userSessions);
+        }
+        userSessions.put(userId, session);
+        log.info("Chat WebSocket connection ====> userId:{}, sessionId:{}", userId, session.getId());
+    }
+
+    /**
+     * 收到消息
+     *
+     * @param session
+     * @param message
+     * @throws Exception
+     */
+    @Override
+    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
+        String payload = message.getPayload();
+        System.out.println("Chat WebSocket Connection Received message: " + payload);
+        if ("ping".equals(payload)) {
+            // 心跳检测
+            return;
+        }
+        VideoChatMessageRequestVO videoMessage = JsonUtil.fromJson(payload, VideoChatMessageRequestVO.class);
+        if (VideoSDPMessageType.call.name().equals(videoMessage.getType())) {
+            /**
+             * 1. 给被呼叫人推送呼叫提醒
+             */
+            UserBaseResponseInfoVO sender = accountService.accountInfo(videoMessage.getUserId());
+            MessageNotifyVO messageBase = new MessageNotifyVO();
+            messageBase.setSender(sender);
+            messageBase.setReceiverId(videoMessage.getTargetUserId());
+            messageBase.setType(ChatMessageTypeEnum.VIDEO_OFFER.getType());
+            /**
+             * 广播音视频呼叫信息给被呼人
+             */
+            messageQueueProducer.broadSend(MessageBroadChannelEnum.QUEUE_CHAT_NOTIFY, messageBase);
+            return;
+        }
+        /**
+         * 2. 信命推送
+         */
+        messageQueueProducer.broadSend(MessageBroadChannelEnum.QUEUE_VIDEO_SDP, videoMessage);
+    }
+
+    /**
+     * 断开连接
+     *
+     * @param session
+     * @param status
+     * @throws Exception
+     */
+    @Override
+    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
+        System.out.println("Chat WebSocket Connection closed: " + session.getId());
+    }
+
+    public static WebSocketSession getSession(String bizCode, String userId) {
+        if (StringUtils.isBlank(bizCode) || StringUtils.isBlank(userId)) {
+            return null;
+        }
+        Map<String, WebSocketSession> userSessions = sessions.get(bizCode);
+        if (MapUtils.isEmpty(userSessions)) {
+            return null;
+        }
+        return userSessions.get(userId);
+    }
+}

+ 27 - 0
webchat-connect/src/main/java/com/webchat/connect/websocket/interceptor/VideoWebSocketEndPointServletInterceptor.java

@@ -0,0 +1,27 @@
+package com.webchat.connect.websocket.interceptor;
+
+import org.springframework.http.server.ServerHttpRequest;
+import org.springframework.http.server.ServerHttpResponse;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.server.HandshakeInterceptor;
+
+import java.util.Map;
+
+public class VideoWebSocketEndPointServletInterceptor implements HandshakeInterceptor {
+
+    @Override
+    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
+        String path = request.getURI().getPath();
+        String bizCode = path.split("/")[5];
+        String userId = path.split("/")[6];
+        // 将bizCode、userId存储到session属性中
+        attributes.put("bizCode", bizCode);
+        attributes.put("userId", userId);
+        return true;
+    }
+
+    @Override
+    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
+        // 在这里处理握手完成后的逻辑
+    }
+}

+ 5 - 0
webchat-domain/src/main/java/com/webchat/domain/vo/request/mess/MessageBaseVO.java

@@ -25,6 +25,11 @@ public class MessageBaseVO extends BaseQueueDTO {
      */
     private String proxySenderId;
 
+    /**
+     * 消息发送人详情信息
+     */
+    private UserBaseResponseInfoVO sender;
+
     private UserBaseResponseInfoVO proxySender;
 
     /**