AI 스트리밍 응답 구현 패턴
LLM API를 사용하는 서비스를 만든다면 스트리밍 응답은 선택이 아닌 필수다. 사용자가 수십 초를 기다리지 않으려면 토큰이 생성되는 즉시 화면에 표시해야 한다. 이 챕터에서는 주요 API의 스트림 포맷을 분석하고, 직접 모델을 서빙할 때의 구현 패턴을 다룬다.
OpenAI 스트리밍 포맷 분석
섹션 제목: “OpenAI 스트리밍 포맷 분석”stream: true를 설정하면 OpenAI는 SSE 스트림으로 응답한다.
POST /v1/chat/completions HTTP/1.1Content-Type: application/json
{"model": "gpt-4o", "stream": true, "messages": [...]}응답 스트림:
HTTP/1.1 200 OKContent-Type: text/event-stream
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"},"index":0}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"content":"안"},"index":0}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"delta":{"content":"녕"},"index":0}]}
data: [DONE]각 이벤트는 data: 하나에 JSON 객체다. choices[0].delta.content에 토큰이 담긴다. 스트림 종료는 data: [DONE]으로 신호한다.
Anthropic SSE 포맷 분석
섹션 제목: “Anthropic SSE 포맷 분석”Anthropic은 명시적인 이벤트 타입을 사용한다.
event: message_startdata: {"type":"message_start","message":{"id":"msg_abc","type":"message","role":"assistant","content":[]}}
event: content_block_startdata: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}
event: content_block_deltadata: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"안"}}
event: content_block_deltadata: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"녕"}}
event: message_stopdata: {"type":"message_stop"}OpenAI와 달리 event: 필드로 이벤트 유형을 구분하고, 각 단계(메시지 시작, 블록 시작, 델타, 블록 종료, 메시지 종료)를 명확하게 분리한다.
FastAPI + HuggingFace 스트리밍 구현
섹션 제목: “FastAPI + HuggingFace 스트리밍 구현”import asyncioimport jsonfrom threading import Threadfrom fastapi import FastAPIfrom fastapi.responses import StreamingResponsefrom transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
app = FastAPI()
# 서버 시작 시 모델 로드 (한 번만)MODEL_ID = "microsoft/DialoGPT-medium"tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)model = AutoModelForCausalLM.from_pretrained(MODEL_ID)
async def generate_tokens(prompt: str): """HuggingFace 모델에서 토큰을 SSE 포맷으로 yield한다.""" streamer = TextIteratorStreamer( tokenizer, skip_prompt=True, skip_special_tokens=True, )
inputs = tokenizer(prompt, return_tensors="pt")
# 생성은 별도 스레드에서 실행 (blocking 작업) generation_kwargs = { **inputs, "streamer": streamer, "max_new_tokens": 256, "do_sample": True, "temperature": 0.7, } thread = Thread(target=model.generate, kwargs=generation_kwargs) thread.start()
# streamer에서 토큰을 읽어 SSE 포맷으로 변환 for token in streamer: payload = json.dumps({"token": token}, ensure_ascii=False) yield f"data: {payload}\n\n" await asyncio.sleep(0) # 이벤트 루프에 제어권 반환
yield "data: [DONE]\n\n"
@app.post("/generate")async def generate_endpoint(body: dict): prompt = body.get("prompt", "") return StreamingResponse( generate_tokens(prompt), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "X-Accel-Buffering": "no", }, )TextIteratorStreamer는 HuggingFace가 스트리밍을 위해 제공하는 클래스다. 모델 생성을 별도 스레드에서 실행하고 메인 스레드에서 토큰을 이터레이션한다. await asyncio.sleep(0)으로 FastAPI 이벤트 루프가 다른 요청도 처리할 수 있게 제어권을 돌려준다.
React 클라이언트 구현
섹션 제목: “React 클라이언트 구현”EventSource는 GET만 지원하므로, POST 바디로 프롬프트를 보내야 할 때는 fetch API로 스트림을 직접 읽는다.
import { useState, useRef } from "react";
interface StreamState { text: string; isStreaming: boolean;}
function useLLMStream() { const [state, setState] = useState<StreamState>({ text: "", isStreaming: false }); const abortRef = useRef<AbortController | null>(null);
const startStream = async (prompt: string) => { // 이전 스트림이 있으면 취소 abortRef.current?.abort(); abortRef.current = new AbortController();
setState({ text: "", isStreaming: true });
try { const response = await fetch("/generate", { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ prompt }), signal: abortRef.current.signal, });
const reader = response.body!.getReader(); const decoder = new TextDecoder();
while (true) { const { done, value } = await reader.read(); if (done) break;
const chunk = decoder.decode(value, { stream: true }); const lines = chunk.split("\n").filter((l) => l.startsWith("data: "));
for (const line of lines) { const data = line.slice(6); // "data: " 제거 if (data === "[DONE]") break;
const { token } = JSON.parse(data); setState((prev) => ({ ...prev, text: prev.text + token })); } } } catch (error) { if ((error as Error).name !== "AbortError") { console.error("스트리밍 오류:", error); } } finally { setState((prev) => ({ ...prev, isStreaming: false })); } };
const cancelStream = () => { abortRef.current?.abort(); };
return { ...state, startStream, cancelStream };}취소와 Backpressure
섹션 제목: “취소와 Backpressure”AbortController로 취소 처리
섹션 제목: “AbortController로 취소 처리”사용자가 "중지" 클릭 ↓AbortController.abort() 호출 ↓fetch 요청에 연결된 스트림 즉시 중단 ↓서버는 다음 yield 시점에 연결 끊김 감지 ↓FastAPI generator에서 GeneratorExit 발생 ↓모델 생성 스레드는 streamer가 닫히면서 중단 신호 수신Backpressure 문제
섹션 제목: “Backpressure 문제”모델이 토큰을 생성하는 속도가 네트워크 전송 속도보다 빠른 경우, 서버 메모리에 토큰이 쌓일 수 있다.
해결 방법:1. 토큰을 큐 대신 스트리머에서 직접 yield (메모리 효율)2. 클라이언트 연결 끊김 즉시 생성 중단3. 서버 측 최대 버퍼 크기 제한FastAPI의 StreamingResponse는 클라이언트가 연결을 끊으면 generator가 자동으로 종료되므로, 별도로 연결 상태를 확인할 필요가 없다.
토큰 수준 오류 처리
섹션 제목: “토큰 수준 오류 처리”async def generate_tokens_safe(prompt: str): try: async for chunk in generate_tokens(prompt): yield chunk except Exception as e: # SSE 스트림 중간에 오류를 전달하는 방법 error_payload = json.dumps({"error": str(e)}, ensure_ascii=False) yield f"event: error\ndata: {error_payload}\n\n"스트림 중간에 오류가 발생해도 HTTP 상태 코드는 이미 200으로 전송된 상태다. event: error 이벤트로 클라이언트에 오류를 알리고 클라이언트가 이를 구독해 처리하도록 한다.
핵심 정리
섹션 제목: “핵심 정리”- OpenAI는
data: {...}+data: [DONE], Anthropic은event:타입으로 단계를 구분한다 - HuggingFace
TextIteratorStreamer와 별도 스레드로 동기 모델 추론을 비동기 스트림으로 변환한다 - 브라우저에서 POST + 스트림은
fetchAPI로response.body.getReader()를 사용한다 AbortController로 사용자가 생성을 취소하면 서버 generator도 연쇄적으로 종료된다- 스트림 중간 오류는 HTTP 상태 코드 대신
event: errorSSE 이벤트로 전달한다