fix: tasks run without proper metadata

- Fixed tasks running without proper metadata being included when those task models are used. This creates issue for example for Langfuse where title_generation or tag_generation and so on is not properly retrieved because it is never triggered as it should
This commit is contained in:
Yethe Samartaka 2025-12-11 12:22:51 +01:00
parent 3b3e12b43a
commit 17ea99e758

View file

@ -3,6 +3,7 @@ from fastapi.responses import JSONResponse, RedirectResponse
from pydantic import BaseModel from pydantic import BaseModel
from typing import Optional from typing import Optional
from uuid import uuid4
import logging import logging
import re import re
@ -20,10 +21,17 @@ from open_webui.utils.task import (
from open_webui.utils.auth import get_admin_user, get_verified_user from open_webui.utils.auth import get_admin_user, get_verified_user
from open_webui.constants import TASKS from open_webui.constants import TASKS
from open_webui.routers.pipelines import process_pipeline_inlet_filter from open_webui.routers.pipelines import (
process_pipeline_inlet_filter,
process_pipeline_outlet_filter,
)
from open_webui.utils.task import get_task_model_id from open_webui.utils.task import get_task_model_id
from open_webui.utils.filter import get_sorted_filter_ids, process_filter_functions
from open_webui.models.functions import Functions
from copy import deepcopy
from open_webui.config import ( from open_webui.config import (
DEFAULT_TITLE_GENERATION_PROMPT_TEMPLATE, DEFAULT_TITLE_GENERATION_PROMPT_TEMPLATE,
DEFAULT_FOLLOW_UP_GENERATION_PROMPT_TEMPLATE, DEFAULT_FOLLOW_UP_GENERATION_PROMPT_TEMPLATE,
@ -44,6 +52,157 @@ log.setLevel(SRC_LOG_LEVELS["MODELS"])
router = APIRouter() router = APIRouter()
def _attach_model_metadata(payload: dict, models: dict, model_id: str) -> None:
"""
Ensure payload.metadata carries the full model object so downstream filters
(e.g., Responses/Langfuse) can resolve model name/id for telemetry.
"""
try:
model_item = models.get(model_id)
if payload.get("metadata") is None:
payload["metadata"] = {}
if model_item and isinstance(payload["metadata"], dict):
payload["metadata"]["model"] = model_item
except Exception:
pass
def _ensure_task_identifiers(payload: dict, form_data: dict) -> dict:
"""
Guarantee chat/session identifiers live in both root and metadata so filters
that rely on them (e.g., Langfuse) can create traces for task runs.
"""
metadata = payload.get("metadata") or {}
form_meta = form_data.get("metadata") or {}
chat_id = (
metadata.get("chat_id")
or form_data.get("chat_id")
or form_meta.get("chat_id")
or f"task-{uuid4()}"
)
session_id = (
metadata.get("session_id")
or form_data.get("session_id")
or form_meta.get("session_id")
or chat_id
)
message_id = metadata.get("message_id") or form_data.get("message_id")
if not isinstance(metadata, dict):
metadata = {}
metadata.setdefault("chat_id", chat_id)
metadata.setdefault("session_id", session_id)
if message_id:
metadata.setdefault("message_id", message_id)
payload["metadata"] = metadata
payload["chat_id"] = chat_id
payload["session_id"] = session_id
if message_id:
payload.setdefault("id", message_id)
return payload
async def _run_task_filters(
request: Request,
payload: dict,
user,
models: dict,
response: Optional[dict] = None,
):
"""
Run filter/pipeline outlet hooks for task payloads so telemetry filters (e.g. Langfuse)
capture non-chat generations such as titles, tags, or search queries.
"""
model_id = payload.get("model")
if not model_id or model_id not in models:
return payload
payload = _ensure_task_identifiers(payload, payload)
model = models[model_id]
metadata = payload.get("metadata") or {}
filter_ids = metadata.get("filter_ids", [])
filter_functions = [
Functions.get_function_by_id(filter_id)
for filter_id in get_sorted_filter_ids(request, model, filter_ids)
]
filter_functions = [f for f in filter_functions if f]
if not filter_functions:
return payload
async def _noop_event_emitter(event):
return None
async def _noop_event_call(event):
return None
extra_params = {
"__event_emitter__": _noop_event_emitter,
"__event_call__": _noop_event_call,
"__user__": user.model_dump() if hasattr(user, "model_dump") else {},
"__metadata__": metadata,
"__request__": request,
"__model__": model,
"__task__": metadata.get("task"),
"__task_body__": metadata.get("task_body"),
}
body = deepcopy(payload)
filter_type = "inlet"
if response is not None:
assistant_message = None
if isinstance(response, dict):
choices = response.get("choices", [])
if choices:
assistant_message = dict(choices[0].get("message") or {})
if response.get("usage"):
assistant_message["usage"] = response["usage"]
messages = [
dict(message)
for message in body.get("messages", [])
if isinstance(message, dict)
]
if assistant_message:
messages.append(assistant_message)
body = {
"model": body.get("model"),
"messages": messages,
"metadata": metadata,
"chat_id": metadata.get("chat_id"),
"session_id": metadata.get("session_id"),
**({"filter_ids": filter_ids} if filter_ids else {}),
}
try:
body = await process_pipeline_outlet_filter(request, body, user, models)
except Exception as e:
log.debug(f"Task pipeline outlet filter failed: {e}")
filter_type = "outlet"
try:
body, _ = await process_filter_functions(
request=request,
filter_functions=filter_functions,
filter_type=filter_type,
form_data=body,
extra_params=extra_params,
)
except Exception as e:
log.debug(f"Task filter {filter_type} failed: {e}")
return body if response is None else payload
################################## ##################################
# #
# Task Endpoints # Task Endpoints
@ -231,6 +390,9 @@ async def generate_title(
}, },
} }
payload = _ensure_task_identifiers(payload, form_data)
_attach_model_metadata(payload, models, task_model_id)
# Process the payload through the pipeline # Process the payload through the pipeline
try: try:
payload = await process_pipeline_inlet_filter(request, payload, user, models) payload = await process_pipeline_inlet_filter(request, payload, user, models)
@ -238,7 +400,18 @@ async def generate_title(
raise e raise e
try: try:
return await generate_chat_completion(request, form_data=payload, user=user) payload = await _run_task_filters(request, payload, user, models)
except Exception as e:
log.debug(f"Task inlet filter execution failed: {e}")
try:
res = await generate_chat_completion(request, form_data=payload, user=user)
if isinstance(res, dict):
try:
await _run_task_filters(request, payload, user, models, response=res)
except Exception as e:
log.debug(f"Task outlet filter execution failed: {e}")
return res
except Exception as e: except Exception as e:
log.error("Exception occurred", exc_info=True) log.error("Exception occurred", exc_info=True)
return JSONResponse( return JSONResponse(
@ -304,6 +477,9 @@ async def generate_follow_ups(
}, },
} }
payload = _ensure_task_identifiers(payload, form_data)
_attach_model_metadata(payload, models, task_model_id)
# Process the payload through the pipeline # Process the payload through the pipeline
try: try:
payload = await process_pipeline_inlet_filter(request, payload, user, models) payload = await process_pipeline_inlet_filter(request, payload, user, models)
@ -311,7 +487,18 @@ async def generate_follow_ups(
raise e raise e
try: try:
return await generate_chat_completion(request, form_data=payload, user=user) payload = await _run_task_filters(request, payload, user, models)
except Exception as e:
log.debug(f"Task inlet filter execution failed: {e}")
try:
res = await generate_chat_completion(request, form_data=payload, user=user)
if isinstance(res, dict):
try:
await _run_task_filters(request, payload, user, models, response=res)
except Exception as e:
log.debug(f"Task outlet filter execution failed: {e}")
return res
except Exception as e: except Exception as e:
log.error("Exception occurred", exc_info=True) log.error("Exception occurred", exc_info=True)
return JSONResponse( return JSONResponse(
@ -377,6 +564,9 @@ async def generate_chat_tags(
}, },
} }
payload = _ensure_task_identifiers(payload, form_data)
_attach_model_metadata(payload, models, task_model_id)
# Process the payload through the pipeline # Process the payload through the pipeline
try: try:
payload = await process_pipeline_inlet_filter(request, payload, user, models) payload = await process_pipeline_inlet_filter(request, payload, user, models)
@ -384,7 +574,18 @@ async def generate_chat_tags(
raise e raise e
try: try:
return await generate_chat_completion(request, form_data=payload, user=user) payload = await _run_task_filters(request, payload, user, models)
except Exception as e:
log.debug(f"Task inlet filter execution failed: {e}")
try:
res = await generate_chat_completion(request, form_data=payload, user=user)
if isinstance(res, dict):
try:
await _run_task_filters(request, payload, user, models, response=res)
except Exception as e:
log.debug(f"Task outlet filter execution failed: {e}")
return res
except Exception as e: except Exception as e:
log.error(f"Error generating chat completion: {e}") log.error(f"Error generating chat completion: {e}")
return JSONResponse( return JSONResponse(
@ -443,6 +644,9 @@ async def generate_image_prompt(
}, },
} }
payload = _ensure_task_identifiers(payload, form_data)
_attach_model_metadata(payload, models, task_model_id)
# Process the payload through the pipeline # Process the payload through the pipeline
try: try:
payload = await process_pipeline_inlet_filter(request, payload, user, models) payload = await process_pipeline_inlet_filter(request, payload, user, models)
@ -450,7 +654,18 @@ async def generate_image_prompt(
raise e raise e
try: try:
return await generate_chat_completion(request, form_data=payload, user=user) payload = await _run_task_filters(request, payload, user, models)
except Exception as e:
log.debug(f"Task inlet filter execution failed: {e}")
try:
res = await generate_chat_completion(request, form_data=payload, user=user)
if isinstance(res, dict):
try:
await _run_task_filters(request, payload, user, models, response=res)
except Exception as e:
log.debug(f"Task outlet filter execution failed: {e}")
return res
except Exception as e: except Exception as e:
log.error("Exception occurred", exc_info=True) log.error("Exception occurred", exc_info=True)
return JSONResponse( return JSONResponse(
@ -528,6 +743,9 @@ async def generate_queries(
}, },
} }
payload = _ensure_task_identifiers(payload, form_data)
_attach_model_metadata(payload, models, task_model_id)
# Process the payload through the pipeline # Process the payload through the pipeline
try: try:
payload = await process_pipeline_inlet_filter(request, payload, user, models) payload = await process_pipeline_inlet_filter(request, payload, user, models)
@ -535,7 +753,18 @@ async def generate_queries(
raise e raise e
try: try:
return await generate_chat_completion(request, form_data=payload, user=user) payload = await _run_task_filters(request, payload, user, models)
except Exception as e:
log.debug(f"Task inlet filter execution failed: {e}")
try:
res = await generate_chat_completion(request, form_data=payload, user=user)
if isinstance(res, dict):
try:
await _run_task_filters(request, payload, user, models, response=res)
except Exception as e:
log.debug(f"Task outlet filter execution failed: {e}")
return res
except Exception as e: except Exception as e:
return JSONResponse( return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST, status_code=status.HTTP_400_BAD_REQUEST,
@ -613,6 +842,9 @@ async def generate_autocompletion(
}, },
} }
payload = _ensure_task_identifiers(payload, form_data)
_attach_model_metadata(payload, models, task_model_id)
# Process the payload through the pipeline # Process the payload through the pipeline
try: try:
payload = await process_pipeline_inlet_filter(request, payload, user, models) payload = await process_pipeline_inlet_filter(request, payload, user, models)
@ -620,7 +852,18 @@ async def generate_autocompletion(
raise e raise e
try: try:
return await generate_chat_completion(request, form_data=payload, user=user) payload = await _run_task_filters(request, payload, user, models)
except Exception as e:
log.debug(f"Task inlet filter execution failed: {e}")
try:
res = await generate_chat_completion(request, form_data=payload, user=user)
if isinstance(res, dict):
try:
await _run_task_filters(request, payload, user, models, response=res)
except Exception as e:
log.debug(f"Task outlet filter execution failed: {e}")
return res
except Exception as e: except Exception as e:
log.error(f"Error generating chat completion: {e}") log.error(f"Error generating chat completion: {e}")
return JSONResponse( return JSONResponse(
@ -682,6 +925,9 @@ async def generate_emoji(
}, },
} }
payload = _ensure_task_identifiers(payload, form_data)
_attach_model_metadata(payload, models, task_model_id)
# Process the payload through the pipeline # Process the payload through the pipeline
try: try:
payload = await process_pipeline_inlet_filter(request, payload, user, models) payload = await process_pipeline_inlet_filter(request, payload, user, models)
@ -689,7 +935,18 @@ async def generate_emoji(
raise e raise e
try: try:
return await generate_chat_completion(request, form_data=payload, user=user) payload = await _run_task_filters(request, payload, user, models)
except Exception as e:
log.debug(f"Task inlet filter execution failed: {e}")
try:
res = await generate_chat_completion(request, form_data=payload, user=user)
if isinstance(res, dict):
try:
await _run_task_filters(request, payload, user, models, response=res)
except Exception as e:
log.debug(f"Task outlet filter execution failed: {e}")
return res
except Exception as e: except Exception as e:
return JSONResponse( return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST, status_code=status.HTTP_400_BAD_REQUEST,
@ -737,6 +994,9 @@ async def generate_moa_response(
}, },
} }
payload = _ensure_task_identifiers(payload, form_data)
_attach_model_metadata(payload, models, model_id)
# Process the payload through the pipeline # Process the payload through the pipeline
try: try:
payload = await process_pipeline_inlet_filter(request, payload, user, models) payload = await process_pipeline_inlet_filter(request, payload, user, models)
@ -744,7 +1004,18 @@ async def generate_moa_response(
raise e raise e
try: try:
return await generate_chat_completion(request, form_data=payload, user=user) payload = await _run_task_filters(request, payload, user, models)
except Exception as e:
log.debug(f"Task inlet filter execution failed: {e}")
try:
res = await generate_chat_completion(request, form_data=payload, user=user)
if isinstance(res, dict):
try:
await _run_task_filters(request, payload, user, models, response=res)
except Exception as e:
log.debug(f"Task outlet filter execution failed: {e}")
return res
except Exception as e: except Exception as e:
return JSONResponse( return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST, status_code=status.HTTP_400_BAD_REQUEST,