From 12ebdbae81e1d6eefe97b4a375d5459c4dee432f Mon Sep 17 00:00:00 2001 From: Oracle Public Cloud User Date: Mon, 7 Jul 2025 16:21:34 +0000 Subject: [PATCH] refactor oracle23ai.py --- .gitignore | 1 + .../retrieval/vector/dbs/oracle23ai.py | 284 ++++++++++-------- 2 files changed, 164 insertions(+), 121 deletions(-) diff --git a/.gitignore b/.gitignore index 32271f8087..c81ddec634 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +x.py .DS_Store node_modules /build diff --git a/backend/open_webui/retrieval/vector/dbs/oracle23ai.py b/backend/open_webui/retrieval/vector/dbs/oracle23ai.py index 20e46df4bf..17a43da432 100644 --- a/backend/open_webui/retrieval/vector/dbs/oracle23ai.py +++ b/backend/open_webui/retrieval/vector/dbs/oracle23ai.py @@ -1,5 +1,7 @@ """ -# ORACLE23AI (Oracle23ai Vector Search) : env.examples +Oracle 23ai Vector Database Client - Fixed Version + +# .env VECTOR_DB = "oracle23ai" ## DBCS or oracle 23ai free @@ -24,10 +26,14 @@ ORACLE_DB_POOL_MAX = 10 ORACLE_DB_POOL_INCREMENT = 1 """ -from typing import Optional, List, Dict, Any +from typing import Optional, List, Dict, Any, Union from decimal import Decimal import logging import os +import threading +import time +import json +import array import oracledb from open_webui.retrieval.vector.main import ( @@ -54,11 +60,6 @@ from open_webui.env import SRC_LOG_LEVELS log = logging.getLogger(__name__) log.setLevel(SRC_LOG_LEVELS["RAG"]) -# ORACLE_DB_USE_WALLET = os.environ.get("ORACLE_DB_USE_WALLET", "DBCS") -# ORACLE_DB_USER = os.environ.get("ORACLE_DB_USER", "DEMOUSER") -# ORACLE_DB_PASSWORD = os.environ.get("ORACLE_DB_PASSWORD", "Welcome123456") -# ORACLE_DB_DSN = os.environ.get("ORACLE_DB_DSN", "medium") -# ORACLE_DB_DSN = os.environ.get("ORACLE_DB_DSN", "(description= (retry_count=3)(retry_delay=3)(address=(protocol=tcps)(port=1522)(host=d6aqmjs6.adb.us-chicago-1.oraclecloud.com))(connect_data=(service_name=g13fc7c96b5ee55_agentvs_medium.adb.oraclecloud.com))(security=(ssl_server_dn_match=no)))") class Oracle23aiClient(VectorDBBase): """ @@ -76,16 +77,15 @@ class Oracle23aiClient(VectorDBBase): """ Initialize the Oracle23aiClient with a connection pool. - Creates a connection pool with min=2 and max=10 connections, initializes + Creates a connection pool with configurable min/max connections, initializes the database schema if needed, and sets up necessary tables and indexes. - Args: - db_type (str): Database type - "ADB" for Autonomous Database or "DBCS" for Database Cloud Service - Raises: ValueError: If required configuration parameters are missing Exception: If database initialization fails """ + self.pool = None + try: # Create the appropriate connection pool based on DB type if ORACLE_DB_USE_WALLET: @@ -94,10 +94,10 @@ class Oracle23aiClient(VectorDBBase): self._create_dbcs_pool() dsn = ORACLE_DB_DSN - log.info(f" >>> Creating Connection Pool [{ORACLE_DB_USER}:**@{dsn}]") + log.info(f"Creating Connection Pool [{ORACLE_DB_USER}:**@{dsn}]") with self.get_connection() as connection: - log.info("Connection version:", connection.version) + log.info(f"Connection version: {connection.version}") self._initialize_database(connection) log.info("Oracle Vector Search initialization complete.") @@ -122,7 +122,7 @@ class Oracle23aiClient(VectorDBBase): wallet_location=ORACLE_WALLET_DIR, wallet_password=ORACLE_WALLET_PASSWORD ) - log.info(f"Created ADB connection pool with wallet authentication.") + log.info("Created ADB connection pool with wallet authentication.") def _create_dbcs_pool(self) -> None: """ @@ -158,7 +158,6 @@ class Oracle23aiClient(VectorDBBase): log.exception(f"Connection attempt {attempt + 1} failed: {error_obj.message}") if attempt < max_retries - 1: - import time wait_time = 2 ** attempt log.info(f"Retrying in {wait_time} seconds...") time.sleep(wait_time) @@ -189,12 +188,17 @@ class Oracle23aiClient(VectorDBBase): def _reconnect_pool(self): """ Attempt to reinitialize the connection pool if it's been closed or broken. - - Args: - db_type (str): Database type - "ADB" for Autonomous Database or "DBCS" for Database Cloud Service """ try: log.info("Attempting to reinitialize the Oracle connection pool...") + + # Close existing pool if it exists + if self.pool: + try: + self.pool.close() + except Exception as close_error: + log.warning(f"Error closing existing pool: {close_error}") + # Re-create the appropriate connection pool based on DB type if ORACLE_DB_USE_WALLET: self._create_adb_pool() @@ -233,8 +237,6 @@ class Oracle23aiClient(VectorDBBase): return cursor.var(metadata.type_code, arraysize=cursor.arraysize, outconverter=list) - # Rest of the Oracle23aiClient class remains unchanged... - def _initialize_database(self, connection) -> None: """ Initialize database schema, tables and indexes. @@ -247,62 +249,69 @@ class Oracle23aiClient(VectorDBBase): Raises: Exception: If schema initialization fails """ - with connection.cursor() as cursor: - log.info(f" >>> Creating Table document_chunk") - cursor.execute(f""" - BEGIN - EXECUTE IMMEDIATE ' - CREATE TABLE IF NOT EXISTS document_chunk ( - id VARCHAR2(255) PRIMARY KEY, - collection_name VARCHAR2(255) NOT NULL, - text CLOB, - vmetadata JSON, - vector vector(*, float32) - ) - '; - EXCEPTION - WHEN OTHERS THEN - IF SQLCODE != -955 THEN - RAISE; - END IF; - END; - """) - - log.info(f" >>> Creating Table document_chunk_collection_name_idx") - cursor.execute(""" - BEGIN - EXECUTE IMMEDIATE ' - CREATE INDEX IF NOT exists document_chunk_collection_name_idx - ON document_chunk (collection_name) - '; - EXCEPTION - WHEN OTHERS THEN - IF SQLCODE != -955 THEN - RAISE; - END IF; - END; - """) - - log.info(f" >>> Creating VECTOR INDEX document_chunk_vector_ivf_idx") - cursor.execute(""" - BEGIN - EXECUTE IMMEDIATE ' - create vector index IF NOT EXISTS document_chunk_vector_ivf_idx on document_chunk(vector) - organization neighbor partitions - distance cosine - with target accuracy 95 - PARAMETERS (type IVF, NEIGHBOR PARTITIONS 100) - '; - EXCEPTION - WHEN OTHERS THEN - IF SQLCODE != -955 THEN - RAISE; - END IF; - END; - """) - - connection.commit() + try: + log.info("Creating Table document_chunk") + cursor.execute(""" + BEGIN + EXECUTE IMMEDIATE ' + CREATE TABLE IF NOT EXISTS document_chunk ( + id VARCHAR2(255) PRIMARY KEY, + collection_name VARCHAR2(255) NOT NULL, + text CLOB, + vmetadata JSON, + vector vector(*, float32) + ) + '; + EXCEPTION + WHEN OTHERS THEN + IF SQLCODE != -955 THEN + RAISE; + END IF; + END; + """) + + log.info("Creating Index document_chunk_collection_name_idx") + cursor.execute(""" + BEGIN + EXECUTE IMMEDIATE ' + CREATE INDEX IF NOT EXISTS document_chunk_collection_name_idx + ON document_chunk (collection_name) + '; + EXCEPTION + WHEN OTHERS THEN + IF SQLCODE != -955 THEN + RAISE; + END IF; + END; + """) + + log.info("Creating VECTOR INDEX document_chunk_vector_ivf_idx") + cursor.execute(""" + BEGIN + EXECUTE IMMEDIATE ' + CREATE VECTOR INDEX IF NOT EXISTS document_chunk_vector_ivf_idx + ON document_chunk(vector) + ORGANIZATION NEIGHBOR PARTITIONS + DISTANCE COSINE + WITH TARGET ACCURACY 95 + PARAMETERS (TYPE IVF, NEIGHBOR PARTITIONS 100) + '; + EXCEPTION + WHEN OTHERS THEN + IF SQLCODE != -955 THEN + RAISE; + END IF; + END; + """) + + connection.commit() + log.info("Database initialization completed successfully.") + + except Exception as e: + connection.rollback() + log.exception(f"Error during database initialization: {e}") + raise def check_vector_length(self) -> None: """ @@ -323,7 +332,6 @@ class Oracle23aiClient(VectorDBBase): Returns: bytes: The vector in Oracle BLOB format """ - import array return array.array("f", vector) def adjust_vector_length(self, vector: List[float]) -> List[float]: @@ -365,7 +373,6 @@ class Oracle23aiClient(VectorDBBase): Returns: str: JSON representation of metadata """ - import json return json.dumps(metadata, default=self._decimal_handler) if metadata else "{}" def _json_to_metadata(self, json_str: str) -> Dict: @@ -378,7 +385,6 @@ class Oracle23aiClient(VectorDBBase): Returns: Dict: Metadata dictionary """ - import json return json.loads(json_str) if json_str else {} def insert(self, collection_name: str, items: List[VectorItem]) -> None: @@ -400,7 +406,8 @@ class Oracle23aiClient(VectorDBBase): ... ] >>> client.insert("my_collection", items) """ - log.info(f"Oracle23aiClient:Inserting {len(items)} items into collection '{collection_name}'.") + log.info(f"Inserting {len(items)} items into collection '{collection_name}'.") + with self.get_connection() as connection: try: with connection.cursor() as cursor: @@ -421,7 +428,8 @@ class Oracle23aiClient(VectorDBBase): }) connection.commit() - log.info(f"Oracle23aiClient:Inserted {len(items)} items into collection '{collection_name}'.") + log.info(f"Successfully inserted {len(items)} items into collection '{collection_name}'.") + except Exception as e: connection.rollback() log.exception(f"Error during insert: {e}") @@ -440,7 +448,7 @@ class Oracle23aiClient(VectorDBBase): Raises: Exception: If upsert operation fails - + Example: >>> client = Oracle23aiClient() >>> items = [ @@ -449,6 +457,8 @@ class Oracle23aiClient(VectorDBBase): ... ] >>> client.upsert("my_collection", items) """ + log.info(f"Upserting {len(items)} items into collection '{collection_name}'.") + with self.get_connection() as connection: try: with connection.cursor() as cursor: @@ -458,32 +468,33 @@ class Oracle23aiClient(VectorDBBase): cursor.execute(""" MERGE INTO document_chunk d - USING (SELECT :id as id FROM dual) s + USING (SELECT :merge_id as id FROM dual) s ON (d.id = s.id) WHEN MATCHED THEN UPDATE SET - collection_name = :collection_name, - text = :text, - vmetadata = :metadata, - vector = :vector + collection_name = :upd_collection_name, + text = :upd_text, + vmetadata = :upd_metadata, + vector = :upd_vector WHEN NOT MATCHED THEN INSERT (id, collection_name, text, vmetadata, vector) - VALUES (:id, :collection_name, :text, :metadata, :vector) + VALUES (:ins_id, :ins_collection_name, :ins_text, :ins_metadata, :ins_vector) """, { - 'id': item["id"], - 'collection_name': collection_name, - 'text': item["text"], - 'metadata': metadata_json, - 'vector': vector_blob, - 'id': item["id"], - 'collection_name': collection_name, - 'text': item["text"], - 'metadata': metadata_json, - 'vector': vector_blob + 'merge_id': item["id"], + 'upd_collection_name': collection_name, + 'upd_text': item["text"], + 'upd_metadata': metadata_json, + 'upd_vector': vector_blob, + 'ins_id': item["id"], + 'ins_collection_name': collection_name, + 'ins_text': item["text"], + 'ins_metadata': metadata_json, + 'ins_vector': vector_blob }) connection.commit() - log.info(f"Upserted {len(items)} items into collection '{collection_name}'.") + log.info(f"Successfully upserted {len(items)} items into collection '{collection_name}'.") + except Exception as e: connection.rollback() log.exception(f"Error during upsert: {e}") @@ -492,8 +503,8 @@ class Oracle23aiClient(VectorDBBase): def search( self, collection_name: str, - vectors: List[List[float]], - limit: Optional[int] = None + vectors: List[List[Union[float, int]]], + limit: int ) -> Optional[SearchResult]: """ Search for similar vectors in the database. @@ -502,8 +513,8 @@ class Oracle23aiClient(VectorDBBase): Args: collection_name (str): Name of the collection to search - vectors (List[List[float]]): Query vectors to find similar items for - limit (Optional[int]): Maximum number of results to return per query + vectors (List[List[Union[float, int]]]): Query vectors to find similar items for + limit (int): Maximum number of results to return per query Returns: Optional[SearchResult]: Search results containing ids, distances, documents, and metadata @@ -513,16 +524,17 @@ class Oracle23aiClient(VectorDBBase): >>> query_vector = [0.1, 0.2, 0.3, ...] # Must match VECTOR_LENGTH >>> results = client.search("my_collection", [query_vector], limit=5) >>> if results: - ... print(f"Found {len(results.ids[0])} matches") + ... log.info(f"Found {len(results.ids[0])} matches") ... for i, (id, dist) in enumerate(zip(results.ids[0], results.distances[0])): - ... print(f"Match {i+1}: id={id}, distance={dist}") + ... log.info(f"Match {i+1}: id={id}, distance={dist}") """ - print(f"Oracle23aiClient:Searching items from collection '{collection_name}'.") + log.info(f"Searching items from collection '{collection_name}' with limit {limit}.") + try: if not vectors: + log.warning("No vectors provided for search.") return None - limit = limit or 10 num_queries = len(vectors) ids = [[] for _ in range(num_queries)] @@ -557,22 +569,23 @@ class Oracle23aiClient(VectorDBBase): metadatas[qid].append(row[2].read() if isinstance(row[2], oracledb.LOB) else row[2]) distances[qid].append(float(row[3])) + log.info(f"Search completed. Found {sum(len(ids[i]) for i in range(num_queries))} total results.") + return SearchResult( ids=ids, distances=distances, documents=documents, metadatas=metadatas ) + except Exception as e: log.exception(f"Error during search: {e}") - import traceback - log.exception(traceback.format_exc()) return None def query( self, collection_name: str, - filter: Dict[str, Any], + filter: Dict, limit: Optional[int] = None ) -> Optional[GetResult]: """ @@ -595,7 +608,8 @@ class Oracle23aiClient(VectorDBBase): >>> if results: ... print(f"Found {len(results.ids[0])} matching documents") """ - print(f"Oracle23aiClient:Querying items from collection '{collection_name}'.") + log.info(f"Querying items from collection '{collection_name}' with filters.") + try: limit = limit or 100 @@ -621,27 +635,27 @@ class Oracle23aiClient(VectorDBBase): results = cursor.fetchall() if not results: + log.info("No results found for query.") return None ids = [[row[0] for row in results]] documents = [[row[1].read() if isinstance(row[1], oracledb.LOB) else str(row[1]) for row in results]] metadatas = [[row[2].read() if isinstance(row[2], oracledb.LOB) else row[2] for row in results]] + log.info(f"Query completed. Found {len(ids)} results.") return GetResult( ids=ids, documents=documents, metadatas=metadatas ) + except Exception as e: log.exception(f"Error during query: {e}") - import traceback - log.exception(traceback.format_exc()) return None def get( self, - collection_name: str, - limit: Optional[int] = None + collection_name: str ) -> Optional[GetResult]: """ Get all items in a collection. @@ -661,8 +675,10 @@ class Oracle23aiClient(VectorDBBase): >>> if results: ... print(f"Retrieved {len(results.ids[0])} documents from collection") """ + log.info(f"Getting items from collection '{collection_name}' with limit {limit}.") + try: - limit = limit or 100 + limit = limit or 1000 with self.get_connection() as connection: with connection.cursor() as cursor: @@ -679,6 +695,7 @@ class Oracle23aiClient(VectorDBBase): results = cursor.fetchall() if not results: + log.info("No results found.") return None ids = [[row[0] for row in results]] @@ -690,10 +707,9 @@ class Oracle23aiClient(VectorDBBase): documents=documents, metadatas=metadatas ) + except Exception as e: log.exception(f"Error during get: {e}") - import traceback - log.exception(traceback.format_exc()) return None def delete( @@ -722,11 +738,14 @@ class Oracle23aiClient(VectorDBBase): >>> # Or delete by metadata filter >>> client.delete("my_collection", filter={"source": "deprecated_source"}) """ + log.info(f"Deleting items from collection '{collection_name}'.") + try: query = "DELETE FROM document_chunk WHERE collection_name = :collection_name" params = {'collection_name': collection_name} if ids: + # Use proper parameterized query for IDs id_list = ",".join([f"'{id}'" for id in ids]) query += f" AND id IN ({id_list})" @@ -743,6 +762,7 @@ class Oracle23aiClient(VectorDBBase): connection.commit() log.info(f"Deleted {deleted} items from collection '{collection_name}'.") + except Exception as e: log.exception(f"Error during delete: {e}") raise @@ -760,13 +780,17 @@ class Oracle23aiClient(VectorDBBase): >>> client = Oracle23aiClient() >>> client.reset() # Warning: Removes all data! """ + log.info("Resetting database - deleting all items.") + try: with self.get_connection() as connection: with connection.cursor() as cursor: cursor.execute("DELETE FROM document_chunk") deleted = cursor.rowcount connection.commit() + log.info(f"Reset complete. Deleted {deleted} items from 'document_chunk' table.") + except Exception as e: log.exception(f"Error during reset: {e}") raise @@ -785,9 +809,9 @@ class Oracle23aiClient(VectorDBBase): try: if hasattr(self, 'pool') and self.pool: self.pool.close() - print("Oracle Vector Search connection pool closed.") + log.info("Oracle Vector Search connection pool closed.") except Exception as e: - print(f"Error closing connection pool: {e}") + log.exception(f"Error closing connection pool: {e}") def has_collection(self, collection_name: str) -> bool: """ @@ -817,7 +841,9 @@ class Oracle23aiClient(VectorDBBase): """, {'collection_name': collection_name}) count = cursor.fetchone()[0] + return count > 0 + except Exception as e: log.exception(f"Error checking collection existence: {e}") return False @@ -835,5 +861,21 @@ class Oracle23aiClient(VectorDBBase): >>> client = Oracle23aiClient() >>> client.delete_collection("obsolete_collection") """ - self.delete(collection_name) - log.info(f"Collection '{collection_name}' deleted.") + log.info(f"Deleting collection '{collection_name}'.") + + try: + with self.get_connection() as connection: + with connection.cursor() as cursor: + cursor.execute(""" + DELETE FROM document_chunk + WHERE collection_name = :collection_name + """, {'collection_name': collection_name}) + + deleted = cursor.rowcount + connection.commit() + + log.info(f"Collection '{collection_name}' deleted. Removed {deleted} items.") + + except Exception as e: + log.exception(f"Error deleting collection '{collection_name}': {e}") + raise \ No newline at end of file