
Commit 3ec6253

feat: more on uploads

1 parent 326454d commit 3ec6253

File tree

1 file changed (+96 -33 lines changed)

examples/create_downampled.py

Lines changed: 96 additions & 33 deletions
@@ -869,11 +869,77 @@ def is_chunk_fully_covered(chunk_bounds, processed_chunks_bounds):
     # All corners are covered
     return True
 
+# %% Read in the files that were already processed
+
+already_uploaded_path = output_path / "uploaded_to_gcs_chunks.txt"
+if already_uploaded_path.exists():
+    with open(already_uploaded_path, "r") as f:
+        uploaded_files = f.readlines()
+else:
+    uploaded_files = []
 
 # %% Function to check the output directory for completed chunks and upload them to GCS
 
 processed_chunks_bounds = []
-uploaded_files = []
+failed_files = []
+
+
+def upload_many_blobs_with_transfer_manager(
+    bucket_name, filenames, source_directory="", workers=8
+):
+    """Upload every file in a list to a bucket, concurrently in a process pool.
+
+    Each blob name is derived from the filename, not including the
+    `source_directory` parameter. For complete control of the blob name for each
+    file (and other aspects of individual blob metadata), use
+    transfer_manager.upload_many() instead.
+    """
+
+    # The ID of your GCS bucket
+    # bucket_name = "your-bucket-name"
+
+    # A list (or other iterable) of filenames to upload.
+    # filenames = ["file_1.txt", "file_2.txt"]
+
+    # The directory on your computer that is the root of all of the files in the
+    # list of filenames. This string is prepended (with os.path.join()) to each
+    # filename to get the full path to the file. Relative paths and absolute
+    # paths are both accepted. This string is not included in the name of the
+    # uploaded blob; it is only used to find the source files. An empty string
+    # means "the current working directory". Note that this parameter allows
+    # directory traversal (e.g. "/", "../") and is not intended for unsanitized
+    # end user input.
+    # source_directory=""
+
+    # The maximum number of processes to use for the operation. The performance
+    # impact of this value depends on the use case, but smaller files usually
+    # benefit from a higher number of processes. Each additional process occupies
+    # some CPU and memory resources until finished. Threads can be used instead
+    # of processes by passing `worker_type=transfer_manager.THREAD`.
+    # workers=8
+
+    from google.cloud.storage import Client, transfer_manager
+
+    storage_client = Client(project=gcs_project)
+    bucket = storage_client.bucket(bucket_name)
+
+    results = transfer_manager.upload_many_from_filenames(
+        bucket,
+        filenames,
+        source_directory=source_directory,
+        blob_name_prefix=gcs_output_path,
+        max_workers=workers,
+    )
+
+    for name, result in zip(filenames, results):
+        # The results list is either `None` or an exception for each filename in
+        # the input list, in order.
+
+        if isinstance(result, Exception):
+            failed_files.append(name)
+            print("Failed to upload {} due to exception: {}".format(name, result))
+        else:
+            uploaded_files.append(name)
 
 
 # TODO this probably wants to bulk together uploads to reduce overhead
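Side note on the helper above: per its own comments, transfer_manager can run with threads instead of processes. A minimal sketch of that variant, assuming the gcs_project and gcs_output_path globals this script already defines; the upload_with_threads name is illustrative, not part of the commit:

from google.cloud.storage import Client, transfer_manager

def upload_with_threads(bucket_name, filenames, workers=8):
    # Threads sidestep the pickling constraints of a process pool, which
    # matters here because the helper reads module-level globals.
    bucket = Client(project=gcs_project).bucket(bucket_name)  # assumes gcs_project global
    results = transfer_manager.upload_many_from_filenames(
        bucket,
        filenames,
        blob_name_prefix=gcs_output_path,  # assumes gcs_output_path global
        max_workers=workers,
        worker_type=transfer_manager.THREAD,
    )
    # Each result is None on success or the raised exception, in input order.
    return [n for n, r in zip(filenames, results) if isinstance(r, Exception)]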
@@ -886,6 +952,7 @@ def check_and_upload_completed_chunks():
         int: Number of chunks uploaded
     """
     uploaded_count = 0
+    files_to_upload_this_batch = []
 
     for mip_level in range(num_mips):
         factor = 2**mip_level
@@ -894,7 +961,8 @@ def check_and_upload_completed_chunks():
         # For each file in the output dir check if it is fully covered by the already processed bounds
         # First, we loop over all the files in the output directory
         for chunk_file in output_path_for_mip.glob("**/*"):
-            if chunk_file in [uf[0] for uf in uploaded_files]:
+            # TODO probably need to use remote files here
+            if chunk_file in uploaded_files:
                 continue
             # 1. Pull out the bounds of the chunk from the filename
             # filename format is x0-x1_y0-y1_z0-z1
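For context, step 1 could be implemented as below, given the x0-x1_y0-y1_z0-z1 naming described in the comment; the parse_chunk_bounds helper is illustrative, the commit's actual parsing lies outside this hunk:

def parse_chunk_bounds(chunk_file):
    # e.g. "0-64_0-64_0-128" -> ((0, 64), (0, 64), (0, 128))
    return tuple(tuple(map(int, p.split("-"))) for p in chunk_file.name.split("_"))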
@@ -918,23 +986,26 @@ def check_and_upload_completed_chunks():
             covered = is_chunk_fully_covered(chunk_bounds, processed_chunks_bounds)
 
             if covered:
-                # 3. If it is, upload it to GCS
-                relative_path = chunk_file.relative_to(output_path)
-                gcs_chunk_path = (
-                    gcs_output_path.rstrip("/")
-                    + "/"
-                    + str(relative_path).replace("\\", "/")
-                )
-                # Skip re-uploading files that are already uploaded
-                if upload_file_to_gcs(
-                    chunk_file, gcs_chunk_path, overwrite=overwrite_gcs
-                ):
-                    uploaded_count += 1
-                    # Remove local chunk to save space
-                    if use_gcs_output and delete_output:
-                        chunk_file.unlink()
-                    uploaded_files.append((chunk_file, gcs_chunk_path))
+                # 3. If it is, mark to upload it to GCS
+                files_to_upload_this_batch.append(chunk_file)
 
+    if files_to_upload_this_batch:
+        print(f"Uploading {len(files_to_upload_this_batch)} completed chunks to GCS...")
+        upload_many_blobs_with_transfer_manager(
+            gcs_output_bucket_name, files_to_upload_this_batch, workers=8
+        )
+        uploaded_count += len(files_to_upload_this_batch)
+
+        # Remove local chunks to save space
+        if use_gcs_output and delete_output:
+            for chunk_file in files_to_upload_this_batch:
+                if chunk_file in failed_files:
+                    print(f"Skipping deletion of failed upload chunk file {chunk_file}")
+                    continue
+                try:
+                    chunk_file.unlink()
+                except Exception as e:
+                    print(f"Error deleting local chunk file {chunk_file}: {e}")
     return uploaded_count
 

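An observation on the batch above: uploaded_count is incremented by the full batch size even when some files end up in failed_files. If an exact count matters, the uploaded_count += len(files_to_upload_this_batch) line could instead read along these lines; a sketch, not part of the commit:

# Sketch: count only the uploads that did not fail in this batch
failed_this_batch = sum(1 for f in files_to_upload_this_batch if f in failed_files)
uploaded_count += len(files_to_upload_this_batch) - failed_this_batch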
@@ -954,7 +1025,7 @@ def upload_any_remaining_chunks():
         output_path_for_mip = output_path / dir_name
         # For each file in the output dir
         for chunk_file in output_path_for_mip.glob("**/*"):
-            if chunk_file in [uf[0] for uf in uploaded_files]:
+            if chunk_file in uploaded_files:
                 continue
             relative_path = chunk_file.relative_to(output_path)
             gcs_chunk_path = (
@@ -971,20 +1042,6 @@ def upload_any_remaining_chunks():
 
     return uploaded_count
 
-# Upload process
-# 1. Write chunks to a local directory that is temporary
-# 2. When a processing step is done (or after every N steps), check the output
-# files in the temp directory to see which ones are fully ready to be uploaded
-# 3. Move these files to a new folder that is for completed files
-# 4. Upload the completed files to GCS using gcloud command line tool
-# if no errors, then write the names of all the files in that folder to a text file
-# ensuring to always append to the text file
-# 5. Delete the files in the completed folder to save space
-# Separately now during the chunk writing step, we should check if any of the
-# files that were created are listed in the uploaded files text file
-# because if they are that was a mistake and we should throw an error about this
-# If there are errors during the upload step, we need to crash the process
-# and check what the error was to know how to proceed from there
 
 # %% Move the data across with a single worker
 total_uploaded_files = 0
@@ -1008,6 +1065,12 @@ def upload_any_remaining_chunks():
     for local_path, gcs_path in uploaded_files:
         f.write(f"{local_path} -> {gcs_path}\n")
 
+# %% Show any failed uploads
+if failed_files:
+    print("The following files failed to upload to GCS:")
+    for f in failed_files:
+        print(f)
+
 # %% Final upload of any remaining chunks - hopefully should be none here, but maybe some failed
 # This is not something we always want to run, so puttin an input prompt here just in case
 continue_upload = input(
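A final note on the resume log: the top of the script reads uploaded_to_gcs_chunks.txt with readlines(), which keeps trailing newlines, while this block writes "local -> gcs" pairs; a Path from glob() never compares equal to either string, so resumed runs may re-upload chunks. A symmetric read/write sketch, assuming one plain path per line; this is an assumption, not what the commit does:

from pathlib import Path

# Sketch: append one plain local path per line after each successful batch
with open(already_uploaded_path, "a") as log:
    for local_path in uploaded_files:
        log.write(f"{local_path}\n")

# Sketch: read entries back as Path objects so `chunk_file in uploaded_files` can match
if already_uploaded_path.exists():
    with open(already_uploaded_path, "r") as log:
        uploaded_files = [Path(line.strip()) for line in log if line.strip()]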
