diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index a2a01e645..9b02e798e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -433,7 +433,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.COMMENTS_MAX_CONTENT_SIZE, AlertingSettings.MAX_COMMENTS_PER_ALERT, AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION, - AlertingSettings.NOTIFICATION_CONTEXT_RESULTS_ALLOWED_ROLES + AlertingSettings.NOTIFICATION_CONTEXT_RESULTS_ALLOWED_ROLES, + AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index a890ec1a6..ba12100ba 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -71,4 +71,5 @@ data class MonitorRunnerExecutionContext( AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE, @Volatile var totalNodesFanOut: Int = AlertingSettings.DEFAULT_FAN_OUT_NODES, @Volatile var lockService: LockService? = null, + @Volatile var multiTenantTriggerEvalEnabled: Boolean = false, ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index f8703aec2..8e4c44760 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -259,6 +259,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon monitorCtx.totalNodesFanOut = it } + monitorCtx.multiTenantTriggerEvalEnabled = AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED) { + monitorCtx.multiTenantTriggerEvalEnabled = it + } + return this } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index fec6f0bf4..197b0103b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -11,6 +11,7 @@ import org.opensearch.alerting.opensearchapi.InjectorContextElement import org.opensearch.alerting.opensearchapi.withClosableContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.trigger.RemoteQueryLevelTriggerEvaluator import org.opensearch.alerting.util.CommentsUtils import org.opensearch.alerting.util.isADMonitor import org.opensearch.commons.alerting.model.Alert @@ -18,6 +19,7 @@ import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.MonitorRunResult import org.opensearch.commons.alerting.model.QueryLevelTrigger import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult +import org.opensearch.commons.alerting.model.SearchInput import org.opensearch.commons.alerting.model.WorkflowRunContext import org.opensearch.transport.TransportService import java.time.Instant @@ -77,6 +79,25 @@ object QueryLevelMonitorRunner : MonitorRunner() { val updatedAlerts = mutableListOf() val triggerResults = mutableMapOf() + // When multi-tenant trigger eval is enabled, batch-evaluate all query-level triggers + // remotely on the user's cluster instead of running Painless locally + val remoteTriggerResults = if ( + monitorCtx.multiTenantTriggerEvalEnabled && + Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.QUERY_LEVEL_MONITOR && + monitorResult.inputResults.results.isNotEmpty() + ) { + val searchInput = monitor.inputs[0] as SearchInput + val queryLevelTriggers = monitor.triggers.filterIsInstance() + RemoteQueryLevelTriggerEvaluator.evaluate( + monitorCtx.client!!, + searchInput.indices, + queryLevelTriggers, + monitorResult.inputResults.results[0] + ) + } else { + null + } + val maxComments = monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION) val alertsToExecuteActionsForIds = currentAlerts.mapNotNull { it.value }.map { it.id } val allAlertsComments = CommentsUtils.getCommentsForAlertNotification( @@ -97,19 +118,27 @@ object QueryLevelMonitorRunner : MonitorRunner() { currentAlertContext, monitorCtx.clusterService!!.clusterSettings ) - val triggerResult = when (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT))) { - Monitor.MonitorType.QUERY_LEVEL_MONITOR -> - monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) - Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> { - val remoteMonitoringEnabled = - monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED) - logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) - if (remoteMonitoringEnabled) - monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!) - else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) + val triggerResult = if (remoteTriggerResults != null) { + // Use pre-computed remote evaluation results + remoteTriggerResults[trigger.id] + ?: QueryLevelTriggerRunResult(trigger.name, false, null) + } else { + when (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT))) { + Monitor.MonitorType.QUERY_LEVEL_MONITOR -> + monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) + Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> { + val remoteMonitoringEnabled = + monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED) + logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) + if (remoteMonitoringEnabled) + monitorCtx.triggerService!!.runClusterMetricsTrigger( + monitor, trigger, triggerCtx, monitorCtx.clusterService!! + ) + else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) + } + else -> + throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType}.") } - else -> - throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType}.") } triggerResults[trigger.id] = triggerResult diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 2659ae74c..6130522ba 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -311,5 +311,11 @@ class AlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) + + val MULTI_TENANT_TRIGGER_EVAL_ENABLED = Setting.boolSetting( + "plugins.alerting.multi_tenant_trigger_eval_enabled", + false, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt b/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt new file mode 100644 index 000000000..b1812e6f2 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluator.kt @@ -0,0 +1,139 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.trigger + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.alerting.opensearchapi.convertToMap +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.util.TriggerScriptRewriter +import org.opensearch.commons.alerting.model.QueryLevelTrigger +import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult +import org.opensearch.index.query.QueryBuilders +import org.opensearch.script.Script +import org.opensearch.script.ScriptType +import org.opensearch.search.aggregations.AggregationBuilders +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.transport.client.Client + +/** + * Evaluates query-level triggers remotely on the user's cluster via filter aggregations. + * + * This evaluator: + * 1. Rewrites trigger scripts from `ctx.results[0]` to `params.results_0` + * 2. Sends a single search request with one filter agg each per trigger + * 3. Passes the full search response from Call 1 as `params.results_0` + * 4. Reads `doc_count > 0` per trigger agg to determine if the trigger fires + */ +object RemoteQueryLevelTriggerEvaluator { + + private val logger = LogManager.getLogger(javaClass) + private const val TRIGGER_AGG_PREFIX = "_query_trigger_" + + /** + * Evaluates all triggers for a query-level monitor by sending a filter-agg request + * to the user's cluster. + * + * @param client The client connected to the user's cluster + * @param indices The indices to target (same as the monitor's search input) + * @param triggers The query-level triggers to evaluate + * @param searchResponse The full search response from the monitor's query (Call 1) + * @return Map of trigger ID to QueryLevelTriggerRunResult + */ + suspend fun evaluate( + client: Client, + indices: List, + triggers: List, + searchResponse: Map + ): Map { + val triggerData = triggers.map { TriggerData(it.id, it.name, it.condition.idOrCode) } + val searchSource = buildEvalSearchSource(triggerData, searchResponse) + + return try { + val evalRequest = SearchRequest(*indices.toTypedArray()).source(searchSource) + val evalResponse: SearchResponse = client.suspendUntil { client.search(evalRequest, it) } + val evalMap = evalResponse.convertToMap() + + @Suppress("UNCHECKED_CAST") + val aggs = evalMap["aggregations"] as? Map> ?: emptyMap() + val triggerIds = triggerData.map { it.id } + val parsedResults = parseEvalResponse(triggerIds, aggs) + + triggers.associate { trigger -> + val triggered = parsedResults[trigger.id] ?: false + trigger.id to QueryLevelTriggerRunResult(trigger.name, triggered, null) + } + } catch (e: Exception) { + logger.error("Error evaluating triggers remotely", e) + // On error, fail closed — don't trigger alerts but surface the error + triggers.associate { it.id to QueryLevelTriggerRunResult(it.name, false, e) } + } + } + + /** + * Builds the search source for the evaluation request. + * Each trigger becomes a filter aggregation with a script query. + */ + fun buildEvalSearchSource( + triggers: List, + searchResponse: Map + ): SearchSourceBuilder { + val searchSource = SearchSourceBuilder().size(0) + + for (trigger in triggers) { + val (id, script) = when (trigger) { + is TriggerData -> trigger.id to trigger.script + else -> { + val t = trigger as? Map<*, *> + ?: throw IllegalArgumentException("Unsupported trigger type") + t["id"].toString() to t["script"].toString() + } + } + + val rewrittenScript = TriggerScriptRewriter.rewriteScript(script) + val scriptObj = Script( + ScriptType.INLINE, + "painless", + rewrittenScript, + mapOf("results_0" to searchResponse) + ) + val filterAgg = AggregationBuilders.filter( + "$TRIGGER_AGG_PREFIX$id", + QueryBuilders.scriptQuery(scriptObj) + ) + searchSource.aggregation(filterAgg) + } + + return searchSource + } + + /** + * Parses the evaluation response to determine which triggers fired. + * `doc_count > 0` means the trigger condition was true. + * Missing triggers default to triggered (fail-open for safety). + */ + @Suppress("UNCHECKED_CAST") + fun parseEvalResponse( + triggerIds: List, + aggResults: Map> + ): Map { + return triggerIds.associateWith { triggerId -> + val aggKey = "$TRIGGER_AGG_PREFIX$triggerId" + val aggResult = aggResults[aggKey] + if (aggResult == null) { + logger.warn("Missing evaluation result for trigger $triggerId, defaulting to not triggered") + false + } else { + val docCount = (aggResult["doc_count"] as? Number)?.toLong() ?: 0L + docCount > 0 + } + } + } + + /** Internal data class for decoupling trigger data from the full model in buildEvalSearchSource */ + data class TriggerData(val id: String, val name: String, val script: String) +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/TriggerScriptRewriter.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/TriggerScriptRewriter.kt new file mode 100644 index 000000000..8a04ffca1 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/TriggerScriptRewriter.kt @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util + +/** + * Rewrites Painless trigger scripts for remote evaluation on the user's cluster. + * They are sent to the user's cluster via a filter aggregation. + * The script context on the user's cluster uses `params` instead of `ctx`, + * so all references to `ctx.results[0]` must be replaced with `params.results_0`. + */ +object TriggerScriptRewriter { + + private const val CTX_RESULTS_0 = "ctx.results[0]" + private const val PARAMS_RESULTS_0 = "params.results_0" + + /** + * Replaces all occurrences of `ctx.results[0]` with `params.results_0` in the given script source. + * MONITOR_MAX_INPUTS = 1, so only `ctx.results[0]` ever exists. + */ + fun rewriteScript(source: String): String { + return source.replace(CTX_RESULTS_0, PARAMS_RESULTS_0) + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 70a3d49bd..77ff20006 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -1400,11 +1400,10 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return response.asMap() } - fun RestClient.getSettings(): Map { - val response = this.makeRequest( - "GET", - "_cluster/settings?flat_settings=true" - ) + fun RestClient.getSettings(includeDefaults: Boolean = false): Map { + var url = "_cluster/settings?flat_settings=true" + if (includeDefaults) url += "&include_defaults=true" + val response = this.makeRequest("GET", url) assertEquals(RestStatus.OK, response.restStatus()) return response.asMap() } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt new file mode 100644 index 000000000..036a27e9a --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/RemoteQueryLevelTriggerIT.kt @@ -0,0 +1,392 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting + +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.index.query.QueryBuilders +import org.opensearch.script.Script +import org.opensearch.search.aggregations.AggregationBuilders +import org.opensearch.search.builder.SearchSourceBuilder +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit + +/** + * Integration tests for query-level trigger evaluation with the multi-tenant trigger eval flag enabled. + * These tests verify that trigger scripts are correctly evaluated remotely via filter aggregations + * on the user's cluster instead of locally via ScriptService. + */ +class RemoteQueryLevelTriggerIT : AlertingRestTestCase() { + + private val SETTING_KEY = AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED.key + + private fun enableRemoteTriggerEval() { + client().updateSettings(SETTING_KEY, true) + } + + private fun disableRemoteTriggerEval() { + client().updateSettings(SETTING_KEY, false) + } + + /** + * Helper: create a test index with numeric and keyword fields, index docs, return index name. + */ + private fun createIndexWithDocs( + docCount: Int = 3, + fieldName: String = "cpu", + values: List = (1..docCount).toList() + ): String { + val index = createTestIndex( + randomAlphaOfLength(10).lowercase(), + """ + "properties": { + "test_strict_date_time": { "type": "date", "format": "strict_date_time" }, + "test_field": { "type": "keyword" }, + "$fieldName": { "type": "integer" } + } + """ + ) + val twoMinsAgo = ZonedDateTime.now().minus(2, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.MILLIS) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(twoMinsAgo) + values.forEachIndexed { i, v -> + indexDoc(index, (i + 1).toString(), """{ "test_strict_date_time": "$testTime", "test_field": "val_$i", "$fieldName": $v }""") + } + return index + } + + private fun buildInput(index: String, aggs: SearchSourceBuilder.() -> Unit = {}): SearchInput { + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val ssb = SearchSourceBuilder().query(query) + ssb.aggs() + return SearchInput(indices = listOf(index), query = ssb) + } + + // ---- Tests ---- + + fun `test multi tenant trigger eval simple threshold`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(docCount = 5) + val input = buildInput(index) + val triggerScript = "return ctx.results[0].hits.total.value > 3" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + assertTrue("Unexpected error", triggerResult["error"]?.toString().isNullOrEmpty()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval simple threshold not triggered`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(docCount = 2) + val input = buildInput(index) + val triggerScript = "return ctx.results[0].hits.total.value > 100" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(false, triggerResult["triggered"].toString().toBoolean()) + assertTrue("Unexpected error", triggerResult["error"]?.toString().isNullOrEmpty()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval aggregation value`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(values = listOf(80, 90, 95)) + val input = buildInput(index) { + aggregation(AggregationBuilders.avg("avg_cpu").field("cpu")) + } + val triggerScript = "return ctx.results[0].aggregations.avg_cpu.value > 80" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + assertTrue("Unexpected error", triggerResult["error"]?.toString().isNullOrEmpty()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval boolean logic`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(docCount = 5, values = listOf(10, 20, 30, 40, 50)) + val input = buildInput(index) { + aggregation(AggregationBuilders.avg("avg_cpu").field("cpu")) + } + // Both conditions true: total > 3 AND avg > 20 + val triggerScript = "return ctx.results[0].hits.total.value > 3 && ctx.results[0].aggregations.avg_cpu.value > 20" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval boolean logic or`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(docCount = 2) + val input = buildInput(index) + // First condition false (total not > 100), second true (total > 0) + val triggerScript = "return ctx.results[0].hits.total.value > 100 || ctx.results[0].hits.total.value > 0" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval loop over hits`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(values = listOf(10, 95, 20)) + val input = buildInput(index) + // Loop over hits checking if any cpu value > 90 + val triggerScript = """ + for (hit in ctx.results[0].hits.hits) { + if (hit._source.cpu > 90) { + return true; + } + } + return false; + """.trimIndent() + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval multiple triggers`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(docCount = 5, values = listOf(10, 20, 30, 40, 50)) + val input = buildInput(index) { + aggregation(AggregationBuilders.avg("avg_cpu").field("cpu")) + } + + // Trigger 1: fires (total > 3) + val trigger1 = randomQueryLevelTrigger( + condition = Script("return ctx.results[0].hits.total.value > 3"), + actions = emptyList() + ) + // Trigger 2: does not fire (total > 100) + val trigger2 = randomQueryLevelTrigger( + condition = Script("return ctx.results[0].hits.total.value > 100"), + actions = emptyList() + ) + // Trigger 3: fires (avg > 20) + val trigger3 = randomQueryLevelTrigger( + condition = Script("return ctx.results[0].aggregations.avg_cpu.value > 20"), + actions = emptyList() + ) + + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger1, trigger2, trigger3)) + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResults = output.objectMap("trigger_results") + assertEquals(true, triggerResults.objectMap(trigger1.id)["triggered"].toString().toBoolean()) + assertEquals(false, triggerResults.objectMap(trigger2.id)["triggered"].toString().toBoolean()) + assertEquals(true, triggerResults.objectMap(trigger3.id)["triggered"].toString().toBoolean()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval script error`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(docCount = 1) + val input = buildInput(index) + // Malformed script — references a field that doesn't exist in the response + val triggerScript = "return ctx.results[0].nonexistent.field.value > 0" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = createMonitor( + randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + ) + + val response = executeMonitor(monitor.id) + val output = entityAsMap(response) + + @Suppress("UNCHECKED_CAST") + val triggerResult = (output.objectMap("trigger_results")[trigger.id] as Map) + // On error, trigger defaults to false (fail-closed) with error surfaced + assertEquals(false, triggerResult["triggered"].toString().toBoolean()) + assertFalse("Expected error to be set", triggerResult["error"]?.toString().isNullOrEmpty()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval large response`() { + enableRemoteTriggerEval() + try { + val index = createTestIndex( + randomAlphaOfLength(10).lowercase(), + """ + "properties": { + "test_strict_date_time": { "type": "date", "format": "strict_date_time" }, + "test_field": { "type": "keyword" }, + "cpu": { "type": "integer" } + } + """ + ) + val twoMinsAgo = ZonedDateTime.now().minus(2, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.MILLIS) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(twoMinsAgo) + // Index 50 docs to produce a non-trivial response + for (i in 1..50) { + indexDoc(index, i.toString(), """{ "test_strict_date_time": "$testTime", "test_field": "val_$i", "cpu": $i }""") + } + + val input = buildInput(index) + val triggerScript = "return ctx.results[0].hits.total.value > 40" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + assertTrue("Unexpected error", triggerResult["error"]?.toString().isNullOrEmpty()) + } finally { + disableRemoteTriggerEval() + } + } + + fun `test multi tenant trigger eval dry run`() { + enableRemoteTriggerEval() + try { + val index = createIndexWithDocs(docCount = 3) + val input = buildInput(index) + val triggerScript = "return ctx.results[0].hits.total.value > 0" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + + // Dryrun should not persist alerts — monitor was never saved so no ID + val alerts = searchAlerts(monitor) + assertEquals("Alert should not be saved for dryrun", 0, alerts.size) + } finally { + disableRemoteTriggerEval() + } + } + + // ---- Regression tests (flag=false) ---- + + fun `test query level trigger flag disabled`() { + // Flag is false by default — do NOT enable it + val index = createIndexWithDocs(docCount = 3) + val input = buildInput(index) + val triggerScript = "return ctx.results[0].hits.total.value > 0" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + val triggerResult = output.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, triggerResult["triggered"].toString().toBoolean()) + assertTrue("Unexpected error", triggerResult["error"]?.toString().isNullOrEmpty()) + } + + @Suppress("UNCHECKED_CAST") + fun `test query level trigger flag default is false`() { + val settings = client().getSettings(includeDefaults = true) + val defaults = settings["defaults"] as Map + assertEquals("false", defaults[SETTING_KEY].toString()) + } + + fun `test query level trigger toggle flag during execution`() { + val index = createIndexWithDocs(docCount = 3) + val input = buildInput(index) + val triggerScript = "return ctx.results[0].hits.total.value > 0" + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), actions = emptyList()) + val monitor = createMonitor( + randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) + ) + + try { + // Execute with flag=false (ScriptService path) + val response1 = executeMonitor(monitor.id) + val output1 = entityAsMap(response1) + val result1 = output1.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, result1["triggered"].toString().toBoolean()) + assertTrue("Unexpected error with flag off", result1["error"]?.toString().isNullOrEmpty()) + + // Toggle flag to true (remote eval path) + enableRemoteTriggerEval() + val response2 = executeMonitor(monitor.id) + val output2 = entityAsMap(response2) + val result2 = output2.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, result2["triggered"].toString().toBoolean()) + assertTrue("Unexpected error with flag on", result2["error"]?.toString().isNullOrEmpty()) + + // Toggle flag back to false (ScriptService path again) + disableRemoteTriggerEval() + val response3 = executeMonitor(monitor.id) + val output3 = entityAsMap(response3) + val result3 = output3.objectMap("trigger_results").objectMap(trigger.id) + assertEquals(true, result3["triggered"].toString().toBoolean()) + assertTrue("Unexpected error after toggling back", result3["error"]?.toString().isNullOrEmpty()) + } finally { + disableRemoteTriggerEval() + } + } + + @Suppress("UNCHECKED_CAST") + private fun Map.objectMap(key: String): Map> { + return this[key] as Map> + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/settings/AlertingSettingsTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/settings/AlertingSettingsTests.kt index 6ee8c4997..8f18ed531 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/settings/AlertingSettingsTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/settings/AlertingSettingsTests.kt @@ -56,6 +56,19 @@ class AlertingSettingsTests : OpenSearchTestCase() { ) } + fun `test multi tenant trigger eval setting defaults to false`() { + val value = AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED.get(Settings.EMPTY) + assertFalse("multi_tenant_trigger_eval_enabled should default to false", value) + } + + fun `test multi tenant trigger eval setting is registered`() { + val settings = plugin.settings + assertTrue( + "MULTI_TENANT_TRIGGER_EVAL_ENABLED not registered", + settings.contains(AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED) + ) + } + fun `test all opensearch settings returned`() { val settings = plugin.settings assertTrue( diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluatorTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluatorTests.kt new file mode 100644 index 000000000..ef6671508 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/trigger/RemoteQueryLevelTriggerEvaluatorTests.kt @@ -0,0 +1,84 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.trigger + +import org.opensearch.alerting.trigger.RemoteQueryLevelTriggerEvaluator.TriggerData +import org.opensearch.test.OpenSearchTestCase + +class RemoteQueryLevelTriggerEvaluatorTests : OpenSearchTestCase() { + + fun `test build eval request with single trigger`() { + val triggers = listOf( + TriggerData( + id = "trigger-1", + name = "test-trigger", + script = "ctx.results[0].hits.total.value > 0" + ) + ) + val searchResponse = mapOf("hits" to mapOf("total" to mapOf("value" to 5))) + val request = RemoteQueryLevelTriggerEvaluator.buildEvalSearchSource(triggers, searchResponse) + + assertNotNull(request) + val requestString = request.toString() + assertTrue("Should contain trigger agg", requestString.contains("_query_trigger_trigger-1")) + assertTrue("Should contain rewritten script", requestString.contains("params.results_0")) + } + + fun `test build eval request with multiple triggers`() { + val triggers = listOf( + TriggerData(id = "t1", name = "trigger-1", script = "ctx.results[0].hits.total.value > 0"), + TriggerData(id = "t2", name = "trigger-2", script = "ctx.results[0].hits.total.value > 100"), + TriggerData(id = "t3", name = "trigger-3", script = "ctx.results[0].aggregations.avg_val.value > 50") + ) + val searchResponse = mapOf("hits" to mapOf("total" to mapOf("value" to 5))) + val request = RemoteQueryLevelTriggerEvaluator.buildEvalSearchSource(triggers, searchResponse) + + val requestString = request.toString() + assertTrue("Should contain t1 agg", requestString.contains("_query_trigger_t1")) + assertTrue("Should contain t2 agg", requestString.contains("_query_trigger_t2")) + assertTrue("Should contain t3 agg", requestString.contains("_query_trigger_t3")) + } + + fun `test parse eval response triggered`() { + val triggerIds = listOf("trigger-1") + val aggResults = mapOf( + "_query_trigger_trigger-1" to mapOf("doc_count" to 5) + ) + val results = RemoteQueryLevelTriggerEvaluator.parseEvalResponse(triggerIds, aggResults) + + assertTrue("trigger-1 should be triggered", results["trigger-1"]!!) + } + + fun `test parse eval response not triggered`() { + val triggerIds = listOf("trigger-1") + val aggResults = mapOf( + "_query_trigger_trigger-1" to mapOf("doc_count" to 0) + ) + val results = RemoteQueryLevelTriggerEvaluator.parseEvalResponse(triggerIds, aggResults) + + assertFalse("trigger-1 should not be triggered", results["trigger-1"]!!) + } + + fun `test parse eval response multiple triggers mixed`() { + val triggerIds = listOf("t1", "t2") + val aggResults = mapOf( + "_query_trigger_t1" to mapOf("doc_count" to 3), + "_query_trigger_t2" to mapOf("doc_count" to 0) + ) + val results = RemoteQueryLevelTriggerEvaluator.parseEvalResponse(triggerIds, aggResults) + + assertTrue("t1 should be triggered", results["t1"]!!) + assertFalse("t2 should not be triggered", results["t2"]!!) + } + + fun `test parse eval response missing trigger defaults to not triggered`() { + val triggerIds = listOf("trigger-1") + val aggResults = emptyMap>() + val results = RemoteQueryLevelTriggerEvaluator.parseEvalResponse(triggerIds, aggResults) + + assertFalse("Missing trigger should default to not triggered (fail-closed)", results["trigger-1"]!!) + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/TriggerScriptRewriterTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/TriggerScriptRewriterTests.kt new file mode 100644 index 000000000..b30ae8df7 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/TriggerScriptRewriterTests.kt @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util + +import org.opensearch.test.OpenSearchTestCase + +class TriggerScriptRewriterTests : OpenSearchTestCase() { + + fun `test rewrite simple threshold`() { + val source = "ctx.results[0].hits.total.value > 100" + val rewritten = TriggerScriptRewriter.rewriteScript(source) + assertEquals("params.results_0.hits.total.value > 100", rewritten) + } + + fun `test rewrite aggregation access`() { + val source = "ctx.results[0].aggregations.avg_cpu.value > 90" + val rewritten = TriggerScriptRewriter.rewriteScript(source) + assertEquals("params.results_0.aggregations.avg_cpu.value > 90", rewritten) + } + + fun `test rewrite multiple occurrences`() { + val source = "ctx.results[0].hits.total.value > 0 && ctx.results[0].hits.hits.size() > 0" + val rewritten = TriggerScriptRewriter.rewriteScript(source) + assertEquals("params.results_0.hits.total.value > 0 && params.results_0.hits.hits.size() > 0", rewritten) + } + + fun `test rewrite preserves non-ctx content`() { + val source = "def x = ctx.results[0].hits.total.value; return x > 0" + val rewritten = TriggerScriptRewriter.rewriteScript(source) + assertEquals("def x = params.results_0.hits.total.value; return x > 0", rewritten) + } + + fun `test rewrite with no ctx reference`() { + val source = "return true" + val rewritten = TriggerScriptRewriter.rewriteScript(source) + assertEquals("return true", rewritten) + } + + fun `test rewrite loop over hits`() { + val source = "for (def hit : ctx.results[0].hits.hits) { if (hit._source.status == 500) return true } return false" + val rewritten = TriggerScriptRewriter.rewriteScript(source) + assertEquals( + "for (def hit : params.results_0.hits.hits) { if (hit._source.status == 500) return true } return false", + rewritten + ) + } +}