Skip to content

Server-Sent Events 教程

约 953 字大约 3 分钟

javascriptSSE

2024-12-18

严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。

也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。

SSE 就是利用这种机制,使用流信息向浏览器推送信息。它基于 HTTP 协议,目前除了 IE/Edge,其他浏览器都支持。

1. 客户端

SSE 与 WebSocket 作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息。

总体来说,WebSocket 更强大和灵活。因为它是全双工通道,可以双向通信;SSE 是单向通道,只能服务器向浏览器发送,因为流信息本质上就是下载。如果浏览器向服务器发送信息,就变成了另一次 HTTP 请求。

  • SSE 使用 HTTP 协议,现有的服务器软件都支持。WebSocket 是一个独立协议。
  • SSE 属于轻量级,使用简单;WebSocket 协议相对复杂。
  • SSE 默认支持断线重连,WebSocket 需要自己实现。
  • SSE 一般只用来传送文本,二进制数据需要编码后传送,WebSocket 默认支持传送二进制数据。
  • SSE 支持自定义发送的消息类型。
封装的 SSEClient 函数
class SSEClient {
    private url: string;
    private withCredentials: boolean;
    private reconnectInterval: number;
    private maxReconnectAttempts: number;
    private currentReconnectAttempts: number;
    private shouldReconnect: boolean; // 控制是否重连
    private eventSource: EventSource | null;
    private listeners: {[key: string]: Array<(data: any) => void>};
    private isConnected: boolean;
    private onErrorCallback: ((error: any) => void) | null;

    constructor(
        url: string,
        options: {
            withCredentials?: boolean;
            reconnectInterval?: number;
            maxReconnectAttempts?: number;
            shouldReconnect?: boolean; // 新增选项
            onError?: (error: any) => void;
        } = {}
    ) {
        this.url = url;
        this.withCredentials = options.withCredentials || false;
        this.reconnectInterval = options.reconnectInterval || 5000;
        this.maxReconnectAttempts = options.maxReconnectAttempts || 10;
        this.currentReconnectAttempts = 0;
        this.shouldReconnect = options.shouldReconnect !== undefined ? options.shouldReconnect : true; // 默认允许重连
        this.eventSource = null;
        this.listeners = {};
        this.isConnected = false;
        this.onErrorCallback = options.onError || null;

        this.connect();
    }

    connect() {
        try {
            this.eventSource = new EventSource(this.url, {
                withCredentials: this.withCredentials
            });

            this.eventSource.onopen = () => {
                console.log('SSE connection opened');
                this.isConnected = true;
                this.currentReconnectAttempts = 0;
            };

            this.eventSource.onmessage = event => {
                const data = event.data.trim();
                if (this.listeners['message']) {
                    this.listeners['message'].forEach(callback => callback(data));
                }
            };

            this.eventSource.onerror = error => {
                this.isConnected = false;
                this.eventSource?.close();

                if (this.onErrorCallback) {
                    this.onErrorCallback(error);
                } else {
                    console.error('SSE connection error:', error);
                }

                // 如果 shouldReconnect 为 true,则尝试重连
                if (this.shouldReconnect && this.currentReconnectAttempts < this.maxReconnectAttempts) {
                    this.currentReconnectAttempts++;
                    console.log(`Reconnect attempt ${this.currentReconnectAttempts}`);
                    setTimeout(() => {
                        this.connect();
                    }, this.reconnectInterval);
                } else if (!this.shouldReconnect) {
                    console.log('Reconnect disabled, connection closed.');
                } else {
                    console.error('Max reconnect attempts reached. Connection closed.');
                }
            };
        } catch (error) {
            console.error('Failed to create EventSource:', error);
            if (this.onErrorCallback) {
                this.onErrorCallback(error);
            }
        }
    }

    on(eventType: string, callback: (data: any) => void) {
        if (!this.listeners[eventType]) {
            this.listeners[eventType] = [];
        }
        this.listeners[eventType].push(callback);
    }

    off(eventType: string, callback: (data: any) => void) {
        if (!this.listeners[eventType]) return;
        this.listeners[eventType] = this.listeners[eventType].filter(cb => cb !== callback);
    }

    close() {
        if (this.eventSource) {
            this.eventSource.close();
            this.isConnected = false;
            console.log('SSE connection closed');
        }
    }
}

export default SSEClient;
如何进行调用
const sseClient = new SSEClient(`sse_url`, {
    withCredentials: true,
    shouldReconnect: false,
    onError: error => {
        console.log(`🌧🌧🌧 [SSEClient error]`, error);
    }
});

sseClient.on('message', value => {
    if (value?.includes('MESSAGE_END')) {
        sseClient.close();
    }

    console.log(`🌧🌧🌧 [SSEClient message]`, value);
});

2. 服务端

SSE 要求服务器与浏览器保持连接。对于不同的服务器软件来说,所消耗的资源是不一样的。Apache 服务器,每个连接就是一个线程,如果要维持大量连接,势必要消耗大量资源。Node 则是所有连接都使用同一个线程,因此消耗的资源会小得多,但是这要求每个连接不能包含很耗时的操作,比如磁盘的 IO 读写。

如何用 Node 起一个 SSE 服务器
// node server.js
var http = require("http");

http.createServer(function (req, res) {
  var fileName = "." + req.url;

  if (fileName === "./stream") {
    res.writeHead(200, {
      "Content-Type":"text/event-stream",
      "Cache-Control":"no-cache",
      "Connection":"keep-alive",
      "Access-Control-Allow-Origin": '*',
    });
    res.write("retry: 10000\n");
    res.write("event: connecttime\n");
    res.write("data: " + (new Date()) + "\n\n");
    res.write("data: " + (new Date()) + "\n\n");

    interval = setInterval(function () {
      res.write("data: " + (new Date()) + "\n\n");
    }, 1000);

    req.connection.addListener("close", function () {
      clearInterval(interval);
    }, false);
  }
}).listen(8844, "127.0.0.1");

参考文章: