第8天 - Streaming 流式响应处理

系列: Spring AI Alibaba 技术博客系列
日期: 2026-05-02
难度: ⭐⭐⭐⭐
前置知识: ChatModel 基础使用、Prompt 工程、Function Calling 基础


目录

  1. 为什么需要流式响应?
  2. Streaming 的核心原理与通信模型
  3. Spring AI Alibaba 的 Streaming API 概览
  4. 实战一:基础流式聊天
  5. 实战二:WebFlux SSE 端点暴露流式服务
  6. 实战三:流式输出与 Spring MVC 集成
  7. 实战四:流式 Function Calling
  8. 实战五:流式响应的中断与取消
  9. 实战六:Token 计数与流式统计
  10. 底层机制:SSE 协议详解
  11. 性能优化:连接池、缓冲与背压
  12. 错误处理与容错
  13. 最佳实践总结
  14. 常见问题与排查指南
  15. 总结

1. 为什么需要流式响应?

1.1 用户体验的痛点

在 AI 应用开发中,用户最常遇到的痛点之一就是等待时间过长。当用户向 AI 发送一个问题时,大模型需要数秒甚至数十秒才能生成完整的回复。如果采用传统的同步请求-响应模式:

  • 用户视角: 屏幕空白,不知道系统是否在运行,是否卡住了
  • 体验对比: ChatGPT、文心一言等主流 AI 产品都已经实现了逐字输出
  • 心理预期: 用户已经习惯了”即时反馈”的交互模式

1.2 流式响应解决了什么问题?

流式响应(Streaming Response)将模型的输出拆分为多个小块(Chunk),逐个推送给客户端。它带来的核心优势:

维度非流式流式
首字延迟(TTFT)3-30秒0.2-1秒
用户感知长时间等待即时反馈
服务端内存需缓存完整响应逐块处理,内存占用低
实时交互不支持支持中断、实时反馈

1.3 什么是 TTFT?

TTFT(Time To First Token) 是衡量流式体验的关键指标——从用户发送请求到看到第一个字的时间。在流式模式下,TTFT 通常只有几百毫秒,而完整响应可能需要数秒到数十秒。用户感知的延迟从”等完整结果”变成了”等第一个字”,体验差异巨大。


2. Streaming 的核心原理与通信模型

2.1 服务端视角:模型如何流式输出

大语言模型在生成文本时,本质上是自回归(Autoregressive) 的过程:

输入: "Spring AI Alibaba 是..."

Step 1: 生成 token → "一"
Step 2: 生成 token → "个"
Step 3: 生成 token → "基"
Step 4: 生成 token → "于"
Step 5: 生成 token → "..."

每一步生成一个 token(词元),模型内部状态更新后继续下一步。流式响应就是把这个逐 token 生成的过程实时暴露出来,而不是等到全部生成完毕再返回。

2.2 传输协议:SSE(Server-Sent Events)

Spring AI Alibaba 的流式响应主要通过 SSE(Server-Sent Events) 协议实现。SSE 是一种基于 HTTP 的单向推送协议:

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

data: {"choices":[{"delta":{"content":"你"},"index":0}]}

data: {"choices":[{"delta":{"content":"好"},"index":0}]}

data: {"choices":[{"delta":{"content":","},"index":0}]}

data: {"choices":[{"delta":{"content":"我"},"index":0}]}

data: [DONE]

SSE 的关键特性:

  • 基于普通 HTTP,无需额外协议升级(区别于 WebSocket)
  • 服务端可以持续推送事件
  • 客户端自动重连机制
  • 单向通信(服务端 → 客户端),但对于 AI 流式输出完全够用

2.3 Spring AI Alibaba 的流式架构

┌──────────────────────────────────────────────────────┐
│                    客户端 (浏览器/终端)                  │
│                  ← SSE 逐块推送 ←                     │
├──────────────────────────────────────────────────────┤
│              Spring Boot Web 层                        │
│         (WebFlux SSE 或 MVC StreamingResponseBody)      │
├──────────────────────────────────────────────────────┤
│           Spring AI Alibaba 流式 API                    │
│    Flux<ChatResponse>  /  StreamingChatModel            │
├──────────────────────────────────────────────────────┤
│           DashScope API (通义千问)                       │
│        ← HTTP Chunked Transfer ←                      │
└──────────────────────────────────────────────────────┘

