Parcourir la source

公众号推文架构升级2.0,支持分布式websocket

wangqi49 il y a 5 mois
Parent
commit
5ad44e72a1

+ 0 - 6
.idea/vcs.xml

@@ -1,6 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project version="4">
-  <component name="VcsDirectoryMappings">
-    <mapping directory="" vcs="Git" />
-  </component>
-</project>

+ 0 - 74
.idea/workspace.xml

@@ -1,74 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project version="4">
-  <component name="AutoImportSettings">
-    <option name="autoReloadType" value="SELECTIVE" />
-  </component>
-  <component name="ChangeListManager">
-    <list default="true" id="b0956da4-eb9a-4780-9d1b-66040aaba9d1" name="Changes" comment="">
-      <change afterPath="$PROJECT_DIR$/.gitignore" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
-    </list>
-    <option name="SHOW_DIALOG" value="false" />
-    <option name="HIGHLIGHT_CONFLICTS" value="true" />
-    <option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
-    <option name="LAST_RESOLUTION" value="IGNORE" />
-  </component>
-  <component name="Git.Settings">
-    <option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
-  </component>
-  <component name="ProjectColorInfo"><![CDATA[{
-  "associatedIndex": 5
-}]]></component>
-  <component name="ProjectId" id="2q6NkbdhZP5rox41A9vpcyTnJA2" />
-  <component name="ProjectViewState">
-    <option name="openDirectoriesWithSingleClick" value="true" />
-    <option name="showLibraryContents" value="true" />
-  </component>
-  <component name="PropertiesComponent"><![CDATA[{
-  "keyToString": {
-    "RunOnceActivity.ShowReadmeOnStart": "true",
-    "git-widget-placeholder": "master",
-    "kotlin-language-version-configured": "true",
-    "last_opened_file_path": "/Users/wangqi49/Desktop/webchat",
-    "nodejs_package_manager_path": "npm",
-    "vue.rearranger.settings.migration": "true"
-  }
-}]]></component>
-  <component name="RecentsManager">
-    <key name="CopyFile.RECENT_KEYS">
-      <recent name="$PROJECT_DIR$" />
-    </key>
-  </component>
-  <component name="RunManager">
-    <configuration name="WebChatApplication" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot" nameIsGenerated="true">
-      <module name="webchat" />
-      <option name="SPRING_BOOT_MAIN_CLASS" value="com.webchat.WebChatApplication" />
-      <method v="2">
-        <option name="Make" enabled="true" />
-      </method>
-    </configuration>
-  </component>
-  <component name="SharedIndexes">
-    <attachedChunks>
-      <set>
-        <option value="bundled-jdk-9f38398b9061-39b83d9b5494-intellij.indexing.shared.core-IU-241.15989.150" />
-        <option value="bundled-js-predefined-1d06a55b98c1-91d5c284f522-JavaScript-IU-241.15989.150" />
-      </set>
-    </attachedChunks>
-  </component>
-  <component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
-  <component name="TaskManager">
-    <task active="true" id="Default" summary="Default task">
-      <changelist id="b0956da4-eb9a-4780-9d1b-66040aaba9d1" name="Changes" comment="" />
-      <created>1733979208104</created>
-      <option name="number" value="Default" />
-      <option name="presentableId" value="Default" />
-      <updated>1733979208104</updated>
-      <workItem from="1733979209139" duration="85000" />
-    </task>
-    <servers />
-  </component>
-  <component name="TypeScriptGeneratedFilesManager">
-    <option name="version" value="3" />
-  </component>
-</project>

+ 3 - 3
src/main/java/com/webchat/WebChatApplication.java

@@ -1,7 +1,7 @@
 package com.webchat;
 
 import com.webchat.common.util.SpringContextUtil;
-import com.webchat.service.listener.RedisMessageListener;
+import com.webchat.service.listener.RedisChatMessageListener;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.web.servlet.ServletComponentScan;
@@ -26,7 +26,7 @@ public class WebChatApplication {
 		sb.append("=======================================================================");
 		System.out.println(sb);
 
-		RedisMessageListener redisMessageListener = SpringContextUtil.getBean(RedisMessageListener.class);
-		new Thread(() -> redisMessageListener.consumeMessageQueue()).start();
+		RedisChatMessageListener chatMessageListener = SpringContextUtil.getBean(RedisChatMessageListener.class);
+		new Thread(() -> chatMessageListener.consumeMessageQueue()).start();
 	}
 }

