diff --git a/backend/open_webui/retrieval/vector/dbs/s3vector.py b/backend/open_webui/retrieval/vector/dbs/s3vector.py index e0fc4cb1a2..6df4a4aa81 100644 --- a/backend/open_webui/retrieval/vector/dbs/s3vector.py +++ b/backend/open_webui/retrieval/vector/dbs/s3vector.py @@ -11,19 +11,36 @@ log.setLevel(SRC_LOG_LEVELS["RAG"]) class S3VectorClient(VectorDBBase): """ AWS S3 Vector integration for Open WebUI Knowledge. - Assumes AWS credentials are available via environment variables or IAM roles. """ + def __init__(self): self.bucket_name = S3_VECTOR_BUCKET_NAME self.region = S3_VECTOR_REGION - self.client = boto3.client("s3vectors", region_name=self.region) + + # Simple validation - log warnings instead of raising exceptions + if not self.bucket_name: + log.warning("S3_VECTOR_BUCKET_NAME not set - S3Vector will not work") + if not self.region: + log.warning("S3_VECTOR_REGION not set - S3Vector will not work") + + if self.bucket_name and self.region: + try: + self.client = boto3.client("s3vectors", region_name=self.region) + log.info(f"S3Vector client initialized for bucket '{self.bucket_name}' in region '{self.region}'") + except Exception as e: + log.error(f"Failed to initialize S3Vector client: {e}") + self.client = None + else: + self.client = None - def _create_index(self, index_name: str, dimension: int, data_type: str = "float32", distance_metric: str = "cosine"): + def _create_index(self, index_name: str, dimension: int, data_type: str = "float32", distance_metric: str = "cosine") -> None: """ Create a new index in the S3 vector bucket for the given collection if it does not exist. """ if self.has_collection(index_name): + log.debug(f"Index '{index_name}' already exists, skipping creation") return + try: self.client.create_index( vectorBucketName=self.bucket_name, @@ -35,12 +52,11 @@ class S3VectorClient(VectorDBBase): log.info(f"Created S3 index: {index_name} (dim={dimension}, type={data_type}, metric={distance_metric})") except Exception as e: log.error(f"Error creating S3 index '{index_name}': {e}") + raise def _filter_metadata(self, metadata: Dict[str, Any], item_id: str) -> Dict[str, Any]: """ Filter vector metadata keys to comply with S3 Vector API limit of 10 keys maximum. - If AWS S3 Vector feature starts supporting more than 10 keys, this should be adjusted, and preferably removed. - Limitation is documented here: https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-vectors-limitations.html """ if not isinstance(metadata, dict) or len(metadata) <= 10: return metadata @@ -82,6 +98,7 @@ class S3VectorClient(VectorDBBase): """ Check if a vector index (collection) exists in the S3 vector bucket. """ + try: response = self.client.list_indexes(vectorBucketName=self.bucket_name) indexes = response.get("indexes", []) @@ -89,10 +106,12 @@ class S3VectorClient(VectorDBBase): except Exception as e: log.error(f"Error listing indexes: {e}") return False + def delete_collection(self, collection_name: str) -> None: """ Delete an entire S3 Vector index/collection. """ + if not self.has_collection(collection_name): log.warning(f"Collection '{collection_name}' does not exist, nothing to delete") return @@ -108,11 +127,9 @@ class S3VectorClient(VectorDBBase): log.error(f"Error deleting collection '{collection_name}': {e}") raise - def insert(self, collection_name: str, items: List[Dict[str, Any]]) -> None: + def insert(self, collection_name: str, items: List[VectorItem]) -> None: """ Insert vector items into the S3 Vector index. Create index if it does not exist. - - Supports both knowledge collection indexes and file-specific indexes (file-{file_id}). """ if not items: log.warning("No items to insert") @@ -143,10 +160,7 @@ class S3VectorClient(VectorDBBase): metadata = item.get("metadata", {}).copy() # Add the text field to metadata so it's available for retrieval - if "text" in item: - metadata["text"] = item["text"] - else: - log.warning(f"No 'text' field found in item with ID: {item.get('id')}") + metadata["text"] = item["text"] # Filter metadata to comply with S3 Vector API limit of 10 keys metadata = self._filter_metadata(metadata, item["id"]) @@ -169,7 +183,7 @@ class S3VectorClient(VectorDBBase): log.error(f"Error inserting vectors: {e}") raise - def upsert(self, collection_name: str, items: List[Dict[str, Any]]) -> None: + def upsert(self, collection_name: str, items: List[VectorItem]) -> None: """ Insert or update vector items in the S3 Vector index. Create index if it does not exist. """ @@ -202,8 +216,7 @@ class S3VectorClient(VectorDBBase): # Prepare metadata, ensuring the text field is preserved metadata = item.get("metadata", {}).copy() # Add the text field to metadata so it's available for retrieval - if "text" in item: - metadata["text"] = item["text"] + metadata["text"] = item["text"] # Filter metadata to comply with S3 Vector API limit of 10 keys metadata = self._filter_metadata(metadata, item["id"]) @@ -230,17 +243,8 @@ class S3VectorClient(VectorDBBase): def search(self, collection_name: str, vectors: List[List[Union[float, int]]], limit: int) -> Optional[SearchResult]: """ Search for similar vectors in a collection using multiple query vectors. - - Uses S3 Vector's query_vectors API to perform similarity search. - - Args: - collection_name: Name of the collection to search in - vectors: List of query vectors to search with - limit: Maximum number of results to return per query - - Returns: - SearchResult containing IDs, documents, metadatas, and distances """ + if not self.has_collection(collection_name): log.warning(f"Collection '{collection_name}' does not exist") return None @@ -343,18 +347,8 @@ class S3VectorClient(VectorDBBase): def query(self, collection_name: str, filter: Dict, limit: Optional[int] = None) -> Optional[GetResult]: """ Query vectors from a collection using metadata filter. - - For S3 Vector, this uses the list_vectors API with metadata filters. - Note: S3 Vector supports metadata filtering, but the exact filter syntax may vary. - - Args: - collection_name: Name of the collection to query - filter: Dictionary containing metadata filter conditions - limit: Maximum number of results to return (optional) - - Returns: - GetResult containing IDs, documents, and metadatas """ + if not self.has_collection(collection_name): log.warning(f"Collection '{collection_name}' does not exist") return GetResult(ids=[[]], documents=[[]], metadatas=[[]]) @@ -423,10 +417,8 @@ class S3VectorClient(VectorDBBase): def get(self, collection_name: str) -> Optional[GetResult]: """ Retrieve all vectors from a collection. - - Uses S3 Vector's list_vectors API to get all vectors with their data and metadata. - Handles pagination automatically to retrieve all vectors. """ + if not self.has_collection(collection_name): log.warning(f"Collection '{collection_name}' does not exist") return GetResult(ids=[[]], documents=[[]], metadatas=[[]]) @@ -519,10 +511,8 @@ class S3VectorClient(VectorDBBase): def delete(self, collection_name: str, ids: Optional[List[str]] = None, filter: Optional[Dict] = None) -> None: """ Delete vectors by ID or filter from a collection. - - For S3 Vector, we support deletion by IDs. Filter-based deletion requires querying first. - For knowledge collections, also handles cleanup of related file-specific collections. """ + if not self.has_collection(collection_name): log.warning(f"Collection '{collection_name}' does not exist, nothing to delete") return @@ -578,9 +568,9 @@ class S3VectorClient(VectorDBBase): def reset(self) -> None: """ - Reset/clear all vector data. For S3 Vector, this would mean deleting all indexes. - Use with caution as this is destructive. + Reset/clear all vector data. For S3 Vector, this deletes all indexes. """ + try: log.warning("Reset called - this will delete all vector indexes in the S3 bucket") @@ -616,14 +606,6 @@ class S3VectorClient(VectorDBBase): def _matches_filter(self, metadata: Dict[str, Any], filter: Dict[str, Any]) -> bool: """ Check if metadata matches the given filter conditions. - Supports basic equality matching and simple logical operations. - - Args: - metadata: The metadata to check - filter: The filter conditions to match against - - Returns: - True if metadata matches all filter conditions, False otherwise """ if not isinstance(metadata, dict) or not isinstance(filter, dict): return False