# open-webui/backend/open_webui/retrieval/vector/dbs/oracle23ai.py
"""
2025-07-07 16:21:34 +00:00
Oracle 23ai Vector Database Client - Fixed Version
# .env
VECTOR_DB = "oracle23ai"
## DBCS or oracle 23ai free
ORACLE_DB_USE_WALLET = false
ORACLE_DB_USER = "DEMOUSER"
ORACLE_DB_PASSWORD = "Welcome123456"
ORACLE_DB_DSN = "localhost:1521/FREEPDB1"
## ADW or ATP
# ORACLE_DB_USE_WALLET = true
# ORACLE_DB_USER = "DEMOUSER"
# ORACLE_DB_PASSWORD = "Welcome123456"
# ORACLE_DB_DSN = "medium"
# ORACLE_DB_DSN = "(description= (retry_count=3)(retry_delay=3)(address=(protocol=tcps)(port=1522)(host=xx.oraclecloud.com))(connect_data=(service_name=yy.adb.oraclecloud.com))(security=(ssl_server_dn_match=no)))"
# ORACLE_WALLET_DIR = "/home/opc/adb_wallet"
# ORACLE_WALLET_PASSWORD = "Welcome1"
ORACLE_VECTOR_LENGTH = 768
ORACLE_DB_POOL_MIN = 2
ORACLE_DB_POOL_MAX = 10
ORACLE_DB_POOL_INCREMENT = 1
"""
2025-07-07 16:21:34 +00:00
from typing import Optional, List, Dict, Any, Union
from decimal import Decimal
import logging
import os
import threading
import time
import json
import array
import oracledb
from open_webui.retrieval.vector.main import (
VectorDBBase,
VectorItem,
SearchResult,
GetResult,
)
from open_webui.config import (
ORACLE_DB_USE_WALLET,
ORACLE_DB_USER,
ORACLE_DB_PASSWORD,
ORACLE_DB_DSN,
ORACLE_WALLET_DIR,
ORACLE_WALLET_PASSWORD,
ORACLE_VECTOR_LENGTH,
ORACLE_DB_POOL_MIN,
ORACLE_DB_POOL_MAX,
ORACLE_DB_POOL_INCREMENT,
)
from open_webui.env import SRC_LOG_LEVELS
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"])
class Oracle23aiClient(VectorDBBase):
    """
    Oracle Vector Database Client for vector similarity search using Oracle Database 23ai.

    This client provides an interface to store, retrieve, and search vector embeddings
    in an Oracle database. It uses connection pooling for efficient database access
    and supports vector similarity search operations (cosine distance).

    Attributes:
        pool: Connection pool for Oracle database connections
    """

    def __init__(self) -> None:
        """
        Initialize the Oracle23aiClient with a connection pool.

        Creates a connection pool with configurable min/max connections, initializes
        the database schema if needed, and sets up necessary tables and indexes.

        Raises:
            ValueError: If required configuration parameters are missing
            Exception: If database initialization fails
        """
        self.pool = None

        try:
            # Create the appropriate connection pool based on DB type
            if ORACLE_DB_USE_WALLET:
                self._create_adb_pool()
            else:  # DBCS
                self._create_dbcs_pool()

            dsn = ORACLE_DB_DSN
            log.info(f"Creating Connection Pool [{ORACLE_DB_USER}:**@{dsn}]")

            with self.get_connection() as connection:
                log.info(f"Connection version: {connection.version}")
                self._initialize_database(connection)

            log.info("Oracle Vector Search initialization complete.")
        except Exception as e:
            log.exception(f"Error during Oracle Vector Search initialization: {e}")
            raise

    def _create_adb_pool(self) -> None:
        """
        Create connection pool for Oracle Autonomous Database.

        Uses wallet-based authentication.
        """
        self.pool = oracledb.create_pool(
            user=ORACLE_DB_USER,
            password=ORACLE_DB_PASSWORD,
            dsn=ORACLE_DB_DSN,
            min=ORACLE_DB_POOL_MIN,
            max=ORACLE_DB_POOL_MAX,
            increment=ORACLE_DB_POOL_INCREMENT,
            config_dir=ORACLE_WALLET_DIR,
            wallet_location=ORACLE_WALLET_DIR,
            wallet_password=ORACLE_WALLET_PASSWORD,
        )
        log.info("Created ADB connection pool with wallet authentication.")

    def _create_dbcs_pool(self) -> None:
        """
        Create connection pool for Oracle Database Cloud Service.

        Uses basic authentication without wallet.
        """
        self.pool = oracledb.create_pool(
            user=ORACLE_DB_USER,
            password=ORACLE_DB_PASSWORD,
            dsn=ORACLE_DB_DSN,
            min=ORACLE_DB_POOL_MIN,
            max=ORACLE_DB_POOL_MAX,
            increment=ORACLE_DB_POOL_INCREMENT,
        )
        log.info("Created DB connection pool with basic authentication.")

    def get_connection(self):
        """
        Acquire a connection from the connection pool with retry logic.

        Retries up to 3 times with exponential backoff (1s, 2s) before giving up.

        Returns:
            connection: A database connection with output type handler configured

        Raises:
            oracledb.DatabaseError: If all acquisition attempts fail
        """
        max_retries = 3
        for attempt in range(max_retries):
            try:
                connection = self.pool.acquire()
                # Ensure VECTOR columns are fetched as Python lists (see _output_type_handler)
                connection.outputtypehandler = self._output_type_handler
                return connection
            except oracledb.DatabaseError as e:
                (error_obj,) = e.args
                log.exception(
                    f"Connection attempt {attempt + 1} failed: {error_obj.message}"
                )
                if attempt < max_retries - 1:
                    wait_time = 2**attempt  # exponential backoff: 1s, 2s
                    log.info(f"Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                else:
                    raise

    def start_health_monitor(self, interval_seconds: int = 60):
        """
        Start a background thread to periodically check the health of the connection pool.

        The thread is a daemon, so it will not block interpreter shutdown.

        Args:
            interval_seconds (int): Number of seconds between health checks
        """

        def _monitor():
            while True:
                try:
                    log.info("[HealthCheck] Running periodic DB health check...")
                    self.ensure_connection()
                    log.info("[HealthCheck] Connection is healthy.")
                except Exception as e:
                    log.exception(f"[HealthCheck] Connection health check failed: {e}")
                time.sleep(interval_seconds)

        thread = threading.Thread(target=_monitor, daemon=True)
        thread.start()
        log.info(f"Started DB health monitor every {interval_seconds} seconds.")

    def _reconnect_pool(self):
        """
        Attempt to reinitialize the connection pool if it's been closed or broken.

        Raises:
            Exception: If the pool cannot be recreated
        """
        try:
            log.info("Attempting to reinitialize the Oracle connection pool...")

            # Close existing pool if it exists; a failure here is non-fatal
            if self.pool:
                try:
                    self.pool.close()
                except Exception as close_error:
                    log.warning(f"Error closing existing pool: {close_error}")

            # Re-create the appropriate connection pool based on DB type
            if ORACLE_DB_USE_WALLET:
                self._create_adb_pool()
            else:  # DBCS
                self._create_dbcs_pool()

            log.info("Connection pool reinitialized.")
        except Exception as e:
            log.exception(f"Failed to reinitialize the connection pool: {e}")
            raise

    def ensure_connection(self):
        """
        Ensure the database connection is alive, reconnecting pool if needed.

        Issues a trivial `SELECT 1 FROM dual`; on any failure the pool is rebuilt.
        """
        try:
            with self.get_connection() as connection:
                with connection.cursor() as cursor:
                    cursor.execute("SELECT 1 FROM dual")
        except Exception as e:
            log.exception(
                f"Connection check failed: {e}, attempting to reconnect pool..."
            )
            self._reconnect_pool()

    def _output_type_handler(self, cursor, metadata):
        """
        Handle Oracle vector type conversion.

        Args:
            cursor: Oracle database cursor
            metadata: Metadata for the column

        Returns:
            A variable with appropriate conversion for vector types, or None
            (implicitly) for non-vector columns so default handling applies.
        """
        if metadata.type_code is oracledb.DB_TYPE_VECTOR:
            # Convert Oracle VECTOR values to plain Python lists on fetch
            return cursor.var(
                metadata.type_code, arraysize=cursor.arraysize, outconverter=list
            )

    def _initialize_database(self, connection) -> None:
        """
        Initialize database schema, tables and indexes.

        Creates the document_chunk table and necessary indexes if they don't exist.
        Each DDL statement is wrapped in a PL/SQL block that swallows ORA-00955
        ("name is already used") so re-initialization is idempotent.

        Args:
            connection: Oracle database connection

        Raises:
            Exception: If schema initialization fails
        """
        with connection.cursor() as cursor:
            try:
                log.info("Creating Table document_chunk")
                cursor.execute(
                    """
                    BEGIN
                        EXECUTE IMMEDIATE '
                            CREATE TABLE IF NOT EXISTS document_chunk (
                                id VARCHAR2(255) PRIMARY KEY,
                                collection_name VARCHAR2(255) NOT NULL,
                                text CLOB,
                                vmetadata JSON,
                                vector vector(*, float32)
                            )
                        ';
                    EXCEPTION
                        WHEN OTHERS THEN
                            IF SQLCODE != -955 THEN
                                RAISE;
                            END IF;
                    END;
                    """
                )

                log.info("Creating Index document_chunk_collection_name_idx")
                cursor.execute(
                    """
                    BEGIN
                        EXECUTE IMMEDIATE '
                            CREATE INDEX IF NOT EXISTS document_chunk_collection_name_idx
                            ON document_chunk (collection_name)
                        ';
                    EXCEPTION
                        WHEN OTHERS THEN
                            IF SQLCODE != -955 THEN
                                RAISE;
                            END IF;
                    END;
                    """
                )

                log.info("Creating VECTOR INDEX document_chunk_vector_ivf_idx")
                cursor.execute(
                    """
                    BEGIN
                        EXECUTE IMMEDIATE '
                            CREATE VECTOR INDEX IF NOT EXISTS document_chunk_vector_ivf_idx
                            ON document_chunk(vector)
                            ORGANIZATION NEIGHBOR PARTITIONS
                            DISTANCE COSINE
                            WITH TARGET ACCURACY 95
                            PARAMETERS (TYPE IVF, NEIGHBOR PARTITIONS 100)
                        ';
                    EXCEPTION
                        WHEN OTHERS THEN
                            IF SQLCODE != -955 THEN
                                RAISE;
                            END IF;
                    END;
                    """
                )

                connection.commit()
                log.info("Database initialization completed successfully.")

            except Exception as e:
                connection.rollback()
                log.exception(f"Error during database initialization: {e}")
                raise

    def check_vector_length(self) -> None:
        """
        Check vector length compatibility (placeholder).

        This method would check if the configured vector length matches the
        database schema. Currently implemented as a placeholder.
        """
        pass

    def _vector_to_blob(self, vector: List[float]) -> array.array:
        """
        Convert a vector to the binary format expected by Oracle vector binds.

        BUG FIX: the original annotation claimed ``bytes``; the method returns
        an ``array.array('f', ...)`` (32-bit floats), which python-oracledb
        binds natively to VECTOR columns.

        Args:
            vector (List[float]): The vector to convert

        Returns:
            array.array: The vector as a float32 array
        """
        return array.array("f", vector)

    def adjust_vector_length(self, vector: List[float]) -> List[float]:
        """
        Adjust vector to the expected length if needed.

        Currently a no-op; the table declares vector(*, float32), which accepts
        any dimension.

        Args:
            vector (List[float]): The vector to adjust

        Returns:
            List[float]: The adjusted vector
        """
        return vector

    def _decimal_handler(self, obj):
        """
        Handle Decimal objects for JSON serialization.

        Args:
            obj: Object to serialize

        Returns:
            float: Converted decimal value

        Raises:
            TypeError: If object is not JSON serializable
        """
        if isinstance(obj, Decimal):
            return float(obj)
        raise TypeError(f"{obj} is not JSON serializable")

    def _metadata_to_json(self, metadata: Dict) -> str:
        """
        Convert metadata dictionary to JSON string.

        Args:
            metadata (Dict): Metadata dictionary

        Returns:
            str: JSON representation of metadata ("{}" if metadata is falsy)
        """
        return json.dumps(metadata, default=self._decimal_handler) if metadata else "{}"

    def _json_to_metadata(self, json_str: str) -> Dict:
        """
        Convert JSON string to metadata dictionary.

        Args:
            json_str (str): JSON string

        Returns:
            Dict: Metadata dictionary ({} if json_str is falsy)
        """
        return json.loads(json_str) if json_str else {}

    @staticmethod
    def _validate_filter_key(key: str) -> str:
        """
        Validate a metadata filter key before interpolating it into a JSON path.

        SECURITY FIX: JSON_VALUE path expressions cannot be bound as SQL
        parameters, so filter keys are interpolated directly into the SQL text
        in query()/delete(). Restricting keys to a safe character set prevents
        SQL injection through metadata filter keys (values were already bound).

        Args:
            key (str): Metadata key to validate

        Returns:
            str: The validated key, unchanged

        Raises:
            ValueError: If the key is empty or contains characters outside
                letters, digits, underscore, dot, or hyphen
        """
        if not key or not all(c.isalnum() or c in "_.-" for c in key):
            raise ValueError(f"Invalid metadata filter key: {key!r}")
        return key

    def insert(self, collection_name: str, items: List[VectorItem]) -> None:
        """
        Insert vector items into the database.

        Args:
            collection_name (str): Name of the collection
            items (List[VectorItem]): List of vector items to insert

        Raises:
            Exception: If insertion fails

        Example:
            >>> client = Oracle23aiClient()
            >>> items = [
            ...     {"id": "1", "text": "Sample text", "vector": [0.1, 0.2, ...], "metadata": {"source": "doc1"}},
            ...     {"id": "2", "text": "Another text", "vector": [0.3, 0.4, ...], "metadata": {"source": "doc2"}}
            ... ]
            >>> client.insert("my_collection", items)
        """
        log.info(f"Inserting {len(items)} items into collection '{collection_name}'.")

        with self.get_connection() as connection:
            try:
                with connection.cursor() as cursor:
                    for item in items:
                        vector_blob = self._vector_to_blob(item["vector"])
                        metadata_json = self._metadata_to_json(item["metadata"])

                        cursor.execute(
                            """
                            INSERT INTO document_chunk
                            (id, collection_name, text, vmetadata, vector)
                            VALUES (:id, :collection_name, :text, :metadata, :vector)
                            """,
                            {
                                "id": item["id"],
                                "collection_name": collection_name,
                                "text": item["text"],
                                "metadata": metadata_json,
                                "vector": vector_blob,
                            },
                        )

                connection.commit()
                log.info(
                    f"Successfully inserted {len(items)} items into collection '{collection_name}'."
                )

            except Exception as e:
                connection.rollback()
                log.exception(f"Error during insert: {e}")
                raise

    def upsert(self, collection_name: str, items: List[VectorItem]) -> None:
        """
        Update or insert vector items into the database.

        If an item with the same ID exists, it will be updated;
        otherwise, it will be inserted.

        Args:
            collection_name (str): Name of the collection
            items (List[VectorItem]): List of vector items to upsert

        Raises:
            Exception: If upsert operation fails

        Example:
            >>> client = Oracle23aiClient()
            >>> items = [
            ...     {"id": "1", "text": "Updated text", "vector": [0.1, 0.2, ...], "metadata": {"source": "doc1"}},
            ...     {"id": "3", "text": "New item", "vector": [0.5, 0.6, ...], "metadata": {"source": "doc3"}}
            ... ]
            >>> client.upsert("my_collection", items)
        """
        log.info(f"Upserting {len(items)} items into collection '{collection_name}'.")

        with self.get_connection() as connection:
            try:
                with connection.cursor() as cursor:
                    for item in items:
                        vector_blob = self._vector_to_blob(item["vector"])
                        metadata_json = self._metadata_to_json(item["metadata"])

                        # Distinct bind names per clause: oracledb binds by name,
                        # and MERGE references the same values in several places.
                        cursor.execute(
                            """
                            MERGE INTO document_chunk d
                            USING (SELECT :merge_id as id FROM dual) s
                            ON (d.id = s.id)
                            WHEN MATCHED THEN
                                UPDATE SET
                                    collection_name = :upd_collection_name,
                                    text = :upd_text,
                                    vmetadata = :upd_metadata,
                                    vector = :upd_vector
                            WHEN NOT MATCHED THEN
                                INSERT (id, collection_name, text, vmetadata, vector)
                                VALUES (:ins_id, :ins_collection_name, :ins_text, :ins_metadata, :ins_vector)
                            """,
                            {
                                "merge_id": item["id"],
                                "upd_collection_name": collection_name,
                                "upd_text": item["text"],
                                "upd_metadata": metadata_json,
                                "upd_vector": vector_blob,
                                "ins_id": item["id"],
                                "ins_collection_name": collection_name,
                                "ins_text": item["text"],
                                "ins_metadata": metadata_json,
                                "ins_vector": vector_blob,
                            },
                        )

                connection.commit()
                log.info(
                    f"Successfully upserted {len(items)} items into collection '{collection_name}'."
                )

            except Exception as e:
                connection.rollback()
                log.exception(f"Error during upsert: {e}")
                raise

    def search(
        self, collection_name: str, vectors: List[List[Union[float, int]]], limit: int
    ) -> Optional[SearchResult]:
        """
        Search for similar vectors in the database.

        Performs vector similarity search using cosine distance.

        Args:
            collection_name (str): Name of the collection to search
            vectors (List[List[Union[float, int]]]): Query vectors to find similar items for
            limit (int): Maximum number of results to return per query

        Returns:
            Optional[SearchResult]: Search results containing ids, distances, documents,
                and metadata; None if no vectors were given or an error occurred

        Example:
            >>> client = Oracle23aiClient()
            >>> query_vector = [0.1, 0.2, 0.3, ...]  # Must match VECTOR_LENGTH
            >>> results = client.search("my_collection", [query_vector], limit=5)
            >>> if results:
            ...     log.info(f"Found {len(results.ids[0])} matches")
        """
        log.info(
            f"Searching items from collection '{collection_name}' with limit {limit}."
        )
        try:
            if not vectors:
                log.warning("No vectors provided for search.")
                return None

            num_queries = len(vectors)

            ids = [[] for _ in range(num_queries)]
            distances = [[] for _ in range(num_queries)]
            documents = [[] for _ in range(num_queries)]
            metadatas = [[] for _ in range(num_queries)]

            with self.get_connection() as connection:
                with connection.cursor() as cursor:
                    for qid, vector in enumerate(vectors):
                        vector_blob = self._vector_to_blob(vector)

                        # APPROX lets Oracle use the IVF vector index created
                        # in _initialize_database for an approximate top-k scan.
                        cursor.execute(
                            """
                            SELECT dc.id, dc.text,
                                JSON_SERIALIZE(dc.vmetadata RETURNING VARCHAR2(4096)) as vmetadata,
                                VECTOR_DISTANCE(dc.vector, :query_vector, COSINE) as distance
                            FROM document_chunk dc
                            WHERE dc.collection_name = :collection_name
                            ORDER BY VECTOR_DISTANCE(dc.vector, :query_vector, COSINE)
                            FETCH APPROX FIRST :limit ROWS ONLY
                            """,
                            {
                                "query_vector": vector_blob,
                                "collection_name": collection_name,
                                "limit": limit,
                            },
                        )
                        results = cursor.fetchall()

                        for row in results:
                            ids[qid].append(row[0])
                            # CLOB columns come back as LOB objects; read them fully
                            documents[qid].append(
                                row[1].read()
                                if isinstance(row[1], oracledb.LOB)
                                else str(row[1])
                            )
                            metadata_str = (
                                row[2].read()
                                if isinstance(row[2], oracledb.LOB)
                                else row[2]
                            )
                            metadatas[qid].append(self._json_to_metadata(metadata_str))
                            distances[qid].append(float(row[3]))

            log.info(
                f"Search completed. Found {sum(len(ids[i]) for i in range(num_queries))} total results."
            )

            return SearchResult(
                ids=ids, distances=distances, documents=documents, metadatas=metadatas
            )

        except Exception as e:
            log.exception(f"Error during search: {e}")
            return None

    def query(
        self, collection_name: str, filter: Dict, limit: Optional[int] = None
    ) -> Optional[GetResult]:
        """
        Query items based on metadata filters.

        Retrieves items that match specified metadata criteria.

        Args:
            collection_name (str): Name of the collection to query
            filter (Dict[str, Any]): Metadata filters to apply (keys are
                restricted to a safe character set; see _validate_filter_key)
            limit (Optional[int]): Maximum number of results to return (default 100)

        Returns:
            Optional[GetResult]: Query results containing ids, documents, and
                metadata; None if nothing matched or an error occurred

        Example:
            >>> client = Oracle23aiClient()
            >>> filter = {"source": "doc1", "category": "finance"}
            >>> results = client.query("my_collection", filter, limit=20)
        """
        log.info(f"Querying items from collection '{collection_name}' with filters.")

        try:
            limit = limit or 100

            query = """
                SELECT id, text, JSON_SERIALIZE(vmetadata RETURNING VARCHAR2(4096)) as vmetadata
                FROM document_chunk
                WHERE collection_name = :collection_name
            """
            params = {"collection_name": collection_name}
            for i, (key, value) in enumerate(filter.items()):
                # Keys cannot be bound in a JSON path; validate before interpolating.
                key = self._validate_filter_key(key)
                param_name = f"value_{i}"
                query += f" AND JSON_VALUE(vmetadata, '$.{key}' RETURNING VARCHAR2(4096)) = :{param_name}"
                params[param_name] = str(value)

            query += " FETCH FIRST :limit ROWS ONLY"
            params["limit"] = limit

            with self.get_connection() as connection:
                with connection.cursor() as cursor:
                    cursor.execute(query, params)
                    results = cursor.fetchall()

                    if not results:
                        log.info("No results found for query.")
                        return None

                    ids = [[row[0] for row in results]]
                    documents = [
                        [
                            row[1].read() if isinstance(row[1], oracledb.LOB) else str(row[1])
                            for row in results
                        ]
                    ]
                    metadatas = [
                        [
                            self._json_to_metadata(
                                row[2].read() if isinstance(row[2], oracledb.LOB) else row[2]
                            )
                            for row in results
                        ]
                    ]
                    log.info(f"Query completed. Found {len(results)} results.")
                    return GetResult(ids=ids, documents=documents, metadatas=metadatas)

        except Exception as e:
            log.exception(f"Error during query: {e}")
            return None

    def get(
        self, collection_name: str, limit: Optional[int] = None
    ) -> Optional[GetResult]:
        """
        Get all items in a collection.

        Retrieves items from a specified collection up to the limit.

        BUG FIX: the original signature had no ``limit`` parameter although the
        body (and this docstring) referenced it, which raised NameError on every
        call. Added as an optional keyword for backward compatibility.

        Args:
            collection_name (str): Name of the collection to retrieve
            limit (Optional[int]): Maximum number of items to retrieve (default 1000)

        Returns:
            Optional[GetResult]: Result containing ids, documents, and metadata;
                None if the collection is empty or an error occurred

        Example:
            >>> client = Oracle23aiClient()
            >>> results = client.get("my_collection", limit=50)
        """
        log.info(
            f"Getting items from collection '{collection_name}' with limit {limit}."
        )
        try:
            limit = limit or 1000

            with self.get_connection() as connection:
                with connection.cursor() as cursor:
                    cursor.execute(
                        """
                        SELECT /*+ MONITOR */ id, text, JSON_SERIALIZE(vmetadata RETURNING VARCHAR2(4096)) as vmetadata
                        FROM document_chunk
                        WHERE collection_name = :collection_name
                        FETCH FIRST :limit ROWS ONLY
                        """,
                        {"collection_name": collection_name, "limit": limit},
                    )
                    results = cursor.fetchall()

                    if not results:
                        log.info("No results found.")
                        return None

                    ids = [[row[0] for row in results]]
                    documents = [
                        [
                            row[1].read() if isinstance(row[1], oracledb.LOB) else str(row[1])
                            for row in results
                        ]
                    ]
                    metadatas = [
                        [
                            self._json_to_metadata(
                                row[2].read() if isinstance(row[2], oracledb.LOB) else row[2]
                            )
                            for row in results
                        ]
                    ]
                    return GetResult(ids=ids, documents=documents, metadatas=metadatas)

        except Exception as e:
            log.exception(f"Error during get: {e}")
            return None

    def delete(
        self,
        collection_name: str,
        ids: Optional[List[str]] = None,
        filter: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Delete items from the database.

        Deletes items from a collection based on IDs or metadata filters.

        Args:
            collection_name (str): Name of the collection to delete from
            ids (Optional[List[str]]): Specific item IDs to delete
            filter (Optional[Dict[str, Any]]): Metadata filters for deletion
                (keys validated; see _validate_filter_key)

        Raises:
            Exception: If deletion fails

        Example:
            >>> client = Oracle23aiClient()
            >>> client.delete("my_collection", ids=["1", "3", "5"])
            >>> client.delete("my_collection", filter={"source": "deprecated_source"})
        """
        log.info(f"Deleting items from collection '{collection_name}'.")

        try:
            query = (
                "DELETE FROM document_chunk WHERE collection_name = :collection_name"
            )
            params = {"collection_name": collection_name}

            if ids:
                # Parameterized IN-list to prevent SQL injection via ids
                placeholders = ",".join([f":id_{i}" for i in range(len(ids))])
                query += f" AND id IN ({placeholders})"
                for i, id_val in enumerate(ids):
                    params[f"id_{i}"] = id_val

            if filter:
                for i, (key, value) in enumerate(filter.items()):
                    # Keys cannot be bound in a JSON path; validate before interpolating.
                    key = self._validate_filter_key(key)
                    param_name = f"value_{i}"
                    query += f" AND JSON_VALUE(vmetadata, '$.{key}' RETURNING VARCHAR2(4096)) = :{param_name}"
                    params[param_name] = str(value)

            with self.get_connection() as connection:
                with connection.cursor() as cursor:
                    cursor.execute(query, params)
                    deleted = cursor.rowcount
                    connection.commit()

            log.info(f"Deleted {deleted} items from collection '{collection_name}'.")

        except Exception as e:
            log.exception(f"Error during delete: {e}")
            raise

    def reset(self) -> None:
        """
        Reset the database by deleting all items.

        Deletes all items from the document_chunk table.

        Raises:
            Exception: If reset fails

        Example:
            >>> client = Oracle23aiClient()
            >>> client.reset()  # Warning: Removes all data!
        """
        log.info("Resetting database - deleting all items.")

        try:
            with self.get_connection() as connection:
                with connection.cursor() as cursor:
                    cursor.execute("DELETE FROM document_chunk")
                    deleted = cursor.rowcount
                    connection.commit()

            log.info(
                f"Reset complete. Deleted {deleted} items from 'document_chunk' table."
            )

        except Exception as e:
            log.exception(f"Error during reset: {e}")
            raise

    def close(self) -> None:
        """
        Close the database connection pool.

        Properly closes the connection pool and releases all resources.
        Safe to call even if the pool was never created.

        Example:
            >>> client = Oracle23aiClient()
            >>> client.close()
        """
        try:
            if hasattr(self, "pool") and self.pool:
                self.pool.close()
                log.info("Oracle Vector Search connection pool closed.")
        except Exception as e:
            log.exception(f"Error closing connection pool: {e}")

    def has_collection(self, collection_name: str) -> bool:
        """
        Check if a collection exists.

        Args:
            collection_name (str): Name of the collection to check

        Returns:
            bool: True if the collection exists, False otherwise
                (also False if the check itself fails)

        Example:
            >>> client = Oracle23aiClient()
            >>> if client.has_collection("my_collection"):
            ...     print("Collection exists!")
        """
        try:
            with self.get_connection() as connection:
                with connection.cursor() as cursor:
                    cursor.execute(
                        """
                        SELECT COUNT(*)
                        FROM document_chunk
                        WHERE collection_name = :collection_name
                        FETCH FIRST 1 ROWS ONLY
                        """,
                        {"collection_name": collection_name},
                    )
                    count = cursor.fetchone()[0]

                    return count > 0

        except Exception as e:
            log.exception(f"Error checking collection existence: {e}")
            return False

    def delete_collection(self, collection_name: str) -> None:
        """
        Delete an entire collection.

        Removes all items belonging to the specified collection.

        Args:
            collection_name (str): Name of the collection to delete

        Raises:
            Exception: If deletion fails

        Example:
            >>> client = Oracle23aiClient()
            >>> client.delete_collection("obsolete_collection")
        """
        log.info(f"Deleting collection '{collection_name}'.")

        try:
            with self.get_connection() as connection:
                with connection.cursor() as cursor:
                    cursor.execute(
                        """
                        DELETE FROM document_chunk
                        WHERE collection_name = :collection_name
                        """,
                        {"collection_name": collection_name},
                    )
                    deleted = cursor.rowcount
                    connection.commit()

            log.info(
                f"Collection '{collection_name}' deleted. Removed {deleted} items."
            )
        except Exception as e:
            log.exception(f"Error deleting collection '{collection_name}': {e}")
            raise