什么是 SSE?

SSE(Server-Sent Events,服务器推送事件)是一种基于 HTTP 协议的服务器向客户端单向推送数据的技术。与 WebSocket 的双向通信不同,SSE 只允许服务器向客户端发送消息,但它的实现更加简单,天然支持断线重连和事件 ID 追踪,非常适合实时通知、新闻推送、AI 流式输出等场景。

SSE vs WebSocket vs 轮询

在介绍 SSE 之前,我们先对比一下常见的实时通信方案:

特性 轮询(Polling) SSE WebSocket
通信方向 客户端→服务器 服务器→客户端 双向
协议 HTTP HTTP WS/WSS
断线重连 需手动实现 内置自动重连 需手动实现
实现复杂度
浏览器兼容性 全部 现代浏览器 现代浏览器
数据格式 JSON/XML 纯文本 任意二进制/文本
连接开销 每次请求新建连接 长连接 长连接(握手较重)

轮询的问题在于客户端需要不断发起请求询问”有新数据吗”,大量请求在没有新数据时白白浪费带宽和服务器资源。

WebSocket 提供全双工通信,但需要专门的握手协议,实现相对复杂,且在某些代理/防火墙环境下可能被阻断。

SSE 则是一个很好的折中方案:基于普通 HTTP,无需额外协议支持,天然兼容代理和防火墙,且内置断线重连机制。

SSE 协议规范

SSE 遵循 WHATWG Server-Sent Events 规范,数据通过 HTTP 响应以 text/event-stream 的 Content-Type 进行传输。

消息格式

SSE 消息由若干 field: value 行组成,每个消息以两个换行符 \n\n 结束:

1
2
3
4
event: message类型\n
id: 事件ID\n
retry: 重连间隔(毫秒)\n
data: 数据内容\n\n

各字段说明:

  • data: — 消息体内容,可以有多行 data:,客户端会将它们用 \n 拼接
  • event: — 事件类型,客户端通过 addEventListener(type, handler) 监听,省略则默认为 message
  • id: — 事件 ID,浏览器会自动保存,断线重连时通过 Last-Event-ID 请求头告知服务器
  • retry: — 重连间隔(毫秒),覆盖浏览器默认的 3000ms

一个完整的消息示例

1
2
3
4
event: notification
id: 42
retry: 5000
data: {"user":"Alice","action":"commented","target":"你的文章"}

浏览器端 API

基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const eventSource = new EventSource('/api/events');

// 监听默认事件(event 未指定时)
eventSource.onmessage = (event) => {
console.log('收到消息:', event.data);
};

// 监听自定义事件类型
eventSource.addEventListener('notification', (event) => {
const data = JSON.parse(event.data);
console.log('通知:', data);
});

// 连接打开
eventSource.onopen = () => {
console.log('SSE 连接已建立');
};

// 发生错误
eventSource.onerror = (error) => {
console.error('SSE 错误:', error);
// readyState: 0=连接中, 1=已打开, 2=已关闭
if (eventSource.readyState === EventSource.CLOSED) {
console.log('连接已关闭');
}
};

// 关闭连接
eventSource.close();

带请求头的 SSE

EventSource 原生不支持自定义请求头。如果需要传递认证信息,可以通过 URL 参数或 Cookie 实现,也可以使用 fetch + ReadableStream 的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
async function createSSE(url, token) {
const response = await fetch(url, {
headers: {
'Authorization': `Bearer ${token}`,
'Accept': 'text/event-stream'
}
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
const { done, value } = await reader.read();
if (done) break;

buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop(); // 保留未完成的行

for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
console.log('收到数据:', data);
}
}
}
}

注意:使用 fetch 方式会失去浏览器内置的自动重连和事件 ID 追踪功能,需要自行实现。

服务端实现

Node.js(原生)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import http from 'node:http';

const server = http.createServer((req, res) => {
if (req.url === '/events') {
// 设置 SSE 必要的响应头
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*'
});

let id = 0;

// 每秒推送一条消息
const timer = setInterval(() => {
id++;
res.write(`id: ${id}\n`);
res.write(`event: tick\n`);
res.write(`data: ${JSON.stringify({ time: new Date().toISOString(), id })}\n\n`);
}, 1000);

// 客户端断开连接时清理
req.on('close', () => {
clearInterval(timer);
res.end();
console.log('客户端断开连接');
});
}
});

