Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
718 changes: 426 additions & 292 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt

Large diffs are not rendered by default.

163 changes: 87 additions & 76 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

Large diffs are not rendered by default.

101 changes: 54 additions & 47 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertingV2Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ object AlertingV2Utils {
return IllegalStateException(
"The ID given corresponds to an Alerting V2 Monitor, but a V1 Monitor was expected. " +
"If you wish to operate on a V2 Monitor (e.g. PPL Monitor), please use " +
"the Alerting V2 APIs with endpoint prefix: $MONITOR_V2_BASE_URI."
"the Alerting V2 APIs with endpoint prefix: $MONITOR_V2_BASE_URI.",
)
} else if (scheduledJob !is Monitor && scheduledJob !is Workflow) {
return IllegalStateException(
"The ID given corresponds to a scheduled job of unknown type: ${scheduledJob.javaClass.name}. " +
"Please validate the ID and ensure it corresponds to a valid Monitor."
"Please validate the ID and ensure it corresponds to a valid Monitor.",
)
}
return null
Expand All @@ -67,12 +67,12 @@ object AlertingV2Utils {
return IllegalStateException(
"The ID given corresponds to an Alerting V1 Monitor, but a V2 Monitor was expected. " +
"If you wish to operate on a V1 Monitor (e.g. Per Query, Per Document, etc), please use " +
"the Alerting V1 APIs with endpoint prefix: $MONITOR_BASE_URI."
"the Alerting V1 APIs with endpoint prefix: $MONITOR_BASE_URI.",
)
} else if (scheduledJob !is MonitorV2) {
return IllegalStateException(
"The ID given corresponds to a scheduled job of unknown type: ${scheduledJob.javaClass.name}. " +
"Please validate the ID and ensure it corresponds to a valid Monitor."
"Please validate the ID and ensure it corresponds to a valid Monitor.",
)
}
return null
Expand All @@ -96,15 +96,16 @@ object AlertingV2Utils {

// Used in Get and Search monitor functionalities to return a "no results" response
fun getEmptySearchResponse(): SearchResponse {
val internalSearchResponse = InternalSearchResponse(
SearchHits(emptyArray(), TotalHits(0L, Relation.EQUAL_TO), 0.0f),
InternalAggregations.from(Collections.emptyList()),
Suggest(Collections.emptyList()),
SearchProfileShardResults(Collections.emptyMap()),
false,
false,
0
)
val internalSearchResponse =
InternalSearchResponse(
SearchHits(emptyArray(), TotalHits(0L, Relation.EQUAL_TO), 0.0f),
InternalAggregations.from(Collections.emptyList()),
Suggest(Collections.emptyList()),
SearchProfileShardResults(Collections.emptyMap()),
false,
false,
0,
)

return SearchResponse(
internalSearchResponse,
Expand All @@ -114,15 +115,15 @@ object AlertingV2Utils {
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
SearchResponse.Clusters.EMPTY,
)
}

suspend fun getConfigAndSendNotification(
action: Action,
monitorCtx: MonitorRunnerExecutionContext,
subject: String?,
message: String
message: String,
): String {
val config = getConfigForNotificationAction(action, monitorCtx)
if (config.destination == null && config.channel == null) {
Expand All @@ -137,7 +138,7 @@ object AlertingV2Utils {

if (config.destination?.isAllowed(monitorCtx.allowList) == false) {
throw IllegalStateException(
"Monitor contains a Destination type that is not allowed: ${config.destination.type}"
"Monitor contains a Destination type that is not allowed: ${config.destination.type}",
)
}

Expand All @@ -146,7 +147,7 @@ object AlertingV2Utils {
?.sendNotification(
monitorCtx.client!!,
config.channel.getTitle(subject),
message
message,
) ?: actionResponseContent

actionResponseContent = config.destination
Expand All @@ -165,7 +166,7 @@ object AlertingV2Utils {
*/
private suspend fun getConfigForNotificationAction(
action: Action,
monitorCtx: MonitorRunnerExecutionContext
monitorCtx: MonitorRunnerExecutionContext,
): NotificationActionConfigs {
var destination: Destination? = null
var notificationPermissionException: Exception? = null
Expand All @@ -179,39 +180,45 @@ object AlertingV2Utils {

// If the channel was not found, try to retrieve the Destination
if (channel == null) {
destination = try {
val table = Table(
"asc",
"destination.name.keyword",
null,
1,
0,
destination =
try {
val table =
Table(
"asc",
"destination.name.keyword",
null,
1,
0,
null,
)
val getDestinationsRequest =
GetDestinationsRequest(
action.destinationId,
0L,
null,
table,
"ALL",
)

val getDestinationsResponse: GetDestinationsResponse =
monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.execute(GetDestinationsAction.INSTANCE, getDestinationsRequest, it)
}
getDestinationsResponse.destinations.firstOrNull()
} catch (e: IllegalStateException) {
// Catching the exception thrown when the Destination was not found so the NotificationActionConfigs object can be returned
null
)
val getDestinationsRequest = GetDestinationsRequest(
action.destinationId,
0L,
null,
table,
"ALL"
)

val getDestinationsResponse: GetDestinationsResponse = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.execute(GetDestinationsAction.INSTANCE, getDestinationsRequest, it)
} catch (e: OpenSearchSecurityException) {
if (notificationPermissionException != null) {
throw notificationPermissionException
} else {
throw e
}
}
getDestinationsResponse.destinations.firstOrNull()
} catch (e: IllegalStateException) {
// Catching the exception thrown when the Destination was not found so the NotificationActionConfigs object can be returned
null
} catch (e: OpenSearchSecurityException) {
if (notificationPermissionException != null)
throw notificationPermissionException
else
throw e
}

if (destination == null && notificationPermissionException != null)
if (destination == null && notificationPermissionException != null) {
throw notificationPermissionException
}
}

return NotificationActionConfigs(destination, channel)
Expand Down
Loading
Loading