作者

Sales Engineer at Intersystems
文章 Jeff Liu · 2 小时 前 8m read

将服务器发送事件(SSE)引入 ObjectScript:实现 AI 流式传输

引言 - ObjectScript 中的人工智能流问题

今天,我想介绍一下我在将 AI API 集成到 ObjectScript 应用程序时遇到的一个问题以及找到的解决方案。我最初的测试很成功,但也有些令人沮丧。

HTTP 调用成功了;请求正确地发送到了我的 LLM API。但随后,沉默......漫长的等待。最终,整个响应以单个块的形式到达。

从技术上讲,它成功了,但与 ChatGPT 会话相比,用户体验令人失望。

现代模型设计为逐个令牌流式输出。这使得等待时间大大缩短,因为即使尚未生成完整的答案,您也可以开始读取响应。 要启用此行为,只 需向 API传递 stream=true。 不过,在这种看似简单的操作背后有一个重要的细节:流媒体依赖于服务器发送事件(Server-Sent Events,SSE)。

如果不支持客户端 SSE,就无法利用这种模式。

对于通常在 ObjectScript 中使用的 %Net.HttpRequest 类,响应会被缓冲,直到连接关闭。换句话说,没有增量读取,没有渐进标记,因此也就没有流。

如果我们希望将 LLM 集成到 IRIS 应用程序中,那么能够处理 文本/事件 流、即时解析事件和实时处理数据是至关重要的

正因如此,我在 fast-http 项目中添加了对服务器发送事件的客户端支持,使 ObjectScript 应用程序能够以流式模式使用 AI API。

为什么 LLM API 使用服务器发送事件而不是 WebSocket?

当您发现 OpenAI、Anthropic 或 Mistral AI 的 API 使用服务器发送事件(Server-Sent Events)进行流式传输时,自然会产生一个问题:

为什么是 SSE?为什么不是 WebSocket?

乍一看,WebSocket 似乎是 "实时 "通信的不二之选。不过,就 LLM 而言,SSE 在架构上有几个优势。

单向流就足够了

在 LLM 流中,客户端发送请求,服务器逐步生成响应。不需要持续的双向通信。

这种模式非常简单:

Client  →  HTTP Request
Server →  Continuous Data Stream

SSE 完全是为这种单向服务器到客户端模式而设计的。WebSocket 本身就是双向的,这就为这种特定用例增加了不必要的复杂性。

SSE 保留标准 HTTP

SSE 依靠 HTTP/1.1 和简单的内容类型:

Content-Type: text/event-stream

没有像 WebSockets 那样的特定要求或协议升级。这意味着

  • 与现有基础设施的本地兼容性
  • 与代理和负载平衡器更好地集成
  • 服务器端的简单性

流式传输过程仍然是一个 "简单 "的长期 HTTP 请求。

因此,SSE 是理想的折中方案。 客户端不需要在生成过程中发送消息;服务器会生成一个顺序流,一旦响应完成,连接就会自然关闭。

服务器发送的事件流剖析

SSE 流既不是 JSON,也不是传统的 "分块 JSON"。

它是一种基于行的结构化文本流,格式非常简单但很特殊。

服务器响应如下:

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

然后,服务器保持连接打开并发送一系列事件。

SSE 事件的结构

此类事件由 "key: value "行和一个空行组成。

基本示例

data: Hello world

空行至关重要;它向客户端发出信号,表示事件已完成。

标准字段