server.listen(3000, () => {
console.log('SSE 服务运行在 http://localhost:3000');
});

Express

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import express from 'express';

const app = express();

// 存储所有连接的客户端
const clients = new Set();

app.get('/events', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});

clients.add(res);

// 发送初始连接成功消息
res.write(`data: ${JSON.stringify({ type: 'connected' })}\n\n`);

req.on('close', () => {
clients.delete(res);
});
});

// 广播消息给所有客户端
function broadcast(data) {
const message = `data: ${JSON.stringify(data)}\n\n`;
for (const client of clients) {
client.write(message);
}
}

// 业务接口触发广播
app.post('/notify', express.json(), (req, res) => {
broadcast({ type: 'notification', ...req.body });
res.json({ success: true });
});

app.listen(3000);

Python(Flask)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from flask import Flask, Response
import json, time

app = Flask(__name__)

@app.route('/events')
def events():
def generate():
id = 0
while True:
id += 1
data = json.dumps({'id': id, 'time': time.strftime('%Y-%m-%d %H:%M:%S')})
yield f"id: {id}\n"
yield f"event: tick\n"
yield f"data: {data}\n\n"
time.sleep(1)

return Response(
generate(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
)

if __name__ == '__main__':
app.run(port=3000)

Spring Boot(Java)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@RestController
public class SseController {

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamEvents() {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);

// 异步推送
Executors.newSingleThreadExecutor().submit(() -> {
try {
int id = 0;
while (true) {
id++;
emitter.send(SseEmitter.event()
.id(String.valueOf(id))
.name("tick")
.data(Map.of("id", id, "time", Instant.now().toString())));
Thread.sleep(1000);
}
} catch (Exception e) {
emitter.completeWithError(e);
}
});

emitter.onCompletion(() -> System.out.println("客户端断开"));
emitter.onTimeout(() -> System.out.println("连接超时"));

return emitter;
}
}

断线重连机制

SSE 的一大优势是浏览器内置了断线重连。当连接断开时,浏览器会自动尝试重新连接,默认间隔为 3 秒。

工作流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
sequenceDiagram
participant C as 客户端 (浏览器)
participant S as 服务器

C->>S: GET /events
S-->>C: 200 OK (text/event-stream)
S-->>C: id: 1\ndata: {"msg":"hello"}\n\n
S-->>C: id: 2\ndata: {"msg":"world"}\n\n

Note over C,S: 连接断开(网络抖动/服务器重启)

Note over C: 等待 retry 间隔(默认 3s)

C->>S: GET /events<br/>Last-Event-ID: 2
S-->>C: 200 OK
S-->>C: id: 3\ndata: {"msg":"从断点继续"}\n\n

关键点:

  1. 浏览器自动保存最后收到的 id
  2. 重连时通过 Last-Event-ID 请求头发送给服务器
  3. 服务器根据该 ID 推送丢失的消息
  4. 可通过 retry: 字段自定义重连间隔

服务端处理 Last-Event-ID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
app.get('/events', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});

// 获取客户端最后收到的事件 ID
const lastEventId = req.headers['last-event-id'];
let currentId = lastEventId ? parseInt(lastEventId, 10) : 0;

// 如果有历史消息需要补发
if (currentId > 0) {
const missedMessages = getMessagesAfter(currentId);
for (const msg of missedMessages) {
res.write(`id: ${msg.id}\ndata: ${JSON.stringify(msg.data)}\n\n`);
}
}

// 继续推送新消息...
});

实战:AI 流式输出

SSE 在 AI 应用中非常常见,比如 ChatGPT 的打字机效果就是通过 SSE 实现的。下面是一个完整的示例:

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import express from 'express';

const app = express();
app.use(express.static('public'));
app.use(express.json());

app.post('/chat', async (req, res) => {
const { message } = req.body;

res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});

// 模拟 AI 逐字输出
const reply = `这是对"${message}"的回复。SSE 非常适合实现这种流式输出效果,用户可以实时看到内容生成的过程,体验更加流畅自然。`;
let index = 0;