3. Spring AI Alibaba 的 Streaming API 概览

Spring AI Alibaba 提供了多种流式 API 入口,适应不同的使用场景:

3.1 ChatModel 流式接口

public interface ChatModel {
    // 同步调用(非流式)
    ChatResponse call(Prompt prompt);
    
    // 流式调用 → 返回 Flux<ChatResponse>
    Flux<ChatResponse> stream(Prompt prompt);
}

stream() 方法返回一个 Reactor Flux,这是一个响应式流,可以逐块消费模型生成的内容。

3.2 StreamingChatModel 接口

public interface StreamingChatModel {
    // 回调式流式调用
    void stream(Prompt prompt, StreamCallback callback);
}

interface StreamCallback {
    void onNext(ChatResponse response);  // 收到每个 chunk
    void onError(Throwable throwable);    // 发生错误
    void onComplete();                     // 流结束
}

这种接口适合不支持响应式编程的场景。

3.3 StreamingAdvisors

Spring AI 还支持流式 Advisor 模式,可以在流式输出的各个阶段插入自定义逻辑(如日志、过滤、增强等)。


4. 实战一:基础流式聊天

4.1 环境准备

确保已引入 Spring AI Alibaba 依赖:

<dependency>
    <groupId>com.alibaba.cloud.ai</groupId>
    <artifactId>spring-ai-alibaba-starter</artifactId>
    <version>1.0.0-M5.1</version>
</dependency>

application.yml 配置:

spring:
  ai:
    dashscope:
      api-key: ${DASHSCOPE_API_KEY}
      chat:
        options:
          model: qwen-plus
          temperature: 0.7

4.2 最简单的流式调用

import com.alibaba.cloud.ai.dashscope.chat.DashScopeChatModel;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.prompt.Prompt;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class StreamingChatService {

    private final DashScopeChatModel chatModel;

    @Autowired
    public StreamingChatService(DashScopeChatModel chatModel) {
        this.chatModel = chatModel;
    }

    /**
     * 基础流式聊天:逐块打印模型输出
     */
    public void simpleStreamingChat(String userInput) {
        Prompt prompt = new Prompt(userInput);

        Flux<ChatResponse> responseFlux = chatModel.stream(prompt);

        responseFlux
            .doOnNext(response -> {
                // 每个 chunk 到来时执行
                String content = response.getResult().getOutput().getContent();
                System.out.print(content); // 直接打印,实现"打字机"效果
            })
            .doOnComplete(() -> System.out.println("\n--- 流式输出完成 ---"))
            .doOnError(error -> System.err.println("流式输出出错: " + error.getMessage()))
            .blockLast(); // 阻塞等待流结束(仅用于演示,生产环境不要阻塞)
    }
}

运行效果:

你好Spring AI Alibaba$ 是一个基于 Spring AI 框架的扩展,
专门用于简化阿里巴巴通义千问等大模型在 Spring Boot 应用中的集成。
它提供了开箱即用的配置、流式响应支持、函数调用等功能...
--- 流式输出完成 ---

4.3 带系统提示词的流式聊天

import org.springframework.ai.chat.messages.SystemMessage;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.ai.chat.prompt.Prompt;
import java.util.List;

public void streamingWithSystemPrompt() {
    SystemMessage systemMessage = new SystemMessage(
        "你是一个专业的 Java 技术专家,擅长用简洁清晰的语言解释复杂概念。" +
        "回答要包含代码示例,并指出最佳实践。"
    );
    UserMessage userMessage = new UserMessage(
        "请解释 Java 中 CompletableFuture 的使用场景和注意事项"
    );

    Prompt prompt = new Prompt(List.of(systemMessage, userMessage));

    chatModel.stream(prompt)
        .map(response -> response.getResult().getOutput().getContent())
        .doOnNext(System.out::print)
        .doOnComplete(() -> System.out.println("\n=== 回答结束 ==="))
        .blockLast();
}

4.4 聚合完整响应

有时你需要在流结束后获得完整文本(例如用于日志记录或后续处理):

import org.springframework.ai.chat.model.ChatResponse;
import reactor.core.publisher.Mono;

public Mono<String> streamingAndAggregate(String userInput) {
    Prompt prompt = new Prompt(userInput);

    return chatModel.stream(prompt)
        // 提取每个 chunk 的文本内容
        .map(response -> response.getResult().getOutput().getContent())
        // 累积所有 chunk 为一个完整字符串
        .collectList()
        .map(chunks -> String.join("", chunks));
}

5. 实战二:WebFlux SSE 端点暴露流式服务

这是最经典的生产场景:通过 Spring WebFlux 将流式 AI 响应以 SSE 格式暴露给前端。

5.1 引入 WebFlux 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

注意: 如果你已有 spring-boot-starter-web(Tomcat),需要排除它并替换为 WebFlux,因为两者不能共存。

5.2 创建 SSE Controller

import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.messages.AssistantMessage;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/api/chat")
public class StreamingChatController {

    private final DashScopeChatModel chatModel;

    public StreamingChatController(DashScopeChatModel chatModel) {
        this.chatModel = chatModel;
    }

    /**
     * SSE 流式聊天端点
     * GET /api/chat/stream?message=你好
     */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ChatResponse> streamChat(@RequestParam String message) {
        Prompt prompt = new Prompt(message);
        return chatModel.stream(prompt);
    }

    /**
     * SSE 流式聊天端点(仅返回文本内容,不含元数据)
     */
    @GetMapping(value = "/stream/simple", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamChatSimple(@RequestParam String message) {
        Prompt prompt = new Prompt(message);
        return chatModel.stream(prompt)
            .map(response -> response.getResult().getOutput().getContent());
    }

    /**
     * 支持多轮对话的流式端点
     */
    @PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ChatResponse> streamChatWithHistory(
            @RequestBody ChatRequest request) {

        Prompt prompt = new Prompt(request.messages());
        return chatModel.stream(prompt)
            .doOnNext(response -> {
                // 可以在这里记录日志、埋点等
                logChunk(response);
            });
    }

    private void logChunk(ChatResponse response) {
        String content = response.getResult().getOutput().getContent();
        if (content != null && !content.isEmpty()) {
            // 记录每个 chunk(注意:生产环境建议采样记录,避免日志爆炸)
            System.out.print(content);
        }
    }
}

record ChatRequest(List<org.springframework.ai.chat.messages.Message> messages) {}

5.3 前端调用示例(JavaScript EventSource)

<!DOCTYPE html>
<html>
<head>
    <title>AI 流式聊天</title>
    <style>
        #output {
            white-space: pre-wrap;
            font-family: monospace;
            padding: 10px;
            background: #f5f5f5;
            border-radius: 5px;
            min-height: 100px;
        }
        .cursor {
            animation: blink 1s infinite;
        }
        @keyframes blink {
            0%, 50% { opacity: 1; }
            51%, 100% { opacity: 0; }
        }
    </style>
</head>
<body>
    <h2>AI 流式聊天演示</h2>
    <input type="text" id="input" placeholder="输入问题..." style="width: 400px;">
    <button onclick="startStream()">发送</button>
    <div id="output"></div>

    <script>
        function startStream() {
            const message = document.getElementById('input').value;
            const output = document.getElementById('output');
            output.textContent = '';
            
            const eventSource = new EventSource(
                `/api/chat/stream?message=${encodeURIComponent(message)}`
            );

            eventSource.onmessage = function(event) {
                // 解析 SSE 数据
                const data = JSON.parse(event.data);
                const content = data.result?.output?.content;
                if (content) {
                    output.textContent += content;
                }
            };

            eventSource.onerror = function() {
                eventSource.close();
                console.log('流式连接已关闭');
            };

            eventSource.addEventListener('complete', function() {
                eventSource.close();
                console.log('流式输出完成');
            });
        }
    </script>
