首页 > Ai资讯 > Ai知识库 > 模拟ChatGPT流式数据——SSE最佳实践(附可运行案例)

模拟ChatGPT流式数据——SSE最佳实践(附可运行案例)

发布时间:2024年06月06日

在使用 ChatGPT 时,发现输入 prompt 后,是使用流式的效果返回的数据,起初以为使用了 双工协议做的持久化连接,查看其网络请求,发现这个接口的通信方式并非传统的 http 接口或者 WebSockets,而是基于 EventStream 的事件流。

为什么要这样传输,从使用场景上来说,ChatGPT 是一个基于深度学习的大型语言模型,处理自然语言需要大量的计算资源和时间,响应速度肯定比普通的读数据库要慢

接口等待时间过长,显然不合适。对于这种对话场景,ChagtGPT 将先计算出的数据“推送”给用户,采用 SSE 技术边计算边返回,避免用户因为等待时间过长关闭页面。。

IMG_256

概述


Server-Sent Events
服务器推送事件,简称 SSE,是一种服务端实时主动向浏览器推送消息的技术。

SSE HTML5 中一个与通信相关的
API
,主要由两部分组成:

·服务端与浏览器端的通信协议(HTTP协议)

·浏览器端可供 JavaScript 使用的EventSource对象。

从“服务端主动向浏览器实时推送消息”这一点来看,该 API WebSockets API 有一些相似之处。但是,该 API WebSockers API 的不同之处在于:

Server-Sent Events API

WebSockets API

基于 HTTP 协议

基于 TCP 协议

单工,只能服务端单向发送消息

全双工,可以同时发送和接收消息

轻量级,使用简单

相对复杂

内置断线重连和消息追踪的功能

不在协议范围内,需手动实现

文本或使用 Base64 编码和 gzip 压缩的二进制消息

类型广泛

支持自定义事件类型

不支持自定义事件类型

连接数 HTTP/1.1 6 个,HTTP/2 可协商(默认 100

连接数无限制

服务端实现


协议

本质是浏览器发起 http 请求,服务器在收到请求后,返回状态与数据,并附带以下

headers
 Content-Type: text/event-stream 
 Cache-Control: no-cache 
 Connection: keep-alive
 

·SSE API规定推送事件流的 MIME 类型为text/event-stream

·必须指定浏览器不缓存服务端发送的数据,以确保浏览器可以实时显示服务端发送的数据。

·SSE 是一个一直保持开启的 TCP 连接,所以 Connection keep-alive

消息格式

·EventStream(事件流)为UTF-8格式编码的文本或使用 Base64 编码和 gzip 压缩的二进制消息。

·​每条消息由一行或多行字段(eventidretrydata)组成,每个字段组成形式为:字段名:字段值。字段以行为单位,每行一个(即以\n结尾)。

·冒号开头的行为注释行,会被浏览器忽略。​

·每次推送,可由多个消息组成,每个消息之间以空行分隔(即最后一个字段以\n\n结尾)

tips

·如果一行字段中不包含冒号,则整行文本将被视为字段名,字段值为空。

·注释行可以用来防止链接超时,服务端可以定期向浏览器发送一条消息注释行,以保持连接不断。


event


​ 事件类型。如果指定了该字段,则在浏览器收到该条消息时,会在当前EventSource对象(见 4)上触发一个事件,事件类型就是该字段的字段值。可以使用addEventListener方法在当前EventSource对象上监听任意类型的命名事件。
​ 如果该条消息没有
event字段,则会触发EventSource对象onmessage属性上的事件处理函数。

id


​ 事件ID。事件的唯一标识符,浏览器会跟踪事件ID,如果发生断连,浏览器会把收到的最后一个事件ID放到 HTTP Header Last-Event-Id中进行重连,作为一种简单的同步机制。
​ 例如可以在服务端将每次发送的事件ID值自动加 1,当浏览器接收到该事件ID后,下次与服务端建立连接后再请求的 Header 中将同时提交该事件ID,服务端检查该事件ID是否为上次发送的事件ID,如果与上次发送的事件ID不一致则说明浏览器存在与服务器连接失败的情况,本次需要同时发送前几次浏览器未接收到的数据。

retry


​ 重连时间。整数值,单位 ms,如果与服务器的连接丢失,浏览器将等待指定时间,然后尝试重新连接。如果该字段不是整数值,会被忽略。
​ 当服务端没有指定浏览器的重连时间时,由浏览器自行决定每隔多久与服务端建立一次连接(一般为 30s)。


data


​ 消息数据。数据内容只能以一个字符串的文本形式进行发送,如果需要发送一个对象时,需要将该对象以一个 JSON 格式的字符串的形式进行发送。在浏览器接收到该字符串后,再把它还原为一个
JSON
对象。

浏览器 API


​ 在浏览器端,可以使用 JavaScript EventSource API 创建EventSource对象监听服务器发送的事件。一旦建立连接,服务器就可以使用 HTTP 响应的 'text/event-stream' 内容类型发送事件消息,浏览器则可以通过监听
EventSource
对象的
onmessageonopenonerror事件来处理这些消息。

建立连接


EventSource 接受两个参数:URL options
URL http 事件来源,一旦 EventSource 对象被创建后,浏览器立即开始对该 URL 地址发送过来的事件进行监听。
options 是一个可选的对象,包含
withCredentials
属性,表示是否发送凭证(cookieHTTP认证信息等)到服务端,默认为 false

 const eventSource = new EventSource('http_api_url', { withCredentials: true })

IMG_257


​ 与 XMLHttpRequest 对象类型,EventSource 对象有一个 readyState 属性值,具体含义如下表:

readyState

含义

0

浏览器与服务端尚未建立连接或连接已被关闭

1

浏览器与服务端已成功连接,浏览器正在处理接收到的事件及数据

2

浏览器与服务端建立连接失败,客户端不再继续建立与服务端之间的连接

​ 可以使用 EventSource 对象的close方法关闭与服务端之间的连接,使浏览器不再建立与服务端之间的连接。

// 关闭连接
eventSource.close()


监听事件


EventSource 对象本身继承自 EventTarget 接口,因此可以使用 addEventListener() 方法来监听事件。EventSource 对象触发的事件主要包括以下三种:

·open 事件:当成功连接到服务端时触发。

·message 事件:当接收到服务器发送的消息时触发。该事件对象的 data 属性包含了服务器发送的消息内容。

·error 事件:当发生错误时触发。该事件对象的 event 属性包含了错误信息。

IMG_258

IMG_259


tips

EventSource对象的属性监听只能监听预定义的事件类型(openmessageerror)。不能用于监听自定义事件类型。如果要实现自定义事件类型的监听,可以使用addEventListener()方法。

实践


服务端


使用 Node.js 实现 SSE 的简单示例:(知乎的markdown真难用~

const http = require('http') const fs = require('fs')  http.createServer((req, res) => {  
   const url = req.url  
   if (url === '/' || url === 'index.html') { 
   // 如果请求根路径,返回 index.html 文件            fs.readFile('index.html', (err, data) => {  
              if (err) {        
                    res.writeHead(500)        
                    res.end('Error loading')      
              } else {         
                    res.writeHead(200, {'Content-Type': 'text/html'})        
                    res.end(data)
                   }     })   
              } else if (url.includes('/sse')) { 
               // 如果请求 /events 路径,建立 SSE 连接                   res.writeHead(200, {  'Content-Type': 'text/event-stream',  'Cache-Control': 'no-cache',  'Connection': 'keep-alive',  'Access-Control-Allow-Origin': '*', // 允许跨域     })                 // 每隔 1 秒发送一条消息                    let id = 0  
                  const intervalId = setInterval(() => {      
                     res.write(`event: customEvent\n`)
                     res.write(`id: ${id}\n`)
                     res.write(`retry: 30000\n`)
                     const params = url.split('?')[1]  
                     const data = { id, time: new Date().toISOString(), params }
                     res.write(`data: ${JSON.stringify(data)}\n\n`)
                     id++
                     if (id >= 10) {
                          clearInterval(intervalId)
                          res.end()       }     }, 1000)  
 // 当客户端关闭连接时停止发送消息                              req.on('close', () => {  
                               clearInterval(intervalId)
                               id = 0
                               res.end()    
                           })   } else { 
 // 如果请求的路径无效,返回 404 状态码                               res.writeHead(404)
                           res.end()   } }).listen(3000)
               console.log('Server listening on port 3000')

IMG_260


客户端

IMG_261


​ 将上面的两份代码保存为server.jsindex.html,并在命令行中执行node
server.js
启动服务端,然后在浏览器中打开http://localhost:3000即可看到 SSE 效果。

IMG_262

兼容性

发展至今,SSE 已具有广泛的的浏览器兼容性,几乎除 IE 之外的浏览器均已支持。

IMG_263


​ 对于不支持 EventSource 的浏览器,可以使用polyfill实现。判断浏览器是否支持 EventSource

if(typeof(EventSource) !== undefined) {  // 支持 } else { // 不支持,使用 polyfill}


Fetch
实现


​ 虽然使用 SSE 技术可以实现 ChatGPT 一样的打字机效果,但是通过上文请求 type 对比可以发现,在使用 SSE 时,type eventSource,而 ChatGPT fetch。且受浏览器 EventSource API 限制,在使用 SSE 时不能自定义请求头、只能发出 GET 请求,且在大多数浏览器中,URL 限制2000个字符,也无法满足 ChatGPT 参数传递需求。
​ 此时,可以使用 Fetch API 实现一个替代接口,用于模拟 SSE 实现。简单实现如下:


服务端

IMG_264


浏览器

IMG_265

1

IMG_266

2

不同于XMLHttpRequestfetch并未原生提供终止操作方法,可以通过 DOM API [AbortController](https://developer.mozilla.org/zh-CN/docs/Web/API/AbortController)AbortSignal实现 fetch 请求终止操作。

​ 将上面的两份代码保存为server-fetch.jsindex-fetch.html,并在命令行中执行node
server-fetch.js
启动服务端,然后在浏览器中打开http://localhost:3001即可看到 fetch SSE 效果。

IMG_267

爬坑

代理环境

IMG_268

pending时间过长

在本地代理的加持下,存在eventstream在当前请求结束后一股脑发完的情况

IMG_269

后台限定1s一条,但到了客户端便成了上图这样(与云服务器流量大小也有一定关系)

重定向

IMG_270

nginx重定向接口后,存在等待服务器返回的问题(与云服务器流量大小也有一定关系)

总结


SSE 技术是一种轻量级的实时通信技术,基于 HTTP 协议,具有服务端推送、断线重连、简单轻量等优点。但是,SSE 技术也有一些缺点,如不能进行双向通信、连接数受限、仅支持 get 请求等。
SSE 可以在 Web 应用程序中实现诸如股票在线数据、日志推送、聊天室实时人数等即时数据推送功能。需要注意的是,SSE 并不是适用于所有的实时推送场景。在需要高并发、高吞吐量和低延迟的场景下,WebSockets
可能更加适合。而在需要更轻量级的推送场景下,SSE 可能更加适合。因此,在选择即时更新方案时,需要根据具体的需求和场景进行选择。

出自:https://zhuanlan.zhihu.com/p/656030695#showWechatShareTip?utm_source=wechat_session&utm_medium=social&wechatShare=1&s_r=0