Skip to content

Commit 8792162

Browse files
authored
flatten all files from writer to bulk upload (NVIDIA#947)
1 parent 7f7db2f commit 8792162

1 file changed

Lines changed: 7 additions & 6 deletions

File tree

  • client/src/nv_ingest_client/util/vdb

client/src/nv_ingest_client/util/vdb/milvus.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,7 @@ def write_records_minio(records, writer: RemoteBulkWriter) -> RemoteBulkWriter:
729729
for element in records:
730730
writer.append_row(element)
731731
writer.commit()
732-
print(f"Wrote data to: {writer.batch_files}")
732+
logger.debug(f"Wrote data to: {writer.batch_files}")
733733
return writer
734734

735735

@@ -757,9 +757,10 @@ def bulk_insert_milvus(
757757

758758
connections.connect(uri=milvus_uri)
759759
t_bulk_start = time.time()
760+
files_to_upload = [_file for file_set in writer.batch_files for _file in file_set]
760761
task_id = utility.do_bulk_insert(
761762
collection_name=collection_name,
762-
files=writer.batch_files[0],
763+
files=files_to_upload,
763764
consistency_level=CONSISTENCY,
764765
)
765766
# list_bulk_insert_tasks = utility.list_bulk_insert_tasks(collection_name=collection_name)
@@ -769,11 +770,11 @@ def bulk_insert_milvus(
769770
state = task.state_name
770771
if state == "Completed":
771772
t_bulk_end = time.time()
772-
print("Start time:", task.create_time_str)
773-
print("Imported row count:", task.row_count)
774-
print(f"Bulk {collection_name} upload took {t_bulk_end - t_bulk_start} s")
773+
logger.info("Start time:", task.create_time_str)
774+
logger.info("Imported row count:", task.row_count)
775+
logger.info(f"Bulk {collection_name} upload took {t_bulk_end - t_bulk_start} s")
775776
if task.state == BulkInsertState.ImportFailed:
776-
print("Failed reason:", task.failed_reason)
777+
logger.error("Failed reason:", task.failed_reason)
777778
time.sleep(1)
778779

779780

0 commit comments

Comments
 (0)