</body>
</html>

5.4 使用 fetch + ReadableStream(现代方案)

如果不想用 SSE,也可以通过 fetch 配合 ReadableStream 实现类似效果:

@GetMapping(value = "/stream/fetch")
public Flux<ServerSentEvent<String>> streamChatFetch(@RequestParam String message) {
    Prompt prompt = new Prompt(message);
    return chatModel.stream(prompt)
        .map(response -> response.getResult().getOutput().getContent())
        .map(content -> ServerSentEvent.<String>builder()
            .event("message")
            .data(content)
            .build())
        .concatWith(Mono.just(ServerSentEvent.<String>builder()
            .event("done")
            .data("[DONE]")
            .build()));
}

6. 实战三:流式输出与 Spring MVC 集成

如果你使用的是传统的 Spring MVC(非 WebFlux),也有多种方式实现流式输出。

6.1 方案一:StreamingResponseBody

import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

@RestController
@RequestMapping("/api/mvc")
public class MvcStreamingController {

    private final DashScopeChatModel chatModel;

    public MvcStreamingController(DashScopeChatModel chatModel) {
        this.chatModel = chatModel;
    }

    @GetMapping("/stream")
    public ResponseEntity<StreamingResponseBody> streamChatMvc(
            @RequestParam String message) {
        
        StreamingResponseBody body = outputStream -> {
            Prompt prompt = new Prompt(message);
            
            chatModel.stream(prompt)
                .doOnNext(response -> {
                    String content = response.getResult().getOutput().getContent();
                    if (content != null) {
                        outputStream.write(content.getBytes(StandardCharsets.UTF_8));
                        outputStream.flush();
                    }
                })
                .doOnComplete(() -> {
                    outputStream.write("\n[DONE]".getBytes(StandardCharsets.UTF_8));
                    outputStream.flush();
                })
                .doOnError(error -> {
                    String errorMsg = "\n[ERROR: " + error.getMessage() + "]";
                    outputStream.write(errorMsg.getBytes(StandardCharsets.UTF_8));
                    outputStream.flush();
                })
                .blockLast();
        };

        return ResponseEntity.ok()
            .contentType(MediaType.TEXT_PLAIN)
            .body(body);
    }
}

6.2 方案二:ResponseBodyEmitter

import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@GetMapping("/stream/emitter")
public ResponseBodyEmitter streamChatEmitter(@RequestParam String message) {
    ResponseBodyEmitter emitter = new ResponseBodyEmitter();
    ExecutorService executor = Executors.newSingleThreadExecutor();

    executor.submit(() -> {
        try {
            Prompt prompt = new Prompt(message);

            chatModel.stream(prompt)
                .doOnNext(response -> {
                    String content = response.getResult().getOutput().getContent();
                    if (content != null) {
                        emitter.send(content);
                    }
                })
                .doOnComplete(() -> emitter.complete())
                .doOnError(emitter::completeWithError)
                .blockLast();
        } catch (Exception e) {
            emitter.completeWithError(e);
        }
    });

    return emitter;
}

方案选择建议:

  • 新项目:优先使用 WebFlux + SSE,响应式原生支持流式
  • 老项目迁移:StreamingResponseBody 最轻量
  • 需要细粒度控制:ResponseBodyEmitter 最灵活

7. 实战四:流式 Function Calling

流式模式下使用 Function Calling 是一个高级且常见的需求。Spring AI Alibaba 在流式场景下对 Function Calling 做了特殊处理。

7.1 定义工具函数

import org.springframework.ai.tool.annotation.Tool;
import org.springframework.ai.tool.annotation.ToolParam;
import org.springframework.stereotype.Component;

@Component
public class WeatherTool {

