From 4fa3da71635454129a7117460b6f5822f26d8468 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Eli=C5=A1=C3=A1k?= <55753928+YetheSamartaka@users.noreply.github.com> Date: Wed, 12 Nov 2025 15:44:55 +0100 Subject: [PATCH] updated nesting after latest dev branch update - updated nesting after latest dev branch update - Improved type safety --- backend/open_webui/utils/middleware.py | 650 +++++++++++++------------ 1 file changed, 341 insertions(+), 309 deletions(-) diff --git a/backend/open_webui/utils/middleware.py b/backend/open_webui/utils/middleware.py index 8c336bfba4..ea2bbe0c40 100644 --- a/backend/open_webui/utils/middleware.py +++ b/backend/open_webui/utils/middleware.py @@ -1769,6 +1769,319 @@ async def process_chat_response( event_emitter = get_event_emitter(metadata) event_caller = get_event_call(metadata) + model_id = form_data.get("model", "") + + def split_content_and_whitespace(content): + content_stripped = content.rstrip() + original_whitespace = ( + content[len(content_stripped) :] + if len(content) > len(content_stripped) + else "" + ) + return content_stripped, original_whitespace + + def is_opening_code_block(content): + backtick_segments = content.split("```") + # Even number of segments means the last backticks are opening a new block + return len(backtick_segments) > 1 and len(backtick_segments) % 2 == 0 + + def serialize_content_blocks(content_blocks, raw=False): + content = "" + + for block in content_blocks: + if block["type"] == "text": + block_content = block["content"].strip() + if block_content: + content = f"{content}{block_content}\n" + elif block["type"] == "tool_calls": + attributes = block.get("attributes", {}) + + tool_calls = block.get("content", []) + results = block.get("results", []) + + if content and not content.endswith("\n"): + content += "\n" + + if results: + + tool_calls_display_content = "" + for tool_call in tool_calls: + + tool_call_id = tool_call.get("id", "") + tool_name = tool_call.get("function", {}).get( + "name", "" + ) + tool_arguments = tool_call.get("function", {}).get( + "arguments", "" + ) + + tool_result = None + tool_result_files = None + for result in results: + if tool_call_id == result.get("tool_call_id", ""): + tool_result = result.get("content", None) + tool_result_files = result.get("files", None) + break + + if tool_result is not None: + tool_result_embeds = result.get("embeds", "") + tool_calls_display_content = f'{tool_calls_display_content}
<details type="tool_calls" done="true" id="{tool_call_id}" name="{tool_name}" arguments="{html.escape(json.dumps(tool_arguments))}" result="{html.escape(json.dumps(tool_result))}" files="{html.escape(json.dumps(tool_result_files)) if tool_result_files else ""}" embeds="{html.escape(json.dumps(tool_result_embeds))}">\n<summary>Tool Executed</summary>\n</details>
\n' + else: + tool_calls_display_content = f'{tool_calls_display_content}
<details type="tool_calls" done="false" id="{tool_call_id}" name="{tool_name}" arguments="{html.escape(json.dumps(tool_arguments))}">\n<summary>Executing...</summary>\n</details>
\n' + + if not raw: + content = f"{content}{tool_calls_display_content}" + else: + tool_calls_display_content = "" + + for tool_call in tool_calls: + tool_call_id = tool_call.get("id", "") + tool_name = tool_call.get("function", {}).get( + "name", "" + ) + tool_arguments = tool_call.get("function", {}).get( + "arguments", "" + ) + + tool_calls_display_content = f'{tool_calls_display_content}\n
<details type="tool_calls" done="false" id="{tool_call_id}" name="{tool_name}" arguments="{html.escape(json.dumps(tool_arguments))}">\n<summary>Executing...</summary>\n</details>
\n' + + if not raw: + content = f"{content}{tool_calls_display_content}" + + elif block["type"] == "reasoning": + reasoning_display_content = html.escape( + "\n".join( + (f"> {line}" if not line.startswith(">") else line) + for line in block["content"].splitlines() + ) + ) + + reasoning_duration = block.get("duration", None) + + start_tag = block.get("start_tag", "") + end_tag = block.get("end_tag", "") + + if content and not content.endswith("\n"): + content += "\n" + + if reasoning_duration is not None: + if raw: + content = ( + f'{content}{start_tag}{block["content"]}{end_tag}\n' + ) + else: + content = f'{content}
<details type="reasoning" done="true" duration="{reasoning_duration}">\n<summary>Thought for {reasoning_duration} seconds</summary>\n{reasoning_display_content}\n</details>
\n' + else: + if raw: + content = ( + f'{content}{start_tag}{block["content"]}{end_tag}\n' + ) + else: + content = f'{content}
<details type="reasoning" done="false">\n<summary>Thinking…</summary>\n{reasoning_display_content}\n</details>
\n' + + elif block["type"] == "code_interpreter": + attributes = block.get("attributes", {}) + output = block.get("output", None) + lang = attributes.get("lang", "") + + content_stripped, original_whitespace = ( + split_content_and_whitespace(content) + ) + if is_opening_code_block(content_stripped): + # Remove trailing backticks that would open a new block + content = ( + content_stripped.rstrip("`").rstrip() + + original_whitespace + ) + else: + # Keep content as is - either closing backticks or no backticks + content = content_stripped + original_whitespace + + if content and not content.endswith("\n"): + content += "\n" + + if output: + output = html.escape(json.dumps(output)) + + if raw: + content = f'{content}\n{block["content"]}\n\n```output\n{output}\n```\n' + else: + content = f'{content}
<details type="code_interpreter" done="true" output="{output}">\n<summary>Analyzed</summary>\n```{lang}\n{block["content"]}\n```\n</details>
\n' + else: + if raw: + content = f'{content}\n{block["content"]}\n\n' + else: + content = f'{content}
<details type="code_interpreter" done="false">\n<summary>Analyzing...</summary>\n```{lang}\n{block["content"]}\n```\n</details>
\n' + + else: + block_content = str(block["content"]).strip() + if block_content: + content = f"{content}{block['type']}: {block_content}\n" + + return content.strip() + + def convert_content_blocks_to_messages(content_blocks, raw=False): + messages = [] + + temp_blocks = [] + for idx, block in enumerate(content_blocks): + if block["type"] == "tool_calls": + messages.append( + { + "role": "assistant", + "content": serialize_content_blocks(temp_blocks, raw), + "tool_calls": block.get("content"), + } + ) + + results = block.get("results", []) + + for result in results: + messages.append( + { + "role": "tool", + "tool_call_id": result["tool_call_id"], + "content": result.get("content", "") or "", + } + ) + temp_blocks = [] + else: + temp_blocks.append(block) + + if temp_blocks: + content = serialize_content_blocks(temp_blocks, raw) + if content: + messages.append( + { + "role": "assistant", + "content": content, + } + ) + + return messages + + message = Chats.get_message_by_id_and_message_id( + metadata["chat_id"], metadata["message_id"] + ) + + last_assistant_message = None + try: + if form_data["messages"][-1]["role"] == "assistant": + last_assistant_message = get_last_assistant_message( + form_data["messages"] + ) + except Exception as e: + pass + + initial_content = ( + message.get("content", "") + if message + else last_assistant_message if last_assistant_message else "" + ) + + content_blocks = [ + { + "type": "text", + "content": initial_content, + } + ] + + latest_usage = None + completion_dispatched = False + collected_sources = [] + source_hashes = set() + outlet_result_data = None + outlet_content_override = None + + def extend_sources(items): + if not items: + return + for item in items: + try: + key = json.dumps(item, sort_keys=True) + except (TypeError, ValueError): + key = None + if key and key in source_hashes: + continue + if key: + source_hashes.add(key) + collected_sources.append(item) + + async def dispatch_chat_completed(): + nonlocal completion_dispatched, outlet_result_data, outlet_content_override, latest_usage + if completion_dispatched: + return outlet_result_data + + base_messages = [dict(message) for message in form_data.get("messages", [])] + generated_messages = convert_content_blocks_to_messages( + content_blocks, raw=True + ) + final_messages = [*base_messages, *generated_messages] + + if final_messages: + last_message = final_messages[-1] + if isinstance(last_message, dict) and last_message.get("role") == "assistant": + last_message = {**last_message} + if collected_sources: + last_message["sources"] = collected_sources + if latest_usage: + last_message["usage"] = latest_usage + final_messages[-1] = last_message + + payload = { + "model": model_id, + "messages": final_messages, + "chat_id": metadata["chat_id"], + "session_id": metadata["session_id"], + "id": metadata["message_id"], + "model_item": model, + } + if metadata.get("filter_ids"): + payload["filter_ids"] = metadata["filter_ids"] + + try: + outlet_result_data = await chat_completed(request, payload, user) + + if isinstance(outlet_result_data, dict): + extend_sources(outlet_result_data.get("sources")) + message_updates = outlet_result_data.get("messages") + if isinstance(message_updates, list): + for message_update in message_updates: + if not isinstance(message_update, dict): + continue + if message_update.get("id") != metadata["message_id"]: + continue + + if message_update.get("sources"): + extend_sources(message_update.get("sources")) + + usage_update = message_update.get("usage") + if 
usage_update: + try: + latest_usage = dict(usage_update) + except Exception: + latest_usage = usage_update + + outlet_content_override = message_update.get("content") + + try: + Chats.upsert_message_to_chat_by_id_and_message_id( + metadata["chat_id"], + metadata["message_id"], + { + **message_update, + }, + ) + except Exception as e: + log.debug(f"Failed to upsert outlet message: {e}") + break + except Exception as e: + log.warning(f"chat_completed outlet failed: {e}") + finally: + completion_dispatched = True + + return outlet_result_data + # Non-streaming response if not isinstance(response, StreamingResponse): if event_emitter: @@ -1833,11 +2146,11 @@ async def process_chat_response( await dispatch_chat_completed() if outlet_content_override is not None: - data["content"] = outlet_content_override + response_data["content"] = outlet_content_override if collected_sources: - data["sources"] = collected_sources + response_data["sources"] = collected_sources if latest_usage: - data["usage"] = latest_usage + response_data["usage"] = latest_usage await event_emitter( { @@ -1966,199 +2279,9 @@ async def process_chat_response( # Streaming response if event_emitter and event_caller: task_id = str(uuid4()) # Create a unique task ID. - model_id = form_data.get("model", "") - - def split_content_and_whitespace(content): - content_stripped = content.rstrip() - original_whitespace = ( - content[len(content_stripped) :] - if len(content) > len(content_stripped) - else "" - ) - return content_stripped, original_whitespace - - def is_opening_code_block(content): - backtick_segments = content.split("```") - # Even number of segments means the last backticks are opening a new block - return len(backtick_segments) > 1 and len(backtick_segments) % 2 == 0 # Handle as a background task async def response_handler(response, events): - def serialize_content_blocks(content_blocks, raw=False): - content = "" - - for block in content_blocks: - if block["type"] == "text": - block_content = block["content"].strip() - if block_content: - content = f"{content}{block_content}\n" - elif block["type"] == "tool_calls": - attributes = block.get("attributes", {}) - - tool_calls = block.get("content", []) - results = block.get("results", []) - - if content and not content.endswith("\n"): - content += "\n" - - if results: - - tool_calls_display_content = "" - for tool_call in tool_calls: - - tool_call_id = tool_call.get("id", "") - tool_name = tool_call.get("function", {}).get( - "name", "" - ) - tool_arguments = tool_call.get("function", {}).get( - "arguments", "" - ) - - tool_result = None - tool_result_files = None - for result in results: - if tool_call_id == result.get("tool_call_id", ""): - tool_result = result.get("content", None) - tool_result_files = result.get("files", None) - break - - if tool_result is not None: - tool_result_embeds = result.get("embeds", "") - tool_calls_display_content = f'{tool_calls_display_content}
<details type="tool_calls" done="true" id="{tool_call_id}" name="{tool_name}" arguments="{html.escape(json.dumps(tool_arguments))}" result="{html.escape(json.dumps(tool_result))}" files="{html.escape(json.dumps(tool_result_files)) if tool_result_files else ""}" embeds="{html.escape(json.dumps(tool_result_embeds))}">\n<summary>Tool Executed</summary>\n</details>
\n' - else: - tool_calls_display_content = f'{tool_calls_display_content}
<details type="tool_calls" done="false" id="{tool_call_id}" name="{tool_name}" arguments="{html.escape(json.dumps(tool_arguments))}">\n<summary>Executing...</summary>\n</details>
\n' - - if not raw: - content = f"{content}{tool_calls_display_content}" - else: - tool_calls_display_content = "" - - for tool_call in tool_calls: - tool_call_id = tool_call.get("id", "") - tool_name = tool_call.get("function", {}).get( - "name", "" - ) - tool_arguments = tool_call.get("function", {}).get( - "arguments", "" - ) - - tool_calls_display_content = f'{tool_calls_display_content}\n
<details type="tool_calls" done="false" id="{tool_call_id}" name="{tool_name}" arguments="{html.escape(json.dumps(tool_arguments))}">\n<summary>Executing...</summary>\n</details>
\n' - - if not raw: - content = f"{content}{tool_calls_display_content}" - - elif block["type"] == "reasoning": - reasoning_display_content = html.escape( - "\n".join( - (f"> {line}" if not line.startswith(">") else line) - for line in block["content"].splitlines() - ) - ) - - reasoning_duration = block.get("duration", None) - - start_tag = block.get("start_tag", "") - end_tag = block.get("end_tag", "") - - if content and not content.endswith("\n"): - content += "\n" - - if reasoning_duration is not None: - if raw: - content = ( - f'{content}{start_tag}{block["content"]}{end_tag}\n' - ) - else: - content = f'{content}
<details type="reasoning" done="true" duration="{reasoning_duration}">\n<summary>Thought for {reasoning_duration} seconds</summary>\n{reasoning_display_content}\n</details>
\n' - else: - if raw: - content = ( - f'{content}{start_tag}{block["content"]}{end_tag}\n' - ) - else: - content = f'{content}
<details type="reasoning" done="false">\n<summary>Thinking…</summary>\n{reasoning_display_content}\n</details>
\n' - - elif block["type"] == "code_interpreter": - attributes = block.get("attributes", {}) - output = block.get("output", None) - lang = attributes.get("lang", "") - - content_stripped, original_whitespace = ( - split_content_and_whitespace(content) - ) - if is_opening_code_block(content_stripped): - # Remove trailing backticks that would open a new block - content = ( - content_stripped.rstrip("`").rstrip() - + original_whitespace - ) - else: - # Keep content as is - either closing backticks or no backticks - content = content_stripped + original_whitespace - - if content and not content.endswith("\n"): - content += "\n" - - if output: - output = html.escape(json.dumps(output)) - - if raw: - content = f'{content}\n{block["content"]}\n\n```output\n{output}\n```\n' - else: - content = f'{content}
<details type="code_interpreter" done="true" output="{output}">\n<summary>Analyzed</summary>\n```{lang}\n{block["content"]}\n```\n</details>
\n' - else: - if raw: - content = f'{content}\n{block["content"]}\n\n' - else: - content = f'{content}
<details type="code_interpreter" done="false">\n<summary>Analyzing...</summary>\n```{lang}\n{block["content"]}\n```\n</details>
\n' - - else: - block_content = str(block["content"]).strip() - if block_content: - content = f"{content}{block['type']}: {block_content}\n" - - return content.strip() - - def convert_content_blocks_to_messages(content_blocks, raw=False): - messages = [] - - temp_blocks = [] - for idx, block in enumerate(content_blocks): - if block["type"] == "tool_calls": - messages.append( - { - "role": "assistant", - "content": serialize_content_blocks(temp_blocks, raw), - "tool_calls": block.get("content"), - } - ) - - results = block.get("results", []) - - for result in results: - messages.append( - { - "role": "tool", - "tool_call_id": result["tool_call_id"], - "content": result.get("content", "") or "", - } - ) - temp_blocks = [] - else: - temp_blocks.append(block) - - if temp_blocks: - content = serialize_content_blocks(temp_blocks, raw) - if content: - messages.append( - { - "role": "assistant", - "content": content, - } - ) - - return messages - def tag_content_handler(content_type, tags, content, content_blocks): end_flag = False @@ -2335,34 +2458,14 @@ async def process_chat_response( return content, content_blocks, end_flag - message = Chats.get_message_by_id_and_message_id( - metadata["chat_id"], metadata["message_id"] - ) - - tool_calls = [] - - last_assistant_message = None - try: - if form_data["messages"][-1]["role"] == "assistant": - last_assistant_message = get_last_assistant_message( - form_data["messages"] - ) - except Exception as e: - pass - - content = ( - message.get("content", "") - if message - else last_assistant_message if last_assistant_message else "" - ) + nonlocal content_blocks, latest_usage, completion_dispatched, collected_sources, source_hashes, outlet_result_data, outlet_content_override content_blocks = [ { "type": "text", - "content": content, + "content": initial_content, } ] - latest_usage = None completion_dispatched = False collected_sources = [] @@ -2370,94 +2473,8 @@ async def process_chat_response( outlet_result_data = None outlet_content_override = None - def extend_sources(items): - if not items: - return - for item in items: - try: - key = json.dumps(item, sort_keys=True) - except (TypeError, ValueError): - key = None - if key and key in source_hashes: - continue - if key: - source_hashes.add(key) - collected_sources.append(item) - - async def dispatch_chat_completed(): - nonlocal completion_dispatched, outlet_result_data, outlet_content_override, latest_usage - if completion_dispatched: - return outlet_result_data - - base_messages = [dict(message) for message in form_data.get("messages", [])] - generated_messages = convert_content_blocks_to_messages( - content_blocks, raw=True - ) - final_messages = [*base_messages, *generated_messages] - - if final_messages: - last_message = final_messages[-1] - if isinstance(last_message, dict) and last_message.get("role") == "assistant": - last_message = {**last_message} - if collected_sources: - last_message["sources"] = collected_sources - if latest_usage: - last_message["usage"] = latest_usage - final_messages[-1] = last_message - - payload = { - "model": model_id, - "messages": final_messages, - "chat_id": metadata["chat_id"], - "session_id": metadata["session_id"], - "id": metadata["message_id"], - "model_item": model, - } - if metadata.get("filter_ids"): - payload["filter_ids"] = metadata["filter_ids"] - - try: - outlet_result_data = await chat_completed(request, payload, user) - - if isinstance(outlet_result_data, dict): - extend_sources(outlet_result_data.get("sources")) - message_updates = 
outlet_result_data.get("messages") - if isinstance(message_updates, list): - for message_update in message_updates: - if not isinstance(message_update, dict): - continue - if message_update.get("id") != metadata["message_id"]: - continue - - if message_update.get("sources"): - extend_sources(message_update.get("sources")) - - usage_update = message_update.get("usage") - if usage_update: - try: - latest_usage = dict(usage_update) - except Exception: - latest_usage = usage_update - - outlet_content_override = message_update.get("content") - - try: - Chats.upsert_message_to_chat_by_id_and_message_id( - metadata["chat_id"], - metadata["message_id"], - { - **message_update, - }, - ) - except Exception as e: - log.debug(f"Failed to upsert outlet message: {e}") - break - except Exception as e: - log.warning(f"chat_completed outlet failed: {e}") - finally: - completion_dispatched = True - - return outlet_result_data + content = initial_content + tool_calls = [] reasoning_tags_param = metadata.get("params", {}).get("reasoning_tags") DETECT_REASONING_TAGS = reasoning_tags_param is not False @@ -3277,7 +3294,7 @@ async def process_chat_response( }, ) finally: - await dispatch_chat_completed() + await dispatch_chat_completed() if response.background is not None: await response.background() @@ -3287,8 +3304,10 @@ async def process_chat_response( else: # Fallback to the original response latest_usage = None + async def stream_wrapper(original_generator, events): nonlocal latest_usage + def wrap_item(item): return f"data: {item}\n\n" @@ -3315,11 +3334,24 @@ async def process_chat_response( extra_params=extra_params, ) - if data: - extend_sources(data.get("sources")) - usage = data.get("usage") - if usage: - latest_usage = dict(usage) + if not data: + continue + + if isinstance(data, (bytes, bytearray)): + try: + data = data.decode("utf-8") + except Exception: + data = data.decode("utf-8", "replace") + + if isinstance(data, str): + yield data + continue + + if isinstance(data, dict): + extend_sources(data.get("sources")) + usage = data.get("usage") + if usage: + latest_usage = dict(usage) yield data finally: await dispatch_chat_completed()