> 技术文档 > 【Python】sse_starlette 库:为 Starlette 和 FastAPI 框架提供对 Server-Sent Events(SSE,服务器推送事件)的支持_python starlette

【Python】sse_starlette 库:为 Starlette 和 FastAPI 框架提供对 Server-Sent Events(SSE,服务器推送事件)的支持_python starlette

sse_starlette 是一个 Python 库,为 Starlette 和 FastAPI 框架提供对 Server-Sent Events(SSE,服务器推送事件)的支持。它通过 EventSourceResponse 类实现 SSE 协议,允许服务器异步向客户端推送实时数据,适合构建实时 Web 应用,如通知系统、实时仪表盘或流式数据更新。sse_starlette 轻量且与 ASGI 框架无缝集成,常用于需要高效单向通信的场景。

以下是对 sse_starlette 库的详细介绍,包括其功能、用法和实际应用,基于最新信息(截至 2025)。


1. sse_starlette 库的作用

  • SSE 支持:为 Starlette/FastAPI 提供标准化的 SSE 实现,基于 HTML5 EventSource API。
  • 实时数据推送:支持服务器到客户端的单向数据流,适合实时更新。
  • 高性能:利用 asyncioanyio 的异步任务组(TaskGroups),确保高效并发。
  • 易于集成:与 Starlette/FastAPI 无缝兼容,支持自定义事件和头部。
  • 健壮性:处理客户端断开、服务器关闭和非线程安全对象,增强可靠性。

