SpringAI智能体开发-流式回复与动态中断机制(内含效果演示与源码)

关注公众号,回复【1032】领取资料

代码分析

1、SSE

@GetMapping(value = "/chatTemplate4", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  • 一种轻量级的服务器向客户端推送信息协议
  • 基于 HTTP 长连接,适合单向通信(服务端 → 客户端)
  • 使用 text/event-stream MIME 类型
  • 对比 WebSocket:更简单、无需复杂握手,但仅支持单向通信

2、响应式编程

Flux.fromIterable(...)
    .map(String::trim)
    .filter(line -> !line.isEmpty())
    .concatMap(...)
  • Flux 表示一个异步字符串序列
  • 支持链式操作(map/filter/concatMap/delayElements)
  • 可以控制数据流节奏和行为(如中断、超时

3、动态中断机制


.takeWhile(data -> isStreaming.get())

private final AtomicBoolean isStreaming = new AtomicBoolean(true);

@PostMapping("/cancel")
public void cancelStream() {
    isStreaming.set(false);
}

4、文本逐字输出动画

  • 使用 split("") 将每一行拆分为字符数组
  • 使用 .delayElements(Duration.ofMillis(50)) 控制输出节奏
  • 实现类似“打字机”效果,增强用户体验

5、前端 EventSource 接收流式数据

const eventSource = new EventSource("/api/ai_travel/chatTemplate");
eventSource.onmessage = function(event) { ... }

完整代码

private final AtomicBoolean isStreaming = new AtomicBoolean(true);
@GetMapping(value = "/chatTemplate", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> chatTemplate() {
    log.info("收到消息");
    isStreaming.set(true); // 每次新连接重置状态
    return Flux.fromIterable(Arrays.asList(RES_TEMPLATE.split("\n")))
            .map(String::trim)
            .filter(line -> !line.isEmpty())
            .index() // 索引区分首行
            .concatMap(tuple -> {
                long idx = tuple.getT1();
                String line = tuple.getT2();

                // 首行不加换行符,后续行开头加换行
                Flux<String> startFlux = (idx == 0)
                        ? Flux.empty()
                        : Flux.just("\n");

                // 逐字输出
                Flux<String> charFlux = Flux.fromIterable(Arrays.asList(line.split("")))
                        .delayElements(Duration.ofMillis(50));

                return startFlux.concatWith(charFlux);
            })
            .takeWhile(data -> isStreaming.get())
            .concatWithValues("\u0003"); // 结束标识符
}

@PostMapping("/cancel")
public void cancelStream() {
    isStreaming.set(false);
}
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0, user-scalable=no">
    <title>逐行逐字输出</title>
</head>
<body>
<button onclick="send()">发送消息</button>
<button onclick="cancel()">中断回复</button>
<h2>AI 回复:</h2>
<div id="output" style="white-space: pre-wrap;"></div>

<script>
    const output = document.getElementById('output');
    let eventSource = null;
    function cancel({
        fetch('/api/ai_travel/cancel', { method'POST' });
        if(eventSource){
            eventSource.close();
        }
    }
    function send({
        let currentLineElement = null;
        const eventSource = new EventSource("/api/ai_travel/chatTemplate");
        eventSource.onmessage = function(event{
            const char = event.data;

            // 检测结束标识符 (ETX)
            if (char === '\u0003') {
                eventSource.close(); // 主动关闭连接
                return;
            }

            if (char === '\n') {
                currentLineElement = document.createElement("p");
                output.appendChild(currentLineElement);
            } else {
                if (!currentLineElement) {
                    currentLineElement = document.createElement("p");
                    output.appendChild(currentLineElement);
                }
                currentLineElement.textContent += char;
            }
        };

        eventSource.onerror = function({
            eventSource.close(); // 确保出错时关闭连接
        };
    }
</script>
</body>
</html>