
DM-51822: PP-dev upload_from_repo test with LSSTCam #337

Open · wants to merge 8 commits into base: main
etc/tester/LATISS.yaml (2 changes: 1 addition & 1 deletion)
@@ -3,4 +3,4 @@ query:
collections: LATISS/raw/all
# Observations before 2024-04-09 are not compatible with calibs in dev repo
where: "exposure.science_program in ('AUXTEL_PHOTO_IMAGING', 'BLOCK-306') and exposure.day_obs in (20240409..20241001)"
repo: embargo_old
repo: /repo/main
etc/tester/LSSTCam-all.yaml (5 changes: 5 additions & 0 deletions)
@@ -0,0 +1,5 @@
instrument: LSSTCam
query:
collections: LSSTCam/raw/all
where: "exposure.science_program='BLOCK-365' and exposure.day_obs in (20250501..20250502)"
repo: /repo/main
etc/tester/LSSTCam-clean.yaml (5 changes: 5 additions & 0 deletions)
@@ -0,0 +1,5 @@
instrument: LSSTCam
query:
collections: u/hchiang2/tagged/test3
where: "exposure.science_program='BLOCK-365' and exposure.day_obs in (20250501..20250502)"
repo: /repo/main
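
For reference, a minimal sketch of the butler query these tester configs describe, assuming lsst.daf.butler and the values from LSSTCam-all.yaml above; the tester's actual call path may differ:

from lsst.daf.butler import Butler

# Query /repo/main for the raws selected by the config's where clause.
butler = Butler("/repo/main")
refs = butler.registry.queryDatasets(
    "raw",
    collections="LSSTCam/raw/all",
    where="exposure.science_program='BLOCK-365' "
          "and exposure.day_obs in (20250501..20250502)",
    instrument="LSSTCam",
)
print(f"Config selects {len(list(refs))} raw datasets")
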
python/tester/upload.py (34 changes: 7 additions & 27 deletions)
@@ -38,7 +38,6 @@

from shared.raw import (
LSST_REGEXP,
IMSIM_REGEXP,
OTHER_REGEXP,
get_raw_path,
_LSST_CAMERA_LIST,
@@ -51,7 +50,6 @@
get_last_group,
increment_group,
make_exposure_id,
make_imsim_time_headers,
replace_header_key,
send_next_visit,
)
@@ -69,7 +67,7 @@
_log.setLevel(logging.INFO)


def process_group(kafka_url, visit_infos, uploader, start_time):
def process_group(kafka_url, visit_infos, uploader):
"""Simulate the observation of a single on-sky pointing.

Parameters
@@ -83,8 +81,6 @@ def process_group(kafka_url, visit_infos, uploader, start_time):
uploader : callable [`shared.visit.FannedOutVisit`, int]
A callable that takes an exposure spec and a snap ID, and uploads the
visit's data.
start_time : `float`
The Unix time (TAI) of the exposure start.
"""
# Assume group/snaps is shared among all visit_infos
for info in visit_infos:
@@ -110,7 +106,7 @@ def process_group(kafka_url, visit_infos, uploader, start_time):
for info in visit_infos:
_log.info(f"Uploading group: {info.groupId} snap: {snap} filters: {info.filters} "
f"detector: {info.detector}")
uploader(info, snap, start_time)
uploader(info, snap)
_log.info(f"Uploaded group: {info.groupId} snap: {snap} filters: {info.filters} "
f"detector: {info.detector}")

@@ -284,10 +280,7 @@ def get_samples_lsst(bucket, instrument):
for blob in blobs:
# Assume that the unobserved bucket uses the same filename scheme as
# the observed bucket.
if instrument == "LSSTCam-imSim":
m = re.match(IMSIM_REGEXP, blob.key)
else:
m = re.match(LSST_REGEXP, blob.key)
m = re.match(LSST_REGEXP, blob.key)
if not m or m["extension"] == ".json":
continue

@@ -302,10 +295,7 @@

sal_index = INSTRUMENTS[instrument].sal_index
# Use special sal_index to indicate a subset of detectors
if instrument == "LSSTCam-imSim":
# For imSim data, the OBSID header has the exposure ID.
sal_index = int(md["OBSID"])
elif instrument == "LSSTCam":
if instrument == "LSSTCam":
_, _, day_obs, seq_num = md["OBSID"].split("_")
exposure_num = LsstBaseTranslator.compute_exposure_id(int(day_obs), int(seq_num))
sal_index = exposure_num
@@ -382,18 +372,11 @@ def upload_from_raws(kafka_url, instrument, raw_pool, src_bucket, dest_bucket, n
# Copy all the visit-blob dictionaries under each snap_id,
# replacing the (immutable) FannedOutVisit objects to point to group
# instead of true_group.
# Update next_visit timestamp for LSSTCam-imSim only.
now = astropy.time.Time.now().unix_tai
start_time = now + 2*(EXPOSURE_INTERVAL + SLEW_INTERVAL)
for snap_id, old_visits in raw_pool[true_group].items():
snap_dict[snap_id] = {
dataclasses.replace(
true_visit,
groupId=group,
**({
"startTime": start_time,
"private_sndStamp": now
} if instrument == "LSSTCam-imSim" else {})
): blob
for true_visit, blob in old_visits.items()}
# Gather all the FannedOutVisit objects found in snap_dict, merging
@@ -402,14 +385,11 @@

# TODO: may be cleaner to use a functor object than to depend on
# closures for the buckets and data.
def upload_from_pool(visit, snap_id, start_time):
def upload_from_pool(visit, snap_id):
src_blob = snap_dict[snap_id][visit]
exposure_num, headers = \
make_exposure_id(visit.instrument, visit.groupId, snap_id)
# Only LSSTCam-imSim uses the given timestamp for the exposure start.
# Other instruments keep the original exposure timespan.
if instrument == "LSSTCam-imSim":
headers.update(make_imsim_time_headers(EXPOSURE_INTERVAL, start_time))
# The original exposure timespan is kept.
filename = get_raw_path(visit.instrument, visit.detector, visit.groupId, snap_id,
exposure_num, visit.filters)

@@ -433,7 +413,7 @@ def upload_from_pool(visit, snap_id, start_time):
dest_bucket.upload_fileobj(buffer, filename)
_log.debug(f"{filename} is uploaded to {dest_bucket}")

process_group(kafka_url, visit_infos, upload_from_pool, start_time)
process_group(kafka_url, visit_infos, upload_from_pool)


if __name__ == "__main__":
python/tester/upload_from_repo.py (145 changes: 98 additions & 47 deletions)
@@ -29,8 +29,8 @@
import tempfile
import time
import yaml
import zipfile

import astropy
from astropy.io import fits
import boto3
from botocore.handlers import validate_bucket_name
@@ -47,7 +46,6 @@
get_last_group,
increment_group,
make_exposure_id,
make_imsim_time_headers,
replace_header_key,
send_next_visit,
)
@@ -141,17 +140,19 @@ def main():
tempfile.TemporaryDirectory() as temp_dir:
for visit in visit_list:
group = increment_group(instrument, group, 1)
# For imSim, use current time as the event publish time. This does
# not guarantee that the actual exposure will occur at a specific time.
exposure_start = astropy.time.Time.now().unix_tai + 2*float(EXPOSURE_INTERVAL + SLEW_INTERVAL)
refs = prepare_one_visit(kafka_url, group, butler, instrument, visit, exposure_start)
refs = prepare_one_visit(kafka_url, group, butler, instrument, visit)
ref_dict = butler.get_many_uris(refs)
result = load_raw_to_temp(temp_dir, ref_dict, pool=pool)
_log.info(f"Slewing to group {group}, with {instrument} visit {visit}")
time.sleep(SLEW_INTERVAL)
_log.info(f"Taking exposure for group {group}")
time.sleep(EXPOSURE_INTERVAL)
_log.info(f"Finishing exposure for group {group}")
# Wait for the temp_dir loading to finish
if result["async_result"]:
result["async_result"].wait()
_log.info(f"Uploading detector images for group {group}")
upload_images(pool, temp_dir, group, ref_dict, exposure_start)
upload_images(pool, temp_dir, group, ref_dict)
pool.close()
_log.info("Waiting for uploads to finish...")
pool.join()
@@ -190,7 +191,7 @@ def get_visit_list(butler, n_sample, ordered=False, **kwargs):
return visits


def prepare_one_visit(kafka_url, group_id, butler, instrument, visit_id, start_time_unix_tai):
def prepare_one_visit(kafka_url, group_id, butler, instrument, visit_id):
"""Extract metadata and send next_visit events for one visit

One ``next_visit`` message is sent to the development fan-out service,
@@ -208,9 +209,6 @@ def prepare_one_visit(kafka_url, group_id, butler, instrument, visit_id, start_t
The short instrument name of this visit.
visit_id : `int`
The ID of a visit in the dataset.
start_time_unix_tai : `float`
The Unix time (TAI) of the exposure start.
This is only used for LSSTCam-imSim.

Returns
-------
@@ -226,10 +224,8 @@ def prepare_one_visit(kafka_url, group_id, butler, instrument, visit_id, start_t
duration = float(EXPOSURE_INTERVAL + SLEW_INTERVAL)
# all items in refs share the same visit info and one event is to be sent
for data_id in refs.dataIds.limit(1).expanded():
# All instruments except LSSTCam-imSim uses original exposure timespan.
# All instruments use original exposure timespan.
start_time = data_id.records["exposure"].timespan.begin
if instrument == "LSSTCam-imSim":
start_time = astropy.time.Time(start_time_unix_tai, format="unix_tai")
visit = SummitVisit(
instrument=instrument,
groupId=group_id,
@@ -254,7 +250,73 @@ def prepare_one_visit(kafka_url, group_id, butler, instrument, visit_id, start_t
return refs


def upload_images(pool, temp_dir, group_id, ref_dict, exposure_start):
def load_raw_to_temp(temp_dir, ref_dict, pool=None):
"""
Copy the raw data from the source butler to a temporary folder

Parameters
----------
temp_dir : `str`
A directory in which to temporarily hold the images so that their
metadata can be modified.
ref_dict : `dict` [ `lsst.daf.butler.DatasetRef`, `lsst.daf.butler.datastore.DatasetRefURIs` ]
A dict of the datasetRefs to upload and their corresponding URIs.
pool : `multiprocessing.Pool`, optional
A multiprocessing pool to use for parallel file copying.

Returns
-------
result : `dict`
A dictionary with:
- "mode": "zip", "fits-serial", or "fits-parallel"
- "async_result": `multiprocessing.AsyncResult` if multiprocessing is used, else `None`.

Notes
-----
A data butler repo can store raw data in two formats: one fits file
for each detector, or one zip file for each exposure containing all
detector fits files. This function assumes that either all refs point
to the same zip file, or each ref is its own fits file. For a zip file,
all members are extracted to the temporary folder. Either way, the
temporary folder ends up holding detector-level fits files.
"""
if not ref_dict:
raise ValueError("ref_dict is empty")

uri = next(iter(ref_dict.values())).primaryURI
# Determine whether the detector data is stored as a zip file
if uri.fragment and (uri.getExtension() == ".zip"):
Review comment:
This might be overkill, but do you want to handle the case where a file might really be a zip file but not have the right extension? Using python-magic or some such would do it, but I don't think it's installed in the stack, and maybe it's not worth it to deal with something that's pretty unlikely to happen.

Review comment (member):
If butler is returning you a URI with a fragment, then you know exactly which file within the zip the butler wants you to retrieve. The fragment has the form zip-path= and you can look at the logic in the FormatterV2 code for dealing with extracting it.
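
A minimal sketch combining the two suggestions above; the helper name is illustrative, zipfile.is_zipfile checks the magic bytes with only the standard library (the python-magic-free option), and the zip-path= fragment handling follows the convention described in the second comment:

import zipfile

def _points_into_zip(uri):
    """Illustrative check: does this butler URI refer to a zip archive?

    A zip-path= fragment is authoritative when present; otherwise sniff
    the file's magic bytes, so a zip with the wrong extension is still
    recognized without python-magic.
    """
    if uri.fragment:
        return uri.fragment.startswith("zip-path=")
    with uri.open("rb") as fd:
        return zipfile.is_zipfile(fd)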

_log.info(f"Extracting zip file {uri.basename()}")
with uri.open("rb") as fd:
with zipfile.ZipFile(fd) as zf:
zf.extractall(temp_dir)
return {
"mode": "zip",
"async_result": None,
}

if pool is None:
_log.warning("Multiprocessing pool is not provided; fallback to serial.")
for r in ref_dict:
_load_one_detector_to_temp(temp_dir, r, ref_dict[r].primaryURI)
return {
"mode": "fits-serial",
"async_result": None,
}

async_result = pool.starmap_async(
_load_one_detector_to_temp,
[(temp_dir, r, ref_dict[r].primaryURI) for r in ref_dict],
error_callback=_log.exception,
chunksize=5,
)
return {
"mode": "fits-parallel",
"async_result": async_result,
}
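
A sketch of how a caller consumes the returned dict, mirroring the pattern in main() above (the debug log line is an illustrative addition):

result = load_raw_to_temp(temp_dir, ref_dict, pool=pool)
_log.debug(f"Raw files staged in {result['mode']} mode")
if result["async_result"]:
    # Parallel copies overlap the simulated exposure; block before uploading.
    result["async_result"].wait()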


def upload_images(pool, temp_dir, group_id, ref_dict):
"""Upload one group of raw images to the central repo

Parameters
@@ -268,15 +330,18 @@ def upload_images(pool, temp_dir, group_id, ref_dict, exposure_start):
The group ID under which to store the images.
ref_dict : `dict` [ `lsst.daf.butler.DatasetRef`, `lsst.daf.butler.datastore.DatasetRefURIs` ]
A dict of the datasetRefs to upload and their corresponding URIs.
exposure_start : `float`
The Unix time (TAI) of the exposure start.
"""
# Non-blocking assignment lets us upload during the next exposure.
# Can't time these tasks directly, but the blocking equivalent took
# 12-20 s depending on tuning, or less than a single exposure.
args = []
for ref in ref_dict:
uri = ref_dict[ref].primaryURI
filename = uri.fragment.partition("=")[-1] if uri.fragment else uri.basename()
args.append((temp_dir, group_id, ref, filename))
pool.starmap_async(
_upload_one_image,
[(temp_dir, group_id, r, ref_dict[r].primaryURI, exposure_start) for r in ref_dict],
args,
error_callback=_log.exception,
chunksize=5 # Works well across a broad range of # processes
)
@@ -297,8 +362,14 @@ def _get_max_processes():
return 4


def _upload_one_image(temp_dir, group_id, ref, uri, exposure_start):
"""Upload a raw image to the central repo.
def _load_one_detector_to_temp(temp_dir, ref, uri):
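    """Copy one raw detector file from the source butler into temp_dir."""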
path = os.path.join(temp_dir, uri.basename())
ResourcePath(path).transfer_from(uri, transfer="copy")
_log.debug(f"Raw file for {ref.dataId} was copied from Butler to {path}")


def _upload_one_image(temp_dir, group_id, ref, filename):
"""Upload a raw detector image to the central repo.

Parameters
----------
@@ -309,17 +380,13 @@ def _upload_one_image(temp_dir, group_id, ref, uri, exposure_start):
The group ID under which to store the images.
ref : `lsst.daf.butler.DatasetRef`
The dataset to upload.
uri : `lsst.resources.ResourcePath`
URI to the image to upload.
exposure_start : `float`
The Unix time (TAI) of the exposure start.
Currently only used in imSim tests.
filename : `str`
The fits file with the raw image to upload. The file is expected to
exist in `temp_dir`.
"""
instrument = ref.dataId["instrument"]
with time_this(log=_log, msg="Single-image processing", prefix=None):
exposure_num, headers = make_exposure_id(instrument, group_id, 0)
if instrument == "LSSTCam-imSim":
headers.update(make_imsim_time_headers(EXPOSURE_INTERVAL, exposure_start))
dest_key = get_raw_path(
instrument,
ref.dataId["detector"],
@@ -329,32 +396,16 @@ def _upload_one_image(temp_dir, group_id, ref, uri, exposure_start):
ref.dataId["physical_filter"],
)

sidecar_uploaded = False
if instrument in _LSST_CAMERA_LIST:
# Upload a corresponding sidecar json file
sidecar = uri.updatedExtension("json")
if sidecar.exists():
with sidecar.open("r") as f:
md = json.load(f)
md.update(headers)
dest_bucket.put_object(
Body=json.dumps(md), Key=dest_key.removesuffix("fits") + "json"
)
sidecar_uploaded = True

path = os.path.join(temp_dir, uri.basename())
ResourcePath(path).transfer_from(uri, transfer="copy")
_log.debug(
f"Raw file for {ref.dataId} was copied from Butler to {path}"
)
path = os.path.join(temp_dir, filename)
Review comment:
It might be worth adding an os.path.exists(path) check and raising an exception if the file is missing, although the file open right after this would fail anyway in that case.
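
A minimal sketch of the suggested guard (the exception type and message are illustrative; path, ref, and temp_dir are already in scope here):

if not os.path.exists(path):
    # Fail fast with a clearer error than the open() below would raise.
    raise FileNotFoundError(f"Raw file for {ref.dataId} was not staged in {temp_dir}")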

try:
with open(path, "r+b") as temp_file:
for header_key in headers:
replace_header_key(temp_file, header_key, headers[header_key])
if not sidecar_uploaded and instrument in _LSST_CAMERA_LIST:
if instrument in _LSST_CAMERA_LIST:
with fits.open(temp_file, mode="update") as hdul:
header = {k: v for k, v in hdul[0].header.items() if k != ""}
dest_bucket.put_object(
Body=json.dumps(dict(hdul[0].header)),
Body=json.dumps(header),
Key=dest_key.removesuffix("fits") + "json",
)
with open(path, "rb") as temp_file: