Merge pull request #16385 from gaby/2025-08-08-13-38-31

feat: Propagate upstream OpenAI router errors
This commit is contained in:
Tim Jaeryang Baek 2025-08-09 00:58:14 +04:00 committed by GitHub
commit 17084f629c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 352 additions and 278 deletions

View file

@ -120,7 +120,7 @@ class Oracle23aiClient(VectorDBBase):
increment=ORACLE_DB_POOL_INCREMENT,
config_dir=ORACLE_WALLET_DIR,
wallet_location=ORACLE_WALLET_DIR,
wallet_password=ORACLE_WALLET_PASSWORD
wallet_password=ORACLE_WALLET_PASSWORD,
)
log.info("Created ADB connection pool with wallet authentication.")
@ -136,7 +136,7 @@ class Oracle23aiClient(VectorDBBase):
dsn=ORACLE_DB_DSN,
min=ORACLE_DB_POOL_MIN,
max=ORACLE_DB_POOL_MAX,
increment=ORACLE_DB_POOL_INCREMENT
increment=ORACLE_DB_POOL_INCREMENT,
)
log.info("Created DB connection pool with basic authentication.")
@ -154,8 +154,10 @@ class Oracle23aiClient(VectorDBBase):
connection.outputtypehandler = self._output_type_handler
return connection
except oracledb.DatabaseError as e:
error_obj, = e.args
log.exception(f"Connection attempt {attempt + 1} failed: {error_obj.message}")
(error_obj,) = e.args
log.exception(
f"Connection attempt {attempt + 1} failed: {error_obj.message}"
)
if attempt < max_retries - 1:
wait_time = 2**attempt
@ -171,6 +173,7 @@ class Oracle23aiClient(VectorDBBase):
Args:
interval_seconds (int): Number of seconds between health checks
"""
def _monitor():
while True:
try:
@ -219,7 +222,9 @@ class Oracle23aiClient(VectorDBBase):
with connection.cursor() as cursor:
cursor.execute("SELECT 1 FROM dual")
except Exception as e:
log.exception(f"Connection check failed: {e}, attempting to reconnect pool...")
log.exception(
f"Connection check failed: {e}, attempting to reconnect pool..."
)
self._reconnect_pool()
def _output_type_handler(self, cursor, metadata):
@ -234,8 +239,9 @@ class Oracle23aiClient(VectorDBBase):
A variable with appropriate conversion for vector types
"""
if metadata.type_code is oracledb.DB_TYPE_VECTOR:
return cursor.var(metadata.type_code, arraysize=cursor.arraysize,
outconverter=list)
return cursor.var(
metadata.type_code, arraysize=cursor.arraysize, outconverter=list
)
def _initialize_database(self, connection) -> None:
"""
@ -252,7 +258,8 @@ class Oracle23aiClient(VectorDBBase):
with connection.cursor() as cursor:
try:
log.info("Creating Table document_chunk")
cursor.execute("""
cursor.execute(
"""
BEGIN
EXECUTE IMMEDIATE '
CREATE TABLE IF NOT EXISTS document_chunk (
@ -269,10 +276,12 @@ class Oracle23aiClient(VectorDBBase):
RAISE;
END IF;
END;
""")
"""
)
log.info("Creating Index document_chunk_collection_name_idx")
cursor.execute("""
cursor.execute(
"""
BEGIN
EXECUTE IMMEDIATE '
CREATE INDEX IF NOT EXISTS document_chunk_collection_name_idx
@ -284,10 +293,12 @@ class Oracle23aiClient(VectorDBBase):
RAISE;
END IF;
END;
""")
"""
)
log.info("Creating VECTOR INDEX document_chunk_vector_ivf_idx")
cursor.execute("""
cursor.execute(
"""
BEGIN
EXECUTE IMMEDIATE '
CREATE VECTOR INDEX IF NOT EXISTS document_chunk_vector_ivf_idx
@ -303,7 +314,8 @@ class Oracle23aiClient(VectorDBBase):
RAISE;
END IF;
END;
""")
"""
)
connection.commit()
log.info("Database initialization completed successfully.")
@ -415,20 +427,25 @@ class Oracle23aiClient(VectorDBBase):
vector_blob = self._vector_to_blob(item["vector"])
metadata_json = self._metadata_to_json(item["metadata"])
cursor.execute("""
cursor.execute(
"""
INSERT INTO document_chunk
(id, collection_name, text, vmetadata, vector)
VALUES (:id, :collection_name, :text, :metadata, :vector)
""", {
'id': item["id"],
'collection_name': collection_name,
'text': item["text"],
'metadata': metadata_json,
'vector': vector_blob
})
""",
{
"id": item["id"],
"collection_name": collection_name,
"text": item["text"],
"metadata": metadata_json,
"vector": vector_blob,
},
)
connection.commit()
log.info(f"Successfully inserted {len(items)} items into collection '{collection_name}'.")
log.info(
f"Successfully inserted {len(items)} items into collection '{collection_name}'."
)
except Exception as e:
connection.rollback()
@ -466,7 +483,8 @@ class Oracle23aiClient(VectorDBBase):
vector_blob = self._vector_to_blob(item["vector"])
metadata_json = self._metadata_to_json(item["metadata"])
cursor.execute("""
cursor.execute(
"""
MERGE INTO document_chunk d
USING (SELECT :merge_id as id FROM dual) s
ON (d.id = s.id)
@ -479,21 +497,25 @@ class Oracle23aiClient(VectorDBBase):
WHEN NOT MATCHED THEN
INSERT (id, collection_name, text, vmetadata, vector)
VALUES (:ins_id, :ins_collection_name, :ins_text, :ins_metadata, :ins_vector)
""", {
'merge_id': item["id"],
'upd_collection_name': collection_name,
'upd_text': item["text"],
'upd_metadata': metadata_json,
'upd_vector': vector_blob,
'ins_id': item["id"],
'ins_collection_name': collection_name,
'ins_text': item["text"],
'ins_metadata': metadata_json,
'ins_vector': vector_blob
})
""",
{
"merge_id": item["id"],
"upd_collection_name": collection_name,
"upd_text": item["text"],
"upd_metadata": metadata_json,
"upd_vector": vector_blob,
"ins_id": item["id"],
"ins_collection_name": collection_name,
"ins_text": item["text"],
"ins_metadata": metadata_json,
"ins_vector": vector_blob,
},
)
connection.commit()
log.info(f"Successfully upserted {len(items)} items into collection '{collection_name}'.")
log.info(
f"Successfully upserted {len(items)} items into collection '{collection_name}'."
)
except Exception as e:
connection.rollback()
@ -501,10 +523,7 @@ class Oracle23aiClient(VectorDBBase):
raise
def search(
self,
collection_name: str,
vectors: List[List[Union[float, int]]],
limit: int
self, collection_name: str, vectors: List[List[Union[float, int]]], limit: int
) -> Optional[SearchResult]:
"""
Search for similar vectors in the database.
@ -528,7 +547,9 @@ class Oracle23aiClient(VectorDBBase):
... for i, (id, dist) in enumerate(zip(results.ids[0], results.distances[0])):
... log.info(f"Match {i+1}: id={id}, distance={dist}")
"""
log.info(f"Searching items from collection '{collection_name}' with limit {limit}.")
log.info(
f"Searching items from collection '{collection_name}' with limit {limit}."
)
try:
if not vectors:
@ -547,7 +568,8 @@ class Oracle23aiClient(VectorDBBase):
for qid, vector in enumerate(vectors):
vector_blob = self._vector_to_blob(vector)
cursor.execute("""
cursor.execute(
"""
SELECT dc.id, dc.text,
JSON_SERIALIZE(dc.vmetadata RETURNING VARCHAR2(4096)) as vmetadata,
VECTOR_DISTANCE(dc.vector, :query_vector, COSINE) as distance
@ -555,29 +577,38 @@ class Oracle23aiClient(VectorDBBase):
WHERE dc.collection_name = :collection_name
ORDER BY VECTOR_DISTANCE(dc.vector, :query_vector, COSINE)
FETCH APPROX FIRST :limit ROWS ONLY
""", {
'query_vector': vector_blob,
'collection_name': collection_name,
'limit': limit
})
""",
{
"query_vector": vector_blob,
"collection_name": collection_name,
"limit": limit,
},
)
results = cursor.fetchall()
for row in results:
ids[qid].append(row[0])
documents[qid].append(row[1].read() if isinstance(row[1], oracledb.LOB) else str(row[1]))
documents[qid].append(
row[1].read()
if isinstance(row[1], oracledb.LOB)
else str(row[1])
)
# 🔧 FIXED: Parse JSON metadata properly
metadata_str = row[2].read() if isinstance(row[2], oracledb.LOB) else row[2]
metadata_str = (
row[2].read()
if isinstance(row[2], oracledb.LOB)
else row[2]
)
metadatas[qid].append(self._json_to_metadata(metadata_str))
distances[qid].append(float(row[3]))
log.info(f"Search completed. Found {sum(len(ids[i]) for i in range(num_queries))} total results.")
log.info(
f"Search completed. Found {sum(len(ids[i]) for i in range(num_queries))} total results."
)
return SearchResult(
ids=ids,
distances=distances,
documents=documents,
metadatas=metadatas
ids=ids, distances=distances, documents=documents, metadatas=metadatas
)
except Exception as e:
@ -585,10 +616,7 @@ class Oracle23aiClient(VectorDBBase):
return None
def query(
self,
collection_name: str,
filter: Dict,
limit: Optional[int] = None
self, collection_name: str, filter: Dict, limit: Optional[int] = None
) -> Optional[GetResult]:
"""
Query items based on metadata filters.
@ -621,7 +649,7 @@ class Oracle23aiClient(VectorDBBase):
WHERE collection_name = :collection_name
"""
params = {'collection_name': collection_name}
params = {"collection_name": collection_name}
for i, (key, value) in enumerate(filter.items()):
param_name = f"value_{i}"
@ -629,7 +657,7 @@ class Oracle23aiClient(VectorDBBase):
params[param_name] = str(value)
query += " FETCH FIRST :limit ROWS ONLY"
params['limit'] = limit
params["limit"] = limit
with self.get_connection() as connection:
with connection.cursor() as cursor:
@ -641,26 +669,31 @@ class Oracle23aiClient(VectorDBBase):
return None
ids = [[row[0] for row in results]]
documents = [[row[1].read() if isinstance(row[1], oracledb.LOB) else str(row[1]) for row in results]]
documents = [
[
row[1].read() if isinstance(row[1], oracledb.LOB) else str(row[1])
for row in results
]
]
# 🔧 FIXED: Parse JSON metadata properly
metadatas = [[self._json_to_metadata(row[2].read() if isinstance(row[2], oracledb.LOB) else row[2]) for row in results]]
metadatas = [
[
self._json_to_metadata(
row[2].read() if isinstance(row[2], oracledb.LOB) else row[2]
)
for row in results
]
]
log.info(f"Query completed. Found {len(results)} results.")
return GetResult(
ids=ids,
documents=documents,
metadatas=metadatas
)
return GetResult(ids=ids, documents=documents, metadatas=metadatas)
except Exception as e:
log.exception(f"Error during query: {e}")
return None
def get(
self,
collection_name: str
) -> Optional[GetResult]:
def get(self, collection_name: str) -> Optional[GetResult]:
"""
Get all items in a collection.
@ -679,22 +712,24 @@ class Oracle23aiClient(VectorDBBase):
>>> if results:
... print(f"Retrieved {len(results.ids[0])} documents from collection")
"""
log.info(f"Getting items from collection '{collection_name}' with limit {limit}.")
log.info(
f"Getting items from collection '{collection_name}' with limit {limit}."
)
try:
limit = limit or 1000
with self.get_connection() as connection:
with connection.cursor() as cursor:
cursor.execute("""
cursor.execute(
"""
SELECT /*+ MONITOR */ id, text, JSON_SERIALIZE(vmetadata RETURNING VARCHAR2(4096)) as vmetadata
FROM document_chunk
WHERE collection_name = :collection_name
FETCH FIRST :limit ROWS ONLY
""", {
'collection_name': collection_name,
'limit': limit
})
""",
{"collection_name": collection_name, "limit": limit},
)
results = cursor.fetchall()
@ -703,15 +738,23 @@ class Oracle23aiClient(VectorDBBase):
return None
ids = [[row[0] for row in results]]
documents = [[row[1].read() if isinstance(row[1], oracledb.LOB) else str(row[1]) for row in results]]
documents = [
[
row[1].read() if isinstance(row[1], oracledb.LOB) else str(row[1])
for row in results
]
]
# 🔧 FIXED: Parse JSON metadata properly
metadatas = [[self._json_to_metadata(row[2].read() if isinstance(row[2], oracledb.LOB) else row[2]) for row in results]]
return GetResult(
ids=ids,
documents=documents,
metadatas=metadatas
metadatas = [
[
self._json_to_metadata(
row[2].read() if isinstance(row[2], oracledb.LOB) else row[2]
)
for row in results
]
]
return GetResult(ids=ids, documents=documents, metadatas=metadatas)
except Exception as e:
log.exception(f"Error during get: {e}")
@ -746,15 +789,17 @@ class Oracle23aiClient(VectorDBBase):
log.info(f"Deleting items from collection '{collection_name}'.")
try:
query = "DELETE FROM document_chunk WHERE collection_name = :collection_name"
params = {'collection_name': collection_name}
query = (
"DELETE FROM document_chunk WHERE collection_name = :collection_name"
)
params = {"collection_name": collection_name}
if ids:
# 🔧 FIXED: Use proper parameterized query to prevent SQL injection
placeholders = ','.join([f':id_{i}' for i in range(len(ids))])
placeholders = ",".join([f":id_{i}" for i in range(len(ids))])
query += f" AND id IN ({placeholders})"
for i, id_val in enumerate(ids):
params[f'id_{i}'] = id_val
params[f"id_{i}"] = id_val
if filter:
for i, (key, value) in enumerate(filter.items()):
@ -796,7 +841,9 @@ class Oracle23aiClient(VectorDBBase):
deleted = cursor.rowcount
connection.commit()
log.info(f"Reset complete. Deleted {deleted} items from 'document_chunk' table.")
log.info(
f"Reset complete. Deleted {deleted} items from 'document_chunk' table."
)
except Exception as e:
log.exception(f"Error during reset: {e}")
@ -814,7 +861,7 @@ class Oracle23aiClient(VectorDBBase):
>>> client.close()
"""
try:
if hasattr(self, 'pool') and self.pool:
if hasattr(self, "pool") and self.pool:
self.pool.close()
log.info("Oracle Vector Search connection pool closed.")
except Exception as e:
@ -840,12 +887,15 @@ class Oracle23aiClient(VectorDBBase):
try:
with self.get_connection() as connection:
with connection.cursor() as cursor:
cursor.execute("""
cursor.execute(
"""
SELECT COUNT(*)
FROM document_chunk
WHERE collection_name = :collection_name
FETCH FIRST 1 ROWS ONLY
""", {'collection_name': collection_name})
""",
{"collection_name": collection_name},
)
count = cursor.fetchone()[0]
@ -873,15 +923,20 @@ class Oracle23aiClient(VectorDBBase):
try:
with self.get_connection() as connection:
with connection.cursor() as cursor:
cursor.execute("""
cursor.execute(
"""
DELETE FROM document_chunk
WHERE collection_name = :collection_name
""", {'collection_name': collection_name})
""",
{"collection_name": collection_name},
)
deleted = cursor.rowcount
connection.commit()
log.info(f"Collection '{collection_name}' deleted. Removed {deleted} items.")
log.info(
f"Collection '{collection_name}' deleted. Removed {deleted} items."
)
except Exception as e:
log.exception(f"Error deleting collection '{collection_name}': {e}")

