Explorar o código

feat(promotion): 优化多轮对话功能并支持流式返回- 在 AppKeFuMessageSendReqVO 中添加 fileUrls 字段,支持文件上传
- 新增 DifyResponse 类用于解析 Dify API 响应
- 修改 KeFuMessageRespVO,增加 difyConversationId 等字段
-重构 sendKefuMessage 方法,支持流式返回和多轮对话
- 更新应用配置,修改 Dify API 地址
- 添加 Spring WebFlux 依赖以支持异步请求

zrd hai 1 mes
pai
achega
4afce79eb9

+ 4 - 0
yudao-module-mall/yudao-module-promotion-biz/pom.xml

@@ -73,6 +73,10 @@
             <groupId>cn.iocoder.boot</groupId>
             <artifactId>yudao-spring-boot-starter-excel</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-webflux</artifactId>
+        </dependency>
     </dependencies>
 
 </project>

+ 6 - 0
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/controller/admin/kefu/vo/message/KeFuMessageRespVO.java

@@ -14,6 +14,10 @@ public class KeFuMessageRespVO {
 
     @Schema(description = "会话编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "12580")
     private Long conversationId;
+    /**
+     * dify对话id
+     */
+    private String difyConversationId;
 
     @Schema(description = "发送人编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "24571")
     private Long senderId;
@@ -35,6 +39,8 @@ public class KeFuMessageRespVO {
 
     @Schema(description = "消息", requiredMode = Schema.RequiredMode.REQUIRED)
     private String content;
+    private String event;
+    private String messageLast;
 
     @Schema(description = "是否已读", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")
     private Boolean readStatus;

+ 6 - 0
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/controller/app/kefu/vo/message/AppKeFuMessageSendReqVO.java

@@ -5,6 +5,8 @@ import jakarta.validation.constraints.NotEmpty;
 import jakarta.validation.constraints.NotNull;
 import lombok.Data;
 
+import java.util.List;
+
 @Schema(description = "用户 App - 发送客服消息 Request VO")
 @Data
 public class AppKeFuMessageSendReqVO {
@@ -28,4 +30,8 @@ public class AppKeFuMessageSendReqVO {
     private Integer senderType;
     @Schema(description = "会话id", requiredMode = Schema.RequiredMode.REQUIRED, example = "1", hidden = true)
     private Long conversationId;
+    /**
+     * 文件url
+     */
+    private List<String> fileUrls;
 }

+ 90 - 0
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/controller/app/kefu/vo/message/DifyResponse.java

@@ -0,0 +1,90 @@
+package cn.iocoder.yudao.module.promotion.controller.app.kefu.vo.message;
+
+import lombok.Data;
+
+import java.io.Serial;
+import java.io.Serializable;
+
+@Data
+public class DifyResponse implements Serializable {
+    
+    
+    @Serial
+    private static final long serialVersionUID = -8136474488568106769L;
+    /**
+     * event : message
+     * task_id : 4513bd53-fd94-47af-abeb-759846469dc9
+     * id : 6c068b57-afc2-4dbd-b1d6-49ef5e55202e
+     * message_id : 6c068b57-afc2-4dbd-b1d6-49ef5e55202e
+     * conversation_id : 50a4839b-6bf7-4741-b86b-81c02d0f5de3
+     * mode : advanced-chat
+     * answer :
+     * <p>
+     * 您好!我是 AI 小优,您专属的 AI助手,您来说,我来办
+     * 生成个人数据资产
+     * metadata : {"usage":{"prompt_tokens":0,"prompt_unit_price":"0.0","prompt_price_unit":"0.0","prompt_price":"0
+     * .0","completion_tokens":0,"completion_unit_price":"0.0","completion_price_unit":"0.0","completion_price":"0
+     * .0","total_tokens":0,"total_price":"0.0","currency":"USD","latency":0}}
+     * created_at : 1747733207
+     */
+    
+    private String event;
+    private String task_id;
+    private String id;
+    private String message_id;
+    private String conversation_id;
+    private String mode;
+    private String answer;
+    private MetadataBean metadata;
+    private int created_at;
+    
+    
+    @Data
+    public static class MetadataBean implements Serializable {
+        @Serial
+        private static final long serialVersionUID = 8939728861612692110L;
+        /**
+         * usage : {"prompt_tokens":0,"prompt_unit_price":"0.0","prompt_price_unit":"0.0","prompt_price":"0.0",
+         * "completion_tokens":0,"completion_unit_price":"0.0","completion_price_unit":"0.0","completion_price":"0
+         * .0","total_tokens":0,"total_price":"0.0","currency":"USD","latency":0}
+         */
+        
+        private UsageBean usage;
+        
+        public UsageBean getUsage() {return usage;}
+        
+        public void setUsage(UsageBean usage) {this.usage = usage;}
+        
+        public static class UsageBean {
+            /**
+             * prompt_tokens : 0
+             * prompt_unit_price : 0.0
+             * prompt_price_unit : 0.0
+             * prompt_price : 0.0
+             * completion_tokens : 0
+             * completion_unit_price : 0.0
+             * completion_price_unit : 0.0
+             * completion_price : 0.0
+             * total_tokens : 0
+             * total_price : 0.0
+             * currency : USD
+             * latency : 0.0
+             */
+            
+            private int prompt_tokens;
+            private String prompt_unit_price;
+            private String prompt_price_unit;
+            private String prompt_price;
+            private int completion_tokens;
+            private String completion_unit_price;
+            private String completion_price_unit;
+            private String completion_price;
+            private int total_tokens;
+            private String total_price;
+            private String currency;
+            private double latency;
+            
+            
+        }
+    }
+}

+ 165 - 51
yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/service/kefu/KeFuMessageServiceImpl.java

@@ -4,7 +4,9 @@ import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.util.ObjUtil;
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.extra.spring.SpringUtil;
+import cn.hutool.json.JSONObject;
 import cn.iocoder.yudao.framework.common.enums.UserTypeEnum;
+import cn.iocoder.yudao.framework.common.exception.ErrorCode;
 import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
 import cn.iocoder.yudao.framework.dict.core.DictFrameworkUtils;
 import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils;
@@ -19,6 +21,7 @@ import cn.iocoder.yudao.module.promotion.controller.admin.kefu.vo.message.KeFuMe
 import cn.iocoder.yudao.module.promotion.controller.admin.kefu.vo.message.KeFuMessageSendReqVO;
 import cn.iocoder.yudao.module.promotion.controller.app.kefu.vo.message.AppKeFuMessagePageReqVO;
 import cn.iocoder.yudao.module.promotion.controller.app.kefu.vo.message.AppKeFuMessageSendReqVO;
+import cn.iocoder.yudao.module.promotion.controller.app.kefu.vo.message.DifyResponse;
 import cn.iocoder.yudao.module.promotion.dal.dataobject.kefu.KeFuConversationDO;
 import cn.iocoder.yudao.module.promotion.dal.dataobject.kefu.KeFuMessageDO;
 import cn.iocoder.yudao.module.promotion.dal.mysql.kefu.KeFuMessageMapper;
@@ -26,17 +29,17 @@ import cn.iocoder.yudao.module.system.api.user.AdminUserApi;
 import cn.iocoder.yudao.module.system.api.user.dto.AdminUserRespDTO;
 import jakarta.annotation.Resource;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.MediaType;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.validation.annotation.Validated;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 import reactor.core.publisher.Flux;
 
 import java.time.LocalDateTime;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
 import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
@@ -53,7 +56,9 @@ import static cn.iocoder.yudao.module.promotion.enums.WebSocketMessageTypeConsta
 @Validated
 @Slf4j
 public class KeFuMessageServiceImpl implements KeFuMessageService {
-
+    
+    private final WebClient webClient;
+    ErrorCode WRITE_STREAM_ERROR = new ErrorCode(1_040_07_001, "写作生成异常!");
     @Resource
     private KeFuMessageMapper keFuMessageMapper;
     @Resource
@@ -66,6 +71,11 @@ public class KeFuMessageServiceImpl implements KeFuMessageService {
     private WebSocketSenderApi webSocketSenderApi;
     @Resource
     private AiApi aiApi;
+    
+    public KeFuMessageServiceImpl(WebClient.Builder webClientBuilder) {
+        this.webClient = webClientBuilder.baseUrl("http://42.194.163.46:9502").build();
+    }
+    
     @Override
     @Transactional(rollbackFor = Exception.class)
     public Long sendKefuMessage(KeFuMessageSendReqVO sendReqVO) {
@@ -89,7 +99,7 @@ public class KeFuMessageServiceImpl implements KeFuMessageService {
         getSelf().sendAsyncMessageToAdmin(KEFU_MESSAGE_TYPE, message);
         return kefuMessage.getId();
     }
-
+    
     @Override
     public Long sendKefuMessage(AppKeFuMessageSendReqVO sendReqVO) {
         // 1.1 设置会话编号
@@ -142,26 +152,31 @@ public class KeFuMessageServiceImpl implements KeFuMessageService {
         // 1.1 设置会话编号
         String apiKey = DictFrameworkUtils.parseDictDataValue("ai_key", "多轮对话");
         Map<String, Object> inputs = new HashMap<>();
+        String type = "";
         if (sendReqVO.getRelUserId() == 1L) {
             inputs.put("type", "系统客服");
+            type = "系统客服";
         } else if (sendReqVO.getRelUserId() == 2L) {
             inputs.put("type", "贷款专家");
+            type = "贷款专家";
         } else if (sendReqVO.getRelUserId() == 3L) {
             inputs.put("type", "律师专家");
+            type = "律师专家";
         } else {
             inputs.put("type", "其他");
+            type = "其他";
         }
         Long relUserId = sendReqVO.getRelUserId();
         inputs.put("token", sendReqVO.getToken());
-        String conversationId;
+        String difyconversationId;
         KeFuMessageDO kefuMessage = BeanUtils.toBean(sendReqVO, KeFuMessageDO.class);
         if (ObjUtil.isNull(kefuMessage.getConversationId())) {
             KeFuConversationDO conversation = conversationService.getOrCreateConversation(sendReqVO.getSenderId(),
                     sendReqVO.getRelUserId());
             kefuMessage.setConversationId(conversation.getId());
             sendReqVO.setConversationId(conversation.getId());
-            conversationId = conversation.getDifyConversationId();
-        } else {conversationId = "";}
+            difyconversationId = conversation.getDifyConversationId();
+        } else {difyconversationId = "";}
         sendMessage(sendReqVO);
         
         
@@ -171,55 +186,154 @@ public class KeFuMessageServiceImpl implements KeFuMessageService {
         aiSendReqVO.setContentType(22);
         aiSendReqVO.setSenderType(22);
         Long aiId = sendMessage(aiSendReqVO);
-        StringBuffer contentBuffer = new StringBuffer();
-        StringBuffer contentBufferCon = new StringBuffer();
+        
         KeFuMessageRespVO message = BeanUtils.toBean(aiSendReqVO, KeFuMessageRespVO.class);
-        return aiApi.getDifyMessageStreaming(inputs,
-                        SecurityFrameworkUtils.getLoginUserId().toString(),
-                        apiKey, sendReqVO.getContent(), conversationId)
-                .flatMap(response -> {
-                    log.info("流式结果:" + response.toString());
-                    if (response.getEvent().equals("message")) {
-                        String answer = response.getAnswer(); // 完整答案
-                        log.info("进入workflow_finished阶段:" + answer);
-                        if (StrUtil.isNotBlank(answer)) {
-                            contentBuffer.append(answer);
-                            message.setContent(answer);
-                            message.setMessageId(response.getMessage_id());
-                            return Flux.just(message);
-                        }
-                    }
-                    if (response.getEvent().equals("message_end")) {
-                        log.info("进入message_end");
-                        contentBufferCon.append(response.getConversation_id());
+        message.setId(aiId);
+        message.setDifyConversationId(difyconversationId);
+        return getEmitterFromDify(type, message, apiKey, sendReqVO, difyconversationId);
+        
+    }
+    
+    /**
+     * 获取发射器dify
+     *
+     * @param reqVO 请求vo
+     * @return {@link SseEmitter }
+     */
+    private Flux<KeFuMessageRespVO> getEmitterFromDify(String type, KeFuMessageRespVO message, String apiKey,
+                                                       AppKeFuMessageSendReqVO reqVO, String difyconversationId) {
+        
+        Map<String, Object> inputs = new HashMap<>();
+        
+        JSONObject requestBody = new JSONObject();
+        inputs.put("token", reqVO.getToken());
+        inputs.put("type", type);
+        
+        requestBody.set("response_mode", "streaming");
+        requestBody.set("user", SecurityFrameworkUtils.getLoginUserId());
+        // 如果difyConversationId为0 则为 开场白
+        if (StrUtil.isNotEmpty(difyconversationId) && !StrUtil.equals(difyconversationId,
+                "0")) {
+            
+            requestBody.set("conversation_id", difyconversationId);
+        }
+        
+        if (CollUtil.isNotEmpty(reqVO.getFileUrls())) {
+            inputs.put("fileUrls", StrUtil.join(",", reqVO.getFileUrls()));
+            if (CollUtil.isNotEmpty(reqVO.getFileUrls())) {
+                List<Map<String, Object>> docs = new ArrayList<>();
+                for (String image : reqVO.getFileUrls()) {
+                    Map<String, Object> variableValue = new HashMap<>();
+                    variableValue.put("transfer_method", "remote_url");
+                    variableValue.put("url", image);
+                    if (StrUtil.containsAny(image, ".pdf", ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx")) {
+                        variableValue.put("type", "document");
+                    } else {
+                        variableValue.put("type", "image");
                     }
-                    return Flux.empty(); // 如果不是 workflow_finished 或 message_end,返回空 Flux
-                }).doOnComplete(() -> {
-                    KeFuMessageDO aiMessage = new KeFuMessageDO();
-                    aiMessage.setId(aiId);
-                    aiMessage.setContent(contentBuffer.toString());
-                    // 1.2 保存消息
-                    LocalDateTime current = LocalDateTime.now();
-                    aiMessage.setCreateTime(current);
-                    TenantUtils.executeIgnore(() ->
-                            keFuMessageMapper.updateById(aiMessage));
-                    if (conversationId == null) {
-                        KeFuConversationRespVO updatePinnedReqVO = new KeFuConversationRespVO();
-                        updatePinnedReqVO.setId(sendReqVO.getConversationId());
-                        updatePinnedReqVO.setDifyConversationId(contentBufferCon.toString());
-                        
-                        TenantUtils.executeIgnore(() ->
-                                conversationService.updateConversation(updatePinnedReqVO));
+                    
+                    docs.add(variableValue);
+                }
+                requestBody.set("files", docs);
+            }
+        }
+        
+        requestBody.set("inputs", inputs);
+        requestBody.set("query", reqVO.getContent());
+        // 使用 WebClient 发起非阻塞请求
+        
+        Flux<KeFuMessageRespVO> streamResponse = sendMessageToDify(apiKey, requestBody, message);
+        
+        // 3.2 流式返回
+        StringBuffer contentBuffer = new StringBuffer();
+        StringBuffer difyId = new StringBuffer();
+        return streamResponse.map(chunk -> {
+            String newContent = chunk.getContent();
+            contentBuffer.append(newContent);
+            if (chunk.getEvent().equals("message_end")) {
+                difyId.append(chunk.getDifyConversationId());
+            }
+            
+            // 响应结果
+            return chunk;
+        }).doOnComplete(() -> {
+            // 忽略租户,因为 Flux 异步无法透传租户
+            message.setContent(contentBuffer.toString());
+            // 1.2 保存消息
+            TenantUtils.executeIgnore(() ->
+                    keFuMessageMapper.updateById(BeanUtils.toBean(message, KeFuMessageDO.class)));
+            if (StrUtil.isBlank(message.getDifyConversationId())) {
+                KeFuConversationRespVO updatePinnedReqVO = new KeFuConversationRespVO();
+                updatePinnedReqVO.setId(message.getConversationId());
+                updatePinnedReqVO.setDifyConversationId(difyId.toString());
+                LocalDateTime current = LocalDateTime.now();
+                updatePinnedReqVO.setCreateTime(current);
+                TenantUtils.executeIgnore(() ->
+                        conversationService.updateConversation(updatePinnedReqVO));
+                
+            }
+        }).doOnError(throwable -> {
+            log.error("[generateWriteCon-tent][generateReqVO({}) 发生异常]", null, throwable);
+            // 忽略租户,因为 Flux 异步无法透传租户
+            
+        }).onErrorResume(error -> Flux.just(null));
+        
+    }
+    
+    
+    public Flux<KeFuMessageRespVO> sendMessageToDify(String apiKey, JSONObject requestBody,
+                                                     KeFuMessageRespVO message) {
+        List<KeFuMessageRespVO> collectedMessages = new ArrayList<>();
+        return webClient.post()
+                .uri("/v1/chat-messages") // 假设的API路径,根据实际情况修改
+                .headers(httpHeaders -> {
+                    httpHeaders.setContentType(MediaType.APPLICATION_JSON);
+                    httpHeaders.setBearerAuth(apiKey);
+                })
+                .bodyValue(requestBody)
+                .retrieve()
+                .bodyToFlux(DifyResponse.class)
+                .map(difyResponse -> convertToKeFuMessage(difyResponse, message))
+                .filter(this::shouldInclude)
+                .doOnNext(collectedMessages::add)
+                .doOnComplete(() -> {
+                    if (!collectedMessages.isEmpty()) {
                         
+                    
                     }
                 })
-                .onErrorResume(e -> {
-                    log.error("流式处理出错:", e);
-                    return Flux.error(new RuntimeException("流式处理失败,请稍后再试"));
-                });
+                .doOnTerminate(() -> System.out.println("请求结束"));
+    }
+    
+    
+    private boolean shouldInclude(KeFuMessageRespVO streamResponse) {
+        // 示例:只要message节点的数据和message_end节点的数据
+        if (streamResponse.getEvent().equals("message")
+                || streamResponse.getEvent().equals("message_end")) {
+            return true;
+        }
+        return false;
+    }
+    
+    private KeFuMessageRespVO convertToKeFuMessage(DifyResponse response, KeFuMessageRespVO message) {
+        // 实现 DifyResponse 到 KeFuMessageRespVO 的转换逻辑
+        // 设置其他必要的字段...
+        StringBuffer contentBuffer = new StringBuffer();
+        if (StrUtil.equals("message", response.getEvent())) {
+            message.setContent(response.getAnswer());
+            message.setMessageId(response.getMessage_id());
+            message.setDifyConversationId(response.getConversation_id());
+            contentBuffer.append(response.getAnswer());
+            message.setMessageLast(contentBuffer.toString());
+        } else if (StrUtil.equals("message_end", response.getEvent())) {
         
+        } else {
+        
+        }
+        message.setEvent(response.getEvent());
+        return message;
     }
-
+    
     @Override
     @Transactional(rollbackFor = Exception.class)
     public void updateKeFuMessageReadStatus(Long conversationId, Long userId, Integer userType) {

+ 4 - 0
yudao-server/pom.xml

@@ -126,6 +126,10 @@
             <groupId>cn.iocoder.boot</groupId>
             <artifactId>yudao-spring-boot-starter-protection</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-autoconfigure</artifactId>
+        </dependency>
 
     </dependencies>
 

+ 2 - 2
yudao-server/src/main/resources/application-dev.yaml

@@ -225,7 +225,7 @@ pf4j:
   pluginsDir: ${user.home}/plugins # 插件目录# 插件配置
 # dify 配置项
 dify:
-  base-url: http://42.194.163.46:9005/ # dify地址
+  base-url: http://42.194.163.46:9502/
   ocr-key: app-LtIiXlum1GZjDhuhOqQXi6MG # 图片识别的key
   asset-key: app-EsQ4vAwda8YhtosbDOefOZSA # 数据集登记的key
-  file-path: /home
+  file-path: /home

+ 1 - 1
yudao-server/src/main/resources/application-local.yaml

@@ -276,7 +276,7 @@ pf4j:
   pluginsDir: ../plugins
 
 dify:
-  base-url: http://42.194.163.46:9005/ # dify地址
+  base-url: http://42.194.163.46:9502/
   ocr-key: app-LtIiXlum1GZjDhuhOqQXi6MG # 图片识别的key
   asset-key: app-EsQ4vAwda8YhtosbDOefOZSA # 数据集登记的key
   file-path: F:\wd\