    @Tool(description = "查询指定城市的当前天气")
    public String getWeather(
            @ToolParam(description = "城市名称,如 '北京'、'上海'") String city) {
        // 模拟天气数据
        return String.format("%s当前天气:晴,温度 25°C,湿度 40%%", city);
    }

    @Tool(description = "查询指定城市的未来三天天气预报")
    public String getWeatherForecast(
            @ToolParam(description = "城市名称") String city) {
        return String.format(
            "%s未来三天天气预报:\n" +
            "  第一天:晴,22-28°C\n" +
            "  第二天:多云,20-26°C\n" +
            "  第三天:小雨,18-23°C", city);
    }
}

7.2 配置工具并流式调用

import org.springframework.ai.chat.prompt.ChatOptions;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.ai.tool.ToolCallback;
import org.springframework.ai.tool.ToolCallbackProvider;
import org.springframework.ai.tool.method.MethodToolCallbackProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;

@Service
public class StreamingFunctionCallingService {

    private final DashScopeChatModel chatModel;

    @Autowired
    public StreamingFunctionCallingService(DashScopeChatModel chatModel) {
        this.chatModel = chatModel;
    }

    public Flux<ChatResponse> streamWithFunctionCalling(String userInput) {
        // 创建 ToolCallbackProvider
        ToolCallbackProvider toolCallbackProvider = 
            MethodToolCallbackProvider.builder()
                .toolObjects(new WeatherTool())
                .build();

        // 构建 Prompt 并启用工具
        Prompt prompt = new Prompt(
            userInput,
            ChatOptions.builder()
                .toolCallbacks(toolCallbackProvider.getToolCallbacks())
                .build()
        );

        return chatModel.stream(prompt)
            .doOnNext(response -> {
                // 检查是否有工具调用
                AssistantMessage assistantMessage = response.getResult().getOutput();
                if (assistantMessage.hasToolCalls()) {
                    System.out.println("\n[检测到工具调用]");
                    assistantMessage.getToolCalls().forEach(toolCall -> {
                        System.out.println("  工具: " + toolCall.name());
                        System.out.println("  参数: " + toolCall.arguments());
                    });
                }
            });
    }
}

7.3 流式 Function Calling 的工作流程

用户: "明天北京天气怎么样?"
  ↓
模型流式输出思考过程...
  ↓
模型发现需要调用 getWeather 工具
  ↓
[暂停流式输出] → 执行工具调用 → 获取天气数据
  ↓
[继续流式输出] "根据查询,北京明天天气:晴,温度 25°C..."
  ↓
[DONE]

重要: 在流式模式下,当模型决定调用工具时,流式输出会暂停,等待工具执行完毕后再继续输出最终结果。这是预期行为。


8. 实战五:流式响应的中断与取消

流式响应的一个独特优势是支持中途取消——当用户改变主意或发现答案不需要时,可以中断生成过程。

8.1 Reactor 中的取消机制

import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class StreamingCancellationDemo {

    public void demonstrateCancellation() {
        Flux<ChatResponse> flux = chatModel.stream(new Prompt("写一首关于春天的长诗"));

        Disposable subscription = flux
            .doOnNext(response -> {
                String content = response.getResult().getOutput().getContent();
                System.out.print(content);
            })
            .subscribe();

        // 模拟 5 秒后取消
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        System.out.println("\n[用户取消]");
        subscription.dispose(); // 取消订阅,中断流
    }
}

8.2 Web 场景下的客户端中断

当浏览器关闭连接时,Reactor 会自动检测到连接断开并取消上游流:

