Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,18 @@ private[sql] class ObservationManager(session: SparkSession) {
})

private def tryComplete(qe: QueryExecution): Unit = {
val allMetrics = qe.observedMetrics
// Use lazy val to defer collecting the observed metrics until it is needed so that tryComplete
// can finish faster (e.g., when the logical plan doesn't contain CollectMetrics).
lazy val allMetrics = qe.observedMetrics
qe.logical.foreachWithSubqueriesAndPruning(
_.containsPattern(TreePattern.COLLECT_METRICS)) {
case c: CollectMetrics =>
val keyExists = observations.containsKey((c.name, c.dataframeId))
val metrics = allMetrics.get(c.name)
if (keyExists && metrics.isEmpty) {
// If the key exists but no metrics were collected, it means for some reason the metrics
// could not be collected. This can happen e.g., if the CollectMetricsExec was optimized
// away.
val observation = observations.remove((c.name, c.dataframeId))
if (observation != null) {
observation.setMetricsAndNotify(Row.empty)
}
} else {
metrics.foreach { metrics =>
val observation = observations.remove((c.name, c.dataframeId))
if (observation != null) {
observation.setMetricsAndNotify(metrics)
}
}
val observation = observations.remove((c.name, c.dataframeId))
if (observation != null) {
// If the key exists but no metrics were collected, it means for some reason the
// metrics could not be collected. This can happen e.g., if the CollectMetricsExec
// was optimized away.
observation.setMetricsAndNotify(allMetrics.getOrElse(c.name, Row.empty))
}
case _ =>
}
Expand Down