PersistentMessageService.java 17 KB


  1. package com.webchat.ugc.messaegqueue.service;
  2. import com.webchat.common.bean.APIPageResponseBean;
  3. import com.webchat.common.enums.ChatMessageTypeEnum;
  4. import com.webchat.common.enums.RedisKeyEnum;
  5. import com.webchat.common.enums.RoleCodeEnum;
  6. import com.webchat.common.enums.messagequeue.MessageQueueEnum;
  7. import com.webchat.common.service.RedisService;
  8. import com.webchat.common.service.messagequeue.producer.MessageQueueProducer;
  9. import com.webchat.common.util.DateUtils;
  10. import com.webchat.common.util.HtmlUtil;
  11. import com.webchat.common.util.JsonUtil;
  12. import com.webchat.domain.vo.request.mess.ChatMessageRequestVO;
  13. import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
  14. import com.webchat.domain.vo.response.mess.ChatMessageResponseVO;
  15. import com.webchat.ugc.repository.dao.IChatMessageDAO;
  16. import com.webchat.ugc.repository.entity.ChatMessageEntity;
  17. import com.webchat.ugc.service.AccountService;
  18. import lombok.extern.slf4j.Slf4j;
  19. import org.apache.commons.collections.CollectionUtils;
  20. import org.apache.commons.lang3.StringUtils;
  21. import org.springframework.beans.factory.annotation.Autowired;
  22. import org.springframework.data.domain.Page;
  23. import org.springframework.data.domain.PageRequest;
  24. import org.springframework.data.domain.Pageable;
  25. import org.springframework.data.domain.Sort;
  26. import org.springframework.stereotype.Service;
  27. import java.util.ArrayList;
  28. import java.util.Collections;
  29. import java.util.Comparator;
  30. import java.util.Date;
  31. import java.util.HashMap;
  32. import java.util.List;
  33. import java.util.Map;
  34. import java.util.Objects;
  35. import java.util.Set;
  36. import java.util.stream.Collectors;
  37. @Slf4j
  38. @Service
  39. public class PersistentMessageService {
  40. @Autowired
  41. private IChatMessageDAO chatMessageDAO;
  42. @Autowired
  43. private RedisService redisService;
  44. @Autowired
  45. private AccountService accountService;
  46. @Autowired
  47. private MessageQueueProducer<Object, Long> messageQueueProducer;
  48. /**
  49. * 《离线场景》持久化消息队列,保存离线消息,同时会将数据同步到ES用于后续的RAG问答和消息搜索
  50. *
  51. * @param messVo
  52. * @return
  53. */
  54. public boolean persistent(ChatMessageRequestVO messVo) {
  55. ChatMessageEntity mess = convert(messVo);
  56. // 取消息接收人信息,判断是否群聊场景
  57. String receiverId = messVo.getReceiverId();
  58. UserBaseResponseInfoVO receiver = accountService.accountInfo(receiverId);
  59. boolean groupMessage = RoleCodeEnum.GROUP.getCode().equals(receiver.getRoleCode());
  60. List<String> receivers = new ArrayList<>();
  61. if (groupMessage) {
  62. // 角色翻转,已群组作为发送人
  63. mess.setSender(receiverId);
  64. // 设置代理消息发送用户
  65. mess.setProxySender(messVo.getSenderId());
  66. // 查询实际接收人,群组下所有用户
  67. Set<String> groupUserIds = accountService.getGroupUserIds(receiverId);
  68. receivers.addAll(groupUserIds);
  69. } else {
  70. receivers.add(receiverId);
  71. }
  72. receivers.remove(messVo.getSenderId());
  73. for (String receiverUserId : receivers) {
  74. mess.setReceiver(receiverUserId);
  75. // 这里有优化空间,可以改为批量一次入库(先简单实现功能)
  76. mess = chatMessageDAO.save(mess);
  77. // 加入聊天缓存
  78. this.addUserMessCache(mess);
  79. // 加入聊天列表
  80. if (groupMessage) {
  81. this.addOrRefreshMessListCache(mess.getProxySender(), messVo.getReceiverId());
  82. this.addOrRefreshMessListCache(receiverUserId, messVo.getReceiverId());
  83. // 群聊未读消息+1
  84. addUnreadMessCountCache(receiverUserId, mess.getSender());
  85. } else {
  86. this.addOrRefreshMessListCache(messVo.getSenderId(), receiverUserId);
  87. this.addOrRefreshMessListCache(receiverUserId, messVo.getSenderId());
  88. // 未读消息+1
  89. addUnreadMessCountCache(receiverUserId, messVo.getSenderId());
  90. }
  91. }
  92. /**
  93. * 刷新用户对话列表
  94. *
  95. */
  96. this.refreshChattingCache(messVo);
  97. return true;
  98. }
  99. private void refreshChattingCache(ChatMessageRequestVO chatMessage) {
  100. // 刷新聊天对话列表
  101. Long msgTime = System.currentTimeMillis();
  102. ChatMessageRequestVO chattingRequest = new ChatMessageRequestVO();
  103. chattingRequest.setSenderId(chatMessage.getSenderId());
  104. chattingRequest.setReceiverId(chatMessage.getReceiverId());
  105. chattingRequest.setTime(msgTime);
  106. messageQueueProducer.send(MessageQueueEnum.QUEUE_CHATTING_LIST_REFRESH, chattingRequest);
  107. UserBaseResponseInfoVO accountInfo = accountService.accountInfo(chatMessage.getReceiverId());
  108. if (RoleCodeEnum.isUserRole(accountInfo.getRoleCode())) {
  109. // 如果消息接受账号是用户类型,也需要刷新用户账号列表
  110. chattingRequest.setSenderId(chatMessage.getReceiverId());
  111. chattingRequest.setReceiverId(chatMessage.getSenderId());
  112. messageQueueProducer.send(MessageQueueEnum.QUEUE_CHATTING_LIST_REFRESH, chattingRequest);
  113. } else if (RoleCodeEnum.GROUP.getCode().equals(accountInfo.getRoleCode())) {
  114. // 接受人是群组,需要刷新群组下所有用户对话列表
  115. Set<String> groupUsers = accountService.getGroupUserIds(chatMessage.getReceiverId());
  116. if (CollectionUtils.isNotEmpty(groupUsers)) {
  117. for (String groupUserId : groupUsers) {
  118. if (groupUserId.equals(chatMessage.getSenderId())) {
  119. continue;
  120. }
  121. chattingRequest.setSenderId(groupUserId);
  122. chattingRequest.setReceiverId(chatMessage.getReceiverId());
  123. messageQueueProducer.send(MessageQueueEnum.QUEUE_CHATTING_LIST_REFRESH, chattingRequest);
  124. }
  125. }
  126. }
  127. }
  128. /**
  129. * 刷新未读消息数缓存,用于新消息红点🔴通知
  130. *
  131. * @param currUserId
  132. * @param chatUserId
  133. * @return
  134. */
  135. private Long addUnreadMessCountCache(String currUserId, String chatUserId) {
  136. String unreadUserCacheCountKey = RedisKeyEnum.UN_READ_MESS_USER_SET_CACHE.getKey(currUserId);
  137. redisService.sadd(unreadUserCacheCountKey, chatUserId);
  138. String unreadCacheCountKey = RedisKeyEnum.UN_READ_MESS_COUNT_CACHE.getKey(currUserId, chatUserId);
  139. return redisService.increx(unreadCacheCountKey, RedisKeyEnum.UN_READ_MESS_COUNT_CACHE.getExpireTime());
  140. }
  141. private void clearUnreadMessCountCache(String currUserId, String chatUserId) {
  142. String unreadCacheCountKey = RedisKeyEnum.UN_READ_MESS_COUNT_CACHE.getKey(currUserId, chatUserId);
  143. redisService.set(unreadCacheCountKey, "0", RedisKeyEnum.UN_READ_MESS_COUNT_CACHE.getExpireTime());
  144. String unreadUserCacheCountKey = RedisKeyEnum.UN_READ_MESS_USER_SET_CACHE.getKey(currUserId);
  145. redisService.sremove(unreadUserCacheCountKey, chatUserId);
  146. }
  147. public Long getUnreadMessUserCountFromCache(String currUserId) {
  148. String unreadUserCacheCountKey = RedisKeyEnum.UN_READ_MESS_USER_SET_CACHE.getKey(currUserId);
  149. return redisService.ssize(unreadUserCacheCountKey);
  150. }
  151. public Set<String> getUnreadMessUserSetFromCache(String currUserId) {
  152. String unreadUserCacheCountKey = RedisKeyEnum.UN_READ_MESS_USER_SET_CACHE.getKey(currUserId);
  153. return redisService.smembers(unreadUserCacheCountKey);
  154. }
  155. /***
  156. * 查询两个人的聊天记录
  157. * @param currUserId
  158. * @param chatUserId
  159. * @param lastId
  160. * @param size
  161. * @return
  162. */
  163. public List<ChatMessageResponseVO> getChatMessListFromCache(String currUserId, String chatUserId, Long lastId,
  164. Long fixedMessageId, int size) {
  165. // 查询后清理未读消息数
  166. clearUnreadMessCountCache(currUserId, chatUserId);
  167. lastId = lastId == null ? Long.MAX_VALUE : lastId;
  168. String cacheKey = getUserMessRedisKey(currUserId, chatUserId);
  169. Set<String> cacheSet = redisService.zreverseRangeByScore(cacheKey, lastId, 0, size);
  170. if (CollectionUtils.isEmpty(cacheSet)) {
  171. return Collections.emptyList();
  172. }
  173. if (fixedMessageId != null) {
  174. cacheSet.remove(fixedMessageId.toString());
  175. }
  176. List<ChatMessageResponseVO> chatMessageResponseVOList = cacheSet.stream().map(cache -> {
  177. ChatMessageResponseVO messageResponse = getChatMessDetailFromCache(Long.valueOf(cache));
  178. messageResponse.setReceiver(accountService.accountInfo(messageResponse.getReceiverId()));
  179. messageResponse.setSender(accountService.accountInfo(messageResponse.getSenderId()));
  180. if (StringUtils.isNotBlank(messageResponse.getProxySenderId())) {
  181. messageResponse.setProxySender(accountService.accountInfo(messageResponse.getProxySenderId()));
  182. }
  183. return messageResponse;
  184. }).sorted(Comparator.comparing(ChatMessageResponseVO::getMessId)).collect(Collectors.toList());
  185. if (fixedMessageId != null) {
  186. // 处理消息定位,简单处理,默认插入到消息尾部
  187. ChatMessageResponseVO fixedMessage = getChatMessDetailFromCache(fixedMessageId);
  188. fixedMessage.setReceiver(accountService.accountInfo(fixedMessage.getReceiverId()));
  189. fixedMessage.setSender(accountService.accountInfo(fixedMessage.getSenderId()));
  190. if (StringUtils.isNotBlank(fixedMessage.getProxySenderId())) {
  191. fixedMessage.setProxySender(accountService.accountInfo(fixedMessage.getProxySenderId()));
  192. }
  193. chatMessageResponseVOList.add(fixedMessage);
  194. }
  195. return chatMessageResponseVOList;
  196. }
  197. public Map<String, ChatMessageResponseVO> batchGetUserLastMess(String currUserId, Set<String> userIds) {
  198. Map<String, ChatMessageResponseVO> map = new HashMap<>();
  199. for (String userId : userIds) {
  200. String cacheKey = getUserMessRedisKey(currUserId, userId);
  201. Set<String> cacheSet =
  202. redisService.zreverseRangeByScore(cacheKey, Long.MAX_VALUE, 0, 1);
  203. if (CollectionUtils.isNotEmpty(cacheSet)) {
  204. Long lastMessId = Long.valueOf(new ArrayList<>(cacheSet).get(0));
  205. ChatMessageResponseVO chatMessageResponseVO = getChatMessDetailFromCache(lastMessId);
  206. if (chatMessageResponseVO != null) {
  207. map.put(userId, chatMessageResponseVO);
  208. }
  209. }
  210. }
  211. return map;
  212. }
  213. private void addOrRefreshMessListCache(String currUserId, String chatUserId) {
  214. String messUserKey = RedisKeyEnum.MESS_USER_LIST_KEY.getKey(currUserId);
  215. redisService.zadd(messUserKey, chatUserId, DateUtils.getCurrentTimeMillis());
  216. }
  217. private void addUserMessCache(ChatMessageEntity mess) {
  218. long messId = mess.getId();
  219. /***
  220. * 刷新消息详情缓存
  221. */
  222. refreshMessCache(mess);
  223. /***
  224. * 加入用户消息列表
  225. */
  226. addUserMessCache(
  227. mess.getProxySender() != null ? mess.getProxySender() : mess.getSender(),
  228. mess.getProxySender() != null ? mess.getSender() : mess.getReceiver(),
  229. messId, messId);
  230. addUserMessCache(mess.getReceiver(), mess.getSender(), messId, messId);
  231. }
  232. private void addUserMessCache(String sender, String receiver, Long messId, Long score) {
  233. String cacheKey = getUserMessRedisKey(sender, receiver);
  234. redisService.zadd(cacheKey, messId.toString(), score);
  235. }
  236. /***
  237. * 刷新消息缓存
  238. * @param mess
  239. */
  240. private void refreshMessCache(ChatMessageEntity mess) {
  241. String messKey = RedisKeyEnum.MESS_DETAIL_CACHE_KEY.getKey();
  242. redisService.hset(messKey, String.valueOf(mess.getId()), JsonUtil.toJsonString(mess),
  243. RedisKeyEnum.MESS_DETAIL_CACHE_KEY.getExpireTime());
  244. }
  245. private ChatMessageResponseVO getChatMessDetailFromCache(Long messId) {
  246. String messKey = RedisKeyEnum.MESS_DETAIL_CACHE_KEY.getKey();
  247. String messCache = redisService.hget(messKey, String.valueOf(messId));
  248. if (StringUtils.isBlank(messCache)) {
  249. return null;
  250. }
  251. ChatMessageEntity userMessEntity = JsonUtil.fromJson(messCache, ChatMessageEntity.class);
  252. ChatMessageResponseVO messageResponse = new ChatMessageResponseVO();
  253. messageResponse.setMessId(userMessEntity.getId());
  254. messageResponse.setMessage(userMessEntity.getMessage());
  255. messageResponse.setSenderId(userMessEntity.getSender());
  256. messageResponse.setProxySenderId(userMessEntity.getProxySender());
  257. messageResponse.setReceiverId(userMessEntity.getReceiver());
  258. messageResponse.setTime(userMessEntity.getSendDate().getTime());
  259. messageResponse.setIsRead(userMessEntity.getIsRead());
  260. messageResponse.setType(userMessEntity.getType());
  261. if (ChatMessageTypeEnum.RED_PACKET.getType().equals(userMessEntity.getType())) {
  262. // messageResponse.setRedPacketDetail(redPacketService.getRedPacketDetailCache(Long.valueOf(userMessEntity.getMessage())));
  263. } else if (ChatMessageTypeEnum.PUBLIC_ACCOUNT_ARTICLE.getType().equals(userMessEntity.getType())) {
  264. // messageResponse.setPublicAccountArticle(articleService.getPublicAccountArticleMessage(Long.valueOf(userMessEntity.getMessage())));
  265. }
  266. messageResponse.setGroupMessage(StringUtils.isNotBlank(userMessEntity.getProxySender()));
  267. return messageResponse;
  268. }
  269. public String getUserMessRedisKey(String sender, String receiver) {
  270. return RedisKeyEnum.USER_CHAT_MESS_CACHE_KEY.getKey(sender, receiver);
  271. }
  272. public APIPageResponseBean<List<ChatMessageResponseVO>> pageMessage(String mess, int pageNo, int pageSize) {
  273. Pageable pageable = PageRequest.of(pageNo - 1, pageSize, Sort.by(Sort.Order.desc("id")));
  274. Page<ChatMessageEntity> chatMessEntityPage;
  275. if (StringUtils.isBlank(mess)) {
  276. chatMessEntityPage = chatMessageDAO.findAll(pageable);
  277. } else {
  278. chatMessEntityPage = chatMessageDAO.findAllByMessageLike("%"+mess+"%", pageable);
  279. }
  280. List<ChatMessageResponseVO> chatMessageResponseVOList = convertChatMessageResponseList(chatMessEntityPage.getContent());
  281. return APIPageResponseBean.success(pageNo, pageSize, chatMessEntityPage.getTotalElements(), chatMessageResponseVOList);
  282. }
  283. private List<ChatMessageResponseVO> convertChatMessageResponseList(List<ChatMessageEntity> chatMessEntities) {
  284. if (CollectionUtils.isEmpty(chatMessEntities)) {
  285. return Collections.emptyList();
  286. }
  287. /**
  288. * 批量查询用户信息
  289. */
  290. Set<String> senderUserIds = chatMessEntities.stream().map(ChatMessageEntity::getSender).collect(Collectors.toSet());
  291. Set<String> proxySenderUserIds = chatMessEntities.stream().map(ChatMessageEntity::getProxySender).filter(Objects::nonNull).collect(Collectors.toSet());
  292. Set<String> receiverUserIds = chatMessEntities.stream().map(ChatMessageEntity::getReceiver).collect(Collectors.toSet());
  293. senderUserIds.addAll(receiverUserIds);
  294. if (CollectionUtils.isNotEmpty(proxySenderUserIds)) {
  295. senderUserIds.addAll(proxySenderUserIds);
  296. }
  297. Map<String, UserBaseResponseInfoVO> userMap = accountService.batchGet(senderUserIds);
  298. return chatMessEntities.stream().map(chat -> {
  299. ChatMessageResponseVO chatMessageResponse = new ChatMessageResponseVO();
  300. chatMessageResponse.setTime(chat.getSendDate().getTime());
  301. chatMessageResponse.setSender(userMap.get(chat.getSender()));
  302. if (StringUtils.isNotBlank(chat.getProxySender())) {
  303. chatMessageResponse.setProxySender(userMap.get(chat.getProxySender()));
  304. }
  305. chatMessageResponse.setReceiver(userMap.get(chat.getReceiver()));
  306. chatMessageResponse.setMessage(chat.getMessage());
  307. return chatMessageResponse;
  308. }).collect(Collectors.toList());
  309. }
  310. private ChatMessageEntity convert(ChatMessageRequestVO messVo) {
  311. ChatMessageEntity mess = new ChatMessageEntity();
  312. mess.setSender(messVo.getSenderId());
  313. mess.setReceiver(messVo.getReceiverId());
  314. mess.setMessage(this.handleSpecialHtmlTag(HtmlUtil.xssEscape(messVo.getMessage())));
  315. mess.setSendDate(new Date());
  316. mess.setIsRead(false);
  317. mess.setType(messVo.getType());
  318. return mess;
  319. }
  320. /***
  321. * 处理特殊字符
  322. * @param content
  323. * @return
  324. */
  325. private String handleSpecialHtmlTag(String content) {
  326. if (StringUtils.isBlank(content)) {
  327. return content;
  328. }
  329. content = content.replaceAll("&lt;br&gt;", "<br>");
  330. content = content.replaceAll("&lt;b&gt;", "<b>");
  331. return content;
  332. }
  333. }