const timer = setInterval(() => {
if (index < reply.length) {
// 每次发送一个字符
res.write(`data: ${JSON.stringify({ content: reply[index] })}\n\n`);
index++;
} else {
// 发送结束标记
res.write(`data: ${JSON.stringify({ done: true })}\n\n`);
res.end();
clearInterval(timer);
}
}, 50);

req.on('close', () => {
clearInterval(timer);
});
});

app.listen(3000);

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>SSE 聊天演示</title>
<style>
body { font-family: system-ui, sans-serif; max-width: 600px; margin: 40px auto; }
#chat { border: 1px solid #ddd; padding: 16px; min-height: 200px; margin-bottom: 12px;
border-radius: 8px; white-space: pre-wrap; }
.user { color: #1a73e8; font-weight: bold; }
.ai { color: #333; }
input { width: 70%; padding: 8px; }
button { padding: 8px 16px; }
</style>
</head>
<body>
<h1>SSE 流式聊天</h1>
<div id="chat"></div>
<input id="input" placeholder="输入消息..." />
<button onclick="send()">发送</button>

<script>
async function send() {
const input = document.getElementById('input');
const chat = document.getElementById('chat');
const message = input.value.trim();
if (!message) return;

chat.innerHTML += `<div class="user">你: ${message}</div>`;
input.value = '';

const response = await fetch('/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message })
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let aiDiv = document.createElement('div');
aiDiv.className = 'ai';
aiDiv.textContent = 'AI: ';
chat.appendChild(aiDiv);

while (true) {
const { done, value } = await reader.read();
if (done) break;

buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop();

for (const line of lines) {
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6));
if (data.done) return;
aiDiv.textContent += data.content;
}
}
}
}

// 回车发送
document.getElementById('input').addEventListener('keydown', (e) => {
if (e.key === 'Enter') send();
});
</script>
</body>
</html>

常见问题与注意事项

1. 连接数限制

浏览器对同一域名的 SSE 连接数有限制(Chrome 默认 6 个)。如果页面需要多个 SSE 连接,考虑:

  • 合并为单个连接,通过事件类型区分数据
  • 使用不同的子域名
  • 改用 WebSocket

2. Nginx 反向代理配置

如果使用 Nginx 做反向代理,需要正确配置以避免 SSE 连接被缓冲:

1
2
3
4
5
6
7
8
9
location /events {
proxy_pass http://localhost:3000;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_buffering off; # 关闭缓冲
proxy_cache off; # 关闭缓存
proxy_read_timeout 86400s; # 长连接超时时间
chunked_transfer_encoding off;
}

3. 数据格式

SSE 只支持文本数据。如果需要传输二进制数据,可以先 Base64 编码,但更推荐使用 WebSocket。

4. IE/旧浏览器兼容

IE 完全不支持 SSE。可以使用 eventsource polyfill(基于长轮询实现)来兼容:

1
<script src="https://cdn.jsdelivr.net/npm/eventsource@1.1.2/lib/eventsource.min.js"></script>

5. 跨域问题

SSE 遵循同源策略。跨域时服务器需要返回 Access-Control-Allow-Origin 头,且浏览器会在发起连接前发送预检请求(OPTIONS)。

适用场景总结

  • 实时通知:站内消息、邮件提醒、系统告警
  • 数据看板:股票行情、监控指标、实时统计
  • AI 应用:ChatGPT 式的流式输出
  • 日志流:构建/部署进度、实时日志查看
  • 新闻/动态:社交媒体 Feed、赛事直播

不适用的场景:

  • 需要客户端频繁向服务器发送数据(如在线游戏、协同编辑)→ 选择 WebSocket
  • 需要传输大量二进制数据(如音视频)→ 选择 WebSocket 或 WebRTC

总结

SSE 是一种轻量、优雅的服务器推送方案。它基于标准 HTTP 协议,无需额外的握手过程,天然支持断线重连和消息追踪,特别适合”服务器推、客户端收”的单向数据流场景。在 AI 流式输出成为主流的今天,SSE 已经成为 ChatGPT、Claude 等 AI 产品的标配技术。如果你的场景不需要客户端主动发送消息,SSE 是比 WebSocket 更简单、更合适的选择。