注册

AI场景前端必学——SSE流式传输

背景


由于大模型通常是需要实时推理的,Web 应用调用大模型时,它的标准模式是浏览器提交数据,服务端完成推理,然后将结果以 JSON 数据格式通过标准的 HTTP 协议返回给前端。但是这么做有一个问题,主要是推理所花费的时间和问题复杂度、以及生成的 token 数量有关。在日常使用中会发现,只是简单问候一句,可能 Deepseek 推理所花费的时间很少,但是如果我们提出稍微复杂一点的要求,比如编写一本小说的章节目录,或者撰写一篇千字的作文,那么 AI 推理的时间会大大增加,这在具体应用中就带来一个显而易见的问题,那就是用户等待的时间很长。能够发现,我们在使用线上大模型服务时,不管是哪一家大模型,通常前端的响应速度并没有太慢,这正是因为它们默认采用了流式(streaming)传输,不必等到整个推理完成再将内容返回,而是可以将逐个 token 实时返回给前端,这样就大大减少了响应时间。


服务端推送


服务端推送,也称为消息推送或通知推送,是一种允许应用服务器主动将信息发送到客户端的能力,为客户端提供了实时的信息更新和通知,增强了用户体验。


服务端推送的背景与需求主要基于以下几个诉求:


实时通知:在很多情况下,用户期望实时接收到应用的通知,如新消息提醒、商品活动提醒等。节省资源:如果没有服务端推送,客户端需要通过轮询的方式来获取新信息,会造成客户端、服务端的资源损耗。通过服务端推送,客户端只需要在收到通知时做出响应,大大减少了资源的消耗。增强用户体验:通过服务端推送,应用可以针对特定用户或用户群发送有针对性的内容,如优惠活动、个性化推荐等。这有助于提高用户对应用的满意度和黏性。


常见推送场景有:微信消息通知栏、新闻推送、外卖状态 等等,我们自身的推送场景有:下载、连线请求、直播提醒 ......


解决方案


传统实时处理方案:


轮询:这是一种较为传统的方式,客户端会定时地向服务端发送请求,询问是否有新数据。服务端只需要检查数据状态,然后将结果返回给客户端。轮询的优点是实现简单,兼容性好;缺点是可能产生较大的延迟,且对服务端资源消耗较高。长轮询(Long Polling):轮询的改进版。客户端向服务器发送请求,服务器收到请求后,如果有新的数据,立即返回给客户端;如果没有新数据,服务器会等待一定时间(比如30秒超时时间),在这段时间内,如果有新数据,就返回给客户端,否则返回空数据。客户端处理完服务器返回的响应后,再次发起新的请求,如此反复。长轮询相较于传统的轮询方式减少了请求次数,但仍然存在一定的延迟。


HTML5 标准引入的实时处理方案:


WebSocket:一种双向通信协议,同时支持服务端和客户端之间的实时交互。WebSocket 是基于 TCP 的长连接,和HTTP 协议相比,它能实现轻量级的、低延迟的数据传输,非常适合实时通信场景,主要用于交互性强的双向通信。SSE:Server-Sent Events 服务器推送事件,简称 SSE,是一种服务端实时主动向浏览器推送消息的技术。SSE 是 HTML5 中一个与通信相关的 API,主要由两部分组成:服务端与浏览器端的通信协议( HTTP 协议)及浏览器端可供 JavaScript 使用的 EventSource 对象。


​ 从“服务端主动向浏览器实时推送消息”这一点来看,SSE 与 WebSockets API 有一些相似之处。但是,SSE 与 WebSockers API 的不同之处在于:


Server-Sent Events APIWebSockets API
协议基于 HTTP 协议基于 TCP 协议
通信单工,只能服务端单向发送消息全双工,可以同时发送和接收消息
量级轻量级,使用简单相对复杂
自动重连内置断线重连和消息追踪的功能不在协议范围内,需手动实现
数据格式文本或使用 Base64 编码和 gzip 压缩的二进制消息类型广泛
事件支持自定义事件类型不支持自定义事件类型
连接数连接数 HTTP/1.1 6 个,HTTP/2 可协商(默认 100)连接数无限制
浏览器支持大部分支持,但在ie及早期的edge浏览器中并不被支持主流浏览器(包括移动端)的支持较好

