From 631d4441aa620473aa72e589b1ebe9eeeca757a3 Mon Sep 17 00:00:00 2001 From: _00_ <131402327+rgaricano@users.noreply.github.com> Date: Tue, 19 Aug 2025 19:10:15 +0200 Subject: [PATCH 1/6] UPD: ExceptionGroup catch in tasks.py ExceptionGroup catch in tasks.py Added caching for multiple concurrent exceptions. Modified: - async def redis_task_command_listener - async def cleanup_task --- backend/open_webui/tasks.py | 58 ++++++++++++++++++++++++++++++------- 1 file changed, 48 insertions(+), 10 deletions(-) diff --git a/backend/open_webui/tasks.py b/backend/open_webui/tasks.py index 714c532fca..579fe28975 100644 --- a/backend/open_webui/tasks.py +++ b/backend/open_webui/tasks.py @@ -7,6 +7,7 @@ import logging from redis.asyncio import Redis from fastapi import Request from typing import Dict, List, Optional +from builtins import ExceptionGroup from open_webui.env import SRC_LOG_LEVELS, REDIS_KEY_PREFIX @@ -33,12 +34,33 @@ async def redis_task_command_listener(app): if message["type"] != "message": continue try: - command = json.loads(message["data"]) + # Check if message data is empty or None + if not message["data"]: + log.warning("Received empty message data from Redis pub/sub") + continue + + # Attempt to parse JSON + try: + command = json.loads(message["data"]) + except json.JSONDecodeError as json_error: + log.warning(f"Invalid JSON in Redis message: {message['data'][:100]}... Error: {json_error}") + continue + if command.get("action") == "stop": task_id = command.get("task_id") local_task = tasks.get(task_id) if local_task: - local_task.cancel() + try: + local_task.cancel() + # Wait briefly for cancellation to complete + await asyncio.sleep(0.1) + except Exception as cancel_error: + log.error(f"Error cancelling task {task_id}: {cancel_error}") + except ExceptionGroup as eg: + # Handle multiple concurrent exceptions + log.error(f"Multiple errors in task command processing: {len(eg.exceptions)} exceptions") + for i, exc in enumerate(eg.exceptions): + log.error(f" Exception {i+1}: {type(exc).__name__}: {exc}") except Exception as e: log.exception(f"Error handling distributed task command: {e}") @@ -80,18 +102,34 @@ async def redis_send_command(redis: Redis, command: dict): async def cleanup_task(redis, task_id: str, id=None): """ - Remove a completed or canceled task from the global `tasks` dictionary. + Remove a completed or canceled task with proper exception handling. """ + cleanup_errors = [] + + # Redis cleanup if redis: - await redis_cleanup_task(redis, task_id, id) + try: + await redis_cleanup_task(redis, task_id, id) + except Exception as e: + cleanup_errors.append(e) + log.error(f"Redis cleanup failed for task {task_id}: {e}") - tasks.pop(task_id, None) # Remove the task if it exists + # Local cleanup + try: + tasks.pop(task_id, None) + if id and task_id in item_tasks.get(id, []): + item_tasks[id].remove(task_id) + if not item_tasks[id]: + item_tasks.pop(id, None) + except Exception as e: + cleanup_errors.append(e) + log.error(f"Local cleanup failed for task {task_id}: {e}") - # If an ID is provided, remove the task from the item_tasks dictionary - if id and task_id in item_tasks.get(id, []): - item_tasks[id].remove(task_id) - if not item_tasks[id]: # If no tasks left for this ID, remove the entry - item_tasks.pop(id, None) + # If multiple errors occurred, group them + if len(cleanup_errors) > 1 and ExceptionGroup: + raise ExceptionGroup(f"Multiple cleanup errors for task {task_id}", cleanup_errors) + elif cleanup_errors: + raise cleanup_errors[0] async def create_task(redis, coroutine, id=None): From aa5f5330d3abd1a3780cd7e883e75c80cf3273ef Mon Sep 17 00:00:00 2001 From: _00_ <131402327+rgaricano@users.noreply.github.com> Date: Tue, 19 Aug 2025 19:13:04 +0200 Subject: [PATCH 2/6] UPD: ExceptionGroup catch - update main.py ExceptionGroup catch - update main.py for catch and handle multiple concurrent exceptions Add caching for concurrent exceptions. Modified: - redis_task_command_listener --- backend/open_webui/main.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/backend/open_webui/main.py b/backend/open_webui/main.py index d5b89c8d50..1781dd4204 100644 --- a/backend/open_webui/main.py +++ b/backend/open_webui/main.py @@ -578,8 +578,19 @@ async def lifespan(app: FastAPI): yield - if hasattr(app.state, "redis_task_command_listener"): - app.state.redis_task_command_listener.cancel() +# In the lifespan shutdown + if hasattr(app.state, "redis_task_command_listener"): + try: + app.state.redis_task_command_listener.cancel() + await app.state.redis_task_command_listener + except ExceptionGroup as eg: + log.error(f"Multiple errors during listener shutdown: {len(eg.exceptions)} exceptions") + for exc in eg.exceptions: + log.error(f"Shutdown error: {type(exc).__name__}: {exc}") + except asyncio.CancelledError: + pass # Expected during shutdown + except Exception as e: + log.error(f"Error during listener shutdown: {e}") app = FastAPI( From 8a423c0a98842997d1917e8f3c5ee52a494885ba Mon Sep 17 00:00:00 2001 From: _00_ <131402327+rgaricano@users.noreply.github.com> Date: Tue, 19 Aug 2025 20:12:01 +0200 Subject: [PATCH 3/6] Fix Format Code Warmings --- backend/open_webui/tasks.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/backend/open_webui/tasks.py b/backend/open_webui/tasks.py index 579fe28975..0f64787f86 100644 --- a/backend/open_webui/tasks.py +++ b/backend/open_webui/tasks.py @@ -43,7 +43,9 @@ async def redis_task_command_listener(app): try: command = json.loads(message["data"]) except json.JSONDecodeError as json_error: - log.warning(f"Invalid JSON in Redis message: {message['data'][:100]}... Error: {json_error}") + log.warning( + f"Invalid JSON in Redis message: {message['data'][:100]}... Error: {json_error}" + ) continue if command.get("action") == "stop": @@ -58,7 +60,9 @@ async def redis_task_command_listener(app): log.error(f"Error cancelling task {task_id}: {cancel_error}") except ExceptionGroup as eg: # Handle multiple concurrent exceptions - log.error(f"Multiple errors in task command processing: {len(eg.exceptions)} exceptions") + log.error( + f"Multiple errors in task command processing: {len(eg.exceptions)} exceptions" + ) for i, exc in enumerate(eg.exceptions): log.error(f" Exception {i+1}: {type(exc).__name__}: {exc}") except Exception as e: @@ -127,7 +131,9 @@ async def cleanup_task(redis, task_id: str, id=None): # If multiple errors occurred, group them if len(cleanup_errors) > 1 and ExceptionGroup: - raise ExceptionGroup(f"Multiple cleanup errors for task {task_id}", cleanup_errors) + raise ExceptionGroup( + f"Multiple cleanup errors for task {task_id}", cleanup_errors + ) elif cleanup_errors: raise cleanup_errors[0] From 5fdb68bd2730e71a9c8c8273db37a1fd7096ad61 Mon Sep 17 00:00:00 2001 From: _00_ <131402327+rgaricano@users.noreply.github.com> Date: Tue, 19 Aug 2025 20:13:55 +0200 Subject: [PATCH 4/6] Fix Code Format Warmings --- backend/open_webui/main.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/backend/open_webui/main.py b/backend/open_webui/main.py index 1781dd4204..847bbaf460 100644 --- a/backend/open_webui/main.py +++ b/backend/open_webui/main.py @@ -578,17 +578,19 @@ async def lifespan(app: FastAPI): yield -# In the lifespan shutdown - if hasattr(app.state, "redis_task_command_listener"): - try: - app.state.redis_task_command_listener.cancel() - await app.state.redis_task_command_listener - except ExceptionGroup as eg: - log.error(f"Multiple errors during listener shutdown: {len(eg.exceptions)} exceptions") - for exc in eg.exceptions: - log.error(f"Shutdown error: {type(exc).__name__}: {exc}") - except asyncio.CancelledError: - pass # Expected during shutdown + # In the lifespan shutdown + if hasattr(app.state, "redis_task_command_listener"): + try: + app.state.redis_task_command_listener.cancel() + await app.state.redis_task_command_listener + except ExceptionGroup as eg: + log.error( + f"Multiple errors during listener shutdown: {len(eg.exceptions)} exceptions" + ) + for exc in eg.exceptions: + log.error(f"Shutdown error: {type(exc).__name__}: {exc}") + except asyncio.CancelledError: + pass # Expected during shutdown except Exception as e: log.error(f"Error during listener shutdown: {e}") From bd883c802601a6ded04742772a1be54de92eee78 Mon Sep 17 00:00:00 2001 From: _00_ <131402327+rgaricano@users.noreply.github.com> Date: Tue, 19 Aug 2025 20:16:25 +0200 Subject: [PATCH 5/6] Fix Code Format Warming --- backend/open_webui/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/open_webui/main.py b/backend/open_webui/main.py index 847bbaf460..3e8d326f94 100644 --- a/backend/open_webui/main.py +++ b/backend/open_webui/main.py @@ -591,7 +591,7 @@ async def lifespan(app: FastAPI): log.error(f"Shutdown error: {type(exc).__name__}: {exc}") except asyncio.CancelledError: pass # Expected during shutdown - except Exception as e: + except Exception as e: log.error(f"Error during listener shutdown: {e}") From beb7b0bf3d7a80a609a39973509c7663bcb61964 Mon Sep 17 00:00:00 2001 From: _00_ <131402327+rgaricano@users.noreply.github.com> Date: Tue, 19 Aug 2025 20:22:07 +0200 Subject: [PATCH 6/6] re-add "from the global `tasks` dictionary" comment that was removed re-add "from the global `tasks` dictionary" comment that was removed --- backend/open_webui/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/open_webui/tasks.py b/backend/open_webui/tasks.py index 0f64787f86..26a3a61741 100644 --- a/backend/open_webui/tasks.py +++ b/backend/open_webui/tasks.py @@ -106,7 +106,7 @@ async def redis_send_command(redis: Redis, command: dict): async def cleanup_task(redis, task_id: str, id=None): """ - Remove a completed or canceled task with proper exception handling. + Remove a completed or canceled task from the global `tasks` dictionary with proper exception handling. """ cleanup_errors = []