Skip to content

Commit e67d24e

Browse files
Merge pull request #1112 from lsst/tickets/DM-50431
DM-50431: Pass Exposure metadata from multiBand to propagateSourceFlags
2 parents 30d60c3 + 4465ffb commit e67d24e

File tree

2 files changed

+79
-29
lines changed

2 files changed

+79
-29
lines changed

python/lsst/pipe/tasks/multiBand.py

+63-24
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
# You should have received a copy of the GNU General Public License
2020
# along with this program. If not, see <https://www.gnu.org/licenses/>.
2121

22-
__all__ = ["DetectCoaddSourcesConfig", "DetectCoaddSourcesTask"]
22+
__all__ = ["DetectCoaddSourcesConfig", "DetectCoaddSourcesTask",
23+
"MeasureMergedCoaddSourcesConfig", "MeasureMergedCoaddSourcesTask",
24+
]
2325

2426
from lsst.pipe.base import (Struct, PipelineTask, PipelineTaskConfig, PipelineTaskConnections)
2527
import lsst.pipe.base.connectionTypes as cT
@@ -176,9 +178,13 @@ def __init__(self, schema=None, **kwargs):
176178
def runQuantum(self, butlerQC, inputRefs, outputRefs):
177179
inputs = butlerQC.get(inputRefs)
178180
idGenerator = self.config.idGenerator.apply(butlerQC.quantum.dataId)
179-
inputs["idFactory"] = idGenerator.make_table_id_factory()
180-
inputs["expId"] = idGenerator.catalog_id
181-
outputs = self.run(**inputs)
181+
exposure = inputs.pop("exposure")
182+
assert not inputs, "runQuantum got more inputs than expected."
183+
outputs = self.run(
184+
exposure=exposure,
185+
idFactory=idGenerator.make_table_id_factory(),
186+
expId=idGenerator.catalog_id,
187+
)
182188
butlerQC.put(outputs, outputRefs)
183189

