Server-Sent Events 教程
严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。
也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。
SSE 就是利用这种机制,使用流信息向浏览器推送信息。它基于 HTTP 协议,目前除了 IE/Edge,其他浏览器都支持。
1. 客户端
SSE 与 WebSocket 作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息。
总体来说,WebSocket 更强大和灵活。因为它是全双工通道,可以双向通信;SSE 是单向通道,只能服务器向浏览器发送,因为流信息本质上就是下载。如果浏览器向服务器发送信息,就变成了另一次 HTTP 请求。
- SSE 使用 HTTP 协议,现有的服务器软件都支持。WebSocket 是一个独立协议。
- SSE 属于轻量级,使用简单;WebSocket 协议相对复杂。
- SSE 默认支持断线重连,WebSocket 需要自己实现。
- SSE 一般只用来传送文本,二进制数据需要编码后传送,WebSocket 默认支持传送二进制数据。
- SSE 支持自定义发送的消息类型。
封装的 SSEClient 函数
class SSEClient {
private url: string;
private withCredentials: boolean;
private reconnectInterval: number;
private maxReconnectAttempts: number;
private currentReconnectAttempts: number;
private shouldReconnect: boolean; // 控制是否重连
private eventSource: EventSource | null;
private listeners: {[key: string]: Array<(data: any) => void>};
private isConnected: boolean;
private onErrorCallback: ((error: any) => void) | null;
constructor(
url: string,
options: {
withCredentials?: boolean;
reconnectInterval?: number;
maxReconnectAttempts?: number;
shouldReconnect?: boolean; // 新增选项
onError?: (error: any) => void;
} = {}
) {
this.url = url;
this.withCredentials = options.withCredentials || false;
this.reconnectInterval = options.reconnectInterval || 5000;
this.maxReconnectAttempts = options.maxReconnectAttempts || 10;
this.currentReconnectAttempts = 0;
this.shouldReconnect = options.shouldReconnect !== undefined ? options.shouldReconnect : true; // 默认允许重连
this.eventSource = null;
this.listeners = {};
this.isConnected = false;
this.onErrorCallback = options.onError || null;
this.connect();
}
connect() {
try {
this.eventSource = new EventSource(this.url, {
withCredentials: this.withCredentials
});
this.eventSource.onopen = () => {
console.log('SSE connection opened');
this.isConnected = true;
this.currentReconnectAttempts = 0;
};
this.eventSource.onmessage = event => {
const data = event.data.trim();
if (this.listeners['message']) {
this.listeners['message'].forEach(callback => callback(data));
}
};
this.eventSource.onerror = error => {
this.isConnected = false;
this.eventSource?.close();
if (this.onErrorCallback) {
this.onErrorCallback(error);
} else {
console.error('SSE connection error:', error);
}
// 如果 shouldReconnect 为 true,则尝试重连
if (this.shouldReconnect && this.currentReconnectAttempts < this.maxReconnectAttempts) {
this.currentReconnectAttempts++;
console.log(`Reconnect attempt ${this.currentReconnectAttempts}`);
setTimeout(() => {
this.connect();
}, this.reconnectInterval);
} else if (!this.shouldReconnect) {
console.log('Reconnect disabled, connection closed.');
} else {
console.error('Max reconnect attempts reached. Connection closed.');
}
};
} catch (error) {
console.error('Failed to create EventSource:', error);
if (this.onErrorCallback) {
this.onErrorCallback(error);
}
}
}
on(eventType: string, callback: (data: any) => void) {
if (!this.listeners[eventType]) {
this.listeners[eventType] = [];
}
this.listeners[eventType].push(callback);
}
off(eventType: string, callback: (data: any) => void) {
if (!this.listeners[eventType]) return;
this.listeners[eventType] = this.listeners[eventType].filter(cb => cb !== callback);
}
close() {
if (this.eventSource) {
this.eventSource.close();
this.isConnected = false;
console.log('SSE connection closed');
}
}
}
export default SSEClient;如何进行调用
const sseClient = new SSEClient(`sse_url`, {
withCredentials: true,
shouldReconnect: false,
onError: error => {
console.log(`🌧🌧🌧 [SSEClient error]`, error);
}
});
sseClient.on('message', value => {
if (value?.includes('MESSAGE_END')) {
sseClient.close();
}
console.log(`🌧🌧🌧 [SSEClient message]`, value);
});2. 服务端
SSE 要求服务器与浏览器保持连接。对于不同的服务器软件来说,所消耗的资源是不一样的。Apache 服务器,每个连接就是一个线程,如果要维持大量连接,势必要消耗大量资源。Node 则是所有连接都使用同一个线程,因此消耗的资源会小得多,但是这要求每个连接不能包含很耗时的操作,比如磁盘的 IO 读写。
如何用 Node 起一个 SSE 服务器
// node server.js
var http = require("http");
http.createServer(function (req, res) {
var fileName = "." + req.url;
if (fileName === "./stream") {
res.writeHead(200, {
"Content-Type":"text/event-stream",
"Cache-Control":"no-cache",
"Connection":"keep-alive",
"Access-Control-Allow-Origin": '*',
});
res.write("retry: 10000\n");
res.write("event: connecttime\n");
res.write("data: " + (new Date()) + "\n\n");
res.write("data: " + (new Date()) + "\n\n");
interval = setInterval(function () {
res.write("data: " + (new Date()) + "\n\n");
}, 1000);
req.connection.addListener("close", function () {
clearInterval(interval);
}, false);
}
}).listen(8844, "127.0.0.1");参考文章:
