第8天 - Streaming 流式响应处理
系列: Spring AI Alibaba 技术博客系列
日期: 2026-05-02
难度: ⭐⭐⭐⭐
前置知识: ChatModel 基础使用、Prompt 工程、Function Calling 基础
目录
- 为什么需要流式响应?
- Streaming 的核心原理与通信模型
- Spring AI Alibaba 的 Streaming API 概览
- 实战一:基础流式聊天
- 实战二:WebFlux SSE 端点暴露流式服务
- 实战三:流式输出与 Spring MVC 集成
- 实战四:流式 Function Calling
- 实战五:流式响应的中断与取消
- 实战六:Token 计数与流式统计
- 底层机制:SSE 协议详解
- 性能优化:连接池、缓冲与背压
- 错误处理与容错
- 最佳实践总结
- 常见问题与排查指南
- 总结
1. 为什么需要流式响应?
1.1 用户体验的痛点
在 AI 应用开发中,用户最常遇到的痛点之一就是等待时间过长。当用户向 AI 发送一个问题时,大模型需要数秒甚至数十秒才能生成完整的回复。如果采用传统的同步请求-响应模式:
- 用户视角: 屏幕空白,不知道系统是否在运行,是否卡住了
- 体验对比: ChatGPT、文心一言等主流 AI 产品都已经实现了逐字输出
- 心理预期: 用户已经习惯了”即时反馈”的交互模式
1.2 流式响应解决了什么问题?
流式响应(Streaming Response)将模型的输出拆分为多个小块(Chunk),逐个推送给客户端。它带来的核心优势:
| 维度 | 非流式 | 流式 |
|---|---|---|
| 首字延迟(TTFT) | 3-30秒 | 0.2-1秒 |
| 用户感知 | 长时间等待 | 即时反馈 |
| 服务端内存 | 需缓存完整响应 | 逐块处理,内存占用低 |
| 实时交互 | 不支持 | 支持中断、实时反馈 |
1.3 什么是 TTFT?
TTFT(Time To First Token) 是衡量流式体验的关键指标——从用户发送请求到看到第一个字的时间。在流式模式下,TTFT 通常只有几百毫秒,而完整响应可能需要数秒到数十秒。用户感知的延迟从”等完整结果”变成了”等第一个字”,体验差异巨大。
2. Streaming 的核心原理与通信模型
2.1 服务端视角:模型如何流式输出
大语言模型在生成文本时,本质上是自回归(Autoregressive) 的过程:
输入: "Spring AI Alibaba 是..."
Step 1: 生成 token → "一"
Step 2: 生成 token → "个"
Step 3: 生成 token → "基"
Step 4: 生成 token → "于"
Step 5: 生成 token → "..."
每一步生成一个 token(词元),模型内部状态更新后继续下一步。流式响应就是把这个逐 token 生成的过程实时暴露出来,而不是等到全部生成完毕再返回。
2.2 传输协议:SSE(Server-Sent Events)
Spring AI Alibaba 的流式响应主要通过 SSE(Server-Sent Events) 协议实现。SSE 是一种基于 HTTP 的单向推送协议:
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
data: {"choices":[{"delta":{"content":"你"},"index":0}]}
data: {"choices":[{"delta":{"content":"好"},"index":0}]}
data: {"choices":[{"delta":{"content":","},"index":0}]}
data: {"choices":[{"delta":{"content":"我"},"index":0}]}
data: [DONE]
SSE 的关键特性:
- 基于普通 HTTP,无需额外协议升级(区别于 WebSocket)
- 服务端可以持续推送事件
- 客户端自动重连机制
- 单向通信(服务端 → 客户端),但对于 AI 流式输出完全够用
2.3 Spring AI Alibaba 的流式架构
┌──────────────────────────────────────────────────────┐
│ 客户端 (浏览器/终端) │
│ ← SSE 逐块推送 ← │
├──────────────────────────────────────────────────────┤
│ Spring Boot Web 层 │
│ (WebFlux SSE 或 MVC StreamingResponseBody) │
├──────────────────────────────────────────────────────┤
│ Spring AI Alibaba 流式 API │
│ Flux<ChatResponse> / StreamingChatModel │
├──────────────────────────────────────────────────────┤
│ DashScope API (通义千问) │
│ ← HTTP Chunked Transfer ← │
└──────────────────────────────────────────────────────┘
3. Spring AI Alibaba 的 Streaming API 概览
Spring AI Alibaba 提供了多种流式 API 入口,适应不同的使用场景:
3.1 ChatModel 流式接口
public interface ChatModel {
// 同步调用(非流式)
ChatResponse call(Prompt prompt);
// 流式调用 → 返回 Flux<ChatResponse>
Flux<ChatResponse> stream(Prompt prompt);
}
stream() 方法返回一个 Reactor Flux,这是一个响应式流,可以逐块消费模型生成的内容。
3.2 StreamingChatModel 接口
public interface StreamingChatModel {
// 回调式流式调用
void stream(Prompt prompt, StreamCallback callback);
}
interface StreamCallback {
void onNext(ChatResponse response); // 收到每个 chunk
void onError(Throwable throwable); // 发生错误
void onComplete(); // 流结束
}
这种接口适合不支持响应式编程的场景。
3.3 StreamingAdvisors
Spring AI 还支持流式 Advisor 模式,可以在流式输出的各个阶段插入自定义逻辑(如日志、过滤、增强等)。
4. 实战一:基础流式聊天
4.1 环境准备
确保已引入 Spring AI Alibaba 依赖:
<dependency>
<groupId>com.alibaba.cloud.ai</groupId>
<artifactId>spring-ai-alibaba-starter</artifactId>
<version>1.0.0-M5.1</version>
</dependency>
application.yml 配置:
spring:
ai:
dashscope:
api-key: ${DASHSCOPE_API_KEY}
chat:
options:
model: qwen-plus
temperature: 0.7
4.2 最简单的流式调用
import com.alibaba.cloud.ai.dashscope.chat.DashScopeChatModel;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.prompt.Prompt;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class StreamingChatService {
private final DashScopeChatModel chatModel;
@Autowired
public StreamingChatService(DashScopeChatModel chatModel) {
this.chatModel = chatModel;
}
/**
* 基础流式聊天:逐块打印模型输出
*/
public void simpleStreamingChat(String userInput) {
Prompt prompt = new Prompt(userInput);
Flux<ChatResponse> responseFlux = chatModel.stream(prompt);
responseFlux
.doOnNext(response -> {
// 每个 chunk 到来时执行
String content = response.getResult().getOutput().getContent();
System.out.print(content); // 直接打印,实现"打字机"效果
})
.doOnComplete(() -> System.out.println("\n--- 流式输出完成 ---"))
.doOnError(error -> System.err.println("流式输出出错: " + error.getMessage()))
.blockLast(); // 阻塞等待流结束(仅用于演示,生产环境不要阻塞)
}
}
运行效果:
你好Spring AI Alibaba$ 是一个基于 Spring AI 框架的扩展,
专门用于简化阿里巴巴通义千问等大模型在 Spring Boot 应用中的集成。
它提供了开箱即用的配置、流式响应支持、函数调用等功能...
--- 流式输出完成 ---
4.3 带系统提示词的流式聊天
import org.springframework.ai.chat.messages.SystemMessage;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.ai.chat.prompt.Prompt;
import java.util.List;
public void streamingWithSystemPrompt() {
SystemMessage systemMessage = new SystemMessage(
"你是一个专业的 Java 技术专家,擅长用简洁清晰的语言解释复杂概念。" +
"回答要包含代码示例,并指出最佳实践。"
);
UserMessage userMessage = new UserMessage(
"请解释 Java 中 CompletableFuture 的使用场景和注意事项"
);
Prompt prompt = new Prompt(List.of(systemMessage, userMessage));
chatModel.stream(prompt)
.map(response -> response.getResult().getOutput().getContent())
.doOnNext(System.out::print)
.doOnComplete(() -> System.out.println("\n=== 回答结束 ==="))
.blockLast();
}
4.4 聚合完整响应
有时你需要在流结束后获得完整文本(例如用于日志记录或后续处理):
import org.springframework.ai.chat.model.ChatResponse;
import reactor.core.publisher.Mono;
public Mono<String> streamingAndAggregate(String userInput) {
Prompt prompt = new Prompt(userInput);
return chatModel.stream(prompt)
// 提取每个 chunk 的文本内容
.map(response -> response.getResult().getOutput().getContent())
// 累积所有 chunk 为一个完整字符串
.collectList()
.map(chunks -> String.join("", chunks));
}
5. 实战二:WebFlux SSE 端点暴露流式服务
这是最经典的生产场景:通过 Spring WebFlux 将流式 AI 响应以 SSE 格式暴露给前端。
5.1 引入 WebFlux 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
注意: 如果你已有
spring-boot-starter-web(Tomcat),需要排除它并替换为 WebFlux,因为两者不能共存。
5.2 创建 SSE Controller
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.messages.AssistantMessage;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
@RestController
@RequestMapping("/api/chat")
public class StreamingChatController {
private final DashScopeChatModel chatModel;
public StreamingChatController(DashScopeChatModel chatModel) {
this.chatModel = chatModel;
}
/**
* SSE 流式聊天端点
* GET /api/chat/stream?message=你好
*/
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ChatResponse> streamChat(@RequestParam String message) {
Prompt prompt = new Prompt(message);
return chatModel.stream(prompt);
}
/**
* SSE 流式聊天端点(仅返回文本内容,不含元数据)
*/
@GetMapping(value = "/stream/simple", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChatSimple(@RequestParam String message) {
Prompt prompt = new Prompt(message);
return chatModel.stream(prompt)
.map(response -> response.getResult().getOutput().getContent());
}
/**
* 支持多轮对话的流式端点
*/
@PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ChatResponse> streamChatWithHistory(
@RequestBody ChatRequest request) {
Prompt prompt = new Prompt(request.messages());
return chatModel.stream(prompt)
.doOnNext(response -> {
// 可以在这里记录日志、埋点等
logChunk(response);
});
}
private void logChunk(ChatResponse response) {
String content = response.getResult().getOutput().getContent();
if (content != null && !content.isEmpty()) {
// 记录每个 chunk(注意:生产环境建议采样记录,避免日志爆炸)
System.out.print(content);
}
}
}
record ChatRequest(List<org.springframework.ai.chat.messages.Message> messages) {}
5.3 前端调用示例(JavaScript EventSource)
<!DOCTYPE html>
<html>
<head>
<title>AI 流式聊天</title>
<style>
#output {
white-space: pre-wrap;
font-family: monospace;
padding: 10px;
background: #f5f5f5;
border-radius: 5px;
min-height: 100px;
}
.cursor {
animation: blink 1s infinite;
}
@keyframes blink {
0%, 50% { opacity: 1; }
51%, 100% { opacity: 0; }
}
</style>
</head>
<body>
<h2>AI 流式聊天演示</h2>
<input type="text" id="input" placeholder="输入问题..." style="width: 400px;">
<button onclick="startStream()">发送</button>
<div id="output"></div>
<script>
function startStream() {
const message = document.getElementById('input').value;
const output = document.getElementById('output');
output.textContent = '';
const eventSource = new EventSource(
`/api/chat/stream?message=${encodeURIComponent(message)}`
);
eventSource.onmessage = function(event) {
// 解析 SSE 数据
const data = JSON.parse(event.data);
const content = data.result?.output?.content;
if (content) {
output.textContent += content;
}
};
eventSource.onerror = function() {
eventSource.close();
console.log('流式连接已关闭');
};
eventSource.addEventListener('complete', function() {
eventSource.close();
console.log('流式输出完成');
});
}
</script>
</body>
</html>
5.4 使用 fetch + ReadableStream(现代方案)
如果不想用 SSE,也可以通过 fetch 配合 ReadableStream 实现类似效果:
@GetMapping(value = "/stream/fetch")
public Flux<ServerSentEvent<String>> streamChatFetch(@RequestParam String message) {
Prompt prompt = new Prompt(message);
return chatModel.stream(prompt)
.map(response -> response.getResult().getOutput().getContent())
.map(content -> ServerSentEvent.<String>builder()
.event("message")
.data(content)
.build())
.concatWith(Mono.just(ServerSentEvent.<String>builder()
.event("done")
.data("[DONE]")
.build()));
}
6. 实战三:流式输出与 Spring MVC 集成
如果你使用的是传统的 Spring MVC(非 WebFlux),也有多种方式实现流式输出。
6.1 方案一:StreamingResponseBody
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("/api/mvc")
public class MvcStreamingController {
private final DashScopeChatModel chatModel;
public MvcStreamingController(DashScopeChatModel chatModel) {
this.chatModel = chatModel;
}
@GetMapping("/stream")
public ResponseEntity<StreamingResponseBody> streamChatMvc(
@RequestParam String message) {
StreamingResponseBody body = outputStream -> {
Prompt prompt = new Prompt(message);
chatModel.stream(prompt)
.doOnNext(response -> {
String content = response.getResult().getOutput().getContent();
if (content != null) {
outputStream.write(content.getBytes(StandardCharsets.UTF_8));
outputStream.flush();
}
})
.doOnComplete(() -> {
outputStream.write("\n[DONE]".getBytes(StandardCharsets.UTF_8));
outputStream.flush();
})
.doOnError(error -> {
String errorMsg = "\n[ERROR: " + error.getMessage() + "]";
outputStream.write(errorMsg.getBytes(StandardCharsets.UTF_8));
outputStream.flush();
})
.blockLast();
};
return ResponseEntity.ok()
.contentType(MediaType.TEXT_PLAIN)
.body(body);
}
}
6.2 方案二:ResponseBodyEmitter
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@GetMapping("/stream/emitter")
public ResponseBodyEmitter streamChatEmitter(@RequestParam String message) {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
try {
Prompt prompt = new Prompt(message);
chatModel.stream(prompt)
.doOnNext(response -> {
String content = response.getResult().getOutput().getContent();
if (content != null) {
emitter.send(content);
}
})
.doOnComplete(() -> emitter.complete())
.doOnError(emitter::completeWithError)
.blockLast();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
方案选择建议:
- 新项目:优先使用 WebFlux + SSE,响应式原生支持流式
- 老项目迁移:StreamingResponseBody 最轻量
- 需要细粒度控制:ResponseBodyEmitter 最灵活
7. 实战四:流式 Function Calling
流式模式下使用 Function Calling 是一个高级且常见的需求。Spring AI Alibaba 在流式场景下对 Function Calling 做了特殊处理。
7.1 定义工具函数
import org.springframework.ai.tool.annotation.Tool;
import org.springframework.ai.tool.annotation.ToolParam;
import org.springframework.stereotype.Component;
@Component
public class WeatherTool {
@Tool(description = "查询指定城市的当前天气")
public String getWeather(
@ToolParam(description = "城市名称,如 '北京'、'上海'") String city) {
// 模拟天气数据
return String.format("%s当前天气:晴,温度 25°C,湿度 40%%", city);
}
@Tool(description = "查询指定城市的未来三天天气预报")
public String getWeatherForecast(
@ToolParam(description = "城市名称") String city) {
return String.format(
"%s未来三天天气预报:\n" +
" 第一天:晴,22-28°C\n" +
" 第二天:多云,20-26°C\n" +
" 第三天:小雨,18-23°C", city);
}
}
7.2 配置工具并流式调用
import org.springframework.ai.chat.prompt.ChatOptions;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.ai.tool.ToolCallback;
import org.springframework.ai.tool.ToolCallbackProvider;
import org.springframework.ai.tool.method.MethodToolCallbackProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
@Service
public class StreamingFunctionCallingService {
private final DashScopeChatModel chatModel;
@Autowired
public StreamingFunctionCallingService(DashScopeChatModel chatModel) {
this.chatModel = chatModel;
}
public Flux<ChatResponse> streamWithFunctionCalling(String userInput) {
// 创建 ToolCallbackProvider
ToolCallbackProvider toolCallbackProvider =
MethodToolCallbackProvider.builder()
.toolObjects(new WeatherTool())
.build();
// 构建 Prompt 并启用工具
Prompt prompt = new Prompt(
userInput,
ChatOptions.builder()
.toolCallbacks(toolCallbackProvider.getToolCallbacks())
.build()
);
return chatModel.stream(prompt)
.doOnNext(response -> {
// 检查是否有工具调用
AssistantMessage assistantMessage = response.getResult().getOutput();
if (assistantMessage.hasToolCalls()) {
System.out.println("\n[检测到工具调用]");
assistantMessage.getToolCalls().forEach(toolCall -> {
System.out.println(" 工具: " + toolCall.name());
System.out.println(" 参数: " + toolCall.arguments());
});
}
});
}
}
7.3 流式 Function Calling 的工作流程
用户: "明天北京天气怎么样?"
↓
模型流式输出思考过程...
↓
模型发现需要调用 getWeather 工具
↓
[暂停流式输出] → 执行工具调用 → 获取天气数据
↓
[继续流式输出] "根据查询,北京明天天气:晴,温度 25°C..."
↓
[DONE]
重要: 在流式模式下,当模型决定调用工具时,流式输出会暂停,等待工具执行完毕后再继续输出最终结果。这是预期行为。
8. 实战五:流式响应的中断与取消
流式响应的一个独特优势是支持中途取消——当用户改变主意或发现答案不需要时,可以中断生成过程。
8.1 Reactor 中的取消机制
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
public class StreamingCancellationDemo {
public void demonstrateCancellation() {
Flux<ChatResponse> flux = chatModel.stream(new Prompt("写一首关于春天的长诗"));
Disposable subscription = flux
.doOnNext(response -> {
String content = response.getResult().getOutput().getContent();
System.out.print(content);
})
.subscribe();
// 模拟 5 秒后取消
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("\n[用户取消]");
subscription.dispose(); // 取消订阅,中断流
}
}
8.2 Web 场景下的客户端中断
当浏览器关闭连接时,Reactor 会自动检测到连接断开并取消上游流:
@GetMapping(value = "/stream/cancellable", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamWithCancellation(@RequestParam String message) {
Prompt prompt = new Prompt(message);
return chatModel.stream(prompt)
.map(response -> response.getResult().getOutput().getContent())
.doOnCancel(() -> {
// 客户端断开连接时的回调
System.out.println("客户端已断开连接,停止生成");
})
.doFinally(signalType -> {
// 无论正常完成、错误还是取消都会执行
System.out.println("流结束,信号: " + signalType);
});
}
8.3 带超时控制的流式调用
import java.time.Duration;
public Flux<String> streamWithTimeout(String message) {
Prompt prompt = new Prompt(message);
return chatModel.stream(prompt)
.map(response -> response.getResult().getOutput().getContent())
.timeout(Duration.ofSeconds(30)) // 30 秒超时
.onErrorResume(TimeoutException.class, e -> {
System.out.println("流式调用超时");
return Flux.just("\n[响应超时,请稍后重试]");
});
}
9. 实战六:Token 计数与流式统计
在生产环境中,追踪 Token 使用量是必要的——它直接关系到成本和配额管理。
9.1 从流式响应中提取 Token 信息
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.metadata.Usage;
import reactor.core.publisher.Flux;
public class TokenTrackingService {
public Flux<String> streamWithTokenTracking(String message) {
Prompt prompt = new Prompt(message);
return chatModel.stream(prompt)
.doOnNext(response -> {
String content = response.getResult().getOutput().getContent();
if (content != null) {
System.out.print(content);
}
// 注意:中间 chunk 的 usage 可能为空或只有 partial 信息
Usage usage = response.getMetadata().getUsage();
if (usage != null) {
System.out.println("\n[Token 统计] " +
"Prompt: " + usage.getPromptTokens() +
" | Completion: " + usage.getCompletionTokens() +
" | Total: " + usage.getTotalTokens());
}
})
.map(response -> response.getResult().getOutput().getContent())
.doOnComplete(() -> {
System.out.println("\n=== 流式输出完成 ===");
});
}
}
9.2 完整的 Token 统计服务
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.prompt.Prompt;
import reactor.core.publisher.Mono;
public record TokenStats(
int promptTokens,
int completionTokens,
int totalTokens,
long durationMs,
double tokensPerSecond
) {}
public Mono<TokenStats> streamAndCalculateStats(String message) {
long startTime = System.currentTimeMillis();
return chatModel.stream(new Prompt(message))
.collectList() // 收集所有 chunk
.map(responses -> {
// 最后一个 response 包含完整的 usage 信息
ChatResponse lastResponse = responses.get(responses.size() - 1);
Usage usage = lastResponse.getMetadata().getUsage();
long duration = System.currentTimeMillis() - startTime;
int totalTokens = usage.getTotalTokens();
return new TokenStats(
usage.getPromptTokens(),
usage.getCompletionTokens(),
totalTokens,
duration,
duration > 0 ? (totalTokens * 1000.0 / duration) : 0
);
});
}
输出示例:
TokenStats[
promptTokens=45,
completionTokens=312,
totalTokens=357,
durationMs=4523,
tokensPerSecond=78.9
]
10. 底层机制:SSE 协议详解
10.1 SSE 消息格式
SSE 协议的消息格式非常简单,每行一个字段:
event: message ← 事件类型(可选,默认为 "message")
id: 12345 ← 事件 ID(用于断线重连,可选)
retry: 3000 ← 重连时间(毫秒,可选)
data: {"content":"你好"} ← 事件数据(必需)
← 空行表示消息结束
关键规则:
- 每个 SSE 消息以空行结束
data字段可以有多行,每行前加data:data字段的值会自动用\n连接- 注释行以
:开头(客户端忽略)
10.2 Spring AI Alibaba 的 SSE 序列化
Spring AI Alibaba 使用 Jackson 将 ChatResponse 序列化为 JSON,然后通过 SSE 格式发送:
// 内部逻辑简化示意
Flux<ChatResponse> stream = chatModel.stream(prompt);
return stream
.map(response -> {
String json = objectMapper.writeValueAsString(response);
return "data: " + json + "\n\n";
});
10.3 [DONE] 信号的发送
当流结束时,Spring AI Alibaba 会发送一个特殊的 [DONE] 标记:
data: {"choices":[{"delta":{},"finish_reason":"stop"}]}
data: [DONE]
前端可以通过检测 finish_reason 字段或 [DONE] 标记来判断流是否结束。
11. 性能优化:连接池、缓冲与背压
11.1 连接池配置
DashScope API 底层使用 HTTP 客户端,合理配置连接池可以提升流式响应的性能:
spring:
ai:
dashscope:
api-key: ${DASHSCOPE_API_KEY}
chat:
options:
model: qwen-plus
# HTTP 客户端配置
base-url: https://dashscope.aliyuncs.com
timeout: 60000
11.2 缓冲控制
在高并发场景下,需要考虑**背压(Backpressure)**问题——当消费速度跟不上生产速度时:
public Flux<String> streamWithBackpressure(String message) {
return chatModel.stream(new Prompt(message))
.map(response -> response.getResult().getOutput().getContent())
// 限制缓冲区大小,防止内存溢出
.onBackpressureBuffer(100)
// 或者丢弃来不及处理的数据
// .onBackpressureDrop()
// 或者使用最新的值
// .onBackpressureLatest();
}
11.3 并发流式请求
import reactor.core.publisher.Flux;
public Flux<String> concurrentStreaming(List<String> messages) {
return Flux.fromIterable(messages)
// 并发处理,最多同时 5 个流
.flatMap(message ->
chatModel.stream(new Prompt(message))
.map(response -> response.getResult().getOutput().getContent()),
5 // concurrency 参数
);
}
注意: 并发流式请求会显著增加 API 调用频率和成本,务必做好限流和配额管理。
12. 错误处理与容错
12.1 流式调用中的常见错误
| 错误类型 | 原因 | 处理方式 |
|---|---|---|
| TimeoutException | 模型响应超时 | 重试或返回降级响应 |
| ResourceAccessException | 网络连接中断 | 自动重连或提示用户 |
| InternalServerError | 服务端错误 | 重试(带退避策略) |
| RateLimitException | 超过 API 限流 | 等待后重试 |
12.2 完整的错误处理示例
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.util.retry.Retry;
import java.time.Duration;
public Flux<String> streamWithRobustErrorHandling(String message) {
return chatModel.stream(new Prompt(message))
.map(response -> response.getResult().getOutput().getContent())
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(5))
.filter(this::isRetryableError)
.doBeforeRetry(retrySignal ->
System.out.println("重试第 " + retrySignal.totalRetries() + " 次"))
)
.onErrorResume(WebClientResponseException.TooManyRequests.class, e -> {
System.out.println("触发限流,返回降级响应");
return Flux.just("[服务繁忙,请稍后再试]");
})
.onErrorResume(TimeoutException.class, e -> {
System.out.println("响应超时");
return Flux.just("[响应超时,请缩短问题或稍后重试]");
})
.doOnError(error -> {
System.err.println("流式调用失败: " + error.getMessage());
});
}
private boolean isRetryableError(Throwable error) {
if (error instanceof WebClientResponseException ex) {
int statusCode = ex.getStatusCode().value();
// 5xx 服务器错误可以重试,4xx 客户端错误不重试
return statusCode >= 500;
}
return true;
}
12.3 健康检查与熔断
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
@Service
public class ResilientStreamingService {
@CircuitBreaker(name = "dashscopeChat", fallbackMethod = "streamFallback")
public Flux<String> streamWithCircuitBreaker(String message) {
return chatModel.stream(new Prompt(message))
.map(response -> response.getResult().getOutput().getContent());
}
public Flux<String> streamFallback(String message, Throwable t) {
return Flux.just(
"[AI 服务暂时不可用,已触发熔断保护]\n" +
"错误原因: " + t.getMessage() + "\n" +
"请稍后再试或联系客服。"
);
}
}
13. 最佳实践总结
13.1 架构选择
| 场景 | 推荐方案 |
|---|---|
| 新项目、高性能要求 | WebFlux + SSE |
| 传统 MVC 项目 | StreamingResponseBody |
| 需要双向通信 | WebSocket(但 AI 场景通常不需要) |
| 移动端 App | HTTP Chunked / WebSocket |
13.2 编码规范
- 始终处理取消信号:使用
doOnCancel()处理客户端断开连接的情况 - 设置合理的超时:防止长时间挂起的连接占用资源
- 实现优雅降级:当流式服务不可用时,提供非流式或静态响应
- 记录关键指标:TTFT、总延迟、Token 吞吐量、错误率
- 控制并发:避免无限制的并发流式请求耗尽 API 配额
13.3 用户体验优化
// 发送"正在思考"的初始信号
@GetMapping(value = "/stream/ux-optimized", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamWithUX(@RequestParam String message) {
return Flux.concat(
// 1. 立即发送"开始"信号
Mono.just(ServerSentEvent.<String>builder()
.event("status")
.data("正在思考...")
.build()),
// 2. 流式输出 AI 回答
chatModel.stream(new Prompt(message))
.map(response -> response.getResult().getOutput().getContent())
.map(content -> ServerSentEvent.<String>builder()
.event("message")
.data(content)
.build()),
// 3. 发送"完成"信号
Mono.just(ServerSentEvent.<String>builder()
.event("status")
.data("完成")
.build())
);
}
前端可以根据 status 事件显示/隐藏加载动画,根据 message 事件逐字渲染回答。
13.4 安全注意事项
- 不要将 API Key 暴露到前端:流式请求必须经过后端代理
- 验证用户输入:防止 Prompt 注入攻击
- 限制请求频率:对每个用户/IP 实施限流
- 审查敏感输出:在流式输出中加入内容过滤逻辑
14. 常见问题与排查指南
Q1: 流式响应在某个 chunk 后卡住了?
原因: 网络连接中断或模型生成缓慢。
排查:
.doOnNext(response -> {
System.out.println("收到 chunk, 时间戳: " + System.currentTimeMillis());
})
.timeout(Duration.ofSeconds(10), Flux.just("[超时]"))
Q2: 前端收到 [DONE] 但内容不完整?
原因: 模型达到了 max_tokens 限制。
解决: 增加 max_tokens 配置:
spring:
ai:
dashscope:
chat:
options:
max-tokens: 4096
Q3: WebFlux 项目报错”No ServletContext set”?
原因: 同时引入了 spring-boot-starter-web 和 spring-boot-starter-webflux。
解决: 移除 spring-boot-starter-web,仅保留 WebFlux。
Q4: 流式输出中文乱码?
原因: 字符编码设置不正确。
解决: 确保使用 UTF-8:
outputStream.write(content.getBytes(StandardCharsets.UTF_8));
Q5: 并发流式请求导致 API 限流?
原因: 超出 DashScope API 的 QPS 限制。
解决:
import reactor.core.publisher.Flux;
import java.time.Duration;
// 限流:每秒最多 10 个请求
Flux.fromIterable(messages)
.delayElements(Duration.ofMillis(100)) // 间隔 100ms
.flatMap(msg -> chatModel.stream(new Prompt(msg)));
15. 总结
流式响应是 AI 应用中提升用户体验的关键技术。Spring AI Alibaba 通过 Reactor Flux 提供了优雅且强大的流式 API:
Flux<ChatResponse>:响应式流式接口,适合 WebFlux 场景StreamCallback:回调式接口,适合传统阻塞场景- SSE 协议:简单高效的推送协议,浏览器原生支持
- 取消、超时、重试:完善的流式生命周期管理
- Token 统计:流式输出的成本追踪
在实际项目中,我建议:
- 新项目统一使用 WebFlux + SSE
- 所有流式端点都加入超时和取消处理
- 做好 Token 统计和限流
- 前端实现打字机效果和加载状态
下一篇我们将进入 Embedding 向量嵌入 的世界——这是构建 RAG(检索增强生成)应用的基础。敬请期待!
参考资料: