Python) asyncio 기초 공부 및 예시로 알아보기
coroutine 기초 개념
coroutine : 나중에 다시 실행할 수 있는 함수 (일시 정지가 가능한 함수)
정의
async def 로 정의된 함수가 ‘코루틴 객체’를 만든다.
중요한 특징 3개
- 실행하면 바로 실행되지 않고 “코루틴 객체”만 반환됨
- 내부에서 await을 만나면 즉시 멈춤(suspend)
- 나중에 event loop가 재개(resume)시킴
즉, 코루틴은:
"스스로 멈췄다가 나중에 다시 실행될 수 있는 함수"
async def work():
print("시작")
await asyncio.sleep(1)
print("다시 시작")
왜 필요해?
동기 함수처럼 “한 번 시작하면 끝까지 달림”이면,
다른 작업들이 모두 막히기 때문.
코루틴은:
“난 여기까지 했고, 지금은 I/O 기다려야 하니까
event loop야, 니가 다른 작업 먼저 해.”
이런 구조를 가능하게 해주는 함수.
asyncio 기초 개념
개념
await 개념
Await은:
- “여기서 잠깐 멈출게”
- “event loop야, 제어권 가져가”
- “내 연산이 끝나면 다시 깨워줘”
이걸 말하는 핵심 키워드라서
코루틴·event loop를 이해하는 데 절대 빠지면 안 돼.
Task(태스크)
코루틴은 그냥 “정지 가능한 함수”일 뿐이고,
이걸 event loop가 실행할 수 있게 만든 것이 Task임.
asyncio.create_task(coro)
→ “코루틴을 event loop에게 실행 스케줄링 요청”
Task는 event loop가 관리하는 실행 단위.
asyncio.wait_for 개념
wait_for는:
“이 코루틴을 기다릴게.
하지만 정해진 시간 안에 끝나지 않으면 timeout 발생시켜라.”
중요한 건 timeout 발생 시 내부 코루틴에 자동으로 cancel()을 보낸다.
cancel / cancel 전파
cancel()은:
“너 지금 실행 중이던 작업 멈춰라”
이걸 코루틴 내부로 “예외”로 전달함.
async def work():
try:
await something
except CancelledError:
cleanup()
raise
asyncio.shield
shield는 cancel 전파를 막는다.
“밖에서 cancel이 와도
이 task는 죽이지 마라.”
SSE 스트리밍에서는 필수임.
왜냐면:
- wait_for timeout나면 cancel 자동 전파
- LLM 스트리밍 task 죽어서 다음 토큰 안 나옴
- SSE 끊김
- 클라이언트에서 오류 발생
shield는 이걸 막아준다.
그리고 asyncio는 이런 coroutine 들을 스케줄링해서 번갈아 실행하는 시스템이다.

“일반 함수는 한 번 들어가면 끝까지 달리고 나오지만, 코루틴은 실행 → 멈춤 → 재개 → 멈춤을 반복할 수 있습니다.”
왼쪽은 한번 호출하면 멈추지 않고 끝까지 실행된다 (일시 정지 불가능, CPU를 다른 코드에게 넘길 수 없다)
코루틴은 호출 → 실행 → 멈춤(suspend) → 재개(resume)를 반복할 수 있다.
1) call
Caller가 코루틴(coroutine)을 처음 실행한다
→ 코루틴 내부 코드가 실행되다가 await 또는 yield에서 suspend 된다
2) suspend
코루틴은 “아직 끝나지 않았지만, 잠깐 멈춘 상태”가 된다
다른 작업이 CPU를 사용할 수 있다.
3) resume
Caller(또는 이벤트 루프)가 다시 코루틴을 깨운다(resume)
→ suspend 지점부터 다시 실행됨
4) suspend (또 멈춤)
또 다른 await 지점에서 일시정지
5) destroy / return
코루틴이 끝까지 실행되면 종료된다
Caller에게 제어가 돌아온다
이 다음에는 asyncio를 알아보자
위에 있는 그림처럼 실행되고 멈추고 다시 실행하고 또 멈추고 반복하는 것을 관리하는 것을 asyncio(Event Loop) 이다
“여러 코루틴이 멈췄다 재개되며 동시에 돌고 있는 것처럼 보이도록
스케줄링을 담당하는 시스템"

위애 있는 그림 설명
1) Event Loop는 asyncio의 두뇌
“코루틴을 실행하고, 멈추고, 재개(resume)시키는 스케줄러”
2) REQUESTS ↔ EVENT LOOP
왼쪽에 REQUESTS가 있고, 그 요청이 EVENT LOOP로 들어가는 흐름이 보임
이건 실제로 FastAPI나 aiohttp 등에서:
- 클라이언트 요청을 받으면
- async 함수로 처리하고
- 내부에서 await 시
- 이벤트 루프가 다른 작업을 실행할 수 있게 해주는 구조
즉:
“여러 요청을 동시에 처리할 수 있는 이유는 EVENT LOOP가 주도권을 갖고 있기 때문”
3) EVENT LOOP → Intensive Operation
오른쪽 박스에는 Intensive Operation (filesystem, DB, computation)
이건 이벤트 루프가 코루틴을 실행하다가:
- 파일 I/O (await aiofiles.open)
- DB I/O (await conn.execute)
- 네트워크 I/O (await http request)
- LLM 요청 (await astream)
이런 I/O 작업을 만나면 suspend하고
이 작업이 완성되면 다시 resume하는 구조를 뜻해.
4) Trigger Callback, Operation Complete
이 dotted line(점선)은 “작업 완료 후 다시 event loop로 돌아오는 흐름”.
즉 함수 흐름은 이렇게 된다:
- 요청 도착
- 이벤트 루프가 코루틴을 실행
- await에서 일시정지 (suspend)
- I/O가 끝나면 signal을 event loop에 전달
- 이벤트 루프가 다시 코루틴을 재개(resume)
이게 바로 ‘논리적 동시성’이고, 이것 때문에 Python이 단일 스레드임에도 불구하고 수천 개의 요청을 처리할 수 있는 것.
간단한 예시
라멘집 직원 한 명이 아래를 반복한다고 예시
- 면 넣고 끓이기 시작 → 물 끓는 동안 잠깐 멈춤
- 그 상태에서 다른 손님 주문 받음
- 다른 라멘 국물 준비 → 끓는 동안 멈춤
- 다시 처음 라멘 확인
- 또 다른 라멘으로 이동
- 계속 반복
직원은 한 명인데 손님 입장에서는 여러 라멘이 동시에 조리되는 것처럼 보임. 이게 asyncio임
하지만 우리가 코딩을 하면 모든 코드를 비동기로 하기 어려울 때가 있을 수 있다. 그러면 중간에 동기 함수가 들어있다면 어떻게 될까?
async def work():
await something_async() # OK. suspend/resume 잘 됨
sync_func() # ❌ 여기서 문제 발생
실제로 요청이 엄청 쏟아 질 때 위처럼 되어있다면 어떻게 될까?
요런 구조가 되면:
1. await something_async()
→ 일시정지되고
→ 다른 코루틴들도 잘 실행됨
2. sync_func()
→ 이 코드가 끝날 때까지 event loop가 멈춤
→ 다른 요청들(다른 코루틴) 전부 대기
→ “비동기 서버가 갑자기 동기처럼 행동”하게 됨
즉 asyncio 운영체제가 멈춘다 → 이벤트 루프가 block된다
import time
async def handle_request():
await asyncio.sleep(0.1)
time.sleep(2) # ❌ 동기 함수
return "OK"
무슨 일이 벌어질까?
- await asyncio.sleep(0.1) → 잘 suspend됨
- 그 다음 time.sleep(2) → 2초 동안 event loop 완전 정지
- 다른 요청이 1000개가 와도 동시에 처리 불가
- SSE 스트림도 모두 멈춤
- 클라이언트 다 타임아웃
즉 async 서버라도 동기 호출이 있으면 병목에 걸린다.
비동기 함수 안에 동기 함수가 있으면 위험한 이유는?
✔ 동기 함수는 기다리는 동안 event loop 전체를 막기 때문이다.
✔ 단일 스레드 비동기 서버에서 이것은 치명적 bottleneck이다.
문제가 되는 간단한 유형들
- time.sleep
- CPU-heavy 연산 (딥러닝 inference 직접 돌리기)
- 대용량 JSON 파싱
- 파일 읽기/쓰기 (동기 방식)
- requests 라이브러리 (동기 http)
- psycopg2 DB 호출 (동기)
- pandas 대량 데이터 처리
얼마나 짧은 동기 함수여야 가능할까?
AI가 말해주길
동기 작업의 실행 시간이 이벤트 루프 전체에서 ‘무시될 정도로 짧으면’ 문제 없다.
(예: 0.0001초 ~ 0.001초 수준)
asyncio의 event loop는:
- 매우 짧고 빠른 코드들은
- block이 거의 없는 것처럼 처리할 수 있기 때문
문제의 본질은 이거야:
❗ async 안에서 sync 코드가 0.2초 이상 걸리기 시작하면
→ event loop가 0.2초 동안 멈춘다
→ 그 0.2초 동안 다른 모든 요청이 대기한다
→ SSE 스트림도 멈춘다
→ 클라이언트 timeout
→ 서버 응답 느려짐 발생
예를 들어 요청100개에 프로세스가 4개인데 동기 함수가 있어서 대기가 걸리면?
uvicorn app:app --workers 4
각 프로세스는:
- CPU 1개를 사용하고
- 각각 자신만의 asyncio 이벤트 루프를 가진다
즉: 👉 실제로는 서버가 4개 있는 셈이다
그러면 로드밸런싱이 되서 우선 25개씩 각각 할당이 될 것이다
그리고 나서 각 프로세스에서 동시에 실행되는 것처럼 보이는 것은 EventLoop에서 교대로 빠르게 막 실행을 하는거다
근데 25개 중에 어떤 작업은 동기 작업이 긴 작업이 있고 어떤 것은 그대로 간다면
25개 중 ‘느린 동기 작업’이 1개라도 있으면, 그 동기 작업이 실행되는 순간, Worker 1의 25개 요청 전체가 멈춘다.
동기 작업은 이벤트 루프를 잠가버린다.
비유하면:
- Worker 1은 “한 명의 직원과 25개의 손님”
- 손님 중 한 명에게
“직원님, 여기서 마늘 다지기 1분만 해주세요(time.sleep)”
라고 하는 순간
→ 직원(이벤트 루프) 전체가 1분 동안 그 일만 함 → 나머지 24명 손님은 모두 대기 상태로 멈춤 → 비동기 구조에서의 교대 실행 기법이 사라짐
예를 들어 더 복잡하게 엔드포인트 안에 특정 엔드포인트가 문제일 때 다른 것에 영향을 주는 지 예시를 든다면
- 요청은 25개 들어오는데
- 그 25개가 4가지 다른 종류의 API 엔드포인트로 나뉘어 있음
(예: 메뉴 A, 메뉴 B, 메뉴 C, 메뉴 D) - 그런데 그중 특정 메뉴(예: 메뉴 C) 안에서
time.sleep(10) 같은 동기 블로킹이 있음
요청1 → 메뉴 A
요청2 → 메뉴 D
요청3 → 메뉴 C ← 여기에 time.sleep(10)
요청4 → 메뉴 B
요청5 → 메뉴 A
...
이런 식으로 되기 때문에 빠르게 나오던 메뉴들도 10초씩 더 걸려서 처리된다는게 무섭다.
즉 하나의 코드에 먼가 동기 함수가 있으면 같은 요청으로 처리 되는 코드에도 다 영향을 줘서 느려지게 만든다.
그러면 만약 잘 짠 비동기 Event Loop 기반 FastAPI 서버 얼마나 커버 가능할까?
- 이벤트 루프는 항상 단일 스레드 → 한 번에 하나만 실행
- OS TCP 큐는 매우 제한적 → 요청이 너무 많이 오면 FastAPI까지 도달 못함
- 메모리는 대량 요청을 버퍼링 못함 → Worker가 대기 요청을 오래 들고 있을 수 없음
AI가 제시한 수치로는 수만 단위 RPS는 가능하지만, 수십만~수백만 트래픽은 절대 불가능
비동기 서버가 1초에 처리할 수 있는 최대 처리량은 결국:
CPU가 1초 동안 몇 번 “코루틴 실행 → await → 다음 코루틴”
이 사이클을 반복할 수 있는가
CPU가 아주 빠르다고 해도,
이 작업이 초당 약 50만~200만 번 정도가 한계야.
근데 코루틴 하나를 처리할 때는 보통:
- 파싱
- 검증
- 비즈니스 로직
- 응답 직렬화(json.dumps)
- 네트워크 전송
등등이 들어있기 때문에 실제 처리 가능한 RPS는 줄어든다.
→ 실전에서 단일 worker는 1초에 약 2,000~10,000 요청 정도가 현실적인 최대치.
만약 코어가 8개라면?
uvicorn --workers 8
단일 서버 최대 성능 16,000 ~ 80,000 RPS까지 가능하다고 할 수 있음
동시에 연결된 SSE 요청이 많아질수록
이벤트 루프는 오히려 더 많은 suspend/resume 컨텍스트 스위칭을 해야 한다.
그러면 실제 처리 가능한 RPS는 더 줄어든다.
즉: SSE 1만 개만 열려 있어도 단일 worker는 이미 바쁨 , SSE 3만~5만 개면 단일 서버 한계 근처
그래서 다른 이야기이지만, 결국 이러한 구조로 인해서 fastapi만으로는 요청을 모두 받을 수도 없음 그래서 kafka 같은 걸로 들어오는 요청을 저장하고 그걸로 다시 밸런싱하는 기술이 필요함
파이썬의 asyncio 공부하기

