Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.COMMENTS_MAX_CONTENT_SIZE,
AlertingSettings.MAX_COMMENTS_PER_ALERT,
AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION,
AlertingSettings.NOTIFICATION_CONTEXT_RESULTS_ALLOWED_ROLES
AlertingSettings.NOTIFICATION_CONTEXT_RESULTS_ALLOWED_ROLES,
AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,5 @@ data class MonitorRunnerExecutionContext(
AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
@Volatile var totalNodesFanOut: Int = AlertingSettings.DEFAULT_FAN_OUT_NODES,
@Volatile var lockService: LockService? = null,
@Volatile var multiTenantTriggerEvalEnabled: Boolean = false,
)
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
monitorCtx.totalNodesFanOut = it
}

monitorCtx.multiTenantTriggerEvalEnabled = AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.MULTI_TENANT_TRIGGER_EVAL_ENABLED) {
monitorCtx.multiTenantTriggerEvalEnabled = it
}

return this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.trigger.RemoteQueryLevelTriggerEvaluator
import org.opensearch.alerting.util.CommentsUtils
import org.opensearch.alerting.util.isADMonitor
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorRunResult
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.transport.TransportService
import java.time.Instant
Expand Down Expand Up @@ -77,6 +79,25 @@ object QueryLevelMonitorRunner : MonitorRunner() {
val updatedAlerts = mutableListOf<Alert>()
val triggerResults = mutableMapOf<String, QueryLevelTriggerRunResult>()

// When multi-tenant trigger eval is enabled, batch-evaluate all query-level triggers
// remotely on the user's cluster instead of running Painless locally
val remoteTriggerResults = if (
monitorCtx.multiTenantTriggerEvalEnabled &&
Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.QUERY_LEVEL_MONITOR &&
monitorResult.inputResults.results.isNotEmpty()
) {
val searchInput = monitor.inputs[0] as SearchInput
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Query-level monitors should always have SearchInput, a ClassCastException here would be unhandled

val queryLevelTriggers = monitor.triggers.filterIsInstance<QueryLevelTrigger>()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible for a trigger type to be non-QueryLevelTrigger if the monitor type is QueryLevel?

Wondering why the filterIsInstance is needed/if it excludes any valid triggers

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well ideologically it can't be anything but query level triggers but just for sanity leaving this here to guardrail against missing this translation fi we add any newer trigger types.

I added this because there is a possilibity we need to support a DataFusion engine instead of lucene engine and they have some limitations which might force us to introduce new triggers.

RemoteQueryLevelTriggerEvaluator.evaluate(
monitorCtx.client!!,
searchInput.indices,
queryLevelTriggers,
monitorResult.inputResults.results[0]
)
} else {
null
}

val maxComments = monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION)
val alertsToExecuteActionsForIds = currentAlerts.mapNotNull { it.value }.map { it.id }
val allAlertsComments = CommentsUtils.getCommentsForAlertNotification(
Expand All @@ -97,19 +118,27 @@ object QueryLevelMonitorRunner : MonitorRunner() {
currentAlertContext,
monitorCtx.clusterService!!.clusterSettings
)
val triggerResult = when (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT))) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR ->
monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
val remoteMonitoringEnabled =
monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)
if (remoteMonitoringEnabled)
monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!)
else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
val triggerResult = if (remoteTriggerResults != null) {
// Use pre-computed remote evaluation results
remoteTriggerResults[trigger.id]
?: QueryLevelTriggerRunResult(trigger.name, false, null)
} else {
when (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT))) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR ->
monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
val remoteMonitoringEnabled =
monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)
if (remoteMonitoringEnabled)
monitorCtx.triggerService!!.runClusterMetricsTrigger(
monitor, trigger, triggerCtx, monitorCtx.clusterService!!
)
else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
}
else ->
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType}.")
}
else ->
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType}.")
}

