diff --git a/backend/open_webui/config.py b/backend/open_webui/config.py index 898ac1b594..a226dd6a55 100644 --- a/backend/open_webui/config.py +++ b/backend/open_webui/config.py @@ -1794,10 +1794,10 @@ MILVUS_IVF_FLAT_NLIST = int(os.environ.get("MILVUS_IVF_FLAT_NLIST", "128")) QDRANT_URI = os.environ.get("QDRANT_URI", None) QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", None) QDRANT_ON_DISK = os.environ.get("QDRANT_ON_DISK", "false").lower() == "true" -QDRANT_PREFER_GRPC = os.environ.get("QDRANT_PREFER_GRPC", "False").lower() == "true" +QDRANT_PREFER_GRPC = os.environ.get("QDRANT_PREFER_GRPC", "false").lower() == "true" QDRANT_GRPC_PORT = int(os.environ.get("QDRANT_GRPC_PORT", "6334")) ENABLE_QDRANT_MULTITENANCY_MODE = ( - os.environ.get("ENABLE_QDRANT_MULTITENANCY_MODE", "false").lower() == "true" + os.environ.get("ENABLE_QDRANT_MULTITENANCY_MODE", "true").lower() == "true" ) # OpenSearch diff --git a/backend/open_webui/retrieval/vector/dbs/qdrant_multitenancy.py b/backend/open_webui/retrieval/vector/dbs/qdrant_multitenancy.py index e83c437ef7..377b036247 100644 --- a/backend/open_webui/retrieval/vector/dbs/qdrant_multitenancy.py +++ b/backend/open_webui/retrieval/vector/dbs/qdrant_multitenancy.py @@ -23,6 +23,7 @@ from qdrant_client.http.models import PointStruct from qdrant_client.models import models NO_LIMIT = 999999999 +TENANT_ID_FIELD = "tenant_id" log = logging.getLogger(__name__) log.setLevel(SRC_LOG_LEVELS["RAG"]) @@ -113,141 +114,35 @@ class QdrantClient(VectorDBBase): else: return self.KNOWLEDGE_COLLECTION, tenant_id - def _extract_error_message(self, exception): - """ - Extract error message from either HTTP or gRPC exceptions - - Returns: - tuple: (status_code, error_message) - """ - # Check if it's an HTTP exception - if isinstance(exception, UnexpectedResponse): - try: - error_data = exception.structured() - error_msg = error_data.get("status", {}).get("error", "") - return exception.status_code, error_msg - except Exception as inner_e: - log.error(f"Failed to parse HTTP error: {inner_e}") - return exception.status_code, str(exception) - - # Check if it's a gRPC exception - elif isinstance(exception, grpc.RpcError): - # Extract status code from gRPC error - status_code = None - if hasattr(exception, "code") and callable(exception.code): - status_code = exception.code().value[0] - - # Extract error message - error_msg = str(exception) - if "details =" in error_msg: - # Parse the details line which contains the actual error message - try: - details_line = [ - line.strip() - for line in error_msg.split("\n") - if "details =" in line - ][0] - error_msg = details_line.split("details =")[1].strip(' "') - except (IndexError, AttributeError): - # Fall back to full message if parsing fails - pass - - return status_code, error_msg - - # For any other type of exception - return None, str(exception) - - def _is_collection_not_found_error(self, exception): - """ - Check if the exception is due to collection not found, supporting both HTTP and gRPC - """ - status_code, error_msg = self._extract_error_message(exception) - - # HTTP error (404) - if ( - status_code == 404 - and "Collection" in error_msg - and "doesn't exist" in error_msg - ): - return True - - # gRPC error (NOT_FOUND status) - if ( - isinstance(exception, grpc.RpcError) - and exception.code() == grpc.StatusCode.NOT_FOUND - ): - return True - - return False - - def _is_dimension_mismatch_error(self, exception): - """ - Check if the exception is due to dimension mismatch, supporting both HTTP and gRPC - """ - status_code, error_msg = self._extract_error_message(exception) - - # Common patterns in both HTTP and gRPC - return ( - "Vector dimension error" in error_msg - or "dimensions mismatch" in error_msg - or "invalid vector size" in error_msg - ) - - def _create_multi_tenant_collection_if_not_exists( - self, mt_collection_name: str, dimension: int = 384 + def _create_multi_tenant_collection( + self, + mt_collection_name: str, + dimension: int = 384, ): """ - Creates a collection with multi-tenancy configuration if it doesn't exist. - Default dimension is set to 384 which corresponds to 'sentence-transformers/all-MiniLM-L6-v2'. - When creating collections dynamically (insert/upsert), the actual vector dimensions will be used. + Creates a collection with multi-tenancy configuration and payload indexes for tenant_id and metadata fields. """ - try: - # Try to create the collection directly - will fail if it already exists - self.client.create_collection( - collection_name=mt_collection_name, - vectors_config=models.VectorParams( - size=dimension, - distance=models.Distance.COSINE, - on_disk=self.QDRANT_ON_DISK, - ), - hnsw_config=models.HnswConfigDiff( - payload_m=16, # Enable per-tenant indexing - m=0, - on_disk=self.QDRANT_ON_DISK, - ), - ) + self.client.create_collection( + collection_name=mt_collection_name, + vectors_config=models.VectorParams( + size=dimension, + distance=models.Distance.COSINE, + on_disk=self.QDRANT_ON_DISK, + ), + ) + log.info( + f"Multi-tenant collection {mt_collection_name} created with dimension {dimension}!" + ) - # Create tenant ID payload index - self.client.create_payload_index( - collection_name=mt_collection_name, - field_name="tenant_id", - field_schema=models.KeywordIndexParams( - type=models.KeywordIndexType.KEYWORD, - is_tenant=True, - on_disk=self.QDRANT_ON_DISK, - ), - wait=True, - ) - - log.info( - f"Multi-tenant collection {mt_collection_name} created with dimension {dimension}!" - ) - except (UnexpectedResponse, grpc.RpcError) as e: - # Check for the specific error indicating collection already exists - status_code, error_msg = self._extract_error_message(e) - - # HTTP status code 409 or gRPC ALREADY_EXISTS - if (isinstance(e, UnexpectedResponse) and status_code == 409) or ( - isinstance(e, grpc.RpcError) - and e.code() == grpc.StatusCode.ALREADY_EXISTS - ): - if "already exists" in error_msg: - log.debug(f"Collection {mt_collection_name} already exists") - return - # If it's not an already exists error, re-raise - raise e - except Exception as e: - raise e + self.client.create_payload_index( + collection_name=mt_collection_name, + field_name=TENANT_ID_FIELD, + field_schema=models.KeywordIndexParams( + type=models.KeywordIndexType.KEYWORD, + is_tenant=True, + on_disk=self.QDRANT_ON_DISK, + ), + ) def _create_points(self, items: list[VectorItem], tenant_id: str): """ @@ -260,50 +155,41 @@ class QdrantClient(VectorDBBase): payload={ "text": item["text"], "metadata": item["metadata"], - "tenant_id": tenant_id, + TENANT_ID_FIELD: tenant_id, }, ) for item in items ] + def _ensure_collection( + self, + mt_collection_name: str, + dimension: int = 384, + ): + """ + Ensure the collection exists and payload indexes are created for tenant_id and metadata fields. + """ + if self.client.collection_exists(collection_name=mt_collection_name): + return + self._create_multi_tenant_collection(mt_collection_name, dimension) + def has_collection(self, collection_name: str) -> bool: """ Check if a logical collection exists by checking for any points with the tenant ID. """ if not self.client: return False - - # Map to multi-tenant collection and tenant ID mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) - - # Create tenant filter - tenant_filter = models.FieldCondition( - key="tenant_id", match=models.MatchValue(value=tenant_id) - ) - - try: - # Try directly querying - most of the time collection should exist - response = self.client.query_points( - collection_name=mt_collection, - query_filter=models.Filter(must=[tenant_filter]), - limit=1, - ) - - # Collection exists with this tenant ID if there are points - return len(response.points) > 0 - except (UnexpectedResponse, grpc.RpcError) as e: - if self._is_collection_not_found_error(e): - log.debug(f"Collection {mt_collection} doesn't exist") - return False - else: - # For other API errors, log and return False - _, error_msg = self._extract_error_message(e) - log.warning(f"Unexpected Qdrant error: {error_msg}") - return False - except Exception as e: - # For any other errors, log and return False - log.debug(f"Error checking collection {mt_collection}: {e}") + if not self.client.collection_exists(collection_name=mt_collection): return False + tenant_filter = models.FieldCondition( + key=TENANT_ID_FIELD, match=models.MatchValue(value=tenant_id) + ) + count_result = self.client.count( + collection_name=mt_collection, + count_filter=models.Filter(must=[tenant_filter]), + ) + return count_result.count > 0 def delete( self, @@ -317,17 +203,16 @@ class QdrantClient(VectorDBBase): if not self.client: return None - # Map to multi-tenant collection and tenant ID mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) + if not self.client.collection_exists(collection_name=mt_collection): + log.debug(f"Collection {mt_collection} doesn't exist, nothing to delete") + return None - # Create tenant filter tenant_filter = models.FieldCondition( - key="tenant_id", match=models.MatchValue(value=tenant_id) + key=TENANT_ID_FIELD, match=models.MatchValue(value=tenant_id) ) - must_conditions = [tenant_filter] should_conditions = [] - if ids: for id_value in ids: should_conditions.append( @@ -346,7 +231,6 @@ class QdrantClient(VectorDBBase): ) try: - # Try to delete directly - most of the time collection should exist update_result = self.client.delete( collection_name=mt_collection, points_selector=models.FilterSelector( @@ -355,20 +239,9 @@ class QdrantClient(VectorDBBase): ) return update_result - except (UnexpectedResponse, grpc.RpcError) as e: - if self._is_collection_not_found_error(e): - log.debug( - f"Collection {mt_collection} doesn't exist, nothing to delete" - ) - return None - else: - # For other API errors, log and re-raise - _, error_msg = self._extract_error_message(e) - log.warning(f"Unexpected Qdrant error: {error_msg}") - raise except Exception as e: - # For non-Qdrant exceptions, re-raise - raise + log.warning(f"Error deleting from collection {mt_collection}: {e}") + return None def search( self, collection_name: str, vectors: list[list[float | int]], limit: int @@ -378,26 +251,19 @@ class QdrantClient(VectorDBBase): """ if not self.client: return None - - # Map to multi-tenant collection and tenant ID mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) + if not self.client.collection_exists(collection_name=mt_collection): + log.debug(f"Collection {mt_collection} doesn't exist, search returns None") + return None - # Get the vector dimension from the query vector dimension = len(vectors[0]) if vectors and len(vectors) > 0 else None - try: - # Try the search operation directly - most of the time collection should exist - - # Create tenant filter tenant_filter = models.FieldCondition( - key="tenant_id", match=models.MatchValue(value=tenant_id) + key=TENANT_ID_FIELD, match=models.MatchValue(value=tenant_id) ) - - # Ensure vector dimensions match the collection collection_dim = self.client.get_collection( mt_collection ).config.params.vectors.size - if collection_dim != dimension: if collection_dim < dimension: vectors = [vector[:collection_dim] for vector in vectors] @@ -406,8 +272,6 @@ class QdrantClient(VectorDBBase): vector + [0] * (collection_dim - dimension) for vector in vectors ] - - # Search with tenant filter prefetch_query = models.Prefetch( filter=models.Filter(must=[tenant_filter]), limit=NO_LIMIT, @@ -418,30 +282,16 @@ class QdrantClient(VectorDBBase): prefetch=prefetch_query, limit=limit, ) - get_result = self._result_to_get_result(query_response.points) return SearchResult( ids=get_result.ids, documents=get_result.documents, metadatas=get_result.metadatas, - # qdrant distance is [-1, 1], normalize to [0, 1] distances=[ [(point.score + 1.0) / 2.0 for point in query_response.points] ], ) - except (UnexpectedResponse, grpc.RpcError) as e: - if self._is_collection_not_found_error(e): - log.debug( - f"Collection {mt_collection} doesn't exist, search returns None" - ) - return None - else: - # For other API errors, log and re-raise - _, error_msg = self._extract_error_message(e) - log.warning(f"Unexpected Qdrant error during search: {error_msg}") - raise except Exception as e: - # For non-Qdrant exceptions, log and return None log.exception(f"Error searching collection '{collection_name}': {e}") return None @@ -451,20 +301,16 @@ class QdrantClient(VectorDBBase): """ if not self.client: return None - - # Map to multi-tenant collection and tenant ID mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) + if not self.client.collection_exists(collection_name=mt_collection): + log.debug(f"Collection {mt_collection} doesn't exist, query returns None") + return None - # Set default limit if not provided if limit is None: limit = NO_LIMIT - - # Create tenant filter tenant_filter = models.FieldCondition( - key="tenant_id", match=models.MatchValue(value=tenant_id) + key=TENANT_ID_FIELD, match=models.MatchValue(value=tenant_id) ) - - # Create metadata filters field_conditions = [] for key, value in filter.items(): field_conditions.append( @@ -472,32 +318,15 @@ class QdrantClient(VectorDBBase): key=f"metadata.{key}", match=models.MatchValue(value=value) ) ) - - # Combine tenant filter with metadata filters combined_filter = models.Filter(must=[tenant_filter, *field_conditions]) - try: - # Try the query directly - most of the time collection should exist points = self.client.query_points( collection_name=mt_collection, query_filter=combined_filter, limit=limit, ) - return self._result_to_get_result(points.points) - except (UnexpectedResponse, grpc.RpcError) as e: - if self._is_collection_not_found_error(e): - log.debug( - f"Collection {mt_collection} doesn't exist, query returns None" - ) - return None - else: - # For other API errors, log and re-raise - _, error_msg = self._extract_error_message(e) - log.warning(f"Unexpected Qdrant error during query: {error_msg}") - raise except Exception as e: - # For non-Qdrant exceptions, log and re-raise log.exception(f"Error querying collection '{collection_name}': {e}") return None @@ -507,17 +336,15 @@ class QdrantClient(VectorDBBase): """ if not self.client: return None - - # Map to multi-tenant collection and tenant ID mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) + if not self.client.collection_exists(collection_name=mt_collection): + log.debug(f"Collection {mt_collection} doesn't exist, get returns None") + return None - # Create tenant filter tenant_filter = models.FieldCondition( - key="tenant_id", match=models.MatchValue(value=tenant_id) + key=TENANT_ID_FIELD, match=models.MatchValue(value=tenant_id) ) - try: - # Try to get points directly - most of the time collection should exist points = self.client.query_points( collection_name=mt_collection, query_filter=models.Filter(must=[tenant_filter]), @@ -525,151 +352,28 @@ class QdrantClient(VectorDBBase): ) return self._result_to_get_result(points.points) - except (UnexpectedResponse, grpc.RpcError) as e: - if self._is_collection_not_found_error(e): - log.debug(f"Collection {mt_collection} doesn't exist, get returns None") - return None - else: - # For other API errors, log and re-raise - _, error_msg = self._extract_error_message(e) - log.warning(f"Unexpected Qdrant error during get: {error_msg}") - raise except Exception as e: - # For non-Qdrant exceptions, log and return None log.exception(f"Error getting collection '{collection_name}': {e}") return None - def _handle_operation_with_error_retry( - self, operation_name, mt_collection, points, dimension - ): - """ - Private helper to handle common error cases for insert and upsert operations. - - Args: - operation_name: 'insert' or 'upsert' - mt_collection: The multi-tenant collection name - points: The vector points to insert/upsert - dimension: The dimension of the vectors - - Returns: - The operation result (for upsert) or None (for insert) - """ - try: - if operation_name == "insert": - self.client.upload_points(mt_collection, points) - return None - else: # upsert - return self.client.upsert(mt_collection, points) - except (UnexpectedResponse, grpc.RpcError) as e: - # Handle collection not found - if self._is_collection_not_found_error(e): - log.info( - f"Collection {mt_collection} doesn't exist. Creating it with dimension {dimension}." - ) - # Create collection with correct dimensions from our vectors - self._create_multi_tenant_collection_if_not_exists( - mt_collection_name=mt_collection, dimension=dimension - ) - # Try operation again - no need for dimension adjustment since we just created with correct dimensions - if operation_name == "insert": - self.client.upload_points(mt_collection, points) - return None - else: # upsert - return self.client.upsert(mt_collection, points) - - # Handle dimension mismatch - elif self._is_dimension_mismatch_error(e): - # For dimension errors, the collection must exist, so get its configuration - mt_collection_info = self.client.get_collection(mt_collection) - existing_size = mt_collection_info.config.params.vectors.size - - log.info( - f"Dimension mismatch: Collection {mt_collection} expects {existing_size}, got {dimension}" - ) - - if existing_size < dimension: - # Truncate vectors to fit - log.info( - f"Truncating vectors from {dimension} to {existing_size} dimensions" - ) - points = [ - PointStruct( - id=point.id, - vector=point.vector[:existing_size], - payload=point.payload, - ) - for point in points - ] - elif existing_size > dimension: - # Pad vectors with zeros - log.info( - f"Padding vectors from {dimension} to {existing_size} dimensions with zeros" - ) - points = [ - PointStruct( - id=point.id, - vector=point.vector - + [0] * (existing_size - len(point.vector)), - payload=point.payload, - ) - for point in points - ] - # Try operation again with adjusted dimensions - if operation_name == "insert": - self.client.upload_points(mt_collection, points) - return None - else: # upsert - return self.client.upsert(mt_collection, points) - else: - # Not a known error we can handle, log and re-raise - _, error_msg = self._extract_error_message(e) - log.warning(f"Unhandled Qdrant error: {error_msg}") - raise - except Exception as e: - # For non-Qdrant exceptions, re-raise - raise - - def insert(self, collection_name: str, items: list[VectorItem]): - """ - Insert items with tenant ID. - """ - if not self.client or not items: - return None - - # Map to multi-tenant collection and tenant ID - mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) - - # Get dimensions from the actual vectors - dimension = len(items[0]["vector"]) if items else None - - # Create points with tenant ID - points = self._create_points(items, tenant_id) - - # Handle the operation with error retry - return self._handle_operation_with_error_retry( - "insert", mt_collection, points, dimension - ) - def upsert(self, collection_name: str, items: list[VectorItem]): """ Upsert items with tenant ID. """ if not self.client or not items: return None - - # Map to multi-tenant collection and tenant ID mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) - - # Get dimensions from the actual vectors dimension = len(items[0]["vector"]) if items else None - - # Create points with tenant ID + self._ensure_collection(mt_collection, dimension) points = self._create_points(items, tenant_id) + self.client.upload_points(mt_collection, points) + return None - # Handle the operation with error retry - return self._handle_operation_with_error_retry( - "upsert", mt_collection, points, dimension - ) + def insert(self, collection_name: str, items: list[VectorItem]): + """ + Insert items with tenant ID. + """ + return self.upsert(collection_name, items) def reset(self): """ @@ -689,24 +393,21 @@ class QdrantClient(VectorDBBase): """ if not self.client: return None - - # Map to multi-tenant collection and tenant ID mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) + if not self.client.collection_exists(collection_name=mt_collection): + log.debug(f"Collection {mt_collection} doesn't exist, nothing to delete") + return None - tenant_filter = models.FieldCondition( - key="tenant_id", match=models.MatchValue(value=tenant_id) - ) - - field_conditions = [tenant_filter] - - update_result = self.client.delete( + self.client.delete( collection_name=mt_collection, points_selector=models.FilterSelector( - filter=models.Filter(must=field_conditions) + filter=models.Filter( + must=[ + models.FieldCondition( + key=TENANT_ID_FIELD, + match=models.MatchValue(value=tenant_id), + ) + ] + ) ), ) - - if self.client.get_collection(mt_collection).points_count == 0: - self.client.delete_collection(mt_collection) - - return update_result