async def compute(x, y):
print("Compute ...")
await asyncio.sleep(1)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print(f"{x} + {y} = {result}")
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
즉:
- print_sum 코루틴이 compute 코루틴을 await 하고,
- compute가 sleep 중에 멈추고,
- event loop가 시간 지나면 다시 깨워서 결과를 리턴하고,
- print_sum이 그걸 받아서 프린트하고 끝남.
그걸 시간 흐름으로 표현한 것이 바로 위 그림.
개념
1. Event Loop (가장 왼쪽 노란 박스)
Event Loop = asyncio의 두뇌 + 스케줄러
그림에서 event loop는 이런 역할을 함:
- Task에게 “너 이제 실행해”라고 지시
- await 만나면 다시 제어권을 가져옴
- sleep 같은 I/O가 끝나면 Task를 다시 실행
- 전체 작업이 끝나면 loop 종료
👉 한 줄 요약
“누구를 언제 돌릴지 결정하는 중앙 사령부”
2. Task (두 번째 박스)
Task = 코루틴을 event loop가 실행 가능하게 만든 객체
코루틴 자체는 "정지 가능한 함수"일 뿐이지만
event loop가 실행하게 만들려면 Task로 포장해야 함.
그림에서 Task는:
- 처음에 “pending 상태”
- event loop가 실행시키면 “running 상태”
- await 만나면 “suspended 상태”
- 마지막에 compute 결과를 받고 finish
- final: Task는 “done 상태”
👉 한 줄 요약
“작업 단위 (event loop가 실제로 실행하는 대상)”
3. print_sum() — 이용자 코드 (세 번째 박스)
이 코루틴이 하는 일:
- 실행을 시작한다
- await compute(1, 2) 를 만나 실행을 compute에게 넘긴다
- compute가 끝나면 다시 깨어난다
- sum 결과를 출력하고 종료한다
그래서 그림에서:
- 처음엔 “코루틴이 running”
- await compute → 코루틴 suspended
- compute가 끝난 후 → 다시 running
- StopIteration 리턴 → 종료
👉 한 줄 요약
“사용자 비즈니스 로직 코루틴.
compute 코루틴을 await 하므로 중간에 멈춤(suspend)”
4. compute() — 서브 코루틴 (네 번째 박스)
compute는 print_sum 내부에서 호출됨.
이 코루틴은:
- 실행 시작 → Compute 출력
- await sleep(1.0) → suspend
- 1초 후 다시 resume
- return x + y
- StopIteration(3)로 print_sum에게 결과 전달
즉:
- compute()가 중간에 일시정지됨
- event loop가 1초 후에 compute를 다시 깨움
- compute 결과를 print_sum에게 전달함
👉 한 줄 요약
“서브 작업. I/O(sleep) 때문에 suspend → 완료 후 print_sum에게 값 반환”
코드로 간단히 이해해보기
1단계: 코루틴 + 이벤트 루프 + await 기본 이해 (가장 기초)
import asyncio
async def do_work():
print("작업 시작")
await asyncio.sleep(1) # 여기서 event loop에 제어권 반환
print("작업 재개")
return "완료"
async def main():
result = await do_work()
print(result)
asyncio.run(main())
1. do_work 실행 → "작업 시작"
2. await sleep(1) → event loop에 제어권 반환
3. event loop는 다른 작업을 실행
4. 1초 후 도_work 재개 → "작업 재개"
2단계: event loop가 여러 코루틴을 어떻게 스케줄링하는지
동시에 여러 작업을 돌리기 위해 Task를 만들면:
async def job(name, t):
print(f"{name} 시작")
await asyncio.sleep(t)
print(f"{name} 완료")
async def main():
t1 = asyncio.create_task(job("A", 1))
t2 = asyncio.create_task(job("B", 2))
t3 = asyncio.create_task(job("C", 3))
await t1
await t2
await t3
asyncio.run(main())
A 실행 → await → 멈춤
B 실행 → await → 멈춤
C 실행 → await → 멈춤
(시간 흐름)
1초 되면 A 재개 → 완료
2초 되면 B 재개 → 완료
3초 되면 C 재개 → 완료
한 순간에는 1개만 실행하지만,
너무 빨리 왔다 갔다 해서 “동시에 실행”처럼 보임.
3단계: asyncio.wait_for() 개념을 코드로 이해
이 코드는 작업이 1초 넘으면 “포기(timeout)”한다.
async def slow():
await asyncio.sleep(2)
return "완료"
async def main():
try:
result = await asyncio.wait_for(slow(), timeout=1)
print(result)
except asyncio.TimeoutError:
print("시간 초과!")
asyncio.run(main())
❗ wait_for는 timeout 발생 시 slow() 코루틴에게 cancel()을 보낸다.
4단계: cancel 전파 방지(shield) 개념을 코드로 이해
이번에는 같은 slow 함수지만 cancel이 전달되지 않게 막아보자.
async def slow():
try:
await asyncio.sleep(2)
return "완료"
except asyncio.CancelledError:
print("❌ slow가 취소됐습니다(이러면 안 됨)")
raise
async def main():
task = asyncio.create_task(slow())
try:
await asyncio.wait_for(asyncio.shield(task), timeout=1)
except asyncio.TimeoutError:
print("시간 초과 (하지만 task는 계속 진행 중)")
result = await task
print("slow 결과:", result)
asyncio.run(main())
여기서 핵심
- wait_for는 timeout이 나도
- slow() 코루틴을 취소시키지 못한다.
- slow는 계속 진행되고 끝까지 실행된다.
그렇다면 다른 예시로 EDNPOINT에서 BREAK를 걸면 내부 서비스에서는 어떻게 될까?
안되는 예시
import asyncio
async def generator():
print("[GEN] yield finished")
yield "finished"
print("[GEN] save_data 실행") # ❌ 실행 안 됨
await asyncio.sleep(1)
print("[GEN] more_work 실행") # ❌ 실행 안 됨
async def endpoint():
gen = generator()
token = await gen.__anext__()
print("[ENDPOINT] 받은 token:", token)
if token == "finished":
print("[ENDPOINT] FINISHED → break")
return # break 와 동일
asyncio.run(endpoint())
결과 출력
[GEN] yield finished
[ENDPOINT] 받은 token: finished
[ENDPOINT] FINISHED → break
이유:
endpoint가 FINISHED에서 break 하면 generator의 anext()를 더 이상 호출하지 않기 때문.
이렇게 ednpoint에서 끊어버리면 내부 로직은 어쩔 수 없이 끊어지는 어찌보면 당연한데...
웹에 보내는 부분이랑 내부에서 이어서 추가 로직이 도는 부분이 있을텐데 그렇다면 어떻게 웹에서는 보내고 내부 로직은 그대로 돌게 할 수 있을까? (물론 의도적으로 끊는 것인데 또 돈다는 거 자체가 이상할 수 있다, 사이드 이팩트가 많이 발생할 것으로 보임 계속 돈다던지...)
chatgpt랑 이야기해 보니 다음과 같은 해결책을 제시한다.
결국 핵심적으로 중요한 것은 서비스 코드에서 여러 상황에 대한 케이스를 잘 분석해서 정확히 사이드 이팩트가 발생하지 않게 처리하는 것이 굉장히 중요하다.
1. generator 전체를 독립 Task 로 움직이게 해야 한다.
2. endpoint는 yield한 값만 받아보고 끝내고,
3. generator는 계속 내부 로직을 완주한다.
방법
- FINISHED 신호로 endpoint는 종료
- generator 내부 로직은 분기 따라 실행/중단 선택
- post-process는 별도 Task로 독립 실행
- timeout/cancel 제어까지 완전 포함
- 실서비스에서 conversation 저장 / redis close / 사용량 적재 등을 그대로 반영 가능
구조 설명
generator 내부에서 다음 3가지 시나리오를 분기한다:
🔵 Case A — FINISHED 이후 로직을 “절대 실행되면 안 되는 경우”
- FINISHED 출력 후 내부 후처리는 무시
- endpoint에서 cancel 시킴
🔵 Case B — FINISHED 이후 로직을 반드시 끝까지 실행해야 하는 경우
- 별도 Task로 전체 로직 끝까지 실행
🔵 Case C — FINISHED 이후 로직을 실행하되 timeout 걸어야 하는 경우
- 후처리를 별도 Task로 분리 + timeout
import asyncio
async def generator(mode):
"""
mode:
A → FINISHED 이후 로직 실행하면 안 됨
B → FINISHED 이후 로직 반드시 실행
C → FINISHED 이후 로직 실행하되 timeout 필요
"""
print("[GEN] 작업 시작")
await asyncio.sleep(0.5)
# 스트리밍 과정
yield "event: data\n\n"
await asyncio.sleep(0.5)
# FINISHED 이벤트
yield "event: finished\n\n"
print("[GEN] FINISHED 발생 (endpoint는 여기서 종료됨)")
# 여기 아래가 “FINISHED 이후 로직”
# 상황별로 처리 다름
if mode == "A":
print("[GEN] A 모드 → FINISHED 아래 로직은 실행되면 안 됨 (endpoint에서 cancel 필요)")
# endpoint에서 cancel 시킬 예정
return
elif mode == "B":
print("[GEN] B 모드 → FINISHED 이후 로직 반드시 실행 (백그라운드 Task)")
asyncio.create_task(post_process_full())
elif mode == "C":
print("[GEN] C 모드 → FINISHED 이후 로직은 실행하되 timeout 관리 필요")
asyncio.create_task(run_post_process_with_timeout())
else:
print("[GEN] 알 수 없는 모드")
async def post_process_full():
print("[B] save_data 실행")
await asyncio.sleep(1)
print("[B] more_work 실행")
await asyncio.sleep(1)
print("[B] FINISHED 이후 전체 후처리 완료")
async def post_process_timeout():
print("[C] save_data 실행")
await asyncio.sleep(2)
print("[C] more_work 실행")
await asyncio.sleep(2)
print("[C] FINISHED 이후 후처리 완료")
async def run_post_process_with_timeout(timeout=3):
try:
await asyncio.wait_for(post_process_timeout(), timeout=timeout)
except asyncio.TimeoutError:
print("[C] 후처리 timeout → 중단")
async def endpoint(mode):
print(f"\n========== {mode} 모드 테스트 ==========")
gen = generator(mode)
try:
async for event in gen:
print(f"[ENDPOINT] 받은 이벤트 → {event.strip()}")
if "finished" in event:
print("[ENDPOINT] FINISHED → 스트림 종료")
break
except Exception as e:
print("[ENDPOINT] 오류:", e)
print("[ENDPOINT] SSE 응답 종료 (여기서 끝)")
async def main():
await endpoint("A") # 이후 로직 절대 실행 금지
await asyncio.sleep(3)
await endpoint("B") # 이후 로직 반드시 실행
await asyncio.sleep(3)
await endpoint("C") # 이후 로직 timeout 관리
await asyncio.sleep(5)
asyncio.run(main())