diff --git a/backend/open_webui/utils/misc.py b/backend/open_webui/utils/misc.py
index cd90b58839..49465fb3ea 100644
--- a/backend/open_webui/utils/misc.py
+++ b/backend/open_webui/utils/misc.py
@@ -542,60 +542,66 @@ def extract_urls(text: str) -> list[str]:
     return url_pattern.findall(text)
 
 
-async def handle_large_stream_chunks(stream: aiohttp.StreamReader, max_buffer_size: int = CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE):
+def handle_large_stream_chunks(stream: aiohttp.StreamReader, max_buffer_size: int = CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE):
     """
     Handle stream response chunks, supporting large data chunks that exceed the original 16kb limit.
     When a single line exceeds max_buffer_size, returns an empty JSON string {} and skips subsequent data until encountering normally sized data.
 
     :param stream: The stream reader to handle.
-    :param max_buffer_size: The maximum buffer size in bytes.
-    :return: An async generator that yields the stream data.
+    :param max_buffer_size: The maximum buffer size in bytes; a value <= 0 disables large-chunk handling. Defaults to 10MB.
+    :return: The stream itself when large-chunk handling is disabled, otherwise an async generator that yields the stream data.
     """
-    buffer = b""
-    skip_mode = False
+    if max_buffer_size <= 0:
+        return stream
 
-    async for data, _ in stream.iter_chunks():
-        if not data:
-            continue
+    async def handle_stream_chunks():
+        buffer = b""
+        skip_mode = False
 
-        # In skip_mode, if buffer already exceeds the limit, clear it (it's part of an oversized line)
-        if skip_mode and len(buffer) > max_buffer_size:
-            buffer = b""
+        async for data, _ in stream.iter_chunks():
+            if not data:
+                continue
 
-        lines = (buffer + data).split(b"\n")
+            # In skip_mode, if buffer already exceeds the limit, clear it (it's part of an oversized line)
+            if skip_mode and len(buffer) > max_buffer_size:
+                buffer = b""
 
-        # Process complete lines (except the last possibly incomplete fragment)
-        for i in range(len(lines) - 1):
-            line = lines[i]
+            lines = (buffer + data).split(b"\n")
 
-            if skip_mode:
-                # Skip mode: check if current line is small enough to exit skip mode
-                if len(line) <= max_buffer_size:
-                    skip_mode = False
-                    yield line
+            # Process complete lines (except the last possibly incomplete fragment)
+            for i in range(len(lines) - 1):
+                line = lines[i]
+
+                if skip_mode:
+                    # Skip mode: check if current line is small enough to exit skip mode
+                    if len(line) <= max_buffer_size:
+                        skip_mode = False
+                        yield line
+                    else:
+                        yield b"data: {}"
                 else:
-                    yield b"data: {}"
-            else:
-                # Normal mode: check if line exceeds limit
-                if len(line) > max_buffer_size:
-                    skip_mode = True
-                    yield b"data: {}"
-                    log.info(f"Skip mode triggered, line size: {len(line)}")
-                else:
-                    yield line
+                    # Normal mode: check if line exceeds limit
+                    if len(line) > max_buffer_size:
+                        skip_mode = True
+                        yield b"data: {}"
+                        log.info(f"Skip mode triggered, line size: {len(line)}")
+                    else:
+                        yield line
 
-        # Save the last incomplete fragment
-        buffer = lines[-1]
+            # Save the last incomplete fragment
+            buffer = lines[-1]
 
-        # Check if buffer exceeds limit
-        if not skip_mode and len(buffer) > max_buffer_size:
-            skip_mode = True
-            log.info(f"Skip mode triggered, buffer size: {len(buffer)}")
-            # Clear oversized buffer to prevent unlimited growth
-            buffer = b""
+            # Check if buffer exceeds limit
+            if not skip_mode and len(buffer) > max_buffer_size:
+                skip_mode = True
+                log.info(f"Skip mode triggered, buffer size: {len(buffer)}")
+                # Clear oversized buffer to prevent unlimited growth
+                buffer = b""
 
-    # Process remaining buffer data
-    if buffer and not skip_mode:
-        yield buffer
+        # Process remaining buffer data
+        if buffer and not skip_mode:
+            yield buffer
+
+    return handle_stream_chunks()
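For reviewers, a minimal consumption sketch (hypothetical, not part of this diff) of why the refactor stays call-site compatible: the helper is now a plain function whose return value, whether the untouched `StreamReader` (when `max_buffer_size <= 0`) or the new inner async generator, supports `async for`, so existing callers keep working unchanged. `relay_sse`, `session`, and `url` below are illustrative names.

```python
import aiohttp

from open_webui.utils.misc import handle_large_stream_chunks


async def relay_sse(session: aiohttp.ClientSession, url: str):
    """Hypothetical caller that re-yields upstream SSE lines."""
    async with session.get(url) as resp:
        # Plain call, no await: the result is async-iterable either way,
        # the raw StreamReader when handling is disabled, or the wrapping
        # async generator otherwise.
        async for line in handle_large_stream_chunks(resp.content):
            # A line longer than max_buffer_size arrives as the b"data: {}"
            # placeholder instead of aborting the stream.
            yield line
```

One subtlety worth noting: when the raw reader is returned, aiohttp's `StreamReader` line iteration keeps the trailing `b"\n"`, while the generator path splits it off, so callers that compare lines byte-for-byte may want to normalize.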