Commit ff26aee

feat: continue with refactored processing
1 parent f96587d

File tree: 5 files changed, +135 / -81 lines

create_downsampled/.env.example

Lines changed: 32 additions & 0 deletions

@@ -0,0 +1,32 @@
+# Example environment configuration file
+# Copy this file to .env and modify the values as needed
+
+# Data source configuration
+USE_GCS_BUCKET=true
+GCS_BUCKET_NAME=YOUR_GCS_BUCKET_NAME
+GCS_PREFIX=YOUR/GCS/PREFIX
+GCS_FILE_EXTENSION=.zarr
+GCS_PROJECT=YOUR_GCP_PROJECT_ID
+GOOGLE_CLOUD_PROJECT=YOUR_GCP_PROJECT_ID
+GCS_FILES_LOCAL_LIST=/path/to/bucket-list.txt
+
+# Output GCS bucket configuration (for uploading processed results)
+USE_GCS_OUTPUT=true
+GCS_OUTPUT_BUCKET_NAME=YOUR_OUTPUT_GCS_BUCKET_NAME
+GCS_OUTPUT_PREFIX=YOUR/OUTPUT/PREFIX
+OVERWRITE_GCS=false
+NUM_UPLOAD_WORKERS=8
+
+# Local paths (used when USE_GCS_BUCKET=false)
+INPUT_PATH=/path/to/input
+OUTPUT_PATH=/path/to/output
+DELETE_INPUT=false
+DELETE_OUTPUT=false
+
+# Processing settings
+OVERWRITE=false
+NUM_MIPS=5
+MIP_CUTOFF=3
+CHANNEL_LIMIT=4
+ALLOW_NON_ALIGNED_WRITE=true
+MANUAL_CHUNK_SIZE=64,64,32
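For orientation: main.py (below) imports load_env_config from load_config to read these values, but its implementation is not part of this commit, so here is only a minimal, hypothetical sketch of how such a loader could coerce the strings into typed settings (the _env_bool helper and the use of python-dotenv are assumptions):

```python
# Hypothetical config-loading sketch; the repo's load_config.load_env_config may differ.
import os

from dotenv import load_dotenv  # assumes the python-dotenv package is installed


def _env_bool(name, default=False):
    # Accept the usual truthy spellings: "true", "1", "yes" (case-insensitive).
    return os.getenv(name, str(default)).strip().lower() in ("true", "1", "yes")


load_dotenv()  # reads .env from the current working directory

use_gcs_bucket = _env_bool("USE_GCS_BUCKET")
num_mips = int(os.getenv("NUM_MIPS", "5"))
# MANUAL_CHUNK_SIZE is a comma-separated x,y,z triple, e.g. "64,64,32".
manual_chunk_size = tuple(int(v) for v in os.getenv("MANUAL_CHUNK_SIZE", "64,64,32").split(","))
```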

create_downsampled/chunk_utils.py

Lines changed: 49 additions & 0 deletions

@@ -0,0 +1,49 @@
+def is_chunk_fully_covered(chunk_bounds, processed_chunks_bounds):
+    """
+    Check if a chunk is fully covered by processed bounds.
+
+    Args:
+        chunk_bounds: [start_coord, end_coord] where each coord is [x, y, z]
+        processed_chunks_bounds: List of tuples (start, end) where start and end are [x, y, z]
+
+    Returns:
+        bool: True if all 8 corners of the chunk are covered by processed bounds
+    """
+    if not processed_chunks_bounds:
+        return False
+
+    start_coord, end_coord = chunk_bounds
+    x0, y0, z0 = start_coord
+    x1, y1, z1 = end_coord
+
+    # Generate all 8 corners of the chunk
+    corners = [
+        [x0, y0, z0],  # min corner
+        [x1, y0, z0],
+        [x0, y1, z0],
+        [x0, y0, z1],
+        [x1, y1, z0],
+        [x1, y0, z1],
+        [x0, y1, z1],
+        [x1, y1, z1],  # max corner
+    ]
+
+    # Check if each corner is covered by at least one processed bound
+    for corner in corners:
+        corner_covered = False
+        for start, end in processed_chunks_bounds:
+            # Check if corner is inside this processed bound
+            if (
+                start[0] <= corner[0] < end[0]
+                and start[1] <= corner[1] < end[1]
+                and start[2] <= corner[2] < end[2]
+            ):
+                corner_covered = True
+                break
+
+        # If any corner is not covered, the chunk is not fully covered
+        if not corner_covered:
+            return False
+
+    # All corners are covered
+    return True
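A quick usage sketch of the helper as committed. Note the strict `< end[i]` comparison: a corner sitting exactly on a processed bound's end coordinate counts as uncovered, so a chunk that exactly equals a processed bound is reported as not fully covered (whether that half-open behavior is intended is not stated in the commit):

```python
from chunk_utils import is_chunk_fully_covered

chunk = [[0, 0, 0], [64, 64, 64]]

# A bound that extends past the chunk's max corner covers it.
assert is_chunk_fully_covered(chunk, [([0, 0, 0], [128, 128, 128])]) is True

# A bound exactly equal to the chunk does NOT: the max corner [64, 64, 64]
# fails the strict `corner < end` test on every axis.
assert is_chunk_fully_covered(chunk, [([0, 0, 0], [64, 64, 64])]) is False

# With no processed bounds, the chunk is trivially uncovered.
assert is_chunk_fully_covered(chunk, []) is False
```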

create_downsampled/chunking.py

Lines changed: 17 additions & 54 deletions

@@ -4,8 +4,7 @@
 import numpy as np
 from neuroglancer.downsample import downsample_with_averaging
 
-from io import load_data_from_zarr_store, delete_cached_zarr_file, load_file
-
+from gcs_local_io import load_data_from_zarr_store, delete_cached_zarr_file, load_file
 
 
 def compute_optimal_chunk_size(single_file_shape, num_mips, max_chunk_size=None):
@@ -100,6 +99,12 @@ def process(
     allow_non_aligned_write,
     overwrite_output,
     progress_dir,
+    total_rows,
+    total_cols,
+    use_gcs_bucket,
+    input_path,
+    all_files,
+    delete_input,
 ):
     x_i, y_i, z_i = args
 
@@ -202,58 +207,16 @@ def process(
     touch(f_name)
 
     # Clean up cached file to save disk space
-    delete_cached_zarr_file(x_i, y_i)
+    delete_cached_zarr_file(
+        x_i,
+        y_i,
+        total_rows,
+        total_cols,
+        use_gcs_bucket,
+        delete_input,
+        input_path,
+        all_files,
+    )
 
     # Return the bounds of the processed chunk
     return (start, end)
-
-
-def is_chunk_fully_covered(chunk_bounds, processed_chunks_bounds):
-    """
-    Check if a chunk is fully covered by processed bounds.
-
-    Args:
-        chunk_bounds: [start_coord, end_coord] where each coord is [x, y, z]
-        processed_chunks_bounds: List of tuples (start, end) where start and end are [x, y, z]
-
-    Returns:
-        bool: True if all 8 corners of the chunk are covered by processed bounds
-    """
-    if not processed_chunks_bounds:
-        return False
-
-    start_coord, end_coord = chunk_bounds
-    x0, y0, z0 = start_coord
-    x1, y1, z1 = end_coord
-
-    # Generate all 8 corners of the chunk
-    corners = [
-        [x0, y0, z0],  # min corner
-        [x1, y0, z0],
-        [x0, y1, z0],
-        [x0, y0, z1],
-        [x1, y1, z0],
-        [x1, y0, z1],
-        [x0, y1, z1],
-        [x1, y1, z1],  # max corner
-    ]
-
-    # Check if each corner is covered by at least one processed bound
-    for corner in corners:
-        corner_covered = False
-        for start, end in processed_chunks_bounds:
-            # Check if corner is inside this processed bound
-            if (
-                start[0] <= corner[0] < end[0]
-                and start[1] <= corner[1] < end[1]
-                and start[2] <= corner[2] < end[2]
-            ):
-                corner_covered = True
-                break
-
-        # If any corner is not covered, the chunk is not fully covered
-        if not corner_covered:
-            return False
-
-    # All corners are covered
-    return True
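Besides threading the new parameters through process(), this hunk removes is_chunk_fully_covered from chunking.py (it now lives in chunk_utils.py), which breaks the import cycle in the old layout: chunking.py imported from the local io module, and io.py imported is_chunk_fully_covered back from chunking. A self-contained sketch of why such a cycle fails, using throwaway modules a and b written to a temp directory:

```python
# Minimal reproduction of a two-module import cycle (hypothetical modules).
import pathlib
import sys
import tempfile

tmp = pathlib.Path(tempfile.mkdtemp())
(tmp / "a.py").write_text("from b import g\ndef f():\n    return g()\n")
(tmp / "b.py").write_text("from a import f\ndef g():\n    return 1\n")
sys.path.insert(0, str(tmp))

try:
    # a -> b -> a: b asks for a.f before a has finished executing.
    import a  # noqa: F401
except ImportError as exc:
    print("circular import:", exc)
```

Hoisting the shared helper into a leaf module with no project imports, as this commit does, is the standard fix.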

create_downsampled/io.py renamed to create_downsampled/gcs_local_io.py

Lines changed: 15 additions & 10 deletions

@@ -6,12 +6,12 @@
 import zarr
 import numpy as np
 
-from chunking import is_chunk_fully_covered
+from chunk_utils import is_chunk_fully_covered
 from wells import extract_row_col_from_filename
 from gcs import gcloud_download_dir, upload_many_blobs_with_transfer_manager
 
 
-def get_local_cache_path(row, col, use_gcs_bucket, input_path):
+def get_local_cache_path(remote_file, use_gcs_bucket, input_path):
     """
     Get the local cache path where a file for the given row/col should be stored.
 
@@ -23,11 +23,6 @@ def get_local_cache_path(row, col, use_gcs_bucket, input_path):
         Path: Local cache path for the file
     """
 
-    # Get the remote file path/name for this row/col
-    remote_file = get_remote_file_path(row, col)
-    if remote_file is None:
-        return None
-
     # Create local filename based on remote file
     if use_gcs_bucket:
         cache_dir = input_path
@@ -83,7 +78,7 @@ def download_zarr_file(
         print(f"No file found for row {row}, col {col}")
         return None
 
-    local_path = get_local_cache_path(row, col, use_gcs_bucket, input_path)
+    local_path = get_local_cache_path(remote_file, use_gcs_bucket, input_path)
     if local_path is None:
         return None
 
@@ -137,7 +132,16 @@ def load_file(
     return None
 
 
-def delete_cached_zarr_file(row, col, use_gcs_bucket, delete_input, input_path):
+def delete_cached_zarr_file(
+    row,
+    col,
+    total_rows,
+    total_cols,
+    use_gcs_bucket,
+    delete_input,
+    input_path,
+    all_files,
+):
     """
     Delete the locally cached file for a specific row and column to save disk space.
 
@@ -150,7 +154,8 @@ def delete_cached_zarr_file(row, col, use_gcs_bucket, delete_input, input_path):
     """
     if not use_gcs_bucket or not delete_input:
         return True
-    local_path = get_local_cache_path(row, col, use_gcs_bucket, input_path)
+    remote_path = get_remote_file_path(row, col, total_rows, total_cols, all_files)
+    local_path = get_local_cache_path(remote_path, use_gcs_bucket, input_path)
     if local_path is None:
         return True
 
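A plausible motivation for the io.py to gcs_local_io.py rename (the commit message does not say): a project-local io.py shadows the standard library's io module for code run from that directory, because a script's own directory sits first on sys.path. A hypothetical, self-contained demonstration:

```python
# Demonstrates stdlib shadowing by a local io.py (hypothetical setup).
import importlib
import pathlib
import sys
import tempfile

tmp = pathlib.Path(tempfile.mkdtemp())
(tmp / "io.py").write_text("def load_file():\n    return 'project io'\n")

real_io = sys.modules.pop("io")  # stdlib io is loaded at interpreter startup
sys.path.insert(0, str(tmp))
shadowed = importlib.import_module("io")  # now resolves to tmp/io.py

print(hasattr(shadowed, "BytesIO"))  # False: the stdlib module is shadowed

sys.modules["io"] = real_io  # restore the real module for the rest of the process
```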

create_downsampled/main.py

Lines changed: 22 additions & 17 deletions

@@ -4,7 +4,7 @@
 from load_config import load_env_config
 from gcs import list_gcs_files, sync_info_to_gcs_output
 from wells import compute_grid_dimensions, get_grid_coords
-from io import load_file, check_and_upload_completed_chunks, check_any_remaining_chunks
+from gcs_local_io import load_file, check_and_upload_completed_chunks, check_any_remaining_chunks
 from chunking import compute_volume_and_chunk_size, process
 from volume import create_cloudvolume_info
 
@@ -87,28 +87,29 @@ def main():
     )
 
     # Process each well into chunks
-    iter_coords = get_grid_coords(num_chunks_per_dim)
+    iter_coords = list(get_grid_coords(num_chunks_per_dim))
 
     processed_chunks = []
     failed_chunks = []
     total_uploads = 0
     for coord in iter_coords[:MAX_ITERS]:
         bounds = process(
-            coord,
-            single_file_shape,
-            volume_shape,
-            vols,
-            chunk_size,
-            num_mips,
-            mip_cutoff,
-            allow_non_aligned_write,
-            overwrite_output,
-            use_gcs_bucket,
-            input_path,
-            total_rows,
-            total_cols,
-            all_files,
-            gcs_project,
+            args=coord,
+            single_file_shape=single_file_shape,
+            volume_shape=volume_shape,
+            vols=vols,
+            chunk_size=chunk_size,
+            num_mips=num_mips,
+            mip_cutoff=mip_cutoff,
+            allow_non_aligned_write=allow_non_aligned_write,
+            overwrite_output=overwrite_output,
+            progress_dir=output_path / "progress",
+            total_rows=total_rows,
+            total_cols=total_cols,
+            use_gcs_bucket=use_gcs_bucket,
+            input_path=input_path,
+            all_files=all_files,
+            delete_input=delete_input,
         )
         start, end = bounds
         processed_chunks.append((start, end))
@@ -133,3 +134,7 @@ def main():
         gcs_project,
         gcs_output_bucket_name,
     )
+
+
+if __name__ == "__main__":
+    main()
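One small but load-bearing change above is wrapping get_grid_coords in list(): if it returns a generator (likely, though its body is not shown in this diff), the later iter_coords[:MAX_ITERS] slice would raise TypeError, because generators don't support slicing. A sketch of the two standard options, with grid_coords standing in for the real function:

```python
import itertools


def grid_coords(n):
    # Stand-in for get_grid_coords: lazily yields (x, y, z) grid indices.
    return ((x, y, z) for x in range(n) for y in range(n) for z in range(n))


MAX_ITERS = 4

# Option 1: materialize, then slice (what this commit does).
coords = list(grid_coords(2))
for coord in coords[:MAX_ITERS]:
    print(coord)

# Option 2: stay lazy with islice, avoiding the full list in memory.
for coord in itertools.islice(grid_coords(2), MAX_ITERS):
    print(coord)
```

Switching the process(...) call to keyword arguments serves the same robustness goal: the old positional call no longer matched the refactored signature (it still passed gcs_project, which the new parameter list does not accept), and keywords make any such drift fail loudly with a TypeError.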
