From 3da22af859f3bd6db12b196eb6377a0822607806 Mon Sep 17 00:00:00 2001 From: Sihyeon Jang Date: Sat, 16 Aug 2025 12:43:32 +0900 Subject: [PATCH] fix(utils/middleware): flush pending chat deltas on stream termination Guarantees the last partial delta chunk is emitted when the SSE stream closes (EOF, break, or `[DONE]` sentinel). * Buffer `last_delta_data` and track `delta_count` * Flush automatically once `delta_count >= chunk_size` * Perform a final flush after the iterator ends Signed-off-by: Sihyeon Jang --- backend/open_webui/utils/middleware.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/backend/open_webui/utils/middleware.py b/backend/open_webui/utils/middleware.py index 03704cd6be..ac8e5ea388 100644 --- a/backend/open_webui/utils/middleware.py +++ b/backend/open_webui/utils/middleware.py @@ -1849,6 +1849,21 @@ async def process_chat_response( or 1 ), ) + last_delta_data = None + + async def flush_pending_delta_data(threshold: int = 0): + nonlocal delta_count + nonlocal last_delta_data + + if delta_count >= threshold and last_delta_data: + await event_emitter( + { + "type": "chat:completion", + "data": last_delta_data, + } + ) + delta_count = 0 + last_delta_data = None async for line in response.body_iterator: line = line.decode("utf-8") if isinstance(line, bytes) else line @@ -2099,14 +2114,9 @@ async def process_chat_response( if delta: delta_count += 1 + last_delta_data = data if delta_count >= delta_chunk_size: - await event_emitter( - { - "type": "chat:completion", - "data": data, - } - ) - delta_count = 0 + await flush_pending_delta_data(delta_chunk_size) else: await event_emitter( { @@ -2121,6 +2131,7 @@ async def process_chat_response( else: log.debug(f"Error: {e}") continue + await flush_pending_delta_data() if content_blocks: # Clean up the last text block