package com.webchat.ugc.service; import com.webchat.common.enums.ChatMessageTypeEnum; import com.webchat.common.enums.RedisKeyEnum; import com.webchat.common.enums.messagequeue.MessageBroadChannelEnum; import com.webchat.common.service.RedisService; import com.webchat.common.service.messagequeue.producer.MessageQueueProducer; import com.webchat.domain.vo.request.ChattingRequestVO; import com.webchat.domain.vo.request.mess.MessageNotifyVO; import com.webchat.domain.vo.response.UserBaseResponseInfoVO; import com.webchat.domain.vo.response.chatting.ChattingListResponseVO; import com.webchat.domain.vo.response.mess.ChatMessageResponseVO; import com.webchat.ugc.messaegqueue.service.PersistentMessageService; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @Service public class ChatMessageService { @Autowired private RedisService redisService; @Autowired private AccountService accountService; @Autowired private PersistentMessageService persistentMessageService; @Autowired private MessageQueueProducer messageQueueProducer; /** * 查询用户对话列表 * * @param userId * @param lastChatTime * @param size * @return */ public List listChatting(String userId, Long lastChatTime, Integer size) { lastChatTime = lastChatTime == null ? Long.MAX_VALUE : lastChatTime - 1; String cacheKey = this.getCacheKey(userId); /** * 查询最新对话用户列表数据 */ Set chattingUsers = redisService.zreverseRangeByScore(cacheKey, lastChatTime, 0, size); List chattingUserList = new ArrayList<>(chattingUsers); List chattingUserInfos = accountService.batchGet(chattingUserList); if (CollectionUtils.isEmpty(chattingUserInfos)) { return Collections.emptyList(); } /** * 查询对话的最近一次时间 */ Map lastTimeMap = redisService.zscoreTomap(cacheKey, chattingUserList); /** * 查询未读消息的用户列表,用于标记小红点 */ Set unreadUsers = persistentMessageService.getUnreadMessUserSetFromCache(userId); /** * 查询对话列表最新对话消息内容数据 */ Map lastMessageVOMap = persistentMessageService.batchGetUserLastMess(userId, chattingUsers); Map lastMessageMap = new HashMap<>(); if (MapUtils.isNotEmpty(lastMessageVOMap)) { for (Map.Entry entry : lastMessageVOMap.entrySet()) { lastMessageMap.put(entry.getKey(), entry.getValue() != null ? entry.getValue().getPrintMessage() : "有新消息"); } } /** * 构造用户对话列表 */ return chattingUserInfos.stream() .filter(Objects::nonNull) .map(u -> { return ChattingListResponseVO.builder() .user(u) .unread(unreadUsers.contains(u.getUserId())) .lastChatTime(lastTimeMap.get(u.getUserId())) .lastOfflineMessage(lastMessageMap.get(u.getUserId())) .build(); }).collect(Collectors.toList()); } /** * 对话加入用户对话列表(完全依赖redis缓存) * * @param chattingRequest * @return */ public boolean addChattingList(ChattingRequestVO chattingRequest) { /** * TODO 后续考虑是否需要持久化,数据一致性保障 */ // 基于Sorted Set实现最新对话列表缓存,最新对话时间作为Score用于排序 String cacheKey = this.getCacheKey(chattingRequest.getAccount()); Long lastChatTime = chattingRequest.getLastChatTime(); lastChatTime = lastChatTime == null ? System.currentTimeMillis() : lastChatTime; redisService.zadd(cacheKey, chattingRequest.getChatAccount(), lastChatTime, RedisKeyEnum.CHAT_ACCOUNT_LIST_CACHE.getExpireTime()); // 通知客户端对话列表需要刷新 MessageNotifyVO messageBase = new MessageNotifyVO(); messageBase.setReceiverId(chattingRequest.getAccount()); messageBase.setType(ChatMessageTypeEnum.CHATTING_REFRESH.getType()); messageQueueProducer.broadSend(MessageBroadChannelEnum.QUEUE_CHAT_NOTIFY, messageBase); return true; } private String getCacheKey(String account) { return RedisKeyEnum.CHAT_ACCOUNT_LIST_CACHE.getKey(account); } }