diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 769d867c8..54ca5fed3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -19,6 +19,7 @@ import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.addUserBackendRolesFilter import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap +import org.opensearch.alerting.util.getCancelAfterTimeInterval import org.opensearch.alerting.util.getRoleFilterEnabled import org.opensearch.client.Client import org.opensearch.cluster.metadata.IndexNameExpressionResolver @@ -26,6 +27,7 @@ import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.service.ClusterService import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.common.settings.Settings +import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.ClusterMetricsInput @@ -185,6 +187,11 @@ class InputService( searchRequest.source(SearchSourceBuilder.fromXContent(it)) } + val cancelTimeout = getCancelAfterTimeInterval() + if (cancelTimeout != -1L) { + searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(cancelTimeout) + } + // Add user role filter for AD result client.threadPool().threadContext.stashContext().use { // Possible long term solution: @@ -268,6 +275,11 @@ class InputService( searchRequest.source(SearchSourceBuilder.fromXContent(it)) } + val cancelTimeout = getCancelAfterTimeInterval() + if (cancelTimeout != -1L) { + searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(cancelTimeout) + } + return searchRequest } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 30d61cc27..fc2403db7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -59,6 +59,7 @@ import org.opensearch.alerting.util.destinationmigration.getTitle import org.opensearch.alerting.util.destinationmigration.publishLegacyNotification import org.opensearch.alerting.util.destinationmigration.sendNotification import org.opensearch.alerting.util.getActionExecutionPolicy +import org.opensearch.alerting.util.getCancelAfterTimeInterval import org.opensearch.alerting.util.isAllowed import org.opensearch.alerting.util.isTestAction import org.opensearch.alerting.util.parseSampleDocTags @@ -69,6 +70,7 @@ import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings +import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.AlertingPluginInterface @@ -969,6 +971,11 @@ class TransportDocLevelMonitorFanOutAction .query(boolQueryBuilder) ) + val cancelTimeout = getCancelAfterTimeInterval() + if (cancelTimeout != -1L) { + request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(cancelTimeout) + } + val response: SearchResponse = client.suspendUntil { client.search(request, it) } if (response.status() !== RestStatus.OK) { throw IOException( @@ -1013,6 +1020,10 @@ class TransportDocLevelMonitorFanOutAction val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) + val cancelTimeout = getCancelAfterTimeInterval() + if (cancelTimeout != -1L) { + searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(cancelTimeout) + } log.debug( "Monitor ${monitor.id}: " + "Executing percolate query for docs from source indices " + @@ -1092,6 +1103,11 @@ class TransportDocLevelMonitorFanOutAction .size(docLevelMonitorShardFetchSize) ) + val cancelTimeout = getCancelAfterTimeInterval() + if (cancelTimeout != -1L) { + request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(cancelTimeout) + } + if (fieldsToFetch.isNotEmpty() && fetchOnlyQueryFieldNames) { request.source().fetchSource(false) for (field in fieldsToFetch) { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt index 31dcb6591..ba1ae85b1 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/AlertingUtilsTests.kt @@ -5,6 +5,8 @@ package org.opensearch.alerting.util +import org.opensearch.alerting.AlertService +import org.opensearch.alerting.MonitorRunnerService import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.randomAction import org.opensearch.alerting.randomBucketLevelTrigger @@ -14,6 +16,7 @@ import org.opensearch.alerting.randomQueryLevelTrigger import org.opensearch.alerting.randomTemplateScript import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext +import org.opensearch.common.unit.TimeValue import org.opensearch.test.OpenSearchTestCase class AlertingUtilsTests : OpenSearchTestCase() { @@ -176,4 +179,35 @@ class AlertingUtilsTests : OpenSearchTestCase() { triggers.forEach { trigger -> assertFalse(printsSampleDocData(trigger)) } } + + fun `test getCancelAfterTimeInterval returns -1 when setting is default`() { + val original = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval + try { + MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = TimeValue.timeValueMinutes(-1) + assertEquals(-1L, getCancelAfterTimeInterval()) + } finally { + MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = original + } + } + + fun `test getCancelAfterTimeInterval returns at least ALERTS_SEARCH_TIMEOUT`() { + val original = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval + try { + // Setting lower than ALERTS_SEARCH_TIMEOUT (5 min) should return 5 min + MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = TimeValue.timeValueMinutes(1) + assertEquals(AlertService.ALERTS_SEARCH_TIMEOUT.minutes, getCancelAfterTimeInterval()) + } finally { + MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = original + } + } + + fun `test getCancelAfterTimeInterval returns setting when higher than ALERTS_SEARCH_TIMEOUT`() { + val original = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval + try { + MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = TimeValue.timeValueMinutes(10) + assertEquals(10L, getCancelAfterTimeInterval()) + } finally { + MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = original + } + } }