SSE 事件可包含多个字段:

  • retry(重试:建议的重新连接延迟
  • event(事件:自定义事件类型
  • id事件标识符事件标识符
  • data数据主要有效载荷

事件也可以是一个简单的注释。在这种情况下,以冒号(:开头,例如:

: this is a comment

注释通常用作保持连接的机制。

真实案例:流式传输 LLM API

对于像 OpenAI 这样的 LLM API,典型的数据流如下所示:

data: {"id":"...","choices":[{"delta":{"content":"Hel"}}]}

data: {"id":"...","choices":[{"delta":{"content":"lo"}}]}

data: {"id":"...","choices":[{"delta":{"content":" world"}}]}

data: [DONE]

每个 数据: 块对应一个生成块。

要正确使用 SSE 数据流,我们必须做到以下几点:

  • 增量读取响应。
  • 累积收到的行数。
  • 检测事件分隔符(空行)。
  • 解析数据:内容。

在开始处理之前,我们不应等待连接关闭。

这正是使用传统的 %Net.HttpRequest 会出现问题的地方 :我们必须在收到完整的响应后才能开始处理。

ObjectScript 面临的挑战

有了 fast-http,我们就可以很容易地完成类似下面这样的聊天调用:

Set body = {
"model": "gpt-4",
"messages": [{"role": "user", "content": "Tell me a short story."}],
"stream": true
}
Set response = ##class(dc.http.FastHTTP).DirectPost("url=https://api.openai.com/v1/chat/completions,Header_Authorization=Bearer {YOUR_TOKEN},Header_Accept=text/event-stream", body, .client)

但是,在完全收到响应之前,我们无法对其进行处理。

拦截响应流

要在事件到达时对其进行处理,我们需要能在收到数据时准确访问数据。在探索 %Net.HttpRequest 提供的可能性时,我遇到了一个有趣的属性:ResponseStream(响应流)。

该属性允许我们定义 HTTP 响应将被写入的流。默认情况下,IRIS 会创建一个 %Stream 实例,因此我们通常不会去碰它,甚至会忽略它的存在。

我们的想法很简单:与其让 %Net.HttpRequest 将响应写入标准流,不如提供一个我们自己控制的流。

为此,只需创建一个继承自 %Stream.GlobalBinary 的,并覆盖 WriteWriteLine 写入方法即可:

/// Stream used to handle stream mode response
Class dc.http.Stream Extends %Stream.GlobalBinary
{

Property SSEHandler As dc.http.SSEHandler;

Method Write(data As %String = "") As %Status
{
    Do ..Notify(data)
    Return ##super(data)
}

Method WriteLine(data As %String = "") As %Status
{
    Do ..Notify(data _ ..LineTerminator)
    Return ##super(data)
}

Method Notify(data As %String) As %Status
{
    Return:'$IsObject(..SSEHandler) $$$OK
    Return ..SSEHandler.BufferProcessing($ZConvert(data, "I", "UTF8"))
}
}

我们就有了钩子!
每次 %Net.HttpRequest 从服务器接收数据 ,数据都会自动写入 ResponseStream

通过拦截 WriteWriteLine 方法,我们可以进行以下操作:

  • 分析接收到的数据。
  • 累积数据流的行数。
  • 检测 SSE 事件的结束。
  • 立即触发相应的处理。

换句话说,我们可以将原始 HTTP 流转化为实时、可操作的事件流。我们只需在发送请求前分配我们的自定义流即可:

Set httpRequest = ##class(%Net.HttpRequest).%New()
Set httpRequest.ResponseStream = ##class(dc.http.Stream).%New()

从这一刻起,从服务器接收到的每一个块都会通过我们的流,这样我们就可以在 ObjectScript 中构建 SSE 客户端了。下一步是正确解析文本/事件流事件,以重建数据:块并触发应用程序回调。

SSE 解析器架构

SSE 支持不是直接在流类中实现解析,而是围绕三个主要组件构建的:

  • 截取接收数据的流(Stream)
  • 解释 SSE 事件的处理程序(Handler)
  • 一个 适配器(Adapter) ,将这些事件暴露给应用程序。

目标是明确区分责任:

  • 流严格处理接收网络数据。
  • 处理程序解析 SSE 协议、管理缓冲区 分割信息, 然后调用适配器。
  • 适配器将这些事件转换成应用程序代码中可用的回调。

这种设计允许灵活、轻松地重复使用。

fast-http的用户 不再需要担心解析问题;他们可以选择提供的适配器,也可以实现自定义的适配器。

diagram_class_sse.png

从代码上看,调用一个响应事件的端点只需几行代码:

Set stream = ##class(dc.http.SSEChatConsoleAdapter).GetStream()
Set config = "url=http://sse-mock:5000/stream,timeout=10"
Set response = ##class(dc.http.FastHTTP).DirectGet(config, , , stream)

如果你已经使用 docker-compose.yml 文件 启动了容器 ,上面的示例就可以使用 sse_server.py 的硬编码响应

如果你有一个密钥或其他访问与 OpenAI 兼容的 LLM API 的权限,你可以调整下面的示例,使其适用于 v1/chat/completion 调用:

Set stream = ##class(dc.http.SSEChatConsoleAdapter).GetStream()
Set body = {"model": "gpt-4","messages": [{"role": "user", "content": "Tell me a short story."}],"stream": true}
Set response = ##class(dc.http.FastHTTP).DirectPost("url=https://api.openai.com/v1/chat/completions,Header_Authorization=Bearer {MyToken}", body, .client, stream)

为了更好地理解运行时发生的情况,请看下面的序列图:

diagram_sequence_sse.png

适配器(Adapter)

在撰写本文时,fast-http 实现了两个适配器:

SSEBasicAdapter
该适配器只在收到解析后的 SSE 事件时显示该事件。它对于调试或了解工作原理特别有用。

SSEChatConsoleAdapter
该适配器专为聊天完成会话定制,它会检查收到的消息类型,如果是 chat.completion.chunk 消息,则会立即输出生成的文本。

显然,我们还可以设计其他更通用的适配器,例如将接收到的 SSE 消息转换为发出的 WebSocket 消息。

开发人员只需继承 dc.http.SSEAdapter 并实现 OnMessage 方法,即可创建自定义适配器

用 ObjectScript 创建聊天会话

有了这些工具,在 ObjectScript 中构建聊天会话就变得简单易行了。 下面是一个示范方法 (注: $Char(27)... 序列是 ANSI 转义码,用于在终端内添加颜色)

ClassMethod DemoSession(systemPrompt As %String = "You are a helpful assistant.", model As %String = "gpt-4") As %Status
{
    Set config = "url=https://api.openai.com/v1/chat/completions,Header_Authorization=Bearer {MyToken},Header_Accept=text/event-stream"
    Set chat = { "model": (model), "messages": [{"role": "system", "content": (systemPrompt)}], "stream": true }
    Write !,"Ask something to start a session.  Type ""exit"" to quit.",!

    Set stream = ##class(dc.http.SSEChatConsoleAdapter).GetStream()
    Set adapter = stream.SSEHandler.Adapter

    For  {
        Write !,!, $Char(27)_"[1;34mYour message: "_$Char(27)_"[0m"
        Read input
        Quit:input="exit"
        Write !,!, $Char(27)_"[1;32mResponse: "_$Char(27)_"[0m",!
        Do chat.messages.%Push({"role": "user", "content": (input)})
        Do ##class(dc.http.FastHTTP).DirectPost(config, chat, .client, stream)
        Do chat.messages.%Push(adapter.GetAssistantMessage())
    }
    Return $$$OK
}

下面是该脚本的操作视频短片。不过,在录制之前,我已经将端点更改为具有轻量级本地 LLM 的点:


结论

关于 fast-http 的工作源于一个简单的挫折:当我开始尝试使用 ObjectScript 中的 LLM API 时,我清楚地发现缺少了一些东西。要编写的代码很多,而且响应都是一次性到达,而人工智能助手从根本上依赖于令牌流。

通过探索服务器发送事件(Server-Sent Events)的工作原理和 研究%Net.HttpRequest 提供的可能性 ,我意识到可以通过一些恰当的抽象来填补这一空白。于是,我在 fast-http 中实现了 SSE 支持,并围绕流、处理程序和适配器构建了一个更完整的架构。

最终,从最初单纯的技术实验转变为在 ObjectScript 应用程序中集成人工智能应用程序接口的更自然的方式。看到聊天会话直接在 IRIS 终端中运行,令牌在生成过程中出现,我感到非常满意。

我希望现在向社区开放的这项工作能促进其他开发人员使用 IRIS 进行 LLM 实验,或许还能带来一些灵感。