Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.ClusterMetricsInput
Expand Down Expand Up @@ -185,6 +187,11 @@ class InputService(
searchRequest.source(SearchSourceBuilder.fromXContent(it))
}

val cancelTimeout = getCancelAfterTimeInterval()
if (cancelTimeout != -1L) {
searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(cancelTimeout)
}

// Add user role filter for AD result
client.threadPool().threadContext.stashContext().use {
// Possible long term solution:
Expand Down Expand Up @@ -268,6 +275,11 @@ class InputService(
searchRequest.source(SearchSourceBuilder.fromXContent(it))
}

val cancelTimeout = getCancelAfterTimeInterval()
if (cancelTimeout != -1L) {
searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(cancelTimeout)
}

return searchRequest
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import org.opensearch.alerting.util.destinationmigration.getTitle
import org.opensearch.alerting.util.destinationmigration.publishLegacyNotification
import org.opensearch.alerting.util.destinationmigration.sendNotification
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.util.isAllowed
import org.opensearch.alerting.util.isTestAction
import org.opensearch.alerting.util.parseSampleDocTags
Expand All @@ -69,6 +70,7 @@ import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.AlertingPluginInterface
Expand Down Expand Up @@ -969,6 +971,11 @@ class TransportDocLevelMonitorFanOutAction
.query(boolQueryBuilder)
)

val cancelTimeout = getCancelAfterTimeInterval()
if (cancelTimeout != -1L) {
request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(cancelTimeout)
}

val response: SearchResponse = client.suspendUntil { client.search(request, it) }
if (response.status() !== RestStatus.OK) {
throw IOException(
Expand Down Expand Up @@ -1013,6 +1020,10 @@ class TransportDocLevelMonitorFanOutAction
val searchSourceBuilder = SearchSourceBuilder()
searchSourceBuilder.query(boolQueryBuilder)
searchRequest.source(searchSourceBuilder)
val cancelTimeout = getCancelAfterTimeInterval()
if (cancelTimeout != -1L) {
searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(cancelTimeout)
}
log.debug(
"Monitor ${monitor.id}: " +
"Executing percolate query for docs from source indices " +
Expand Down Expand Up @@ -1092,6 +1103,11 @@ class TransportDocLevelMonitorFanOutAction
.size(docLevelMonitorShardFetchSize)
)

val cancelTimeout = getCancelAfterTimeInterval()
if (cancelTimeout != -1L) {
request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(cancelTimeout)
}

if (fieldsToFetch.isNotEmpty() && fetchOnlyQueryFieldNames) {
request.source().fetchSource(false)
for (field in fieldsToFetch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.alerting.util

import org.opensearch.alerting.AlertService
import org.opensearch.alerting.MonitorRunnerService
import org.opensearch.alerting.model.AlertContext
import org.opensearch.alerting.randomAction
import org.opensearch.alerting.randomBucketLevelTrigger
Expand All @@ -14,6 +16,7 @@ import org.opensearch.alerting.randomQueryLevelTrigger
import org.opensearch.alerting.randomTemplateScript
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.common.unit.TimeValue
import org.opensearch.test.OpenSearchTestCase

class AlertingUtilsTests : OpenSearchTestCase() {
Expand Down Expand Up @@ -176,4 +179,35 @@ class AlertingUtilsTests : OpenSearchTestCase() {

triggers.forEach { trigger -> assertFalse(printsSampleDocData(trigger)) }
}

fun `test getCancelAfterTimeInterval returns -1 when setting is default`() {
val original = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval
try {
MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = TimeValue.timeValueMinutes(-1)
assertEquals(-1L, getCancelAfterTimeInterval())
} finally {
MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = original
}
}

fun `test getCancelAfterTimeInterval returns at least ALERTS_SEARCH_TIMEOUT`() {
val original = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval
try {
// Setting lower than ALERTS_SEARCH_TIMEOUT (5 min) should return 5 min
MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = TimeValue.timeValueMinutes(1)
assertEquals(AlertService.ALERTS_SEARCH_TIMEOUT.minutes, getCancelAfterTimeInterval())
} finally {
MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = original
}
}

fun `test getCancelAfterTimeInterval returns setting when higher than ALERTS_SEARCH_TIMEOUT`() {
val original = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval
try {
MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = TimeValue.timeValueMinutes(10)
assertEquals(10L, getCancelAfterTimeInterval())
} finally {
MonitorRunnerService.monitorCtx.cancelAfterTimeInterval = original
}
}
}
Loading