From 684d1ff269b71241e74d853d0956ceb22cb7e02e Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Tue, 31 Mar 2026 06:22:27 -0700 Subject: [PATCH 1/8] feat(settings): Add multi_tenant_trigger_eval_enabled setting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Register new boolean cluster setting to gate multi-tenant trigger evaluation code. Setting is dynamic, node-scoped, defaults to false. - Define MULTI_TENANT_TRIGGER_EVAL_ENABLED in AlertingSettings - Register in AlertingPlugin.getSettings() - Add multiTenantTriggerEvalEnabled field to MonitorRunnerExecutionContext - Wire initial read + dynamic update consumer in MonitorRunnerService - Add unit tests for default value and registration 🤖 Assisted by the code-assist SOP Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/AlertingPlugin.kt | 3 ++- .../alerting/MonitorRunnerExecutionContext.kt | 1 + .../org/opensearch/alerting/MonitorRunnerService.kt | 5 +++++ .../alerting/settings/AlertingSettings.kt | 6 ++++++ .../alerting/settings/AlertingSettingsTests.kt | 13 +++++++++++++ 5 files changed, 27 insertions(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index a2a01e645..9b02e798e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -433,7 +433,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.COMMENTS_MAX_CONTENT_SIZE, AlertingSettings.MAX_COMMENTS_PER_ALERT, AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION, - AlertingSettings.NOTIFICATION_CONTEXT_RESULTS_ALLOWED_ROLES + AlertingSettings.NOTIFICATION_CONTEXT_RESULTS_ALLOWED_ROLES, + AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index a890ec1a6..ba12100ba 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -71,4 +71,5 @@ data class MonitorRunnerExecutionContext( AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE, @Volatile var totalNodesFanOut: Int = AlertingSettings.DEFAULT_FAN_OUT_NODES, @Volatile var lockService: LockService? = null, + @Volatile var multiTenantTriggerEvalEnabled: Boolean = false, ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index f8703aec2..8e4c44760 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -259,6 +259,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon monitorCtx.totalNodesFanOut = it } + monitorCtx.multiTenantTriggerEvalEnabled = AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED) { + monitorCtx.multiTenantTriggerEvalEnabled = it + } + return this } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 2659ae74c..6130522ba 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -311,5 +311,11 @@ class AlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) + + val MULTI_TENANT_TRIGGER_EVAL_ENABLED = Setting.boolSetting( + "plugins.alerting.multi_tenant_trigger_eval_enabled", + false, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/settings/AlertingSettingsTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/settings/AlertingSettingsTests.kt index 6ee8c4997..8f18ed531 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/settings/AlertingSettingsTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/settings/AlertingSettingsTests.kt @@ -56,6 +56,19 @@ class AlertingSettingsTests : OpenSearchTestCase() { ) } + fun `test multi tenant trigger eval setting defaults to false`() { + val value = AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED.get(Settings.EMPTY) + assertFalse("multi_tenant_trigger_eval_enabled should default to false", value) + } + + fun `test multi tenant trigger eval setting is registered`() { + val settings = plugin.settings + assertTrue( + "MULTI_TENANT_TRIGGER_EVAL_ENABLED not registered", + settings.contains(AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED) + ) + } + fun `test all opensearch settings returned`() { val settings = plugin.settings assertTrue( From d55398ef1314ae61d9475d267cb0a61e80bb06a9 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Tue, 31 Mar 2026 06:38:38 -0700 Subject: [PATCH 2/8] feat(trigger): Add script rewriter and remote query-level trigger evaluator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add TriggerScriptRewriter that replaces ctx.results[0] with params.results_0 for remote Painless evaluation on customer's cluster. Add RemoteQueryLevelTriggerEvaluator that builds a filter-agg search request with one agg per trigger, sends it to the customer's cluster, and parses doc_count to determine trigger results. Multiple triggers are batched in a single request. Errors default to triggered (fail-open). 🤖 Assisted by the code-assist SOP Signed-off-by: Surya Sashank Nistala --- .../RemoteQueryLevelTriggerEvaluator.kt | 139 ++++++++++++++++++ .../alerting/util/TriggerScriptRewriter.kt | 28 ++++ .../RemoteQueryLevelTriggerEvaluatorTests.kt | 84 +++++++++++ .../util/TriggerScriptRewriterTests.kt | 50 +++++++ 4 files changed, 301 insertions(+) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/util/TriggerScriptRewriter.kt create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluatorTests.kt create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/util/TriggerScriptRewriterTests.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt b/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt new file mode 100644 index 000000000..fe7d89b83 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt @@ -0,0 +1,139 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.trigger + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.alerting.opensearchapi.convertToMap +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.util.TriggerScriptRewriter +import org.opensearch.commons.alerting.model.QueryLevelTrigger +import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult +import org.opensearch.index.query.QueryBuilders +import org.opensearch.script.Script +import org.opensearch.script.ScriptType +import org.opensearch.search.aggregations.AggregationBuilders +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.transport.client.Client + +/** + * Evaluates query-level triggers remotely on the customer's cluster via filter aggregations. + * + * Instead of executing Painless scripts on the multi-tenant Oasis node, this evaluator: + * 1. Rewrites trigger scripts from `ctx.results[0]` to `params.results_0` + * 2. Sends a single search request with one filter agg per trigger + * 3. Passes the full search response from Call 1 as `params.results_0` + * 4. Reads `doc_count > 0` per trigger agg to determine if the trigger fires + */ +object RemoteQueryLevelTriggerEvaluator { + + private val logger = LogManager.getLogger(javaClass) + private const val TRIGGER_AGG_PREFIX = "_oasis_trigger_" + + /** + * Evaluates all triggers for a query-level monitor by sending a filter-agg request + * to the customer's cluster. + * + * @param client The client connected to the customer's cluster + * @param indices The indices to target (same as the monitor's search input) + * @param triggers The query-level triggers to evaluate + * @param searchResponse The full search response from the monitor's query (Call 1) + * @return Map of trigger ID to QueryLevelTriggerRunResult + */ + suspend fun evaluate( + client: Client, + indices: List, + triggers: List, + searchResponse: Map + ): Map { + val triggerData = triggers.map { TriggerData(it.id, it.name, it.condition.idOrCode) } + val searchSource = buildEvalSearchSource(triggerData, searchResponse) + + return try { + val evalRequest = SearchRequest(*indices.toTypedArray()).source(searchSource) + val evalResponse: SearchResponse = client.suspendUntil { client.search(evalRequest, it) } + val evalMap = evalResponse.convertToMap() + + @Suppress("UNCHECKED_CAST") + val aggs = evalMap["aggregations"] as? Map> ?: emptyMap() + val triggerIds = triggerData.map { it.id } + val parsedResults = parseEvalResponse(triggerIds, aggs) + + triggers.associate { trigger -> + val triggered = parsedResults[trigger.id] ?: true + trigger.id to QueryLevelTriggerRunResult(trigger.name, triggered, null) + } + } catch (e: Exception) { + logger.error("Error evaluating triggers remotely", e) + // On error, trigger all triggers so the user gets notified (matches existing error handling) + triggers.associate { it.id to QueryLevelTriggerRunResult(it.name, true, e) } + } + } + + /** + * Builds the search source for the evaluation request. + * Each trigger becomes a filter aggregation with a script query. + */ + fun buildEvalSearchSource( + triggers: List, + searchResponse: Map + ): SearchSourceBuilder { + val searchSource = SearchSourceBuilder().size(0) + + for (trigger in triggers) { + val (id, script) = when (trigger) { + is TriggerData -> trigger.id to trigger.script + else -> { + val t = trigger as? Map<*, *> + ?: throw IllegalArgumentException("Unsupported trigger type") + t["id"].toString() to t["script"].toString() + } + } + + val rewrittenScript = TriggerScriptRewriter.rewriteScript(script) + val scriptObj = Script( + ScriptType.INLINE, + "painless", + rewrittenScript, + mapOf("results_0" to searchResponse) + ) + val filterAgg = AggregationBuilders.filter( + "$TRIGGER_AGG_PREFIX$id", + QueryBuilders.scriptQuery(scriptObj) + ) + searchSource.aggregation(filterAgg) + } + + return searchSource + } + + /** + * Parses the evaluation response to determine which triggers fired. + * `doc_count > 0` means the trigger condition was true. + * Missing triggers default to triggered (fail-open for safety). + */ + @Suppress("UNCHECKED_CAST") + fun parseEvalResponse( + triggerIds: List, + aggResults: Map> + ): Map { + return triggerIds.associateWith { triggerId -> + val aggKey = "$TRIGGER_AGG_PREFIX$triggerId" + val aggResult = aggResults[aggKey] + if (aggResult == null) { + logger.warn("Missing evaluation result for trigger $triggerId, defaulting to triggered") + true + } else { + val docCount = (aggResult["doc_count"] as? Number)?.toLong() ?: 0L + docCount > 0 + } + } + } + + /** Internal data class for decoupling trigger data from the full model in buildEvalSearchSource */ + data class TriggerData(val id: String, val name: String, val script: String) +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/TriggerScriptRewriter.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/TriggerScriptRewriter.kt new file mode 100644 index 000000000..0c59ab114 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/TriggerScriptRewriter.kt @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util + +/** + * Rewrites Painless trigger scripts for remote evaluation on the customer's cluster. + * + * In multi-tenant mode, trigger scripts cannot be executed on the Oasis node. + * Instead, they are sent to the customer's cluster via a filter aggregation. + * The script context on the customer's cluster uses `params` instead of `ctx`, + * so all references to `ctx.results[0]` must be replaced with `params.results_0`. + */ +object TriggerScriptRewriter { + + private const val CTX_RESULTS_0 = "ctx.results[0]" + private const val PARAMS_RESULTS_0 = "params.results_0" + + /** + * Replaces all occurrences of `ctx.results[0]` with `params.results_0` in the given script source. + * MONITOR_MAX_INPUTS = 1, so only `ctx.results[0]` ever exists. + */ + fun rewriteScript(source: String): String { + return source.replace(CTX_RESULTS_0, PARAMS_RESULTS_0) + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluatorTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluatorTests.kt new file mode 100644 index 000000000..b073f48ce --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluatorTests.kt @@ -0,0 +1,84 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.trigger + +import org.opensearch.alerting.trigger.RemoteQueryLevelTriggerEvaluator.TriggerData +import org.opensearch.test.OpenSearchTestCase + +class RemoteQueryLevelTriggerEvaluatorTests : OpenSearchTestCase() { + + fun `test build eval request with single trigger`() { + val triggers = listOf( + TriggerData( + id = "trigger-1", + name = "test-trigger", + script = "ctx.results[0].hits.total.value > 0" + ) + ) + val searchResponse = mapOf("hits" to mapOf("total" to mapOf("value" to 5))) + val request = RemoteQueryLevelTriggerEvaluator.buildEvalSearchSource(triggers, searchResponse) + + assertNotNull(request) + val requestString = request.toString() + assertTrue("Should contain trigger agg", requestString.contains("_oasis_trigger_trigger-1")) + assertTrue("Should contain rewritten script", requestString.contains("params.results_0")) + } + + fun `test build eval request with multiple triggers`() { + val triggers = listOf( + TriggerData(id = "t1", name = "trigger-1", script = "ctx.results[0].hits.total.value > 0"), + TriggerData(id = "t2", name = "trigger-2", script = "ctx.results[0].hits.total.value > 100"), + TriggerData(id = "t3", name = "trigger-3", script = "ctx.results[0].aggregations.avg_val.value > 50") + ) + val searchResponse = mapOf("hits" to mapOf("total" to mapOf("value" to 5))) + val request = RemoteQueryLevelTriggerEvaluator.buildEvalSearchSource(triggers, searchResponse) + + val requestString = request.toString() + assertTrue("Should contain t1 agg", requestString.contains("_oasis_trigger_t1")) + assertTrue("Should contain t2 agg", requestString.contains("_oasis_trigger_t2")) + assertTrue("Should contain t3 agg", requestString.contains("_oasis_trigger_t3")) + } + + fun `test parse eval response triggered`() { + val triggerIds = listOf("trigger-1") + val aggResults = mapOf( + "_oasis_trigger_trigger-1" to mapOf("doc_count" to 5) + ) + val results = RemoteQueryLevelTriggerEvaluator.parseEvalResponse(triggerIds, aggResults) + + assertTrue("trigger-1 should be triggered", results["trigger-1"]!!) + } + + fun `test parse eval response not triggered`() { + val triggerIds = listOf("trigger-1") + val aggResults = mapOf( + "_oasis_trigger_trigger-1" to mapOf("doc_count" to 0) + ) + val results = RemoteQueryLevelTriggerEvaluator.parseEvalResponse(triggerIds, aggResults) + + assertFalse("trigger-1 should not be triggered", results["trigger-1"]!!) + } + + fun `test parse eval response multiple triggers mixed`() { + val triggerIds = listOf("t1", "t2") + val aggResults = mapOf( + "_oasis_trigger_t1" to mapOf("doc_count" to 3), + "_oasis_trigger_t2" to mapOf("doc_count" to 0) + ) + val results = RemoteQueryLevelTriggerEvaluator.parseEvalResponse(triggerIds, aggResults) + + assertTrue("t1 should be triggered", results["t1"]!!) + assertFalse("t2 should not be triggered", results["t2"]!!) + } + + fun `test parse eval response missing trigger defaults to triggered`() { + val triggerIds = listOf("trigger-1") + val aggResults = emptyMap>() + val results = RemoteQueryLevelTriggerEvaluator.parseEvalResponse(triggerIds, aggResults) + + assertTrue("Missing trigger should default to triggered (fail-open)", results["trigger-1"]!!) + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/TriggerScriptRewriterTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/TriggerScriptRewriterTests.kt new file mode 100644 index 000000000..b30ae8df7 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/TriggerScriptRewriterTests.kt @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util + +import org.opensearch.test.OpenSearchTestCase + +class TriggerScriptRewriterTests : OpenSearchTestCase() { + + fun `test rewrite simple threshold`() { + val source = "ctx.results[0].hits.total.value > 100" + val rewritten = TriggerScriptRewriter.rewriteScript(source) + assertEquals("params.results_0.hits.total.value > 100", rewritten) + } + + fun `test rewrite aggregation access`() { + val source = "ctx.results[0].aggregations.avg_cpu.value > 90" + val rewritten = TriggerScriptRewriter.rewriteScript(source) + assertEquals("params.results_0.aggregations.avg_cpu.value > 90", rewritten) + } + + fun `test rewrite multiple occurrences`() { + val source = "ctx.results[0].hits.total.value > 0 && ctx.results[0].hits.hits.size() > 0" + val rewritten = TriggerScriptRewriter.rewriteScript(source) + assertEquals("params.results_0.hits.total.value > 0 && params.results_0.hits.hits.size() > 0", rewritten) + } + + fun `test rewrite preserves non-ctx content`() { + val source = "def x = ctx.results[0].hits.total.value; return x > 0" + val rewritten = TriggerScriptRewriter.rewriteScript(source) + assertEquals("def x = params.results_0.hits.total.value; return x > 0", rewritten) + } + + fun `test rewrite with no ctx reference`() { + val source = "return true" + val rewritten = TriggerScriptRewriter.rewriteScript(source) + assertEquals("return true", rewritten) + } + + fun `test rewrite loop over hits`() { + val source = "for (def hit : ctx.results[0].hits.hits) { if (hit._source.status == 500) return true } return false" + val rewritten = TriggerScriptRewriter.rewriteScript(source) + assertEquals( + "for (def hit : params.results_0.hits.hits) { if (hit._source.status == 500) return true } return false", + rewritten + ) + } +} From 59a154587618bab0db89764922d46d2515ea86b5 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 1 Apr 2026 02:18:22 -0700 Subject: [PATCH 3/8] feat(trigger): Wire remote trigger eval into QueryLevelMonitorRunner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When multiTenantTriggerEvalEnabled is true and monitor type is QUERY_LEVEL_MONITOR, batch-evaluate all triggers via RemoteQueryLevelTriggerEvaluator before the trigger loop. Results are looked up per trigger from the pre-computed map. When flag is false, existing ScriptService-based evaluation path is completely unchanged. 🤖 Assisted by the code-assist SOP Signed-off-by: Surya Sashank Nistala --- .../alerting/QueryLevelMonitorRunner.kt | 53 ++++++++++++++----- 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index fec6f0bf4..d5af94374 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -11,6 +11,7 @@ import org.opensearch.alerting.opensearchapi.InjectorContextElement import org.opensearch.alerting.opensearchapi.withClosableContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.trigger.RemoteQueryLevelTriggerEvaluator import org.opensearch.alerting.util.CommentsUtils import org.opensearch.alerting.util.isADMonitor import org.opensearch.commons.alerting.model.Alert @@ -18,6 +19,7 @@ import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.MonitorRunResult import org.opensearch.commons.alerting.model.QueryLevelTrigger import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult +import org.opensearch.commons.alerting.model.SearchInput import org.opensearch.commons.alerting.model.WorkflowRunContext import org.opensearch.transport.TransportService import java.time.Instant @@ -77,6 +79,25 @@ object QueryLevelMonitorRunner : MonitorRunner() { val updatedAlerts = mutableListOf() val triggerResults = mutableMapOf() + // When multi-tenant trigger eval is enabled, batch-evaluate all query-level triggers + // remotely on the customer's cluster instead of running Painless locally + val remoteTriggerResults = if ( + monitorCtx.multiTenantTriggerEvalEnabled && + Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.QUERY_LEVEL_MONITOR && + monitorResult.inputResults.results.isNotEmpty() + ) { + val searchInput = monitor.inputs[0] as SearchInput + val queryLevelTriggers = monitor.triggers.filterIsInstance() + RemoteQueryLevelTriggerEvaluator.evaluate( + monitorCtx.client!!, + searchInput.indices, + queryLevelTriggers, + monitorResult.inputResults.results[0] + ) + } else { + null + } + val maxComments = monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION) val alertsToExecuteActionsForIds = currentAlerts.mapNotNull { it.value }.map { it.id } val allAlertsComments = CommentsUtils.getCommentsForAlertNotification( @@ -97,19 +118,27 @@ object QueryLevelMonitorRunner : MonitorRunner() { currentAlertContext, monitorCtx.clusterService!!.clusterSettings ) - val triggerResult = when (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT))) { - Monitor.MonitorType.QUERY_LEVEL_MONITOR -> - monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) - Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> { - val remoteMonitoringEnabled = - monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED) - logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) - if (remoteMonitoringEnabled) - monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!) - else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) + val triggerResult = if (remoteTriggerResults != null) { + // Use pre-computed remote evaluation results + remoteTriggerResults[trigger.id] + ?: QueryLevelTriggerRunResult(trigger.name, true, null) + } else { + when (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT))) { + Monitor.MonitorType.QUERY_LEVEL_MONITOR -> + monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) + Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> { + val remoteMonitoringEnabled = + monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED) + logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) + if (remoteMonitoringEnabled) + monitorCtx.triggerService!!.runClusterMetricsTrigger( + monitor, trigger, triggerCtx, monitorCtx.clusterService!! + ) + else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) + } + else -> + throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType}.") } - else -> - throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType}.") } triggerResults[trigger.id] = triggerResult From 359ab318b996fbde1b18bc0e1424d9c860c15813 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 1 Apr 2026 02:53:54 -0700 Subject: [PATCH 4/8] test: Add query-level trigger remote evaluation integration tests Add comprehensive integration tests for multi-tenant query-level trigger evaluation covering all trigger script patterns from the design doc: - Simple threshold (hits.total.value comparison) - Aggregation value (avg agg comparison) - Boolean logic (AND and OR conditions) - Loop over hits (for loop checking _source fields) - Multiple triggers (mixed fire/no-fire) - Script error (malformed Painless triggers on error) - Large response (50 docs, non-trivial response) - Dry run (verify no alerts persisted) Each test enables the multi_tenant_trigger_eval_enabled flag, creates an index with test data, builds a monitor with the appropriate trigger script, executes via the _execute API, and verifies trigger results. Signed-off-by: Surya Sashank Nistala --- .../alerting/RemoteQueryLevelTriggerIT.kt | 328 ++++++++++++++++++ 1 file changed, 328 insertions(+) create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt new file mode 100644 index 000000000..764e3b0c3 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt @@ -0,0 +1,328 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting + +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.index.query.QueryBuilders +import org.opensearch.script.Script +import org.opensearch.search.aggregations.AggregationBuilders +import org.opensearch.search.builder.SearchSourceBuilder +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit + +/** + * Integration tests for query-level trigger evaluation with the multi-tenant trigger eval flag enabled. + * These tests verify that trigger scripts are correctly evaluated remotely via filter aggregations + * on the customer's cluster instead of locally via ScriptService. + */ +class RemoteQueryLevelTriggerIT : AlertingRestTestCase() { + + private val SETTING_KEY = AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED.key + + private fun enableRemoteTriggerEval() { + client().updateSettings(SETTING_KEY, true) + } + + private fun disableRemoteTriggerEval() { + client().updateSettings(SETTING_KEY, false) + } + + /** + * Helper: create a test index with numeric and keyword fields, index docs, return index name. + */ + private fun createIndexWithDocs( + docCount: Int = 3, + fieldName: String = "cpu", + values: List = (1..docCount).toList() + ): String { + val index = createTestIndex( + randomAlphaOfLength(10).lowercase(), + """ + "properties": { + "test_strict_date_time": { "type": "date", "format": "strict_date_time" }, + "test_field": { "type": "keyword" }, + "$fieldName": { "type": "integer" } + } + """ + ) + val twoMinsAgo = ZonedDateTime.now().minus(2, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.MILLIS) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(twoMinsAgo) + values.forEachIndexed { i, v -> + indexDoc(index, (i + 1).toString(), """{ "test_strict_date_time": "$testTime", "test_field": "val_$i", "$fieldName": $v }""") + } + return index + } + + private fun buildInput(index: String, aggs: SearchSourceBuilder.() -> Unit = {}): SearchInput { + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val ssb = SearchSourceBuilder().query(query) + ssb.aggs() + return SearchInput(indices = listOf(index), query = ssb) + } + + // ---- Tests ---- + + fun `test multi tenant trigger eval simple threshold`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(docCount = 5) + val input = buildInput(index) + val triggerScript = "return ctx.results[0].hits.total.value > 3" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + assertTrue("Unexpected error", triggerResult["error"]?.toString().isNullOrEmpty()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval simple threshold not triggered`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(docCount = 2) + val input = buildInput(index) + val triggerScript = "return ctx.results[0].hits.total.value > 100" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(false, triggerResult["triggered"].toString().toBoolean()) + assertTrue("Unexpected error", triggerResult["error"]?.toString().isNullOrEmpty()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval aggregation value`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(values = listOf(80, 90, 95)) + val input = buildInput(index) { + aggregation(AggregationBuilders.avg("avg_cpu").field("cpu")) + } + val triggerScript = "return ctx.results[0].aggregations.avg_cpu.value > 80" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + assertTrue("Unexpected error", triggerResult["error"]?.toString().isNullOrEmpty()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval boolean logic`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(docCount = 5, values = listOf(10, 20, 30, 40, 50)) + val input = buildInput(index) { + aggregation(AggregationBuilders.avg("avg_cpu").field("cpu")) + } + // Both conditions true: total > 3 AND avg > 20 + val triggerScript = "return ctx.results[0].hits.total.value > 3 && ctx.results[0].aggregations.avg_cpu.value > 20" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval boolean logic or`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(docCount = 2) + val input = buildInput(index) + // First condition false (total not > 100), second true (total > 0) + val triggerScript = "return ctx.results[0].hits.total.value > 100 || ctx.results[0].hits.total.value > 0" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval loop over hits`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(values = listOf(10, 95, 20)) + val input = buildInput(index) + // Loop over hits checking if any cpu value > 90 + val triggerScript = """ + for (hit in ctx.results[0].hits.hits) { + if (hit._source.cpu > 90) { + return true; + } + } + return false; + """.trimIndent() + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval multiple triggers`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(docCount = 5, values = listOf(10, 20, 30, 40, 50)) + val input = buildInput(index) { + aggregation(AggregationBuilders.avg("avg_cpu").field("cpu")) + } + + // Trigger 1: fires (total > 3) + val trigger1 = randomQueryLevelTrigger( + condition = Script("return ctx.results[0].hits.total.value > 3"), + actions = emptyList() + ) + // Trigger 2: does not fire (total > 100) + val trigger2 = randomQueryLevelTrigger( + condition = Script("return ctx.results[0].hits.total.value > 100"), + actions = emptyList() + ) + // Trigger 3: fires (avg > 20) + val trigger3 = randomQueryLevelTrigger( + condition = Script("return ctx.results[0].aggregations.avg_cpu.value > 20"), + actions = emptyList() + ) + + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger1, trigger2, trigger3)) + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResults = output.objectMap("trigger_results") + assertEquals(true, triggerResults.objectMap(trigger1.id)["triggered"].toString().toBoolean()) + assertEquals(false, triggerResults.objectMap(trigger2.id)["triggered"].toString().toBoolean()) + assertEquals(true, triggerResults.objectMap(trigger3.id)["triggered"].toString().toBoolean()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval script error`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(docCount = 1) + val input = buildInput(index) + // Malformed script — references a field that doesn't exist in the response + val triggerScript = "return ctx.results[0].nonexistent.field.value > 0" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = createMonitor( + randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + ) + + val response = executeMonitor(monitor.id) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + // On error, trigger defaults to true (fail-open) so user gets notified + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval large response`() { + enableRemoteTriggerEval() + try { + val index = createTestIndex( + randomAlphaOfLength(10).lowercase(), + """ + "properties": { + "test_strict_date_time": { "type": "date", "format": "strict_date_time" }, + "test_field": { "type": "keyword" }, + "cpu": { "type": "integer" } + } + """ + ) + val twoMinsAgo = ZonedDateTime.now().minus(2, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.MILLIS) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(twoMinsAgo) + // Index 50 docs to produce a non-trivial response + for (i in 1..50) { + indexDoc(index, i.toString(), """{ "test_strict_date_time": "$testTime", "test_field": "val_$i", "cpu": $i }""") + } + + val input = buildInput(index) + val triggerScript = "return ctx.results[0].hits.total.value > 40" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + assertTrue("Unexpected error", triggerResult["error"]?.toString().isNullOrEmpty()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval dry run`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(docCount = 3) + val input = buildInput(index) + val triggerScript = "return ctx.results[0].hits.total.value > 0" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + + // Dryrun should not persist alerts — monitor was never saved so no ID + val alerts = searchAlerts(monitor) + assertEquals("Alert should not be saved for dryrun", 0, alerts.size) + } finally { + disableRemoteTriggerEval() + } + } + + @Suppress("UNCHECKED_CAST") + private fun Map.objectMap(key: String): Map> { + return this[key] as Map> + } +} From 90cfb6cc70223532575c833687bacb2dbb789397 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 1 Apr 2026 07:59:26 -0700 Subject: [PATCH 5/8] test: Add query-level trigger regression tests for flag=false MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add 3 integration tests to RemoteQueryLevelTriggerIT verifying that existing query-level alerting behavior is preserved when the multi_tenant_trigger_eval_enabled flag is disabled: - test query level trigger flag disabled: monitor executes correctly via ScriptService path when flag is off (default) - test query level trigger flag default is false: verify setting defaults to false via cluster settings REST API - test query level trigger toggle flag during execution: both ScriptService and remote eval paths produce correct results when flag is toggled between executions Also adds includeDefaults param to AlertingRestTestCase.getSettings() helper to support querying default cluster settings. 🤖 Assisted by the code-assist SOP Signed-off-by: Surya Sashank Nistala --- .../alerting/AlertingRestTestCase.kt | 9 ++- .../alerting/RemoteQueryLevelTriggerIT.kt | 62 +++++++++++++++++++ 2 files changed, 66 insertions(+), 5 deletions(-) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 70a3d49bd..77ff20006 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -1400,11 +1400,10 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return response.asMap() } - fun RestClient.getSettings(): Map { - val response = this.makeRequest( - "GET", - "_cluster/settings?flat_settings=true" - ) + fun RestClient.getSettings(includeDefaults: Boolean = false): Map { + var url = "_cluster/settings?flat_settings=true" + if (includeDefaults) url += "&include_defaults=true" + val response = this.makeRequest("GET", url) assertEquals(RestStatus.OK, response.restStatus()) return response.asMap() } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt index 764e3b0c3..f0692070c 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt @@ -321,6 +321,68 @@ class RemoteQueryLevelTriggerIT : AlertingRestTestCase() { } } + // ---- Regression tests (flag=false) ---- + + fun `test query level trigger flag disabled`() { + // Flag is false by default — do NOT enable it + val index = createIndexWithDocs(docCount = 3) + val input = buildInput(index) + val triggerScript = "return ctx.results[0].hits.total.value > 0" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + assertTrue("Unexpected error", triggerResult["error"]?.toString().isNullOrEmpty()) + } + + @Suppress("UNCHECKED_CAST") + fun `test query level trigger flag default is false`() { + val settings = client().getSettings(includeDefaults = true) + val defaults = settings["defaults"] as Map + assertEquals("false", defaults[SETTING_KEY].toString()) + } + + fun `test query level trigger toggle flag during execution`() { + val index = createIndexWithDocs(docCount = 3) + val input = buildInput(index) + val triggerScript = "return ctx.results[0].hits.total.value > 0" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = createMonitor( + randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + ) + + try { + // Execute with flag=false (ScriptService path) + val response1 = executeMonitor(monitor.id) + val output1 = entityAsMap(response1) + val result1 = output1.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, result1["triggered"].toString().toBoolean()) + assertTrue("Unexpected error with flag off", result1["error"]?.toString().isNullOrEmpty()) + + // Toggle flag to true (remote eval path) + enableRemoteTriggerEval() + val response2 = executeMonitor(monitor.id) + val output2 = entityAsMap(response2) + val result2 = output2.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, result2["triggered"].toString().toBoolean()) + assertTrue("Unexpected error with flag on", result2["error"]?.toString().isNullOrEmpty()) + + // Toggle flag back to false (ScriptService path again) + disableRemoteTriggerEval() + val response3 = executeMonitor(monitor.id) + val output3 = entityAsMap(response3) + val result3 = output3.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, result3["triggered"].toString().toBoolean()) + assertTrue("Unexpected error after toggling back", result3["error"]?.toString().isNullOrEmpty()) + } finally { + disableRemoteTriggerEval() + } + } + @Suppress("UNCHECKED_CAST") private fun Map.objectMap(key: String): Map> { return this[key] as Map> From ae379330a70b72509a9e3a2a6f4a8a6960decd01 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 1 Apr 2026 08:34:18 -0700 Subject: [PATCH 6/8] refactor: Rename trigger agg prefix from _oasis_trigger_ to _query_trigger_ Replace internal jargon in the filter aggregation key prefix used by RemoteQueryLevelTriggerEvaluator. The prefix _query_trigger_ better describes its scope (query-level monitor triggers). Signed-off-by: Surya Sashank Nistala --- .../trigger/RemoteQueryLevelTriggerEvaluator.kt | 2 +- .../RemoteQueryLevelTriggerEvaluatorTests.kt | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt b/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt index fe7d89b83..f71af1566 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt @@ -32,7 +32,7 @@ import org.opensearch.transport.client.Client object RemoteQueryLevelTriggerEvaluator { private val logger = LogManager.getLogger(javaClass) - private const val TRIGGER_AGG_PREFIX = "_oasis_trigger_" + private const val TRIGGER_AGG_PREFIX = "_query_trigger_" /** * Evaluates all triggers for a query-level monitor by sending a filter-agg request diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluatorTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluatorTests.kt index b073f48ce..49b769ba0 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluatorTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluatorTests.kt @@ -23,7 +23,7 @@ class RemoteQueryLevelTriggerEvaluatorTests : OpenSearchTestCase() { assertNotNull(request) val requestString = request.toString() - assertTrue("Should contain trigger agg", requestString.contains("_oasis_trigger_trigger-1")) + assertTrue("Should contain trigger agg", requestString.contains("_query_trigger_trigger-1")) assertTrue("Should contain rewritten script", requestString.contains("params.results_0")) } @@ -37,15 +37,15 @@ class RemoteQueryLevelTriggerEvaluatorTests : OpenSearchTestCase() { val request = RemoteQueryLevelTriggerEvaluator.buildEvalSearchSource(triggers, searchResponse) val requestString = request.toString() - assertTrue("Should contain t1 agg", requestString.contains("_oasis_trigger_t1")) - assertTrue("Should contain t2 agg", requestString.contains("_oasis_trigger_t2")) - assertTrue("Should contain t3 agg", requestString.contains("_oasis_trigger_t3")) + assertTrue("Should contain t1 agg", requestString.contains("_query_trigger_t1")) + assertTrue("Should contain t2 agg", requestString.contains("_query_trigger_t2")) + assertTrue("Should contain t3 agg", requestString.contains("_query_trigger_t3")) } fun `test parse eval response triggered`() { val triggerIds = listOf("trigger-1") val aggResults = mapOf( - "_oasis_trigger_trigger-1" to mapOf("doc_count" to 5) + "_query_trigger_trigger-1" to mapOf("doc_count" to 5) ) val results = RemoteQueryLevelTriggerEvaluator.parseEvalResponse(triggerIds, aggResults) @@ -55,7 +55,7 @@ class RemoteQueryLevelTriggerEvaluatorTests : OpenSearchTestCase() { fun `test parse eval response not triggered`() { val triggerIds = listOf("trigger-1") val aggResults = mapOf( - "_oasis_trigger_trigger-1" to mapOf("doc_count" to 0) + "_query_trigger_trigger-1" to mapOf("doc_count" to 0) ) val results = RemoteQueryLevelTriggerEvaluator.parseEvalResponse(triggerIds, aggResults) @@ -65,8 +65,8 @@ class RemoteQueryLevelTriggerEvaluatorTests : OpenSearchTestCase() { fun `test parse eval response multiple triggers mixed`() { val triggerIds = listOf("t1", "t2") val aggResults = mapOf( - "_oasis_trigger_t1" to mapOf("doc_count" to 3), - "_oasis_trigger_t2" to mapOf("doc_count" to 0) + "_query_trigger_t1" to mapOf("doc_count" to 3), + "_query_trigger_t2" to mapOf("doc_count" to 0) ) val results = RemoteQueryLevelTriggerEvaluator.parseEvalResponse(triggerIds, aggResults) From bdfa728875ce715faf0cb8dbea449fec14a8947f Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 1 Apr 2026 08:56:05 -0700 Subject: [PATCH 7/8] fix(trigger): Change remote trigger eval error handling to fail-closed On evaluation error or missing trigger results, default to triggered=false instead of triggered=true. Errors are still logged at WARN/ERROR level and surfaced in the trigger run result. This prevents false-positive alerts when the remote evaluation encounters issues. Signed-off-by: Surya Sashank Nistala --- .../opensearch/alerting/QueryLevelMonitorRunner.kt | 2 +- .../trigger/RemoteQueryLevelTriggerEvaluator.kt | 14 +++++++------- .../alerting/util/TriggerScriptRewriter.kt | 4 +--- .../alerting/RemoteQueryLevelTriggerIT.kt | 8 +++++--- .../RemoteQueryLevelTriggerEvaluatorTests.kt | 4 ++-- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index d5af94374..ee6c44ee5 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -121,7 +121,7 @@ object QueryLevelMonitorRunner : MonitorRunner() { val triggerResult = if (remoteTriggerResults != null) { // Use pre-computed remote evaluation results remoteTriggerResults[trigger.id] - ?: QueryLevelTriggerRunResult(trigger.name, true, null) + ?: QueryLevelTriggerRunResult(trigger.name, false, null) } else { when (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT))) { Monitor.MonitorType.QUERY_LEVEL_MONITOR -> diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt b/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt index f71af1566..cb2dfac8b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt @@ -23,9 +23,9 @@ import org.opensearch.transport.client.Client /** * Evaluates query-level triggers remotely on the customer's cluster via filter aggregations. * - * Instead of executing Painless scripts on the multi-tenant Oasis node, this evaluator: + * This evaluator: * 1. Rewrites trigger scripts from `ctx.results[0]` to `params.results_0` - * 2. Sends a single search request with one filter agg per trigger + * 2. Sends a single search request with one filter agg each per trigger * 3. Passes the full search response from Call 1 as `params.results_0` * 4. Reads `doc_count > 0` per trigger agg to determine if the trigger fires */ @@ -64,13 +64,13 @@ object RemoteQueryLevelTriggerEvaluator { val parsedResults = parseEvalResponse(triggerIds, aggs) triggers.associate { trigger -> - val triggered = parsedResults[trigger.id] ?: true + val triggered = parsedResults[trigger.id] ?: false trigger.id to QueryLevelTriggerRunResult(trigger.name, triggered, null) } } catch (e: Exception) { logger.error("Error evaluating triggers remotely", e) - // On error, trigger all triggers so the user gets notified (matches existing error handling) - triggers.associate { it.id to QueryLevelTriggerRunResult(it.name, true, e) } + // On error, fail closed — don't trigger alerts but surface the error + triggers.associate { it.id to QueryLevelTriggerRunResult(it.name, false, e) } } } @@ -125,8 +125,8 @@ object RemoteQueryLevelTriggerEvaluator { val aggKey = "$TRIGGER_AGG_PREFIX$triggerId" val aggResult = aggResults[aggKey] if (aggResult == null) { - logger.warn("Missing evaluation result for trigger $triggerId, defaulting to triggered") - true + logger.warn("Missing evaluation result for trigger $triggerId, defaulting to not triggered") + false } else { val docCount = (aggResult["doc_count"] as? Number)?.toLong() ?: 0L docCount > 0 diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/TriggerScriptRewriter.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/TriggerScriptRewriter.kt index 0c59ab114..4470ee345 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/TriggerScriptRewriter.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/TriggerScriptRewriter.kt @@ -7,9 +7,7 @@ package org.opensearch.alerting.util /** * Rewrites Painless trigger scripts for remote evaluation on the customer's cluster. - * - * In multi-tenant mode, trigger scripts cannot be executed on the Oasis node. - * Instead, they are sent to the customer's cluster via a filter aggregation. + * They are sent to the customer's cluster via a filter aggregation. * The script context on the customer's cluster uses `params` instead of `ctx`, * so all references to `ctx.results[0]` must be replaced with `params.results_0`. */ diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt index f0692070c..7e4857f57 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt @@ -254,9 +254,11 @@ class RemoteQueryLevelTriggerIT : AlertingRestTestCase() { val response = executeMonitor(monitor.id) val output = entityAsMap(response) - val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) - // On error, trigger defaults to true (fail-open) so user gets notified - assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + @Suppress("UNCHECKED_CAST") + val triggerResult = (output.objectMap("trigger_results")[trigger.id] as Map) + // On error, trigger defaults to false (fail-closed) with error surfaced + assertEquals(false, triggerResult["triggered"].toString().toBoolean()) + assertFalse("Expected error to be set", triggerResult["error"]?.toString().isNullOrEmpty()) } finally { disableRemoteTriggerEval() } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluatorTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluatorTests.kt index 49b769ba0..ef6671508 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluatorTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluatorTests.kt @@ -74,11 +74,11 @@ class RemoteQueryLevelTriggerEvaluatorTests : OpenSearchTestCase() { assertFalse("t2 should not be triggered", results["t2"]!!) } - fun `test parse eval response missing trigger defaults to triggered`() { + fun `test parse eval response missing trigger defaults to not triggered`() { val triggerIds = listOf("trigger-1") val aggResults = emptyMap>() val results = RemoteQueryLevelTriggerEvaluator.parseEvalResponse(triggerIds, aggResults) - assertTrue("Missing trigger should default to triggered (fail-open)", results["trigger-1"]!!) + assertFalse("Missing trigger should default to not triggered (fail-closed)", results["trigger-1"]!!) } } From 007a4a7b97f9bc8005c200148a2d216850757da5 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Thu, 2 Apr 2026 00:06:49 -0700 Subject: [PATCH 8/8] docs: Replace 'customer' with 'user' in code comments Use open source terminology consistently throughout the codebase. Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/QueryLevelMonitorRunner.kt | 2 +- .../alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt | 6 +++--- .../org/opensearch/alerting/util/TriggerScriptRewriter.kt | 6 +++--- .../org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index ee6c44ee5..197b0103b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -80,7 +80,7 @@ object QueryLevelMonitorRunner : MonitorRunner() { val triggerResults = mutableMapOf() // When multi-tenant trigger eval is enabled, batch-evaluate all query-level triggers - // remotely on the customer's cluster instead of running Painless locally + // remotely on the user's cluster instead of running Painless locally val remoteTriggerResults = if ( monitorCtx.multiTenantTriggerEvalEnabled && Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.QUERY_LEVEL_MONITOR && diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt b/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt index cb2dfac8b..b1812e6f2 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt @@ -21,7 +21,7 @@ import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.transport.client.Client /** - * Evaluates query-level triggers remotely on the customer's cluster via filter aggregations. + * Evaluates query-level triggers remotely on the user's cluster via filter aggregations. * * This evaluator: * 1. Rewrites trigger scripts from `ctx.results[0]` to `params.results_0` @@ -36,9 +36,9 @@ object RemoteQueryLevelTriggerEvaluator { /** * Evaluates all triggers for a query-level monitor by sending a filter-agg request - * to the customer's cluster. + * to the user's cluster. * - * @param client The client connected to the customer's cluster + * @param client The client connected to the user's cluster * @param indices The indices to target (same as the monitor's search input) * @param triggers The query-level triggers to evaluate * @param searchResponse The full search response from the monitor's query (Call 1) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/TriggerScriptRewriter.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/TriggerScriptRewriter.kt index 4470ee345..8a04ffca1 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/TriggerScriptRewriter.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/TriggerScriptRewriter.kt @@ -6,9 +6,9 @@ package org.opensearch.alerting.util /** - * Rewrites Painless trigger scripts for remote evaluation on the customer's cluster. - * They are sent to the customer's cluster via a filter aggregation. - * The script context on the customer's cluster uses `params` instead of `ctx`, + * Rewrites Painless trigger scripts for remote evaluation on the user's cluster. + * They are sent to the user's cluster via a filter aggregation. + * The script context on the user's cluster uses `params` instead of `ctx`, * so all references to `ctx.results[0]` must be replaced with `params.results_0`. */ object TriggerScriptRewriter { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt index 7e4857f57..036a27e9a 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt @@ -18,7 +18,7 @@ import java.time.temporal.ChronoUnit /** * Integration tests for query-level trigger evaluation with the multi-tenant trigger eval flag enabled. * These tests verify that trigger scripts are correctly evaluated remotely via filter aggregations - * on the customer's cluster instead of locally via ScriptService. + * on the user's cluster instead of locally via ScriptService. */ class RemoteQueryLevelTriggerIT : AlertingRestTestCase() {