View file

@ -2,17 +2,20 @@ import asyncio
import hashlib
import json
import logging
from pathlib import Path
from typing import Literal, Optional, overload
from typing import Optional
import aiohttp
from aiocache import cached
import requests
from urllib.parse import quote
from fastapi import Depends, FastAPI, HTTPException, Request, APIRouter
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
from fastapi import Depends, HTTPException, Request, APIRouter
from fastapi.responses import (
FileResponse,
StreamingResponse,
JSONResponse,
PlainTextResponse,
)
from pydantic import BaseModel
from starlette.background import BackgroundTask
@ -31,7 +34,7 @@ from open_webui.env import (
from open_webui.models.users import UserModel
from open_webui.constants import ERROR_MESSAGES
from open_webui.env import ENV, SRC_LOG_LEVELS
from open_webui.env import SRC_LOG_LEVELS
from open_webui.utils.payload import (
@ -595,15 +598,21 @@ async def verify_connection(
headers=headers,
ssl=AIOHTTP_CLIENT_SESSION_SSL,
) as r:
if r.status != 200:
# Extract response error details if available
error_detail = f"HTTP Error: {r.status}"
res = await r.json()
if "error" in res:
error_detail = f"External Error: {res['error']}"
raise Exception(error_detail)
try:
response_data = await r.json()
except Exception:
response_data = await r.text()
if r.status != 200:
if isinstance(response_data, (dict, list)):
return JSONResponse(
status_code=r.status, content=response_data
)
else:
return PlainTextResponse(
status_code=r.status, content=response_data
)
return response_data
else:
headers["Authorization"] = f"Bearer {key}"
@ -613,15 +622,21 @@ async def verify_connection(
headers=headers,
ssl=AIOHTTP_CLIENT_SESSION_SSL,
) as r:
if r.status != 200:
# Extract response error details if available
error_detail = f"HTTP Error: {r.status}"
res = await r.json()
if "error" in res:
error_detail = f"External Error: {res['error']}"
raise Exception(error_detail)
try:
response_data = await r.json()
except Exception:
response_data = await r.text()
if r.status != 200:
if isinstance(response_data, (dict, list)):
return JSONResponse(
status_code=r.status, content=response_data
)
else:
return PlainTextResponse(
status_code=r.status, content=response_data
)
return response_data
except aiohttp.ClientError as e:
@ -632,8 +647,9 @@ async def verify_connection(
)
except Exception as e:
log.exception(f"Unexpected error: {e}")
error_detail = f"Unexpected error: {str(e)}"
raise HTTPException(status_code=500, detail=error_detail)
raise HTTPException(
status_code=500, detail="Open WebUI: Server Connection Error"
)
def get_azure_allowed_params(api_version: str) -> set[str]:
@ -883,21 +899,19 @@ async def generate_chat_completion(
log.error(e)
response = await r.text()
r.raise_for_status()
if r.status >= 400:
if isinstance(response, (dict, list)):
return JSONResponse(status_code=r.status, content=response)
else:
return PlainTextResponse(status_code=r.status, content=response)
return response
except Exception as e:
log.exception(e)
detail = None
if isinstance(response, dict):
if "error" in response:
detail = f"{response['error']['message'] if 'message' in response['error'] else response['error']}"
elif isinstance(response, str):
detail = response
raise HTTPException(
status_code=r.status if r else 500,
detail=detail if detail else "Open WebUI: Server Connection Error",
detail="Open WebUI: Server Connection Error",
)
finally:
if not streaming:
@ -951,7 +965,7 @@ async def embeddings(request: Request, form_data: dict, user):
),
},
)
r.raise_for_status()
if "text/event-stream" in r.headers.get("Content-Type", ""):
streaming = True
return StreamingResponse(
@ -963,21 +977,25 @@ async def embeddings(request: Request, form_data: dict, user):
),
)
else:
try:
response_data = await r.json()
except Exception:
response_data = await r.text()
if r.status >= 400:
if isinstance(response_data, (dict, list)):
return JSONResponse(status_code=r.status, content=response_data)
else:
return PlainTextResponse(
status_code=r.status, content=response_data
)
return response_data
except Exception as e:
log.exception(e)
detail = None
if r is not None:
try:
res = await r.json()
if "error" in res:
detail = f"External: {res['error']['message'] if 'message' in res['error'] else res['error']}"
except Exception:
detail = f"External: {e}"
raise HTTPException(
status_code=r.status if r else 500,
detail=detail if detail else "Open WebUI: Server Connection Error",
detail="Open WebUI: Server Connection Error",
)
finally:
if not streaming:
@ -1043,7 +1061,6 @@ async def proxy(path: str, request: Request, user=Depends(get_verified_user)):
headers=headers,
ssl=AIOHTTP_CLIENT_SESSION_SSL,
)
r.raise_for_status()
# Check if response is SSE
if "text/event-stream" in r.headers.get("Content-Type", ""):
@ -1057,24 +1074,26 @@ async def proxy(path: str, request: Request, user=Depends(get_verified_user)):
),
)
else:
try:
response_data = await r.json()
except Exception:
response_data = await r.text()
if r.status >= 400:
if isinstance(response_data, (dict, list)):
return JSONResponse(status_code=r.status, content=response_data)
else:
return PlainTextResponse(
status_code=r.status, content=response_data
)
return response_data
except Exception as e:
log.exception(e)
detail = None
if r is not None:
try:
res = await r.json()
log.error(res)
if "error" in res:
detail = f"External: {res['error']['message'] if 'message' in res['error'] else res['error']}"
except Exception:
detail = f"External: {e}"
raise HTTPException(
status_code=r.status if r else 500,
detail=detail if detail else "Open WebUI: Server Connection Error",
detail="Open WebUI: Server Connection Error",
)
finally:
if not streaming: