Skip to content

Commit 02ebd3a

Browse files
committed
Support reading zip raw files from the source butler repo
LSSTCam raws are archived as zip files, with one zip per exposure containing all detector images. /repo/main stores LSSTCam raws in this zip format, whereas /repo/embargo stores them as individual FITS files, one per detector image.
1 parent 54afd52 commit 02ebd3a

File tree

1 file changed

+29
-2
lines changed

1 file changed

+29
-2
lines changed

python/tester/upload_from_repo.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import tempfile
3030
import time
3131
import yaml
32+
import zipfile
3233

3334
from astropy.io import fits
3435
import boto3
@@ -267,12 +268,33 @@ def load_raw_to_temp(temp_dir, ref_dict, pool=None):
267268
-------
268269
result : `dict`
269270
A dictionary with:
270-
- "mode": "fits-serial" or "fits-parallel"
271+
- "mode": "zip", "fits-serial", or "fits-parallel"
271272
- "async_result": `multiprocessing.AsyncResult` if multiprocessing is used, else `None`.
273+
274+
Notes
275+
-----
276+
A data butler repo can store raw data in two formats: one fits file
277+
for each detector, or one zip file for each exposure containing all
278+
detector fits files. This function assumes that either all refs point
279+
to the same zip file, or each ref is its own fits file. For zip,
280+
extract all files to the temporary folder. Either way, the temporary
281+
folder will be loaded with detector-level fits files.
272282
"""
273283
if not ref_dict:
274284
raise ValueError("ref_dict is empty")
275285

286+
uri = next(iter(ref_dict.values())).primaryURI
287+
# Determine whether the detector data is stored as zip file
288+
if uri.fragment and (uri.getExtension() == ".zip"):
289+
_log.info(f"Extracting zip file {uri.basename()}")
290+
with uri.open("rb") as fd:
291+
with zipfile.ZipFile(fd) as zf:
292+
zf.extractall(temp_dir)
293+
return {
294+
"mode": "zip",
295+
"async_result": None,
296+
}
297+
276298
if pool is None:
277299
_log.warning("Multiprocessing pool is not provided; fallback to serial.")
278300
for r in ref_dict:
@@ -312,9 +334,14 @@ def upload_images(pool, temp_dir, group_id, ref_dict):
312334
# Non-blocking assignment lets us upload during the next exposure.
313335
# Can't time these tasks directly, but the blocking equivalent took
314336
# 12-20 s depending on tuning, or less than a single exposure.
337+
args = []
338+
for ref in ref_dict:
339+
uri = ref_dict[ref].primaryURI
340+
filename = uri.fragment.partition("=")[-1] if uri.fragment else uri.basename()
341+
args.append((temp_dir, group_id, ref, filename))
315342
pool.starmap_async(
316343
_upload_one_image,
317-
[(temp_dir, group_id, r, ref_dict[r].primaryURI.basename()) for r in ref_dict],
344+
args,
318345
error_callback=_log.exception,
319346
chunksize=5 # Works well across a broad range of # processes
320347
)

0 commit comments

Comments
 (0)