package com.webchat.ugc.messaegqueue.service; import com.webchat.common.bean.APIPageResponseBean; import com.webchat.common.enums.ChatMessageTypeEnum; import com.webchat.common.enums.RedisKeyEnum; import com.webchat.common.enums.RoleCodeEnum; import com.webchat.common.enums.messagequeue.MessageQueueEnum; import com.webchat.common.service.RedisService; import com.webchat.common.service.messagequeue.producer.MessageQueueProducer; import com.webchat.common.util.DateUtils; import com.webchat.common.util.HtmlUtil; import com.webchat.common.util.JsonUtil; import com.webchat.domain.vo.request.mess.ChatMessageRequestVO; import com.webchat.domain.vo.response.UserBaseResponseInfoVO; import com.webchat.domain.vo.response.mess.ChatMessageResponseVO; import com.webchat.ugc.repository.dao.IChatMessageDAO; import com.webchat.ugc.repository.entity.ChatMessageEntity; import com.webchat.ugc.service.AccountService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Sort; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @Slf4j @Service public class PersistentMessageService { @Autowired private IChatMessageDAO chatMessageDAO; @Autowired private RedisService redisService; @Autowired private AccountService accountService; @Autowired private MessageQueueProducer messageQueueProducer; /** * 《离线场景》持久化消息队列,保存离线消息,同时会将数据同步到ES用于后续的RAG问答和消息搜索 * * @param messVo * @return */ public boolean persistent(ChatMessageRequestVO messVo) { ChatMessageEntity mess = convert(messVo); // 取消息接收人信息,判断是否群聊场景 String receiverId = messVo.getReceiverId(); UserBaseResponseInfoVO receiver = accountService.accountInfo(receiverId); boolean groupMessage = RoleCodeEnum.GROUP.getCode().equals(receiver.getRoleCode()); List receivers = new ArrayList<>(); if (groupMessage) { // 角色翻转,已群组作为发送人 mess.setSender(receiverId); // 设置代理消息发送用户 mess.setProxySender(messVo.getSenderId()); // 查询实际接收人,群组下所有用户 Set groupUserIds = accountService.getGroupUserIds(receiverId); receivers.addAll(groupUserIds); } else { receivers.add(receiverId); } receivers.remove(messVo.getSenderId()); for (String receiverUserId : receivers) { mess.setReceiver(receiverUserId); // 这里有优化空间,可以改为批量一次入库(先简单实现功能) mess = chatMessageDAO.save(mess); // 加入聊天缓存 this.addUserMessCache(mess); // 加入聊天列表 if (groupMessage) { this.addOrRefreshMessListCache(mess.getProxySender(), messVo.getReceiverId()); this.addOrRefreshMessListCache(receiverUserId, messVo.getReceiverId()); // 群聊未读消息+1 addUnreadMessCountCache(receiverUserId, mess.getSender()); } else { this.addOrRefreshMessListCache(messVo.getSenderId(), receiverUserId); this.addOrRefreshMessListCache(receiverUserId, messVo.getSenderId()); // 未读消息+1 addUnreadMessCountCache(receiverUserId, messVo.getSenderId()); } } /** * 刷新用户对话列表 * */ this.refreshChattingCache(messVo); return true; } private void refreshChattingCache(ChatMessageRequestVO chatMessage) { // 刷新聊天对话列表 Long msgTime = System.currentTimeMillis(); ChatMessageRequestVO chattingRequest = new ChatMessageRequestVO(); chattingRequest.setSenderId(chatMessage.getSenderId()); chattingRequest.setReceiverId(chatMessage.getReceiverId()); chattingRequest.setTime(msgTime); messageQueueProducer.send(MessageQueueEnum.QUEUE_CHATTING_LIST_REFRESH, chattingRequest); UserBaseResponseInfoVO accountInfo = accountService.accountInfo(chatMessage.getReceiverId()); if (RoleCodeEnum.isUserRole(accountInfo.getRoleCode())) { // 如果消息接受账号是用户类型,也需要刷新用户账号列表 chattingRequest.setSenderId(chatMessage.getReceiverId()); chattingRequest.setReceiverId(chatMessage.getSenderId()); messageQueueProducer.send(MessageQueueEnum.QUEUE_CHATTING_LIST_REFRESH, chattingRequest); } else if (RoleCodeEnum.GROUP.getCode().equals(accountInfo.getRoleCode())) { // 接受人是群组,需要刷新群组下所有用户对话列表 Set groupUsers = accountService.getGroupUserIds(chatMessage.getReceiverId()); if (CollectionUtils.isNotEmpty(groupUsers)) { for (String groupUserId : groupUsers) { if (groupUserId.equals(chatMessage.getSenderId())) { continue; } chattingRequest.setSenderId(groupUserId); chattingRequest.setReceiverId(chatMessage.getReceiverId()); messageQueueProducer.send(MessageQueueEnum.QUEUE_CHATTING_LIST_REFRESH, chattingRequest); } } } } /** * 刷新未读消息数缓存,用于新消息红点🔴通知 * * @param currUserId * @param chatUserId * @return */ private Long addUnreadMessCountCache(String currUserId, String chatUserId) { String unreadUserCacheCountKey = RedisKeyEnum.UN_READ_MESS_USER_SET_CACHE.getKey(currUserId); redisService.sadd(unreadUserCacheCountKey, chatUserId); String unreadCacheCountKey = RedisKeyEnum.UN_READ_MESS_COUNT_CACHE.getKey(currUserId, chatUserId); return redisService.increx(unreadCacheCountKey, RedisKeyEnum.UN_READ_MESS_COUNT_CACHE.getExpireTime()); } private void clearUnreadMessCountCache(String currUserId, String chatUserId) { String unreadCacheCountKey = RedisKeyEnum.UN_READ_MESS_COUNT_CACHE.getKey(currUserId, chatUserId); redisService.set(unreadCacheCountKey, "0", RedisKeyEnum.UN_READ_MESS_COUNT_CACHE.getExpireTime()); String unreadUserCacheCountKey = RedisKeyEnum.UN_READ_MESS_USER_SET_CACHE.getKey(currUserId); redisService.sremove(unreadUserCacheCountKey, chatUserId); } public Long getUnreadMessUserCountFromCache(String currUserId) { String unreadUserCacheCountKey = RedisKeyEnum.UN_READ_MESS_USER_SET_CACHE.getKey(currUserId); return redisService.ssize(unreadUserCacheCountKey); } public Set getUnreadMessUserSetFromCache(String currUserId) { String unreadUserCacheCountKey = RedisKeyEnum.UN_READ_MESS_USER_SET_CACHE.getKey(currUserId); return redisService.smembers(unreadUserCacheCountKey); } /*** * 查询两个人的聊天记录 * @param currUserId * @param chatUserId * @param lastId * @param size * @return */ public List getChatMessListFromCache(String currUserId, String chatUserId, Long lastId, Long fixedMessageId, int size) { // 查询后清理未读消息数 clearUnreadMessCountCache(currUserId, chatUserId); lastId = lastId == null ? Long.MAX_VALUE : lastId; String cacheKey = getUserMessRedisKey(currUserId, chatUserId); Set cacheSet = redisService.zreverseRangeByScore(cacheKey, lastId, 0, size); if (CollectionUtils.isEmpty(cacheSet)) { return Collections.emptyList(); } if (fixedMessageId != null) { cacheSet.remove(fixedMessageId.toString()); } List chatMessageResponseVOList = cacheSet.stream().map(cache -> { ChatMessageResponseVO messageResponse = getChatMessDetailFromCache(Long.valueOf(cache)); messageResponse.setReceiver(accountService.accountInfo(messageResponse.getReceiverId())); messageResponse.setSender(accountService.accountInfo(messageResponse.getSenderId())); if (StringUtils.isNotBlank(messageResponse.getProxySenderId())) { messageResponse.setProxySender(accountService.accountInfo(messageResponse.getProxySenderId())); } return messageResponse; }).sorted(Comparator.comparing(ChatMessageResponseVO::getMessId)).collect(Collectors.toList()); if (fixedMessageId != null) { // 处理消息定位,简单处理,默认插入到消息尾部 ChatMessageResponseVO fixedMessage = getChatMessDetailFromCache(fixedMessageId); fixedMessage.setReceiver(accountService.accountInfo(fixedMessage.getReceiverId())); fixedMessage.setSender(accountService.accountInfo(fixedMessage.getSenderId())); if (StringUtils.isNotBlank(fixedMessage.getProxySenderId())) { fixedMessage.setProxySender(accountService.accountInfo(fixedMessage.getProxySenderId())); } chatMessageResponseVOList.add(fixedMessage); } return chatMessageResponseVOList; } public Map batchGetUserLastMess(String currUserId, Set userIds) { Map map = new HashMap<>(); for (String userId : userIds) { String cacheKey = getUserMessRedisKey(currUserId, userId); Set cacheSet = redisService.zreverseRangeByScore(cacheKey, Long.MAX_VALUE, 0, 1); if (CollectionUtils.isNotEmpty(cacheSet)) { Long lastMessId = Long.valueOf(new ArrayList<>(cacheSet).get(0)); ChatMessageResponseVO chatMessageResponseVO = getChatMessDetailFromCache(lastMessId); if (chatMessageResponseVO != null) { map.put(userId, chatMessageResponseVO); } } } return map; } private void addOrRefreshMessListCache(String currUserId, String chatUserId) { String messUserKey = RedisKeyEnum.MESS_USER_LIST_KEY.getKey(currUserId); redisService.zadd(messUserKey, chatUserId, DateUtils.getCurrentTimeMillis()); } private void addUserMessCache(ChatMessageEntity mess) { long messId = mess.getId(); /*** * 刷新消息详情缓存 */ refreshMessCache(mess); /*** * 加入用户消息列表 */ addUserMessCache( mess.getProxySender() != null ? mess.getProxySender() : mess.getSender(), mess.getProxySender() != null ? mess.getSender() : mess.getReceiver(), messId, messId); addUserMessCache(mess.getReceiver(), mess.getSender(), messId, messId); } private void addUserMessCache(String sender, String receiver, Long messId, Long score) { String cacheKey = getUserMessRedisKey(sender, receiver); redisService.zadd(cacheKey, messId.toString(), score); } /*** * 刷新消息缓存 * @param mess */ private void refreshMessCache(ChatMessageEntity mess) { String messKey = RedisKeyEnum.MESS_DETAIL_CACHE_KEY.getKey(); redisService.hset(messKey, String.valueOf(mess.getId()), JsonUtil.toJsonString(mess), RedisKeyEnum.MESS_DETAIL_CACHE_KEY.getExpireTime()); } private ChatMessageResponseVO getChatMessDetailFromCache(Long messId) { String messKey = RedisKeyEnum.MESS_DETAIL_CACHE_KEY.getKey(); String messCache = redisService.hget(messKey, String.valueOf(messId)); if (StringUtils.isBlank(messCache)) { return null; } ChatMessageEntity userMessEntity = JsonUtil.fromJson(messCache, ChatMessageEntity.class); ChatMessageResponseVO messageResponse = new ChatMessageResponseVO(); messageResponse.setMessId(userMessEntity.getId()); messageResponse.setMessage(userMessEntity.getMessage()); messageResponse.setSenderId(userMessEntity.getSender()); messageResponse.setProxySenderId(userMessEntity.getProxySender()); messageResponse.setReceiverId(userMessEntity.getReceiver()); messageResponse.setTime(userMessEntity.getSendDate().getTime()); messageResponse.setIsRead(userMessEntity.getIsRead()); messageResponse.setType(userMessEntity.getType()); if (ChatMessageTypeEnum.RED_PACKET.getType().equals(userMessEntity.getType())) { // messageResponse.setRedPacketDetail(redPacketService.getRedPacketDetailCache(Long.valueOf(userMessEntity.getMessage()))); } else if (ChatMessageTypeEnum.PUBLIC_ACCOUNT_ARTICLE.getType().equals(userMessEntity.getType())) { // messageResponse.setPublicAccountArticle(articleService.getPublicAccountArticleMessage(Long.valueOf(userMessEntity.getMessage()))); } messageResponse.setGroupMessage(StringUtils.isNotBlank(userMessEntity.getProxySender())); return messageResponse; } public String getUserMessRedisKey(String sender, String receiver) { return RedisKeyEnum.USER_CHAT_MESS_CACHE_KEY.getKey(sender, receiver); } public APIPageResponseBean> pageMessage(String mess, int pageNo, int pageSize) { Pageable pageable = PageRequest.of(pageNo - 1, pageSize, Sort.by(Sort.Order.desc("id"))); Page chatMessEntityPage; if (StringUtils.isBlank(mess)) { chatMessEntityPage = chatMessageDAO.findAll(pageable); } else { chatMessEntityPage = chatMessageDAO.findAllByMessageLike("%"+mess+"%", pageable); } List chatMessageResponseVOList = convertChatMessageResponseList(chatMessEntityPage.getContent()); return APIPageResponseBean.success(pageNo, pageSize, chatMessEntityPage.getTotalElements(), chatMessageResponseVOList); } private List convertChatMessageResponseList(List chatMessEntities) { if (CollectionUtils.isEmpty(chatMessEntities)) { return Collections.emptyList(); } /** * 批量查询用户信息 */ Set senderUserIds = chatMessEntities.stream().map(ChatMessageEntity::getSender).collect(Collectors.toSet()); Set proxySenderUserIds = chatMessEntities.stream().map(ChatMessageEntity::getProxySender).filter(Objects::nonNull).collect(Collectors.toSet()); Set receiverUserIds = chatMessEntities.stream().map(ChatMessageEntity::getReceiver).collect(Collectors.toSet()); senderUserIds.addAll(receiverUserIds); if (CollectionUtils.isNotEmpty(proxySenderUserIds)) { senderUserIds.addAll(proxySenderUserIds); } Map userMap = accountService.batchGet(senderUserIds); return chatMessEntities.stream().map(chat -> { ChatMessageResponseVO chatMessageResponse = new ChatMessageResponseVO(); chatMessageResponse.setTime(chat.getSendDate().getTime()); chatMessageResponse.setSender(userMap.get(chat.getSender())); if (StringUtils.isNotBlank(chat.getProxySender())) { chatMessageResponse.setProxySender(userMap.get(chat.getProxySender())); } chatMessageResponse.setReceiver(userMap.get(chat.getReceiver())); chatMessageResponse.setMessage(chat.getMessage()); return chatMessageResponse; }).collect(Collectors.toList()); } private ChatMessageEntity convert(ChatMessageRequestVO messVo) { ChatMessageEntity mess = new ChatMessageEntity(); mess.setSender(messVo.getSenderId()); mess.setReceiver(messVo.getReceiverId()); mess.setMessage(this.handleSpecialHtmlTag(HtmlUtil.xssEscape(messVo.getMessage()))); mess.setSendDate(new Date()); mess.setIsRead(false); mess.setType(messVo.getType()); return mess; } /*** * 处理特殊字符 * @param content * @return */ private String handleSpecialHtmlTag(String content) { if (StringUtils.isBlank(content)) { return content; } content = content.replaceAll("<br>", "
"); content = content.replaceAll("<b>", ""); return content; } }