triggerResults[trigger.id] = triggerResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,5 +311,11 @@ class AlertingSettings {
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

val MULTI_TENANT_TRIGGER_EVAL_ENABLED = Setting.boolSetting(
"plugins.alerting.multi_tenant_trigger_eval_enabled",
false,
Setting.Property.NodeScope, Setting.Property.Dynamic
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.trigger

import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.util.TriggerScriptRewriter
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.index.query.QueryBuilders
import org.opensearch.script.Script
import org.opensearch.script.ScriptType
import org.opensearch.search.aggregations.AggregationBuilders
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.transport.client.Client

/**
* Evaluates query-level triggers remotely on the user's cluster via filter aggregations.
*
* This evaluator:
* 1. Rewrites trigger scripts from `ctx.results[0]` to `params.results_0`
* 2. Sends a single search request with one filter agg each per trigger
* 3. Passes the full search response from Call 1 as `params.results_0`
* 4. Reads `doc_count > 0` per trigger agg to determine if the trigger fires
*/
object RemoteQueryLevelTriggerEvaluator {

private val logger = LogManager.getLogger(javaClass)
private const val TRIGGER_AGG_PREFIX = "_query_trigger_"

/**
* Evaluates all triggers for a query-level monitor by sending a filter-agg request
* to the user's cluster.
*
* @param client The client connected to the user's cluster
* @param indices The indices to target (same as the monitor's search input)
* @param triggers The query-level triggers to evaluate
* @param searchResponse The full search response from the monitor's query (Call 1)
* @return Map of trigger ID to QueryLevelTriggerRunResult
*/
suspend fun evaluate(
client: Client,
indices: List<String>,
triggers: List<QueryLevelTrigger>,
searchResponse: Map<String, Any>
): Map<String, QueryLevelTriggerRunResult> {
val triggerData = triggers.map { TriggerData(it.id, it.name, it.condition.idOrCode) }
val searchSource = buildEvalSearchSource(triggerData, searchResponse)

return try {
val evalRequest = SearchRequest(*indices.toTypedArray()).source(searchSource)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not blocking:
Do we want to make use of the plugins.alerting.request_timeout or one of our other timeout-related cluster settings here?

val evalResponse: SearchResponse = client.suspendUntil { client.search(evalRequest, it) }
val evalMap = evalResponse.convertToMap()

@Suppress("UNCHECKED_CAST")
val aggs = evalMap["aggregations"] as? Map<String, Map<String, Any>> ?: emptyMap()
val triggerIds = triggerData.map { it.id }
val parsedResults = parseEvalResponse(triggerIds, aggs)

triggers.associate { trigger ->
val triggered = parsedResults[trigger.id] ?: false
trigger.id to QueryLevelTriggerRunResult(trigger.name, triggered, null)
}
} catch (e: Exception) {
logger.error("Error evaluating triggers remotely", e)
// On error, fail closed — don't trigger alerts but surface the error
triggers.associate { it.id to QueryLevelTriggerRunResult(it.name, false, e) }
}
}

/**
* Builds the search source for the evaluation request.
* Each trigger becomes a filter aggregation with a script query.
*/
fun buildEvalSearchSource(
triggers: List<Any>,
searchResponse: Map<String, Any>
): SearchSourceBuilder {
val searchSource = SearchSourceBuilder().size(0)

for (trigger in triggers) {
val (id, script) = when (trigger) {
is TriggerData -> trigger.id to trigger.script
else -> {
val t = trigger as? Map<*, *>
?: throw IllegalArgumentException("Unsupported trigger type")
t["id"].toString() to t["script"].toString()
}
}

val rewrittenScript = TriggerScriptRewriter.rewriteScript(script)
val scriptObj = Script(
ScriptType.INLINE,
"painless",
rewrittenScript,
mapOf("results_0" to searchResponse)
)
val filterAgg = AggregationBuilders.filter(
"$TRIGGER_AGG_PREFIX$id",
QueryBuilders.scriptQuery(scriptObj)
)
searchSource.aggregation(filterAgg)
}

return searchSource
}

/**
* Parses the evaluation response to determine which triggers fired.
* `doc_count > 0` means the trigger condition was true.
* Missing triggers default to triggered (fail-open for safety).
*/
@Suppress("UNCHECKED_CAST")
fun parseEvalResponse(
triggerIds: List<String>,
aggResults: Map<String, Map<String, Any>>
): Map<String, Boolean> {
return triggerIds.associateWith { triggerId ->
val aggKey = "$TRIGGER_AGG_PREFIX$triggerId"
val aggResult = aggResults[aggKey]
if (aggResult == null) {
logger.warn("Missing evaluation result for trigger $triggerId, defaulting to not triggered")
false
} else {
val docCount = (aggResult["doc_count"] as? Number)?.toLong() ?: 0L
docCount > 0
}
}
}

/** Internal data class for decoupling trigger data from the full model in buildEvalSearchSource */
data class TriggerData(val id: String, val name: String, val script: String)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.util

/**
* Rewrites Painless trigger scripts for remote evaluation on the user's cluster.
* They are sent to the user's cluster via a filter aggregation.
* The script context on the user's cluster uses `params` instead of `ctx`,
* so all references to `ctx.results[0]` must be replaced with `params.results_0`.
*/
object TriggerScriptRewriter {

private const val CTX_RESULTS_0 = "ctx.results[0]"
private const val PARAMS_RESULTS_0 = "params.results_0"

/**
* Replaces all occurrences of `ctx.results[0]` with `params.results_0` in the given script source.
* MONITOR_MAX_INPUTS = 1, so only `ctx.results[0]` ever exists.
*/
fun rewriteScript(source: String): String {
return source.replace(CTX_RESULTS_0, PARAMS_RESULTS_0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1400,11 +1400,10 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return response.asMap()
}

fun RestClient.getSettings(): Map<String, Any> {
val response = this.makeRequest(
"GET",
"_cluster/settings?flat_settings=true"
)
fun RestClient.getSettings(includeDefaults: Boolean = false): Map<String, Any> {
var url = "_cluster/settings?flat_settings=true"
if (includeDefaults) url += "&include_defaults=true"
val response = this.makeRequest("GET", url)
assertEquals(RestStatus.OK, response.restStatus())
return response.asMap()
}
Expand Down
Loading
Loading