第三方推送:


常见的有操作系统提供相应的推送服务,如苹果的APNs(Apple Push Notification service)、谷歌的FCM(Firebase Cloud Messaging)等。同时,也有一些跨平台的推送服务,如个推、极光推送、友盟推送等,帮助开发者在不同平台上实现统一的推送功能。


这种推送方式在生活中十分常见,一般你打开手机就能看到各种信息推送,基本就是利用第三方推送来实现。


SSE


developer.mozilla.org/zh-CN/docs/…


SSE 服务端推送,它基于 HTTP 协议,易于实现和部署,特别适合那些需要服务器主动推送信息、客户端只需接收数据的场景:


image.png


EventSource


developer.mozilla.org/zh-CN/docs/…


服务器发送事件 API (SSE)包含在 eventSource 接口中。换句话说 eventsource 接口是 web 内容与服务器发送事件通信的接口。一个 eventsource 实例会对 HTTP 服务器开启一个持久化的连接,以text/event-stream格式发送事件,此连接会一直保持开启直到通过调用EventSource.close()关闭。


image.png
一旦连接开启,来自服务端传入的消息会以事件的形式分发至你代码中。如果接收消息中有一个 event 字段,触发的事件与 event 字段的值相同。如果不存在 event 字段,则将触发通用的 message 事件。


建立连接


EventSource 接受两个参数:URL 和 options。


​ URL 为 http 事件来源,一旦 EventSource 对象被创建后,浏览器立即开始对该 URL 地址发送过来的事件进行监听。


​ options 是一个可选的对象,包含 withCredentials 属性,表示是否发送凭证(cookie、HTTP认证信息等)到服务端,默认为 false。


const eventSource = new EventSource('http_api_url', { withCredentials: true })

// 关闭连接
eventSource.close()

// 可以使用addEventListener()方法监听
eventSource.addEventListener('open', function(event) {
console.log('Connection opened')
})

eventSource.addEventListener('message', function(event) {
console.log('Received message: ' + event.data);
})

// 监听自定义事件
eventSource.addEventListener('xxx', function(event) {
console.log('Received message: ' + event.data);
})

eventSource.addEventListener('error', function(event) {
console.log('Error occurred: ' + event.event);
})
// 也可以使用属性监听的方式
eventSource.onopen = function(event) {
console.log('Connection opened')
}

eventSource.onmessage = function(event) {
console.log('Received message: ' + event.data);
}

eventSource.onerror = function(event) {
console.log('Error occurred: ' + event.event);
})

Stream API


developer.mozilla.org/zh-CN/docs/…


Stream API 允许 JavaScript 以编程方式访问从网络接收的数据流,并且允许开发人员根据需要处理它们。


流会将你想要从网络接受的资源分成一个个小的分块,然后按位处理它。


image.png


@microsoft/fetch-event-source


http://www.npmjs.com/package/@mi…


默认的浏览器eventSource API在以下方面存在一些限制:


无法传递请求体(request body),必须将执行请求所需的所有信息编码到 URL 中,而大多数浏览器对 URL 的长度限制为 2000 个字符。无法传递自定义请求头。只能进行 GET 请求,无法指定其他方法。如果连接中断,无法控制重试策略,浏览器会自动进行几次尝试然后停止。


@microsoft/fetch-event-source 的优势:


@microsoft/fetch-event-source提供了一个基于 Fetch API 的替代接口,完全兼容 Event Stream 格式。这使得我们能够以更加灵活的方式进行服务器发送事件的消费。以下是该库的一些主要优势:


支持任何请求方法、请求头和请求体,以及 Fetch API 提供的其他功能。甚至可以提供替代的 fetch() 实现,以应对默认浏览器实现无法满足需求的情况。


提供对响应对象的访问权限,允许在解析事件源之前进行自定义验证/处理。这在存在 API 网关(如 nginx)的情况下非常有用,如果网关返回错误,我们可能希望正确处理它。


