Переглянути джерело

客服消息对话 如果会话不存在则自动回消息

zrd 3 місяців тому
батько
коміт
aa7f2db7fe

+ 4 - 2
yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/ai/AiApi.java

@@ -1,6 +1,7 @@
 package cn.iocoder.yudao.module.infra.api.ai;
 
 import cn.iocoder.yudao.module.infra.api.ai.dto.DifyaiReqDTO;
+import cn.iocoder.yudao.module.infra.api.ai.dto.StreamResponseDTO;
 import reactor.core.publisher.Flux;
 
 import java.util.Map;
@@ -15,6 +16,7 @@ public interface AiApi {
     
     String ai(DifyaiReqDTO createReqVO);
     
-    Flux<String> getDifyMessageStreaming(Map<String, Object> inputs, String user, String apiKey, String query,
-                                         String conversationId);
+    Flux<StreamResponseDTO> getDifyMessageStreaming(Map<String, Object> inputs, String user, String apiKey,
+                                                    String query,
+                                                    String conversationId);
 }

+ 48 - 0
yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/ai/dto/StreamResponseDTO.java

@@ -0,0 +1,48 @@
+package cn.iocoder.yudao.module.infra.api.ai.dto;
+
+import lombok.Data;
+
+import java.io.Serial;
+import java.io.Serializable;
+
+@Data
+public class StreamResponseDTO implements Serializable {
+    
+    @Serial
+    private static final long serialVersionUID = 1811796044273282773L;
+    /**
+     * 不同模式下的事件类型.
+     */
+    private String event;
+    
+    /**
+     * agent_thought id.
+     */
+    private String id;
+    
+    /**
+     * 任务ID.
+     */
+    private String task_id;
+    
+    /**
+     * 消息唯一ID.
+     */
+    private String message_id;
+    
+    /**
+     * LLM 返回文本块内容.
+     */
+    private String answer;
+    
+    /**
+     * 创建时间戳.
+     */
+    private Long created_at;
+    
+    /**
+     * 会话 ID.
+     */
+    private String conversation_id;
+    
+}

+ 19 - 7
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/ai/AiApiImpl.java

@@ -1,10 +1,13 @@
 package cn.iocoder.yudao.module.infra.api.ai;
 
 import cn.iocoder.yudao.framework.dict.core.DictFrameworkUtils;
+import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils;
+import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
 import cn.iocoder.yudao.module.infra.api.ai.dto.DifyaiReqDTO;
-import cn.iocoder.yudao.module.infra.service.aipromptmanagement.StreamResponse;
+import cn.iocoder.yudao.module.infra.api.ai.dto.StreamResponseDTO;
 import cn.iocoder.yudao.module.infra.service.aipromptmanagement.WorkflowRunService;
 import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 import org.springframework.validation.annotation.Validated;
 import reactor.core.publisher.Flux;
@@ -19,11 +22,13 @@ import java.util.Map;
  */
 @Service
 @Validated
