refactor oracle23ai.py

This commit is contained in:
Oracle Public Cloud User 2025-07-07 16:21:34 +00:00
parent 25e241ae41
commit 12ebdbae81
2 changed files with 164 additions and 121 deletions

1
.gitignore vendored
View file

@ -1,3 +1,4 @@
x.py
.DS_Store
node_modules
/build

View file

@ -1,5 +1,7 @@
"""
# ORACLE23AI (Oracle23ai Vector Search) : env.examples
Oracle 23ai Vector Database Client - Fixed Version
# .env
VECTOR_DB = "oracle23ai"
## DBCS or oracle 23ai free
@ -24,10 +26,14 @@ ORACLE_DB_POOL_MAX = 10
ORACLE_DB_POOL_INCREMENT = 1
"""
from typing import Optional, List, Dict, Any
from typing import Optional, List, Dict, Any, Union
from decimal import Decimal
import logging
import os
import threading
import time
import json
import array
import oracledb
from open_webui.retrieval.vector.main import (
@ -54,11 +60,6 @@ from open_webui.env import SRC_LOG_LEVELS
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"])
# ORACLE_DB_USE_WALLET = os.environ.get("ORACLE_DB_USE_WALLET", "DBCS")
# ORACLE_DB_USER = os.environ.get("ORACLE_DB_USER", "DEMOUSER")
# ORACLE_DB_PASSWORD = os.environ.get("ORACLE_DB_PASSWORD", "Welcome123456")
# ORACLE_DB_DSN = os.environ.get("ORACLE_DB_DSN", "medium")
# ORACLE_DB_DSN = os.environ.get("ORACLE_DB_DSN", "(description= (retry_count=3)(retry_delay=3)(address=(protocol=tcps)(port=1522)(host=d6aqmjs6.adb.us-chicago-1.oraclecloud.com))(connect_data=(service_name=g13fc7c96b5ee55_agentvs_medium.adb.oraclecloud.com))(security=(ssl_server_dn_match=no)))")
class Oracle23aiClient(VectorDBBase):
"""
@ -76,16 +77,15 @@ class Oracle23aiClient(VectorDBBase):
"""
Initialize the Oracle23aiClient with a connection pool.
Creates a connection pool with min=2 and max=10 connections, initializes
Creates a connection pool with configurable min/max connections, initializes
the database schema if needed, and sets up necessary tables and indexes.
Args:
db_type (str): Database type - "ADB" for Autonomous Database or "DBCS" for Database Cloud Service
Raises:
ValueError: If required configuration parameters are missing
Exception: If database initialization fails
"""
self.pool = None
try:
# Create the appropriate connection pool based on DB type
if ORACLE_DB_USE_WALLET:
@ -94,10 +94,10 @@ class Oracle23aiClient(VectorDBBase):
self._create_dbcs_pool()
dsn = ORACLE_DB_DSN
log.info(f" >>> Creating Connection Pool [{ORACLE_DB_USER}:**@{dsn}]")
log.info(f"Creating Connection Pool [{ORACLE_DB_USER}:**@{dsn}]")
with self.get_connection() as connection:
log.info("Connection version:", connection.version)
log.info(f"Connection version: {connection.version}")
self._initialize_database(connection)
log.info("Oracle Vector Search initialization complete.")
@ -122,7 +122,7 @@ class Oracle23aiClient(VectorDBBase):
wallet_location=ORACLE_WALLET_DIR,
wallet_password=ORACLE_WALLET_PASSWORD
)
log.info(f"Created ADB connection pool with wallet authentication.")
log.info("Created ADB connection pool with wallet authentication.")
def _create_dbcs_pool(self) -> None:
"""
@ -158,7 +158,6 @@ class Oracle23aiClient(VectorDBBase):
log.exception(f"Connection attempt {attempt + 1} failed: {error_obj.message}")
if attempt < max_retries - 1:
import time
wait_time = 2 ** attempt
log.info(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
@ -189,12 +188,17 @@ class Oracle23aiClient(VectorDBBase):
def _reconnect_pool(self):
"""
Attempt to reinitialize the connection pool if it's been closed or broken.
Args:
db_type (str): Database type - "ADB" for Autonomous Database or "DBCS" for Database Cloud Service
"""
try:
log.info("Attempting to reinitialize the Oracle connection pool...")
# Close existing pool if it exists
if self.pool:
try:
self.pool.close()
except Exception as close_error:
log.warning(f"Error closing existing pool: {close_error}")
# Re-create the appropriate connection pool based on DB type
if ORACLE_DB_USE_WALLET:
self._create_adb_pool()
@ -233,8 +237,6 @@ class Oracle23aiClient(VectorDBBase):
return cursor.var(metadata.type_code, arraysize=cursor.arraysize,
outconverter=list)
# Rest of the Oracle23aiClient class remains unchanged...
def _initialize_database(self, connection) -> None:
"""
Initialize database schema, tables and indexes.
@ -247,62 +249,69 @@ class Oracle23aiClient(VectorDBBase):
Raises:
Exception: If schema initialization fails
"""
with connection.cursor() as cursor:
log.info(f" >>> Creating Table document_chunk")
cursor.execute(f"""
BEGIN
EXECUTE IMMEDIATE '
CREATE TABLE IF NOT EXISTS document_chunk (
id VARCHAR2(255) PRIMARY KEY,
collection_name VARCHAR2(255) NOT NULL,
text CLOB,
vmetadata JSON,
vector vector(*, float32)
)
';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
""")
log.info(f" >>> Creating Table document_chunk_collection_name_idx")
cursor.execute("""
BEGIN
EXECUTE IMMEDIATE '
CREATE INDEX IF NOT exists document_chunk_collection_name_idx
ON document_chunk (collection_name)
';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
""")
log.info(f" >>> Creating VECTOR INDEX document_chunk_vector_ivf_idx")
cursor.execute("""
BEGIN
EXECUTE IMMEDIATE '
create vector index IF NOT EXISTS document_chunk_vector_ivf_idx on document_chunk(vector)
organization neighbor partitions
distance cosine
with target accuracy 95
PARAMETERS (type IVF, NEIGHBOR PARTITIONS 100)
';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
""")
connection.commit()
try:
log.info("Creating Table document_chunk")
cursor.execute("""
BEGIN
EXECUTE IMMEDIATE '
CREATE TABLE IF NOT EXISTS document_chunk (
id VARCHAR2(255) PRIMARY KEY,
collection_name VARCHAR2(255) NOT NULL,
text CLOB,
vmetadata JSON,
vector vector(*, float32)
)
';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
""")
log.info("Creating Index document_chunk_collection_name_idx")
cursor.execute("""
BEGIN
EXECUTE IMMEDIATE '
CREATE INDEX IF NOT EXISTS document_chunk_collection_name_idx
ON document_chunk (collection_name)
';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
""")
log.info("Creating VECTOR INDEX document_chunk_vector_ivf_idx")
cursor.execute("""
BEGIN
EXECUTE IMMEDIATE '
CREATE VECTOR INDEX IF NOT EXISTS document_chunk_vector_ivf_idx
ON document_chunk(vector)
ORGANIZATION NEIGHBOR PARTITIONS
DISTANCE COSINE
WITH TARGET ACCURACY 95
PARAMETERS (TYPE IVF, NEIGHBOR PARTITIONS 100)
';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
""")
connection.commit()
log.info("Database initialization completed successfully.")
except Exception as e:
connection.rollback()
log.exception(f"Error during database initialization: {e}")
raise
def check_vector_length(self) -> None:
"""
@ -323,7 +332,6 @@ class Oracle23aiClient(VectorDBBase):
Returns:
bytes: The vector in Oracle BLOB format
"""
import array
return array.array("f", vector)
def adjust_vector_length(self, vector: List[float]) -> List[float]:
@ -365,7 +373,6 @@ class Oracle23aiClient(VectorDBBase):
Returns:
str: JSON representation of metadata
"""
import json
return json.dumps(metadata, default=self._decimal_handler) if metadata else "{}"
def _json_to_metadata(self, json_str: str) -> Dict:
@ -378,7 +385,6 @@ class Oracle23aiClient(VectorDBBase):
Returns:
Dict: Metadata dictionary
"""
import json
return json.loads(json_str) if json_str else {}
def insert(self, collection_name: str, items: List[VectorItem]) -> None:
@ -400,7 +406,8 @@ class Oracle23aiClient(VectorDBBase):
... ]
>>> client.insert("my_collection", items)
"""
log.info(f"Oracle23aiClient:Inserting {len(items)} items into collection '{collection_name}'.")
log.info(f"Inserting {len(items)} items into collection '{collection_name}'.")
with self.get_connection() as connection:
try:
with connection.cursor() as cursor:
@ -421,7 +428,8 @@ class Oracle23aiClient(VectorDBBase):
})
connection.commit()
log.info(f"Oracle23aiClient:Inserted {len(items)} items into collection '{collection_name}'.")
log.info(f"Successfully inserted {len(items)} items into collection '{collection_name}'.")
except Exception as e:
connection.rollback()
log.exception(f"Error during insert: {e}")
@ -440,7 +448,7 @@ class Oracle23aiClient(VectorDBBase):
Raises:
Exception: If upsert operation fails
Example:
>>> client = Oracle23aiClient()
>>> items = [
@ -449,6 +457,8 @@ class Oracle23aiClient(VectorDBBase):
... ]
>>> client.upsert("my_collection", items)
"""
log.info(f"Upserting {len(items)} items into collection '{collection_name}'.")
with self.get_connection() as connection:
try:
with connection.cursor() as cursor:
@ -458,32 +468,33 @@ class Oracle23aiClient(VectorDBBase):
cursor.execute("""
MERGE INTO document_chunk d
USING (SELECT :id as id FROM dual) s
USING (SELECT :merge_id as id FROM dual) s
ON (d.id = s.id)
WHEN MATCHED THEN
UPDATE SET
collection_name = :collection_name,
text = :text,
vmetadata = :metadata,
vector = :vector
collection_name = :upd_collection_name,
text = :upd_text,
vmetadata = :upd_metadata,
vector = :upd_vector
WHEN NOT MATCHED THEN
INSERT (id, collection_name, text, vmetadata, vector)
VALUES (:id, :collection_name, :text, :metadata, :vector)
VALUES (:ins_id, :ins_collection_name, :ins_text, :ins_metadata, :ins_vector)
""", {
'id': item["id"],
'collection_name': collection_name,
'text': item["text"],
'metadata': metadata_json,
'vector': vector_blob,
'id': item["id"],
'collection_name': collection_name,
'text': item["text"],
'metadata': metadata_json,
'vector': vector_blob
'merge_id': item["id"],
'upd_collection_name': collection_name,
'upd_text': item["text"],
'upd_metadata': metadata_json,
'upd_vector': vector_blob,
'ins_id': item["id"],
'ins_collection_name': collection_name,
'ins_text': item["text"],
'ins_metadata': metadata_json,
'ins_vector': vector_blob
})
connection.commit()
log.info(f"Upserted {len(items)} items into collection '{collection_name}'.")
log.info(f"Successfully upserted {len(items)} items into collection '{collection_name}'.")
except Exception as e:
connection.rollback()
log.exception(f"Error during upsert: {e}")
@ -492,8 +503,8 @@ class Oracle23aiClient(VectorDBBase):
def search(
self,
collection_name: str,
vectors: List[List[float]],
limit: Optional[int] = None
vectors: List[List[Union[float, int]]],
limit: int
) -> Optional[SearchResult]:
"""
Search for similar vectors in the database.
@ -502,8 +513,8 @@ class Oracle23aiClient(VectorDBBase):
Args:
collection_name (str): Name of the collection to search
vectors (List[List[float]]): Query vectors to find similar items for
limit (Optional[int]): Maximum number of results to return per query
vectors (List[List[Union[float, int]]]): Query vectors to find similar items for
limit (int): Maximum number of results to return per query
Returns:
Optional[SearchResult]: Search results containing ids, distances, documents, and metadata
@ -513,16 +524,17 @@ class Oracle23aiClient(VectorDBBase):
>>> query_vector = [0.1, 0.2, 0.3, ...] # Must match VECTOR_LENGTH
>>> results = client.search("my_collection", [query_vector], limit=5)
>>> if results:
... print(f"Found {len(results.ids[0])} matches")
... log.info(f"Found {len(results.ids[0])} matches")
... for i, (id, dist) in enumerate(zip(results.ids[0], results.distances[0])):
... print(f"Match {i+1}: id={id}, distance={dist}")
... log.info(f"Match {i+1}: id={id}, distance={dist}")
"""
print(f"Oracle23aiClient:Searching items from collection '{collection_name}'.")
log.info(f"Searching items from collection '{collection_name}' with limit {limit}.")
try:
if not vectors:
log.warning("No vectors provided for search.")
return None
limit = limit or 10
num_queries = len(vectors)
ids = [[] for _ in range(num_queries)]
@ -557,22 +569,23 @@ class Oracle23aiClient(VectorDBBase):
metadatas[qid].append(row[2].read() if isinstance(row[2], oracledb.LOB) else row[2])
distances[qid].append(float(row[3]))
log.info(f"Search completed. Found {sum(len(ids[i]) for i in range(num_queries))} total results.")
return SearchResult(
ids=ids,
distances=distances,
documents=documents,
metadatas=metadatas
)
except Exception as e:
log.exception(f"Error during search: {e}")
import traceback
log.exception(traceback.format_exc())
return None
def query(
self,
collection_name: str,
filter: Dict[str, Any],
filter: Dict,
limit: Optional[int] = None
) -> Optional[GetResult]:
"""
@ -595,7 +608,8 @@ class Oracle23aiClient(VectorDBBase):
>>> if results:
... print(f"Found {len(results.ids[0])} matching documents")
"""
print(f"Oracle23aiClient:Querying items from collection '{collection_name}'.")
log.info(f"Querying items from collection '{collection_name}' with filters.")
try:
limit = limit or 100
@ -621,27 +635,27 @@ class Oracle23aiClient(VectorDBBase):
results = cursor.fetchall()
if not results:
log.info("No results found for query.")
return None
ids = [[row[0] for row in results]]
documents = [[row[1].read() if isinstance(row[1], oracledb.LOB) else str(row[1]) for row in results]]
metadatas = [[row[2].read() if isinstance(row[2], oracledb.LOB) else row[2] for row in results]]
log.info(f"Query completed. Found {len(ids)} results.")
return GetResult(
ids=ids,
documents=documents,
metadatas=metadatas
)
except Exception as e:
log.exception(f"Error during query: {e}")
import traceback
log.exception(traceback.format_exc())
return None
def get(
self,
collection_name: str,
limit: Optional[int] = None
collection_name: str
) -> Optional[GetResult]:
"""
Get all items in a collection.
@ -661,8 +675,10 @@ class Oracle23aiClient(VectorDBBase):
>>> if results:
... print(f"Retrieved {len(results.ids[0])} documents from collection")
"""
log.info(f"Getting items from collection '{collection_name}' with limit {limit}.")
try:
limit = limit or 100
limit = limit or 1000
with self.get_connection() as connection:
with connection.cursor() as cursor:
@ -679,6 +695,7 @@ class Oracle23aiClient(VectorDBBase):
results = cursor.fetchall()
if not results:
log.info("No results found.")
return None
ids = [[row[0] for row in results]]
@ -690,10 +707,9 @@ class Oracle23aiClient(VectorDBBase):
documents=documents,
metadatas=metadatas
)
except Exception as e:
log.exception(f"Error during get: {e}")
import traceback
log.exception(traceback.format_exc())
return None
def delete(
@ -722,11 +738,14 @@ class Oracle23aiClient(VectorDBBase):
>>> # Or delete by metadata filter
>>> client.delete("my_collection", filter={"source": "deprecated_source"})
"""
log.info(f"Deleting items from collection '{collection_name}'.")
try:
query = "DELETE FROM document_chunk WHERE collection_name = :collection_name"
params = {'collection_name': collection_name}
if ids:
# Use proper parameterized query for IDs
id_list = ",".join([f"'{id}'" for id in ids])
query += f" AND id IN ({id_list})"
@ -743,6 +762,7 @@ class Oracle23aiClient(VectorDBBase):
connection.commit()
log.info(f"Deleted {deleted} items from collection '{collection_name}'.")
except Exception as e:
log.exception(f"Error during delete: {e}")
raise
@ -760,13 +780,17 @@ class Oracle23aiClient(VectorDBBase):
>>> client = Oracle23aiClient()
>>> client.reset() # Warning: Removes all data!
"""
log.info("Resetting database - deleting all items.")
try:
with self.get_connection() as connection:
with connection.cursor() as cursor:
cursor.execute("DELETE FROM document_chunk")
deleted = cursor.rowcount
connection.commit()
log.info(f"Reset complete. Deleted {deleted} items from 'document_chunk' table.")
except Exception as e:
log.exception(f"Error during reset: {e}")
raise
@ -785,9 +809,9 @@ class Oracle23aiClient(VectorDBBase):
try:
if hasattr(self, 'pool') and self.pool:
self.pool.close()
print("Oracle Vector Search connection pool closed.")
log.info("Oracle Vector Search connection pool closed.")
except Exception as e:
print(f"Error closing connection pool: {e}")
log.exception(f"Error closing connection pool: {e}")
def has_collection(self, collection_name: str) -> bool:
"""
@ -817,7 +841,9 @@ class Oracle23aiClient(VectorDBBase):
""", {'collection_name': collection_name})
count = cursor.fetchone()[0]
return count > 0
except Exception as e:
log.exception(f"Error checking collection existence: {e}")
return False
@ -835,5 +861,21 @@ class Oracle23aiClient(VectorDBBase):
>>> client = Oracle23aiClient()
>>> client.delete_collection("obsolete_collection")
"""
self.delete(collection_name)
log.info(f"Collection '{collection_name}' deleted.")
log.info(f"Deleting collection '{collection_name}'.")
try:
with self.get_connection() as connection:
with connection.cursor() as cursor:
cursor.execute("""
DELETE FROM document_chunk
WHERE collection_name = :collection_name
""", {'collection_name': collection_name})
deleted = cursor.rowcount
connection.commit()
log.info(f"Collection '{collection_name}' deleted. Removed {deleted} items.")
except Exception as e:
log.exception(f"Error deleting collection '{collection_name}': {e}")
raise