Skip to content

Commit b7f319c

Browse files
committed
Write to the APDB in chunks to avoid overwhelming the database
1 parent 4010046 commit b7f319c

File tree

1 file changed

+36 -6 lines changed

1 file changed

+36
-6
lines changed

python/lsst/ap/association/diaPipe.py

+36 -6
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,13 @@ class DiaPipelineConfig(pipeBase.PipelineTaskConfig,
309309
doc="Pad the image by this many pixels before removing off-image "
310310
"diaObjects for association.",
311311
)
312+
maximumTableLength = pexConfig.Field(
313+
dtype=int,
314+
default=65535,
315+
doc="Maximum length of tables allowed to be written in one operation"
316+
" to the Cassandra APDB. The default is the hard maximum, "
317+
"but performance may be improved with lower values",
318+
)
312319
idGenerator = DetectorVisitIdGeneratorConfig.make_field()
313320

314321
def setDefaults(self):
@@ -782,12 +789,35 @@ def writeToApdb(self, updatedDiaObjects, associatedDiaSources, diaForcedSources)
782789
diaObjectStore = dropEmptyColumns(self.schema, updatedDiaObjects, tableName="DiaObject")
783790
diaSourceStore = dropEmptyColumns(self.schema, associatedDiaSources, tableName="DiaSource")
784791
diaForcedSourceStore = dropEmptyColumns(self.schema, diaForcedSources, tableName="DiaForcedSource")
785-
self.apdb.store(
786-
DateTime.now().toAstropy(),
787-
diaObjectStore,
788-
diaSourceStore,
789-
diaForcedSourceStore)
790-
self.log.info("APDB updated.")
792+
793+
nObject = len(diaObjectStore)
794+
nSource = len(diaSourceStore)
795+
nForced = len(diaForcedSourceStore)
796+
nObjWritten = 0
797+
start = 0
798+
end = min(nObject - 1, self.config.maximumTableLength - 1)
799+
time = DateTime.now().toAstropy()
800+
while nObjWritten < nObject:
801+
srcEnd = min(start + self.config.maximumTableLength, nSource)
802+
if srcEnd <= start:
803+
finalDiaSources = None
804+
else:
805+
finalDiaSources = diaSourceStore.iloc[start:srcEnd]
806+
807+
fSrcEnd = min(start + self.config.maximumTableLength, nForced)
808+
if fSrcEnd <= start:
809+
finalForcedSources = None
810+
else:
811+
finalForcedSources = diaForcedSourceStore.iloc[start:fSrcEnd]
812+
self.apdb.store(
813+
time,
814+
diaObjectStore.iloc[start:end],
815+
finalDiaSources,
816+
finalForcedSources)
817+
self.log.info("APDB updated.")
818+
start = end + 1
819+
nObjWritten += self.config.maximumTableLength
820+
end = np.min(nObject - 1, start + self.config.maximumTableLength - 1)
791821

792822
def testDataFrameIndex(self, df):
793823
"""Test the sorted DataFrame index for duplicates.

0 commit comments

Comments
 (0)