Kaynağa Gözat

流式 dify聊天返回

zrd 3 ay önce
ebeveyn
işleme
47609c9346

+ 33 - 1
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/controller/app/aipromptmanagement/AppAiController.java

@@ -5,6 +5,7 @@ import cn.hutool.core.io.FileUtil;
 import cn.hutool.core.util.StrUtil;
 import cn.iocoder.yudao.framework.common.pojo.CommonResult;
 import cn.iocoder.yudao.framework.dict.core.DictFrameworkUtils;
+import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils;
 import cn.iocoder.yudao.module.infra.controller.app.aipromptmanagement.vo.*;
 import cn.iocoder.yudao.module.infra.service.aipromptmanagement.WorkflowRunService;
 import cn.iocoder.yudao.module.infra.service.aipromptmanagement.utils.DifyFilesUtils;
@@ -12,9 +13,12 @@ import cn.iocoder.yudao.module.infra.service.aipromptmanagement.utils.FileResp;
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.MediaType;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.multipart.MultipartFile;
+import reactor.core.publisher.Flux;
 
 import java.io.File;
 import java.io.IOException;
@@ -29,6 +33,7 @@ import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
 @RestController
 @RequestMapping("/infra/ai-dify")
 @Validated
+@Slf4j
 public class AppAiController {
     @Resource
     private WorkflowRunService workflowRunService;
@@ -170,8 +175,35 @@ public class AppAiController {
         Map<String, Object> inputs = new HashMap<>();
         String apiKey = DictFrameworkUtils.parseDictDataValue("ai_key", "多轮对话");
         inputs.put("type", createReqVO.getType());
+        // Map<String, Object> inputs, String user, String apiKey,String query,String conversationId
+        return success(workflowRunService.getDifyMessage(inputs, SecurityFrameworkUtils.getLoginUserId().toString(),
+                apiKey, createReqVO.getQuery(), createReqVO.getConversationId()));
+    }
+    
+    @PostMapping(value = "/chat-messages-stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+    @Operation(summary = "AI问答(流式)", description = "流式返回,响应较快")
+    public Flux<String> generateMindMap(@RequestBody MessageDifyaiReqVO createReqVO) {
+        String apiKey = DictFrameworkUtils.parseDictDataValue("ai_key", "多轮对话");
+        Map<String, Object> inputs = new HashMap<>();
+        inputs.put("type", createReqVO.getType());
         
-        return success(workflowRunService.getDifyResul(inputs, createReqVO.getType(), apiKey));
+        return workflowRunService.getDifyMessageStreaming(inputs,
+                        SecurityFrameworkUtils.getLoginUserId().toString(),
+                        apiKey, createReqVO.getQuery(), createReqVO.getConversationId())
+                .flatMap(response -> {
+                    log.info("流式结果:" + response.toString());
+                    if (response.getEvent().equals("message")) {
+                        String answer = response.getAnswer(); // 完整答案
+                        log.info("进入workflow_finished阶段:" + answer);
+                        if (StrUtil.isNotBlank(answer)) {
+                            return Flux.just(answer);
+                        }
+                    }
+                    if (response.getEvent().equals("message_end")) {
+                        log.info("进入message_end");
+                    }
+                    return Flux.empty(); // 如果不是 workflow_finished 或 message_end,返回空 Flux
+                });
     }
     
     /**

+ 1 - 1
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/controller/app/aipromptmanagement/vo/MessageDifyaiReqVO.java

@@ -17,5 +17,5 @@ public class MessageDifyaiReqVO {
      */
     private String query;
     
-    
+    private String conversationId;
 }

+ 14 - 0
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/aipromptmanagement/OutputsData.java

@@ -0,0 +1,14 @@
+package cn.iocoder.yudao.module.infra.service.aipromptmanagement;
+
+
+import lombok.Data;
+
+import java.io.Serial;
+import java.io.Serializable;
+
+@Data
+public class OutputsData implements Serializable {
+    @Serial
+    private static final long serialVersionUID = -6393709720035221940L;
+    private String answer;
+}

+ 51 - 0
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/aipromptmanagement/StreamResponse.java

@@ -0,0 +1,51 @@
+package cn.iocoder.yudao.module.infra.service.aipromptmanagement;
+
+import lombok.Data;
+
+import java.io.Serial;
+import java.io.Serializable;
+
+@Data
+public class StreamResponse implements Serializable {
+    
+    @Serial
+    private static final long serialVersionUID = 1811796044273282773L;
+    /**
+     * 不同模式下的事件类型.
+     */
+    private String event;
+    
+    /**
+     * agent_thought id.
+     */
+    private String id;
+    
+    /**
+     * 任务ID.
+     */
+    private String task_id;
+    
+    /**
+     * 消息唯一ID.
+     */
+    private String message_id;
+    
+    /**
+     * LLM 返回文本块内容.
+     */
+    private String answer;
+    
+    /**
+     * 创建时间戳.
+     */
+    private Long created_at;
+    
+    /**
+     * 会话 ID.
+     */
+    private String conversation_id;
+    
+    private StreamResponseData data;
+}
+
+

+ 18 - 0
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/aipromptmanagement/StreamResponseData.java

@@ -0,0 +1,18 @@
+package cn.iocoder.yudao.module.infra.service.aipromptmanagement;
+
+import lombok.Data;
+
+import java.io.Serial;
+import java.io.Serializable;
+
+@Data
+public class StreamResponseData implements Serializable {
+    @Serial
+    private static final long serialVersionUID = -8708810272061425149L;
+    private String id;
+    private String workflow_id;
+    private String status;
+    private Long created_at;
+    private Long finished_at;
+    private OutputsData outputs;
+}

+ 5 - 1
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/aipromptmanagement/WorkflowRunService.java

@@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.infra.service.aipromptmanagement;
 
 
 import cn.iocoder.yudao.module.infra.service.aipromptmanagement.utils.FileResp;
+import reactor.core.publisher.Flux;
 
 import java.io.File;
 import java.util.Map;
@@ -41,5 +42,8 @@ public interface WorkflowRunService {
     
     String getDifyResul(Map<String, Object> inputs, String eventType, String aiKey, Long userId);
     
-    String getDifyMessage(Map<String, Object> inputs, String eventType, String aiKey, Long userId);
+    String getDifyMessage(Map<String, Object> inputs, String user, String apiKey, String query, String conversationId);
+    
+    Flux<StreamResponse> getDifyMessageStreaming(Map<String, Object> inputs, String user, String apiKey, String query,
+                                                 String conversationId);
 }

+ 21 - 7
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/aipromptmanagement/WorkflowRunServiceImpl.java

@@ -5,6 +5,7 @@ import cn.hutool.core.util.IdUtil;
 import cn.hutool.http.HttpRequest;
 import cn.hutool.http.HttpResponse;
 import cn.hutool.json.JSONObject;
+import cn.iocoder.yudao.framework.ai.core.model.wenduoduo.api.DifyApi;
 import cn.iocoder.yudao.framework.dict.core.DictFrameworkUtils;
 import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils;
 import cn.iocoder.yudao.module.infra.service.aipromptmanagement.utils.DifyFilesUtils;
@@ -14,6 +15,7 @@ import com.alibaba.fastjson.JSON;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.springframework.validation.annotation.Validated;
+import reactor.core.publisher.Flux;
 
 import java.io.File;
 import java.util.Map;
@@ -29,12 +31,12 @@ public class WorkflowRunServiceImpl implements WorkflowRunService {
     
     @Value("${dify.base-url}")
     private String baseUrl;
-    
     @Value("${dify.ocr-key}")
     private String ocrKey;
     @Value("${dify.file-path}")
     private String filePath;
     
+    
     /**
      * 从 URL 中提取文件扩展名
      *
@@ -247,8 +249,19 @@ public class WorkflowRunServiceImpl implements WorkflowRunService {
     }
     
     @Override
-    public String getDifyMessage(Map<String, Object> inputs, String eventType, String aiKey, Long userId) {
-        return "";
+    public String getDifyMessage(Map<String, Object> inputs, String user, String apiKey, String query,
+                                 String conversationId) {
+        DifyResponse response = getDifyMessageResponse(inputs, user, apiKey, query, conversationId);
+        return response.getData().getOutputs().getText();
+    }
+    
+    @Override
+    public Flux<StreamResponse> getDifyMessageStreaming(Map<String, Object> inputs, String user, String apiKey,
+                                                        String query,
+                                                        String conversationId) {
+        DifyApi difyApi = new DifyApi(baseUrl);
+        Flux<StreamResponse> rs = difyApi.chatMessages(apiKey, inputs, user, query, conversationId);
+        return rs;
     }
     
     /**
@@ -258,7 +271,8 @@ public class WorkflowRunServiceImpl implements WorkflowRunService {
      * @param user   用户
      * @return {@link DifyResponse }
      */
-    public DifyResponse getDifyMessageResponse(Map<String, Object> inputs, String user, String apiKey) {
+    public DifyResponse getDifyMessageResponse(Map<String, Object> inputs, String user, String apiKey, String query,
+                                               String conversationId) {
         DifyResponse difyResponse = new DifyResponse();
         // 接口地址
         String url = baseUrl + "/v1/chat-messages";
@@ -271,10 +285,10 @@ public class WorkflowRunServiceImpl implements WorkflowRunService {
         // 构建请求体
         JSONObject requestBody = new JSONObject();
         requestBody.put("inputs", inputs);
-        requestBody.put("response_mode", "streaming");
+        requestBody.put("response_mode", "blocking");
         requestBody.put("user", user);
-//        requestBody.put("conversation_id", );
-        requestBody.put("query", user);
+        requestBody.put("conversation_id", conversationId);
+        requestBody.put("query", query);
         
         // 发送 POST 请求
         HttpResponse response = HttpRequest.post(url)

+ 92 - 0
yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/service/aipromptmanagement/utils/DifyApi.java

@@ -0,0 +1,92 @@
+package cn.iocoder.yudao.framework.ai.core.model.wenduoduo.api;
+
+import cn.hutool.json.JSONObject;
+import cn.iocoder.yudao.module.infra.service.aipromptmanagement.StreamResponse;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpRequest;
+import org.springframework.http.HttpStatusCode;
+import org.springframework.http.MediaType;
+import org.springframework.web.reactive.function.client.ClientResponse;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+// TODO @新:要不改成 WenDuoDuoPptApi
+
+/**
+ * 文多多 API
+ *
+ * @author xiaoxin
+ * @see <a href="https://docmee.cn/open-platform/api">PPT 生成 API</a>
+ */
+@Slf4j
+public class DifyApi {
+    
+    public static final String BASE_URL = "https://docmee.cn";
+    
+    private final WebClient webClient;
+    
+    private final Predicate<HttpStatusCode> STATUS_PREDICATE = status -> !status.is2xxSuccessful();
+    
+    private final Function<Object, Function<ClientResponse, Mono<? extends Throwable>>> EXCEPTION_FUNCTION =
+            reqParam -> response -> response.bodyToMono(String.class).handle((responseBody, sink) -> {
+                HttpRequest request = response.request();
+                log.error("[wdd-api] 调用失败!请求方式:[{}],请求地址:[{}],请求参数:[{}],响应数据: [{}]",
+                        request.getMethod(), request.getURI(), reqParam, responseBody);
+                sink.error(new IllegalStateException("[wdd-api] 调用失败!"));
+            });
+    
+    // TODO @新:是不是不用 baseUrl 哈
+    public DifyApi(String baseUrl) {
+        this.webClient = WebClient.builder()
+                .baseUrl(baseUrl)
+                
+                .build();
+    }
+    
+    
+    /**
+     * 生成大纲内容
+     *
+     * @return 大纲内容流
+     */
+    public Flux<StreamResponse> chatMessages(String token, Map<String, Object> inputs, String user,
+                                             String query, String conversationId) {
+        // 构建请求体
+        JSONObject requestBody = new JSONObject();
+        requestBody.put("inputs", inputs);
+        requestBody.put("response_mode", "streaming");
+        requestBody.put("user", user);
+        requestBody.put("conversation_id", conversationId);
+        requestBody.put("query", query);
+        Flux<StreamResponse> rs = webClient.post()
+                .uri("/v1/chat-messages")
+                .headers(httpHeaders -> {
+                    httpHeaders.setContentType(MediaType.APPLICATION_JSON);
+                    httpHeaders.setBearerAuth(token);
+                })
+                .bodyValue(requestBody.toString())
+                .retrieve()
+                .bodyToFlux(StreamResponse.class)
+                .filter(this::shouldInclude).onErrorResume(throwable -> {
+                    log.info("异常输出:" + throwable);
+                    return null;
+                });
+        return rs;
+    }
+    
+    private boolean shouldInclude(StreamResponse streamResponse) {
+        // 示例:只要message节点的数据和message_end节点的数据
+        if (streamResponse.getEvent().equals("message")
+                || streamResponse.getEvent().equals("message_end")) {
+            return true;
+        }
+        return false;
+    }
+    
+    
+}