Browse Source

RAG 全流程实现:embedding+Milvus+LLM

wangqi49 1 week ago
parent
commit
65f45cf778
34 changed files with 1336 additions and 72 deletions
  1. 19 0
      webchat-aigc/pom.xml
  2. 1 1
      webchat-aigc/src/main/java/com/webchat/aigc/config/properties/AliBabaEmbeddingPropertiesConfig.java
  3. 33 1
      webchat-aigc/src/main/java/com/webchat/aigc/controller/BasicModelServiceController.java
  4. 24 1
      webchat-aigc/src/main/java/com/webchat/aigc/llm/AbstractEmbeddingModel.java
  5. 3 1
      webchat-aigc/src/main/java/com/webchat/aigc/llm/AiBotChatService.java
  6. 140 0
      webchat-aigc/src/main/java/com/webchat/aigc/llm/AiBotQAService.java
  7. 47 1
      webchat-aigc/src/main/java/com/webchat/aigc/llm/AlibabaEmbeddingModel.java
  8. 37 1
      webchat-aigc/src/main/java/com/webchat/aigc/llm/BasicEmbeddingModel.java
  9. 28 1
      webchat-aigc/src/main/java/com/webchat/aigc/llm/BasicModelService.java
  10. 7 15
      webchat-aigc/src/main/java/com/webchat/aigc/llm/EmbeddingModelFactory.java
  11. 21 0
      webchat-aigc/src/main/resources/templates/ftl/AI_BOT_RAG.ftl
  12. 37 0
      webchat-common/src/main/java/com/webchat/common/constants/MilvusConstants.java
  13. 8 10
      webchat-common/src/main/java/com/webchat/common/enums/EmbeddingModelEnum.java
  14. 2 0
      webchat-common/src/main/java/com/webchat/common/enums/PromptTemplateEnum.java
  15. 4 1
      webchat-common/src/main/java/com/webchat/common/enums/messagequeue/MessageQueueEnum.java
  16. 15 0
      webchat-common/src/main/java/com/webchat/common/enums/search/DataSourceTypeEnum.java
  17. 6 6
      webchat-common/src/main/java/com/webchat/common/exception/AiCallException.java
  18. 30 1
      webchat-domain/src/main/java/com/webchat/domain/dto/search/SyncSearchEngineDTO.java
  19. 4 25
      webchat-domain/src/main/java/com/webchat/domain/dto/search/SyncSearchEngineListDTO.java
  20. 19 0
      webchat-domain/src/main/java/com/webchat/domain/vo/request/search/MilvusSearchRequestVO.java
  21. 16 0
      webchat-domain/src/main/java/com/webchat/domain/vo/response/search/ArticleMilvusSearchResponse.java
  22. 34 1
      webchat-pgc/src/main/java/com/webchat/pgc/messagequeue/consumer/ArticlePushDelayQueueConsumer.java
  23. 30 1
      webchat-remote/src/main/java/com/webchat/rmi/aigc/BasicModelServiceClient.java
  24. 42 0
      webchat-remote/src/main/java/com/webchat/rmi/search/VectorSearchEngineClient.java
  25. 7 0
      webchat-search/pom.xml
  26. 10 1
      webchat-search/src/main/java/com/webchat/search/WebchatSearchApplication.java
  27. 28 0
      webchat-search/src/main/java/com/webchat/search/config/MilvusConfig.java
  28. 40 0
      webchat-search/src/main/java/com/webchat/search/controller/VectorSearchEngineController.java
  29. 66 0
      webchat-search/src/main/java/com/webchat/search/messagequeue/consumer/ArticleSyncRedisQueueConsumer.java
  30. 0 4
      webchat-search/src/main/java/com/webchat/search/messagequeue/consumer/SyncDataRedisQueueConsumer.java
  31. 81 0
      webchat-search/src/main/java/com/webchat/search/service/voctor/AbstractMilvusQueueService.java
  32. 191 0
      webchat-search/src/main/java/com/webchat/search/service/voctor/AbstractMilvusService.java
  33. 256 0
      webchat-search/src/main/java/com/webchat/search/service/voctor/ArticleMilvusService.java
  34. 50 0
      webchat-search/src/main/java/com/webchat/search/service/voctor/MilvusServiceFactory.java

+ 19 - 0
webchat-aigc/pom.xml

@@ -19,9 +19,28 @@
 
     <properties>
         <java.version>17</java.version>
+        <dashscope-sdk-java.version>2.20.1</dashscope-sdk-java.version>
     </properties>
     <dependencies>
 
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>dashscope-sdk-java</artifactId>
+            <!-- 请将 'the-latest-version' 替换为最新版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java -->
+            <version>${dashscope-sdk-java.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-simple</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>com.webchat</groupId>
             <artifactId>webchat-remote</artifactId>

+ 1 - 1
webchat-aigc/src/main/java/com/webchat/aigc/config/properties/AliBabaEmbeddingPropertiesConfig.java

@@ -6,7 +6,7 @@ import org.springframework.stereotype.Component;
 
 @Data
 @Component
-@ConfigurationProperties(prefix = "llm.config.embedding.alibaba")
+@ConfigurationProperties(prefix = "llm.embedding.alibaba")
 public class AliBabaEmbeddingPropertiesConfig {
 
     private String apiKey;

+ 33 - 1
webchat-aigc/src/main/java/com/webchat/aigc/controller/BasicModelServiceController.java

@@ -1,2 +1,34 @@
-package com.webchat.aigc.controller;public class BasicModelServiceController {
+package com.webchat.aigc.controller;
+
+import com.webchat.aigc.llm.AbstractEmbeddingModel;
+import com.webchat.aigc.llm.BasicEmbeddingModel;
+import com.webchat.aigc.llm.EmbeddingModelFactory;
+import com.webchat.common.bean.APIResponseBean;
+import com.webchat.common.bean.APIResponseBeanUtil;
+import com.webchat.rmi.aigc.BasicModelServiceClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.cloud.context.config.annotation.RefreshScope;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+@RestController
+public class BasicModelServiceController implements BasicModelServiceClient {
+
+    @Autowired
+    private BasicEmbeddingModel embeddingModel;
+
+    @Override
+    public APIResponseBean<float[]> embed(String text) {
+
+        return APIResponseBeanUtil.success(embeddingModel.embed(text));
+    }
+
+    @Override
+    public APIResponseBean<List<float[]>> embed(List<String> texts) {
+
+        return APIResponseBeanUtil.success(embeddingModel.embed(texts));
+    }
 }

+ 24 - 1
webchat-aigc/src/main/java/com/webchat/aigc/llm/AbstractEmbeddingModel.java

@@ -1,2 +1,25 @@
-package com.webchat.aigc.llm;public class AbstractEmbeddingModel {
+package com.webchat.aigc.llm;
+
+import java.util.List;
+
+public abstract class AbstractEmbeddingModel {
+
+
+    /**
+     * 批量同步embedding
+     *
+     * @param text
+     * @return
+     */
+    public abstract float[] embed(String text) throws Exception;
+
+
+    /**
+     * 批量同步embedding
+     *
+     * @param texts
+     * @return
+     */
+    public abstract List<float[]> embed(List<String> texts) throws Exception;
+
 }

+ 3 - 1
webchat-aigc/src/main/java/com/webchat/aigc/llm/AiBotChatService.java

@@ -37,6 +37,8 @@ public class AiBotChatService {
     private GPTChatService gptChatService;
     @Autowired
     private BotService botService;
+    @Autowired
+    private AiBotQAService aiBotQAService;
 
     private String getSSEBizCode() {
         return ConnectConstants.ConnectBiz.getBizCode(ConnectConstants.ClientEnum.PC,
@@ -103,7 +105,7 @@ public class AiBotChatService {
             /**
              * 通用对话
              */
-            gptChatService.chat(sseEmitter, chatMessage);
+            aiBotQAService.chat(sseEmitter, chatMessage);
         } else {
             /**
              * 插件类

+ 140 - 0
webchat-aigc/src/main/java/com/webchat/aigc/llm/AiBotQAService.java

@@ -0,0 +1,140 @@
+package com.webchat.aigc.llm;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.webchat.common.bean.APIResponseBean;
+import com.webchat.common.bean.APIResponseBeanUtil;
+import com.webchat.common.constants.MilvusConstants;
+import com.webchat.common.enums.PromptTemplateEnum;
+import com.webchat.common.service.FreeMarkEngineService;
+import com.webchat.common.util.JsonUtil;
+import com.webchat.domain.vo.llm.ChatCompletionMessage;
+import com.webchat.domain.vo.llm.ChatMessageRole;
+import com.webchat.domain.vo.request.mess.ChatMessageRequestVO;
+import com.webchat.domain.vo.request.search.MilvusSearchRequestVO;
+import com.webchat.domain.vo.response.search.ArticleMilvusSearchResponse;
+import com.webchat.rmi.search.VectorSearchEngineClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.cloud.context.config.annotation.RefreshScope;
+import org.springframework.stereotype.Service;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+@RefreshScope
+@Service
+public class AiBotQAService {
+
+
+    @Autowired
+    private VectorSearchEngineClient vectorSearchEngineClient;
+
+    @Autowired
+    private FreeMarkEngineService freeMarkEngineService;
+
+    @Value("${llm.config.model}")
+    private String model;
+
+    /**
+     * 向量相似度搜索默认取相似度最高前两条数据
+     */
+    private static final int SEARCH_TOP_K = 2;
+
+    /**
+     * 相似度限制条件
+     */
+    private static final float MIN_SCORE = 0.8f;
+
+    private static final String AIBOT_ROLE = "你是Chat4j项目的Ai助手,任务是根据用户给出的参考数据,基于输入query,精准的给出答案。";
+
+    /**
+     * 我的ai助手问答场景实现 (RAG)
+     *
+     * @param sseEmitter
+     * @param chatMessage
+     * @return
+     */
+    public String chat(SseEmitter sseEmitter, ChatMessageRequestVO chatMessage) {
+
+        String query = chatMessage.getMessage();
+        String senderId = chatMessage.getSenderId();
+        /**
+         * 基于公众号文章相似度实现搜索
+         */
+        List<ArticleMilvusSearchResponse> searchResponses = this.search(senderId, query);
+
+        /**
+         * prompt工程构件需要大模型处理指令
+         */
+        Map<String, Object> vars = new HashMap<>();
+        vars.put("searchResponses", searchResponses);
+        vars.put("input", query);
+        String prompt;
+        try {
+            prompt = freeMarkEngineService.getContentByTemplate(PromptTemplateEnum.AIBOT_RAG.getPath(), vars);
+        } catch (Exception e) {
+            this.sseSend(sseEmitter, "prompt构建失败");
+            e.printStackTrace();
+            return null;
+        }
+        /**
+         * 增强
+         */
+        // 使用工厂模式,取对应的LLM服务,来完成对话
+        AbstractLLMChatService abstractLLMChatService = LLMServiceFactory.getLLMService(model);
+        final List<ChatCompletionMessage> messageList = Arrays.asList(
+                new ChatCompletionMessage(ChatMessageRole.SYSTEM.value(), AIBOT_ROLE),
+                new ChatCompletionMessage(ChatMessageRole.USER.value(), prompt));
+        String aiResponse;
+        try {
+            aiResponse = abstractLLMChatService.chat(sseEmitter, messageList);
+        } catch (Exception e) {
+            this.sseSend(sseEmitter, "服务器繁忙");
+            return null;
+        }
+        return aiResponse;
+    }
+
+    /**
+     * RPC 远程请求我们的搜索引擎服务,基于向量相似度检索,搜索topK条公众号文章片段
+     *
+     * @param userId
+     * @param query
+     * @return
+     */
+    private List<ArticleMilvusSearchResponse> search(String userId, String query) {
+        String biz = MilvusConstants.SearchBiz.SEARCH_ARTICLE.getBiz();
+        MilvusSearchRequestVO req = MilvusSearchRequestVO.builder()
+                .userId(userId)
+                .query(query)
+                .score(MIN_SCORE)
+                .topK(SEARCH_TOP_K)
+                .build();
+        APIResponseBean<Object> response = vectorSearchEngineClient.search(biz, req);
+        if (APIResponseBeanUtil.isOk(response)) {
+            return JsonUtil.fromJson(JsonUtil.toJsonString(response.getData()), new TypeReference<List<ArticleMilvusSearchResponse>>() { });
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * sse 消息推送
+     *
+     * @param message
+     */
+    public static void sseSend(SseEmitter sseEmitter, String message) {
+        try {
+            sseEmitter.send(message);
+            sseEmitter.send("finished");
+        } catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+}

+ 47 - 1
webchat-aigc/src/main/java/com/webchat/aigc/llm/AlibabaEmbeddingModel.java

@@ -1,2 +1,48 @@
-package com.webchat.aigc.llm;public class AlibabaEmbeddingModel {
+package com.webchat.aigc.llm;
+
+import com.alibaba.dashscope.embeddings.TextEmbedding;
+import com.alibaba.dashscope.embeddings.TextEmbeddingParam;
+import com.alibaba.dashscope.embeddings.TextEmbeddingResult;
+import com.webchat.aigc.config.properties.AliBabaEmbeddingPropertiesConfig;
+import org.apache.commons.lang3.ArrayUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+
+
+@Service
+public class AlibabaEmbeddingModel extends AbstractEmbeddingModel {
+
+    @Autowired
+    private AliBabaEmbeddingPropertiesConfig embeddingPropertiesConfig;
+
+
+    @Override
+    public float[] embed(String text) throws Exception {
+        TextEmbeddingParam param = TextEmbeddingParam
+                .builder()
+                .model(TextEmbedding.Models.TEXT_EMBEDDING_V3)
+                .apiKey(embeddingPropertiesConfig.getApiKey())
+                .text(text)
+                .build();
+        TextEmbedding textEmbedding = new TextEmbedding();
+        TextEmbeddingResult result = textEmbedding.call(param);
+        List<Double> embeddings = result.getOutput().getEmbeddings().get(0).getEmbedding();
+        List<Float> fEmbeddings = embeddings.stream().map(embed -> embed.floatValue()).toList();
+        return ArrayUtils.toPrimitive(fEmbeddings.toArray(new Float[0]));
+    }
+
+    @Override
+    public List<float[]> embed(List<String> texts) throws Exception {
+        TextEmbeddingParam param = TextEmbeddingParam
+                .builder()
+                .model(TextEmbedding.Models.TEXT_EMBEDDING_V3)
+                .texts(Arrays.asList("风急天高猿啸哀", "渚清沙白鸟飞回", "无边落木萧萧下", "不尽长江滚滚来")).build();
+        TextEmbedding textEmbedding = new TextEmbedding();
+        TextEmbeddingResult result = textEmbedding.call(param);
+        System.out.println(result);
+        return null;
+    }
 }

+ 37 - 1
webchat-aigc/src/main/java/com/webchat/aigc/llm/BasicEmbeddingModel.java

@@ -1,2 +1,38 @@
-package com.webchat.aigc.llm;public class BasicEmbeddingModel {
+package com.webchat.aigc.llm;
+
+
+import com.webchat.common.exception.AiCallException;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.cloud.context.config.annotation.RefreshScope;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+
+@RefreshScope
+@Service
+public class BasicEmbeddingModel {
+
+
+    @Value("${llm.embedding.model}")
+    private String embedModel;
+
+
+    public float[] embed(String text) {
+        AbstractEmbeddingModel abstractEmbeddingModel = EmbeddingModelFactory.getEmbeddingModel(embedModel);
+        try {
+            return abstractEmbeddingModel.embed(text);
+        } catch (Exception e) {
+            throw new AiCallException(String.format("embedding error:%s", e.getMessage()), e);
+        }
+    }
+
+    public List<float[]> embed(List<String> texts) {
+        AbstractEmbeddingModel abstractEmbeddingModel = EmbeddingModelFactory.getEmbeddingModel(embedModel);
+        try {
+            return abstractEmbeddingModel.embed(texts);
+        } catch (Exception e) {
+            throw new AiCallException(String.format("embedding error:%s", e.getMessage()), e);
+        }
+    }
 }

+ 28 - 1
webchat-aigc/src/main/java/com/webchat/aigc/llm/BasicModelService.java

@@ -1,2 +1,29 @@
-package com.webchat.aigc.llm;public class BasicModelService {
+package com.webchat.aigc.llm;
+
+
+import com.alibaba.dashscope.embeddings.TextEmbedding;
+import com.alibaba.dashscope.embeddings.TextEmbeddingParam;
+import com.alibaba.dashscope.embeddings.TextEmbeddingResult;
+import com.alibaba.dashscope.exception.ApiException;
+import com.alibaba.dashscope.exception.NoApiKeyException;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+
+@Service
+public class BasicModelService {
+
+
+
+
+
+    public static void basicCall() throws ApiException, NoApiKeyException {
+        TextEmbeddingParam param = TextEmbeddingParam
+                .builder()
+                .model(TextEmbedding.Models.TEXT_EMBEDDING_V3)
+                .texts(Arrays.asList("风急天高猿啸哀", "渚清沙白鸟飞回", "无边落木萧萧下", "不尽长江滚滚来")).build();
+        TextEmbedding textEmbedding = new TextEmbedding();
+        TextEmbeddingResult result = textEmbedding.call(param);
+        System.out.println(result);
+    }
 }

+ 7 - 15
webchat-aigc/src/main/java/com/webchat/aigc/llm/EmbeddingModelFactory.java

@@ -1,5 +1,6 @@
 package com.webchat.aigc.llm;
 
+import com.webchat.common.enums.EmbeddingModelEnum;
 import com.webchat.common.enums.LlmModelEnum;
 import com.webchat.common.exception.BusinessException;
 import org.springframework.beans.BeansException;
@@ -18,11 +19,11 @@ import java.util.Map;
  * 抽象大模型对话工厂服务
  */
 @Component
-public class LLMServiceFactory implements InitializingBean, ApplicationContextAware {
+public class EmbeddingModelFactory implements InitializingBean, ApplicationContextAware {
 
     private ApplicationContext applicationContext;
 
-    private static final Map<String, AbstractLLMChatService> serviceMap = new HashMap<>();
+    private static final Map<String, AbstractEmbeddingModel> serviceMap = new HashMap<>();
 
     @Override
     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
@@ -39,22 +40,13 @@ public class LLMServiceFactory implements InitializingBean, ApplicationContextAw
      */
     private void initServiceMap() {
         /**
-         * kimi
+         * aliabba
          */
-        serviceMap.put(LlmModelEnum.KIMI.getModel(), applicationContext.getBean(KimiAIService.class));
-        /**
-         * deepseek
-         */
-        serviceMap.put(LlmModelEnum.DEEPSEEK.getModel(), applicationContext.getBean(DeepSeekAIService.class));
-
-        /**
-         * ollama
-         */
-        serviceMap.put(LlmModelEnum.OLLAMA.getModel(), applicationContext.getBean(OllamaService.class));
+        serviceMap.put(EmbeddingModelEnum.ALIBABA.getModel(), applicationContext.getBean(AlibabaEmbeddingModel.class));
     }
 
-    public static AbstractLLMChatService getLLMService(String model) {
-        AbstractLLMChatService llmChatService = serviceMap.get(model);
+    public static AbstractEmbeddingModel getEmbeddingModel(String model) {
+        AbstractEmbeddingModel llmChatService = serviceMap.get(model);
         if (llmChatService == null) {
             throw new BusinessException("不支持的模型");
         }

+ 21 - 0
webchat-aigc/src/main/resources/templates/ftl/AI_BOT_RAG.ftl

@@ -0,0 +1,21 @@
+# 任务简介
+你是Chat4j项目的Ai助手,任务是根据用户给出的参考数据,基于输入query,精准的给出答案。
+
+# 参考内容
+<#if searchResponses?hasContent>
+    <#list searchResponses as data>
+        - 总结:${data.summary}
+        - 正文:${data.content}
+    </#list>
+<#else>
+    暂无参考内容
+</#if>
+
+# 任务限制
+- 如果有给出参考内容,严格按照参考内容数据回复
+- 如果暂无参考内容,仍然需要保证回复的严谨性、准确性、精简
+
+# 用户输入
+${input}
+
+# 输出:

+ 37 - 0
webchat-common/src/main/java/com/webchat/common/constants/MilvusConstants.java

@@ -0,0 +1,37 @@
+package com.webchat.common.constants;
+
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+public class MilvusConstants {
+
+
+
+
+    @Getter
+    @AllArgsConstructor
+    public enum CollectionName {
+
+        COLLECTION_ARTICLE("文章集合");
+
+        private String description;
+    }
+
+
+    @Getter
+    @AllArgsConstructor
+    public enum SearchBiz {
+
+        SEARCH_ARTICLE("文章搜索业务场景");
+
+        private String description;
+
+        public String getBiz() {
+            return this.name();
+        }
+    }
+
+
+
+}

+ 8 - 10
webchat-common/src/main/java/com/webchat/common/enums/EmbeddingModelEnum.java

@@ -1,19 +1,17 @@
 package com.webchat.common.enums;
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
 
 @Getter
-public enum LlmModelEnum {
+@AllArgsConstructor
+public enum EmbeddingModelEnum {
 
-    KIMI("kimi"),
-
-    DEEPSEEK("deepseek"),
-
-    OLLAMA("ollama");
+    ALIBABA("ALIBABA", 1024);
 
     private String model;
-
-    LlmModelEnum(String model) {
-        this.model = model;
-    }
+    /**
+     * 纬度,默认使用v3模型,默认纬度1024
+     */
+    private int dimension;
 }

+ 2 - 0
webchat-common/src/main/java/com/webchat/common/enums/PromptTemplateEnum.java

@@ -23,6 +23,8 @@ public enum PromptTemplateEnum {
 
     AIBOT_FC("/ftl/AI_BOT_FC.ftl", "我的Ai助手用户输入query意图识别"),
 
+    AIBOT_RAG("/ftl/AI_BOT_RAG.ftl", "我的Ai助手RAG问答"),
+
     RAG("/ftl/RAG.ftl", "公众号文章RAG问答"),
 
     MOMENT_REVIEW("/ftl/MOMENT_REVIEW.ftl", "朋友圈动态内容审核"),

+ 4 - 1
webchat-common/src/main/java/com/webchat/common/enums/messagequeue/MessageQueueEnum.java

@@ -21,7 +21,10 @@ public enum MessageQueueEnum {
 
     QUEUE_CHAT_VIDEO_MESH("queue_chat_video_mesh", "基于Mesh模式的多人音视频聊天信令消息队列"),
 
-    QUEUE_MOMENT_PUBLISH("queue_moment_publish", "朋友圈动态发布消息队列");
+    QUEUE_MOMENT_PUBLISH("queue_moment_publish", "朋友圈动态发布消息队列"),
+
+    QUEUE_SYNC_SEARCH_DB_MESSAGE("queue_sync_search_db_message", "同步搜索服务存储队列"),
+    ;
 
     private String queue;
 

+ 15 - 0
webchat-common/src/main/java/com/webchat/common/enums/search/DataSourceTypeEnum.java

@@ -1,8 +1,23 @@
 package com.webchat.common.enums.search;
 
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
 
 @Getter
+@AllArgsConstructor
 public enum DataSourceTypeEnum {
+
+
+    ARTICLE("公众号文章"),
+
+    IM_MESSAGE("IM消息");
+
+    private String sourceName;
+
+
+    public String getSource() {
+        return this.name();
+    }
+
 }

+ 6 - 6
webchat-common/src/main/java/com/webchat/common/exception/AiCallException.java

@@ -2,29 +2,29 @@ package com.webchat.common.exception;
 
 import com.webchat.common.enums.APIErrorCommonEnum;
 
-public class BusinessException extends RuntimeException {
+public class AiCallException extends RuntimeException {
 
     private Integer code = 500;
 
-    public BusinessException() {
+    public AiCallException() {
         super();
     }
 
-    public BusinessException(String msg) {
+    public AiCallException(String msg) {
         super(msg);
     }
 
-    public BusinessException(APIErrorCommonEnum apiErrorCommonEnum) {
+    public AiCallException(APIErrorCommonEnum apiErrorCommonEnum) {
         super(apiErrorCommonEnum.getMessage());
         this.code = apiErrorCommonEnum.getCode();
     }
 
-    public BusinessException(Integer code, String msg) {
+    public AiCallException(Integer code, String msg) {
         super(msg);
         this.code = code;
     }
 
-    public BusinessException(String message, Throwable cause) {
+    public AiCallException(String message, Throwable cause) {
         super(message, cause);
     }
 

+ 30 - 1
webchat-domain/src/main/java/com/webchat/domain/dto/search/SyncSearchEngineDTO.java

@@ -1,4 +1,33 @@
 package com.webchat.domain.dto.search;
 
-public class SyncSearchEngineDTO {
+
+import com.webchat.domain.dto.queue.BaseQueueDTO;
+import lombok.Data;
+
+@Data
+public class SyncSearchEngineDTO extends BaseQueueDTO {
+
+    /**
+     * 业务数据主键
+     */
+    private String pk;
+
+
+    /**
+     * 业务数据类型
+     * com.webchat.common.enums.search.DataSourceTypeEnum
+     */
+    private String sourceType;
+
+    /**
+     * 内容总结,如文章标题
+     */
+    private String summary;
+
+    /**
+     * 内容,主要对content做embedding
+     */
+    private String content;
+
+
 }

+ 4 - 25
webchat-domain/src/main/java/com/webchat/domain/dto/search/SyncSearchEngineListDTO.java

@@ -2,34 +2,13 @@ package com.webchat.domain.dto.search;
 
 
 import com.webchat.domain.dto.queue.BaseQueueDTO;
-import lombok.Builder;
 import lombok.Data;
 
-@Data
-@Builder
-public class SyncSearchEngineDTO extends BaseQueueDTO {
-
-    /**
-     * 业务数据主键
-     */
-    private String pk;
-
+import java.util.List;
 
-    /**
-     * 业务数据类型
-     * com.webchat.common.enums.search.DataSourceTypeEnum
-     */
-    private String sourceType;
-
-    /**
-     * 内容总结,如文章标题
-     */
-    private String summary;
-
-    /**
-     * 内容,主要对content做embedding
-     */
-    private String content;
+@Data
+public class SyncSearchEngineListDTO extends BaseQueueDTO {
 
 
+    private List<SyncSearchEngineDTO> dataList;
 }

+ 19 - 0
webchat-domain/src/main/java/com/webchat/domain/vo/request/search/MilvusSearchRequestVO.java

@@ -0,0 +1,19 @@
+package com.webchat.domain.vo.request.search;
+
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class MilvusSearchRequestVO {
+
+
+    private String userId;
+
+    private String query;
+
+    private Integer topK;
+
+    private float score;
+}

+ 16 - 0
webchat-domain/src/main/java/com/webchat/domain/vo/response/search/ArticleMilvusSearchResponse.java

@@ -0,0 +1,16 @@
+package com.webchat.domain.vo.response.search;
+
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class ArticleMilvusSearchResponse {
+
+    private Long articleId;
+    private String summary;
+    private String content;
+}

+ 34 - 1
webchat-pgc/src/main/java/com/webchat/pgc/messagequeue/consumer/ArticlePushDelayQueueConsumer.java

@@ -2,14 +2,21 @@ package com.webchat.pgc.messagequeue.consumer;
 
 import com.webchat.common.enums.messagequeue.MessageBroadChannelEnum;
 import com.webchat.common.enums.messagequeue.MessageQueueEnum;
+import com.webchat.common.enums.search.DataSourceTypeEnum;
 import com.webchat.common.service.messagequeue.consumer.AbstractRedisDelayQueueConsumer;
 import com.webchat.common.service.messagequeue.producer.MessageQueueProducer;
 import com.webchat.common.util.JsonUtil;
 import com.webchat.domain.dto.queue.ArticleDelayMessageDTO;
+import com.webchat.domain.dto.search.SyncSearchEngineDTO;
+import com.webchat.domain.dto.search.SyncSearchEngineListDTO;
+import com.webchat.domain.vo.response.publicaccount.ArticleBaseResponseVO;
 import com.webchat.pgc.repository.dao.IArticleDAO;
+import com.webchat.pgc.service.OfficialArticleService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -23,8 +30,14 @@ public class ArticlePushDelayQueueConsumer extends AbstractRedisDelayQueueConsum
     private MessageQueueProducer<Set<String>, Long> messageQueueProducer;
 
     @Autowired
+    private MessageQueueProducer<SyncSearchEngineListDTO, Long> syncSearchEngineMessageQueueProducer;
+
+    @Autowired
     private IArticleDAO articleDAO;
 
+    @Autowired
+    private OfficialArticleService officialArticleService;
+
     @Override
     protected ArticleDelayMessageDTO convert(String s) {
 
@@ -40,6 +53,27 @@ public class ArticlePushDelayQueueConsumer extends AbstractRedisDelayQueueConsum
     @Override
     protected void doNextConsume(Set<String> messages) {
 
+        Set<ArticleDelayMessageDTO> articleDelayMessages = messages.stream().map(this::convert).collect(Collectors.toSet());
+
+        // 走redis批量查询文章详情
+        List<Long> articleIdList = articleDelayMessages.stream().map(ArticleDelayMessageDTO::getArticleId).toList();
+        Map<Long, ArticleBaseResponseVO> articleDetailMap = officialArticleService.batchGetArticleDetailFromCache(articleIdList);
+        /**
+         * 构造同步搜索引擎业务数据
+         */
+        List<SyncSearchEngineDTO> syncDataList = articleDelayMessages.stream().map(am -> {
+            ArticleBaseResponseVO article = articleDetailMap.get(am.getArticleId());
+            SyncSearchEngineDTO syncSearchEngineDTO = new SyncSearchEngineDTO();
+            syncSearchEngineDTO.setPk(String.valueOf(am.getArticleId()));
+            syncSearchEngineDTO.setSummary(String.format("《%s》: %s", article.getTitle(), article.getDescription()));
+            syncSearchEngineDTO.setContent(article.getContent());
+            syncSearchEngineDTO.setSourceType(DataSourceTypeEnum.ARTICLE.getSource());
+            return syncSearchEngineDTO;
+        }).collect(Collectors.toList());
+        SyncSearchEngineListDTO syncData = new SyncSearchEngineListDTO();
+        syncData.setDataList(syncDataList);
+        syncSearchEngineMessageQueueProducer.send(MessageQueueEnum.QUEUE_SYNC_SEARCH_DB_MESSAGE, syncData);
+
         /**
          * 1. 广播通知connect service完成推文、以及消息持久化,批量提交任务到普通队列
          */
@@ -47,7 +81,6 @@ public class ArticlePushDelayQueueConsumer extends AbstractRedisDelayQueueConsum
         /**
          * 2. 更新文章已经推送状态
          */
-        Set<ArticleDelayMessageDTO> articleDelayMessages = messages.stream().map(this::convert).collect(Collectors.toSet());
         Set<Long> articleIds = articleDelayMessages.stream()
                 .map(ArticleDelayMessageDTO::getArticleId).collect(Collectors.toSet());
         articleDAO.updatePushedStatus(articleIds);

+ 30 - 1
webchat-remote/src/main/java/com/webchat/rmi/aigc/BasicModelServiceClient.java

@@ -1,2 +1,31 @@
-package com.webchat.rmi.aigc;public class BasicModelServiceClient {
+package com.webchat.rmi.aigc;
+
+
+import com.webchat.common.bean.APIResponseBean;
+import org.springframework.cloud.openfeign.FeignClient;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+
+import java.util.List;
+
+@FeignClient(name = "webchat-aigc-service", contextId = "basicModelServiceClient")
+public interface BasicModelServiceClient {
+
+    /**
+     * 单个字符串embedding
+     *
+     * @param text
+     * @return
+     */
+    @PostMapping("/aigc-service/basic/ai/embedding/text")
+    APIResponseBean<float[]> embed(@RequestBody String text);
+
+    /**
+     * 批量字符串embedding
+     *
+     * @param texts
+     * @return
+     */
+    @PostMapping("/aigc-service/basic/ai/embedding/texts")
+    APIResponseBean<List<float[]>> embed(@RequestBody List<String> texts);
 }

+ 42 - 0
webchat-remote/src/main/java/com/webchat/rmi/search/VectorSearchEngineClient.java

@@ -0,0 +1,42 @@
+package com.webchat.rmi.search;
+
+import com.webchat.common.bean.APIResponseBean;
+import com.webchat.domain.vo.request.search.MilvusSearchRequestVO;
+import org.springframework.cloud.openfeign.FeignClient;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+
+
+@FeignClient(name = "webchat-search-service", contextId = "vectorSearchEngineClient")
+public interface VectorSearchEngineClient {
+
+
+    /**
+     * 基于场景创建向量集合
+     *
+     * @return
+     */
+    @PostMapping("/search-service/vector/connection/create/{biz}")
+    APIResponseBean<Boolean> createCollection(@PathVariable String biz);
+
+    /**
+     * 删除指定场景的向量集合
+     *
+     * @return
+     */
+    @PostMapping("/search-service/vector/connection/drop/{biz}")
+    APIResponseBean<Boolean> dropCollection(@PathVariable String biz);
+
+    /**
+     * 基于向量相似度数据搜索
+     *
+     * @param biz
+     * @param milvusSearchReq
+     * @return
+     */
+    @PostMapping("/search-service/vector/search/{biz}")
+    APIResponseBean<Object> search(@PathVariable String biz,
+                                   @RequestBody MilvusSearchRequestVO milvusSearchReq);
+
+}

+ 7 - 0
webchat-search/pom.xml

@@ -19,11 +19,18 @@
 
     <properties>
 
+        <milvus-sdk-java.version>2.5.7</milvus-sdk-java.version>
     </properties>
 
     <dependencies>
 
         <dependency>
+            <groupId>io.milvus</groupId>
+            <artifactId>milvus-sdk-java</artifactId>
+            <version>${milvus-sdk-java.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>com.webchat</groupId>
             <artifactId>webchat-remote</artifactId>
             <version>1.0-SNAPSHOT</version>

+ 10 - 1
webchat-search/src/main/java/com/webchat/search/WebchatSearchApplication.java

@@ -1,8 +1,10 @@
 package com.webchat.search;
 
+import com.webchat.common.util.SpringContextUtil;
+import com.webchat.search.messagequeue.consumer.ArticleSyncRedisQueueConsumer;
+import com.webchat.search.service.voctor.ArticleMilvusService;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
 import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
 import org.springframework.cloud.openfeign.EnableFeignClients;
 import org.springframework.context.annotation.ComponentScan;
@@ -15,5 +17,12 @@ public class WebchatSearchApplication {
 
     public static void main(String[] args) {
         SpringApplication.run(WebchatSearchApplication.class, args);
+
+        /**
+         * 服务启动成功后开启队列消费(文章数据同步Milvus)
+         */
+        SpringContextUtil.getBean(ArticleSyncRedisQueueConsumer.class).initBean();
+
+        SpringContextUtil.getBean(ArticleMilvusService.class).consumeTask();
     }
 }

+ 28 - 0
webchat-search/src/main/java/com/webchat/search/config/MilvusConfig.java

@@ -0,0 +1,28 @@
+package com.webchat.search.config;
+
+
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class MilvusConfig {
+
+
+    @Value("${milvus.host}")
+    private String host;
+
+    @Value("${milvus.port}")
+    private String port;
+
+
+    @Bean
+    public MilvusClientV2 milvusClientV2() {
+        String uri = "http://" + host + ":" + port;
+        ConnectConfig connectConfig = ConnectConfig.builder().uri(uri).build();
+        return new MilvusClientV2(connectConfig);
+    }
+
+}

+ 40 - 0
webchat-search/src/main/java/com/webchat/search/controller/VectorSearchEngineController.java

@@ -0,0 +1,40 @@
+package com.webchat.search.controller;
+
+import com.webchat.common.bean.APIResponseBean;
+import com.webchat.common.bean.APIResponseBeanUtil;
+import com.webchat.domain.vo.request.search.MilvusSearchRequestVO;
+import com.webchat.rmi.search.VectorSearchEngineClient;
+import com.webchat.search.service.voctor.AbstractMilvusService;
+import com.webchat.search.service.voctor.MilvusServiceFactory;
+import org.springframework.util.Assert;
+import org.springframework.web.bind.annotation.RestController;
+
+
+@RestController
+public class VectorSearchEngineController implements VectorSearchEngineClient {
+
+
+    @Override
+    public APIResponseBean<Boolean> createCollection(String biz) {
+        MilvusServiceFactory.getService(biz).createCollection();
+        return APIResponseBeanUtil.success(true);
+    }
+
+    @Override
+    public APIResponseBean<Boolean> dropCollection(String biz) {
+        MilvusServiceFactory.getService(biz).dropCollection();
+        return APIResponseBeanUtil.success(true);
+    }
+
+    @Override
+    public APIResponseBean<Object> search(String biz, MilvusSearchRequestVO milvusSearchReq) {
+        String query = milvusSearchReq.getQuery();
+        Assert.notNull(query, "query must not be null!");
+        Integer topK = milvusSearchReq.getTopK();
+        topK = topK == null ? 1 : topK;
+        float score = milvusSearchReq.getScore();
+        AbstractMilvusService milvusService = MilvusServiceFactory.getService(biz);
+        Object result = milvusService.search(query, topK, score);
+        return APIResponseBeanUtil.success(result);
+    }
+}

+ 66 - 0
webchat-search/src/main/java/com/webchat/search/messagequeue/consumer/ArticleSyncRedisQueueConsumer.java

@@ -0,0 +1,66 @@
+package com.webchat.search.messagequeue.consumer;
+
+import com.webchat.common.enums.messagequeue.MessageQueueEnum;
+import com.webchat.common.service.messagequeue.consumer.AbstractRedisQueueConsumer;
+import com.webchat.common.util.JsonUtil;
+import com.webchat.domain.dto.search.SyncSearchEngineListDTO;
+import com.webchat.search.service.voctor.ArticleMilvusService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.stereotype.Component;
+
+/**
+ * 公众号推文consumer
+ */
+@Component
+@Lazy(value = false)
+@Slf4j
+public class ArticleSyncRedisQueueConsumer extends AbstractRedisQueueConsumer<SyncSearchEngineListDTO> {
+
+
+    @Autowired
+    private ArticleMilvusService articleMilvusService;
+
+    public void initBean() {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                schedule();
+            }
+        }).start();
+    }
+
+    @Override
+    protected SyncSearchEngineListDTO convert(String s) {
+
+        return JsonUtil.fromJson(s, SyncSearchEngineListDTO.class);
+    }
+
+    @Override
+    protected MessageQueueEnum getMessageQueue() {
+
+        return MessageQueueEnum.QUEUE_SYNC_SEARCH_DB_MESSAGE;
+    }
+
+    @Override
+    protected void receive(SyncSearchEngineListDTO data) {
+        /**
+         * 消息处理:向量库
+         */
+        articleMilvusService.doHandleSyncData(data);
+    }
+
+    @Override
+    protected void error(SyncSearchEngineListDTO data, Exception ex) {
+
+        /**
+         * 消费失败,重新放回队列
+         */
+        backToQueue(data);
+        /**
+         * 预警通知
+         */
+        log.error("SyncDataRedisQueueConsumer error", ex);
+    }
+}

+ 0 - 4
webchat-search/src/main/java/com/webchat/search/messagequeue/consumer/SyncDataRedisQueueConsumer.java

@@ -1,4 +0,0 @@
-package com.webchat.search.messagequeue.consumer;
-
-public class SyncDataRedisQueueConsumer {
-}

+ 81 - 0
webchat-search/src/main/java/com/webchat/search/service/voctor/AbstractMilvusQueueService.java

@@ -0,0 +1,81 @@
+package com.webchat.search.service.voctor;
+
+
+import io.milvus.param.R;
+import jakarta.annotation.PostConstruct;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+
+@Slf4j
+public abstract class AbstractMilvusQueueService<T, R, D> extends AbstractMilvusService<T, R> {
+
+    /**
+     * 队列消费失败,最大重试3次
+     */
+    private static final int RETRY_TIMES = 3;
+
+    /**
+     * 由具体的实现类返回
+     *
+     * @return
+     */
+    protected abstract ArrayBlockingQueue<D> queue();
+
+    /**
+     * 采用队列模式消费
+     *
+     * @return
+     */
+    @Override
+    protected boolean queueModel() {return true;}
+
+    /**
+     * 如果走队列实现方式,可以再次重新数据写入的方法
+     *
+     * @param data
+     * @return
+     */
+    public abstract boolean doWriteCollection(D data);
+
+    @Override
+    public boolean writeCollection(T data) {
+
+        return true;
+    }
+
+    /**
+     * 重新加入队列
+     *
+     * @param task
+     */
+    protected void putTaskQueue(D task) {
+        int retryTimes = 0;
+        while (retryTimes <= RETRY_TIMES) {
+            try {
+                queue().put(task);
+                return;
+            } catch (InterruptedException e) {
+                retryTimes ++;
+            }
+        }
+    }
+
+    public void consumeTask() {
+        while (true) {
+            D task = null;
+            try {
+                task = queue().take();
+                Thread.sleep(1000);
+                if (task != null) {
+                    // 处理任务
+                    this.doWriteCollection(task);
+                }
+            } catch (Exception e) {
+                // TODO
+                this.putTaskQueue(task);
+            }
+        }
+    }
+}

+ 191 - 0
webchat-search/src/main/java/com/webchat/search/service/voctor/AbstractMilvusService.java

@@ -0,0 +1,191 @@
+package com.webchat.search.service.voctor;
+
+
+import com.webchat.common.bean.APIResponseBean;
+import com.webchat.common.bean.APIResponseBeanUtil;
+import com.webchat.common.exception.AiCallException;
+import com.webchat.common.util.JsonUtil;
+import com.webchat.rmi.aigc.BasicModelServiceClient;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.common.IndexParam;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+import io.milvus.v2.service.collection.request.DropCollectionReq;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.List;
+
+
+@Slf4j
+public abstract class AbstractMilvusService<T, R> {
+
+    @Autowired
+    public MilvusClientV2 client;
+
+    @Autowired
+    private BasicModelServiceClient basicModelClient;
+
+    /**
+     * 默认查询相似度最高的1条
+     */
+    private final static int DEFAULT_TOP_K = 1;
+
+    private final static int DEFAULT_SCORE = 0;
+
+    /**
+     * 是否需要走队列模式,默认不需要
+     *
+     * @return
+     */
+    protected boolean queueModel() {
+        return false;
+    }
+
+    /**
+     * 加入队列,普通业务场景不需要单独实现,队列消费场景自己实现
+     * @param data
+     * @return
+     */
+    protected boolean addTaskQueue(T data) {
+        return true;
+    }
+
+    /**
+     * 向量纬度
+     * @return
+     */
+    protected abstract int dimension();
+
+    /**
+     * 集合名称,类似RDBMS中的数据库概念
+     *
+     * 明明定义由具体实现类来定义
+     * @return
+     */
+    protected abstract String collectionName();
+
+    /**
+     * schema 定义
+     *
+     * @return
+     */
+    protected abstract CreateCollectionReq.CollectionSchema schema();
+
+    /**
+     * 集合schema索引定义
+     *
+     * @return
+     */
+    protected abstract List<IndexParam> indexParams();
+
+    /**
+     * 数据写入
+     *
+     * @param data
+     * @return
+     */
+    public abstract boolean writeCollection(T data);
+
+    /**
+     * 处理同步任务数据
+     * @param data
+     */
+    public void doHandleSyncData(T data) {
+
+        boolean queueMedal = this.queueModel();
+        if (queueMedal) {
+            // 队列模式处理
+            this.addTaskQueue(data);
+        } else {
+            this.writeCollection(data);
+        }
+    }
+
+    /**
+     * 基于相似度的 TopK 条数据搜索
+     * @param query
+     * @param topK
+     * @return
+     */
+    public abstract List<R> search(String query, int topK, float gtScore);
+
+    public List<R>  search(String query, int topK) {
+        return this.search(query, topK, DEFAULT_SCORE);
+    }
+
+    public List<R>  search(String query) {
+
+        return this.search(query, DEFAULT_TOP_K);
+    }
+
+    /**
+     * 单条文本 embedding(文本转向量)
+     *
+     * @param text
+     * @return
+     */
+    protected float[] embed(String text) {
+
+        APIResponseBean<float[]> response = basicModelClient.embed(text);
+        if (APIResponseBeanUtil.isOk(response)) {
+            return response.getData();
+        }
+        log.error("embeeding err ===> text:{}, response:{}", text, JsonUtil.toJsonString(response));
+        throw new AiCallException("embedding err!");
+    }
+
+    /**
+     * 批量文本 embedding(文本转向量)
+     *
+     * @param texts
+     * @return
+     */
+    protected List<float[]> embed(List<String> texts) {
+
+        APIResponseBean<List<float[]>> response = basicModelClient.embed(texts);
+        if (APIResponseBeanUtil.isOk(response)) {
+            return response.getData();
+        }
+        log.error("embeeding err ===> texts:{}, response:{}",
+                JsonUtil.toJsonString(texts), JsonUtil.toJsonString(response));
+        throw new AiCallException("embedding err!");
+    }
+
+    /**
+     * 创建集合
+     */
+    public void createCollection() {
+        /**
+         * 集合名
+         */
+        String collectionName = this.collectionName();
+        /**
+         * 定义schema
+         */
+        CreateCollectionReq.CollectionSchema schema = this.schema();
+        /**
+         * 索引
+         */
+        List<IndexParam> indexParams = this.indexParams();
+        /**
+         * 创建集合
+         */
+        CreateCollectionReq request = CreateCollectionReq.builder()
+                .collectionName(collectionName)
+                .collectionSchema(schema)
+                .indexParams(indexParams)
+                .build();
+        client.createCollection(request);
+    }
+
+    /**
+     * 删除集合
+     *
+     */
+    public void dropCollection() {
+        DropCollectionReq request = DropCollectionReq.builder()
+                .collectionName(this.collectionName())
+                .build();
+        client.dropCollection(request);
+    }
+}

+ 256 - 0
webchat-search/src/main/java/com/webchat/search/service/voctor/ArticleMilvusService.java

@@ -0,0 +1,256 @@
+package com.webchat.search.service.voctor;
+
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.webchat.common.constants.MilvusConstants;
+import com.webchat.common.enums.EmbeddingModelEnum;
+import com.webchat.common.util.JsonUtil;
+import com.webchat.domain.dto.search.SyncSearchEngineDTO;
+import com.webchat.domain.dto.search.SyncSearchEngineListDTO;
+import com.webchat.domain.vo.response.search.ArticleMilvusSearchResponse;
+import io.milvus.v2.common.DataType;
+import io.milvus.v2.common.IndexParam;
+import io.milvus.v2.service.collection.request.AddFieldReq;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+import io.milvus.v2.service.vector.request.AnnSearchReq;
+import io.milvus.v2.service.vector.request.HybridSearchReq;
+import io.milvus.v2.service.vector.request.InsertReq;
+import io.milvus.v2.service.vector.request.data.FloatVec;
+import io.milvus.v2.service.vector.request.ranker.WeightedRanker;
+import io.milvus.v2.service.vector.response.InsertResp;
+import io.milvus.v2.service.vector.response.SearchResp;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.springframework.stereotype.Service;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+
+
+@Slf4j
+@Service
+public class ArticleMilvusService extends AbstractMilvusQueueService<SyncSearchEngineListDTO, ArticleMilvusSearchResponse, SyncSearchEngineDTO> {
+
+
+    /**
+     * 本地队列初始长度
+     */
+    private static final int CAPACITY = 2000;
+
+    /**
+     * 本地队列,用于同步数据入向量库
+     */
+    public ArrayBlockingQueue<SyncSearchEngineDTO> queue = new ArrayBlockingQueue(CAPACITY);
+
+    /**
+     * 纬度,跟embedding模型纬度保持一致
+     * @return
+     */
+    @Override
+    protected int dimension() {
+        return EmbeddingModelEnum.ALIBABA.getDimension();
+    }
+
+    /**
+     * 集合定义
+     * @return
+     */
+    @Override
+    protected String collectionName() {
+
+        return MilvusConstants.CollectionName.COLLECTION_ARTICLE.name();
+    }
+
+    /**
+     * schema 定义
+     *
+     * @return
+     */
+    @Override
+    protected CreateCollectionReq.CollectionSchema schema() {
+
+        CreateCollectionReq.CollectionSchema schema = client.createSchema();
+        schema.addField(AddFieldReq.builder()
+                .fieldName("id")
+                .dataType(DataType.VarChar)
+                .isPrimaryKey(true)
+                .autoID(true)
+                .build());
+
+        /**
+         * 业务字段
+         */
+        schema.addField(AddFieldReq.builder()
+                .fieldName("pk")
+                .dataType(DataType.VarChar)
+                .maxLength(64)
+                .build());
+        schema.addField(AddFieldReq.builder()
+                .fieldName("source_type")
+                .dataType(DataType.VarChar)
+                .maxLength(64)
+                .build());
+        schema.addField(AddFieldReq.builder()
+                .fieldName("summary")
+                .dataType(DataType.VarChar)
+                .maxLength(500)
+                .build());
+        schema.addField(AddFieldReq.builder()
+                .fieldName("content")
+                .dataType(DataType.VarChar)
+                .maxLength(65535)
+                .build());
+
+        /**
+         * 向量字段
+         */
+        schema.addField(AddFieldReq.builder()
+                .fieldName("summary_vector")
+                .dataType(DataType.FloatVector)
+                .dimension(this.dimension())
+                .build());
+        schema.addField(AddFieldReq.builder()
+                .fieldName("content_vector")
+                .dataType(DataType.FloatVector)
+                .dimension(this.dimension())
+                .build());
+        return schema;
+    }
+
+    /**
+     * 索引定义
+     * @return
+     */
+    @Override
+    protected List<IndexParam> indexParams() {
+
+        IndexParam summaryVectorIndex = IndexParam.builder()
+                .fieldName("summary_vector")
+                // 余弦相似度
+                .metricType(IndexParam.MetricType.COSINE)
+                .build();
+        IndexParam contentVectorIndex = IndexParam.builder()
+                .fieldName("content_vector")
+                // 余弦相似度
+                .metricType(IndexParam.MetricType.COSINE)
+                .build();
+
+        return List.of(summaryVectorIndex, contentVectorIndex);
+    }
+
+    /**
+     * 入队业务逻辑实现,实际入队由父级putTaskQueue完成。
+     * @param syncData
+     * @return
+     */
+    @Override
+    protected boolean addTaskQueue(SyncSearchEngineListDTO syncData) {
+        if (CollectionUtils.isEmpty(syncData.getDataList())) {
+            return true;
+        }
+        // 文章同步任务加入队列
+        syncData.getDataList().forEach(super::putTaskQueue);
+        return true;
+    }
+
+    /**
+     * 基于相似度的topK条数据搜索
+     *
+     * 混合多向量搜索
+     *
+     * @param query
+     * @param topK
+     * @return
+     */
+    @Override
+    public List<ArticleMilvusSearchResponse> search(String query, int topK, float score) {
+
+        // 对用户query做embedding:这里embedding模型需要同同原数据embed采用同模型
+        float[] queryVector = super.embed(query);
+        AnnSearchReq summaryReq = AnnSearchReq.builder()
+                // 采用相似度检索算法:余弦相似度
+                .metricType(IndexParam.MetricType.COSINE)
+                .vectorFieldName("summary_vector")
+                .topK(topK)
+                .vectors(Collections.singletonList(new FloatVec(queryVector)))
+                .build();
+        AnnSearchReq contentReq = AnnSearchReq.builder()
+                // 采用相似度检索算法:余弦相似度
+                .metricType(IndexParam.MetricType.COSINE)
+                .vectorFieldName("content_vector")
+                .topK(topK)
+                .vectors(Collections.singletonList(new FloatVec(queryVector)))
+                .build();
+        HybridSearchReq request = HybridSearchReq.builder()
+                .collectionName(this.collectionName())
+                .searchRequests(Lists.newArrayList(summaryReq, contentReq))
+                .ranker(new WeightedRanker(Arrays.asList(0.6f, 0.4f)))
+                .topK(topK)
+                .outFields(Lists.newArrayList("pk", "summary", "content"))
+                .build();
+        SearchResp searchResp = client.hybridSearch(request);
+        List<List<SearchResp.SearchResult>> searchResults = searchResp.getSearchResults();
+        List<SearchResp.SearchResult> queryResults;
+        if (CollectionUtils.isEmpty(searchResults) || CollectionUtils.isEmpty(queryResults = searchResults.get(0))) {
+            return Collections.emptyList();
+        }
+        return queryResults.stream()
+                .filter(r -> r.getScore() > score)
+                .map(r -> {
+                    Map<String, Object> resultMap = r.getEntity();
+                    return new ArticleMilvusSearchResponse(Long.valueOf(String.valueOf(resultMap.get("pk"))),
+                                                           String.valueOf(resultMap.getOrDefault("summary", "")),
+                                                           String.valueOf(resultMap.getOrDefault("content", "")));
+                }).toList();
+    }
+
+    /**
+     * 队列
+     * @return
+     */
+    @Override
+    protected ArrayBlockingQueue<SyncSearchEngineDTO> queue() {
+        return queue;
+    }
+
+    /**
+     * 真正的数据写入实现
+     * @param data
+     * @return
+     */
+    @Override
+    public boolean doWriteCollection(SyncSearchEngineDTO data) {
+        JsonObject vector = new JsonObject();
+        String summary = data.getSummary();
+        String content = data.getContent();
+        // 对文章摘要做embed
+        float[] summaryVector = super.embed(summary);
+        float[] contentVector = super.embed(content);
+        /**
+         * 原数据写入
+         */
+        vector.addProperty("pk", data.getPk());
+        vector.addProperty("source_type", data.getSourceType());
+        vector.addProperty("summary", data.getSummary());
+        vector.addProperty("content", data.getContent());
+        /**
+         * 向量字段处理
+         */
+        Gson gson = new Gson();
+        vector.add("summary_vector", gson.toJsonTree(summaryVector));
+        vector.add("content_vector", gson.toJsonTree(contentVector));
+        InsertReq request = InsertReq.builder()
+                .collectionName(collectionName())
+                .data(Collections.singletonList(vector))
+                .build();
+        log.info("文章向量数据入库>>>> pk:{}", data.getPk());
+        InsertResp resp = client.insert(request);
+        log.info("文章向量数据入库完成>>>> pk:{}, resp:{}", data.getPk(), JsonUtil.toJsonString(resp));
+        return true;
+    }
+}

+ 50 - 0
webchat-search/src/main/java/com/webchat/search/service/voctor/MilvusServiceFactory.java

@@ -0,0 +1,50 @@
+package com.webchat.search.service.voctor;
+
+
+import com.webchat.common.constants.MilvusConstants;
+import com.webchat.common.enums.AccountRelationTypeEnum;
+import com.webchat.common.enums.RoleCodeEnum;
+import com.webchat.common.exception.BusinessException;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Component
+public class MilvusServiceFactory implements InitializingBean, ApplicationContextAware {
+
+    private ApplicationContext applicationContext;
+
+    private static Map<String, AbstractMilvusService> services = new HashMap<>();
+
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        this.registryMilvusSearchService();
+    }
+
+    /**
+     * 注册向量检索服务
+     *
+     */
+    private void registryMilvusSearchService() {
+
+        services.put(MilvusConstants.SearchBiz.SEARCH_ARTICLE.getBiz(), applicationContext.getBean(ArticleMilvusService.class));
+    }
+
+    public static AbstractMilvusService getService(String biz) {
+        AbstractMilvusService abstractMilvusService = services.get(biz);
+        if (abstractMilvusService != null) {
+            return abstractMilvusService;
+        }
+        throw new BusinessException("不支持的向量检索服务");
+    }
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+}