Merge pull request #16741 from 0xThresh/s3vector-support

fix: batch S3 vectors in groups of 500 to comply with API limitations
Tim Jaeryang Baek 2025-08-20 13:25:42 +04:00 committed by GitHub
commit 7452b87877

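Context for the change: previously a single put_vectors call carried the entire vectors list, which can exceed the 500-vector limit noted in the new comments. As a worked example (the 1,700 figure is illustrative), 1,700 vectors now map to ceil(1700 / 500) = 4 requests of 500, 500, 500, and 200 vectors.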

@@ -197,13 +197,19 @@ class S3VectorClient(VectorDBBase):
                         "metadata": metadata,
                     }
                 )
-            # Insert vectors
-            self.client.put_vectors(
-                vectorBucketName=self.bucket_name,
-                indexName=collection_name,
-                vectors=vectors,
-            )
-            log.info(f"Inserted {len(vectors)} vectors into index '{collection_name}'.")
+            # Insert vectors in batches of 500 (S3 Vector API limit)
+            batch_size = 500
+            for i in range(0, len(vectors), batch_size):
+                batch = vectors[i:i + batch_size]
+                self.client.put_vectors(
+                    vectorBucketName=self.bucket_name,
+                    indexName=collection_name,
+                    vectors=batch,
+                )
+                log.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} vectors into index '{collection_name}'.")
+            log.info(f"Completed insertion of {len(vectors)} vectors into index '{collection_name}'.")
         except Exception as e:
             log.error(f"Error inserting vectors: {e}")
             raise
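For reference, the slicing pattern introduced above can be checked in isolation; a minimal sketch (the helper name and the 1,234-item count are illustrative, not part of the change):

def iter_batches(items, batch_size=500):
    # Yield successive slices of at most batch_size items, in order.
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

# 1,234 items split into batches of 500, 500, and 234.
batches = list(iter_batches(list(range(1234))))
assert [len(b) for b in batches] == [500, 500, 234]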
@@ -258,16 +264,25 @@ class S3VectorClient(VectorDBBase):
                         "metadata": metadata,
                     }
                 )
-            # Upsert vectors (using put_vectors for upsert semantics)
-            log.info(
-                f"Upserting {len(vectors)} vectors. First vector sample: key={vectors[0]['key']}, data_type={type(vectors[0]['data']['float32'])}, data_len={len(vectors[0]['data']['float32'])}"
-            )
-            self.client.put_vectors(
-                vectorBucketName=self.bucket_name,
-                indexName=collection_name,
-                vectors=vectors,
-            )
-            log.info(f"Upserted {len(vectors)} vectors into index '{collection_name}'.")
+            # Upsert vectors in batches of 500 (S3 Vector API limit)
+            batch_size = 500
+            for i in range(0, len(vectors), batch_size):
+                batch = vectors[i:i + batch_size]
+                if i == 0:  # Log sample info for first batch only
+                    log.info(
+                        f"Upserting batch 1: {len(batch)} vectors. First vector sample: key={batch[0]['key']}, data_type={type(batch[0]['data']['float32'])}, data_len={len(batch[0]['data']['float32'])}"
+                    )
+                else:
+                    log.info(f"Upserting batch {i//batch_size + 1}: {len(batch)} vectors.")
+                self.client.put_vectors(
+                    vectorBucketName=self.bucket_name,
+                    indexName=collection_name,
+                    vectors=batch,
+                )
+            log.info(f"Completed upsert of {len(vectors)} vectors into index '{collection_name}'.")
         except Exception as e:
             log.error(f"Error upserting vectors: {e}")
             raise
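Since insert and upsert now repeat the same loop, one possible follow-up is to factor it into a shared helper; a rough sketch assuming the put_vectors keyword arguments shown in the diff (the helper itself, its name, and the logger setup are hypothetical, not part of this PR):

import logging

log = logging.getLogger(__name__)


def put_vectors_in_batches(client, bucket_name, index_name, vectors, batch_size=500):
    # Send vectors to the index in chunks that respect the 500-vector API limit.
    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i + batch_size]
        client.put_vectors(
            vectorBucketName=bucket_name,
            indexName=index_name,
            vectors=batch,
        )
        log.info(
            f"Put batch {i // batch_size + 1}: {len(batch)} vectors into index '{index_name}'."
        )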