@GetMapping(value = "/stream/cancellable", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamWithCancellation(@RequestParam String message) {
    Prompt prompt = new Prompt(message);

    return chatModel.stream(prompt)
        .map(response -> response.getResult().getOutput().getContent())
        .doOnCancel(() -> {
            // 客户端断开连接时的回调
            System.out.println("客户端已断开连接,停止生成");
        })
        .doFinally(signalType -> {
            // 无论正常完成、错误还是取消都会执行
            System.out.println("流结束,信号: " + signalType);
        });
}

8.3 带超时控制的流式调用

import java.time.Duration;

public Flux<String> streamWithTimeout(String message) {
    Prompt prompt = new Prompt(message);

    return chatModel.stream(prompt)
        .map(response -> response.getResult().getOutput().getContent())
        .timeout(Duration.ofSeconds(30)) // 30 秒超时
        .onErrorResume(TimeoutException.class, e -> {
            System.out.println("流式调用超时");
            return Flux.just("\n[响应超时,请稍后重试]");
        });
}

9. 实战六:Token 计数与流式统计

在生产环境中,追踪 Token 使用量是必要的——它直接关系到成本和配额管理。

9.1 从流式响应中提取 Token 信息

import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.metadata.Usage;
import reactor.core.publisher.Flux;

public class TokenTrackingService {

    public Flux<String> streamWithTokenTracking(String message) {
        Prompt prompt = new Prompt(message);

        return chatModel.stream(prompt)
            .doOnNext(response -> {
                String content = response.getResult().getOutput().getContent();
                if (content != null) {
                    System.out.print(content);
                }
                
                // 注意:中间 chunk 的 usage 可能为空或只有 partial 信息
                Usage usage = response.getMetadata().getUsage();
                if (usage != null) {
                    System.out.println("\n[Token 统计] " +
                        "Prompt: " + usage.getPromptTokens() + 
                        " | Completion: " + usage.getCompletionTokens() +
                        " | Total: " + usage.getTotalTokens());
                }
            })
            .map(response -> response.getResult().getOutput().getContent())
            .doOnComplete(() -> {
                System.out.println("\n=== 流式输出完成 ===");
            });
    }
}

9.2 完整的 Token 统计服务

import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.prompt.Prompt;
import reactor.core.publisher.Mono;

public record TokenStats(
    int promptTokens,
    int completionTokens,
    int totalTokens,
    long durationMs,
    double tokensPerSecond
) {}

public Mono<TokenStats> streamAndCalculateStats(String message) {
    long startTime = System.currentTimeMillis();
    
    return chatModel.stream(new Prompt(message))
        .collectList() // 收集所有 chunk
        .map(responses -> {
            // 最后一个 response 包含完整的 usage 信息
            ChatResponse lastResponse = responses.get(responses.size() - 1);
            Usage usage = lastResponse.getMetadata().getUsage();
            
            long duration = System.currentTimeMillis() - startTime;
            int totalTokens = usage.getTotalTokens();
            
            return new TokenStats(
                usage.getPromptTokens(),
                usage.getCompletionTokens(),
                totalTokens,
                duration,
                duration > 0 ? (totalTokens * 1000.0 / duration) : 0
            );
        });
}

输出示例:

TokenStats[
  promptTokens=45, 
  completionTokens=312, 
  totalTokens=357, 
  durationMs=4523, 
  tokensPerSecond=78.9
]

10. 底层机制:SSE 协议详解

10.1 SSE 消息格式

SSE 协议的消息格式非常简单,每行一个字段:

event: message        ← 事件类型(可选,默认为 "message")
id: 12345             ← 事件 ID(用于断线重连,可选)
retry: 3000           ← 重连时间(毫秒,可选)
data: {"content":"你好"} ← 事件数据(必需)
                      ← 空行表示消息结束

关键规则:

  • 每个 SSE 消息以空行结束
  • data 字段可以有多行,每行前加 data:
  • data 字段的值会自动用 \n 连接
  • 注释行以 : 开头(客户端忽略)

10.2 Spring AI Alibaba 的 SSE 序列化

Spring AI Alibaba 使用 JacksonChatResponse 序列化为 JSON,然后通过 SSE 格式发送:

// 内部逻辑简化示意
Flux<ChatResponse> stream = chatModel.stream(prompt);

return stream
    .map(response -> {
        String json = objectMapper.writeValueAsString(response);
        return "data: " + json + "\n\n";
    });

10.3 [DONE] 信号的发送

当流结束时,Spring AI Alibaba 会发送一个特殊的 [DONE] 标记:

data: {"choices":[{"delta":{},"finish_reason":"stop"}]}

data: [DONE]

前端可以通过检测 finish_reason 字段或 [DONE] 标记来判断流是否结束。


11. 性能优化:连接池、缓冲与背压

11.1 连接池配置

DashScope API 底层使用 HTTP 客户端,合理配置连接池可以提升流式响应的性能:

spring:
  ai:
    dashscope:
      api-key: ${DASHSCOPE_API_KEY}
      chat:
        options:
          model: qwen-plus
      # HTTP 客户端配置
      base-url: https://dashscope.aliyuncs.com
      timeout: 60000

11.2 缓冲控制

在高并发场景下,需要考虑**背压(Backpressure)**问题——当消费速度跟不上生产速度时:

public Flux<String> streamWithBackpressure(String message) {
    return chatModel.stream(new Prompt(message))
        .map(response -> response.getResult().getOutput().getContent())
        // 限制缓冲区大小,防止内存溢出
        .onBackpressureBuffer(100)
        // 或者丢弃来不及处理的数据
        // .onBackpressureDrop()
        // 或者使用最新的值
        // .onBackpressureLatest();
}

11.3 并发流式请求

import reactor.core.publisher.Flux;

public Flux<String> concurrentStreaming(List<String> messages) {
    return Flux.fromIterable(messages)
        // 并发处理,最多同时 5 个流
        .flatMap(message -> 
            chatModel.stream(new Prompt(message))
                .map(response -> response.getResult().getOutput().getContent()),
            5 // concurrency 参数
        );
}

注意: 并发流式请求会显著增加 API 调用频率和成本,务必做好限流和配额管理。


12. 错误处理与容错

12.1 流式调用中的常见错误

错误类型原因处理方式
TimeoutException模型响应超时重试或返回降级响应
ResourceAccessException网络连接中断自动重连或提示用户
InternalServerError服务端错误重试(带退避策略)
RateLimitException超过 API 限流等待后重试

12.2 完整的错误处理示例

import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.util.retry.Retry;
import java.time.Duration;

public Flux<String> streamWithRobustErrorHandling(String message) {
    return chatModel.stream(new Prompt(message))
        .map(response -> response.getResult().getOutput().getContent())
        .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
            .maxBackoff(Duration.ofSeconds(5))
            .filter(this::isRetryableError)
            .doBeforeRetry(retrySignal -> 
                System.out.println("重试第 " + retrySignal.totalRetries() + " 次"))
        )
        .onErrorResume(WebClientResponseException.TooManyRequests.class, e -> {
            System.out.println("触发限流,返回降级响应");
            return Flux.just("[服务繁忙,请稍后再试]");
        })
        .onErrorResume(TimeoutException.class, e -> {
            System.out.println("响应超时");
            return Flux.just("[响应超时,请缩短问题或稍后重试]");
        })
        .doOnError(error -> {
            System.err.println("流式调用失败: " + error.getMessage());
        });
}

