From 17ea99e7589ca0bb270ef19f294f167f83370c96 Mon Sep 17 00:00:00 2001 From: Yethe Samartaka <55753928+YetheSamartaka@users.noreply.github.com> Date: Thu, 11 Dec 2025 12:22:51 +0100 Subject: [PATCH] fix: tasks run without proper metadata - Fixed tasks running without proper metadata being included when those task models are used. This creates issue for example for Langfuse where title_generation or tag_generation and so on is not properly retrieved because it is never triggered as it should --- backend/open_webui/routers/tasks.py | 289 +++++++++++++++++++++++++++- 1 file changed, 280 insertions(+), 9 deletions(-) diff --git a/backend/open_webui/routers/tasks.py b/backend/open_webui/routers/tasks.py index 040c2382d7..71710bddf4 100644 --- a/backend/open_webui/routers/tasks.py +++ b/backend/open_webui/routers/tasks.py @@ -3,6 +3,7 @@ from fastapi.responses import JSONResponse, RedirectResponse from pydantic import BaseModel from typing import Optional +from uuid import uuid4 import logging import re @@ -20,10 +21,17 @@ from open_webui.utils.task import ( from open_webui.utils.auth import get_admin_user, get_verified_user from open_webui.constants import TASKS -from open_webui.routers.pipelines import process_pipeline_inlet_filter +from open_webui.routers.pipelines import ( + process_pipeline_inlet_filter, + process_pipeline_outlet_filter, +) from open_webui.utils.task import get_task_model_id +from open_webui.utils.filter import get_sorted_filter_ids, process_filter_functions +from open_webui.models.functions import Functions +from copy import deepcopy + from open_webui.config import ( DEFAULT_TITLE_GENERATION_PROMPT_TEMPLATE, DEFAULT_FOLLOW_UP_GENERATION_PROMPT_TEMPLATE, @@ -44,6 +52,157 @@ log.setLevel(SRC_LOG_LEVELS["MODELS"]) router = APIRouter() +def _attach_model_metadata(payload: dict, models: dict, model_id: str) -> None: + """ + Ensure payload.metadata carries the full model object so downstream filters + (e.g., Responses/Langfuse) can resolve model name/id for telemetry. + """ + try: + model_item = models.get(model_id) + if payload.get("metadata") is None: + payload["metadata"] = {} + if model_item and isinstance(payload["metadata"], dict): + payload["metadata"]["model"] = model_item + except Exception: + pass + + +def _ensure_task_identifiers(payload: dict, form_data: dict) -> dict: + """ + Guarantee chat/session identifiers live in both root and metadata so filters + that rely on them (e.g., Langfuse) can create traces for task runs. + """ + metadata = payload.get("metadata") or {} + form_meta = form_data.get("metadata") or {} + + chat_id = ( + metadata.get("chat_id") + or form_data.get("chat_id") + or form_meta.get("chat_id") + or f"task-{uuid4()}" + ) + session_id = ( + metadata.get("session_id") + or form_data.get("session_id") + or form_meta.get("session_id") + or chat_id + ) + message_id = metadata.get("message_id") or form_data.get("message_id") + + if not isinstance(metadata, dict): + metadata = {} + metadata.setdefault("chat_id", chat_id) + metadata.setdefault("session_id", session_id) + if message_id: + metadata.setdefault("message_id", message_id) + + payload["metadata"] = metadata + payload["chat_id"] = chat_id + payload["session_id"] = session_id + if message_id: + payload.setdefault("id", message_id) + + return payload + + +async def _run_task_filters( + request: Request, + payload: dict, + user, + models: dict, + response: Optional[dict] = None, +): + """ + Run filter/pipeline outlet hooks for task payloads so telemetry filters (e.g. Langfuse) + capture non-chat generations such as titles, tags, or search queries. + """ + + model_id = payload.get("model") + if not model_id or model_id not in models: + return payload + + payload = _ensure_task_identifiers(payload, payload) + + model = models[model_id] + metadata = payload.get("metadata") or {} + filter_ids = metadata.get("filter_ids", []) + + filter_functions = [ + Functions.get_function_by_id(filter_id) + for filter_id in get_sorted_filter_ids(request, model, filter_ids) + ] + filter_functions = [f for f in filter_functions if f] + + if not filter_functions: + return payload + + async def _noop_event_emitter(event): + return None + + async def _noop_event_call(event): + return None + + extra_params = { + "__event_emitter__": _noop_event_emitter, + "__event_call__": _noop_event_call, + "__user__": user.model_dump() if hasattr(user, "model_dump") else {}, + "__metadata__": metadata, + "__request__": request, + "__model__": model, + "__task__": metadata.get("task"), + "__task_body__": metadata.get("task_body"), + } + + body = deepcopy(payload) + filter_type = "inlet" + + if response is not None: + assistant_message = None + if isinstance(response, dict): + choices = response.get("choices", []) + if choices: + assistant_message = dict(choices[0].get("message") or {}) + if response.get("usage"): + assistant_message["usage"] = response["usage"] + + messages = [ + dict(message) + for message in body.get("messages", []) + if isinstance(message, dict) + ] + if assistant_message: + messages.append(assistant_message) + + body = { + "model": body.get("model"), + "messages": messages, + "metadata": metadata, + "chat_id": metadata.get("chat_id"), + "session_id": metadata.get("session_id"), + **({"filter_ids": filter_ids} if filter_ids else {}), + } + + try: + body = await process_pipeline_outlet_filter(request, body, user, models) + except Exception as e: + log.debug(f"Task pipeline outlet filter failed: {e}") + + filter_type = "outlet" + + try: + body, _ = await process_filter_functions( + request=request, + filter_functions=filter_functions, + filter_type=filter_type, + form_data=body, + extra_params=extra_params, + ) + except Exception as e: + log.debug(f"Task filter {filter_type} failed: {e}") + + return body if response is None else payload + + ################################## # # Task Endpoints @@ -231,6 +390,9 @@ async def generate_title( }, } + payload = _ensure_task_identifiers(payload, form_data) + _attach_model_metadata(payload, models, task_model_id) + # Process the payload through the pipeline try: payload = await process_pipeline_inlet_filter(request, payload, user, models) @@ -238,7 +400,18 @@ async def generate_title( raise e try: - return await generate_chat_completion(request, form_data=payload, user=user) + payload = await _run_task_filters(request, payload, user, models) + except Exception as e: + log.debug(f"Task inlet filter execution failed: {e}") + + try: + res = await generate_chat_completion(request, form_data=payload, user=user) + if isinstance(res, dict): + try: + await _run_task_filters(request, payload, user, models, response=res) + except Exception as e: + log.debug(f"Task outlet filter execution failed: {e}") + return res except Exception as e: log.error("Exception occurred", exc_info=True) return JSONResponse( @@ -304,6 +477,9 @@ async def generate_follow_ups( }, } + payload = _ensure_task_identifiers(payload, form_data) + _attach_model_metadata(payload, models, task_model_id) + # Process the payload through the pipeline try: payload = await process_pipeline_inlet_filter(request, payload, user, models) @@ -311,7 +487,18 @@ async def generate_follow_ups( raise e try: - return await generate_chat_completion(request, form_data=payload, user=user) + payload = await _run_task_filters(request, payload, user, models) + except Exception as e: + log.debug(f"Task inlet filter execution failed: {e}") + + try: + res = await generate_chat_completion(request, form_data=payload, user=user) + if isinstance(res, dict): + try: + await _run_task_filters(request, payload, user, models, response=res) + except Exception as e: + log.debug(f"Task outlet filter execution failed: {e}") + return res except Exception as e: log.error("Exception occurred", exc_info=True) return JSONResponse( @@ -377,6 +564,9 @@ async def generate_chat_tags( }, } + payload = _ensure_task_identifiers(payload, form_data) + _attach_model_metadata(payload, models, task_model_id) + # Process the payload through the pipeline try: payload = await process_pipeline_inlet_filter(request, payload, user, models) @@ -384,7 +574,18 @@ async def generate_chat_tags( raise e try: - return await generate_chat_completion(request, form_data=payload, user=user) + payload = await _run_task_filters(request, payload, user, models) + except Exception as e: + log.debug(f"Task inlet filter execution failed: {e}") + + try: + res = await generate_chat_completion(request, form_data=payload, user=user) + if isinstance(res, dict): + try: + await _run_task_filters(request, payload, user, models, response=res) + except Exception as e: + log.debug(f"Task outlet filter execution failed: {e}") + return res except Exception as e: log.error(f"Error generating chat completion: {e}") return JSONResponse( @@ -443,6 +644,9 @@ async def generate_image_prompt( }, } + payload = _ensure_task_identifiers(payload, form_data) + _attach_model_metadata(payload, models, task_model_id) + # Process the payload through the pipeline try: payload = await process_pipeline_inlet_filter(request, payload, user, models) @@ -450,7 +654,18 @@ async def generate_image_prompt( raise e try: - return await generate_chat_completion(request, form_data=payload, user=user) + payload = await _run_task_filters(request, payload, user, models) + except Exception as e: + log.debug(f"Task inlet filter execution failed: {e}") + + try: + res = await generate_chat_completion(request, form_data=payload, user=user) + if isinstance(res, dict): + try: + await _run_task_filters(request, payload, user, models, response=res) + except Exception as e: + log.debug(f"Task outlet filter execution failed: {e}") + return res except Exception as e: log.error("Exception occurred", exc_info=True) return JSONResponse( @@ -528,6 +743,9 @@ async def generate_queries( }, } + payload = _ensure_task_identifiers(payload, form_data) + _attach_model_metadata(payload, models, task_model_id) + # Process the payload through the pipeline try: payload = await process_pipeline_inlet_filter(request, payload, user, models) @@ -535,7 +753,18 @@ async def generate_queries( raise e try: - return await generate_chat_completion(request, form_data=payload, user=user) + payload = await _run_task_filters(request, payload, user, models) + except Exception as e: + log.debug(f"Task inlet filter execution failed: {e}") + + try: + res = await generate_chat_completion(request, form_data=payload, user=user) + if isinstance(res, dict): + try: + await _run_task_filters(request, payload, user, models, response=res) + except Exception as e: + log.debug(f"Task outlet filter execution failed: {e}") + return res except Exception as e: return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, @@ -613,6 +842,9 @@ async def generate_autocompletion( }, } + payload = _ensure_task_identifiers(payload, form_data) + _attach_model_metadata(payload, models, task_model_id) + # Process the payload through the pipeline try: payload = await process_pipeline_inlet_filter(request, payload, user, models) @@ -620,7 +852,18 @@ async def generate_autocompletion( raise e try: - return await generate_chat_completion(request, form_data=payload, user=user) + payload = await _run_task_filters(request, payload, user, models) + except Exception as e: + log.debug(f"Task inlet filter execution failed: {e}") + + try: + res = await generate_chat_completion(request, form_data=payload, user=user) + if isinstance(res, dict): + try: + await _run_task_filters(request, payload, user, models, response=res) + except Exception as e: + log.debug(f"Task outlet filter execution failed: {e}") + return res except Exception as e: log.error(f"Error generating chat completion: {e}") return JSONResponse( @@ -682,6 +925,9 @@ async def generate_emoji( }, } + payload = _ensure_task_identifiers(payload, form_data) + _attach_model_metadata(payload, models, task_model_id) + # Process the payload through the pipeline try: payload = await process_pipeline_inlet_filter(request, payload, user, models) @@ -689,7 +935,18 @@ async def generate_emoji( raise e try: - return await generate_chat_completion(request, form_data=payload, user=user) + payload = await _run_task_filters(request, payload, user, models) + except Exception as e: + log.debug(f"Task inlet filter execution failed: {e}") + + try: + res = await generate_chat_completion(request, form_data=payload, user=user) + if isinstance(res, dict): + try: + await _run_task_filters(request, payload, user, models, response=res) + except Exception as e: + log.debug(f"Task outlet filter execution failed: {e}") + return res except Exception as e: return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, @@ -737,6 +994,9 @@ async def generate_moa_response( }, } + payload = _ensure_task_identifiers(payload, form_data) + _attach_model_metadata(payload, models, model_id) + # Process the payload through the pipeline try: payload = await process_pipeline_inlet_filter(request, payload, user, models) @@ -744,7 +1004,18 @@ async def generate_moa_response( raise e try: - return await generate_chat_completion(request, form_data=payload, user=user) + payload = await _run_task_filters(request, payload, user, models) + except Exception as e: + log.debug(f"Task inlet filter execution failed: {e}") + + try: + res = await generate_chat_completion(request, form_data=payload, user=user) + if isinstance(res, dict): + try: + await _run_task_filters(request, payload, user, models, response=res) + except Exception as e: + log.debug(f"Task outlet filter execution failed: {e}") + return res except Exception as e: return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST,