|
@@ -0,0 +1,366 @@
|
|
|
+package com.webchat.ugc.messaegqueue.service;
|
|
|
+
|
|
|
+
|
|
|
+import com.webchat.common.bean.APIPageResponseBean;
|
|
|
+import com.webchat.common.enums.ChatMessageTypeEnum;
|
|
|
+import com.webchat.common.enums.RedisKeyEnum;
|
|
|
+import com.webchat.common.enums.RoleCodeEnum;
|
|
|
+import com.webchat.common.enums.messagequeue.MessageQueueEnum;
|
|
|
+import com.webchat.common.service.RedisService;
|
|
|
+import com.webchat.common.service.messagequeue.producer.MessageQueueProducer;
|
|
|
+import com.webchat.common.util.DateUtils;
|
|
|
+import com.webchat.common.util.HtmlUtil;
|
|
|
+import com.webchat.common.util.JsonUtil;
|
|
|
+import com.webchat.domain.vo.request.mess.ChatMessageRequestVO;
|
|
|
+import com.webchat.domain.vo.response.UserBaseResponseInfoVO;
|
|
|
+import com.webchat.domain.vo.response.mess.ChatMessageResponseVO;
|
|
|
+import com.webchat.ugc.repository.dao.IChatMessageDAO;
|
|
|
+import com.webchat.ugc.repository.entity.ChatMessageEntity;
|
|
|
+import com.webchat.ugc.service.AccountService;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.collections.CollectionUtils;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.data.domain.Page;
|
|
|
+import org.springframework.data.domain.PageRequest;
|
|
|
+import org.springframework.data.domain.Pageable;
|
|
|
+import org.springframework.data.domain.Sort;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.Comparator;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+public class PersistentMessageService {
|
|
|
+
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IChatMessageDAO chatMessageDAO;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private RedisService redisService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private AccountService accountService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private MessageQueueProducer<Object, Long> messageQueueProducer;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 《离线场景》持久化消息队列,保存离线消息,同时会将数据同步到ES用于后续的RAG问答和消息搜索
|
|
|
+ *
|
|
|
+ * @param messVo
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public boolean persistent(ChatMessageRequestVO messVo) {
|
|
|
+
|
|
|
+ ChatMessageEntity mess = convert(messVo);
|
|
|
+ // 取消息接收人信息,判断是否群聊场景
|
|
|
+ String receiverId = messVo.getReceiverId();
|
|
|
+ UserBaseResponseInfoVO receiver = accountService.accountInfo(receiverId);
|
|
|
+ boolean groupMessage = RoleCodeEnum.GROUP.getCode().equals(receiver.getRoleCode());
|
|
|
+ List<String> receivers = new ArrayList<>();
|
|
|
+ if (groupMessage) {
|
|
|
+ // 角色翻转,已群组作为发送人
|
|
|
+ mess.setSender(receiverId);
|
|
|
+ // 设置代理消息发送用户
|
|
|
+ mess.setProxySender(messVo.getSenderId());
|
|
|
+ // 查询实际接收人,群组下所有用户
|
|
|
+ Set<String> groupUserIds = accountService.getGroupUserIds(receiverId);
|
|
|
+ receivers.addAll(groupUserIds);
|
|
|
+ } else {
|
|
|
+ receivers.add(receiverId);
|
|
|
+ }
|
|
|
+ receivers.remove(messVo.getSenderId());
|
|
|
+ for (String receiverUserId : receivers) {
|
|
|
+ mess.setReceiver(receiverUserId);
|
|
|
+ // 这里有优化空间,可以改为批量一次入库(先简单实现功能)
|
|
|
+ mess = chatMessageDAO.save(mess);
|
|
|
+ // 加入聊天缓存
|
|
|
+ this.addUserMessCache(mess);
|
|
|
+ // 加入聊天列表
|
|
|
+ if (groupMessage) {
|
|
|
+ this.addOrRefreshMessListCache(mess.getProxySender(), messVo.getReceiverId());
|
|
|
+ this.addOrRefreshMessListCache(receiverUserId, messVo.getReceiverId());
|
|
|
+ // 群聊未读消息+1
|
|
|
+ addUnreadMessCountCache(receiverUserId, mess.getSender());
|
|
|
+ } else {
|
|
|
+ this.addOrRefreshMessListCache(messVo.getSenderId(), receiverUserId);
|
|
|
+ this.addOrRefreshMessListCache(receiverUserId, messVo.getSenderId());
|
|
|
+ // 未读消息+1
|
|
|
+ addUnreadMessCountCache(receiverUserId, messVo.getSenderId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 刷新用户对话列表
|
|
|
+ *
|
|
|
+ */
|
|
|
+ this.refreshChattingCache(messVo);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private void refreshChattingCache(ChatMessageRequestVO chatMessage) {
|
|
|
+ // 刷新聊天对话列表
|
|
|
+ Long msgTime = System.currentTimeMillis();
|
|
|
+ ChatMessageRequestVO chattingRequest = new ChatMessageRequestVO();
|
|
|
+ chattingRequest.setSenderId(chatMessage.getSenderId());
|
|
|
+ chattingRequest.setReceiverId(chatMessage.getReceiverId());
|
|
|
+ chattingRequest.setTime(msgTime);
|
|
|
+ messageQueueProducer.send(MessageQueueEnum.QUEUE_CHATTING_LIST_REFRESH, chattingRequest);
|
|
|
+ UserBaseResponseInfoVO accountInfo = accountService.accountInfo(chatMessage.getReceiverId());
|
|
|
+ if (RoleCodeEnum.isUserRole(accountInfo.getRoleCode())) {
|
|
|
+ // 如果消息接受账号是用户类型,也需要刷新用户账号列表
|
|
|
+ chattingRequest.setSenderId(chatMessage.getReceiverId());
|
|
|
+ chattingRequest.setReceiverId(chatMessage.getSenderId());
|
|
|
+ messageQueueProducer.send(MessageQueueEnum.QUEUE_CHATTING_LIST_REFRESH, chattingRequest);
|
|
|
+ } else if (RoleCodeEnum.GROUP.getCode().equals(accountInfo.getRoleCode())) {
|
|
|
+ // 接受人是群组,需要刷新群组下所有用户对话列表
|
|
|
+ Set<String> groupUsers = accountService.getGroupUserIds(chatMessage.getReceiverId());
|
|
|
+ if (CollectionUtils.isNotEmpty(groupUsers)) {
|
|
|
+ for (String groupUserId : groupUsers) {
|
|
|
+ if (groupUserId.equals(chatMessage.getSenderId())) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ chattingRequest.setSenderId(groupUserId);
|
|
|
+ chattingRequest.setReceiverId(chatMessage.getReceiverId());
|
|
|
+ messageQueueProducer.send(MessageQueueEnum.QUEUE_CHATTING_LIST_REFRESH, chattingRequest);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 刷新未读消息数缓存,用于新消息红点🔴通知
|
|
|
+ *
|
|
|
+ * @param currUserId
|
|
|
+ * @param chatUserId
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private Long addUnreadMessCountCache(String currUserId, String chatUserId) {
|
|
|
+ String unreadUserCacheCountKey = RedisKeyEnum.UN_READ_MESS_USER_SET_CACHE.getKey(currUserId);
|
|
|
+ redisService.sadd(unreadUserCacheCountKey, chatUserId);
|
|
|
+ String unreadCacheCountKey = RedisKeyEnum.UN_READ_MESS_COUNT_CACHE.getKey(currUserId, chatUserId);
|
|
|
+ return redisService.increx(unreadCacheCountKey, RedisKeyEnum.UN_READ_MESS_COUNT_CACHE.getExpireTime());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void clearUnreadMessCountCache(String currUserId, String chatUserId) {
|
|
|
+ String unreadCacheCountKey = RedisKeyEnum.UN_READ_MESS_COUNT_CACHE.getKey(currUserId, chatUserId);
|
|
|
+ redisService.set(unreadCacheCountKey, "0", RedisKeyEnum.UN_READ_MESS_COUNT_CACHE.getExpireTime());
|
|
|
+ String unreadUserCacheCountKey = RedisKeyEnum.UN_READ_MESS_USER_SET_CACHE.getKey(currUserId);
|
|
|
+ redisService.sremove(unreadUserCacheCountKey, chatUserId);
|
|
|
+ }
|
|
|
+
|
|
|
+ public Long getUnreadMessUserCountFromCache(String currUserId) {
|
|
|
+ String unreadUserCacheCountKey = RedisKeyEnum.UN_READ_MESS_USER_SET_CACHE.getKey(currUserId);
|
|
|
+ return redisService.ssize(unreadUserCacheCountKey);
|
|
|
+ }
|
|
|
+
|
|
|
+ public Set<String> getUnreadMessUserSetFromCache(String currUserId) {
|
|
|
+ String unreadUserCacheCountKey = RedisKeyEnum.UN_READ_MESS_USER_SET_CACHE.getKey(currUserId);
|
|
|
+ return redisService.smembers(unreadUserCacheCountKey);
|
|
|
+ }
|
|
|
+
|
|
|
+ /***
|
|
|
+ * 查询两个人的聊天记录
|
|
|
+ * @param currUserId
|
|
|
+ * @param chatUserId
|
|
|
+ * @param lastId
|
|
|
+ * @param size
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public List<ChatMessageResponseVO> getChatMessListFromCache(String currUserId, String chatUserId, Long lastId,
|
|
|
+ Long fixedMessageId, int size) {
|
|
|
+
|
|
|
+ // 查询后清理未读消息数
|
|
|
+ clearUnreadMessCountCache(currUserId, chatUserId);
|
|
|
+
|
|
|
+ lastId = lastId == null ? Long.MAX_VALUE : lastId;
|
|
|
+ String cacheKey = getUserMessRedisKey(currUserId, chatUserId);
|
|
|
+ Set<String> cacheSet = redisService.zreverseRangeByScore(cacheKey, lastId, 0, size);
|
|
|
+ if (CollectionUtils.isEmpty(cacheSet)) {
|
|
|
+ return Collections.emptyList();
|
|
|
+ }
|
|
|
+ if (fixedMessageId != null) {
|
|
|
+ cacheSet.remove(fixedMessageId.toString());
|
|
|
+ }
|
|
|
+ List<ChatMessageResponseVO> chatMessageResponseVOList = cacheSet.stream().map(cache -> {
|
|
|
+ ChatMessageResponseVO messageResponse = getChatMessDetailFromCache(Long.valueOf(cache));
|
|
|
+ messageResponse.setReceiver(accountService.accountInfo(messageResponse.getReceiverId()));
|
|
|
+ messageResponse.setSender(accountService.accountInfo(messageResponse.getSenderId()));
|
|
|
+ if (StringUtils.isNotBlank(messageResponse.getProxySenderId())) {
|
|
|
+ messageResponse.setProxySender(accountService.accountInfo(messageResponse.getProxySenderId()));
|
|
|
+ }
|
|
|
+ return messageResponse;
|
|
|
+ }).sorted(Comparator.comparing(ChatMessageResponseVO::getMessId)).collect(Collectors.toList());
|
|
|
+
|
|
|
+ if (fixedMessageId != null) {
|
|
|
+ // 处理消息定位,简单处理,默认插入到消息尾部
|
|
|
+ ChatMessageResponseVO fixedMessage = getChatMessDetailFromCache(fixedMessageId);
|
|
|
+ fixedMessage.setReceiver(accountService.accountInfo(fixedMessage.getReceiverId()));
|
|
|
+ fixedMessage.setSender(accountService.accountInfo(fixedMessage.getSenderId()));
|
|
|
+ if (StringUtils.isNotBlank(fixedMessage.getProxySenderId())) {
|
|
|
+ fixedMessage.setProxySender(accountService.accountInfo(fixedMessage.getProxySenderId()));
|
|
|
+ }
|
|
|
+ chatMessageResponseVOList.add(fixedMessage);
|
|
|
+ }
|
|
|
+
|
|
|
+ return chatMessageResponseVOList;
|
|
|
+ }
|
|
|
+ public Map<String, ChatMessageResponseVO> batchGetUserLastMess(String currUserId, Set<String> userIds) {
|
|
|
+ Map<String, ChatMessageResponseVO> map = new HashMap<>();
|
|
|
+ for (String userId : userIds) {
|
|
|
+ String cacheKey = getUserMessRedisKey(currUserId, userId);
|
|
|
+ Set<String> cacheSet =
|
|
|
+ redisService.zreverseRangeByScore(cacheKey, Long.MAX_VALUE, 0, 1);
|
|
|
+ if (CollectionUtils.isNotEmpty(cacheSet)) {
|
|
|
+ Long lastMessId = Long.valueOf(new ArrayList<>(cacheSet).get(0));
|
|
|
+ ChatMessageResponseVO chatMessageResponseVO = getChatMessDetailFromCache(lastMessId);
|
|
|
+ if (chatMessageResponseVO != null) {
|
|
|
+ map.put(userId, chatMessageResponseVO);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return map;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void addOrRefreshMessListCache(String currUserId, String chatUserId) {
|
|
|
+ String messUserKey = RedisKeyEnum.MESS_USER_LIST_KEY.getKey(currUserId);
|
|
|
+ redisService.zadd(messUserKey, chatUserId, DateUtils.getCurrentTimeMillis());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void addUserMessCache(ChatMessageEntity mess) {
|
|
|
+ long messId = mess.getId();
|
|
|
+ /***
|
|
|
+ * 刷新消息详情缓存
|
|
|
+ */
|
|
|
+ refreshMessCache(mess);
|
|
|
+ /***
|
|
|
+ * 加入用户消息列表
|
|
|
+ */
|
|
|
+ addUserMessCache(
|
|
|
+ mess.getProxySender() != null ? mess.getProxySender() : mess.getSender(),
|
|
|
+ mess.getProxySender() != null ? mess.getSender() : mess.getReceiver(),
|
|
|
+ messId, messId);
|
|
|
+ addUserMessCache(mess.getReceiver(), mess.getSender(), messId, messId);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void addUserMessCache(String sender, String receiver, Long messId, Long score) {
|
|
|
+ String cacheKey = getUserMessRedisKey(sender, receiver);
|
|
|
+ redisService.zadd(cacheKey, messId.toString(), score);
|
|
|
+ }
|
|
|
+
|
|
|
+ /***
|
|
|
+ * 刷新消息缓存
|
|
|
+ * @param mess
|
|
|
+ */
|
|
|
+ private void refreshMessCache(ChatMessageEntity mess) {
|
|
|
+ String messKey = RedisKeyEnum.MESS_DETAIL_CACHE_KEY.getKey();
|
|
|
+ redisService.hset(messKey, String.valueOf(mess.getId()), JsonUtil.toJsonString(mess),
|
|
|
+ RedisKeyEnum.MESS_DETAIL_CACHE_KEY.getExpireTime());
|
|
|
+ }
|
|
|
+
|
|
|
+ private ChatMessageResponseVO getChatMessDetailFromCache(Long messId) {
|
|
|
+ String messKey = RedisKeyEnum.MESS_DETAIL_CACHE_KEY.getKey();
|
|
|
+ String messCache = redisService.hget(messKey, String.valueOf(messId));
|
|
|
+ if (StringUtils.isBlank(messCache)) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ ChatMessageEntity userMessEntity = JsonUtil.fromJson(messCache, ChatMessageEntity.class);
|
|
|
+ ChatMessageResponseVO messageResponse = new ChatMessageResponseVO();
|
|
|
+ messageResponse.setMessId(userMessEntity.getId());
|
|
|
+ messageResponse.setMessage(userMessEntity.getMessage());
|
|
|
+ messageResponse.setSenderId(userMessEntity.getSender());
|
|
|
+ messageResponse.setProxySenderId(userMessEntity.getProxySender());
|
|
|
+ messageResponse.setReceiverId(userMessEntity.getReceiver());
|
|
|
+ messageResponse.setTime(userMessEntity.getSendDate().getTime());
|
|
|
+ messageResponse.setIsRead(userMessEntity.getIsRead());
|
|
|
+ messageResponse.setType(userMessEntity.getType());
|
|
|
+ if (ChatMessageTypeEnum.RED_PACKET.getType().equals(userMessEntity.getType())) {
|
|
|
+// messageResponse.setRedPacketDetail(redPacketService.getRedPacketDetailCache(Long.valueOf(userMessEntity.getMessage())));
|
|
|
+ } else if (ChatMessageTypeEnum.PUBLIC_ACCOUNT_ARTICLE.getType().equals(userMessEntity.getType())) {
|
|
|
+// messageResponse.setPublicAccountArticle(articleService.getPublicAccountArticleMessage(Long.valueOf(userMessEntity.getMessage())));
|
|
|
+ }
|
|
|
+ messageResponse.setGroupMessage(StringUtils.isNotBlank(userMessEntity.getProxySender()));
|
|
|
+ return messageResponse;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getUserMessRedisKey(String sender, String receiver) {
|
|
|
+ return RedisKeyEnum.USER_CHAT_MESS_CACHE_KEY.getKey(sender, receiver);
|
|
|
+ }
|
|
|
+
|
|
|
+ public APIPageResponseBean<List<ChatMessageResponseVO>> pageMessage(String mess, int pageNo, int pageSize) {
|
|
|
+ Pageable pageable = PageRequest.of(pageNo - 1, pageSize, Sort.by(Sort.Order.desc("id")));
|
|
|
+ Page<ChatMessageEntity> chatMessEntityPage;
|
|
|
+ if (StringUtils.isBlank(mess)) {
|
|
|
+ chatMessEntityPage = chatMessageDAO.findAll(pageable);
|
|
|
+ } else {
|
|
|
+ chatMessEntityPage = chatMessageDAO.findAllByMessageLike("%"+mess+"%", pageable);
|
|
|
+ }
|
|
|
+ List<ChatMessageResponseVO> chatMessageResponseVOList = convertChatMessageResponseList(chatMessEntityPage.getContent());
|
|
|
+ return APIPageResponseBean.success(pageNo, pageSize, chatMessEntityPage.getTotalElements(), chatMessageResponseVOList);
|
|
|
+ }
|
|
|
+
|
|
|
+ private List<ChatMessageResponseVO> convertChatMessageResponseList(List<ChatMessageEntity> chatMessEntities) {
|
|
|
+ if (CollectionUtils.isEmpty(chatMessEntities)) {
|
|
|
+ return Collections.emptyList();
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 批量查询用户信息
|
|
|
+ */
|
|
|
+ Set<String> senderUserIds = chatMessEntities.stream().map(ChatMessageEntity::getSender).collect(Collectors.toSet());
|
|
|
+ Set<String> proxySenderUserIds = chatMessEntities.stream().map(ChatMessageEntity::getProxySender).filter(Objects::nonNull).collect(Collectors.toSet());
|
|
|
+ Set<String> receiverUserIds = chatMessEntities.stream().map(ChatMessageEntity::getReceiver).collect(Collectors.toSet());
|
|
|
+ senderUserIds.addAll(receiverUserIds);
|
|
|
+ if (CollectionUtils.isNotEmpty(proxySenderUserIds)) {
|
|
|
+ senderUserIds.addAll(proxySenderUserIds);
|
|
|
+ }
|
|
|
+ Map<String, UserBaseResponseInfoVO> userMap = accountService.batchGet(senderUserIds);
|
|
|
+ return chatMessEntities.stream().map(chat -> {
|
|
|
+ ChatMessageResponseVO chatMessageResponse = new ChatMessageResponseVO();
|
|
|
+ chatMessageResponse.setTime(chat.getSendDate().getTime());
|
|
|
+ chatMessageResponse.setSender(userMap.get(chat.getSender()));
|
|
|
+ if (StringUtils.isNotBlank(chat.getProxySender())) {
|
|
|
+ chatMessageResponse.setProxySender(userMap.get(chat.getProxySender()));
|
|
|
+ }
|
|
|
+ chatMessageResponse.setReceiver(userMap.get(chat.getReceiver()));
|
|
|
+ chatMessageResponse.setMessage(chat.getMessage());
|
|
|
+ return chatMessageResponse;
|
|
|
+ }).collect(Collectors.toList());
|
|
|
+ }
|
|
|
+
|
|
|
+ private ChatMessageEntity convert(ChatMessageRequestVO messVo) {
|
|
|
+ ChatMessageEntity mess = new ChatMessageEntity();
|
|
|
+ mess.setSender(messVo.getSenderId());
|
|
|
+ mess.setReceiver(messVo.getReceiverId());
|
|
|
+ mess.setMessage(this.handleSpecialHtmlTag(HtmlUtil.xssEscape(messVo.getMessage())));
|
|
|
+ mess.setSendDate(new Date());
|
|
|
+ mess.setIsRead(false);
|
|
|
+ mess.setType(messVo.getType());
|
|
|
+ return mess;
|
|
|
+ }
|
|
|
+
|
|
|
+ /***
|
|
|
+ * 处理特殊字符
|
|
|
+ * @param content
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private String handleSpecialHtmlTag(String content) {
|
|
|
+ if (StringUtils.isBlank(content)) {
|
|
|
+ return content;
|
|
|
+ }
|
|
|
+ content = content.replaceAll("<br>", "<br>");
|
|
|
+ content = content.replaceAll("<b>", "<b>");
|
|
|
+ return content;
|
|
|
+ }
|
|
|
+}
|