ChatMessageService.java 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package com.webchat.ugc.service;
  2. import com.webchat.common.enums.ChatMessageTypeEnum;
  3. import com.webchat.common.enums.RedisKeyEnum;
  4. import com.webchat.common.enums.messagequeue.MessageBroadChannelEnum;
  5. import com.webchat.common.service.RedisService;
  6. import com.webchat.common.service.messagequeue.producer.MessageQueueProducer;
  7. import com.webchat.domain.vo.request.ChattingRequestVO;
  8. import com.webchat.domain.vo.request.mess.MessageNotifyVO;
  9. import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
  10. import com.webchat.domain.vo.response.chatting.ChattingListResponseVO;
  11. import com.webchat.domain.vo.response.mess.ChatMessageResponseVO;
  12. import com.webchat.ugc.messaegqueue.service.PersistentMessageService;
  13. import org.apache.commons.collections.CollectionUtils;
  14. import org.apache.commons.collections.MapUtils;
  15. import org.springframework.beans.factory.annotation.Autowired;
  16. import org.springframework.stereotype.Service;
  17. import java.util.ArrayList;
  18. import java.util.Collections;
  19. import java.util.HashMap;
  20. import java.util.List;
  21. import java.util.Map;
  22. import java.util.Objects;
  23. import java.util.Set;
  24. import java.util.stream.Collectors;
  25. @Service
  26. public class ChatMessageService {
  27. @Autowired
  28. private RedisService redisService;
  29. @Autowired
  30. private AccountService accountService;
  31. @Autowired
  32. private PersistentMessageService persistentMessageService;
  33. @Autowired
  34. private MessageQueueProducer<MessageNotifyVO, Long> messageQueueProducer;
  35. /**
  36. * 查询用户对话列表
  37. *
  38. * @param userId
  39. * @param lastChatTime
  40. * @param size
  41. * @return
  42. */
  43. public List<ChattingListResponseVO> listChatting(String userId, Long lastChatTime, Integer size) {
  44. lastChatTime = lastChatTime == null ? Long.MAX_VALUE : lastChatTime - 1;
  45. String cacheKey = this.getCacheKey(userId);
  46. /**
  47. * 查询最新对话用户列表数据
  48. */
  49. Set<String> chattingUsers = redisService.zreverseRangeByScore(cacheKey, lastChatTime, 0, size);
  50. List<String> chattingUserList = new ArrayList<>(chattingUsers);
  51. List<UserBaseResponseInfoVO> chattingUserInfos = accountService.batchGet(chattingUserList);
  52. if (CollectionUtils.isEmpty(chattingUserInfos)) {
  53. return Collections.emptyList();
  54. }
  55. /**
  56. * 查询对话的最近一次时间
  57. */
  58. Map<String, Long> lastTimeMap = redisService.zscoreTomap(cacheKey, chattingUserList);
  59. /**
  60. * 查询未读消息的用户列表,用于标记小红点
  61. */
  62. Set<String> unreadUsers = persistentMessageService.getUnreadMessUserSetFromCache(userId);
  63. /**
  64. * 查询对话列表最新对话消息内容数据
  65. */
  66. Map<String, ChatMessageResponseVO> lastMessageVOMap = persistentMessageService.batchGetUserLastMess(userId, chattingUsers);
  67. Map<String, String> lastMessageMap = new HashMap<>();
  68. if (MapUtils.isNotEmpty(lastMessageVOMap)) {
  69. for (Map.Entry<String, ChatMessageResponseVO> entry : lastMessageVOMap.entrySet()) {
  70. lastMessageMap.put(entry.getKey(), entry.getValue() != null ? entry.getValue().getPrintMessage() : "有新消息");
  71. }
  72. }
  73. /**
  74. * 构造用户对话列表
  75. */
  76. return chattingUserInfos.stream()
  77. .filter(Objects::nonNull)
  78. .map(u -> {
  79. return ChattingListResponseVO.builder()
  80. .user(u)
  81. .unread(unreadUsers.contains(u.getUserId()))
  82. .lastChatTime(lastTimeMap.get(u.getUserId()))
  83. .lastOfflineMessage(lastMessageMap.get(u.getUserId()))
  84. .build();
  85. }).collect(Collectors.toList());
  86. }
  87. /**
  88. * 对话加入用户对话列表(完全依赖redis缓存)
  89. *
  90. * @param chattingRequest
  91. * @return
  92. */
  93. public boolean addChattingList(ChattingRequestVO chattingRequest) {
  94. /**
  95. * TODO 后续考虑是否需要持久化,数据一致性保障
  96. */
  97. // 基于Sorted Set实现最新对话列表缓存,最新对话时间作为Score用于排序
  98. String cacheKey = this.getCacheKey(chattingRequest.getAccount());
  99. Long lastChatTime = chattingRequest.getLastChatTime();
  100. lastChatTime = lastChatTime == null ? System.currentTimeMillis() : lastChatTime;
  101. redisService.zadd(cacheKey, chattingRequest.getChatAccount(), lastChatTime, RedisKeyEnum.CHAT_ACCOUNT_LIST_CACHE.getExpireTime());
  102. // 通知客户端对话列表需要刷新
  103. MessageNotifyVO messageBase = new MessageNotifyVO();
  104. messageBase.setReceiverId(chattingRequest.getAccount());
  105. messageBase.setType(ChatMessageTypeEnum.CHATTING_REFRESH.getType());
  106. messageQueueProducer.broadSend(MessageBroadChannelEnum.QUEUE_CHAT_NOTIFY, messageBase);
  107. return true;
  108. }
  109. private String getCacheKey(String account) {
  110. return RedisKeyEnum.CHAT_ACCOUNT_LIST_CACHE.getKey(account);
  111. }
  112. }