Browse Source

客服消息对话

zrd 3 months ago
parent
commit
761b234253
14 changed files with 133 additions and 30 deletions
  1. 9 1
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java
  2. 29 1
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/AbstractWebSocketMessageSender.java
  3. 2 1
      yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/WebSocketMessageSender.java
  4. 4 0
      yudao-module-infra/yudao-module-infra-api/pom.xml
  5. 6 0
      yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/ai/AiApi.java
  6. 2 1
      yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApi.java
  7. 13 0
      yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/ai/AiApiImpl.java
  8. 6 2
      yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java
  9. 1 1
      yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/aipromptmanagement/utils/DifyApi.java
  10. 1 0
      yudao-module-mall/yudao-module-promotion-api/src/main/java/cn/iocoder/yudao/module/promotion/enums/WebSocketMessageTypeConstants.java
  11. 9 5
      yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/controller/app/kefu/AppKeFuConversationController.java
  12. 3 1
      yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/controller/app/kefu/AppKeFuMessageController.java
  13. 5 4
      yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/dal/mysql/kefu/KeFuMessageMapper.java
  14. 43 13
      yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/service/kefu/KeFuMessageServiceImpl.java

+ 9 - 1
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java

@@ -3,6 +3,7 @@ package cn.iocoder.yudao.framework.websocket.core.message;
 import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
 import lombok.Data;
 
+import java.io.Serial;
 import java.io.Serializable;
 
 /**
@@ -12,7 +13,9 @@ import java.io.Serializable;
  */
 @Data
 public class JsonWebSocketMessage implements Serializable {
-
+    
+    @Serial
+    private static final long serialVersionUID = -3221082158524151067L;
     /**
      * 消息类型
      *
@@ -25,5 +28,10 @@ public class JsonWebSocketMessage implements Serializable {
      * 要求 JSON 对象
      */
     private String content;
+    
+    /**
+     * 对话id
+     */
+    private String conversationId;
 
 }

+ 29 - 1
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/AbstractWebSocketMessageSender.java

@@ -40,7 +40,35 @@ public abstract class AbstractWebSocketMessageSender implements WebSocketMessage
     public void send(String sessionId, String messageType, String messageContent) {
         send(sessionId, null, null, messageType, messageContent);
     }
-
+    
+    /**
+     * 发送消息
+     *
+     * @param sessionId Session 编号
+     * @param userType  用户类型
+     * @param userId    用户编号
+     */
+    @Override
+    public boolean checKSession(String sessionId, Integer userType, Long userId) {
+        // 1. 获得 Session 列表
+        List<WebSocketSession> sessions = Collections.emptyList();
+        if (StrUtil.isNotEmpty(sessionId)) {
+            WebSocketSession session = sessionManager.getSession(sessionId);
+            if (session != null) {
+                sessions = Collections.singletonList(session);
+            }
+        } else if (userType != null && userId != null) {
+            sessions = (List<WebSocketSession>) sessionManager.getSessionList(userType, userId);
+        } else if (userType != null) {
+            sessions = (List<WebSocketSession>) sessionManager.getSessionList(userType);
+        }
+        if (CollUtil.isEmpty(sessions)) {
+            return false;
+        }
+        // 2. 执行发送
+        return true;
+    }
+    
     /**
      * 发送消息
      *

+ 2 - 1
yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/WebSocketMessageSender.java

@@ -44,7 +44,8 @@ public interface WebSocketMessageSender {
     default void sendObject(Integer userType, String messageType, Object messageContent) {
         send(userType, messageType, JsonUtils.toJsonString(messageContent));
     }
-
+    
+    public boolean checKSession(String sessionId, Integer userType, Long userId);
     default void sendObject(String sessionId, String messageType, Object messageContent) {
         send(sessionId, messageType, JsonUtils.toJsonString(messageContent));
     }

+ 4 - 0
yudao-module-infra/yudao-module-infra-api/pom.xml

@@ -28,6 +28,10 @@
             <artifactId>spring-boot-starter-validation</artifactId>
             <optional>true</optional>
         </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
     </dependencies>
 
 </project>

+ 6 - 0
yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/ai/AiApi.java

@@ -1,6 +1,9 @@
 package cn.iocoder.yudao.module.infra.api.ai;
 
 import cn.iocoder.yudao.module.infra.api.ai.dto.DifyaiReqDTO;
+import reactor.core.publisher.Flux;
+
+import java.util.Map;
 
 /**
  * 文件 API 接口
@@ -11,4 +14,7 @@ public interface AiApi {
     
     
     String ai(DifyaiReqDTO createReqVO);
+    
+    Flux<String> getDifyMessageStreaming(Map<String, Object> inputs, String user, String apiKey, String query,
+                                         String conversationId);
 }

+ 2 - 1
yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApi.java

@@ -29,7 +29,8 @@ public interface WebSocketSenderApi {
      * @param messageContent 消息内容,JSON 格式
      */
     void send(Integer userType, String messageType, String messageContent);
