后端代码配置
返回的数据流中必须加入心跳消息,否则长时间没有数据传输时,浏览器(或中间代理)会直接断开该长连接
while True:
if await request.is_disconnected():
break
## 等待时常,超过5秒则主动查询一次(最晚五秒运行一次)
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=5)
if message is not None:
## ...
else:
## 发送心跳包
yield ": ping\n\n"
Cache-Control 必须配置为 no-cache, no-transform,否则浏览器或中间代理可能会对数据进行转换(例如压缩、缓冲),导致无法正常接收数据
return StreamingResponse(
event_queue(),
media_type="text/event-stream",
headers={
"Cache-Control": 'no-cache, no-transform', ## 需配置
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)
完整代码如下
import json
import asyncio
from fastapi.responses import StreamingResponse
from fastapi import APIRouter, Request, HTTPException, status
from src.db.redis import async_redis_client
from src.common.constance import REDIS_PIPELINE_CHANNEL
# Router for the server-sent-events endpoints; its routes are tagged "sse".
sse_router = APIRouter(tags=["sse"])
def _pipeline_frame(payload, pname, branch):
    """Return the SSE frame for a pub/sub payload, or None if it should be skipped.

    The payload is decoded (when bytes) and parsed as JSON. It is forwarded
    only when ``project.name`` equals *pname* and ``object_attributes.ref``
    equals *branch*. Malformed or unexpectedly shaped payloads are logged and
    skipped so that one bad message cannot terminate the stream.
    """
    import logging

    try:
        text = payload.decode("utf-8") if isinstance(payload, (bytes, bytearray)) else payload
        data = json.loads(text)
        p_name = data['project']['name']
        p_branch = data['object_attributes']['ref']
    except (ValueError, KeyError, TypeError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        logging.getLogger(__name__).warning(
            "Skipping malformed pipeline message", exc_info=True
        )
        return None

    if p_name != pname or p_branch != branch:
        return None

    # An SSE event ends at the first blank line, so every line of the payload
    # needs its own "data:" prefix. A single-line payload yields exactly
    # "data: <payload>\n\n".
    lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"


@sse_router.get('/gitlab/pipeline')
async def pipeline_subscribe(request: Request):
    """Stream GitLab pipeline events for one project branch over SSE.

    Query parameters:
        pname: project name, compared with ``project.name`` of each event.
        branch: ref, compared with ``object_attributes.ref`` of each event.

    Returns:
        A ``text/event-stream`` StreamingResponse. It first emits a
        ``{"status": "success"}`` event, then every matching event published
        on ``REDIS_PIPELINE_CHANNEL``, with ``: ping`` comment lines as
        heartbeats while nothing is being sent.

    Raises:
        HTTPException: 400 when ``pname`` or ``branch`` is missing or empty.
    """
    pname = request.query_params.get('pname')
    branch = request.query_params.get('branch')
    if not pname or not branch:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="缺少订阅的流水线 Parameter 参数"
        )

    async def event_queue():
        heartbeat_seconds = 5
        loop = asyncio.get_running_loop()
        pubsub = async_redis_client.pubsub()
        # The outer try guarantees the pubsub object is closed even if
        # subscribing fails; the inner try guarantees the unsubscribe even if
        # the client goes away at the very first yield.
        try:
            await pubsub.subscribe(REDIS_PIPELINE_CHANNEL)
            try:
                yield f"data: {json.dumps({ 'status': 'success' })}\n\n"
                last_write = loop.time()
                while True:
                    if await request.is_disconnected():
                        break
                    message = await pubsub.get_message(
                        ignore_subscribe_messages=True,
                        timeout=heartbeat_seconds,
                    )
                    if message is None:
                        # Idle for the whole timeout: send a heartbeat so the
                        # connection is not dropped for inactivity.
                        yield ": ping\n\n"
                        last_write = loop.time()
                        continue

                    frame = _pipeline_frame(message["data"], pname, branch)
                    if frame is not None:
                        yield frame
                        last_write = loop.time()
                    elif loop.time() - last_write >= heartbeat_seconds:
                        # The channel is busy with events for other
                        # projects/branches, so get_message never times out;
                        # keep the heartbeat going anyway.
                        yield ": ping\n\n"
                        last_write = loop.time()
            finally:
                # Also runs on cancellation / generator close; no separate
                # CancelledError handler is needed.
                await pubsub.unsubscribe(REDIS_PIPELINE_CHANNEL)
        finally:
            await pubsub.close()

    return StreamingResponse(
        event_queue(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": 'no-cache, no-transform',  # required
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
Nginx 部署
主服务 nginx 配置
location /api/v1/sse {
    # Upstream that serves the SSE endpoints.
    proxy_pass http://192.168.18.19:8080/api/v1/sse;

    # Speak HTTP/1.1 to the upstream and clear the Connection header
    # (the usual pairing for upstream keep-alive connections).
    proxy_http_version 1.1;
    proxy_set_header Connection "";

    # Important: events must be passed through as they arrive, so turn off
    # response buffering and caching.
    proxy_buffering off;
    proxy_cache off;
    # Important: also send X-Accel-Buffering on every response.
    add_header X-Accel-Buffering "no" always;

    # Allow the stream to stay open for up to an hour between reads/writes.
    proxy_read_timeout 3600s;
    proxy_send_timeout 3600s;
}