Selaa lähdekoodia

机器人对话消息持久化

wangqi49 1 kuukausi sitten
vanhempi
commit
4a98fd06f8

+ 1 - 1
webchat-aigc/src/main/java/com/webchat/aigc/llm/AbstractLLMChatService.java

@@ -31,5 +31,5 @@ public abstract class AbstractLLMChatService {
      * @return
      * @throws Exception
      */
-    protected abstract void chat(SseEmitter emitter, List<ChatCompletionMessage> messageList) throws Exception;
+    protected abstract String chat(SseEmitter emitter, List<ChatCompletionMessage> messageList) throws Exception;
 }

+ 50 - 2
webchat-aigc/src/main/java/com/webchat/aigc/llm/AiAgentFlowService.java

@@ -3,7 +3,11 @@ package com.webchat.aigc.llm;
 
 import com.webchat.common.constants.ConnectConstants;
 import com.webchat.common.enums.AiFunctionEnum;
+import com.webchat.common.enums.ChatMessageTypeEnum;
+import com.webchat.common.enums.messagequeue.MessageQueueEnum;
+import com.webchat.common.helper.SessionHelper;
 import com.webchat.common.helper.SseEmitterHelper;
+import com.webchat.common.service.messagequeue.producer.MessageQueueProducer;
 import com.webchat.common.util.JsonUtil;
 import com.webchat.domain.vo.llm.FunctionCallResponse;
 import com.webchat.domain.vo.llm.image.ImageGenerateResponse;
@@ -29,6 +33,9 @@ public class AiAgentFlowService {
     @Autowired
     private LibLibGenerateImageService libLibGenerateImageService;
 
+    @Autowired
+    private MessageQueueProducer<ChatMessageRequestVO, Long> messageQueueProducer;
+
     /**
      * 旗舰版文生图模版id
      */
@@ -48,7 +55,22 @@ public class AiAgentFlowService {
      * @param message
      */
     public void doAgent(String message) {
+
         ChatMessageRequestVO chatMessage = JsonUtil.fromJson(message, ChatMessageRequestVO.class);
+
+        if (!SseEmitterHelper.isExist(getSSEBizCode(), chatMessage.getSenderId())) {
+            /**
+             * 因为当前doAgent AIGC服务消费来自webchat-connect服务的websocket端点的MQ广播消息
+             * 所以,在分布式部署场景下,可能当前消息发送人的sse链接并不在当前节点。
+             */
+            return;
+        }
+
+        /**
+         * 消息持久化用户发给机器人消息
+         */
+        this.persistentAiMessage(chatMessage.getSenderId(), chatMessage.getReceiverId(), chatMessage.getMessage());
+
         String senderId = chatMessage.getSenderId();
         /**
          * 1. 意图识别
@@ -71,9 +93,10 @@ public class AiAgentFlowService {
          * 2. 根据意图执行对应工作流
          */
         SseEmitter sseEmitter = SseEmitterHelper.get(this.getSSEBizCode(), senderId);
+        String agentResponse = null;
         if (AiFunctionEnum.CHAT.name().equals(functionCallResponse.getFunction())) {
             // 机器人对话
-            gptChatService.chat(sseEmitter, chatMessage);
+            agentResponse = gptChatService.chat(sseEmitter, chatMessage);
         } else if (AiFunctionEnum.IMAGE.name().equals(functionCallResponse.getFunction())) {
             // 机器人对话
             RequestGenerateBaseParam baseParam = RequestGenerateBaseParam.of(chatMessage.getMessage());
@@ -94,11 +117,36 @@ public class AiAgentFlowService {
                         stringBuilder.append("\n");
                     }
                 }
-                SseEmitterHelper.send(this.getSSEBizCode(), senderId, stringBuilder.toString());
+                agentResponse = stringBuilder.toString();
+                SseEmitterHelper.send(this.getSSEBizCode(), senderId, agentResponse);
             } catch (Exception e) {
                 log.error("文生图服务异常 =====> param: {}", JsonUtil.toJsonString(param), e);
                 SseEmitterHelper.send(this.getSSEBizCode(), senderId, "意图识别失败,稍后重试~");
             }
         }
+
+        /**
+         * 机器人响应消息持久化
+         */
+        if (agentResponse != null) {
+            this.persistentAiMessage(chatMessage.getReceiverId(), chatMessage.getSenderId(), agentResponse);
+        }
+    }
+
+
+    /**
+     * 机器人对话场景,消息持久化
+     *
+     * @param senderId
+     * @param receiverId
+     * @param message
+     */
+    private void persistentAiMessage(String senderId, String receiverId, String message) {
+        ChatMessageRequestVO chatMessageRequestVO = new ChatMessageRequestVO();
+        chatMessageRequestVO.setSenderId(senderId);
+        chatMessageRequestVO.setReceiverId(receiverId);
+        chatMessageRequestVO.setMessage(message);
+        chatMessageRequestVO.setType(ChatMessageTypeEnum.CHAT_TEXT.getType());
+        messageQueueProducer.send(MessageQueueEnum.QUEUE_PERSISTENT_MESSAGE, chatMessageRequestVO);
     }
 }

+ 7 - 1
webchat-aigc/src/main/java/com/webchat/aigc/llm/DeepSeekAIService.java

@@ -50,7 +50,11 @@ public class DeepSeekAIService extends AbstractLLMChatService {
      * @param messageList
      * @throws Exception
      */
-    public void chat(SseEmitter emitter, List<ChatCompletionMessage> messageList) throws Exception {
+    public String chat(SseEmitter emitter, List<ChatCompletionMessage> messageList) throws Exception {
+
+        // 用于记录流式推理完整结果,返回用于,用于对话消息持久化
+        StringBuilder aiMessage = new StringBuilder();
+
         DeepSeekAIClient client = new DeepSeekAIClient(deepseekLLMPropertiesConfig.getApiKey());
         final List<ChatCompletionMessage> messages = messageList;
         try {
@@ -78,6 +82,7 @@ public class DeepSeekAIService extends AbstractLLMChatService {
                             responseContent = responseContent.replaceAll("\n", "<br>");
                             System.out.println(responseContent);
                             emitter.send(responseContent);
+                            aiMessage.append(responseContent);
                         }
                     },
                     error -> {
@@ -90,5 +95,6 @@ public class DeepSeekAIService extends AbstractLLMChatService {
         } catch (Exception e) {
             e.printStackTrace();
         }
+        return aiMessage.toString();
     }
 }

+ 4 - 3
webchat-aigc/src/main/java/com/webchat/aigc/llm/GPTChatService.java

@@ -43,7 +43,7 @@ public class GPTChatService {
     private static final String DEFAULT_GPT_ROLE = "你是一个智能助手,帮我解决一切问题。";
     private static final String DEFAULT_ROLE = "暂无签名";
 
-    public void chat( SseEmitter sseEmitter, ChatMessageRequestVO chatMessage) {
+    public String chat( SseEmitter sseEmitter, ChatMessageRequestVO chatMessage) {
         // 使用工厂模式,取对应的LLM服务,来完成对话
         AbstractLLMChatService abstractLLMChatService = LLMServiceFactory.getLLMService(model);
         // 获取用户发送消息内容
@@ -65,16 +65,17 @@ public class GPTChatService {
         } catch (Exception e) {
             log.error("GPTChatService.chat create prompt error. sendUserId:{}, receiverId:{}, msg:{}",
                     sendUserId, receiverId, message, e);
-            return;
+            return null;
         }
         final List<ChatCompletionMessage> messageList = Arrays.asList(
                 new ChatCompletionMessage(ChatMessageRole.SYSTEM.value(), gptRoleSetting),
                 new ChatCompletionMessage(ChatMessageRole.USER.value(), prompt));
         try {
-            abstractLLMChatService.chat(sseEmitter, messageList);
+            return abstractLLMChatService.chat(sseEmitter, messageList);
         } catch (Exception e) {
             log.error("GPTChatService.chat error. sendUserId:{}, receiverId:{}, msg:{}",
                     sendUserId, receiverId, message, e);
         }
+        return null;
     }
 }

+ 4 - 1
webchat-aigc/src/main/java/com/webchat/aigc/llm/KimiAIService.java

@@ -50,8 +50,9 @@ public class KimiAIService extends AbstractLLMChatService {
      * @param messageList
      * @throws Exception
      */
-    public void chat(SseEmitter emitter, List<ChatCompletionMessage> messageList) throws Exception {
+    public String chat(SseEmitter emitter, List<ChatCompletionMessage> messageList) throws Exception {
 
+        StringBuilder aiMessage = new StringBuilder();
 
         MoonShotAIClient client = new MoonShotAIClient(kimiLLMPropertiesConfig.getApiKey());
 
@@ -80,6 +81,7 @@ public class KimiAIService extends AbstractLLMChatService {
                                 responseContent = responseContent.replaceAll("\n", "<br>");
                                 System.out.println(responseContent);
                                 emitter.send(responseContent);
+                                aiMessage.append(responseContent);
                             }
                         }
                     },
@@ -93,5 +95,6 @@ public class KimiAIService extends AbstractLLMChatService {
         } catch (Exception e) {
             e.printStackTrace();
         }
+        return aiMessage.toString();
     }
 }

+ 13 - 0
webchat-common/src/main/java/com/webchat/common/helper/SseEmitterHelper.java

@@ -22,6 +22,19 @@ public class SseEmitterHelper {
     private static ConcurrentHashMap<String, ConcurrentHashMap<String, SseEmitter>> sseEmitterMap = new ConcurrentHashMap<>();
 
     /**
+     * 判断当前用户SSE链接是否在当前节点
+     *
+     *
+     * @param biz
+     * @param userId
+     * @return
+     */
+    public static boolean isExist(String biz, String userId) {
+        ConcurrentHashMap<String, SseEmitter> userSseEmitter = sseEmitterMap.get(biz);
+        return userSseEmitter.get(userId) != null;
+    }
+
+    /**
      * 获取用户的 SseEmitter 对象,如果不存在重新创建一个
      *
      * @param userId

+ 2 - 1
webchat-search/src/main/java/com/webchat/search/controller/SearchClientController.java

@@ -36,6 +36,7 @@ public class SearchClientController implements SearchEngineClient {
                                      @RequestParam(value = "page", required = false, defaultValue = "1") Integer page,
                                      @RequestParam(value = "size", required = false, defaultValue = "10") Integer size) {
         String userId = SessionHelper.getCurrentUserId();
-        return webChatElasticSearchService.query(type, userId, query, page, size);
+//        return webChatElasticSearchService.query(type, userId, query, page, size);
+        return null;
     }
 }

+ 6 - 4
webchat-search/src/main/java/com/webchat/search/controller/SearchController.java

@@ -20,13 +20,15 @@ public class SearchController {
 
     @GetMapping("/createIndex")
     public APIResponseBean<Boolean> createChatMessageIndex() {
-        boolean createIndexResult = webChatElasticSearchService.createIndex();
-        return APIResponseBeanUtil.success(createIndexResult);
+//        boolean createIndexResult = webChatElasticSearchService.createIndex();
+//        return APIResponseBeanUtil.success(createIndexResult);
+        return null;
     }
 
     @GetMapping("/deleteIndex")
     public APIResponseBean<Boolean> deleteIndex() {
-        boolean createIndexResult = webChatElasticSearchService.deleteIndex();
-        return APIResponseBeanUtil.success(createIndexResult);
+//        boolean createIndexResult = webChatElasticSearchService.deleteIndex();
+//        return APIResponseBeanUtil.success(createIndexResult);
+        return null;
     }
 }