-
+    
+    public boolean checKSession(String sessionId, Integer userType, Long userId);
     /**
      * 发送消息给指定 Session
      *

+ 13 - 0
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/ai/AiApiImpl.java

@@ -2,10 +2,12 @@ package cn.iocoder.yudao.module.infra.api.ai;
 
 import cn.iocoder.yudao.framework.dict.core.DictFrameworkUtils;
 import cn.iocoder.yudao.module.infra.api.ai.dto.DifyaiReqDTO;
+import cn.iocoder.yudao.module.infra.service.aipromptmanagement.StreamResponse;
 import cn.iocoder.yudao.module.infra.service.aipromptmanagement.WorkflowRunService;
 import jakarta.annotation.Resource;
 import org.springframework.stereotype.Service;
 import org.springframework.validation.annotation.Validated;
+import reactor.core.publisher.Flux;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -32,4 +34,15 @@ public class AiApiImpl implements AiApi {
         inputs.put("content", createReqVO.getContent());
         return workflowRunService.getDifyResul(inputs, createReqVO.getType(), apiKey, 1L);
     }
+    
+    @Override
+    public Flux<String> getDifyMessageStreaming(Map<String, Object> inputs, String user, String apiKey, String query,
+                                                String conversationId) {
+        
+        Flux<StreamResponse> rs = workflowRunService.getDifyMessageStreaming(inputs, user, apiKey, query,
+                conversationId);
+        return rs.map(StreamResponse::getAnswer);
+        
+        
+    }
 }

+ 6 - 2
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java

@@ -1,9 +1,8 @@
 package cn.iocoder.yudao.module.infra.api.websocket;
 
 import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
-import org.springframework.stereotype.Component;
-
 import jakarta.annotation.Resource;
+import org.springframework.stereotype.Component;
 
 /**
  * WebSocket 发送器的 API 实现类
@@ -25,6 +24,11 @@ public class WebSocketSenderApiImpl implements WebSocketSenderApi {
     public void send(Integer userType, String messageType, String messageContent) {
         webSocketMessageSender.send(userType, messageType, messageContent);
     }
+    
+    @Override
+    public boolean checKSession(String sessionId, Integer userType, Long userId) {
+        return webSocketMessageSender.checKSession(sessionId, userType, userId);
+    }
 
     @Override
     public void send(String sessionId, String messageType, String messageContent) {

+ 1 - 1
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/aipromptmanagement/utils/DifyApi.java

@@ -61,7 +61,7 @@ public class DifyApi {
         requestBody.put("inputs", inputs);
         requestBody.put("response_mode", "streaming");
         requestBody.put("user", user);
-        requestBody.put("conversation_id", conversationId);
+//        requestBody.put("conversation_id", conversationId);
         requestBody.put("query", query);
         Flux<StreamResponse> rs = webClient.post()
                 .uri("/v1/chat-messages")

+ 1 - 0
yudao-module-mall/yudao-module-promotion-api/src/main/java/cn/iocoder/yudao/module/promotion/enums/WebSocketMessageTypeConstants.java

@@ -11,5 +11,6 @@ public interface WebSocketMessageTypeConstants {
 
     String KEFU_MESSAGE_TYPE = "kefu_message_type"; // 客服消息类型
     String KEFU_MESSAGE_ADMIN_READ = "kefu_message_read_status_change"; // 客服消息管理员已读
+    String KEFU_MESSAGE_IM = "kefu_message_im"; // 客服消息管理员已读
 
 }

+ 9 - 5
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/controller/app/kefu/AppKeFuConversationController.java

@@ -153,21 +153,25 @@ public class AppKeFuConversationController {
     @Operation(summary = "获得客服会话列表新")
     public CommonResult<List<KeFuConversationRespVO>> getConversationListNew() {
         // 查询会话列表
+        List<KeFuConversationDO> list =
+                conversationService.getConversationList(SecurityFrameworkUtils.getLoginUserId());
+        
         List<KeFuConversationRespVO> respList =
-                BeanUtils.toBean(conversationService.getConversationList(SecurityFrameworkUtils.getLoginUserId()),
+                BeanUtils.toBean(list,
                 KeFuConversationRespVO.class);
         
         // 拼接数据
         Collection<Long> ids = convertSet(respList,
                 KeFuConversationRespVO::getUserId);
-        Collection<Long> relids = convertSet(respList,
+       /* Collection<Long> relids = convertSet(respList,
                 KeFuConversationRespVO::getRelUserId);
-        ids.addAll(relids);
+        ids.addAll(relids);*/
         Map<Long, MemberUserRespDTO> userMap = memberUserApi.getUserMap(ids);
         respList.forEach(item -> findAndThen(userMap, item.getUserId(),
                 memberUser -> item.setUserAvatar(memberUser.getAvatar()).setUserNickname(memberUser.getNickname())));