184190
def run(self, exposure, idFactory, expId):
@@ -191,7 +197,7 @@ def run(self, exposure, idFactory, expId):
191197
Parameters
192198
----------
193199
exposure : `lsst.afw.image.Exposure`
194-
Exposure on which to detect (may be backround-subtracted and scaled,
200+
Exposure on which to detect (may be background-subtracted and scaled,
195201
depending on configuration).
196202
idFactory : `lsst.afw.table.IdFactory`
197203
IdFactory to set source identifiers.
@@ -294,6 +300,14 @@ class MeasureMergedCoaddSourcesConnections(
294300
multiple=True,
295301
deferLoad=True,
296302
)
303+
finalVisitSummaryHandles = cT.Input(
304+
doc="Final visit summary table",
305+
name="finalVisitSummary",
306+
storageClass="ExposureCatalog",
307+
dimensions=("instrument", "visit"),
308+
multiple=True,
309+
deferLoad=True,
310+
)
297311
# TODO[DM-47797]: remove this deprecated connection.
298312
inputCatalog = cT.Input(
299313
doc=("Name of the input catalog to use."
@@ -573,12 +587,15 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
573587

574588
# Set psfcache
575589
# move this to run after gen2 deprecation
576-
inputs['exposure'].getPsf().setCacheCapacity(self.config.psfCache)
590+
exposure = inputs.pop("exposure")
591+
exposure.getPsf().setCacheCapacity(self.config.psfCache)
592+
593+
ccdInputs = exposure.getInfo().getCoaddInputs().ccds
594+
apCorrMap = exposure.getInfo().getApCorrMap()
577595

578596
# Get unique integer ID for IdFactory and RNG seeds; only the latter
579597
# should really be used as the IDs all come from the input catalog.
580598
idGenerator = self.config.idGenerator.apply(butlerQC.quantum.dataId)
581-
inputs['exposureId'] = idGenerator.catalog_id
582599

583600
# Transform inputCatalog
584601
table = afwTable.SourceTable.make(self.schema, idGenerator.make_table_id_factory())
@@ -596,7 +613,7 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
596613
if self.config.doAddFootprints:
597614
modelData = inputs.pop('scarletModels')
598615
if self.config.doConserveFlux:
599-
imageForRedistribution = inputs['exposure']
616+
imageForRedistribution = exposure
600617
else:
601618
imageForRedistribution = None
602619
updateCatalogFootprints(
@@ -609,7 +626,6 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
609626
)
610627
table = sources.getTable()
611628
table.setMetadata(self.algMetadata) # Capture algorithm metadata to write out to the source catalog.
612-
inputs['sources'] = sources
613629

614630
skyMap = inputs.pop('skyMap')
615631
tractNumber = catalogRef.dataId['tract']
@@ -622,29 +638,45 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
622638
wcs=tractInfo.getWcs(),
623639
bbox=patchInfo.getOuterBBox()
624640
)
625-
inputs['skyInfo'] = skyInfo
626641

627642
if self.config.doPropagateFlags:
628-
ccdInputs = inputs["exposure"].getInfo().getCoaddInputs().ccds
629-
inputs["ccdInputs"] = ccdInputs
630-
631643
if "sourceTableHandles" in inputs:
632644
sourceTableHandles = inputs.pop("sourceTableHandles")
633645
sourceTableHandleDict = {handle.dataId["visit"]: handle for handle in sourceTableHandles}
634-
inputs["sourceTableHandleDict"] = sourceTableHandleDict
646+
else:
647+
sourceTableHandleDict = None
635648
if "finalizedSourceTableHandles" in inputs:
636649
finalizedSourceTableHandles = inputs.pop("finalizedSourceTableHandles")
637650
finalizedSourceTableHandleDict = {handle.dataId["visit"]: handle
638651
for handle in finalizedSourceTableHandles}
639-
inputs["finalizedSourceTableHandleDict"] = finalizedSourceTableHandleDict
640-
641-
outputs = self.run(**inputs)
652+
else:
653+
finalizedSourceTableHandleDict = None
654+
if "finalVisitSummaryHandles" in inputs:
655+
finalVisitSummaryHandles = inputs.pop("finalVisitSummaryHandles")
656+
finalVisitSummaryHandleDict = {handle.dataId["visit"]: handle
657+
for handle in finalVisitSummaryHandles}
658+
else:
659+
finalVisitSummaryHandleDict = None
660+
661+
assert not inputs, "runQuantum got more inputs than expected."
662+
outputs = self.run(
663+
exposure=exposure,
664+
sources=sources,
665+
skyInfo=skyInfo,
666+
exposureId=idGenerator.catalog_id,
667+
ccdInputs=ccdInputs,
668+
sourceTableHandleDict=sourceTableHandleDict,
669+
finalizedSourceTableHandleDict=finalizedSourceTableHandleDict,
670+
finalVisitSummaryHandleDict=finalVisitSummaryHandleDict,
671+
apCorrMap=apCorrMap,
672+
)
642673
# Strip HeavyFootprints to save space on disk
643674
sources = outputs.outputSources
644675
butlerQC.put(outputs, outputRefs)
645676

646677
def run(self, exposure, sources, skyInfo, exposureId, ccdInputs=None,
647-
sourceTableHandleDict=None, finalizedSourceTableHandleDict=None):
678+
sourceTableHandleDict=None, finalizedSourceTableHandleDict=None, finalVisitSummaryHandleDict=None,
679+
apCorrMap=None):
648680
"""Run measurement algorithms on the input exposure, and optionally populate the
649681
resulting catalog with extra information.
650682
@@ -665,12 +697,16 @@ def run(self, exposure, sources, skyInfo, exposureId, ccdInputs=None,
665697
the coadd.
666698
sourceTableHandleDict : `dict` [`int`, `lsst.daf.butler.DeferredDatasetHandle`], optional
667699
Dict for sourceTable_visit handles (key is visit) for propagating flags.
668-
These tables are derived from the ``CalibrateTask`` sources, and contain
669-
astrometry and photometry flags, and optionally PSF flags.
700+
These tables contain astrometry and photometry flags, and optionally PSF flags.
670701
finalizedSourceTableHandleDict : `dict` [`int`, `lsst.daf.butler.DeferredDatasetHandle`], optional
671702
Dict for finalized_src_table handles (key is visit) for propagating flags.
672-
These tables are derived from ``FinalizeCalibrationTask`` and contain
673-
PSF flags from the finalized PSF estimation.
703+
These tables contain PSF flags from the finalized PSF estimation.
704+
finalVisitSummaryHandleDict : `dict` [`int`, `lsst.daf.butler.DeferredDatasetHandle`], optional
705+
Dict for visit_summary handles (key is visit) for visit-level information.
706+
These tables contain the WCS information of the single-visit input images.
707+
apCorrMap : `lsst.afw.image.ApCorrMap`, optional
708+
Aperture correction map attached to the ``exposure``. If None, it
709+
will be read from the ``exposure``.
674710
675711
Returns
676712
-------
@@ -683,9 +719,11 @@ def run(self, exposure, sources, skyInfo, exposureId, ccdInputs=None,
683719
self.measurement.run(sources, exposure, exposureId=exposureId)
684720

685721
if self.config.doApCorr:
722+
if apCorrMap is None:
723+
apCorrMap = exposure.getInfo().getApCorrMap()
686724
self.applyApCorr.run(
687725
catalog=sources,
688-
apCorrMap=exposure.getInfo().getApCorrMap()
726+
apCorrMap=apCorrMap,
689727
)
690728

691729
# TODO DM-11568: this contiguous check-and-copy could go away if we
@@ -705,7 +743,8 @@ def run(self, exposure, sources, skyInfo, exposureId, ccdInputs=None,
705743
sources,
706744
ccdInputs,
707745
sourceTableHandleDict,
708-
finalizedSourceTableHandleDict
746+
finalizedSourceTableHandleDict,
747+
finalVisitSummaryHandleDict,
709748
)
710749

711750
results = Struct()

python/lsst/pipe/tasks/propagateSourceFlags.py

+16-5
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ def __init__(self, schema, **kwargs):
113113
self.schema.addField(f, type="Flag", doc="Propagated from finalized sources")
114114

115115
def run(self, coadd_object_cat, ccd_inputs,
116-
source_table_handle_dict=None, finalized_source_table_handle_dict=None):
116+
source_table_handle_dict=None, finalized_source_table_handle_dict=None,
117+
visit_summary_handle_dict=None):
117118
"""Propagate flags from single-frame sources to coadd objects.
118119
119120
Flags are only propagated if a configurable percentage of the sources
@@ -126,13 +127,16 @@ def run(self, coadd_object_cat, ccd_inputs,
126127
Table of coadd objects.
127128
ccd_inputs : `lsst.afw.table.ExposureCatalog`
128129
Table of single-frame inputs to coadd.
129-
source_table_handle_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
130+
source_table_handle_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`], optional
130131
Dict for sourceTable_visit handles (key is visit). May be None if
131132
``config.source_flags`` has no entries.
132133
finalized_source_table_handle_dict : `dict` [`int`:
133-
`lsst.daf.butler.DeferredDatasetHandle`]
134+
`lsst.daf.butler.DeferredDatasetHandle`], optional
134135
Dict for finalized_src_table handles (key is visit). May be None if
135136
``config.finalized_source_flags`` has no entries.
137+
visit_summary_handle_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`], optional
138+
Dict for visitSummary handles (key is visit). If None, using WCS
139+
from the ccd_inputs will be attempted.
136140
"""
137141
if len(self.config.source_flags) == 0 and len(self.config.finalized_source_flags) == 0:
138142
return
@@ -185,11 +189,18 @@ def run(self, coadd_object_cat, ccd_inputs,
185189
continue
186190
handle = handle_dict[visit]
187191
tbl = handle.get(parameters={"columns": columns})
192+
if visit_summary_handle_dict is not None:
193+
visit_summary = visit_summary_handle_dict[visit].get()
188194

189195
# Loop over all ccd_inputs rows for this visit.
190-
for row in ccd_inputs[ccd_inputs["visit"] == visit]:
196+
for row in ccd_inputs:
197+
if row["visit"] != visit:
198+
continue
191199
detector = row["ccd"]
192-
wcs = row.getWcs()
200+
if visit_summary_handle_dict is None:
201+
wcs = row.getWcs()
202+
else:
203+
wcs = visit_summary.find(detector).getWcs()
193204
if wcs is None:
194205
self.log.info("No WCS for visit %d detector %d, so can't match sources to "
195206
"propagate flags. Skipping...", visit, detector)

0 commit comments

Comments
 (0)