콘텐츠로 이동

AI 스트리밍 응답 구현 패턴

LLM API를 사용하는 서비스를 만든다면 스트리밍 응답은 선택이 아닌 필수다. 사용자가 수십 초를 기다리지 않으려면 토큰이 생성되는 즉시 화면에 표시해야 한다. 이 챕터에서는 주요 API의 스트림 포맷을 분석하고, 직접 모델을 서빙할 때의 구현 패턴을 다룬다.

stream: true를 설정하면 OpenAI는 SSE 스트림으로 응답한다.

POST /v1/chat/completions HTTP/1.1
Content-Type: application/json
{"model": "gpt-4o", "stream": true, "messages": [...]}

응답 스트림:

HTTP/1.1 200 OK
Content-Type: text/event-stream
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"},"index":0}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"content":"안"},"index":0}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"content":"녕"},"index":0}]}
data: [DONE]

각 이벤트는 data: 하나에 JSON 객체다. choices[0].delta.content에 토큰이 담긴다. 스트림 종료는 data: [DONE]으로 신호한다.

Anthropic은 명시적인 이벤트 타입을 사용한다.

event: message_start
data: {"type":"message_start","message":{"id":"msg_abc","type":"message","role":"assistant","content":[]}}
event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"안"}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"녕"}}
event: message_stop
data: {"type":"message_stop"}

OpenAI와 달리 event: 필드로 이벤트 유형을 구분하고, 각 단계(메시지 시작, 블록 시작, 델타, 블록 종료, 메시지 종료)를 명확하게 분리한다.

import asyncio
import json
from threading import Thread
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
app = FastAPI()
# 서버 시작 시 모델 로드 (한 번만)
MODEL_ID = "microsoft/DialoGPT-medium"
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
model = AutoModelForCausalLM.from_pretrained(MODEL_ID)
async def generate_tokens(prompt: str):
"""HuggingFace 모델에서 토큰을 SSE 포맷으로 yield한다."""
streamer = TextIteratorStreamer(
tokenizer,
skip_prompt=True,
skip_special_tokens=True,
)
inputs = tokenizer(prompt, return_tensors="pt")
# 생성은 별도 스레드에서 실행 (blocking 작업)
generation_kwargs = {
**inputs,
"streamer": streamer,
"max_new_tokens": 256,
"do_sample": True,
"temperature": 0.7,
}
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
# streamer에서 토큰을 읽어 SSE 포맷으로 변환
for token in streamer:
payload = json.dumps({"token": token}, ensure_ascii=False)
yield f"data: {payload}\n\n"
await asyncio.sleep(0) # 이벤트 루프에 제어권 반환
yield "data: [DONE]\n\n"
@app.post("/generate")
async def generate_endpoint(body: dict):
prompt = body.get("prompt", "")
return StreamingResponse(
generate_tokens(prompt),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
},
)

TextIteratorStreamer는 HuggingFace가 스트리밍을 위해 제공하는 클래스다. 모델 생성을 별도 스레드에서 실행하고 메인 스레드에서 토큰을 이터레이션한다. await asyncio.sleep(0)으로 FastAPI 이벤트 루프가 다른 요청도 처리할 수 있게 제어권을 돌려준다.

EventSource는 GET만 지원하므로, POST 바디로 프롬프트를 보내야 할 때는 fetch API로 스트림을 직접 읽는다.

import { useState, useRef } from "react";
interface StreamState {
text: string;
isStreaming: boolean;
}
function useLLMStream() {
const [state, setState] = useState<StreamState>({ text: "", isStreaming: false });
const abortRef = useRef<AbortController | null>(null);
const startStream = async (prompt: string) => {
// 이전 스트림이 있으면 취소
abortRef.current?.abort();
abortRef.current = new AbortController();
setState({ text: "", isStreaming: true });
try {
const response = await fetch("/generate", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ prompt }),
signal: abortRef.current.signal,
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
const lines = chunk.split("\n").filter((l) => l.startsWith("data: "));
for (const line of lines) {
const data = line.slice(6); // "data: " 제거
if (data === "[DONE]") break;
const { token } = JSON.parse(data);
setState((prev) => ({ ...prev, text: prev.text + token }));
}
}
} catch (error) {
if ((error as Error).name !== "AbortError") {
console.error("스트리밍 오류:", error);
}
} finally {
setState((prev) => ({ ...prev, isStreaming: false }));
}
};
const cancelStream = () => {
abortRef.current?.abort();
};
return { ...state, startStream, cancelStream };
}
사용자가 "중지" 클릭
AbortController.abort() 호출
fetch 요청에 연결된 스트림 즉시 중단
서버는 다음 yield 시점에 연결 끊김 감지
FastAPI generator에서 GeneratorExit 발생
모델 생성 스레드는 streamer가 닫히면서 중단 신호 수신

모델이 토큰을 생성하는 속도가 네트워크 전송 속도보다 빠른 경우, 서버 메모리에 토큰이 쌓일 수 있다.

해결 방법:
1. 토큰을 큐 대신 스트리머에서 직접 yield (메모리 효율)
2. 클라이언트 연결 끊김 즉시 생성 중단
3. 서버 측 최대 버퍼 크기 제한

FastAPI의 StreamingResponse는 클라이언트가 연결을 끊으면 generator가 자동으로 종료되므로, 별도로 연결 상태를 확인할 필요가 없다.

async def generate_tokens_safe(prompt: str):
try:
async for chunk in generate_tokens(prompt):
yield chunk
except Exception as e:
# SSE 스트림 중간에 오류를 전달하는 방법
error_payload = json.dumps({"error": str(e)}, ensure_ascii=False)
yield f"event: error\ndata: {error_payload}\n\n"

스트림 중간에 오류가 발생해도 HTTP 상태 코드는 이미 200으로 전송된 상태다. event: error 이벤트로 클라이언트에 오류를 알리고 클라이언트가 이를 구독해 처리하도록 한다.

  • OpenAI는 data: {...} + data: [DONE], Anthropic은 event: 타입으로 단계를 구분한다
  • HuggingFace TextIteratorStreamer와 별도 스레드로 동기 모델 추론을 비동기 스트림으로 변환한다
  • 브라우저에서 POST + 스트림은 fetch API로 response.body.getReader()를 사용한다
  • AbortController로 사용자가 생성을 취소하면 서버 generator도 연쇄적으로 종료된다
  • 스트림 중간 오류는 HTTP 상태 코드 대신 event: error SSE 이벤트로 전달한다