对连接中断或发生错误时,提供完全控制的重试策略。


此外,该库还集成了浏览器的 Page Visibility API,使得在文档被隐藏时(例如用户最小化窗口),连接会关闭,当文档再次可见时会自动使用上次事件 ID 进行重试。这有助于减轻服务器负担,避免不必要的开放连接(但如果需要,可以选择禁用此行为)。


import { fetchEventSource } from "@microsoft/fetch-event-source";

const Assistant: React.FC<Iprops> = (props) => {
const [abortController, setAbortController] = useState(new AbortController());

const send = (question: any) => {
setIsAnswering(true);
setIsScrollAtBottom(true);
setAskText("");

// 创建“生成中...”的占位符消息
const loadingMessage = { content: "生成中...", chatSenderType: 0, isLoading: true };

// 更新 chatList,添加用户消息和占位符消息
setChatList([...chatList, { content: question.text, chatSenderType: 1, problemType: question.problemType }, loadingMessage]);

setLoading(true); // 开始加载

fetchEventSource("https://demo.com/chat", {
method: "post",
body: JSON.stringify({ message: question.text, systemType, oa, problemType: question.problemType }),
headers: {
"Content-Type": "application/json"
},
signal: abortController.signal,
async onopen(response) {
// 可以在这里进行一些操作
},
onmessage(msg: { data: string }) {
msg.data.length && setStopDisabled(false);
// 接收到实际响应后,更新 chatList 中的占位符消息
const newMessage = { ...JSON.parse(msg.data).data, chatSenderType: 0, isLoading: false };
setChatList((prevChatList: any[]) => {
// 替换最后一个消息(占位符)为实际消息
const updatedChatList = [...prevChatList];
updatedChatList[updatedChatList.length - 1] = newMessage;
return updatedChatList;
});

setIsScrollAtBottom(true);
setLoading(false); // 加载完成
},
onclose() {
setIsStop(true);
setLoading(false); // 加载完成
setIsAnswering(false);
// 停止生成禁用
setStopDisabled(true);
},
onerror(err) {
abortController.abort();
setLoading(false); // 加载出错,停止加载
throw err;
}
});
};

const stop = async () => {
abortController.abort();
const answer = chatList[chatList.length - 1];
setAbortController(new AbortController());
setIsAnswering(false);
setLoading(false); // 停止加载
stopAnswer({ message: answer.content, messageId: answer.messageId, problemType: answer.problemType, systemType, oa }).then((res: any) => {
message.success("操作成功");
});
};

return (
<div>
<Chat
chatList={chatList}
setChatList={setChatList}
askText={askText}
setAskText={setAskText}
send={send}
stop={stop}
/>

</div>

)
};

AbortController


developer.mozilla.org/zh-CN/docs/…


在前端开发中,网络请求是不可或缺的一环。但在处理网络请求时,我们经常会遇到需要中途取消请求的情况。这时候,abortController可以帮助大家更好地掌控网络请求。


简介


AbortController是一个Web API,它提供了一个信号对象(AbortSignal),该对象可以用来取消与Fetch API相关的操作。当我们创建AbortController实例时,会自动生成一个与之关联的AbortSignal对象。我们可以将这个AbortSignal对象作为参数传递给fetch函数,从而实现对网络请求的取消控制。


使用方法


创建AbortController实例获取AbortSignal对象使用signal对象发起fetch请求取消fetch请求


const controller = new AbortController();
const signal = controller.signal;
// 当需要取消请求时,我们只需调用AbortController实例的abort方法:
fetch(url, { signal }).then(response => {
// 处理响应数据
}).catch(error => {
if (error.name === 'AbortError') {
console.log('Fetch 请求已被取消');
} else {
// 处理其他错误
}
});
// 当需要取消请求时,我们只需调用AbortController实例的abort方法:
controller.abort();

参考资料


blog.csdn.net/ldw20151080…


juejin.cn/post/722112…


http://www.npmjs.com/package/@mi…


segmentfault.com/a/119000004…


作者:Yancy_
来源:juejin.cn/post/7504843440778870794

0 个评论

要回复文章请先登录注册