+ 1 - 1
src/main/java/com/webchat/common/enums/RedisMessageChannelTopicEnum.java

@@ -5,7 +5,7 @@ import lombok.Getter;
 @Getter
 public enum RedisMessageChannelTopicEnum {
 
-    CHAT;
+    CHAT, PUSH_ARTICLE;
 
     public String getChannel() {
         return this.name();

+ 7 - 3
src/main/java/com/webchat/config/configuration/RedisConfig.java

@@ -1,7 +1,8 @@
 package com.webchat.config.configuration;
 
 import com.webchat.common.enums.RedisMessageChannelTopicEnum;
-import com.webchat.service.listener.RedisMessageListener;
+import com.webchat.service.listener.RedisChatMessageListener;
+import com.webchat.service.listener.RedisPushMessageListener;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.data.redis.connection.RedisConnectionFactory;
@@ -14,7 +15,9 @@ import javax.annotation.Resource;
 public class RedisConfig {
 
     @Resource
-    private RedisMessageListener redisMessageListener;
+    private RedisChatMessageListener chatMessageListener;
+    @Resource
+    private RedisPushMessageListener pushMessageListener;
 
     @Bean
     public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory) {
@@ -22,7 +25,8 @@ public class RedisConfig {
         // 设置连接工厂
         container.setConnectionFactory(redisConnectionFactory);
         // 配置消息监听器
-        container.addMessageListener(redisMessageListener, new ChannelTopic(RedisMessageChannelTopicEnum.CHAT.getChannel()));
+        container.addMessageListener(chatMessageListener, new ChannelTopic(RedisMessageChannelTopicEnum.CHAT.getChannel()));
+        container.addMessageListener(pushMessageListener, new ChannelTopic(RedisMessageChannelTopicEnum.PUSH_ARTICLE.getChannel()));
         return container;
     }
 

+ 1 - 1
src/main/java/com/webchat/service/ChatMessService.java

@@ -178,7 +178,7 @@ public class ChatMessService {
         List<UserBaseResponseInfoVO> users = userService.batchGetUserListInfoFromCache(new ArrayList<>(queryUserIds));
         Map<String, ChatMessageResponseVO> userMessMap = this.batchGetUserLastMess(currUserId, userIdSet);
         Set<String> unReadMessUsers = getUnreadMessUserSetFromCache(currUserId);
-        return users.stream().map(user -> {
+        return users.stream().filter(Objects::nonNull).map(user -> {
             UserMessListResponseVO userMessListResponse = new UserMessListResponseVO();
             userMessListResponse.setUser(user);
             ChatMessageResponseVO chatMessageResponse = userMessMap.get(user.getUserId());

+ 10 - 17
src/main/java/com/webchat/service/listener/RedisMessageListener.java → src/main/java/com/webchat/service/listener/RedisChatMessageListener.java

@@ -1,6 +1,5 @@
 package com.webchat.service.listener;
 
-import com.webchat.common.enums.RedisMessageChannelTopicEnum;
 import com.webchat.common.enums.RoleCodeEnum;
 import com.webchat.common.util.JsonUtil;
 import com.webchat.common.util.SpringContextUtil;
@@ -28,7 +27,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 
 @Slf4j
 @Component
-public class RedisMessageListener implements MessageListener {
+public class RedisChatMessageListener implements MessageListener {
 
     private final RedisTemplate redisTemplate;
 
@@ -36,27 +35,22 @@ public class RedisMessageListener implements MessageListener {
 
     private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
 
-    public RedisMessageListener(@Qualifier("redisTemplate") RedisTemplate redisTemplate) {
+    public RedisChatMessageListener(@Qualifier("redisTemplate") RedisTemplate redisTemplate) {
         this.redisTemplate = redisTemplate;
     }
 
     @Override
     public void onMessage(Message message, byte[] bytes) {
-
-        log.info("redis message listener ======> message:{}", message);
         String channel = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());
         String messageStr = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());
-        if (RedisMessageChannelTopicEnum.CHAT.getChannel().equals(channel)) {
-            // 加入队列
-            try {
-                messageQueue.put(messageStr);
-            } catch (InterruptedException e) {
-                log.error("消息加入队列异常", e);
-            }
+        log.info("redis message listener ======> channel:{}, message:{}", channel, messageStr);
+        try {
+            messageQueue.put(messageStr);
+        } catch (InterruptedException e) {
+            log.error("消息加入队列异常", e);
         }
     }
 
-
     /**
      * 队列消费,此处使用单独线程处理队列消息
      *
@@ -66,7 +60,7 @@ public class RedisMessageListener implements MessageListener {
             try {
                 String message = messageQueue.take();
                 if (StringUtils.isNotBlank(message)) {
-                    this.handleMessage(message);
+                    this.handleChatMessage(message);
                 }
             } catch (InterruptedException e) {
                 log.error("队列消息消费异常", e);
@@ -74,8 +68,7 @@ public class RedisMessageListener implements MessageListener {
         }
     }
 
-
-    private void handleMessage(String message) {
+    private void handleChatMessage(String message) {
         try {
             log.info("来自客户端消息 =====> message:{}", message);
             UserService userService = SpringContextUtil.getBean(UserService.class);
@@ -136,7 +129,6 @@ public class RedisMessageListener implements MessageListener {
         }
     }
 
-
     public void sendMessageTo(String message, Set<String> receiver) throws IOException {
         if (CollectionUtils.isEmpty(receiver)) {
             return;
@@ -150,4 +142,5 @@ public class RedisMessageListener implements MessageListener {
         }
     }
 
+
 }

+ 98 - 0
src/main/java/com/webchat/service/listener/RedisPushMessageListener.java

@@ -0,0 +1,98 @@
+package com.webchat.service.listener;
+
+import com.webchat.common.enums.ChatMessageTypeEnum;
+import com.webchat.common.util.JsonUtil;
+import com.webchat.controller.client.ChatWebSocket;
+import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
+import com.webchat.domain.vo.response.mess.ChatMessageResponseVO;
+import com.webchat.domain.vo.response.mess.PublicAccountArticleMessageVO;
+import com.webchat.domain.vo.response.publicaccount.ArticleBaseResponseVO;
+import com.webchat.service.FriendService;
+import com.webchat.service.UserService;
+import com.webchat.service.publicaccount.ArticleService;
+import com.webchat.service.queue.dto.ArticleDelayConsumeMessageDTO;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.Session;
+import java.util.Set;
+
+@Slf4j
+@Component
+public class RedisPushMessageListener implements MessageListener {
+
+    @Autowired
+    private FriendService friendService;
+    @Autowired
+    private ArticleService articleService;
+    @Autowired
+    private UserService userService;
+
+    private final RedisTemplate redisTemplate;
+
+    public RedisPushMessageListener(@Qualifier("redisTemplate") RedisTemplate redisTemplate) {
+        this.redisTemplate = redisTemplate;
+    }
+
+    @Override
+    public void onMessage(Message message, byte[] bytes) {
+        String channel = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());
+        String messageStr = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());
+        log.info("redis message listener ======> channel:{}, message:{}", channel, messageStr);
+        // 订阅到公众号实时推文消息,推文逻辑
+        this.doPushPublicAccountArticle(messageStr);
+    }
+
+    /**
+     * 公众号推文,支持分布式websocket
+     *
+     * @param messageStr
+     */
+    private void doPushPublicAccountArticle(String messageStr) {
+        // 1. 消息反序列化
+        ArticleDelayConsumeMessageDTO data = JsonUtil.fromJson(messageStr, ArticleDelayConsumeMessageDTO.class);
+        if (data == null) {
+            return;
+        }
+        // 2. 拿公众号
+        String publicAccount = data.getPublicAccount();
+        UserBaseResponseInfoVO publicAccountUser = userService.getUserInfoByUserId(publicAccount);
+        // 3. 查询推送文章信息
+        ArticleBaseResponseVO articleBaseResponse = articleService.getArticleDetailFromCache(data.getArticleId());
+        // 4. 走redis拿公众号下所有订阅用户
+        Set<String> subscribeUsers = friendService.listSubscribeFriendIds(publicAccount);
+        if (CollectionUtils.isEmpty(subscribeUsers)) {
+            return;
+        }
+        // 5. 处理在线场景推文
+        for (String userId : subscribeUsers) {
+            // 5.1 获取公众号所有订阅且在线用户(需要实时推文)
+            Session session = ChatWebSocket.clients.get(userId).getSession();
+            if (session == null || !session.isOpen()) {
+                continue;
+            }
+            // 5.2 构造文章卡片
+            PublicAccountArticleMessageVO publicAccountArticleMessageDTO = new PublicAccountArticleMessageVO();
+            publicAccountArticleMessageDTO.setArticleId(articleBaseResponse.getId());
+            publicAccountArticleMessageDTO.setCover(articleBaseResponse.getCover());
+            publicAccountArticleMessageDTO.setDescription(articleBaseResponse.getDescription());
+            publicAccountArticleMessageDTO.setTitle(articleBaseResponse.getTitle());
+            publicAccountArticleMessageDTO.setTime(System.currentTimeMillis());
+
+            ChatMessageResponseVO publicAccountArticleMessage = new ChatMessageResponseVO();
+            publicAccountArticleMessage.setType(ChatMessageTypeEnum.PUBLIC_ACCOUNT_ARTICLE.getType());
+            publicAccountArticleMessage.setSenderId(publicAccount);
+            publicAccountArticleMessage.setSender(publicAccountUser);
+            publicAccountArticleMessage.setReceiverId(userId);
+            publicAccountArticleMessage.setMessage(JsonUtil.toJsonString(publicAccountArticleMessageDTO));
+            // 5.3 实时推文
+            session.getAsyncRemote().sendText(JsonUtil.toJsonString(publicAccountArticleMessage));
+        }
+    }
+}

+ 8 - 41
src/main/java/com/webchat/service/queue/ArticlePublishQueue.java

@@ -2,27 +2,20 @@ package com.webchat.service.queue;
 
 import com.webchat.common.enums.ChatMessageTypeEnum;
 import com.webchat.common.enums.RedisKeyEnum;
+import com.webchat.common.enums.RedisMessageChannelTopicEnum;
 import com.webchat.common.util.JsonUtil;
-import com.webchat.controller.client.ChatWebSocket;
 import com.webchat.domain.vo.request.mess.ChatMessageRequestVO;
-import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
-import com.webchat.domain.vo.response.mess.ChatMessageResponseVO;
-import com.webchat.domain.vo.response.mess.PublicAccountArticleMessageVO;
-import com.webchat.domain.vo.response.publicaccount.ArticleBaseResponseVO;
 import com.webchat.repository.dao.IArticleDAO;
 import com.webchat.service.ChatMessService;
 import com.webchat.service.FriendService;
-import com.webchat.service.UserService;
-import com.webchat.service.publicaccount.ArticleService;
+import com.webchat.service.listener.RedisMessageSender;
 import com.webchat.service.queue.dto.ArticleDelayConsumeMessageDTO;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.PostConstruct;
-import javax.websocket.Session;
 import java.util.Set;
 
 /**
@@ -35,13 +28,11 @@ public class ArticlePublishQueue extends AbstractRedisQueue<ArticleDelayConsumeM
     @Autowired
     private FriendService friendService;
     @Autowired
-    private ArticleService articleService;
-    @Autowired
-    private UserService userService;
-    @Autowired
     private ChatMessService chatMessService;
     @Autowired
     private IArticleDAO articleDAO;
+    @Autowired
+    private RedisMessageSender redisMessageSender;
 
     @PostConstruct
     public void initBean() {
@@ -73,42 +64,18 @@ public class ArticlePublishQueue extends AbstractRedisQueue<ArticleDelayConsumeM
         }
         // 1. 拿公众号
         String publicAccount = data.getPublicAccount();
-        UserBaseResponseInfoVO publicAccountUser = userService.getUserInfoByUserId(publicAccount);
         // 2. 获取文章信息
         Long articleId = data.getArticleId();
-        ArticleBaseResponseVO articleBaseResponse = articleService.getArticleDetailFromCache(articleId);
         // 3. 走redis拿公众号下所有订阅用户
         Set<String> subscribeUsers = friendService.listSubscribeFriendIds(publicAccount);
         if (CollectionUtils.isEmpty(subscribeUsers)) {
             return;
         }
-        // 4. 处理在线场景推文
-        for (String userId : subscribeUsers) {
-            // 4.1 获取公众号所有订阅且在线用户(需要实时推文)
-            Session session = ChatWebSocket.clients.get(userId).getSession();
-            if (session == null || !session.isOpen()) {
-                continue;
-            }
-            // 4.2 构造文章卡片
-            PublicAccountArticleMessageVO publicAccountArticleMessageDTO = new PublicAccountArticleMessageVO();
-            publicAccountArticleMessageDTO.setArticleId(articleBaseResponse.getId());
-            publicAccountArticleMessageDTO.setCover(articleBaseResponse.getCover());
-            publicAccountArticleMessageDTO.setDescription(articleBaseResponse.getDescription());
-            publicAccountArticleMessageDTO.setTitle(articleBaseResponse.getTitle());
-            publicAccountArticleMessageDTO.setTime(System.currentTimeMillis());
-
-            ChatMessageResponseVO publicAccountArticleMessage = new ChatMessageResponseVO();
-            publicAccountArticleMessage.setType(ChatMessageTypeEnum.PUBLIC_ACCOUNT_ARTICLE.getType());
-            publicAccountArticleMessage.setSenderId(publicAccount);
-            publicAccountArticleMessage.setSender(publicAccountUser);
-            publicAccountArticleMessage.setReceiverId(userId);
-            publicAccountArticleMessage.setMessage(JsonUtil.toJsonString(publicAccountArticleMessageDTO));
-
-            // 3.3 实时推文
-            session.getAsyncRemote().sendText(JsonUtil.toJsonString(publicAccountArticleMessage));
-        }
-        // 5. 处理离线场景推文
+        // 4. 处理离线场景推文
         this.doExecWriteData(articleId, publicAccount, subscribeUsers);
+        // 5. 发布公众号推文消息到PUSH_ARTICLE Topic,在RedisMessageListener中完成推送(分布式websocket)
+        redisMessageSender.sendMessage(RedisMessageChannelTopicEnum.PUSH_ARTICLE.getChannel(), JsonUtil.toJsonString(data));
+
     }
 
     public void doExecWriteData(Long articleId, String publicAccount, Set<String> receiverIds) {

+ 0 - 58
src/main/resources/templates/admin/public-account-article.html

@@ -125,64 +125,6 @@
             });
         })
     }
-
-    $("#grant-btn").on('click', function() {
-        layer.closeAll();
-        layer.open({
-            type: 1,
-            title: false,
-            skin: 'layui-layer-demo', //样式类名
-            closeBtn: 0, //不显示关闭按钮
-            anim: 2,
-            area: ['380px', '300px'],
-            shadeClose: true, //开启遮罩关闭
-            content: '<div class=\"popup-module-header-div\" style="height: 70px;">' +
-                '<p class=\"popup-module-title\" style="padding-top: 20px">账号钱包发钱</p>' +
-                '</div>' +
-                '<div class=\"popup-module-core-div\" style="margin-top: -40px">' +
-                '<div class="popup-module-core-div-item">' +
-                '<input class="popup-input-number" id="create-uid" placeholder="账号userId,列表中第一列">' +
-                '</div>' +
-                '<div class="popup-module-core-div-item">' +
-                '<input class="popup-input-number" id="create-money" placeholder="金额,最好是整数">' +
-                '</div>' +
-                '<div class="popup-module-core-div-item" style=\"border: 0px\">' +
-                '<button class=\"popup-btn\" onclick=\"grantMoney($(\'#create-uid\').val(), $(\'#create-money\').val())\">赠送</button>' +
-                '</div>' +
-                '</div>' +
-                '</div>'
-        });
-    })
-
-    function grantMoney(userId, money) {
-        if (userId == '') {
-            layer.msg("接收人账号id未设置");
-            return;
-        }
-        if (money == '') {
-            layer.msg("赠送金额未设置");
-            return;
-        }
-        $.ajax({
-            url: "/api/user/wallet/grant/"+userId+"/"+money,
-            type: "post",
-            dataType: "json",
-            contentType: "application/json;charset=utf-8",
-            success:function (data) {
-                data = eval(data);
-                if (data.success){
-                    layer.msg("赠送成功");
-                    loadUserWalletInFlow();
-                    layer.closeAll();
-                } else {
-                    layer.msg(data.msg);
-                }
-            },error:function () {
-                layer.msg("服务端异常");
-            }
-        })
-    }
-
 </script>
 <style>
     .layui-laypage a, .layui-laypage span {