123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- package com.webchat.ugc.service;
- import com.webchat.common.enums.ChatMessageTypeEnum;
- import com.webchat.common.enums.RedisKeyEnum;
- import com.webchat.common.enums.messagequeue.MessageBroadChannelEnum;
- import com.webchat.common.service.RedisService;
- import com.webchat.common.service.messagequeue.producer.MessageQueueProducer;
- import com.webchat.domain.vo.request.ChattingRequestVO;
- import com.webchat.domain.vo.request.mess.MessageNotifyVO;
- import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
- import com.webchat.domain.vo.response.chatting.ChattingListResponseVO;
- import com.webchat.domain.vo.response.mess.ChatMessageResponseVO;
- import com.webchat.ugc.messaegqueue.service.PersistentMessageService;
- import org.apache.commons.collections.CollectionUtils;
- import org.apache.commons.collections.MapUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
- import java.util.Set;
- import java.util.stream.Collectors;
- @Service
- public class ChatMessageService {
- @Autowired
- private RedisService redisService;
- @Autowired
- private AccountService accountService;
- @Autowired
- private PersistentMessageService persistentMessageService;
- @Autowired
- private MessageQueueProducer<MessageNotifyVO, Long> messageQueueProducer;
- /**
- * 查询用户对话列表
- *
- * @param userId
- * @param lastChatTime
- * @param size
- * @return
- */
- public List<ChattingListResponseVO> listChatting(String userId, Long lastChatTime, Integer size) {
- lastChatTime = lastChatTime == null ? Long.MAX_VALUE : lastChatTime - 1;
- String cacheKey = this.getCacheKey(userId);
- /**
- * 查询最新对话用户列表数据
- */
- Set<String> chattingUsers = redisService.zreverseRangeByScore(cacheKey, lastChatTime, 0, size);
- List<String> chattingUserList = new ArrayList<>(chattingUsers);
- List<UserBaseResponseInfoVO> chattingUserInfos = accountService.batchGet(chattingUserList);
- if (CollectionUtils.isEmpty(chattingUserInfos)) {
- return Collections.emptyList();
- }
- /**
- * 查询对话的最近一次时间
- */
- Map<String, Long> lastTimeMap = redisService.zscoreTomap(cacheKey, chattingUserList);
- /**
- * 查询未读消息的用户列表,用于标记小红点
- */
- Set<String> unreadUsers = persistentMessageService.getUnreadMessUserSetFromCache(userId);
- /**
- * 查询对话列表最新对话消息内容数据
- */
- Map<String, ChatMessageResponseVO> lastMessageVOMap = persistentMessageService.batchGetUserLastMess(userId, chattingUsers);
- Map<String, String> lastMessageMap = new HashMap<>();
- if (MapUtils.isNotEmpty(lastMessageVOMap)) {
- for (Map.Entry<String, ChatMessageResponseVO> entry : lastMessageVOMap.entrySet()) {
- lastMessageMap.put(entry.getKey(), entry.getValue() != null ? entry.getValue().getPrintMessage() : "有新消息");
- }
- }
- /**
- * 构造用户对话列表
- */
- return chattingUserInfos.stream()
- .filter(Objects::nonNull)
- .map(u -> {
- return ChattingListResponseVO.builder()
- .user(u)
- .unread(unreadUsers.contains(u.getUserId()))
- .lastChatTime(lastTimeMap.get(u.getUserId()))
- .lastOfflineMessage(lastMessageMap.get(u.getUserId()))
- .build();
- }).collect(Collectors.toList());
- }
- /**
- * 对话加入用户对话列表(完全依赖redis缓存)
- *
- * @param chattingRequest
- * @return
- */
- public boolean addChattingList(ChattingRequestVO chattingRequest) {
- /**
- * TODO 后续考虑是否需要持久化,数据一致性保障
- */
- // 基于Sorted Set实现最新对话列表缓存,最新对话时间作为Score用于排序
- String cacheKey = this.getCacheKey(chattingRequest.getAccount());
- Long lastChatTime = chattingRequest.getLastChatTime();
- lastChatTime = lastChatTime == null ? System.currentTimeMillis() : lastChatTime;
- redisService.zadd(cacheKey, chattingRequest.getChatAccount(), lastChatTime, RedisKeyEnum.CHAT_ACCOUNT_LIST_CACHE.getExpireTime());
- // 通知客户端对话列表需要刷新
- MessageNotifyVO messageBase = new MessageNotifyVO();
- messageBase.setReceiverId(chattingRequest.getAccount());
- messageBase.setType(ChatMessageTypeEnum.CHATTING_REFRESH.getType());
- messageQueueProducer.broadSend(MessageBroadChannelEnum.QUEUE_CHAT_NOTIFY, messageBase);
- return true;
- }
- private String getCacheKey(String account) {
- return RedisKeyEnum.CHAT_ACCOUNT_LIST_CACHE.getKey(account);
- }
- }
|