private boolean isRetryableError(Throwable error) {
    if (error instanceof WebClientResponseException ex) {
        int statusCode = ex.getStatusCode().value();
        // 5xx 服务器错误可以重试,4xx 客户端错误不重试
        return statusCode >= 500;
    }
    return true;
}

12.3 健康检查与熔断

import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;

@Service
public class ResilientStreamingService {

    @CircuitBreaker(name = "dashscopeChat", fallbackMethod = "streamFallback")
    public Flux<String> streamWithCircuitBreaker(String message) {
        return chatModel.stream(new Prompt(message))
            .map(response -> response.getResult().getOutput().getContent());
    }

    public Flux<String> streamFallback(String message, Throwable t) {
        return Flux.just(
            "[AI 服务暂时不可用,已触发熔断保护]\n" +
            "错误原因: " + t.getMessage() + "\n" +
            "请稍后再试或联系客服。"
        );
    }
}

13. 最佳实践总结

13.1 架构选择

场景推荐方案
新项目、高性能要求WebFlux + SSE
传统 MVC 项目StreamingResponseBody
需要双向通信WebSocket(但 AI 场景通常不需要)
移动端 AppHTTP Chunked / WebSocket

13.2 编码规范

  1. 始终处理取消信号:使用 doOnCancel() 处理客户端断开连接的情况
  2. 设置合理的超时:防止长时间挂起的连接占用资源
  3. 实现优雅降级:当流式服务不可用时,提供非流式或静态响应
  4. 记录关键指标:TTFT、总延迟、Token 吞吐量、错误率
  5. 控制并发:避免无限制的并发流式请求耗尽 API 配额

