5. 流式输出与双后端
本章目标
实现流式输出让回答逐字显示,并支持 Anthropic 和 OpenAI 两套 API 后端。
graph LR Agent[Agent] --> |useOpenAI?| Switch{后端选择} Switch -->|false| Anthropic[callAnthropicStream<br/>SDK stream 事件] Switch -->|true| OpenAI[callOpenAIStream<br/>手动 chunk 累积] Anthropic --> |stream.on text| Console[逐字输出] OpenAI --> |delta.content| Console Anthropic --> |content_block_stop| EarlyExec[流式工具执行<br/>安全工具立即启动] OpenAI --> |响应完成| Batch[并行批量执行<br/>连续安全工具 Promise.all] EarlyExec --> ToolResult[工具结果] Batch --> ToolResult style Switch fill:#7c5cfc,color:#fff style Anthropic fill:#e8e0ff style OpenAI fill:#e8e0ff style EarlyExec fill:#d4edda style Batch fill:#d4edda style ToolResult fill:#fff3cd
Claude Code 怎么做的
为什么需要流式输出?
模型生成速度大约每秒 30-80 个 token,稍长的回答需要 10-30 秒。用户面对空白等待的容忍极限约 2-3 秒。流式输出让第一个字在几百毫秒内出现,把”等待 30 秒”变成”看着内容逐渐写出来”——主观等待感接近零,并且用户能在方向错误时提前中断。
底层用的是 SSE(Server-Sent Events):服务端用一条持久 HTTP 连接持续推送 data: 行,每几个 token 就推一个 content_block_delta 事件。比 WebSocket 简单,对 LLM 应用来说单向推送已经够用。
流式处理与并行工具执行
Claude Code 的一个关键优化:StreamingToolExecutor 在模型还在生成后续内容时,已解析完成的 tool_use block 就立即开始执行。串行方式下工具执行只能等 API 完整响应后开始;流式并行下,第一个 tool_use 解析完毕时直接分发,不等第二个。
在典型的 5-30 秒 API 流窗口内,文件读取(< 100ms)几乎能全部覆盖进去——流结束时工具结果往往已全部就绪。
错误重试
不是所有错误都值得重试:429/503/529 和网络瞬断(ECONNRESET)可以重试;400/401/404 反映代码或配置问题,重试没有意义。
指数退避(而不是固定间隔)的原因:服务过载时,大量客户端固定 1 秒后同时重试会形成”重试风暴”,反而加剧过载。指数退避让间隔逐轮翻倍(1s → 2s → 4s),加上随机抖动打破多客户端同步,是标准的分布式容错做法。
我们的实现
Anthropic 后端:SDK 内置 stream
TypeScript
// agent.ts — callAnthropicStream
private async callAnthropicStream(): Promise<Anthropic.Message> {
return withRetry(async (signal) => {
const createParams: any = {
model: this.model,
max_tokens: this.thinkingMode !== "disabled" ? maxOutput : 16384,
system: this.systemPrompt,
tools: toolDefinitions,
messages: this.anthropicMessages,
};
if (this.thinkingMode === "enabled") {
createParams.thinking = { type: "enabled", budget_tokens: maxOutput - 1 };
} else if (this.thinkingMode === "adaptive") {
createParams.thinking = { type: "enabled", budget_tokens: 10000 };
}
const stream = this.anthropicClient!.messages.stream(createParams, { signal });
let firstText = true;
stream.on("text", (text) => {
if (firstText) { printAssistantText("\n"); firstText = false; }
printAssistantText(text);
});
const finalMessage = await stream.finalMessage();
// thinking blocks 不存入历史,避免浪费上下文窗口
if (this.thinkingMode !== "disabled") {
finalMessage.content = finalMessage.content.filter(
(block: any) => block.type !== "thinking"
);
}
return finalMessage;
}, this.abortController?.signal);
}Python
# agent.py — _call_anthropic_stream
async def _call_anthropic_stream(self):
async def _do():
create_params: dict[str, Any] = {
"model": self.model,
"max_tokens": _get_max_output_tokens(self.model) if self._thinking_mode != "disabled" else 16384,
"system": self._system_prompt,
"tools": self.tools,
"messages": self._anthropic_messages,
}
if self._thinking_mode in ("adaptive", "enabled"):
create_params["thinking"] = {"type": "enabled", "budget_tokens": _get_max_output_tokens(self.model) - 1}
first_text = True
async with self._anthropic_client.messages.stream(**create_params) as stream:
async for event in stream:
if hasattr(event, 'type') and event.type == "content_block_delta":
delta = event.delta
if hasattr(delta, 'text'):
if first_text:
stop_spinner()
self._emit_text("\n")
first_text = False
self._emit_text(delta.text)
final_message = await stream.get_final_message()
final_message.content = [b for b in final_message.content if b.type != "thinking"]
return final_message
return await _with_retry(_do)Anthropic SDK 封装了全部 SSE 解析细节:stream.on("text") 直接给文本增量,stream.finalMessage() 返回和非流式完全一样的 Message 对象。{ signal } 把 AbortController 传进去,Ctrl+C 可以中断网络请求。
OpenAI 兼容后端:手动 chunk 累积
OpenAI streaming 的 tool_calls 参数是分 chunk 到达的,需要手动累积重建。
TypeScript
// agent.ts — callOpenAIStream
private async callOpenAIStream(): Promise<OpenAI.ChatCompletion> {
return withRetry(async (signal) => {
const stream = await this.openaiClient!.chat.completions.create({
model: this.model,
max_tokens: 16384,
tools: toOpenAITools(),
messages: this.openaiMessages,
stream: true,
stream_options: { include_usage: true },
}, { signal });
let content = "";
let firstText = true;
const toolCalls: Map<number, { id: string; name: string; arguments: string }> = new Map();
let finishReason = "";
let usage: { prompt_tokens: number; completion_tokens: number } | undefined;
for await (const chunk of stream) {
const delta = chunk.choices[0]?.delta;
if (chunk.usage) {
usage = { prompt_tokens: chunk.usage.prompt_tokens, completion_tokens: chunk.usage.completion_tokens };
}
if (!delta) continue;
if (delta.content) {
if (firstText) { printAssistantText("\n"); firstText = false; }
printAssistantText(delta.content);
content += delta.content;
}
// tool_calls 参数分片到达,按 index 累积
if (delta.tool_calls) {
for (const tc of delta.tool_calls) {
const existing = toolCalls.get(tc.index);
if (existing) {
if (tc.function?.arguments) existing.arguments += tc.function.arguments;
} else {
toolCalls.set(tc.index, {
id: tc.id || "",
name: tc.function?.name || "",
arguments: tc.function?.arguments || "",
});
}
}
}
if (chunk.choices[0]?.finish_reason) finishReason = chunk.choices[0].finish_reason;
}
const assembledToolCalls = toolCalls.size > 0
? Array.from(toolCalls.entries())
.sort(([a], [b]) => a - b)
.map(([_, tc]) => ({
id: tc.id, type: "function" as const,
function: { name: tc.name, arguments: tc.arguments },
}))
: undefined;
return {
id: "stream", object: "chat.completion", created: Date.now(), model: this.model,
choices: [{
index: 0,
message: { role: "assistant" as const, content: content || null, tool_calls: assembledToolCalls, refusal: null },
finish_reason: finishReason || "stop", logprobs: null,
}],
usage: usage || { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 },
} as OpenAI.ChatCompletion;
}, this.abortController?.signal);
}Python
# agent.py — _call_openai_stream
async def _call_openai_stream(self) -> dict:
async def _do():
stream = await self._openai_client.chat.completions.create(
model=self.model,
max_tokens=16384,
tools=_to_openai_tools(self.tools),
messages=self._openai_messages,
stream=True,
stream_options={"include_usage": True},
)
content = ""
first_text = True
tool_calls: dict[int, dict] = {}
finish_reason = ""
usage = None
async for chunk in stream:
if chunk.usage:
usage = {"prompt_tokens": chunk.usage.prompt_tokens, "completion_tokens": chunk.usage.completion_tokens}
if not chunk.choices:
continue
delta = chunk.choices[0].delta
if delta and delta.content:
if first_text:
stop_spinner()
self._emit_text("\n")
first_text = False
self._emit_text(delta.content)
content += delta.content
if delta and delta.tool_calls:
for tc in delta.tool_calls:
existing = tool_calls.get(tc.index)
if existing:
if tc.function and tc.function.arguments:
existing["arguments"] += tc.function.arguments
else:
tool_calls[tc.index] = {
"id": tc.id or "",
"name": (tc.function.name if tc.function else "") or "",
"arguments": (tc.function.arguments if tc.function else "") or "",
}
if chunk.choices[0].finish_reason:
finish_reason = chunk.choices[0].finish_reason
assembled = [
{"id": tc["id"], "type": "function", "function": {"name": tc["name"], "arguments": tc["arguments"]}}
for _, tc in sorted(tool_calls.items())
] if tool_calls else None
return {
"choices": [{"message": {"role": "assistant", "content": content or None, "tool_calls": assembled},
"finish_reason": finish_reason or "stop"}],
"usage": usage or {"prompt_tokens": 0, "completion_tokens": 0},
}
return await _with_retry(_do)OpenAI tool_calls 的 id 和 name 只在第一个 chunk 出现,后续 chunk 只有 arguments 的增量片段。多个 tool_call 的 chunk 会交错到达,用 index 字段区分,累积结束后才能 JSON.parse()。
工具格式转换
两个 API 的工具定义几乎相同,只是字段名不一样:
TypeScript
function toOpenAITools(): OpenAI.ChatCompletionTool[] {
return toolDefinitions.map((t) => ({
type: "function" as const,
function: { name: t.name, description: t.description, parameters: t.input_schema as Record<string, unknown> },
}));
}Python
def _to_openai_tools(tools: list[ToolDef]) -> list[dict]:
return [{"type": "function", "function": {"name": t["name"], "description": t["description"], "parameters": t["input_schema"]}} for t in tools]Anthropic 用 input_schema,OpenAI 用 parameters,内容完全一样。
重试机制
TypeScript
function isRetryable(error: any): boolean {
const status = error?.status || error?.statusCode;
if ([429, 503, 529].includes(status)) return true;
if (error?.code === "ECONNRESET" || error?.code === "ETIMEDOUT") return true;
if (error?.message?.includes("overloaded")) return true;
return false;
}
async function withRetry<T>(
fn: (signal?: AbortSignal) => Promise<T>,
signal?: AbortSignal,
maxRetries = 3
): Promise<T> {
for (let attempt = 0; ; attempt++) {
try {
return await fn(signal);
} catch (error: any) {
if (signal?.aborted) throw error;
if (attempt >= maxRetries || !isRetryable(error)) throw error;
const delay = Math.min(1000 * Math.pow(2, attempt), 30000) + Math.random() * 1000;
const reason = error?.status ? `HTTP ${error.status}` : error?.code || "network error";
printRetry(attempt + 1, maxRetries, reason);
await new Promise((r) => setTimeout(r, delay));
}
}
}Python
def _is_retryable(error: Exception) -> bool:
status = getattr(error, "status_code", None) or getattr(error, "status", None)
if status in (429, 503, 529):
return True
msg = str(error)
if "overloaded" in msg or "ECONNRESET" in msg or "ETIMEDOUT" in msg:
return True
return False
async def _with_retry(fn, max_retries: int = 3):
for attempt in range(max_retries + 1):
try:
return await fn()
except Exception as error:
if attempt >= max_retries or not _is_retryable(error):
raise
delay = min(1000 * (2 ** attempt), 30000) / 1000 + (hash(str(time.time())) % 1000) / 1000
reason = str(getattr(error, "status_code", "")) or str(error)[:60]
print_retry(attempt + 1, max_retries, reason)
await asyncio.sleep(delay)延迟公式 min(1000 * 2^attempt, 30000) + random(0, 1000):指数部分控制退避速度,30 秒上限防止等待过久,随机抖动防止多个客户端同步重试形成”重试风暴”。
Extended Thinking
Extended Thinking 让模型在输出前有一个私有”草稿纸”做推理规划,对需要多步决策的 coding 任务有明显帮助。
三种模式:
- adaptive:claude-4.x 模型自动开启,budget 10000 tokens,模型自行决定是否使用
- enabled:
--thinkingflag 显式开启,budget 最大化 - disabled:不支持 thinking 的模型(Claude 3.x 及 OpenAI)
TypeScript
function resolveThinkingMode(model: string, thinkingFlag: boolean): "adaptive" | "enabled" | "disabled" {
if (!modelSupportsThinking(model)) return "disabled";
if (thinkingFlag) return "enabled";
if (modelSupportsAdaptiveThinking(model)) return "adaptive";
return "disabled";
}
// 构造请求参数
if (this.thinkingMode === "enabled") {
createParams.thinking = { type: "enabled", budget_tokens: maxOutput - 1 };
} else if (this.thinkingMode === "adaptive") {
createParams.thinking = { type: "enabled", budget_tokens: 10000 };
}
// 过滤 thinking blocks,不存入历史
finalMessage.content = finalMessage.content.filter((block: any) => block.type !== "thinking");Python
def _resolve_thinking_mode(self) -> str:
if not self.thinking or not _model_supports_thinking(self.model):
return "disabled"
if _model_supports_adaptive_thinking(self.model):
return "adaptive"
return "enabled"
# 构造请求参数
if self._thinking_mode in ("adaptive", "enabled"):
create_params["thinking"] = {"type": "enabled", "budget_tokens": max_output - 1}
# 过滤 thinking blocks,不存入历史
final_message.content = [b for b in final_message.content if b.type != "thinking"]thinking blocks 可能长达数千 token,对后续对话没有参考价值,过滤掉是避免上下文窗口被无效内容占满的直接手段。
流式工具执行
当 Anthropic 流式响应中某个 tool_use block 完整接收(content_block_stop 事件触发)时,如果该工具是并发安全的(read_file、list_files、grep_search、web_fetch),立即开始执行——不必等待整个 API 响应完成。这样可以把工具执行时间”藏”进模型生成后续内容的流式窗口中。
TypeScript
// agent.ts — 流式工具执行
// 在流式过程中跟踪提前执行的工具
const earlyExecutions = new Map<string, Promise<string>>();
const response = await this.callAnthropicStream((block) => {
const input = block.input as Record<string, any>;
if (CONCURRENCY_SAFE_TOOLS.has(block.name)) {
const perm = checkPermission(block.name, input, this.permissionMode, this.planFilePath || undefined);
if (perm.action === "allow") {
earlyExecutions.set(block.id, this.executeToolCall(block.name, input));
}
}
});
// 后续处理工具结果时:
const earlyPromise = earlyExecutions.get(toolUse.id);
if (earlyPromise) {
const raw = await earlyPromise; // 已完成或即将完成
// ... 直接使用结果
continue;
}Python
# agent.py — 流式工具执行
# 在流式过程中跟踪提前执行的工具
early_executions: dict[str, asyncio.Task] = {}
async def on_tool_block_complete(block):
if block["name"] in CONCURRENCY_SAFE_TOOLS:
perm = check_permission(block["name"], block["input"], self._permission_mode)
if perm["action"] == "allow":
task = asyncio.create_task(self._execute_tool_call(block["name"], block["input"]))
early_executions[block["id"]] = task
response = await self._call_anthropic_stream(on_tool_block_complete=on_tool_block_complete)
# 后续处理工具结果时:
early_task = early_executions.get(tool_use["id"])
if early_task:
raw = await early_task # 已完成或即将完成
# ... 直接使用结果
continuecallAnthropicStream 内部通过回调机制实现:
TypeScript
// agent.ts — callAnthropicStream 工具 block 跟踪
private async callAnthropicStream(
onToolBlockComplete?: (block: Anthropic.ToolUseBlock) => void,
): Promise<Anthropic.Message> {
// ...
const toolBlocksByIndex = new Map<number, { id: string; name: string; inputJson: string }>();
stream.on("streamEvent" as any, (event: any) => {
// 工具 block 跟踪:随着流式接收累积 input JSON
if (event.type === "content_block_start" && event.content_block?.type === "tool_use") {
toolBlocksByIndex.set(event.index, {
id: event.content_block.id,
name: event.content_block.name,
inputJson: "",
});
}
if (event.type === "content_block_delta" && event.delta?.type === "input_json_delta") {
const tracked = toolBlocksByIndex.get(event.index);
if (tracked) tracked.inputJson += event.delta.partial_json;
}
if (event.type === "content_block_stop" && onToolBlockComplete) {
const tracked = toolBlocksByIndex.get(event.index);
if (tracked) {
try {
const input = JSON.parse(tracked.inputJson);
onToolBlockComplete({ type: "tool_use", id: tracked.id, name: tracked.name, input });
} catch {}
}
}
});
// ...
}Python
# agent.py — _call_anthropic_stream 工具 block 跟踪
async def _call_anthropic_stream(self, on_tool_block_complete=None):
async def _do():
# ...
tool_blocks_by_index: dict[int, dict] = {}
async with self._anthropic_client.messages.stream(**create_params) as stream:
async for event in stream:
# 工具 block 跟踪:随着流式接收累积 input JSON
if hasattr(event, 'type'):
if event.type == "content_block_start" and getattr(event, 'content_block', None):
cb = event.content_block
if cb.type == "tool_use":
tool_blocks_by_index[event.index] = {
"id": cb.id, "name": cb.name, "input_json": ""
}
elif event.type == "content_block_delta" and hasattr(event.delta, 'partial_json'):
tracked = tool_blocks_by_index.get(event.index)
if tracked:
tracked["input_json"] += event.delta.partial_json
elif event.type == "content_block_stop" and on_tool_block_complete:
tracked = tool_blocks_by_index.get(event.index)
if tracked:
try:
inp = json.loads(tracked["input_json"])
await on_tool_block_complete({
"type": "tool_use", "id": tracked["id"],
"name": tracked["name"], "input": inp
})
except json.JSONDecodeError:
pass
final_message = await stream.get_final_message()
# ...设计要点:
content_block_stop是 block 级别事件:当单个tool_useblock 的 JSON 完整接收时触发,并非整个响应结束。模型可能在一次响应中返回多个工具调用,第一个 block 完整时第二个可能还在流式传输中- 仅并发安全工具提前执行:只有只读工具(
read_file、list_files、grep_search、web_fetch)会被提前执行,写操作和命令执行不会 - 权限检查仍然生效:只有
checkPermission返回"allow"的工具才会提前执行,需要用户确认的工具("confirm")不会被提前触发 - Promise/Task 存储,后续直接 await:
earlyExecutionsMap 存储的是 Promise(TS)或 Task(Python),后续工具处理循环检查到已有提前执行的结果时,直接 await 即可——通常此时已经完成 - 核心收益:5-30 秒的流式窗口期内,工具执行与模型生成并行进行,文件读取等快速操作在流结束时往往已经就绪
并行工具执行
并行执行的前提是标记哪些工具是并发安全的——只读工具不会产生副作用,可以安全地同时运行:
TypeScript
// tools.ts
export const CONCURRENCY_SAFE_TOOLS = new Set([
"read_file", "list_files", "grep_search", "web_fetch"
]);Python
# tools.py
CONCURRENCY_SAFE_TOOLS = {"read_file", "list_files", "grep_search", "web_fetch"}对于 Anthropic 后端,流式工具执行天然处理了并行——每个工具 block 完整时就启动执行,多个工具自然重叠运行。
对于 OpenAI 后端(不支持流式工具 block 事件),采用显式批量并行:将连续的安全工具分组,用 Promise.all / asyncio.gather 一次性执行:
TypeScript
// agent.ts — OpenAI 并行执行
// 将连续的并发安全工具分组为批次
type OAIBatch = { concurrent: boolean; items: OAIChecked[] };
const oaiBatches: OAIBatch[] = [];
for (const ct of oaiChecked) {
const safe = ct.allowed && CONCURRENCY_SAFE_TOOLS.has(ct.fnName);
if (safe && oaiBatches.length > 0 && oaiBatches[oaiBatches.length - 1].concurrent) {
oaiBatches[oaiBatches.length - 1].items.push(ct);
} else {
oaiBatches.push({ concurrent: safe, items: [ct] });
}
}
// 执行:并发批次使用 Promise.all
for (const batch of oaiBatches) {
if (batch.concurrent) {
const results = await Promise.all(
batch.items.map(async (ct) => {
const raw = await this.executeToolCall(ct.fnName, ct.input);
return { ct, res: this.persistLargeResult(ct.fnName, raw) };
})
);
// ... 推入结果
} else {
// 非安全工具顺序执行
}
}Python
# agent.py — OpenAI 并行执行
# 将连续的并发安全工具分组为批次
oai_batches: list[dict] = []
for ct in oai_checked:
safe = ct["allowed"] and ct["fn_name"] in CONCURRENCY_SAFE_TOOLS
if safe and oai_batches and oai_batches[-1]["concurrent"]:
oai_batches[-1]["items"].append(ct)
else:
oai_batches.append({"concurrent": safe, "items": [ct]})
# 执行:并发批次使用 asyncio.gather
for batch in oai_batches:
if batch["concurrent"]:
async def _exec(ct):
raw = await self._execute_tool_call(ct["fn_name"], ct["input"])
return {"ct": ct, "res": self._persist_large_result(ct["fn_name"], raw)}
results = await asyncio.gather(*[_exec(ct) for ct in batch["items"]])
# ... 推入结果
else:
# 非安全工具顺序执行两种后端的并行策略对比:
- Anthropic 后端:流式执行自动处理并行——工具 block 完整时立即启动,多个工具的执行时间自然重叠
- OpenAI 后端:响应完成后显式分批——将连续的安全工具归入同一批次,用
Promise.all并行执行 - 混合序列保持安全:
[read, read, write, read]会被分为[read||read]、[write]、[read]三个批次,写操作前后的工具各自独立,不会跨越写操作并行 - 典型加速效果:当模型在一次响应中读取 3-5 个文件时,并行执行通常带来 2-3 倍的速度提升
简化对比
| 维度 | Claude Code | mini-claude |
|---|---|---|
| 后端支持 | 仅 Anthropic | Anthropic + OpenAI 兼容 |
| 重试策略 | 类似指数退避 | 指数退避 + 随机抖动 |
| Thinking 处理 | 深度集成,独立展示与折叠 | 基础支持,过滤 thinking blocks |
| 流式工具执行 | StreamingToolExecutor 独立模块,全量事件处理 | 回调 + earlyExecutions Map,精简实现 |
| 并行工具执行 | 完整的并发调度器 | Anthropic 流式提前执行 + OpenAI 批量 Promise.all |
下一章:Agent 能操作文件和执行命令了,但我们需要防止它做危险的事——权限系统保护你的系统。