diff --git a/backend/open_webui/retrieval/vector/dbs/s3vector.py b/backend/open_webui/retrieval/vector/dbs/s3vector.py index 8db1ecb778..1e9b8d9a00 100644 --- a/backend/open_webui/retrieval/vector/dbs/s3vector.py +++ b/backend/open_webui/retrieval/vector/dbs/s3vector.py @@ -197,13 +197,19 @@ class S3VectorClient(VectorDBBase): "metadata": metadata, } ) - # Insert vectors - self.client.put_vectors( - vectorBucketName=self.bucket_name, - indexName=collection_name, - vectors=vectors, - ) - log.info(f"Inserted {len(vectors)} vectors into index '{collection_name}'.") + + # Insert vectors in batches of 500 (S3 Vector API limit) + batch_size = 500 + for i in range(0, len(vectors), batch_size): + batch = vectors[i:i + batch_size] + self.client.put_vectors( + vectorBucketName=self.bucket_name, + indexName=collection_name, + vectors=batch, + ) + log.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} vectors into index '{collection_name}'.") + + log.info(f"Completed insertion of {len(vectors)} vectors into index '{collection_name}'.") except Exception as e: log.error(f"Error inserting vectors: {e}") raise @@ -258,16 +264,25 @@ class S3VectorClient(VectorDBBase): "metadata": metadata, } ) - # Upsert vectors (using put_vectors for upsert semantics) - log.info( - f"Upserting {len(vectors)} vectors. First vector sample: key={vectors[0]['key']}, data_type={type(vectors[0]['data']['float32'])}, data_len={len(vectors[0]['data']['float32'])}" - ) - self.client.put_vectors( - vectorBucketName=self.bucket_name, - indexName=collection_name, - vectors=vectors, - ) - log.info(f"Upserted {len(vectors)} vectors into index '{collection_name}'.") + + # Upsert vectors in batches of 500 (S3 Vector API limit) + batch_size = 500 + for i in range(0, len(vectors), batch_size): + batch = vectors[i:i + batch_size] + if i == 0: # Log sample info for first batch only + log.info( + f"Upserting batch 1: {len(batch)} vectors. First vector sample: key={batch[0]['key']}, data_type={type(batch[0]['data']['float32'])}, data_len={len(batch[0]['data']['float32'])}" + ) + else: + log.info(f"Upserting batch {i//batch_size + 1}: {len(batch)} vectors.") + + self.client.put_vectors( + vectorBucketName=self.bucket_name, + indexName=collection_name, + vectors=batch, + ) + + log.info(f"Completed upsert of {len(vectors)} vectors into index '{collection_name}'.") except Exception as e: log.error(f"Error upserting vectors: {e}") raise