流式输出

流式输出的核心是模型边生成、后端边转发、前端边展示,减少用户等待完整答案的时间。

流式输出是什么

Streaming Output,中文一般翻译为“流式输出”。

流式输出就是服务端不等完整结果生成完,再一次性返回。

它会把结果拆成很多小片段,逐步推给前端。

普通输出:

用户提问
  -> 等模型完整生成
  -> 后端一次性返回完整答案
  -> 前端展示

流式输出:

用户提问
  -> 模型生成一段
  -> 后端返回一段
  -> 前端显示一段
  -> 直到生成完成

结论:

流式输出不一定让模型生成更快,但会让用户更早看到内容。

为什么聊天场景需要流式输出

大模型生成长文本时,完整结果可能要等几秒甚至更久。

如果一直空白等待,用户会感觉接口很慢。

流式输出可以解决几个问题:

降低首字等待时间
让用户知道系统正在生成
长回答可以边看边读
前端聊天体验更接近 ChatGPT

这里要区分两个时间:

首字时间:用户多久能看到第一个字
完整时间:用户多久能看到完整回答

流式输出主要优化的是首字时间。

SSE 是什么

SSE,全称是 Server-Sent Events,中文一般翻译为“服务器发送事件”。

SSE 是浏览器原生支持的一种服务端推送技术。

它的特点:

基于 HTTP
服务端单向推送
前端可以用 EventSource 接收
响应类型是 text/event-stream
适合聊天逐字输出、通知、日志流

SSE 是单向通信。

也就是说:

服务端 -> 前端

如果需要前端和服务端频繁双向通信,一般再考虑 WebSocket。

WebSocket,中文一般翻译为“网络套接字”,适合双向实时通信。

SSE 数据格式

SSE 响应的 Content-Type 是:

text/event-stream

最简单的数据格式:

data: 你好

data: ,这是

data: 流式输出

每条消息用空行分隔。

也可以带事件名称:

event: message
data: 你好

event: done
data: [DONE]

常见字段:

event:事件名称
data:事件数据
id:事件 ID
retry:断线重连间隔

学习阶段先掌握 data 就够。

EventSource 是什么

EventSource,中文一般翻译为“事件源”。

浏览器可以用 EventSource 接收 SSE。

示例:

<div id="answer"></div>

<script>
  const answer = document.getElementById("answer");
  const eventSource = new EventSource("/api/chat/stream?message=你好");

  eventSource.onmessage = function (event) {
    answer.textContent += event.data;
  };

  eventSource.onerror = function () {
    eventSource.close();
  };
</script>

这里的流程是:

浏览器发起 SSE 连接
后端不断发送 data
onmessage 收到片段
前端追加到页面
生成结束后关闭连接

注意:

EventSource 默认使用 GET 请求
复杂请求体不适合直接放 EventSource
需要携带大量参数时可以用 fetch 读取流

Spring Boot 流式响应

Spring Boot 里可以先用两种方式理解流式输出。

Spring MVC:SseEmitter
Spring WebFlux:Flux

Spring MVC 是传统 Servlet 模型。

Spring WebFlux 是响应式模型。

如果项目是普通 Spring Boot Web 项目,可以先从 SseEmitter 开始。

如果后面使用 Spring AI 的流式 API,常见返回类型会是 Flux<String>

Flux,中文可以理解为“多个异步数据组成的数据流”。

它来自 Reactor,是 Spring 响应式生态里的核心类型。

SseEmitter 示例

SseEmitter,中文可以理解为“SSE 事件发送器”。

它是 Spring MVC 里用来发送 SSE 的对象。

Controller 示例:

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
public class ChatStreamController {

    @GetMapping(value = "/api/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter stream(@RequestParam String message) {
        SseEmitter emitter = new SseEmitter(60_000L);

        new Thread(() -> {
            try {
                String answer = "这是一个流式输出示例,用户问题是:" + message;

                for (char c : answer.toCharArray()) {
                    emitter.send(SseEmitter.event()
                            .name("message")
                            .data(String.valueOf(c)));
                    Thread.sleep(80);
                }

                emitter.send(SseEmitter.event()
                        .name("done")
                        .data("[DONE]"));
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        }).start();

        return emitter;
    }
}

这里先用 new Thread 模拟模型逐步返回。

正式项目里不要随便为每个请求手动创建线程。

后面可以换成:

线程池
异步任务
WebFlux
模型 SDK 自带的流式响应

前端逐字显示

前端用 EventSource 接收:

<input id="message" value="Spring Boot 怎么做流式输出?" />
<button onclick="send()">发送</button>
<pre id="answer"></pre>

<script>
  let eventSource;

  function send() {
    const message = document.getElementById("message").value;
    const answer = document.getElementById("answer");

    answer.textContent = "";

    if (eventSource) {
      eventSource.close();
    }

    eventSource = new EventSource(
      "/api/chat/stream?message=" + encodeURIComponent(message)
    );

    eventSource.addEventListener("message", function (event) {
      answer.textContent += event.data;
    });

    eventSource.addEventListener("done", function () {
      eventSource.close();
    });

    eventSource.onerror = function () {
      answer.textContent += "\n[连接异常]";
      eventSource.close();
    };
  }
</script>

这里重点看三件事:

message 事件:追加内容
done 事件:关闭连接
error 事件:处理异常

用 fetch 读取流

EventSource 简单,但它默认是 GET 请求。

如果需要 POST 请求,比如请求体里有多轮消息:

{
  "messages": [
    {
      "role": "user",
      "content": "解释一下 SSE"
    }
  ]
}

可以用 fetch 读取响应流。

示例:

<pre id="answer"></pre>

<script>
  async function send() {
    const answer = document.getElementById("answer");
    answer.textContent = "";

    const response = await fetch("/api/chat/stream-post", {
      method: "POST",
      headers: {
        "Content-Type": "application/json"
      },
      body: JSON.stringify({
        message: "解释一下 SSE"
      })
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder("utf-8");

    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        break;
      }
      answer.textContent += decoder.decode(value, { stream: true });
    }
  }
</script>

这个方式适合:

POST 请求
复杂请求体
自定义请求头
需要携带 token

但如果后端返回的是 SSE 格式,需要自己解析 data:

学习阶段可以先用 EventSource 跑通最小链路。

Spring AI ChatClient 流式调用

Spring AI ChatClient,中文可以理解为“Spring AI 聊天客户端”。

Spring AI 的 ChatClient 同时支持同步和流式调用。

同步调用:

String content = chatClient.prompt()
        .user("讲一下 SSE")
        .call()
        .content();

流式调用:

import reactor.core.publisher.Flux;

Flux<String> output = chatClient.prompt()
        .user("讲一下 SSE")
        .stream()
        .content();

区别:

call():等完整结果返回
stream():返回 Flux<String>,可以逐段处理

image.png

image.png

WebFlux 返回 Flux

如果项目使用 WebFlux,可以直接返回 Flux<String>

示例:

import org.springframework.ai.chat.client.ChatClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class AiStreamController {

    private final ChatClient chatClient;

    public AiStreamController(ChatClient.Builder builder) {
        this.chatClient = builder.build();
    }

    @GetMapping(value = "/api/ai/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> stream(@RequestParam String message) {
        return chatClient.prompt()
                .user(message)
                .stream()
                .content();
    }
}

如果前端用 EventSource 接收,后端响应类型需要是:

text/event-stream

MVC 项目里转发模型流

如果项目是 Spring MVC,也可以把模型流写到 SseEmitter

示例:

import org.springframework.ai.chat.client.ChatClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.Disposable;

@RestController
public class AiSseController {

    private final ChatClient chatClient;

    public AiSseController(ChatClient.Builder builder) {
        this.chatClient = builder.build();
    }

    @GetMapping(value = "/api/ai/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter stream(@RequestParam String message) {
        SseEmitter emitter = new SseEmitter(60_000L);

        Disposable disposable = chatClient.prompt()
                .user(message)
                .stream()
                .content()
                .subscribe(
                        chunk -> send(emitter, chunk),
                        emitter::completeWithError,
                        emitter::complete
                );

        emitter.onCompletion(disposable::dispose);
        emitter.onTimeout(() -> {
            disposable.dispose();
            emitter.complete();
        });

        return emitter;
    }

    private void send(SseEmitter emitter, String chunk) {
        try {
            emitter.send(SseEmitter.event()
                    .name("message")
                    .data(chunk));
        } catch (Exception e) {
            emitter.completeWithError(e);
        }
    }
}

这里需要注意:

前端断开时要停止订阅
超时时要释放资源
模型异常时要通知前端
不要无限占用连接

错误处理

流式输出的错误处理比普通接口更麻烦。

普通接口:

成功:返回 200 + JSON
失败:返回 4xx / 5xx + 错误信息

流式接口:

响应可能已经开始
状态码可能已经发给前端
中途失败只能通过事件告诉前端

建议定义几个事件:

message:正常内容片段
error:错误信息
done:生成结束

示例:

event: message
data: 你好

event: error
data: 模型调用失败,请稍后重试

event: done
data: [DONE]

前端按事件处理:

eventSource.addEventListener("error", function () {
  eventSource.close();
});

eventSource.addEventListener("done", function () {
  eventSource.close();
});

注意:

不要把异常堆栈直接返回给前端
日志里记录 requestId 和原始异常
前端要能关闭连接
后端要能释放资源

超时和断开

流式接口要处理几类断开:

用户关闭页面
浏览器刷新
网络断开
模型服务超时
Nginx 超时
后端线程被中断

后端需要处理:

SseEmitter onCompletion
SseEmitter onTimeout
SseEmitter onError
取消模型流订阅
释放线程或连接资源

前端需要处理:

onerror 关闭连接
用户重新发送时关闭旧连接
页面卸载时关闭连接

页面卸载示例:

window.addEventListener("beforeunload", function () {
  if (eventSource) {
    eventSource.close();
  }
});

Nginx 代理注意点

如果前面有 Nginx,流式输出可能被缓冲。

现象:

后端已经逐段返回
前端还是等全部生成完才显示

需要检查:

proxy_buffering
响应头是否是 text/event-stream
网关超时配置
浏览器控制台 Network 是否逐步收到数据

常见配置方向:

location /api/ {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_buffering off;
    proxy_cache off;
    proxy_read_timeout 300s;
}

实际配置要结合项目网关和部署环境。

流式输出和 JSON 结构化输出的关系

Day04 讲的是 JSON 结构化输出。

Day05 讲的是流式输出。

两者关注点不同:

结构化输出:结果长什么样
流式输出:结果怎么传给前端

普通聊天可以流式返回文本。

分类、抽取、工具参数这类任务不一定适合流式输出。

原因是:

后端通常需要完整 JSON 才能解析
半截 JSON 没法校验
业务逻辑需要完整对象

适合流式输出的场景:

聊天回答
长文总结
代码生成
报告生成
日志输出

不太适合直接流式输出的场景:

问题分类
字段抽取
工具参数生成
订单状态判断

原则:

给用户看的长文本,可以流式
给后端解析的数据,先拿完整结果再校验

常见问题

为什么前端没有逐字显示

先查这几项:

后端是否使用 text/event-stream
前端是否用 EventSource 或读取 response.body
Nginx 是否开启了缓冲
浏览器 Network 是否逐步收到数据
后端是否真的 flush 了数据

EventSource 能不能发 POST

EventSource 默认是 GET。

如果需要 POST、JSON 请求体、自定义请求头,可以用 fetch 读取流。

也可以先创建会话,再用 EventSource 通过会话 ID 订阅结果。

SSE 和 WebSocket 怎么选

简单选择:

只需要服务端推送:SSE
需要双向实时通信:WebSocket

聊天机器人逐字输出一般用 SSE 就够。

多人协作、实时游戏、双向指令控制更适合 WebSocket。

流式输出怎么记录日志

至少记录:

requestId
用户 ID
输入问题
模型名称
开始时间
结束时间
是否成功
错误原因
最终完整输出
token 用量

片段日志不建议全部打到普通业务日志里。

可以聚合完整输出后再记录。

前端什么时候关闭连接

几种情况要关闭:

收到 done 事件
收到不可恢复错误
用户重新发送问题
用户离开页面
超过前端等待时间

连接不关闭,会占用浏览器连接数和后端资源。

练习清单

完成几件事情:

知道 Streaming Output 是流式输出
知道 SSE 是 Server-Sent Events
知道 SSE 是服务端到前端的单向推送
知道 text/event-stream 的作用
能用 Spring Boot SseEmitter 返回流式响应
能用 EventSource 在前端逐段接收
知道 Spring AI ChatClient 可以用 stream().content()
知道流式接口要处理 done、error、timeout、disconnect
知道 Nginx 缓冲会影响流式显示

可以准备三个小练习。

练习 1:模拟流式输出

Spring Boot 写一个 /api/chat/stream
每隔 80ms 返回一个字符
前端用 EventSource 逐字显示

练习 2:接 Spring AI

用 ChatClient.stream().content() 获取 Flux<String>
把 Flux<String> 返回给前端
前端逐段展示模型回答

练习 3:异常处理

后端发送 done 事件
后端发送 error 事件
前端收到 done 或 error 后关闭连接
用户重新发送时关闭旧连接

建议目录:

ai-agent-study
├── java
│   └── stream-output-demo
│       ├── ChatStreamController.java
│       └── AiSseController.java
├── frontend
│   └── stream-demo
│       └── index.html
└── docs
    └── streaming-output.md

小结

Day05 的结论:

流式输出是把模型生成过程拆成多个片段传给前端,核心链路是模型流、后端转发、前端追加显示。

最小链路:

模型生成片段
  -> Spring Boot 返回 text/event-stream
  -> 前端 EventSource 接收
  -> 页面追加显示

工程里还要补:

连接关闭
异常事件
超时处理
资源释放
Nginx 缓冲配置
日志记录

先把 SSE 跑通,再接 Spring AI 的 stream().content(),Day06 学 Spring AI 时会更容易理解。

参考资料


这个家伙很懒,啥也没有留下😋