123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366 |
- package com.webchat.ugc.messaegqueue.service;
- import com.webchat.common.bean.APIPageResponseBean;
- import com.webchat.common.enums.ChatMessageTypeEnum;
- import com.webchat.common.enums.RedisKeyEnum;
- import com.webchat.common.enums.RoleCodeEnum;
- import com.webchat.common.enums.messagequeue.MessageQueueEnum;
- import com.webchat.common.service.RedisService;
- import com.webchat.common.service.messagequeue.producer.MessageQueueProducer;
- import com.webchat.common.util.DateUtils;
- import com.webchat.common.util.HtmlUtil;
- import com.webchat.common.util.JsonUtil;
- import com.webchat.domain.vo.request.mess.ChatMessageRequestVO;
- import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
- import com.webchat.domain.vo.response.mess.ChatMessageResponseVO;
- import com.webchat.ugc.repository.dao.IChatMessageDAO;
- import com.webchat.ugc.repository.entity.ChatMessageEntity;
- import com.webchat.ugc.service.AccountService;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.collections.CollectionUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.domain.Page;
- import org.springframework.data.domain.PageRequest;
- import org.springframework.data.domain.Pageable;
- import org.springframework.data.domain.Sort;
- import org.springframework.stereotype.Service;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.Comparator;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
- import java.util.Set;
- import java.util.stream.Collectors;
- @Slf4j
- @Service
- public class PersistentMessageService {
// Persistence access for chat messages (save and paged queries).
- @Autowired
- private IChatMessageDAO chatMessageDAO;
// Redis access used for every cache maintained by this service (sorted sets, sets, hashes, counters).
- @Autowired
- private RedisService redisService;
// Account lookups: single account info, batch account info and group member ids.
- @Autowired
- private AccountService accountService;
// Producer used to publish conversation-list refresh events.
- @Autowired
- private MessageQueueProducer<Object, Long> messageQueueProducer;
- /**
- * 《离线场景》持久化消息队列,保存离线消息,同时会将数据同步到ES用于后续的RAG问答和消息搜索
- *
- * @param messVo
- * @return
- */
- public boolean persistent(ChatMessageRequestVO messVo) {
- ChatMessageEntity mess = convert(messVo);
- // 取消息接收人信息,判断是否群聊场景
- String receiverId = messVo.getReceiverId();
- UserBaseResponseInfoVO receiver = accountService.accountInfo(receiverId);
- boolean groupMessage = RoleCodeEnum.GROUP.getCode().equals(receiver.getRoleCode());
- List<String> receivers = new ArrayList<>();
- if (groupMessage) {
- // 角色翻转,已群组作为发送人
- mess.setSender(receiverId);
- // 设置代理消息发送用户
- mess.setProxySender(messVo.getSenderId());
- // 查询实际接收人,群组下所有用户
- Set<String> groupUserIds = accountService.getGroupUserIds(receiverId);
- receivers.addAll(groupUserIds);
- } else {
- receivers.add(receiverId);
- }
- receivers.remove(messVo.getSenderId());
- for (String receiverUserId : receivers) {
- mess.setReceiver(receiverUserId);
- // 这里有优化空间,可以改为批量一次入库(先简单实现功能)
- mess = chatMessageDAO.save(mess);
- // 加入聊天缓存
- this.addUserMessCache(mess);
- // 加入聊天列表
- if (groupMessage) {
- this.addOrRefreshMessListCache(mess.getProxySender(), messVo.getReceiverId());
- this.addOrRefreshMessListCache(receiverUserId, messVo.getReceiverId());
- // 群聊未读消息+1
- addUnreadMessCountCache(receiverUserId, mess.getSender());
- } else {
- this.addOrRefreshMessListCache(messVo.getSenderId(), receiverUserId);
- this.addOrRefreshMessListCache(receiverUserId, messVo.getSenderId());
- // 未读消息+1
- addUnreadMessCountCache(receiverUserId, messVo.getSenderId());
- }
- }
- /**
- * 刷新用户对话列表
- *
- */
- this.refreshChattingCache(messVo);
- return true;
- }
- private void refreshChattingCache(ChatMessageRequestVO chatMessage) {
- // 刷新聊天对话列表
- Long msgTime = System.currentTimeMillis();
- ChatMessageRequestVO chattingRequest = new ChatMessageRequestVO();
- chattingRequest.setSenderId(chatMessage.getSenderId());
- chattingRequest.setReceiverId(chatMessage.getReceiverId());
- chattingRequest.setTime(msgTime);
- messageQueueProducer.send(MessageQueueEnum.QUEUE_CHATTING_LIST_REFRESH, chattingRequest);
- UserBaseResponseInfoVO accountInfo = accountService.accountInfo(chatMessage.getReceiverId());
- if (RoleCodeEnum.isUserRole(accountInfo.getRoleCode())) {
- // 如果消息接受账号是用户类型,也需要刷新用户账号列表
- chattingRequest.setSenderId(chatMessage.getReceiverId());
- chattingRequest.setReceiverId(chatMessage.getSenderId());
- messageQueueProducer.send(MessageQueueEnum.QUEUE_CHATTING_LIST_REFRESH, chattingRequest);
- } else if (RoleCodeEnum.GROUP.getCode().equals(accountInfo.getRoleCode())) {
- // 接受人是群组,需要刷新群组下所有用户对话列表
- Set<String> groupUsers = accountService.getGroupUserIds(chatMessage.getReceiverId());
- if (CollectionUtils.isNotEmpty(groupUsers)) {
- for (String groupUserId : groupUsers) {
- if (groupUserId.equals(chatMessage.getSenderId())) {
- continue;
- }
- chattingRequest.setSenderId(groupUserId);
- chattingRequest.setReceiverId(chatMessage.getReceiverId());
- messageQueueProducer.send(MessageQueueEnum.QUEUE_CHATTING_LIST_REFRESH, chattingRequest);
- }
- }
- }
- }
- /**
- * 刷新未读消息数缓存,用于新消息红点🔴通知
- *
- * @param currUserId
- * @param chatUserId
- * @return
- */
- private Long addUnreadMessCountCache(String currUserId, String chatUserId) {
- String unreadUserCacheCountKey = RedisKeyEnum.UN_READ_MESS_USER_SET_CACHE.getKey(currUserId);
- redisService.sadd(unreadUserCacheCountKey, chatUserId);
- String unreadCacheCountKey = RedisKeyEnum.UN_READ_MESS_COUNT_CACHE.getKey(currUserId, chatUserId);
- return redisService.increx(unreadCacheCountKey, RedisKeyEnum.UN_READ_MESS_COUNT_CACHE.getExpireTime());
- }
- private void clearUnreadMessCountCache(String currUserId, String chatUserId) {
- String unreadCacheCountKey = RedisKeyEnum.UN_READ_MESS_COUNT_CACHE.getKey(currUserId, chatUserId);
- redisService.set(unreadCacheCountKey, "0", RedisKeyEnum.UN_READ_MESS_COUNT_CACHE.getExpireTime());
- String unreadUserCacheCountKey = RedisKeyEnum.UN_READ_MESS_USER_SET_CACHE.getKey(currUserId);
- redisService.sremove(unreadUserCacheCountKey, chatUserId);
- }
- public Long getUnreadMessUserCountFromCache(String currUserId) {
- String unreadUserCacheCountKey = RedisKeyEnum.UN_READ_MESS_USER_SET_CACHE.getKey(currUserId);
- return redisService.ssize(unreadUserCacheCountKey);
- }
- public Set<String> getUnreadMessUserSetFromCache(String currUserId) {
- String unreadUserCacheCountKey = RedisKeyEnum.UN_READ_MESS_USER_SET_CACHE.getKey(currUserId);
- return redisService.smembers(unreadUserCacheCountKey);
- }
- /***
- * 查询两个人的聊天记录
- * @param currUserId
- * @param chatUserId
- * @param lastId
- * @param size
- * @return
- */
- public List<ChatMessageResponseVO> getChatMessListFromCache(String currUserId, String chatUserId, Long lastId,
- Long fixedMessageId, int size) {
- // 查询后清理未读消息数
- clearUnreadMessCountCache(currUserId, chatUserId);
- lastId = lastId == null ? Long.MAX_VALUE : lastId;
- String cacheKey = getUserMessRedisKey(currUserId, chatUserId);
- Set<String> cacheSet = redisService.zreverseRangeByScore(cacheKey, lastId, 0, size);
- if (CollectionUtils.isEmpty(cacheSet)) {
- return Collections.emptyList();
- }
- if (fixedMessageId != null) {
- cacheSet.remove(fixedMessageId.toString());
- }
- List<ChatMessageResponseVO> chatMessageResponseVOList = cacheSet.stream().map(cache -> {
- ChatMessageResponseVO messageResponse = getChatMessDetailFromCache(Long.valueOf(cache));
- messageResponse.setReceiver(accountService.accountInfo(messageResponse.getReceiverId()));
- messageResponse.setSender(accountService.accountInfo(messageResponse.getSenderId()));
- if (StringUtils.isNotBlank(messageResponse.getProxySenderId())) {
- messageResponse.setProxySender(accountService.accountInfo(messageResponse.getProxySenderId()));
- }
- return messageResponse;
- }).sorted(Comparator.comparing(ChatMessageResponseVO::getMessId)).collect(Collectors.toList());
- if (fixedMessageId != null) {
- // 处理消息定位,简单处理,默认插入到消息尾部
- ChatMessageResponseVO fixedMessage = getChatMessDetailFromCache(fixedMessageId);
- fixedMessage.setReceiver(accountService.accountInfo(fixedMessage.getReceiverId()));
- fixedMessage.setSender(accountService.accountInfo(fixedMessage.getSenderId()));
- if (StringUtils.isNotBlank(fixedMessage.getProxySenderId())) {
- fixedMessage.setProxySender(accountService.accountInfo(fixedMessage.getProxySenderId()));
- }
- chatMessageResponseVOList.add(fixedMessage);
- }
- return chatMessageResponseVOList;
- }
- public Map<String, ChatMessageResponseVO> batchGetUserLastMess(String currUserId, Set<String> userIds) {
- Map<String, ChatMessageResponseVO> map = new HashMap<>();
- for (String userId : userIds) {
- String cacheKey = getUserMessRedisKey(currUserId, userId);
- Set<String> cacheSet =
- redisService.zreverseRangeByScore(cacheKey, Long.MAX_VALUE, 0, 1);
- if (CollectionUtils.isNotEmpty(cacheSet)) {
- Long lastMessId = Long.valueOf(new ArrayList<>(cacheSet).get(0));
- ChatMessageResponseVO chatMessageResponseVO = getChatMessDetailFromCache(lastMessId);
- if (chatMessageResponseVO != null) {
- map.put(userId, chatMessageResponseVO);
- }
- }
- }
- return map;
- }
- private void addOrRefreshMessListCache(String currUserId, String chatUserId) {
- String messUserKey = RedisKeyEnum.MESS_USER_LIST_KEY.getKey(currUserId);
- redisService.zadd(messUserKey, chatUserId, DateUtils.getCurrentTimeMillis());
- }
- private void addUserMessCache(ChatMessageEntity mess) {
- long messId = mess.getId();
- /***
- * 刷新消息详情缓存
- */
- refreshMessCache(mess);
- /***
- * 加入用户消息列表
- */
- addUserMessCache(
- mess.getProxySender() != null ? mess.getProxySender() : mess.getSender(),
- mess.getProxySender() != null ? mess.getSender() : mess.getReceiver(),
- messId, messId);
- addUserMessCache(mess.getReceiver(), mess.getSender(), messId, messId);
- }
- private void addUserMessCache(String sender, String receiver, Long messId, Long score) {
- String cacheKey = getUserMessRedisKey(sender, receiver);
- redisService.zadd(cacheKey, messId.toString(), score);
- }
- /***
- * 刷新消息缓存
- * @param mess
- */
- private void refreshMessCache(ChatMessageEntity mess) {
- String messKey = RedisKeyEnum.MESS_DETAIL_CACHE_KEY.getKey();
- redisService.hset(messKey, String.valueOf(mess.getId()), JsonUtil.toJsonString(mess),
- RedisKeyEnum.MESS_DETAIL_CACHE_KEY.getExpireTime());
- }
- private ChatMessageResponseVO getChatMessDetailFromCache(Long messId) {
- String messKey = RedisKeyEnum.MESS_DETAIL_CACHE_KEY.getKey();
- String messCache = redisService.hget(messKey, String.valueOf(messId));
- if (StringUtils.isBlank(messCache)) {
- return null;
- }
- ChatMessageEntity userMessEntity = JsonUtil.fromJson(messCache, ChatMessageEntity.class);
- ChatMessageResponseVO messageResponse = new ChatMessageResponseVO();
- messageResponse.setMessId(userMessEntity.getId());
- messageResponse.setMessage(userMessEntity.getMessage());
- messageResponse.setSenderId(userMessEntity.getSender());
- messageResponse.setProxySenderId(userMessEntity.getProxySender());
- messageResponse.setReceiverId(userMessEntity.getReceiver());
- messageResponse.setTime(userMessEntity.getSendDate().getTime());
- messageResponse.setIsRead(userMessEntity.getIsRead());
- messageResponse.setType(userMessEntity.getType());
- if (ChatMessageTypeEnum.RED_PACKET.getType().equals(userMessEntity.getType())) {
- // messageResponse.setRedPacketDetail(redPacketService.getRedPacketDetailCache(Long.valueOf(userMessEntity.getMessage())));
- } else if (ChatMessageTypeEnum.PUBLIC_ACCOUNT_ARTICLE.getType().equals(userMessEntity.getType())) {
- // messageResponse.setPublicAccountArticle(articleService.getPublicAccountArticleMessage(Long.valueOf(userMessEntity.getMessage())));
- }
- messageResponse.setGroupMessage(StringUtils.isNotBlank(userMessEntity.getProxySender()));
- return messageResponse;
- }
- public String getUserMessRedisKey(String sender, String receiver) {
- return RedisKeyEnum.USER_CHAT_MESS_CACHE_KEY.getKey(sender, receiver);
- }
- public APIPageResponseBean<List<ChatMessageResponseVO>> pageMessage(String mess, int pageNo, int pageSize) {
- Pageable pageable = PageRequest.of(pageNo - 1, pageSize, Sort.by(Sort.Order.desc("id")));
- Page<ChatMessageEntity> chatMessEntityPage;
- if (StringUtils.isBlank(mess)) {
- chatMessEntityPage = chatMessageDAO.findAll(pageable);
- } else {
- chatMessEntityPage = chatMessageDAO.findAllByMessageLike("%"+mess+"%", pageable);
- }
- List<ChatMessageResponseVO> chatMessageResponseVOList = convertChatMessageResponseList(chatMessEntityPage.getContent());
- return APIPageResponseBean.success(pageNo, pageSize, chatMessEntityPage.getTotalElements(), chatMessageResponseVOList);
- }
- private List<ChatMessageResponseVO> convertChatMessageResponseList(List<ChatMessageEntity> chatMessEntities) {
- if (CollectionUtils.isEmpty(chatMessEntities)) {
- return Collections.emptyList();
- }
- /**
- * 批量查询用户信息
- */
- Set<String> senderUserIds = chatMessEntities.stream().map(ChatMessageEntity::getSender).collect(Collectors.toSet());
- Set<String> proxySenderUserIds = chatMessEntities.stream().map(ChatMessageEntity::getProxySender).filter(Objects::nonNull).collect(Collectors.toSet());
- Set<String> receiverUserIds = chatMessEntities.stream().map(ChatMessageEntity::getReceiver).collect(Collectors.toSet());
- senderUserIds.addAll(receiverUserIds);
- if (CollectionUtils.isNotEmpty(proxySenderUserIds)) {
- senderUserIds.addAll(proxySenderUserIds);
- }
- Map<String, UserBaseResponseInfoVO> userMap = accountService.batchGet(senderUserIds);
- return chatMessEntities.stream().map(chat -> {
- ChatMessageResponseVO chatMessageResponse = new ChatMessageResponseVO();
- chatMessageResponse.setTime(chat.getSendDate().getTime());
- chatMessageResponse.setSender(userMap.get(chat.getSender()));
- if (StringUtils.isNotBlank(chat.getProxySender())) {
- chatMessageResponse.setProxySender(userMap.get(chat.getProxySender()));
- }
- chatMessageResponse.setReceiver(userMap.get(chat.getReceiver()));
- chatMessageResponse.setMessage(chat.getMessage());
- return chatMessageResponse;
- }).collect(Collectors.toList());
- }
- private ChatMessageEntity convert(ChatMessageRequestVO messVo) {
- ChatMessageEntity mess = new ChatMessageEntity();
- mess.setSender(messVo.getSenderId());
- mess.setReceiver(messVo.getReceiverId());
- mess.setMessage(this.handleSpecialHtmlTag(HtmlUtil.xssEscape(messVo.getMessage())));
- mess.setSendDate(new Date());
- mess.setIsRead(false);
- mess.setType(messVo.getType());
- return mess;
- }
- /***
- * 处理特殊字符
- * @param content
- * @return
- */
- private String handleSpecialHtmlTag(String content) {
- if (StringUtils.isBlank(content)) {
- return content;
- }
- content = content.replaceAll("<br>", "<br>");
- content = content.replaceAll("<b>", "<b>");
- return content;
- }
- }
|