From 8c1f3aba1a571b2f668909e3a14f9da74ec29415 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 17 Mar 2026 23:00:23 +0000 Subject: [PATCH] fix: Set cancelAfterTimeInterval on SearchRequest in InputService and DocLevelMonitorFanOut (#2042) PR #1366 added cancelAfterTimeInterval to BucketLevelMonitorRunner and DocumentLevelMonitorRunner, but missed InputService.getSearchRequest() and collectInputResultsForADMonitor(). TransportDocLevelMonitorFanOutAction was added after #1366 and also lacks the setting on its 3 SearchRequest sites. This sets cancelAfterTimeInterval on all remaining SearchRequest constructions in monitor runners, guarding against the -1 default to avoid setting a negative timeout. Adds unit tests for getCancelAfterTimeInterval(). Resolves #827 Signed-off-by: Manaswini Ragamouni Co-authored-by: Manaswini Ragamouni (cherry picked from commit 665cd286a3337ce4846fd498ae40111015b1db91) Signed-off-by: github-actions[bot] --- .../org/opensearch/alerting/InputService.kt | 12 +++++++ .../TransportDocLevelMonitorFanOutAction.kt | 16 +++++++++ .../alerting/util/AlertingUtilsTests.kt | 34 +++++++++++++++++++ 3 files changed, 62 insertions(+) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index c77878bd4..3ee6d644d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -19,6 +19,7 @@ import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.addUserBackendRolesFilter import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap +import org.opensearch.alerting.util.getCancelAfterTimeInterval import org.opensearch.alerting.util.getRoleFilterEnabled import org.opensearch.alerting.util.use import org.opensearch.cluster.metadata.IndexNameExpressionResolver @@ -26,6 +27,7 @@ import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.service.ClusterService import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.common.settings.Settings +import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.ClusterMetricsInput @@ -186,6 +188,11 @@ class InputService( searchRequest.source(SearchSourceBuilder.fromXContent(it)) } + val cancelTimeout = getCancelAfterTimeInterval() + if (cancelTimeout != -1L) { + searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(cancelTimeout) + } + // Add user role filter for AD result client.threadPool().threadContext.stashContext().use { // Possible long term solution: @@ -269,6 +276,11 @@ class InputService( searchRequest.source(SearchSourceBuilder.fromXContent(it)) } + val cancelTimeout = getCancelAfterTimeInterval() + if (cancelTimeout != -1L) { + searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(cancelTimeout) + } + return searchRequest } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 2e71cf5af..51fc6c070 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -59,6 +59,7 @@ import org.opensearch.alerting.util.destinationmigration.getTitle import org.opensearch.alerting.util.destinationmigration.publishLegacyNotification import org.opensearch.alerting.util.destinationmigration.sendNotification import org.opensearch.alerting.util.getActionExecutionPolicy +import org.opensearch.alerting.util.getCancelAfterTimeInterval import org.opensearch.alerting.util.isAllowed import org.opensearch.alerting.util.isTestAction import org.opensearch.alerting.util.parseSampleDocTags @@ -68,6 +69,7 @@ import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings +import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.AlertingPluginInterface @@ -970,6 +972,11 @@ class TransportDocLevelMonitorFanOutAction .query(boolQueryBuilder) ) + val cancelTimeout = getCancelAfterTimeInterval() + if (cancelTimeout != -1L) { + request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(cancelTimeout) + } + val response: SearchResponse = client.suspendUntil { client.search(request, it) } if (response.status() !== RestStatus.OK) { throw IOException( @@ -1014,6 +1021,10 @@ class TransportDocLevelMonitorFanOutAction val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) + val cancelTimeout = getCancelAfterTimeInterval() + if (cancelTimeout != -1L) { + searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(cancelTimeout) + } log.debug( "Monitor ${monitor.id}: " + "Executing percolate query for docs from source indices " + @@ -1093,6 +1104,11 @@ class TransportDocLevelMonitorFanOutAction .size(docLevelMonitorShardFetchSize) ) + val cancelTimeout = getCancelAfterTimeInterval() + if (cancelTimeout != -1L) { + request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(cancelTimeout) + } + if (fieldsToFetch.isNotEmpty() && fetchOnlyQueryFieldNames) { request.source().fetchSource(false) for (field in fieldsToFetch) { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt index 31dcb6591..ba1ae85b1 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt @@ -5,6 +5,8 @@ package org.opensearch.alerting.util +import org.opensearch.alerting.AlertService +import org.opensearch.alerting.MonitorRunnerService import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.randomAction import org.opensearch.alerting.randomBucketLevelTrigger @@ -14,6 +16,7 @@ import org.opensearch.alerting.randomQueryLevelTrigger import org.opensearch.alerting.randomTemplateScript import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext +import org.opensearch.common.unit.TimeValue import org.opensearch.test.OpenSearchTestCase class AlertingUtilsTests : OpenSearchTestCase() { @@ -176,4 +179,35 @@ class AlertingUtilsTests : OpenSearchTestCase() { triggers.forEach { trigger -> assertFalse(printsSampleDocData(trigger)) } } + + fun `test getCancelAfterTimeInterval returns -1 when setting is default`() { + val original = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval + try { + MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = TimeValue.timeValueMinutes(-1) + assertEquals(-1L, getCancelAfterTimeInterval()) + } finally { + MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = original + } + } + + fun `test getCancelAfterTimeInterval returns at least ALERTS_SEARCH_TIMEOUT`() { + val original = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval + try { + // Setting lower than ALERTS_SEARCH_TIMEOUT (5 min) should return 5 min + MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = TimeValue.timeValueMinutes(1) + assertEquals(AlertService.ALERTS_SEARCH_TIMEOUT.minutes, getCancelAfterTimeInterval()) + } finally { + MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = original + } + } + + fun `test getCancelAfterTimeInterval returns setting when higher than ALERTS_SEARCH_TIMEOUT`() { + val original = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval + try { + MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = TimeValue.timeValueMinutes(10) + assertEquals(10L, getCancelAfterTimeInterval()) + } finally { + MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = original + } + } }