feat: Allow disabling handling of large single-line data

Shirasawa 2025-11-06 04:02:49 +00:00
parent 89c0e150c8
commit ce1079d358


@@ -542,60 +542,66 @@ def extract_urls(text: str) -> list[str]:
     return url_pattern.findall(text)
 
-async def handle_large_stream_chunks(stream: aiohttp.StreamReader, max_buffer_size: int = CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE):
+def handle_large_stream_chunks(stream: aiohttp.StreamReader, max_buffer_size: int = CHAT_STREAM_RESPONSE_CHUNK_MAX_BUFFER_SIZE):
     """
     Handle stream response chunks, supporting large data chunks that exceed the original 16kb limit.
     When a single line exceeds max_buffer_size, returns an empty JSON string {} and skips subsequent data
     until encountering normally sized data.
 
     :param stream: The stream reader to handle.
-    :param max_buffer_size: The maximum buffer size in bytes.
+    :param max_buffer_size: The maximum buffer size in bytes; -1 means do not handle large chunks. Default is 10MB.
     :return: An async generator that yields the stream data.
     """
-    buffer = b""
-    skip_mode = False
-    async for data, _ in stream.iter_chunks():
-        if not data:
-            continue
-        # In skip_mode, if buffer already exceeds the limit, clear it (it's part of an oversized line)
-        if skip_mode and len(buffer) > max_buffer_size:
-            buffer = b""
-        lines = (buffer + data).split(b"\n")
-
-        # Process complete lines (except the last possibly incomplete fragment)
-        for i in range(len(lines) - 1):
-            line = lines[i]
-
-            if skip_mode:
-                # Skip mode: check if current line is small enough to exit skip mode
-                if len(line) <= max_buffer_size:
-                    skip_mode = False
-                    yield line
-                else:
-                    yield b"data: {}"
-            else:
-                # Normal mode: check if line exceeds limit
-                if len(line) > max_buffer_size:
-                    skip_mode = True
-                    yield b"data: {}"
-                    log.info(f"Skip mode triggered, line size: {len(line)}")
-                else:
-                    yield line
-
-        # Save the last incomplete fragment
-        buffer = lines[-1]
-
-        # Check if buffer exceeds limit
-        if not skip_mode and len(buffer) > max_buffer_size:
-            skip_mode = True
-            log.info(f"Skip mode triggered, buffer size: {len(buffer)}")
-            # Clear oversized buffer to prevent unlimited growth
-            buffer = b""
-
-    # Process remaining buffer data
-    if buffer and not skip_mode:
-        yield buffer
+    if max_buffer_size <= 0:
+        return stream
+
+    async def handle_stream_chunks():
+        buffer = b""
+        skip_mode = False
+        async for data, _ in stream.iter_chunks():
+            if not data:
+                continue
+            # In skip_mode, if buffer already exceeds the limit, clear it (it's part of an oversized line)
+            if skip_mode and len(buffer) > max_buffer_size:
+                buffer = b""
+            lines = (buffer + data).split(b"\n")
+
+            # Process complete lines (except the last possibly incomplete fragment)
+            for i in range(len(lines) - 1):
+                line = lines[i]
+
+                if skip_mode:
+                    # Skip mode: check if current line is small enough to exit skip mode
+                    if len(line) <= max_buffer_size:
+                        skip_mode = False
+                        yield line
+                    else:
+                        yield b"data: {}"
+                else:
+                    # Normal mode: check if line exceeds limit
+                    if len(line) > max_buffer_size:
+                        skip_mode = True
+                        yield b"data: {}"
+                        log.info(f"Skip mode triggered, line size: {len(line)}")
+                    else:
+                        yield line
+
+            # Save the last incomplete fragment
+            buffer = lines[-1]
+
+            # Check if buffer exceeds limit
+            if not skip_mode and len(buffer) > max_buffer_size:
+                skip_mode = True
+                log.info(f"Skip mode triggered, buffer size: {len(buffer)}")
+                # Clear oversized buffer to prevent unlimited growth
+                buffer = b""
+
+        # Process remaining buffer data
+        if buffer and not skip_mode:
+            yield buffer
+
+    return handle_stream_chunks()