refactor: stream chunk max buffer size

This commit is contained in:
Timothy Jaeryang Baek 2025-11-09 21:16:34 -05:00
parent 6cb41a59da
commit e76f77bcb7
3 changed files with 14 additions and 12 deletions

View file

@@ -570,16 +570,18 @@ else:
CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE = os.environ.get( CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE = os.environ.get(
"CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE", "10485760" # 10MB "CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE", ""
) )
if CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE == "": if CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE == "":
CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE = 1024 * 1024 * 10 CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE = None
else: else:
try: try:
CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE = int(CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE) CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE = int(
CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE
)
except Exception: except Exception:
CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE = 1024 * 1024 * 10 CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE = None
#################################### ####################################

View file

@@ -45,7 +45,7 @@ from open_webui.utils.payload import (
) )
from open_webui.utils.misc import ( from open_webui.utils.misc import (
convert_logit_bias_input_to_json, convert_logit_bias_input_to_json,
handle_large_stream_chunks, stream_chunks_handler,
) )
from open_webui.utils.auth import get_admin_user, get_verified_user from open_webui.utils.auth import get_admin_user, get_verified_user
@@ -953,7 +953,7 @@ async def generate_chat_completion(
if "text/event-stream" in r.headers.get("Content-Type", ""): if "text/event-stream" in r.headers.get("Content-Type", ""):
streaming = True streaming = True
return StreamingResponse( return StreamingResponse(
handle_large_stream_chunks(r.content), stream_chunks_handler(r.content),
status_code=r.status, status_code=r.status,
headers=dict(r.headers), headers=dict(r.headers),
background=BackgroundTask( background=BackgroundTask(

View file

@@ -542,21 +542,21 @@ def extract_urls(text: str) -> list[str]:
return url_pattern.findall(text) return url_pattern.findall(text)
def handle_large_stream_chunks(stream: aiohttp.StreamReader, max_buffer_size: int = CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE): def stream_chunks_handler(stream: aiohttp.StreamReader):
""" """
Handle stream response chunks, supporting large data chunks that exceed the original 16kb limit. Handle stream response chunks, supporting large data chunks that exceed the original 16kb limit.
When a single line exceeds max_buffer_size, returns an empty JSON string {} and skips subsequent data When a single line exceeds max_buffer_size, returns an empty JSON string {} and skips subsequent data
until encountering normally sized data. until encountering normally sized data.
:param stream: The stream reader to handle. :param stream: The stream reader to handle.
:param max_buffer_size: The maximum buffer size in bytes, -1 means not handle large chunks, default is 10MB.
:return: An async generator that yields the stream data. :return: An async generator that yields the stream data.
""" """
if max_buffer_size <= 0: max_buffer_size = CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE
if max_buffer_size is None or max_buffer_size <= 0:
return stream return stream
async def handle_stream_chunks(): async def yield_safe_stream_chunks():
buffer = b"" buffer = b""
skip_mode = False skip_mode = False
@@ -604,4 +604,4 @@ def handle_large_stream_chunks(stream: aiohttp.StreamReader, max_buffer_size: in
if buffer and not skip_mode: if buffer and not skip_mode:
yield buffer yield buffer
return handle_stream_chunks() return yield_safe_stream_chunks()