Skip to content

Commit 4e9b9eb

Browse files
committed
PPL Alerting: Adding PPL Related Models
1 parent 622ea10 commit 4e9b9eb

File tree

11 files changed

+804
-18
lines changed

11 files changed

+804
-18
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
buildscript {
77
ext {
88
opensearch_group = "org.opensearch"
9-
opensearch_version = System.getProperty("opensearch.version", "3.6.0-SNAPSHOT")
9+
opensearch_version = System.getProperty("opensearch.version", "3.3.0-SNAPSHOT")
1010
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
1111
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
1212
kotlin_version = System.getProperty("kotlin.version", "2.2.20")

src/main/kotlin/org/opensearch/commons/alerting/action/GetAlertsResponse.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,14 @@ class GetAlertsResponse : BaseResponse {
3939
@Throws(IOException::class)
4040
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
4141
builder.startObject()
42-
.field("alerts", alerts)
43-
.field("totalAlerts", totalAlerts)
42+
.field(ALERTS_FIELD, alerts)
43+
.field(TOTAL_ALERTS_FIELD, totalAlerts)
4444

4545
return builder.endObject()
4646
}
47+
48+
companion object {
49+
const val ALERTS_FIELD = "alerts"
50+
const val TOTAL_ALERTS_FIELD = "totalAlerts"
51+
}
4752
}

src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt

Lines changed: 150 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package org.opensearch.commons.alerting.model
22

33
import org.opensearch.common.lucene.uid.Versions
44
import org.opensearch.commons.alerting.alerts.AlertError
5+
import org.opensearch.commons.alerting.model.Monitor.Companion.suppressWarning
56
import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
67
import org.opensearch.commons.alerting.util.instant
78
import org.opensearch.commons.alerting.util.optionalTimeField
@@ -43,9 +44,10 @@ data class Alert(
4344
val aggregationResultBucket: AggregationResultBucket? = null,
4445
val executionId: String? = null,
4546
val associatedAlertIds: List<String>,
46-
val clusters: List<String>? = null
47+
val clusters: List<String>? = null,
48+
val pplQuery: String?,
49+
val pplQueryResults: Map<String, Any>
4750
) : Writeable, ToXContent {
48-
4951
init {
5052
if (errorMessage != null) {
5153
require(state == State.DELETED || state == State.ERROR || state == State.AUDIT) {
@@ -54,6 +56,110 @@ data class Alert(
5456
}
5557
}
5658

59+
constructor(
60+
id: String = NO_ID,
61+
version: Long = NO_VERSION,
62+
schemaVersion: Int = NO_SCHEMA_VERSION,
63+
monitorId: String,
64+
workflowId: String,
65+
workflowName: String,
66+
monitorName: String,
67+
monitorVersion: Long,
68+
monitorUser: User?,
69+
triggerId: String,
70+
triggerName: String,
71+
findingIds: List<String>,
72+
relatedDocIds: List<String>,
73+
state: State,
74+
startTime: Instant,
75+
endTime: Instant? = null,
76+
lastNotificationTime: Instant? = null,
77+
acknowledgedTime: Instant? = null,
78+
errorMessage: String? = null,
79+
errorHistory: List<AlertError>,
80+
severity: String,
81+
actionExecutionResults: List<ActionExecutionResult>,
82+
aggregationResultBucket: AggregationResultBucket? = null,
83+
executionId: String? = null,
84+
associatedAlertIds: List<String>,
85+
clusters: List<String>? = null
86+
) : this (
87+
id = id,
88+
version = version,
89+
schemaVersion = schemaVersion,
90+
monitorId = monitorId,
91+
workflowId = workflowId,
92+
workflowName = workflowName,
93+
monitorName = monitorName,
94+
monitorVersion = monitorVersion,
95+
monitorUser = monitorUser,
96+
triggerId = triggerId,
97+
triggerName = triggerName,
98+
findingIds = findingIds,
99+
relatedDocIds = relatedDocIds,
100+
state = state,
101+
startTime = startTime,
102+
endTime = endTime,
103+
lastNotificationTime = lastNotificationTime,
104+
acknowledgedTime = acknowledgedTime,
105+
errorMessage = errorMessage,
106+
errorHistory = errorHistory,
107+
severity = severity,
108+
actionExecutionResults = actionExecutionResults,
109+
aggregationResultBucket = aggregationResultBucket,
110+
executionId = executionId,
111+
associatedAlertIds = associatedAlertIds,
112+
clusters = clusters,
113+
pplQuery = null,
114+
pplQueryResults = mapOf()
115+
)
116+
117+
// Constructor for stateless alerts
118+
constructor(
119+
monitorId: String,
120+
monitorName: String,
121+
monitorVersion: Long,
122+
monitorUser: User?,
123+
triggerId: String,
124+
triggerName: String,
125+
pplQuery: String,
126+
pplQueryResults: Map<String, Any>,
127+
state: State, // only ever ACTIVE or ERROR
128+
triggeredTime: Instant,
129+
errorMessage: String? = null,
130+
severity: String,
131+
executionId: String? = null
132+
) : this(
133+
id = NO_ID,
134+
version = NO_VERSION,
135+
schemaVersion = NO_SCHEMA_VERSION,
136+
monitorId = monitorId,
137+
workflowId = "",
138+
workflowName = "",
139+
monitorName = monitorName,
140+
monitorVersion = monitorVersion,
141+
monitorUser = monitorUser,
142+
triggerId = triggerId,
143+
triggerName = triggerName,
144+
findingIds = emptyList(),
145+
relatedDocIds = emptyList(),
146+
state = state,
147+
startTime = triggeredTime,
148+
endTime = null, // stateless alerts forever have a null endTime, they don't end, they get expired
149+
lastNotificationTime = null, // not needed since stateless alert generation corresponds with notifications sent
150+
acknowledgedTime = null, // stateless alerts don't get acknowledged
151+
errorMessage = errorMessage,
152+
errorHistory = emptyList(),
153+
severity = severity,
154+
actionExecutionResults = emptyList(),
155+
aggregationResultBucket = null,
156+
executionId = executionId,
157+
associatedAlertIds = emptyList(),
158+
clusters = null,
159+
pplQuery = pplQuery,
160+
pplQueryResults = pplQueryResults
161+
)
162+
57163
constructor(
58164
startTime: Instant,
59165
lastNotificationTime: Instant?,
@@ -87,7 +193,9 @@ data class Alert(
87193
workflowId = workflow.id,
88194
workflowName = workflow.name,
89195
associatedAlertIds = associatedAlertIds,
90-
clusters = clusters
196+
clusters = clusters,
197+
pplQuery = null,
198+
pplQueryResults = mapOf()
91199
)
92200

93201
constructor(
@@ -125,7 +233,9 @@ data class Alert(
125233
workflowId = workflowId ?: "",
126234
workflowName = "",
127235
associatedAlertIds = emptyList(),
128-
clusters = clusters
236+
clusters = clusters,
237+
pplQuery = null,
238+
pplQueryResults = mapOf()
129239
)
130240

131241
constructor(
@@ -164,7 +274,9 @@ data class Alert(
164274
workflowId = workflowId ?: "",
165275
workflowName = "",
166276
associatedAlertIds = emptyList(),
167-
clusters = clusters
277+
clusters = clusters,
278+
pplQuery = null,
279+
pplQueryResults = mapOf()
168280
)
169281

170282
constructor(
@@ -204,7 +316,9 @@ data class Alert(
204316
workflowId = workflowId ?: "",
205317
workflowName = "",
206318
associatedAlertIds = emptyList(),
207-
clusters = clusters
319+
clusters = clusters,
320+
pplQuery = null,
321+
pplQueryResults = mapOf()
208322
)
209323

210324
constructor(
@@ -246,7 +360,9 @@ data class Alert(
246360
workflowId = workflowId ?: "",
247361
workflowName = "",
248362
associatedAlertIds = emptyList(),
249-
clusters = clusters
363+
clusters = clusters,
364+
pplQuery = null,
365+
pplQueryResults = mapOf()
250366
)
251367

252368
constructor(
@@ -285,7 +401,9 @@ data class Alert(
285401
workflowId = workflowId ?: "",
286402
executionId = executionId,
287403
associatedAlertIds = emptyList(),
288-
clusters = clusters
404+
clusters = clusters,
405+
pplQuery = null,
406+
pplQueryResults = mapOf()
289407
)
290408

291409
enum class State {
@@ -329,7 +447,9 @@ data class Alert(
329447
aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null,
330448
executionId = sin.readOptionalString(),
331449
associatedAlertIds = sin.readStringList(),
332-
clusters = sin.readOptionalStringList()
450+
clusters = sin.readOptionalStringList(),
451+
pplQuery = sin.readOptionalString(),
452+
pplQueryResults = suppressWarning(sin.readMap())
333453
)
334454

335455
fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED)
@@ -368,6 +488,8 @@ data class Alert(
368488
out.writeOptionalString(executionId)
369489
out.writeStringCollection(associatedAlertIds)
370490
out.writeOptionalStringArray(clusters?.toTypedArray())
491+
out.writeOptionalString(pplQuery)
492+
out.writeMap(pplQueryResults)
371493
}
372494

373495
companion object {
@@ -399,6 +521,8 @@ data class Alert(
399521
const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS
400522
const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH
401523
const val CLUSTERS_FIELD = "clusters"
524+
const val PPL_SQL_QUERY_FIELD = "ppl_query"
525+
const val PPL_SQL_QUERY_RESULTS_FIELD = "ppl_query_results"
402526
const val NO_ID = ""
403527
const val NO_VERSION = Versions.NOT_FOUND
404528

@@ -430,6 +554,8 @@ data class Alert(
430554
var aggAlertBucket: AggregationResultBucket? = null
431555
val associatedAlertIds = mutableListOf<String>()
432556
val clusters = mutableListOf<String>()
557+
var pplQuery: String? = null
558+
var pplQueryResults: Map<String, Any> = mapOf()
433559
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
434560
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
435561
val fieldName = xcp.currentName()
@@ -505,6 +631,12 @@ data class Alert(
505631
clusters.add(xcp.text())
506632
}
507633
}
634+
PPL_SQL_QUERY_FIELD -> {
635+
if (xcp.currentToken() != XContentParser.Token.VALUE_NULL) {
636+
pplQuery = xcp.text()
637+
}
638+
}
639+
PPL_SQL_QUERY_RESULTS_FIELD -> pplQueryResults = xcp.map()
508640
}
509641
}
510642

@@ -534,7 +666,9 @@ data class Alert(
534666
workflowId = workflowId,
535667
workflowName = workflowName,
536668
associatedAlertIds = associatedAlertIds,
537-
clusters = if (clusters.size > 0) clusters else null
669+
clusters = if (clusters.size > 0) clusters else null,
670+
pplQuery = pplQuery,
671+
pplQueryResults = pplQueryResults
538672
)
539673
}
540674

@@ -587,6 +721,9 @@ data class Alert(
587721

588722
if (!clusters.isNullOrEmpty()) builder.field(CLUSTERS_FIELD, clusters.toTypedArray())
589723

724+
if (!pplQuery.isNullOrEmpty()) builder.field(PPL_SQL_QUERY_FIELD, pplQuery)
725+
if (pplQueryResults.isNotEmpty()) builder.field(PPL_SQL_QUERY_RESULTS_FIELD, pplQueryResults)
726+
590727
builder.endObject()
591728
return builder
592729
}
@@ -611,7 +748,9 @@ data class Alert(
611748
PARENTS_BUCKET_PATH to aggregationResultBucket?.parentBucketPath,
612749
FINDING_IDS to findingIds.joinToString(","),
613750
RELATED_DOC_IDS to relatedDocIds.joinToString(","),
614-
CLUSTERS_FIELD to clusters?.joinToString(",")
751+
CLUSTERS_FIELD to clusters?.joinToString(","),
752+
PPL_SQL_QUERY_FIELD to pplQuery,
753+
PPL_SQL_QUERY_RESULTS_FIELD to pplQueryResults
615754
)
616755
}
617756
}

src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package org.opensearch.commons.alerting.model
22

33
import org.opensearch.commons.alerting.model.ClusterMetricsInput.Companion.URI_FIELD
44
import org.opensearch.commons.alerting.model.DocLevelMonitorInput.Companion.DOC_LEVEL_INPUT_FIELD
5+
import org.opensearch.commons.alerting.model.PPLSQLInput.Companion.PPL_SQL_INPUT_FIELD
56
import org.opensearch.commons.alerting.model.SearchInput.Companion.SEARCH_FIELD
67
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput
78
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput.Companion.REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD
@@ -20,7 +21,8 @@ interface Input : BaseModel {
2021
CLUSTER_METRICS_INPUT(URI_FIELD),
2122
SEARCH_INPUT(SEARCH_FIELD),
2223
REMOTE_MONITOR_INPUT(REMOTE_MONITOR_INPUT_FIELD),
23-
REMOTE_DOC_LEVEL_MONITOR_INPUT(REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD);
24+
REMOTE_DOC_LEVEL_MONITOR_INPUT(REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD),
25+
PPL_SQL_INPUT(PPL_SQL_INPUT_FIELD);
2426

2527
override fun toString(): String {
2628
return value
@@ -42,8 +44,10 @@ interface Input : BaseModel {
4244
DocLevelMonitorInput.parse(xcp)
4345
} else if (xcp.currentName() == Type.REMOTE_MONITOR_INPUT.value) {
4446
RemoteMonitorInput.parse(xcp)
45-
} else {
47+
} else if (xcp.currentName() == Type.REMOTE_DOC_LEVEL_MONITOR_INPUT.value) {
4648
RemoteDocLevelMonitorInput.parse(xcp)
49+
} else {
50+
PPLSQLInput.parseInner(xcp)
4751
}
4852
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp)
4953
return input
@@ -58,6 +62,7 @@ interface Input : BaseModel {
5862
Type.SEARCH_INPUT -> SearchInput(sin)
5963
Type.REMOTE_MONITOR_INPUT -> RemoteMonitorInput(sin)
6064
Type.REMOTE_DOC_LEVEL_MONITOR_INPUT -> RemoteDocLevelMonitorInput(sin)
65+
Type.PPL_SQL_INPUT -> PPLSQLInput(sin)
6166
// This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns
6267
// enum can be null in Java
6368
else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger")

0 commit comments

Comments
 (0)