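Streaming Response Handling
SSE (Server-Sent Events) protocol parsing, error recovery, and timeout control: a production-grade implementation.
Protocol format
Swarmix streaming responses are fully compatible with the OpenAI protocol. Each chunk is a single line of the form data: {JSON}, chunks are separated by a blank line, and the stream ends with data: [DONE]: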
text
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"},"index":0}]}

data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{"content":"你"},"index":0}]}

data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{"content":"好"},"index":0}]}

data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{},"finish_reason":"stop","index":0}]}

data: [DONE]
Python (OpenAI SDK)
python
from openai import OpenAI

client = OpenAI(api_key="sk-swx-xxx", base_url="http://router.swarmixtoken.com/v1")

stream = client.chat.completions.create(
    model="aliyun/qwen-max",
    messages=[{"role": "user", "content": "Write a poem about autumn"}],
    stream=True,
    stream_options={"include_usage": True},  # the final chunk will carry the token counts
)

full = []
for chunk in stream:
    # The final usage chunk has an empty choices list
    if not chunk.choices:
        print(f"\n\n[usage: {chunk.usage}]")
        break
    delta = chunk.choices[0].delta.content or ""
    full.append(delta)
    print(delta, end="", flush=True)

print(f"\n\nFull output: {len(''.join(full))} chars")
Node.js (OpenAI SDK)
typescript
import OpenAI from "openai";

const client = new OpenAI({
  apiKey: "sk-swx-xxx",
  baseURL: "http://router.swarmixtoken.com/v1",
});

const stream = await client.chat.completions.create({
  model: "aliyun/deepseek-r1",
  messages: [{ role: "user", content: "hi" }],
  stream: true,
  stream_options: { include_usage: true },
});

for await (const chunk of stream) {
  // The final usage chunk has an empty choices array
  if (chunk.choices.length === 0 && chunk.usage) {
    console.log("\nusage:", chunk.usage);
    break;
  }
  process.stdout.write(chunk.choices[0]?.delta?.content || "");
}
Raw HTTP (parse SSE yourself)
python
import json

import httpx

with httpx.stream(
    "POST",
    "http://router.swarmixtoken.com/v1/chat/completions",
    headers={"Authorization": "Bearer sk-swx-xxx"},
    json={
        "model": "aliyun/qwen-max",
        "messages": [{"role": "user", "content": "hi"}],
        "stream": True,
    },
    timeout=60,
) as r:
    r.raise_for_status()  # fail fast on 4xx/5xx instead of parsing an error body as SSE
    for line in r.iter_lines():
        # Skip blank separator lines and anything that is not a data field
        if not line.startswith("data: "):
            continue
        payload = line[6:]
        if payload == "[DONE]":
            break
        chunk = json.loads(payload)
        choices = chunk.get("choices") or []
        if not choices:  # e.g. a usage-only chunk
            continue
        content = choices[0].get("delta", {}).get("content") or ""
        print(content, end="", flush=True)
Browser / fetch API
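Do not call Swarmix directly from the browser
Exposing sk-swx-* in the browser means anyone who visits your site can take your key. The right approach is a backend proxy: the frontend talks only to your own server, which forwards requests to Swarmix. If you really do need to connect directly from the browser (for an internal tool, say), use fetch + ReadableStream: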
typescript
const resp = await fetch("http://router.swarmixtoken.com/v1/chat/completions", {
  method: "POST",
  headers: {
    "Authorization": "Bearer sk-swx-xxx",
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    model: "aliyun/qwen-max",
    messages: [{ role: "user", content: "hi" }],
    stream: true,
  }),
});
if (!resp.ok || !resp.body) throw new Error(`request failed: HTTP ${resp.status}`);

const reader = resp.body.getReader();
const decoder = new TextDecoder();
let buffer = "";

readLoop: while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  // Split into SSE frames (frames are separated by \n\n)
  const frames = buffer.split("\n\n");
  buffer = frames.pop() || ""; // keep the trailing incomplete frame for the next read
  for (const frame of frames) {
    if (!frame.startsWith("data: ")) continue;
    const payload = frame.slice(6);
    if (payload === "[DONE]") break readLoop; // stop reading once the terminator arrives
    const chunk = JSON.parse(payload);
    console.log(chunk.choices?.[0]?.delta?.content || "");
  }
}
Timeout settings
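Three kinds of timeout need to be distinguished:
- Connect timeout (whether the TCP connection can be established): 5-10s recommended
- Read timeout (the gap between two consecutive chunks): 30-60s recommended; long-reasoning models may need 120s
- Overall timeout (from start to finish): leave it unset, or set it very large (5-10 minutes). Some complex prompts really do take several minutes to generate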
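The three timeouts above could be written down roughly as follows. This is a minimal sketch, not an official Swarmix recommendation: timeout_kwargs, over_budget, and OVERALL_BUDGET_S are hypothetical names, the connect and read values are simply picked from the ranges above, and the write and pool values are assumptions that the text does not cover.

```python
import time
from typing import Optional

OVERALL_BUDGET_S = 600.0  # hypothetical overall budget: 10 minutes


def timeout_kwargs(long_reasoning: bool = False) -> dict:
    """Per-phase timeouts in seconds, keyed the way httpx.Timeout names them."""
    return {
        "connect": 10.0,                            # establishing the TCP connection
        "read": 120.0 if long_reasoning else 60.0,  # longest allowed gap between two chunks
        "write": 10.0,                              # assumed value, not from the text above
        "pool": 10.0,                               # assumed value, not from the text above
    }


def over_budget(started_at: float, now: Optional[float] = None) -> bool:
    """True once the whole request has run longer than OVERALL_BUDGET_S."""
    if now is None:
        now = time.monotonic()
    return now - started_at > OVERALL_BUDGET_S
```

With httpx, this would likely be passed as timeout=httpx.Timeout(**timeout_kwargs()) in the httpx.stream(...) call. httpx's Timeout covers connect, read, write, and pool only, so the overall budget has to be checked by hand, for example by recording time.monotonic() before the request and calling over_budget(...) inside the iter_lines() loop.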
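What happens when a stream is interrupted
If the client disconnects on its own initiative (for example, the user closes the window), Swarmix will:
- Cancel the upstream request (saving tokens)
- Bill for the tokens already generated (not for max_tokens)
- Mark the request in the logs with status_code=499 (client closed)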
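On the client side, an interrupted stream shows up as lines that stop arriving before data: [DONE]. The sketch below shows one way to keep the partial output and tell a complete stream from a truncated one. collect_stream and StreamResult are hypothetical names, and catching only the built-in ConnectionError and TimeoutError is a simplification; a real client would catch the transport errors of whatever HTTP library it uses.

```python
import json
from dataclasses import dataclass
from typing import Iterable


@dataclass
class StreamResult:
    text: str       # everything received before the stream ended
    finished: bool  # True only if the stream ended with data: [DONE]


def collect_stream(lines: Iterable[str]) -> StreamResult:
    """Accumulate delta content; keep the partial text if the stream is cut short."""
    parts = []
    finished = False
    try:
        for line in lines:
            if not line.startswith("data: "):
                continue  # blank separator lines and non-data fields
            payload = line[len("data: "):]
            if payload == "[DONE]":
                finished = True
                break
            choices = json.loads(payload).get("choices") or []
            if choices:
                parts.append(choices[0].get("delta", {}).get("content") or "")
    except (ConnectionError, TimeoutError):
        # The connection dropped mid-stream: fall through with what we have
        pass
    return StreamResult("".join(parts), finished)
```

The function takes any iterable of decoded lines, so r.iter_lines() from the raw HTTP example could be passed in directly. A result with finished set to False means the caller holds only partial text and can decide whether to show it, discard it, or retry.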