+@Slf4j
 public class AiApiImpl implements AiApi {
     
     @Resource
     private WorkflowRunService workflowRunService;
-    
+    @Resource
+    private WebSocketMessageSender webSocketMessageSender;
     
     @Override
     public String ai(DifyaiReqDTO createReqVO) {
@@ -36,13 +41,20 @@ public class AiApiImpl implements AiApi {
     }
     
     @Override
-    public Flux<String> getDifyMessageStreaming(Map<String, Object> inputs, String user, String apiKey, String query,
-                                                String conversationId) {
+    public Flux<StreamResponseDTO> getDifyMessageStreaming(Map<String, Object> inputs, String user, String apiKey,
+                                                           String query,
+                                                           String conversationId) {
+        apiKey = DictFrameworkUtils.parseDictDataValue("ai_key", "多轮对话");
+        
+        inputs.put("type", "活林木");
+    /*    Flux<StreamResponse> rs = workflowRunService.getDifyMessageStreaming(inputs, user, apiKey, query,
+                conversationId);*/
         
-        Flux<StreamResponse> rs = workflowRunService.getDifyMessageStreaming(inputs, user, apiKey, query,
-                conversationId);
-        return rs.map(StreamResponse::getAnswer);
         
+        return workflowRunService.getDifyStreaming(inputs,
+                SecurityFrameworkUtils.getLoginUserId().toString(),
+                apiKey, query, null)
+                ;
         
     }
 }

+ 4 - 0
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/aipromptmanagement/WorkflowRunService.java

@@ -1,6 +1,7 @@
 package cn.iocoder.yudao.module.infra.service.aipromptmanagement;
 
 
+import cn.iocoder.yudao.module.infra.api.ai.dto.StreamResponseDTO;
 import cn.iocoder.yudao.module.infra.service.aipromptmanagement.utils.FileResp;
 import reactor.core.publisher.Flux;
 
@@ -46,4 +47,7 @@ public interface WorkflowRunService {
     
     Flux<StreamResponse> getDifyMessageStreaming(Map<String, Object> inputs, String user, String apiKey, String query,
                                                  String conversationId);
+    
+    Flux<StreamResponseDTO> getDifyStreaming(Map<String, Object> inputs, String user, String apiKey, String query,
+                                             String conversationId);
 }

+ 12 - 2
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/aipromptmanagement/WorkflowRunServiceImpl.java

@@ -5,9 +5,10 @@ import cn.hutool.core.util.IdUtil;
 import cn.hutool.http.HttpRequest;
 import cn.hutool.http.HttpResponse;
 import cn.hutool.json.JSONObject;
-import cn.iocoder.yudao.framework.ai.core.model.wenduoduo.api.DifyApi;
 import cn.iocoder.yudao.framework.dict.core.DictFrameworkUtils;
 import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils;
+import cn.iocoder.yudao.module.infra.api.ai.dto.StreamResponseDTO;
+import cn.iocoder.yudao.module.infra.service.aipromptmanagement.utils.DifyApiUtils;
 import cn.iocoder.yudao.module.infra.service.aipromptmanagement.utils.DifyFilesUtils;
 import cn.iocoder.yudao.module.infra.service.aipromptmanagement.utils.DifyResponse;
 import cn.iocoder.yudao.module.infra.service.aipromptmanagement.utils.FileResp;
@@ -259,11 +260,20 @@ public class WorkflowRunServiceImpl implements WorkflowRunService {
     public Flux<StreamResponse> getDifyMessageStreaming(Map<String, Object> inputs, String user, String apiKey,
                                                         String query,
                                                         String conversationId) {
-        DifyApi difyApi = new DifyApi(baseUrl);
+        DifyApiUtils difyApi = new DifyApiUtils(baseUrl);
         Flux<StreamResponse> rs = difyApi.chatMessages(apiKey, inputs, user, query, conversationId);
         return rs;
     }
     
+    @Override
+    
+    public Flux<StreamResponseDTO> getDifyStreaming(Map<String, Object> inputs, String user, String apiKey,
+                                                    String query,
+                                                    String conversationId) {
+        DifyApiUtils difyApi = new DifyApiUtils(baseUrl);
+        return difyApi.chatDifyMessages(apiKey, inputs, user, query, conversationId);
+    }
+    
     /**
      * 获取dify响应
      *

+ 43 - 3
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/aipromptmanagement/utils/DifyApi.java → yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/aipromptmanagement/utils/DifyApiUtils.java

@@ -1,6 +1,7 @@
-package cn.iocoder.yudao.framework.ai.core.model.wenduoduo.api;
+package cn.iocoder.yudao.module.infra.service.aipromptmanagement.utils;
 
 import cn.hutool.json.JSONObject;
+import cn.iocoder.yudao.module.infra.api.ai.dto.StreamResponseDTO;
 import cn.iocoder.yudao.module.infra.service.aipromptmanagement.StreamResponse;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.HttpRequest;
@@ -24,7 +25,7 @@ import java.util.function.Predicate;
  * @see <a href="https://docmee.cn/open-platform/api">PPT 生成 API</a>
  */
 @Slf4j
-public class DifyApi {
+public class DifyApiUtils {
     
     public static final String BASE_URL = "https://docmee.cn";
     
@@ -41,7 +42,7 @@ public class DifyApi {
             });
     
     // TODO @新:是不是不用 baseUrl 哈
-    public DifyApi(String baseUrl) {
+    public DifyApiUtils(String baseUrl) {
         this.webClient = WebClient.builder()
                 .baseUrl(baseUrl)
                 
@@ -79,6 +80,36 @@ public class DifyApi {
         return rs;
     }
     
+    /**
+     * 生成大纲内容
+     *
+     * @return 大纲内容流
+     */
+    public Flux<StreamResponseDTO> chatDifyMessages(String token, Map<String, Object> inputs, String user,
+                                                    String query, String conversationId) {
+        // 构建请求体
+        JSONObject requestBody = new JSONObject();
+        requestBody.put("inputs", inputs);
+        requestBody.put("response_mode", "streaming");
+        requestBody.put("user", user);
+//        requestBody.put("conversation_id", conversationId);
+        requestBody.put("query", query);
+        Flux<StreamResponseDTO> rs = webClient.post()
+                .uri("/v1/chat-messages")
+                .headers(httpHeaders -> {
+                    httpHeaders.setContentType(MediaType.APPLICATION_JSON);
+                    httpHeaders.setBearerAuth(token);
+                })
+                .bodyValue(requestBody.toString())
+                .retrieve()
+                .bodyToFlux(StreamResponseDTO.class)
+                .filter(this::shouInclude).onErrorResume(throwable -> {
+                    log.info("异常输出:" + throwable);
+                    return null;
+                });
+        return rs;
+    }
+    
     private boolean shouldInclude(StreamResponse streamResponse) {
         // 示例:只要message节点的数据和message_end节点的数据
         if (streamResponse.getEvent().equals("message")
@@ -88,5 +119,14 @@ public class DifyApi {
         return false;
     }
     
+    private boolean shouInclude(StreamResponseDTO streamResponse) {
+        // 示例:只要message节点的数据和message_end节点的数据
+        if (streamResponse.getEvent().equals("message")
+                || streamResponse.getEvent().equals("message_end")) {
+            return true;
+        }
+        return false;
+    }
+    
     
 }

+ 28 - 1
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/controller/app/kefu/AppKeFuMessageController.java

@@ -15,8 +15,10 @@ import io.swagger.v3.oas.annotations.Parameter;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import jakarta.annotation.Resource;
 import jakarta.validation.Valid;
+import org.springframework.http.MediaType;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.*;
+import reactor.core.publisher.Flux;
 
 import java.util.Collection;
 import java.util.List;
@@ -54,7 +56,32 @@ public class AppKeFuMessageController {
         
         return success(kefuMessageService.sendMessage(sendReqVO));
     }
-
+    
+    @PostMapping("/sendAi")
+    @Operation(summary = "发送消息")
+    public CommonResult<Long> sendAi(@Valid @RequestBody AppKeFuMessageSendReqVO sendReqVO) {
+        sendReqVO.setSenderId(sendReqVO.getRelUserId()).setSenderType(UserTypeEnum.MEMBER.getValue()); // 设置用户编号和类型
+        sendReqVO.setRelUserId(getLoginUserId());
+        
+        return success(kefuMessageService.sendMessage(sendReqVO));
+    }
+    
+    @PostMapping(value = "/sendStream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+    @Operation(summary = "发送消息(流式)", description = "流式返回,响应较快")
+    public Flux<String> sendStream(@Valid @RequestBody AppKeFuMessageSendReqVO sendReqVO) {
+        sendReqVO.setSenderId(getLoginUserId()).setSenderType(UserTypeEnum.MEMBER.getValue()); // 设置用户编号和类型
+        
+        
+        return kefuMessageService.sendStream(sendReqVO);
+    }
+    
+    @GetMapping("/checkUserId")
+    @Operation(summary = "获得客服会话是否存在")
+    @Parameter(name = "userId", description = "编号", required = true, example = "1024")
+    public CommonResult<Boolean> checkUserId(@RequestParam("userId") Long userId) {
+        
+        return success(kefuMessageService.checkUserId(userId));
+    }
     @PutMapping("/update-read-status")
     @Operation(summary = "更新客服消息已读状态")
     @Parameter(name = "conversationId", description = "会话编号", required = true)

+ 5 - 1
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/service/kefu/KeFuMessageService.java

@@ -6,6 +6,7 @@ import cn.iocoder.yudao.module.promotion.controller.app.kefu.vo.message.AppKeFuM
 import cn.iocoder.yudao.module.promotion.controller.app.kefu.vo.message.AppKeFuMessageSendReqVO;
 import cn.iocoder.yudao.module.promotion.dal.dataobject.kefu.KeFuMessageDO;
 import jakarta.validation.Valid;
+import reactor.core.publisher.Flux;
 
 import java.util.List;
 
@@ -33,6 +34,8 @@ public interface KeFuMessageService {
     Long sendKefuMessage(AppKeFuMessageSendReqVO sendReqVO);
     
     Long sendMessage(AppKeFuMessageSendReqVO sendReqVO);
+    
+    Flux<String> sendStream(AppKeFuMessageSendReqVO sendReqVO);
 
     /**
      * 【管理员】更新消息已读状态
@@ -59,5 +62,6 @@ public interface KeFuMessageService {
      * @return 客服消息分页
      */
     List<KeFuMessageDO> getKeFuMessageList(AppKeFuMessagePageReqVO pageReqVO, Long userId);
-
+    
+    Boolean checkUserId(Long userId);
 }

+ 53 - 10
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/service/kefu/KeFuMessageServiceImpl.java

@@ -2,11 +2,13 @@ package cn.iocoder.yudao.module.promotion.service.kefu;
 
 import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.util.ObjUtil;
+import cn.hutool.core.util.StrUtil;
 import cn.hutool.extra.spring.SpringUtil;
 import cn.iocoder.yudao.framework.common.enums.UserTypeEnum;
 import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
 import cn.iocoder.yudao.framework.dict.core.DictFrameworkUtils;
 import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils;
+import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
 import cn.iocoder.yudao.module.infra.api.ai.AiApi;
 import cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApi;
 import cn.iocoder.yudao.module.member.api.user.MemberUserApi;
@@ -127,21 +129,57 @@ public class KeFuMessageServiceImpl implements KeFuMessageService {
         KeFuMessageRespVO message =
                 BeanUtils.toBean(kefuMessage, KeFuMessageRespVO.class).setSenderAvatar(user.getAvatar());
         getSelf().sendAsyncMessageToMember(sendReqVO.getRelUserId(), KEFU_MESSAGE_IM, message);
+        
+        
+        return kefuMessage.getId();
+    }
+    
+    @Override
+    //@Async
+    public Flux<String> sendStream(AppKeFuMessageSendReqVO sendReqVO) {
+        // 1.1 设置会话编号
         String apiKey = DictFrameworkUtils.parseDictDataValue("ai_key", "多轮对话");
         Map<String, Object> inputs = new HashMap<>();
         inputs.put("type", "活林木");
-        aiApi.getDifyMessageStreaming(inputs,
+        sendMessage(sendReqVO);
+        
+        AppKeFuMessageSendReqVO aiSendReqVO = sendReqVO;
+        aiSendReqVO.setRelUserId(SecurityFrameworkUtils.getLoginUserId());
+        aiSendReqVO.setSenderId(aiSendReqVO.getRelUserId());
+        aiSendReqVO.setContentType(11);
+        Long aiId = sendMessage(aiSendReqVO);
+        StringBuffer contentBuffer = new StringBuffer();
+        return aiApi.getDifyMessageStreaming(inputs,
                         SecurityFrameworkUtils.getLoginUserId().toString(),
-                        apiKey, sendReqVO.getContent(), "123")
+                        apiKey, sendReqVO.getContent(), "132")
                 .flatMap(response -> {
-                            log.info("流式结果:" + response.toString());
-                            message.setContent(response);
-                            getSelf().sendAsyncMessageToMember(sendReqVO.getRelUserId(), KEFU_MESSAGE_IM, message);
-                            return Flux.empty();
+                    log.info("流式结果:" + response.toString());
+                    if (response.getEvent().equals("message")) {
+                        String answer = response.getAnswer(); // 完整答案
+                        log.info("进入workflow_finished阶段:" + answer);
+                        if (StrUtil.isNotBlank(answer)) {
+                            contentBuffer.append(answer);
+                            return Flux.just(answer);
                         }
-                
-                );
-        return kefuMessage.getId();
+                    }
+                    if (response.getEvent().equals("message_end")) {
+                        log.info("进入message_end");
+                    }
+                    return Flux.empty(); // 如果不是 workflow_finished 或 message_end,返回空 Flux
+                }).doOnComplete(() -> {
+                    KeFuMessageDO aiMessage = new KeFuMessageDO();
+                    aiMessage.setId(aiId);
+                    aiMessage.setContent(contentBuffer.toString());
+                    // 1.2 保存消息
+                    TenantUtils.executeIgnore(() ->
+                            keFuMessageMapper.updateById(aiMessage));
+                    
+                })
+                .onErrorResume(e -> {
+                    log.error("流式处理出错:", e);
+                    return Flux.error(new RuntimeException("流式处理失败,请稍后再试"));
+                });
+        
     }
 
     @Override
@@ -223,7 +261,12 @@ public class KeFuMessageServiceImpl implements KeFuMessageService {
         pageReqVO.setConversationId(conversation.getId());
         return keFuMessageMapper.selectList(BeanUtils.toBean(pageReqVO, KeFuMessageListReqVO.class));
     }
-
+    
+    @Override
+    public Boolean checkUserId(Long userId) {
+        return webSocketSenderApi.checKSession(null, UserTypeEnum.MEMBER.getValue(), userId);
+    }
+    
     private KeFuMessageServiceImpl getSelf() {
         return SpringUtil.getBean(getClass());
     }