13.3 用户体验优化

// 发送"正在思考"的初始信号
@GetMapping(value = "/stream/ux-optimized", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamWithUX(@RequestParam String message) {
    return Flux.concat(
        // 1. 立即发送"开始"信号
        Mono.just(ServerSentEvent.<String>builder()
            .event("status")
            .data("正在思考...")
            .build()),
        // 2. 流式输出 AI 回答
        chatModel.stream(new Prompt(message))
            .map(response -> response.getResult().getOutput().getContent())
            .map(content -> ServerSentEvent.<String>builder()
                .event("message")
                .data(content)
                .build()),
        // 3. 发送"完成"信号
        Mono.just(ServerSentEvent.<String>builder()
            .event("status")
            .data("完成")
            .build())
    );
}

前端可以根据 status 事件显示/隐藏加载动画,根据 message 事件逐字渲染回答。

13.4 安全注意事项

  1. 不要将 API Key 暴露到前端:流式请求必须经过后端代理
  2. 验证用户输入:防止 Prompt 注入攻击
  3. 限制请求频率:对每个用户/IP 实施限流
  4. 审查敏感输出:在流式输出中加入内容过滤逻辑

14. 常见问题与排查指南

Q1: 流式响应在某个 chunk 后卡住了?

原因: 网络连接中断或模型生成缓慢。

排查:

.doOnNext(response -> {
    System.out.println("收到 chunk, 时间戳: " + System.currentTimeMillis());
})
.timeout(Duration.ofSeconds(10), Flux.just("[超时]"))

Q2: 前端收到 [DONE] 但内容不完整?

原因: 模型达到了 max_tokens 限制。

解决: 增加 max_tokens 配置:

spring:
  ai:
    dashscope:
      chat:
        options:
          max-tokens: 4096

Q3: WebFlux 项目报错”No ServletContext set”?

原因: 同时引入了 spring-boot-starter-webspring-boot-starter-webflux

解决: 移除 spring-boot-starter-web,仅保留 WebFlux。

Q4: 流式输出中文乱码?

原因: 字符编码设置不正确。

解决: 确保使用 UTF-8:

outputStream.write(content.getBytes(StandardCharsets.UTF_8));

Q5: 并发流式请求导致 API 限流?

原因: 超出 DashScope API 的 QPS 限制。

解决:

import reactor.core.publisher.Flux;
import java.time.Duration;

// 限流:每秒最多 10 个请求
Flux.fromIterable(messages)
    .delayElements(Duration.ofMillis(100)) // 间隔 100ms
    .flatMap(msg -> chatModel.stream(new Prompt(msg)));

15. 总结

流式响应是 AI 应用中提升用户体验的关键技术。Spring AI Alibaba 通过 Reactor Flux 提供了优雅且强大的流式 API:

  • Flux<ChatResponse>:响应式流式接口,适合 WebFlux 场景
  • StreamCallback:回调式接口,适合传统阻塞场景
  • SSE 协议:简单高效的推送协议,浏览器原生支持
  • 取消、超时、重试:完善的流式生命周期管理
  • Token 统计:流式输出的成本追踪

在实际项目中,我建议:

  1. 新项目统一使用 WebFlux + SSE
  2. 所有流式端点都加入超时和取消处理
  3. 做好 Token 统计和限流
  4. 前端实现打字机效果和加载状态

下一篇我们将进入 Embedding 向量嵌入 的世界——这是构建 RAG(检索增强生成)应用的基础。敬请期待!


参考资料: