Przeglądaj źródła

基于延迟队列+广播公众号推文功能更新

wangqi49 1 miesiąc temu
rodzic
commit
4d5b4539ec
35 zmienionych plików z 1065 dodań i 89 usunięć
  1. 24 0
      resources/database-sql/webchat-pgc.sql
  2. 26 19
      resources/nacos-yaml/webchat-pgc-service-dev.yaml
  3. 35 0
      webchat-admin/src/main/java/com/webchat/admin/controller/OfficialArticleController.java
  4. 28 0
      webchat-admin/src/main/java/com/webchat/admin/service/OfficialArticleService.java
  5. 1 1
      webchat-admin/src/main/resources/application.yml
  6. 2 0
      webchat-common/src/main/java/com/webchat/common/enums/messagequeue/MessageBroadChannelEnum.java
  7. 2 0
      webchat-common/src/main/java/com/webchat/common/enums/messagequeue/MessageQueueEnum.java
  8. 6 6
      webchat-common/src/main/java/com/webchat/common/service/messagequeue/consumer/AbstractRedisDelayQueueConsumer.java
  9. 11 1
      webchat-common/src/main/java/com/webchat/common/service/messagequeue/producer/AbstractMessageQueueSender.java
  10. 7 0
      webchat-common/src/main/java/com/webchat/common/service/messagequeue/producer/RedisMessageQueueSender.java
  11. 5 0
      webchat-common/src/main/java/com/webchat/common/service/messagequeue/producer/RocketMessageQueueSender.java
  12. 4 0
      webchat-connect/src/main/java/com/webchat/connect/messagequeue/config/RedisConfig.java
  13. 29 0
      webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/redis/ArticlePushRedisQueueListener.java
  14. 122 0
      webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/service/ArticlePushConsumeService.java
  15. 28 4
      webchat-connect/src/main/java/com/webchat/connect/service/AccountService.java
  16. 0 1
      webchat-domain/src/main/java/com/webchat/domain/dto/queue/ArticleDelayMessageDTO.java
  17. 0 1
      webchat-domain/src/main/java/com/webchat/domain/vo/request/publicaccount/SaveArticleRequestVO.java
  18. 3 0
      webchat-domain/src/main/java/com/webchat/domain/vo/response/mess/PublicAccountArticleMessageVO.java
  19. 9 9
      webchat-pgc/pom.xml
  20. 4 0
      webchat-pgc/src/main/java/com/webchat/pgc/WebchatPGCApplication.java
  21. 34 0
      webchat-pgc/src/main/java/com/webchat/pgc/controller/OfficialArticleController.java
  22. 0 34
      webchat-pgc/src/main/java/com/webchat/pgc/controller/TestController.java
  23. 43 0
      webchat-pgc/src/main/java/com/webchat/pgc/messagequeue/consumer/ArticlePushDelayQueueConsumer.java
  24. 22 0
      webchat-pgc/src/main/java/com/webchat/pgc/repository/dao/IArticleDAO.java
  25. 88 0
      webchat-pgc/src/main/java/com/webchat/pgc/repository/entity/ArticleEntity.java
  26. 28 0
      webchat-pgc/src/main/java/com/webchat/pgc/repository/entity/BaseEntity.java
  27. 36 0
      webchat-pgc/src/main/java/com/webchat/pgc/repository/entity/SimpleBaseEntity.java
  28. 83 0
      webchat-pgc/src/main/java/com/webchat/pgc/service/AccountService.java
  29. 333 0
      webchat-pgc/src/main/java/com/webchat/pgc/service/OfficialArticleService.java
  30. 35 0
      webchat-remote/src/main/java/com/webchat/rmi/pgc/OfficialArticleClient.java
  31. 3 3
      webchat-remote/src/main/java/com/webchat/rmi/user/UserServiceClient.java
  32. 3 2
      webchat-user/src/main/java/com/webchat/user/controller/UserServiceController.java
  33. 7 5
      webchat-user/src/main/java/com/webchat/user/service/UserService.java
  34. 1 2
      webchat-user/src/main/java/com/webchat/user/service/relation/AbstractAccountRelationService.java
  35. 3 1
      webchat-user/src/main/java/com/webchat/user/service/relation/User2OfficialAccountRelationService.java

+ 24 - 0
resources/database-sql/webchat-pgc.sql

@@ -0,0 +1,24 @@
+
+-- 公众号文章表
+CREATE TABLE webchat_pgc.web_chat_article (
+      `ID` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '自增ID',
+      `PUBLIC_ACCOUNT` char(60) NOT NULL COMMENT '绑定公众号账号',
+      `AUTHOR` char(60) NOT NULL COMMENT '作者',
+      `TITLE` varchar(100) NOT NULL COMMENT '文章标题',
+      `CONTENT` longtext NOT NULL COMMENT '正文',
+      `COVER` varchar(400) NOT NULL COMMENT '封面图',
+      `REDIRECT_URL` varchar(400) DEFAULT NULL COMMENT '外部连接',
+      `DESCRIPTION` varchar(200) DEFAULT NULL COMMENT '摘要/智能总结',
+      `signs` varchar(200) DEFAULT NULL COMMENT '标签',
+      `PLAN_PUSH_DATE` datetime DEFAULT NULL  COMMENT '计划推文时间',
+      `STATUS` tinyint(1) DEFAULT 1 COMMENT '文章状态',
+      `CREATE_BY` char(100) DEFAULT NULL COMMENT '创建人',
+      `CREATE_DATE` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+      `UPDATE_BY` char(100) DEFAULT NULL COMMENT '更新人',
+      `UPDATE_DATE` datetime DEFAULT NULL COMMENT '更新时间',
+      `VERSION` int DEFAULT '0' COMMENT '版本',
+      PRIMARY KEY (`ID`),
+      KEY `INDEX_STATUS_PUBLIC_ACCOUNT` (`STATUS`, `PUBLIC_ACCOUNT`),
+      KEY `INDEX_PLAN_PUSH_DATE` (`PLAN_PUSH_DATE`)
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='公众号文章表';
+

+ 26 - 19
resources/nacos-yaml/webchat-pgc-service-dev.yaml

@@ -1,30 +1,37 @@
 #---------------------------------数据库配置----------------------------------#
 spring:
   datasource:
-    url: jdbc:mysql://127.0.0.1:3306/webchat?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8&&useSSL=false
+    url: jdbc:mysql://127.0.0.1:3306/webchat_pgc?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8&&useSSL=false
     username: root
     password: 12345678
     driver-class-name: com.mysql.jdbc.Driver
     type: com.zaxxer.hikari.HikariDataSource
     hikari:
-    minimum-idle: 10
-    maximum-pool-size: 50
-    auto-commit: false
-    idle-timeout: 30000
-    pool-name: DatebookHikariCP
-    max-lifetime: 1800000
-    connection-timeout: 30000
-    connection-test-query: SELECT 1
-    connection-init-sql: set names utf8mb4
-  jpa:
-    show-sql: true
+      minimum-idle: 10
+      maximum-pool-size: 50
+      auto-commit: false
+      idle-timeout: 30000
+      pool-name: DatebookHikariCP
+      max-lifetime: 1800000
+      connection-timeout: 30000
+      connection-test-query: SELECT 1
+      connection-init-sql: set names utf8mb4
+    jpa:
+      show-sql: true
   #---------------------------------redis----------------------------------#
   data:
     redis:
-    port: 6379
-    database: 6
-    jedis:
-      pool:
-      max-active: 100
-      max-wait: -1
-      min-idle: 10
+      port: 6379
+      database: 6
+      jedis:
+        pool:
+          max-active: 100
+          max-wait: -1
+          min-idle: 10
+
+rocketmq:
+  name-server: 127.0.0.1:9876
+  consumer:
+    group: web_chat
+  producer:
+    group: web_chat

+ 35 - 0
webchat-admin/src/main/java/com/webchat/admin/controller/OfficialArticleController.java

@@ -0,0 +1,35 @@
+package com.webchat.admin.controller;
+
+
+import com.webchat.admin.service.OfficialArticleService;
+import com.webchat.common.bean.APIResponseBean;
+import com.webchat.common.bean.APIResponseBeanUtil;
+import com.webchat.common.helper.SessionHelper;
+import com.webchat.domain.vo.request.publicaccount.SaveArticleRequestVO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/admin-service/official/article")
+public class OfficialArticleController {
+
+    @Autowired
+    private OfficialArticleService officialArticleService;
+
+    /**
+     * 提交文章
+     *
+     * @return
+     */
+    @PostMapping("/submit")
+    public APIResponseBean<Long> submit(@RequestBody SaveArticleRequestVO saveArticleRequest) {
+        // TODO 参数校验
+        String userId = SessionHelper.getCurrentUserId();
+        saveArticleRequest.setAuthor(userId);
+        return APIResponseBeanUtil.success(officialArticleService.submit(saveArticleRequest));
+    }
+
+}

+ 28 - 0
webchat-admin/src/main/java/com/webchat/admin/service/OfficialArticleService.java

@@ -0,0 +1,28 @@
+package com.webchat.admin.service;
+
+
+import com.webchat.common.bean.APIResponseBean;
+import com.webchat.common.bean.APIResponseBeanUtil;
+import com.webchat.common.exception.BusinessException;
+import com.webchat.domain.vo.request.publicaccount.SaveArticleRequestVO;
+import com.webchat.rmi.pgc.OfficialArticleClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class OfficialArticleService {
+
+
+    @Autowired
+    private OfficialArticleClient officialArticleClient;
+
+
+    public Long submit(SaveArticleRequestVO saveArticleRequest) {
+
+        APIResponseBean<Long> responseBean = officialArticleClient.submit(saveArticleRequest);
+        if (APIResponseBeanUtil.isOk(responseBean)) {
+            return responseBean.getData();
+        }
+        throw new BusinessException("公众号文章提交失败!");
+    }
+}

+ 1 - 1
webchat-admin/src/main/resources/application.yml

@@ -1 +1 @@
-server:
  port: 8081
+server:
  port: 8010

+ 2 - 0
webchat-common/src/main/java/com/webchat/common/enums/messagequeue/MessageBroadChannelEnum.java

@@ -12,6 +12,8 @@ public enum MessageBroadChannelEnum {
 
     QUEUE_CHAT_MESSAGE("queue_chat_message", "聊天消息队列"),
 
+    QUEUE_ARTICLE_PUSH_MESSAGE("queue_article_push_message", "公众号推文队列"),
+
     QUEUE_CHAT_NOTIFY("queue_chat_notify", "聊天消息通知"),
 
     QUEUE_CHAT_ROBOT("queue_chat_robot", "机器人对话消息队列"),

+ 2 - 0
webchat-common/src/main/java/com/webchat/common/enums/messagequeue/MessageQueueEnum.java

@@ -13,6 +13,8 @@ public enum MessageQueueEnum {
 
     QUEUE_CHAT_MESSAGE("queue_chat_message", "聊天消息队列"),
 
+    QUEUE_OFFICIAL_ARTICLE_PUSH_MESSAGE("queue_official_article_push_message", "公众号推文"),
+
     QUEUE_PERSISTENT_MESSAGE("queue_persistent_message", "消息持久化队列"),
 
     QUEUE_CHAT_VIDEO_P2P("queue_chat_video_p2p", "P2P(一对一)音视频聊天信令消息队列"),

+ 6 - 6
webchat-common/src/main/java/com/webchat/common/service/messagequeue/consumer/AbstractRedisDelayQueueConsumer.java

@@ -25,9 +25,6 @@ public abstract class AbstractRedisDelayQueueConsumer<T extends BaseDelayQueueDT
     @Autowired
     protected RedisService redisService;
 
-    @Autowired
-    private MessageQueueProducer messageQueueProducer;
-
     private volatile boolean isInit = false;
 
     private volatile boolean isRun = true;
@@ -103,6 +100,11 @@ public abstract class AbstractRedisDelayQueueConsumer<T extends BaseDelayQueueDT
      */
     protected abstract MessageQueueEnum getMessageQueue();
 
+    /**
+     * 执行下一步消费实际处理逻辑
+     */
+    protected abstract void doNextConsume(Set<String> messages);
+
     protected String getQueueName() {
         return this.getMessageQueue().getQueue();
     }
@@ -147,13 +149,11 @@ public abstract class AbstractRedisDelayQueueConsumer<T extends BaseDelayQueueDT
                     semaphore.release();
                     continue;
                 }
-                BaseDelayNormalQueueDTO normalQueueDTO = BaseDelayNormalQueueDTO.of(messages);
                 poolExecutor.execute(new Runnable() {
                     @Override
                     public void run() {
                         try {
-                            // 批量提交任务到普通队列
-                            messageQueueProducer.send(getMessageQueue(), normalQueueDTO);
+                            doNextConsume(messages);
                         } finally {
                             // 任务执行完释放信号量
                             semaphore.release();

+ 11 - 1
webchat-common/src/main/java/com/webchat/common/service/messagequeue/producer/AbstractMessageQueueSender.java

@@ -34,6 +34,15 @@ public abstract class AbstractMessageQueueSender<T, P> implements MessageQueueSe
      */
     protected abstract boolean broadSend(String channel, T message);
 
+    /**
+     * 优先级队列消息推送
+     *
+     * @param channel
+     * @param message
+     * @return
+     */
+    protected abstract boolean prioritySend(String channel, T message, P priority);
+
 
     @Override
     public void send(MessageQueueEnum queue, T message) {
@@ -43,7 +52,8 @@ public abstract class AbstractMessageQueueSender<T, P> implements MessageQueueSe
 
     @Override
     public void prioritySend(MessageQueueEnum queue, T message, P priority) {
-        // TODO
+        // 优先级消息推送
+        this.prioritySend(queue.getQueue(), message, priority);
     }
 
     @Override

+ 7 - 0
webchat-common/src/main/java/com/webchat/common/service/messagequeue/producer/RedisMessageQueueSender.java

@@ -36,4 +36,11 @@ public class RedisMessageQueueSender<T, P> extends AbstractMessageQueueSender<T,
         redisTemplate.convertAndSend(channel, messageValue);
         return true;
     }
+
+    @Override
+    protected boolean prioritySend(String queue, T message, P priority) {
+        String messageValue = super.serializeMessage(message);
+        redisService.zadd(queue, messageValue, (Long) priority, -1L);
+        return true;
+    }
 }

+ 5 - 0
webchat-common/src/main/java/com/webchat/common/service/messagequeue/producer/RocketMessageQueueSender.java

@@ -24,4 +24,9 @@ public class RocketMessageQueueSender<T, P> extends AbstractMessageQueueSender<T
         // 对RocketMQ而言,广播模式的关键在于消费者端的配置。消费者需要明确指定消费模式为广播模式
         return this.doSend(channel, message);
     }
+
+    @Override
+    protected boolean prioritySend(String channel, T message, P priority) {
+        return false;
+    }
 }

+ 4 - 0
webchat-connect/src/main/java/com/webchat/connect/messagequeue/config/RedisConfig.java

@@ -1,6 +1,7 @@
 package com.webchat.connect.messagequeue.config;
 
 import com.webchat.common.enums.messagequeue.MessageBroadChannelEnum;
+import com.webchat.connect.messagequeue.consumer.redis.ArticlePushRedisQueueListener;
 import com.webchat.connect.messagequeue.consumer.redis.ChatMessageRedisQueueListener;
 import com.webchat.connect.messagequeue.consumer.redis.ChatNotifyRedisQueueListener;
 import jakarta.annotation.Resource;
@@ -17,6 +18,8 @@ public class RedisConfig {
     private ChatNotifyRedisQueueListener notifyQueueListener;
     @Resource
     private ChatMessageRedisQueueListener chatMessageRedisQueueListener;
+    @Resource
+    private ArticlePushRedisQueueListener articlePushRedisQueueListener;
 
     @Bean
     public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory) {
@@ -26,6 +29,7 @@ public class RedisConfig {
         // 配置消息监听器
         container.addMessageListener(notifyQueueListener, new ChannelTopic(MessageBroadChannelEnum.QUEUE_CHAT_NOTIFY.getChannel()));
         container.addMessageListener(chatMessageRedisQueueListener, new ChannelTopic(MessageBroadChannelEnum.QUEUE_CHAT_MESSAGE.getChannel()));
+        container.addMessageListener(articlePushRedisQueueListener, new ChannelTopic(MessageBroadChannelEnum.QUEUE_ARTICLE_PUSH_MESSAGE.getChannel()));
         return container;
     }
 }

+ 29 - 0
webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/redis/ArticlePushRedisQueueListener.java

@@ -0,0 +1,29 @@
+package com.webchat.connect.messagequeue.consumer.redis;
+
+import com.webchat.connect.messagequeue.consumer.service.ArticlePushConsumeService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class ArticlePushRedisQueueListener implements MessageListener {
+
+    @Autowired
+    private RedisTemplate redisTemplate;
+
+    @Autowired
+    private ArticlePushConsumeService articlePushConsumeService;
+
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        String channel = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());
+        String messageStr = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());
+        log.info("ChatMessageRedisQueueListener.onMessage =====> channel:{} messageStr:{}", channel, messageStr);
+
+        articlePushConsumeService.consume(messageStr);
+    }
+}

+ 122 - 0
webchat-connect/src/main/java/com/webchat/connect/messagequeue/consumer/service/ArticlePushConsumeService.java

@@ -0,0 +1,122 @@
+package com.webchat.connect.messagequeue.consumer.service;
+
+
+import com.webchat.common.bean.APIResponseBean;
+import com.webchat.common.bean.APIResponseBeanUtil;
+import com.webchat.common.constants.ConnectConstants;
+import com.webchat.common.enums.ChatMessageTypeEnum;
+import com.webchat.common.util.JsonUtil;
+import com.webchat.connect.service.AccountService;
+import com.webchat.connect.websocket.handler.ChatWebSocketEndPointServletHandler;
+import com.webchat.domain.dto.queue.ArticleDelayMessageDTO;
+import com.webchat.domain.vo.response.mess.ChatMessageResponseVO;
+import com.webchat.domain.vo.response.mess.PublicAccountArticleMessageVO;
+import com.webchat.domain.vo.response.publicaccount.ArticleBaseResponseVO;
+import com.webchat.rmi.pgc.OfficialArticleClient;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+/**
+ * 公众号推文消费服务
+ */
+@Slf4j
+@Service
+public class ArticlePushConsumeService {
+
+    @Autowired
+    private AccountService accountService;
+
+    @Autowired
+    private OfficialArticleClient officialArticleClient;
+
+    public void consume(String message) {
+
+        /**
+         * 发序列化解析消息对象
+         */
+        Set<String> messages = JsonUtil.fromJson(message, Set.class);
+        Set<ArticleDelayMessageDTO> messageDtos =
+                messages.stream().map(mess ->
+                        JsonUtil.fromJson(mess, ArticleDelayMessageDTO.class))
+                        .collect(Collectors.toSet());
+        /**
+         * 消息推送
+         */
+        for (ArticleDelayMessageDTO eventMessage : messageDtos) {
+            this.doPush(eventMessage);
+        }
+    }
+
+    private void doPush(ArticleDelayMessageDTO eventMessage) {
+
+        String publicAccount = eventMessage.getPublicAccount();
+        Long articleId = eventMessage.getArticleId();
+        // 获取文章详情,走feign 远程获取
+        PublicAccountArticleMessageVO articleVO = this.getArticleBaseFromRemote(articleId);
+        if (articleVO == null) {
+            return;
+        }
+        // 获取公众号所有订阅用户
+        Set<String> userIds = accountService.getOfficialUserIds(publicAccount);
+        if (CollectionUtils.isEmpty(userIds)) {
+            return;
+        }
+        Set<String> bizCodes = ConnectConstants.ConnectBiz.getBizCode(ConnectConstants.BizEnum.CHAT);
+        for (String bizCode : bizCodes) {
+            Map<String, WebSocketSession> userWsMap = ChatWebSocketEndPointServletHandler.getSessions(bizCode, userIds);
+            for (String userId : userIds) {
+                WebSocketSession wsSession = userWsMap.get(userId);
+                if (wsSession == null || !wsSession.isOpen()) {
+                    continue;
+                }
+                ChatMessageResponseVO chatMessageResponseVO = new ChatMessageResponseVO();
+                chatMessageResponseVO.setPublicAccountArticle(articleVO);
+                chatMessageResponseVO.setReceiverId(userId);
+                chatMessageResponseVO.setSenderId(publicAccount);
+                chatMessageResponseVO.setType(ChatMessageTypeEnum.PUBLIC_ACCOUNT_ARTICLE.getType());
+                try {
+                    wsSession.sendMessage(new TextMessage(JsonUtil.toJsonString(chatMessageResponseVO)));
+                } catch (IOException e) {
+                    // 不能抛异常出去,不能阻塞其他用户的推送
+                    log.error("公众号文章推送异常 =====> publicAccount:{}, userId:{}, articleId:{}",
+                            publicAccount, userId, articleId, e);
+                }
+
+            }
+        }
+    }
+
+    /**
+     * 获取文章详情,这里不需要返回正文(降低相应网络传输包大小,节约流量成本,提高接口性能)
+     *
+     * @param id
+     * @return
+     */
+    private PublicAccountArticleMessageVO getArticleBaseFromRemote(Long id) {
+        APIResponseBean<ArticleBaseResponseVO> responseBean = officialArticleClient.detail(id, false);
+        if (APIResponseBeanUtil.isOk(responseBean)) {
+            ArticleBaseResponseVO baseResponseVO = responseBean.getData();
+            PublicAccountArticleMessageVO publicAccountArticleMessageVO = new PublicAccountArticleMessageVO();
+            publicAccountArticleMessageVO.setArticleId(baseResponseVO.getId());
+            publicAccountArticleMessageVO.setTime(baseResponseVO.getPlanPushTime());
+            publicAccountArticleMessageVO.setTitle(baseResponseVO.getTitle());
+            publicAccountArticleMessageVO.setCover(baseResponseVO.getCover());
+            publicAccountArticleMessageVO.setDescription(baseResponseVO.getDescription());
+            publicAccountArticleMessageVO.setRedirectUrl(baseResponseVO.getRedirectUrl());
+            return publicAccountArticleMessageVO;
+        }
+        log.error("远程获取公众号文章详情失败 =====> id:{}, responseBean:{}", id, JsonUtil.toJsonString(responseBean));
+        return null;
+    }
+
+}

+ 28 - 4
webchat-connect/src/main/java/com/webchat/connect/service/AccountService.java

@@ -3,6 +3,7 @@ package com.webchat.connect.service;
 
 import com.webchat.common.bean.APIResponseBean;
 import com.webchat.common.bean.APIResponseBeanUtil;
+import com.webchat.common.enums.AccountRelationTypeEnum;
 import com.webchat.common.util.JsonUtil;
 import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
 import com.webchat.rmi.user.UserServiceClient;
@@ -39,14 +40,37 @@ public class AccountService {
     }
 
     /**
-     * 获取群组下的群成员用户id集合
+     * 查询所有群成员
+     * @param account
+     * @return
+     */
+    public Set<String> getGroupUserIds(String account) {
+
+        return getAllSubscriberByAccount(account, AccountRelationTypeEnum.USER_GROUP);
+    }
+
+    /**
+     * 查询公众号所有订阅用户
      *
-     * @param groupAccount
+     * @param account
      * @return
      */
-    public Set<String> getGroupUserIds(String groupAccount) {
+    public Set<String> getOfficialUserIds(String account) {
+
+        return getAllSubscriberByAccount(account, AccountRelationTypeEnum.USER_OFFICIAL);
+    }
 
-        APIResponseBean<Set<String>> responseBean = userServiceClient.getGroupUserIds(groupAccount);
+    /**
+     * 获取群组下的群成员用户id集合
+     *
+     * @return
+     */
+    public Set<String> getAllSubscriberByAccount(String account, AccountRelationTypeEnum accountRelationType) {
+        /**
+         * TODO 建议: 对数据做降级
+         */
+        APIResponseBean<Set<String>> responseBean =
+                userServiceClient.getAllSubscriberByAccount(accountRelationType.getType(), account);
         if (APIResponseBeanUtil.isOk(responseBean)) {
             return responseBean.getData();
         }

+ 0 - 1
webchat-domain/src/main/java/com/webchat/domain/dto/queue/ArticleDelayMessageDTO.java

@@ -9,7 +9,6 @@ import lombok.Data;
 @Data
 public class ArticleDelayMessageDTO extends BaseDelayQueueDTO {
 
-
     /**
      * 公众号id
      */

+ 0 - 1
webchat-domain/src/main/java/com/webchat/domain/vo/request/publicaccount/SaveArticleRequestVO.java

@@ -28,7 +28,6 @@ public class SaveArticleRequestVO {
      */
     private Long planPushTime;
 
-
     /**
      * 文章标题
      */

+ 3 - 0
webchat-domain/src/main/java/com/webchat/domain/vo/response/mess/PublicAccountArticleMessageVO.java

@@ -11,6 +11,9 @@ public class PublicAccountArticleMessageVO {
 
     private String title;
 
+    /**
+     * 文章推送时间
+     */
     private Long time;
 
     private String description;

+ 9 - 9
webchat-pgc/pom.xml

@@ -23,16 +23,16 @@
     <dependencies>
 
         <!--引入mysql驱动-->
-<!--        <dependency>-->
-<!--            <groupId>mysql</groupId>-->
-<!--            <artifactId>mysql-connector-java</artifactId>-->
-<!--            <version>5.1.46</version>-->
-<!--        </dependency>-->
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.46</version>
+        </dependency>
         <!--使用JPA作为ORM框架 -->
-<!--        <dependency>-->
-<!--            <groupId>org.springframework.boot</groupId>-->
-<!--            <artifactId>spring-boot-starter-data-jpa</artifactId>-->
-<!--        </dependency>-->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-jpa</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>com.webchat</groupId>

+ 4 - 0
webchat-pgc/src/main/java/com/webchat/pgc/WebchatPGCApplication.java

@@ -1,5 +1,7 @@
 package com.webchat.pgc;
 
+import com.webchat.common.util.SpringContextUtil;
+import com.webchat.pgc.messagequeue.consumer.ArticlePushDelayQueueConsumer;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@@ -14,6 +16,8 @@ public class WebchatPGCApplication {
 
     public static void main(String[] args) {
         SpringApplication.run(WebchatPGCApplication.class, args);
+
+        SpringContextUtil.getBean(ArticlePushDelayQueueConsumer.class).schedule();
     }
 
 }

+ 34 - 0
webchat-pgc/src/main/java/com/webchat/pgc/controller/OfficialArticleController.java

@@ -0,0 +1,34 @@
+package com.webchat.pgc.controller;
+
+import com.webchat.common.bean.APIResponseBean;
+import com.webchat.common.bean.APIResponseBeanUtil;
+import com.webchat.domain.vo.request.publicaccount.SaveArticleRequestVO;
+import com.webchat.domain.vo.response.publicaccount.ArticleBaseResponseVO;
+import com.webchat.pgc.service.OfficialArticleService;
+import com.webchat.rmi.pgc.OfficialArticleClient;
+import org.apache.commons.lang3.ObjectUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+public class OfficialArticleController implements OfficialArticleClient {
+
+
+    @Autowired
+    private OfficialArticleService officialArticleService;
+
+    @Override
+    public APIResponseBean<Long> submit(@RequestBody SaveArticleRequestVO saveArticleRequest) {
+
+        return APIResponseBeanUtil.success(officialArticleService.submit(saveArticleRequest));
+    }
+
+    @Override
+    public APIResponseBean<ArticleBaseResponseVO> detail(Long id, Boolean isNeedContent) {
+
+        ArticleBaseResponseVO articleVo =
+                officialArticleService.getArticleDetailFromCache(id, ObjectUtils.equals(isNeedContent, true));
+        return APIResponseBeanUtil.success(articleVo);
+    }
+}

+ 0 - 34
webchat-pgc/src/main/java/com/webchat/pgc/controller/TestController.java

@@ -1,34 +0,0 @@
-package com.webchat.pgc.controller;
-
-import com.webchat.common.bean.APIResponseBean;
-import com.webchat.common.bean.APIResponseBeanUtil;
-import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
-import com.webchat.rmi.user.UserServiceClient;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.context.config.annotation.RefreshScope;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
-
-/**
- * @Author 程序员七七
- * @webSite https://www.coderutil.com
- * @Date 2025/1/14 00:55
- * @description
- */
-@RefreshScope
-@RestController
-@RequestMapping("/pgc-service")
-public class TestController {
-
-    @Autowired
-    private UserServiceClient userServiceClient;
-
-    @GetMapping("/user/{userId}")
-    public APIResponseBean<UserBaseResponseInfoVO> user(@PathVariable String userId) {
-
-        return APIResponseBeanUtil.success(userServiceClient.userInfo(userId));
-    }
-
-}

+ 43 - 0
webchat-pgc/src/main/java/com/webchat/pgc/messagequeue/consumer/ArticlePushDelayQueueConsumer.java

@@ -0,0 +1,43 @@
+package com.webchat.pgc.messagequeue.consumer;
+
+import com.webchat.common.enums.messagequeue.MessageBroadChannelEnum;
+import com.webchat.common.enums.messagequeue.MessageQueueEnum;
+import com.webchat.common.service.messagequeue.consumer.AbstractRedisDelayQueueConsumer;
+import com.webchat.common.service.messagequeue.producer.MessageQueueProducer;
+import com.webchat.common.util.JsonUtil;
+import com.webchat.domain.dto.queue.ArticleDelayMessageDTO;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Set;
+
+/**
+ * 公众号推送MQ消费
+ */
+@Service
+public class ArticlePushDelayQueueConsumer extends AbstractRedisDelayQueueConsumer<ArticleDelayMessageDTO> {
+
+    @Autowired
+    private MessageQueueProducer<Set<String>, Long> messageQueueProducer;
+
+    @Override
+    protected ArticleDelayMessageDTO convert(String s) {
+
+        return JsonUtil.fromJson(s, ArticleDelayMessageDTO.class);
+    }
+
+    @Override
+    protected MessageQueueEnum getMessageQueue() {
+
+        return MessageQueueEnum.QUEUE_OFFICIAL_ARTICLE_PUSH_MESSAGE;
+    }
+
+    @Override
+    protected void doNextConsume(Set<String> messages) {
+
+        /**
+         * 广播通知connect service完成推文、以及消息持久化,批量提交任务到普通队列
+         */
+        messageQueueProducer.broadSend(MessageBroadChannelEnum.QUEUE_ARTICLE_PUSH_MESSAGE, messages);
+    }
+}

+ 22 - 0
webchat-pgc/src/main/java/com/webchat/pgc/repository/dao/IArticleDAO.java

@@ -0,0 +1,22 @@
+package com.webchat.pgc.repository.dao;
+
+import com.webchat.pgc.repository.entity.ArticleEntity;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
+import org.springframework.data.jpa.repository.Modifying;
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.stereotype.Repository;
+import org.springframework.transaction.annotation.Transactional;
+
+
+@Repository
+public interface IArticleDAO extends JpaSpecificationExecutor<ArticleEntity>,
+        JpaRepository<ArticleEntity, Long> {
+
+
+    @Transactional
+    @Modifying
+    @Query(value = "update ArticleEntity a set a.status = 2 where a.id = :id")
+    int updatePushedStatus(Long id);
+
+}

+ 88 - 0
webchat-pgc/src/main/java/com/webchat/pgc/repository/entity/ArticleEntity.java

@@ -0,0 +1,88 @@
+package com.webchat.pgc.repository.entity;
+
+
+import jakarta.persistence.Column;
+import jakarta.persistence.Entity;
+import jakarta.persistence.GeneratedValue;
+import jakarta.persistence.GenerationType;
+import jakarta.persistence.Id;
+import jakarta.persistence.Table;
+import lombok.Data;
+
+import java.util.Date;
+
+/**
+ * 公众号文章实体
+ */
+@Data
+@Entity
+@Table(name = "web_chat_article")
+public class ArticleEntity extends BaseEntity {
+
+    @Id
+    @GeneratedValue(strategy = GenerationType.IDENTITY)
+    protected Long id;
+
+    /**
+     * 绑定公众号
+     */
+    @Column(name = "public_account")
+    private String publicAccount;
+
+    /**
+     * 作者
+     */
+    @Column(name = "author")
+    private String author;
+
+    /**
+     * 文章标题
+     */
+    @Column(name = "title")
+    private String title;
+
+    /**
+     * 文章封面主图
+     */
+    @Column(name = "cover")
+    private String cover;
+
+    /**
+     * 外部链接地址
+     */
+    @Column(name = "redirect_url")
+    private String redirectUrl;
+
+    /**
+     * 文章总结
+     */
+    @Column(name = "description")
+    private String description;
+
+    /**
+     * 文章正文
+     */
+    @Column(name = "content")
+    private String content;
+
+    /**
+     * 文章标签
+     */
+    @Column(name = "signs")
+    private String signs;
+
+    /**
+     * 计划推送时间
+     */
+    @Column(name = "plan_push_date")
+    private Date planPushDate;
+
+    /**
+     * 文章状态
+     *
+     * @see com.webchat.common.enums.ArticleStatusEnum
+     */
+    @Column(name = "status")
+    private Integer status;
+
+}

+ 28 - 0
webchat-pgc/src/main/java/com/webchat/pgc/repository/entity/BaseEntity.java

@@ -0,0 +1,28 @@
+package com.webchat.pgc.repository.entity;
+
+import jakarta.persistence.Column;
+import jakarta.persistence.MappedSuperclass;
+import jakarta.persistence.PreUpdate;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * 基础实体类
+ */
+@MappedSuperclass
+@Data
+public class BaseEntity extends SimpleBaseEntity implements Serializable {
+
+    @Column(name = "update_by")
+    private String updateBy;
+
+    @Column(name = "update_date")
+    private Date updateDate;
+
+    @PreUpdate
+    public void preUpdate() {
+        this.updateDate = new Date();
+    }
+}

+ 36 - 0
webchat-pgc/src/main/java/com/webchat/pgc/repository/entity/SimpleBaseEntity.java

@@ -0,0 +1,36 @@
+package com.webchat.pgc.repository.entity;
+
+import jakarta.persistence.Column;
+import jakarta.persistence.MappedSuperclass;
+import jakarta.persistence.PrePersist;
+import jakarta.persistence.Version;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * 基础实体类
+ */
+@MappedSuperclass
+@Data
+public class SimpleBaseEntity implements Serializable {
+
+    @Column(name = "create_by")
+    private String createBy;
+
+    @Column(name = "create_date")
+    private Date createDate;
+
+    @Version
+    @Column(name = "version")
+    private Integer version;
+
+    @PrePersist
+    public void prePersist() {
+        Date now = new Date();
+        if (this.createDate == null) {
+            this.createDate = now;
+        }
+    }
+}

+ 83 - 0
webchat-pgc/src/main/java/com/webchat/pgc/service/AccountService.java

@@ -0,0 +1,83 @@
+package com.webchat.pgc.service;
+
+import com.webchat.common.bean.APIResponseBean;
+import com.webchat.common.bean.APIResponseBeanUtil;
+import com.webchat.common.enums.AccountRelationTypeEnum;
+import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
+import com.webchat.rmi.user.UserServiceClient;
+import org.apache.commons.collections.CollectionUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Service
+public class AccountService {
+
+    @Autowired
+    private UserServiceClient userServiceClient;
+
+    /**
+     * 查询当前账号信息
+     *
+     * @param account
+     * @return
+     */
+
+    public UserBaseResponseInfoVO accountInfo(String account) {
+
+        APIResponseBean<UserBaseResponseInfoVO> responseBean = userServiceClient.userInfo(account);
+        if (APIResponseBeanUtil.isOk(responseBean)) {
+            return responseBean.getData();
+        }
+        return null;
+    }
+
+    /**
+     *  批量查询账号详情数据
+     *
+     * @param accounts
+     * @return
+     */
+    public List<UserBaseResponseInfoVO> batchGet(List<String> accounts) {
+        if (CollectionUtils.isEmpty(accounts)) {
+            return Collections.emptyList();
+        }
+        APIResponseBean<List<UserBaseResponseInfoVO>> responseBean =
+                userServiceClient.batchGet(accounts);
+        if (APIResponseBeanUtil.isOk(responseBean)) {
+            return responseBean.getData();
+        }
+        throw new RuntimeException(responseBean.getMsg());
+    }
+
+    public Map<String, UserBaseResponseInfoVO> batchGet(Set<String> accounts) {
+        if (CollectionUtils.isEmpty(accounts)) {
+            return Collections.emptyMap();
+        }
+        APIResponseBean<Map<String, UserBaseResponseInfoVO>> responseBean =
+                userServiceClient.batchGet(accounts);
+        if (APIResponseBeanUtil.isOk(responseBean)) {
+            return responseBean.getData();
+        }
+        throw new RuntimeException(responseBean.getMsg());
+    }
+
+    /**
+     * 获取群组下的群成员用户id集合
+     *
+     * @param groupAccount
+     * @return
+     */
+    public Set<String> getGroupUserIds(String groupAccount) {
+        APIResponseBean<Set<String>> responseBean =
+                userServiceClient.getAllSubscriberByAccount(AccountRelationTypeEnum.USER_GROUP.getType(), groupAccount);
+        if (APIResponseBeanUtil.isOk(responseBean)) {
+            return responseBean.getData();
+        }
+        return null;
+    }
+}

+ 333 - 0
webchat-pgc/src/main/java/com/webchat/pgc/service/OfficialArticleService.java

@@ -0,0 +1,333 @@
+package com.webchat.pgc.service;
+
+
+import com.webchat.common.bean.APIPageResponseBean;
+import com.webchat.common.constants.WebConstant;
+import com.webchat.common.enums.ArticleStatusEnum;
+import com.webchat.common.enums.RedisKeyEnum;
+import com.webchat.common.enums.messagequeue.MessageQueueEnum;
+import com.webchat.common.service.RedisService;
+import com.webchat.common.service.messagequeue.producer.MessageQueueProducer;
+import com.webchat.common.util.JsonUtil;
+import com.webchat.domain.dto.queue.ArticleDelayMessageDTO;
+import com.webchat.domain.vo.request.publicaccount.SaveArticleRequestVO;
+import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
+import com.webchat.domain.vo.response.mess.PublicAccountArticleMessageVO;
+import com.webchat.domain.vo.response.publicaccount.ArticleBaseResponseVO;
+import com.webchat.pgc.repository.dao.IArticleDAO;
+import com.webchat.pgc.repository.entity.ArticleEntity;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.BeanUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.domain.Page;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.domain.Sort;
+import org.springframework.stereotype.Service;
+import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * 公众号文章服务
+ *
+ */
+@Service
+public class OfficialArticleService {
+
+    @Autowired
+    private IArticleDAO articleDAO;
+
+    @Autowired
+    private AccountService accountService;
+
+    @Autowired
+    private RedisService redisService;
+
+    @Autowired
+    private MessageQueueProducer<ArticleDelayMessageDTO, Long> messageQueueProducer;
+
+    /**
+     * 公众号推文
+     *
+     * @param saveArticleRequest
+     * @return
+     */
+    public Long submit(SaveArticleRequestVO saveArticleRequest) {
+        /**
+         * 1. 持久化文化到数据库
+         */
+        ArticleEntity articleEntity = this.convert(saveArticleRequest);
+        articleEntity = articleDAO.save(articleEntity);
+        /**
+         * 2. 缓存文章详情到redis
+         */
+        this.refreshArticleRedisDetailCache(articleEntity);
+        /**
+         * 3. 推文:MQ ---> 延迟队列 + 普通列表
+         * 《实时推文》:用户未指定推送时间(默认当前时间)
+         * 《延迟推送》:用户指定未来时间推送
+         */
+        final String publicAccount = saveArticleRequest.getPublicAccount();
+        final Long articleId = articleEntity.getId();
+        final Long pushTime = saveArticleRequest.getPlanPushTime();
+        this.doSubmitDelayQueue(publicAccount, articleId, pushTime);
+        /**
+         * 4. 公众号文章消息数据持久化
+         */
+        // TODO
+        return articleId;
+    }
+
+
+    /**
+     * 公众号文章推送计划加入延迟队列
+     *
+     * @param publicAccount 公众号账号
+     * @param articleId     公众号文章
+     * @param pushTime      设定的推送时间
+     */
+    private void doSubmitDelayQueue(String publicAccount, Long articleId, Long pushTime) {
+        ArticleDelayMessageDTO message = new ArticleDelayMessageDTO();
+        message.setArticleId(articleId);
+        message.setPublicAccount(publicAccount);
+        // 延迟推文时间(如果未设置发布时间,则立即发布)
+        pushTime = pushTime == null ? System.currentTimeMillis() : pushTime;
+        message.setTime(pushTime);
+        // 提交队列:优先级队列(一级队列)
+        messageQueueProducer.prioritySend(MessageQueueEnum.QUEUE_OFFICIAL_ARTICLE_PUSH_MESSAGE, message, pushTime);
+    }
+
+
+    /**
+     * 刷新公众号文章redis缓存
+     *
+     * @param articleId
+     * @return
+     */
+    private ArticleBaseResponseVO refreshArticleRedisDetailCache(Long articleId) {
+        if (articleId == null) {
+            return null;
+        }
+        ArticleEntity articleEntity = articleDAO.findById(articleId).orElse(null);
+        return this.refreshArticleRedisDetailCache(articleEntity);
+    }
+
+    /**
+     * 刷新公众号文章redis缓存
+     *
+     * @param articleEntity
+     * @return
+     */
+    private ArticleBaseResponseVO refreshArticleRedisDetailCache(ArticleEntity articleEntity) {
+        if (articleEntity == null) {
+            return null;
+        }
+        String cacheKey = RedisKeyEnum.ARTICLE_DETAIL_CACHE.getKey(String.valueOf(articleEntity.getId()));
+        // 文章详情我们使用string类型来缓存,每个文章有自己的失效时间,避免缓存雪崩
+        ArticleBaseResponseVO articleBaseResponseVO = this.convert(articleEntity);
+        redisService.set(cacheKey, JsonUtil.toJsonString(articleBaseResponseVO), RedisKeyEnum.ARTICLE_DETAIL_CACHE.getExpireTime());
+        return articleBaseResponseVO;
+    }
+
+    /**
+     * 缓存空值,防止缓存击穿
+     *
+     * @param articleId
+     */
+    private void refreshArticleNoneCache(Long articleId) {
+        String cacheKey = RedisKeyEnum.ARTICLE_DETAIL_CACHE.getKey(String.valueOf(articleId));
+        // 文章详情我们使用string类型来缓存,每个文章有自己的失效时间,避免缓存雪崩
+        redisService.set(cacheKey, WebConstant.CACHE_NONE, RedisKeyEnum.ARTICLE_DETAIL_CACHE.getExpireTime());
+    }
+
+    /**
+     * 浏览文章
+     *
+     * @param articleId
+     * @return
+     */
+    public ArticleBaseResponseVO viewArticle(Long articleId) {
+        ArticleBaseResponseVO article = this.getArticleDetailFromCache(articleId);
+        if (article == null) {
+            return null;
+        }
+        // 设置公众号信息
+        article.setPublicAccountInfo(accountService.accountInfo(article.getPublicAccount()));
+        return article;
+    }
+
+    /**
+     * 查询消息列表所有文章信息
+     *
+     * @param articleId
+     * @return
+     */
+    public PublicAccountArticleMessageVO getPublicAccountArticleMessage(Long articleId) {
+        ArticleBaseResponseVO article = this.getArticleDetailFromCache(articleId);
+        if (article == null) {
+            return null;
+        }
+        PublicAccountArticleMessageVO publicAccountArticleMessage = new PublicAccountArticleMessageVO();
+        publicAccountArticleMessage.setTitle(article.getTitle());
+        publicAccountArticleMessage.setDescription(article.getDescription());
+        publicAccountArticleMessage.setCover(article.getCover());
+        publicAccountArticleMessage.setArticleId(articleId);
+        publicAccountArticleMessage.setRedirectUrl(article.getRedirectUrl());
+        return publicAccountArticleMessage;
+    }
+
+
+    public ArticleBaseResponseVO getArticleDetailFromCache(Long articleId, boolean needContent) {
+        ArticleBaseResponseVO articleVO = this.getArticleDetailFromCache(articleId);
+        if (articleVO == null) {
+            return null;
+        }
+        if (!needContent) {
+            articleVO.setContent(null);
+        }
+        return articleVO;
+    }
+
+    /**
+     * 查询公众号文章详情
+     *
+     * @param articleId
+     * @return
+     */
+    public ArticleBaseResponseVO getArticleDetailFromCache(Long articleId) {
+
+        String cacheKey = RedisKeyEnum.ARTICLE_DETAIL_CACHE.getKey(String.valueOf(articleId));
+        String cache = redisService.get(cacheKey);
+        // 这里可能存在击穿问题(比如:有人恶意那不存在的文章一致查询)
+        // 文章缓存击穿解决办法:我们缓存一个空值
+        if (StringUtils.isNotBlank(cache)) {
+            if (WebConstant.CACHE_NONE.equals(cache)) {
+                // 文章不存在,直接返回null,不需要在查库
+                return null;
+            }
+            return JsonUtil.fromJson(cache, ArticleBaseResponseVO.class);
+        }
+        // 缓存不存在主动查询数据库,重新刷新缓存
+        ArticleBaseResponseVO articleBase = this.refreshArticleRedisDetailCache(articleId);
+        if (articleBase == null) {
+            // 数据库文章不存在,这里缓存空值,防止redis击穿
+            this.refreshArticleNoneCache(articleId);
+        }
+        return articleBase;
+    }
+
+    /**
+     * 批量查询redis,获取文章详情缓存
+     *
+     * 场景:公众号详情页,一次可能查询10篇文章
+     * @param articleIdList
+     * @return
+     */
+    public Map<Long, ArticleBaseResponseVO> batchGetArticleDetailFromCache(List<Long> articleIdList) {
+        if (CollectionUtils.isEmpty(articleIdList)) {
+            return Collections.emptyMap();
+        }
+        Map<Long, ArticleBaseResponseVO> batchGetResult = new HashMap<>();
+        // 构造批量查询redis的缓存key
+        List<String> cacheKeys = articleIdList.stream().map(
+                        id -> RedisKeyEnum.ARTICLE_DETAIL_CACHE.getKey(String.valueOf(id)))
+                .collect(Collectors.toList());
+        // 批量查询redis
+        List<String> caches = redisService.mget(cacheKeys);
+        for (int i = 0; i < articleIdList.size(); i++) {
+            Long articleId = articleIdList.get(i);
+            String cache = caches.get(i);
+            ArticleBaseResponseVO articleBaseResponseVO;
+            if (StringUtils.isNotBlank(cache)) {
+                articleBaseResponseVO = JsonUtil.fromJson(cache, ArticleBaseResponseVO.class);
+            } else {
+                articleBaseResponseVO = this.refreshArticleRedisDetailCache(articleId);
+            }
+            batchGetResult.put(articleId, articleBaseResponseVO);
+        }
+        return batchGetResult;
+    }
+
+    /**
+     * 分页查询文章列表
+     *
+     * @param pageNo
+     * @param pageSize
+     * @return
+     */
+    public APIPageResponseBean<List<ArticleBaseResponseVO>> page(Integer pageNo, Integer pageSize) {
+        Pageable pageable = PageRequest.of(pageNo - 1, pageSize, Sort.by(Sort.Direction.DESC, "id"));
+        Page<ArticleEntity> userEntities = articleDAO.findAll(pageable);
+        List<ArticleBaseResponseVO> articles = new ArrayList<>();
+        if (userEntities != null && !CollectionUtils.isEmpty(userEntities.getContent())) {
+            // 走缓存批量查询公众号信息
+            Set<String> publicAccounts = userEntities.stream().map(ArticleEntity::getPublicAccount).collect(Collectors.toSet());
+            Map<String, UserBaseResponseInfoVO> accounts = accountService.batchGet(publicAccounts);
+            // 批量构造返回文章列表参数
+            articles = userEntities.getContent().stream().map(a -> {
+                ArticleBaseResponseVO article = convert(a);
+                // 列表一般不需要返回详情信息,减少网络数据包传输设置为null
+                article.setContent(null);
+                // 这里偷懒了,建议
+                article.setPublicAccountInfo(accounts.get(article.getPublicAccount()));
+                return article;
+            }).collect(Collectors.toList());
+        }
+        return APIPageResponseBean.success(pageNo, pageSize, userEntities.getTotalElements(), articles);
+
+    }
+
+    private ArticleBaseResponseVO convert(ArticleEntity articleEntity) {
+        ArticleBaseResponseVO articleBase = new ArticleBaseResponseVO();
+        BeanUtils.copyProperties(articleEntity, articleBase);
+        if (articleEntity.getPlanPushDate() != null) {
+            articleBase.setPlanPushTime(articleEntity.getPlanPushDate().getTime());
+        }
+        if (articleEntity.getCreateDate() != null) {
+            articleBase.setPublishTime(articleEntity.getCreateDate().getTime());
+        }
+        return articleBase;
+    }
+
+    private ArticleEntity convert(SaveArticleRequestVO saveArticleRequest) {
+        Long articleId = saveArticleRequest.getId();
+        String author = saveArticleRequest.getAuthor();
+        Date now = new Date();
+        ArticleEntity articleEntity;
+        if (articleId != null) {
+            articleEntity = articleDAO.findById(articleId).orElse(null);
+            Assert.notNull(articleEntity, "文章更新失败: 文章不存在!");
+            Assert.isTrue(ObjectUtils.equals(articleEntity.getAuthor(), author), "没有更新权限!");
+        } else {
+            articleEntity = new ArticleEntity();
+            articleEntity.setCreateDate(now);
+            articleEntity.setCreateBy(author);
+            articleEntity.setAuthor(author);
+            articleEntity.setStatus(ArticleStatusEnum.WAIT_PUSH.getStatus());
+        }
+        articleEntity.setRedirectUrl(saveArticleRequest.getRedirectUrl());
+        articleEntity.setStatus(articleEntity.getStatus());
+        articleEntity.setCover(saveArticleRequest.getCover());
+        articleEntity.setDescription(saveArticleRequest.getDescription());
+        articleEntity.setTitle(saveArticleRequest.getTitle());
+        articleEntity.setContent(saveArticleRequest.getContent());
+        articleEntity.setPublicAccount(saveArticleRequest.getPublicAccount());
+        articleEntity.setSigns(saveArticleRequest.getSigns());
+        Date planPushDate = new Date();
+        if (saveArticleRequest.getPlanPushTime() != null) {
+            planPushDate = new Date(saveArticleRequest.getPlanPushTime());
+        }
+        articleEntity.setPlanPushDate(planPushDate);
+        return articleEntity;
+    }
+}

+ 35 - 0
webchat-remote/src/main/java/com/webchat/rmi/pgc/OfficialArticleClient.java

@@ -0,0 +1,35 @@
+package com.webchat.rmi.pgc;
+
+import com.webchat.common.bean.APIResponseBean;
+import com.webchat.domain.vo.request.publicaccount.SaveArticleRequestVO;
+import com.webchat.domain.vo.response.publicaccount.ArticleBaseResponseVO;
+import org.springframework.cloud.openfeign.FeignClient;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestParam;
+
+@FeignClient(name = "webchat-pgc-service")
+public interface OfficialArticleClient {
+
+    /**
+     * 公众号推文
+     *
+     * @param saveArticleRequest
+     * @return
+     */
+    @PostMapping("/pgc-service/official/article/submit")
+    APIResponseBean<Long> submit(@RequestBody SaveArticleRequestVO saveArticleRequest);
+
+    /**
+     * 获取公众号文章详情
+     *
+     * @param id
+     * @return
+     */
+    @GetMapping("/pgc-service/official/article/detail/{id}")
+    APIResponseBean<ArticleBaseResponseVO> detail(@PathVariable Long id,
+                                                  @RequestParam(name = "isNeedContent", required = false,
+                                                          defaultValue = "false") Boolean isNeedContent);
+}

+ 3 - 3
webchat-remote/src/main/java/com/webchat/rmi/user/UserServiceClient.java

@@ -39,11 +39,11 @@ public interface UserServiceClient {
     /**
      * 获取群聊下所有用户id
      *
-     * @param groupAccount
+     * @param account
      * @return
      */
-    @GetMapping("/user-service/group/userIds/{groupAccount}")
-    APIResponseBean<Set<String>> getGroupUserIds(@PathVariable String groupAccount);
+    @GetMapping("/user-service/subscriber/{relationType}/{account}")
+    APIResponseBean<Set<String>> getAllSubscriberByAccount(@PathVariable Integer relationType, @PathVariable String account);
 
     /**
      * 根据手机号查询用户基础信息

+ 3 - 2
webchat-user/src/main/java/com/webchat/user/controller/UserServiceController.java

@@ -38,11 +38,12 @@ public class UserServiceController implements UserServiceClient {
     }
 
     @Override
-    public APIResponseBean<Set<String>> getGroupUserIds(String groupAccount) {
+    public APIResponseBean<Set<String>> getAllSubscriberByAccount(Integer relationType, String account) {
 
-        return APIResponseBeanUtil.success(userService.getGroupUserIds(groupAccount));
+        return APIResponseBeanUtil.success(userService.getAllSubscriberByAccount(relationType, account));
     }
 
+
     @Override
     public APIResponseBean<UserBaseResponseVO> userBaseInfo(String mobile) {
 

+ 7 - 5
webchat-user/src/main/java/com/webchat/user/service/UserService.java

@@ -24,6 +24,7 @@ import com.webchat.user.repository.dao.IGroupUserDAO;
 import com.webchat.user.repository.dao.IUserDAO;
 import com.webchat.user.repository.entity.GroupUserEntity;
 import com.webchat.user.repository.entity.UserEntity;
+import com.webchat.user.service.relation.AccountRelationFactory;
 import com.webchat.user.service.relation.User2FileSenderAccountRelationService;
 import com.webchat.user.service.relation.User2GroupAccountRelationService;
 import jakarta.persistence.criteria.Predicate;
@@ -594,14 +595,15 @@ public class UserService {
     }
 
     /**
-     * 查询群聊下的所有用户
+     * 查询指定订阅用户
      *
-     * @param groupAccount
+     * @param relationType 关系类型
+     * @param account
      * @return
      */
-    public Set<String> getGroupUserIds(String groupAccount) {
-        return SpringContextUtil.getBean(User2GroupAccountRelationService.class)
-                .getAllSubscriber(groupAccount);
+    public Set<String> getAllSubscriberByAccount(Integer relationType, String account) {
+
+        return AccountRelationFactory.getServiceByType(relationType).getAllSubscriber(account);
     }
 
     /**

+ 1 - 2
webchat-user/src/main/java/com/webchat/user/service/relation/AbstractAccountRelationService.java

@@ -246,8 +246,7 @@ public abstract class AbstractAccountRelationService implements AccountRelationV
      */
     protected void addTargetAccountRelationListCache(String sourceAccount, String targetAccount) {
         // 获取好友列表redis key
-        AccountRelationTypeEnum relationType = AccountRelationTypeEnum.getByTargetAccountRoleCode(
-                targetAccountInfo.getRoleCode());
+        AccountRelationTypeEnum relationType = getRelationType();
         String cacheKey = this.getRelationListRedisKey(sourceAccount, relationType.getType());
         if (!redisService.exists(cacheKey)) {
             // 第一次订阅或者缓存失效

+ 3 - 1
webchat-user/src/main/java/com/webchat/user/service/relation/User2OfficialAccountRelationService.java

@@ -27,8 +27,10 @@ public class User2OfficialAccountRelationService extends AbstractAccountRelation
         if (subscribe) {
             // 文件传输助手,用户注册后默认订阅,无需审核
             // 添加文件传输助手到用户好友列表缓存
+            super.init(targetAccount, sourceAccount);
+            super.addTargetAccountRelationListCache(targetAccount, sourceAccount);
+            super.init(sourceAccount, targetAccount);
             super.addTargetAccountRelationListCache(sourceAccount, targetAccount);
-            // 添加文件传输助手到用户聊天对话列表缓存
             super.addTargetAccount2SourceLastChattingList(sourceAccount, targetAccount);
         }
     }