近期动态

  • 最新版本:2.3.4(截至 2025-05-04,PyPI)。
  • 新增功能
    • 支持自定义 ping 间隔和消息工厂(ping_message_factory)。
    • 修复多 SSE 端点测试中的事件循环绑定问题(Issue #59)。
    • 增强与 anyio 的集成,优化任务管理。
  • 社区活跃:GitHub 仓库(https://github.com/sysid/sse-starlette)有 592 星,30 位贡献者,维护健康。
  • 注意事项:不支持 GZipMiddleware,需确保任务在关闭时清理以避免警告。

2. 安装与环境要求

  • Python 版本:支持 Python 3.8+(推荐 3.9+)。
  • 核心依赖
    • starlette:ASGI 框架基础。
    • anyio:异步任务管理(最低 3.6.2)。
    • 可选:uvicorn(运行 ASGI 应用)、httpx(测试)。
  • 安装命令
    pip install sse-starlette
  • 完整安装(包括运行和测试):
    pip install sse-starlette uvicorn httpx
  • 验证安装
    import sse_starletteprint(sse_starlette.__version__) # 示例输出: 2.3.4

系统要求

  • 确保 uvicornstarlette 版本兼容(starlette>=0.26.1)。
  • 测试环境可能需 pytestpytest-asyncio
    pip install pytest pytest-asyncio

3. 核心功能与用法

sse_starlette 提供 EventSourceResponse 类,用于创建 SSE 响应,支持异步生成器、自定义事件和连接管理。以下是主要功能和示例。

3.1 基本 SSE 实现

使用异步生成器推送事件。

import asyncioimport uvicornfrom starlette.applications import Starlettefrom starlette.routing import Routefrom sse_starlette.sse import EventSourceResponseasync def numbers(minimum: int, maximum: int): for i in range(minimum, maximum + 1): await asyncio.sleep(0.9) yield dict(data=i)async def sse(request): generator = numbers(1, 5) return EventSourceResponse(generator)routes = [ Route(\"/\", endpoint=sse)]app = Starlette(debug=True, routes=routes)if __name__ == \"__main__\": uvicorn.run(app, host=\"0.0.0.0\", port=8000, log_level=\"info\")

运行

python sse_example.py

客户端测试(使用 curl):

curl -N http://localhost:8000

输出示例

data: 1data: 2data: 3data: 4data: 5

说明

  • EventSourceResponse 接受异步生成器,推送 data 字段。
  • 每 0.9 秒发送一个数字,符合 SSE 协议(text/event-stream)。
  • 默认每 15 秒发送 ping 保持连接。
3.2 自定义事件

指定事件名称、ID 和多行数据。

from fastapi import FastAPI, Requestfrom sse_starlette.sse import EventSourceResponse, ServerSentEventimport asyncioimport timeapp = FastAPI()@app.get(\"/stream\")async def sse_stream(request: Request): async def event_generator(): for i in range(1, 6): if await request.is_disconnected(): break msg = f\"Update {i}\\nTime: {time.strftime(\'%H:%M:%S\')}\" yield ServerSentEvent( data=msg, event=\"update\", id=str(i) ) await asyncio.sleep(1) return EventSourceResponse(event_generator(), ping=5)if __name__ == \"__main__\": import uvicorn uvicorn.run(app, host=\"0.0.0.0\", port=8000, log_level=\"info\")

客户端 JavaScript

<script> const source = new EventSource(\"http://localhost:8000/stream\"); source.addEventListener(\"update\", (event) => { console.log(`ID: ${event.id}, Data: ${event.data}`); });</script>

输出示例(浏览器控制台):

ID: 1, Data: Update 1Time: 12:34:56ID: 2, Data: Update 2Time: 12:34:57...

说明

  • ServerSentEvent 支持 dataeventidcomment 字段。
  • ping=5 每 5 秒发送 ping 保持连接。
  • 检查 request.is_disconnected() 确保客户端断开时停止生成。
3.3 处理客户端断开

优雅处理客户端关闭连接。

from fastapi import FastAPI, Requestfrom sse_starlette.sse import EventSourceResponseimport asyncioimport loggingapp = FastAPI()logging.basicConfig(level=logging.INFO)_log = logging.getLogger(__name__)@app.get(\"/endless\")async def endless(req: Request): async def event_publisher(): i = 0 try: while True: i += 1 yield dict(data=i) await asyncio.sleep(0.2) except asyncio.CancelledError as e: _log.info(f\"Disconnected from client {req.client}\") raise e return EventSourceResponse(event_publisher())if __name__ == \"__main__\": import uvicorn uvicorn.run(app, host=\"0.0.0.0\", port=8000, log_level=\"info\")

说明

  • 捕获 asyncio.CancelledError 处理客户端断开(如刷新或关闭页面)。
  • 使用日志记录断开事件。
  • anyio.TaskGroups 确保任务安全管理。
3.4 定向事件推送

向特定客户端发送事件。

from fastapi import FastAPI, Requestfrom pydantic import BaseModelfrom sse_starlette.sse import EventSourceResponsefrom collections import defaultdictimport asynciofrom typing import Optionalapp = FastAPI()clients = defaultdict(list)class EmitEventModel(BaseModel): event_name: str event_data: Optional[str] = \"No Event Data\" event_id: Optional[int] = None recipient_id: strasync def retrieve_events(recipient_id: str): yield dict(data=\"Connection established\") while True: if recipient_id in clients and clients[recipient_id]: yield clients[recipient_id].pop(0) await asyncio.sleep(1)@app.get(\"/subscribe/{recipient_id}\")async def loop_back_stream(req: Request, recipient_id: str): return EventSourceResponse(retrieve_events(recipient_id))@app.post(\"/emit\")async def emit_event(event: EmitEventModel): clients[event.recipient_id].append({ \"event\": event.event_name, \"data\": event.event_data, \"id\": event.event_id }) return {\"status\": \"event queued\"}if __name__ == \"__main__\": import uvicorn uvicorn.run(app, host=\"0.0.0.0\", port=8000, log_level=\"info\")

测试

  1. 订阅客户端:
    curl http://localhost:8000/subscribe/user1
  2. 发送事件:
    curl -X POST http://localhost:8000/emit -H \"Content-Type: application/json\" -d \'{\"event_name\": \"notify\", \"event_data\": \"Hello user1\", \"recipient_id\": \"user1\"}\'

输出示例(客户端):

data: Connection establishedevent: notifydata: Hello user1

说明

  • 使用 defaultdict 存储客户端事件队列。
  • /subscribe/{recipient_id} 订阅事件流。
  • /emit 向指定 recipient_id 推送事件。
  • 基于 Stack Overflow 示例改进。
3.5 测试 SSE 端点

使用 httpxhttpx_sse 测试 SSE。

import asyncioimport httpxfrom httpx_sse import aconnect_ssefrom sse_starlette.sse import EventSourceResponsefrom starlette.applications import Starlettefrom starlette.routing import Routeasync def auth_events(request): async def events(): yield {\"event\": \"login\", \"data\": \'{\"user_id\": \"4135\"}\'} return EventSourceResponse(events())app = Starlette(routes=[Route(\"/sse/auth/\", endpoint=auth_events)])async def test_sse(): async with httpx.AsyncClient(app=app) as client: async with aconnect_sse(client, \"GET\", \"http://localhost:8000/sse/auth/\") as event_source: events = [sse async for sse in event_source.aiter_sse()] (sse,) = events assert sse.event == \"login\" assert sse.json() == {\"user_id\": \"4135\"}if __name__ == \"__main__\": asyncio.run(test_sse())

说明

  • 使用 httpx_sseaconnect_sse 测试 SSE 端点。
  • 验证事件名称和数据。
  • 需安装 httpx-sse
    pip install httpx-sse

4. 性能与特点

  • 高效性:基于 anyio.TaskGroups,支持高并发 SSE 连接。
  • 易用性EventSourceResponse 封装 SSE 协议,简化开发。
  • 兼容性:与 FastAPI、Starlette 和 uvicorn 无缝集成。
  • 局限性
    • 不支持 GZipMiddleware,可能导致压缩冲突。
    • 非线程安全对象(如 SQLAlchemy 的 AsyncSession)需在生成器内创建。
    • 需手动清理任务以避免关闭时的警告。
  • 与替代方案对比
    • Starlette StreamingResponse:手动实现 SSE,代码复杂,缺乏协议支持。
    • WebSocket:双向通信,适合复杂交互,但开销较高。
    • aiohttp-sse:针对 aiohttp 框架,不适用 Starlette/FastAPI。

5. 实际应用场景

  • 实时通知:推送消息或警报(如聊天、系统状态)。
  • 数据流:流式传输数据库更新(如 PostgreSQL 的 TAIL)。
  • 仪表盘:实时更新图表或指标。
  • 进度跟踪:异步任务的状态更新(如文件上传、后台处理)。
  • HTMX 集成:结合 HTMX 的 hx-sse 属性实现动态前端。

示例(数据库流式更新)

from fastapi import FastAPI, Requestfrom sse_starlette.sse import EventSourceResponseimport asyncpgimport asynciofrom loguru import loggerapp = FastAPI()async def get_db_pool(): return await asyncpg.create_pool( user=\"postgres\", password=\"postgres\", database=\"todos\", host=\"localhost\" )@app.get(\"/stream\")async def message_stream(request: Request): pool = await get_db_pool() async def event_generator(): try: async with pool.acquire() as conn: async with conn.transaction():  await conn.execute(\"LISTEN todo_inserted\")  async for notify in conn.notifies(): if await request.is_disconnected(): break yield {\"data\": notify.payload, \"event\": \"todo_added\"} except Exception as e: logger.error(f\"Stream error: {e}\") finally: await pool.close() return EventSourceResponse(event_generator(), ping=10)if __name__ == \"__main__\": logger.add(\"app.log\", rotation=\"1 MB\", level=\"INFO\") import uvicorn uvicorn.run(app, host=\"0.0.0.0\", port=8000, log_level=\"info\")

运行环境

  • 安装 asyncpg
    pip install asyncpg
  • 确保 PostgreSQL 运行并创建 todos 数据库。

说明

  • 监听 PostgreSQL 的 todo_inserted 通道。
  • 使用 asyncpg 接收通知并通过 SSE 推送。
  • loguru 记录错误。
  • 基于 bunny.net 示例。

6. 注意事项

  • GZipMiddleware 冲突
    • SSE 不兼容 GZipMiddleware,需禁用:
      app = Starlette(middleware=[]) # 移除 GZipMiddleware
    • 否则可能导致流式传输失败。
  • 非线程安全对象
    • 避免在生成器外创建数据库会话(如 AsyncSession):
      # ❌ 错误async def bad_route(): async with AsyncSession() as session: async def generator(): async for row in session.execute(select(User)): yield dict(data=row) return EventSourceResponse(generator())# ✅ 正确async def good_route(): async def generator(): async with AsyncSession() as session: async for row in session.execute(select(User)): yield dict(data=row) return EventSourceResponse(generator())
    • 确保会话在生成器内管理。
  • 服务器关闭
    • 停止所有生成器任务以避免警告:
      from sse_starlette.sse import AppStatusAppStatus.should_exit_event = None
    • 尤其在测试多 SSE 端点时(Issue #59)。
  • 客户端断开
    • 使用 request.is_disconnected() 检查客户端状态。
    • 捕获 asyncio.CancelledError 清理资源。
  • 测试问题
    • 多 SSE 端点测试可能引发 RuntimeError(事件循环绑定),使用 pytest 夹具修复:
      @pytest.fixturedef reset_sse_starlette_appstatus_event(): from sse_starlette.sse import AppStatus AppStatus.should_exit_event = None
    • 参考 Issue #59。
  • 浏览器限制
    • 浏览器对同一域名 SSE 连接数有限(通常 6 个),需高效管理连接。
    • 确保连接正确关闭(返回 HTTP 204 或显式关闭)。
  • 生产部署
    • 使用 uvicornmonkeypatch 优化信号处理:
      import uvicornuvicorn.run(app, host=\"0.0.0.0\", port=8000, loop=\"uvloop\")
    • 配合 Nginx 禁用缓冲:
      proxy_buffering off;proxy_cache off;
    • 参考 Stack Overflow 解决方案。

7. 资源与文档

  • 官方文档:https://github.com/sysid/sse-starlette
  • PyPI 页面:https://pypi.org/project/sse-starlette/
  • 相关教程
    • FastAPI SSE 指南:https://devdojo.com/bobbyiliev/how-to-use-server-sent-events-sse-with-fastapi
    • Bunny.net SSE 示例:https://bunny.net/blog/what-is-sse-server-sent-events-and-how-do-they-work/
  • 社区
    • Stack Overflow(sse-starlette 标签):https://stackoverflow.com/questions/tagged/sse-starlette
    • GitHub Issues:https://github.com/sysid/sse-starlette/issues
  • 背景资料
    • SSE 协议:https://sysid.github.io/server-sent-events/
    • Starlette 文档:https://www.starlette.io/