Server Send Events With Spring Boot

cc
cc
2023-04-02 / 0 评论 / 410 阅读 / 正在检测是否收录...

背景

在使用ChatGPT时,通过http.Post进行调用耗时非常高,体验较差。 OpenAI提供了Stream(即SSE)的返回模式,不用等到响应生成完后才给到客户端响应。

使用Stream API方式时,那么后端与前端也需要使用SSE的方式进行API交互。

源代码在文末。

Server-Sent Events(SSE)是一种在Web浏览器和服务器之间实现服务器推送事件的技术。它允许服务器向客户端推送事件流,这些事件可以是简单的文本消息或具有自定义格式的复杂数据。SSE 是一种基于 HTTP 的协议,使用长轮询或 HTTP 流来实现服务器推送。

SSE 可以用于各种用例,例如实时更新股票报价、即时聊天应用程序、新闻或社交媒体网站上的实时更新等等。与传统的轮询或 WebSocket 不同,SSE 不需要客户端发起请求,而是服务器始终保持连接打开并推送数据,因此 SSE 可以更高效地使用网络带宽和服务器资源。

在前端,可以使用 JavaScript API 来监听服务器发送的事件,以及处理和显示这些事件的数据。

引用:

## open ai chat api
https://platform.openai.com/docs/api-reference/chat/create

## Request ChatGPT API Using Spring boot
https://www.taogenjia.com/2023/03/16/The-Guide-to-Call-ChatGPT-3-5-Event-Stream-API-in-Spring-Boot/

## Doc
https://developer.mozilla.org/en-US/docs/web/api/server-sent_events/using_server-sent_events
https://developer.mozilla.org/zh-CN/docs/Web/API/EventSource

## nginx
https://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering

理解

常见的HTTP请求是服务端把命令执行完成过后给到一个结果,那么客户端必须等待,大部分场景下这是没问题的。 那么在处理时间比较高的场景下,客户端可以实时接收部分处理结果,增加用户体验。

我们从另一个角度看Http,

  • HttpServerRequest: 有输入流,包含请求的数据
  • HttpServletResponse:有输出流,可以写响应数据。

那么我们处理Http请求时,不断的向Response的 outputStream写入数据,那么在没有处理完成响应的情况下也可以收到一部分数据。

    @GetMapping("test")
    public void test(HttpServletRequest request, HttpServletResponse response) throws Exception {
        // 设置响应的Content-Type为text/event-stream
        response.setContentType("text/event-stream");
        response.setStatus(200);
        OutputStream outputStream = response.getOutputStream();

        int i = 10;
        while (i >= 0) {
            String message = "i : " + i + "\n";
            outputStream.write(message.getBytes(StandardCharsets.UTF_8));
            outputStream.flush();
            try {
                Thread.sleep(1000);
            } catch (Exception ignored) {}
            i--;
        }
    }

我们打开浏览器访问这个api时,界面会间隔一秒输出一行:

i : 10
i : 9
i : 8
i : 7
i : 6
i : 5
i : 4
i : 3
i : 2
i : 1
i : 0

点击查看效果
有了以上理解,我们只需要按着SSE要求的格式不停的向写入数据即可。

SSE 格式

1、id event data

id: xxx \n
event: message \n
data:  hello from cc\n\n

2、event data

event: message \n
data: hello from cc\n\n

3、data only

data: hello from cc\n\n

其中:
event: 事件名称,可以自己定义,默认是message
id、event: 可选

4、结束推送
如果使用sse,需要服务端进行关闭,如果不关闭前端会不停的重试(retry)

  • 自定义事件

    event: close \n
    data:\n\n

前端订阅然后close,然后关闭sse

  const sse = new EventSource('/api/sse');

  /*
   * 自定义订阅
   */
  sse.addEventListener("close", function(e) {
    sse.close();
  })

  /* This will listen only for events
   * similar to the following:
   *
   * event: notice
   * data: useful data
   * id: someid
   *
   */
  sse.addEventListener("notice", function(e) {
    console.log(e.data)
  })

  /* Similarly, this will listen for events
   * with the field `event: update`
   */
  sse.addEventListener("update", function(e) {
    console.log(e.data)
  })

  /* The event "message" is a special case, as it
   * will capture events without an event field
   * as well as events that have the specific type
   * `event: message` It will not trigger on any
   * other event type.
   */
  sse.addEventListener("message", function(e) {
    console.log(e.data)
  })
  • 发送特殊标记的data

    data: [DONE]\n\n

处理 '[DONE]'的message

 const sse = new EventSource('/api/sse');

  /* The event "message" is a special case, as it
   * will capture events without an event field
   * as well as events that have the specific type
   * `event: message` It will not trigger on any
   * other event type.
   */
  sse.addEventListener("message", function(e) {
    var data = e.data;
    if (data.contains('[DONE]')) {
       sse.close()
    } else {
       console.log(data)
    }
  })

踩坑

1、使用Nginx部署后,消息没有一个一个收到,而是在处理完成后整个返回
修改 nginx

location / {
    proxy_pass http://127.0.0.1:xxx/;
    ...
    # When buffering is disabled, the response is passed to a client synchronously, immediately as 
    # it is received. nginx will not try to read the whole response from the proxied server. The 
    # maximum size of the data that nginx can receive from the server at a time is set by the 
    # proxy_buffer_size directive.
    proxy_buffering off;
     ...
    }

2、如果使用了WebFiler,需要开启asyncSupported


@WebFilter(urlPatterns = "*",asyncSupported = true)
public class SomeFilter implements Filter {
  ....
}

源代码

https://gitee.com/pchenc/sse-demo

1

评论 (0)

取消