-        respList.forEach(item -> findAndThen(userMap, item.getRelUserId(),
-                memberUser -> item.setRelUserAvatar(memberUser.getAvatar()).setRelUserNickname(memberUser.getNickname())));
+  /*      respList.forEach(item -> findAndThen(userMap, item.getRelUserId(),
+                memberUser -> item.setRelUserAvatar(memberUser.getAvatar()).setRelUserNickname(memberUser.getNickname
+                ())));*/
         return success(respList);
     }
     

+ 3 - 1
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/controller/app/kefu/AppKeFuMessageController.java

@@ -38,7 +38,7 @@ public class AppKeFuMessageController {
 
     @Resource
     private AdminUserApi adminUserApi;
-
+    
     @PostMapping("/send")
     @Operation(summary = "发送客服消息")
     public CommonResult<Long> sendKefuMessage(@Valid @RequestBody AppKeFuMessageSendReqVO sendReqVO) {
@@ -50,6 +50,8 @@ public class AppKeFuMessageController {
     @Operation(summary = "发送消息")
     public CommonResult<Long> sendNew(@Valid @RequestBody AppKeFuMessageSendReqVO sendReqVO) {
         sendReqVO.setSenderId(getLoginUserId()).setSenderType(UserTypeEnum.MEMBER.getValue()); // 设置用户编号和类型
+        
+        
         return success(kefuMessageService.sendMessage(sendReqVO));
     }
 

+ 5 - 4
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/dal/mysql/kefu/KeFuMessageMapper.java

@@ -34,13 +34,14 @@ public interface KeFuMessageMapper extends BaseMapperX<KeFuMessageDO> {
                 .orderByDesc("create_time")
                 .limitN(reqVO.getLimit()));
     }
-
-    default List<KeFuMessageDO> selectListByConversationIdAndUserTypeAndReadStatus(Long conversationId, Integer userType,
+    
+    default List<KeFuMessageDO> selectListByConversationIdAndUserTypeAndReadStatus(Long conversationId, Long userId,
                                                                                    Boolean readStatus) {
         return selectList(new LambdaQueryWrapper<KeFuMessageDO>()
                 .eq(KeFuMessageDO::getConversationId, conversationId)
-                .ne(KeFuMessageDO::getSenderType, userType) // 管理员:查询出未读的会员消息,会员:查询出未读的客服消息
-                .eq(KeFuMessageDO::getReadStatus, readStatus));
+                        .eq(KeFuMessageDO::getReceiverId, userId)
+//                .ne(KeFuMessageDO::getSenderType, userType) // 管理员:查询出未读的会员消息,会员:查询出未读的客服消息
+        );
     }
 
     default void updateReadStatusBatchByIds(Collection<Long> ids, KeFuMessageDO keFuMessageDO) {

+ 43 - 13
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/service/kefu/KeFuMessageServiceImpl.java

@@ -5,7 +5,9 @@ import cn.hutool.core.util.ObjUtil;
 import cn.hutool.extra.spring.SpringUtil;
 import cn.iocoder.yudao.framework.common.enums.UserTypeEnum;
 import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
+import cn.iocoder.yudao.framework.dict.core.DictFrameworkUtils;
 import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils;
+import cn.iocoder.yudao.module.infra.api.ai.AiApi;
 import cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApi;
 import cn.iocoder.yudao.module.member.api.user.MemberUserApi;
 import cn.iocoder.yudao.module.member.api.user.dto.MemberUserRespDTO;
@@ -20,19 +22,22 @@ import cn.iocoder.yudao.module.promotion.dal.mysql.kefu.KeFuMessageMapper;
 import cn.iocoder.yudao.module.system.api.user.AdminUserApi;
 import cn.iocoder.yudao.module.system.api.user.dto.AdminUserRespDTO;
 import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.validation.annotation.Validated;
+import reactor.core.publisher.Flux;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
-import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.*;
+import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
 import static cn.iocoder.yudao.module.promotion.enums.ErrorCodeConstants.KEFU_CONVERSATION_NOT_EXISTS;
-import static cn.iocoder.yudao.module.promotion.enums.WebSocketMessageTypeConstants.KEFU_MESSAGE_ADMIN_READ;
-import static cn.iocoder.yudao.module.promotion.enums.WebSocketMessageTypeConstants.KEFU_MESSAGE_TYPE;
+import static cn.iocoder.yudao.module.promotion.enums.WebSocketMessageTypeConstants.*;
 
 /**
  * 客服消息 Service 实现类
@@ -41,6 +46,7 @@ import static cn.iocoder.yudao.module.promotion.enums.WebSocketMessageTypeConsta
  */
 @Service
 @Validated
+@Slf4j
 public class KeFuMessageServiceImpl implements KeFuMessageService {
 
     @Resource
@@ -53,7 +59,8 @@ public class KeFuMessageServiceImpl implements KeFuMessageService {
     private MemberUserApi memberUserApi;
     @Resource
     private WebSocketSenderApi webSocketSenderApi;
-
+    @Resource
+    private AiApi aiApi;
     @Override
     @Transactional(rollbackFor = Exception.class)
     public Long sendKefuMessage(KeFuMessageSendReqVO sendReqVO) {
@@ -100,6 +107,7 @@ public class KeFuMessageServiceImpl implements KeFuMessageService {
     }
     
     @Override
+    //@Async
     public Long sendMessage(AppKeFuMessageSendReqVO sendReqVO) {
         // 1.1 设置会话编号
         KeFuMessageDO kefuMessage = BeanUtils.toBean(sendReqVO, KeFuMessageDO.class);
@@ -118,7 +126,21 @@ public class KeFuMessageServiceImpl implements KeFuMessageService {
         MemberUserRespDTO user = memberUserApi.getUser(kefuMessage.getSenderId());
         KeFuMessageRespVO message =
                 BeanUtils.toBean(kefuMessage, KeFuMessageRespVO.class).setSenderAvatar(user.getAvatar());
-        getSelf().sendAsyncMessageToAdmin(KEFU_MESSAGE_TYPE, message);
+        getSelf().sendAsyncMessageToMember(sendReqVO.getRelUserId(), KEFU_MESSAGE_IM, message);
+        String apiKey = DictFrameworkUtils.parseDictDataValue("ai_key", "多轮对话");
+        Map<String, Object> inputs = new HashMap<>();
+        inputs.put("type", "活林木");
+        aiApi.getDifyMessageStreaming(inputs,
+                        SecurityFrameworkUtils.getLoginUserId().toString(),
+                        apiKey, sendReqVO.getContent(), "123")
+                .flatMap(response -> {
+                            log.info("流式结果:" + response.toString());
+                            message.setContent(response);
+                            getSelf().sendAsyncMessageToMember(sendReqVO.getRelUserId(), KEFU_MESSAGE_IM, message);
+                            return Flux.empty();
+                        }
+                
+                );
         return kefuMessage.getId();
     }
 
@@ -128,11 +150,14 @@ public class KeFuMessageServiceImpl implements KeFuMessageService {
         // 1.1 校验会话是否存在
         KeFuConversationDO conversation = conversationService.validateKefuConversationExists(conversationId);
         // 1.2 如果是会员端处理已读,需要传递 userId;万一用户模拟一个 conversationId
-        if (UserTypeEnum.MEMBER.getValue().equals(userType) && ObjUtil.notEqual(conversation.getUserId(), userId)) {
+        if (ObjUtil.notEqual(conversation.getUserId(), userId) && ObjUtil.notEqual(conversation.getRelUserId(),
+                userId)) {
             throw exception(KEFU_CONVERSATION_NOT_EXISTS);
         }
         // 1.3 查询会话所有的未读消息 (tips: 多个客服,一个人点了,就都点了)
-        List<KeFuMessageDO> messageList = keFuMessageMapper.selectListByConversationIdAndUserTypeAndReadStatus(conversationId, userType, Boolean.FALSE);
+        List<KeFuMessageDO> messageList =
+                keFuMessageMapper.selectListByConversationIdAndUserTypeAndReadStatus(conversationId, userId,
+                        Boolean.FALSE);
         if (CollUtil.isEmpty(messageList)) {
             return;
         }
@@ -143,19 +168,24 @@ public class KeFuMessageServiceImpl implements KeFuMessageService {
         // 2.2 将管理员未读消息计数更新为零
         if (SecurityFrameworkUtils.getLoginUserId().equals(conversation.getUserId())) {
             conversationService.updateAdminUnreadMessageCountToZero(conversationId);
+            getSelf().sendAsyncMessageToMember(conversation.getUserId(), KEFU_MESSAGE_ADMIN_READ,
+                    new KeFuMessageRespVO().setConversationId(conversation.getId()));
         }
         if (SecurityFrameworkUtils.getLoginUserId().equals(conversation.getRelUserId())) {
             conversationService.updateRelUnreadMessageCountToZero(conversationId);
+            getSelf().sendAsyncMessageToMember(conversation.getRelUserId(), KEFU_MESSAGE_ADMIN_READ,
+                    new KeFuMessageRespVO().setConversationId(conversation.getId()));
         }
 
         // 2.3 发送消息通知会员,管理员已读 -> 会员更新发送的消息状态
-        KeFuMessageDO keFuMessage = getFirst(filterList(messageList, message -> UserTypeEnum.MEMBER.getValue().equals(message.getSenderType())));
-        assert keFuMessage != null; // 断言避免警告
-        getSelf().sendAsyncMessageToMember(keFuMessage.getSenderId(), KEFU_MESSAGE_ADMIN_READ,
-                new KeFuMessageRespVO().setConversationId(keFuMessage.getConversationId()));
+   /*     KeFuMessageDO keFuMessage = getFirst(filterList(messageList, message -> UserTypeEnum.MEMBER.getValue()
+   .equals(message.getSenderType())));
+        assert keFuMessage != null; // 断言避免警告*/
+        
+        
         // 2.4 通知所有管理员消息已读
-        getSelf().sendAsyncMessageToAdmin(KEFU_MESSAGE_ADMIN_READ,
-                new KeFuMessageRespVO().setConversationId(keFuMessage.getConversationId()));
+       /* getSelf().sendAsyncMessageToAdmin(KEFU_MESSAGE_ADMIN_READ,
+                new KeFuMessageRespVO().setConversationId(keFuMessage.getConversationId()));*/
     }
 
     private void validateReceiverExist(Long receiverId, Integer receiverType) {