Streaming Response Handling

SSE (Server-Sent Events) protocol parsing, error recovery, and timeout control: a production-grade implementation.

Protocol format

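Swarmix's streaming responses follow the OpenAI protocol: each chunk is a single line of the form data: {JSON}, and chunks are separated by blank lines. The stream ends with a literal data: [DONE] line: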

text
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"},"index":0}]}

data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{"content":"你"},"index":0}]}

data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{"content":"好"},"index":0}]}

data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{},"finish_reason":"stop","index":0}]}

data: [DONE]
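
As a rough illustration of how these frames combine into one message, here is a minimal sketch that folds the deltas from the sample above into a single dict. The function name assemble_message and the shape of the returned dict are hypothetical, chosen only for this example; they are not part of Swarmix or the OpenAI SDK.

```python
import json


def assemble_message(sse_lines):
    """Fold a sequence of SSE lines into one message dict (illustrative helper)."""
    message = {"role": None, "content": "", "finish_reason": None}
    for line in sse_lines:
        # Blank lines only separate frames; skip anything that is not a data line.
        if not line.startswith("data:"):
            continue
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            break  # end-of-stream sentinel, not JSON
        chunk = json.loads(payload)
        for choice in chunk.get("choices", []):
            if choice.get("index", 0) != 0:
                continue  # this sketch only tracks the first choice
            delta = choice.get("delta", {})
            if delta.get("role"):
                message["role"] = delta["role"]
            # content may be absent or null in the role and finish frames
            message["content"] += delta.get("content") or ""
            if choice.get("finish_reason"):
                message["finish_reason"] = choice["finish_reason"]
    return message


# The sample frames shown above, one list entry per line.
sample = [
    'data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"},"index":0}]}',
    '',
    'data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{"content":"你"},"index":0}]}',
    '',
    'data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{"content":"好"},"index":0}]}',
    '',
    'data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
    '',
    'data: [DONE]',
]

print(assemble_message(sample))
```

The role arrives once in the first frame, content arrives piece by piece, and finish_reason arrives in the last JSON frame before [DONE].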

Python (OpenAI SDK)

python
from openai import OpenAI

client = OpenAI(api_key="sk-swx-xxx", base_url="http://router.swarmixtoken.com/v1")

stream = client.chat.completions.create(
    model="aliyun/qwen-max",
    messages=[{"role": "user", "content": "Write a poem about autumn"}],
    stream=True,
    stream_options={"include_usage": True},   # the final chunk will carry the token counts
)

full = []
for chunk in stream:
    # The final usage chunk has an empty choices list
    if not chunk.choices:
        print(f"\n\n[usage: {chunk.usage}]")
        break
    delta = chunk.choices[0].delta.content or ""
    full.append(delta)
    print(delta, end="", flush=True)

print(f"\n\nFull output: {len(''.join(full))} chars")

Node.js (OpenAI SDK)

typescript
import OpenAI from "openai";

const client = new OpenAI({
  apiKey: "sk-swx-xxx",
  baseURL: "http://router.swarmixtoken.com/v1",
});

const stream = await client.chat.completions.create({
  model: "aliyun/deepseek-r1",
  messages: [{ role: "user", content: "hi" }],
  stream: true,
  stream_options: { include_usage: true },
});

for await (const chunk of stream) {
  if (chunk.choices.length === 0 && chunk.usage) {
    console.log("\nusage:", chunk.usage);
    break;
  }
  process.stdout.write(chunk.choices[0]?.delta?.content || "");
}

Raw HTTP (parsing SSE yourself)

python
import httpx
import json

with httpx.stream(
    "POST", "http://router.swarmixtoken.com/v1/chat/completions",
    headers={"Authorization": "Bearer sk-swx-xxx"},
    json={"model": "aliyun/qwen-max", "messages": [{"role":"user","content":"hi"}], "stream": True},
    timeout=60,
) as r:
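    r.raise_for_status()  # fail fast on 4xx/5xx instead of parsing an error body as SSE
    for line in r.iter_lines():
        # Skip blank separator lines and anything that is not a data line
        if not line or not line.startswith("data: "):
            continue
        payload = line[6:]
        if payload == "[DONE]":
            break
        chunk = json.loads(payload)
        if not chunk["choices"]:
            continue
        # content can be missing or null (e.g. in the role and finish frames)
        content = chunk["choices"][0]["delta"].get("content") or ""
        print(content, end="", flush=True)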

Browser / fetch API

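Do not call Swarmix directly from the browser
Exposing an sk-swx-* key in the browser means anyone who visits your site can take it. The correct approach is a backend proxy: the frontend talks only to your own server, which forwards requests to Swarmix.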

If you really do need to connect directly from the browser (for an internal tool, say), use fetch with a ReadableStream:

typescript
const resp = await fetch("http://router.swarmixtoken.com/v1/chat/completions", {
  method: "POST",
  headers: {
    "Authorization": "Bearer sk-swx-xxx",
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    model: "aliyun/qwen-max",
    messages: [{ role: "user", content: "hi" }],
    stream: true,
  }),
});

const reader = resp.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";

readLoop: while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });

  // Split into frames (SSE frames are separated by \n\n)
  const frames = buffer.split("\n\n");
  buffer = frames.pop() || "";   // keep the trailing incomplete frame for the next read
  for (const frame of frames) {
    if (!frame.startsWith("data: ")) continue;
    const payload = frame.slice(6);
    // A bare `return` is not valid outside a function, so leave both loops with a labeled break
    if (payload === "[DONE]") break readLoop;
    const chunk = JSON.parse(payload);
    console.log(chunk.choices[0]?.delta?.content || "");
  }
}

Timeout settings

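Three kinds of timeout need to be distinguished:

  • Connect timeout (whether a TCP connection can be established): 5-10 s recommended
  • Read timeout (the gap between two consecutive chunks): 30-60 s recommended; long-reasoning models may need 120 s
  • Overall timeout (from start to finish): leave it unset, or set it very high (5-10 minutes). Some complex prompts really do take several minutes to generate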
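
With httpx, the first two settings can be expressed as something like httpx.Timeout(60.0, connect=10.0), which applies a 10 s connect timeout and 60 s to the other phases, including reads. httpx has no built-in overall deadline, so the sketch below shows one possible way to add a generous total budget around any chunk iterator. The names with_overall_deadline and OVERALL_BUDGET_S are hypothetical, and the 600 s value is just the upper end of the range suggested above.

```python
import time

OVERALL_BUDGET_S = 600  # assumption: 10 minutes, the upper end of the suggested range


def with_overall_deadline(chunks, max_seconds, clock=time.monotonic):
    """Yield items from `chunks`, raising TimeoutError once `max_seconds` have elapsed.

    The check runs only when a chunk arrives, so it complements the read
    timeout rather than replacing it: a stalled stream is still caught by
    the read timeout of the HTTP client.
    """
    deadline = clock() + max_seconds
    for chunk in chunks:
        if clock() > deadline:
            raise TimeoutError(f"stream exceeded overall budget of {max_seconds}s")
        yield chunk


# Usage with any iterable of chunks, e.g. an SDK stream or r.iter_lines():
for piece in with_overall_deadline(["a", "b", "c"], OVERALL_BUDGET_S):
    print(piece, end="")
print()
```

The clock parameter exists only so the helper can be exercised with a fake clock; in normal use the default time.monotonic is enough.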

What happens when a stream is interrupted

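If the client disconnects on its own (for example, the user closes the window), Swarmix will:

  • Cancel the upstream request (saving tokens)
  • Bill for the tokens already generated (not for max_tokens)
  • Record status_code=499 (client closed) in the logs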
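
On the client side, one possible way to tell a complete answer from an interrupted one is to check whether a finish_reason ever arrived. The sketch below assumes chunks are plain dicts, as produced by json.loads in the raw HTTP example; the function name collect_stream is hypothetical. It catches the built-in ConnectionError and TimeoutError only for illustration: real HTTP clients raise their own exception classes, which would need to be listed instead.

```python
def collect_stream(chunks):
    """Return (text, finish_reason); finish_reason is None if the stream ended early."""
    parts = []
    finish_reason = None
    try:
        for chunk in chunks:
            for choice in chunk.get("choices", []):
                parts.append(choice.get("delta", {}).get("content") or "")
                finish_reason = choice.get("finish_reason") or finish_reason
    except (ConnectionError, TimeoutError):
        # The connection dropped mid-stream: keep what was already received.
        pass
    return "".join(parts), finish_reason


def broken_stream():
    """Simulated stream that drops after the first content chunk."""
    yield {"choices": [{"delta": {"content": "Hel"}, "index": 0}]}
    raise ConnectionError("connection dropped")


text, reason = collect_stream(broken_stream())
if reason is None:
    print(f"interrupted after {len(text)} chars: {text!r}")
```

Because tokens already generated are billed, a caller may prefer to show or store the partial text rather than silently retry from scratch.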