From d38a1ca664923ac8df4d2bb2dc1ebbfb89755c94 Mon Sep 17 00:00:00 2001 From: Manaswini Ragamouni Date: Thu, 2 Apr 2026 17:36:13 +0000 Subject: [PATCH] Use SdkClient for persistence in IndexAlertingComment, IndexMonitor Signed-off-by: Manaswini Ragamouni --- .../TransportIndexAlertingCommentAction.kt | 148 +++++++++--------- .../transport/TransportIndexMonitorAction.kt | 102 +++++++----- .../org/opensearch/alerting/util/SdkUtils.kt | 22 +++ 3 files changed, 152 insertions(+), 120 deletions(-) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/util/SdkUtils.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexAlertingCommentAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexAlertingCommentAction.kt index 367e118bb..a15d0b67b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexAlertingCommentAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexAlertingCommentAction.kt @@ -11,28 +11,23 @@ import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionRequest -import org.opensearch.action.index.IndexRequest -import org.opensearch.action.index.IndexResponse -import org.opensearch.action.search.SearchRequest -import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction -import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.AlertingPlugin import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.comments.CommentsIndices import org.opensearch.alerting.comments.CommentsIndices.Companion.COMMENTS_HISTORY_WRITE_INDEX -import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERTING_COMMENTS_ENABLED import org.opensearch.alerting.settings.AlertingSettings.Companion.COMMENTS_MAX_CONTENT_SIZE import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_COMMENTS_PER_ALERT import org.opensearch.alerting.util.CommentsUtils +import org.opensearch.alerting.util.await import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.LoggingDeprecationHandler -import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.action.AlertingActions @@ -47,10 +42,13 @@ import org.opensearch.core.action.ActionListener import org.opensearch.core.common.io.stream.NamedWriteableRegistry import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContentObject import org.opensearch.core.xcontent.XContentParser import org.opensearch.core.xcontent.XContentParserUtils import org.opensearch.index.query.QueryBuilders +import org.opensearch.remote.metadata.client.PutDataObjectRequest import org.opensearch.remote.metadata.client.SdkClient +import org.opensearch.remote.metadata.client.SearchDataObjectRequest import org.opensearch.rest.RestRequest import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task @@ -191,30 +189,27 @@ constructor( user = user ) - val indexRequest = - IndexRequest(COMMENTS_HISTORY_WRITE_INDEX) - .source(comment.toXContentWithUser(XContentFactory.jsonBuilder())) - .setIfSeqNo(request.seqNo) - .setIfPrimaryTerm(request.primaryTerm) - .timeout(indexTimeout) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER) + val wrappedComment = ToXContentObject { builder, params -> + comment.toXContentWithUser(builder) + } + val putRequest = PutDataObjectRequest.builder() + .index(COMMENTS_HISTORY_WRITE_INDEX) + .tenantId(tenantId) + .dataObject(wrappedComment) + .build() - log.debug("Creating new comment: ${comment.toXContentWithUser(XContentFactory.jsonBuilder())}") + log.debug("Creating new comment") try { - val indexResponse: IndexResponse = client.suspendUntil { client.index(indexRequest, it) } - val failureReasons = checkShardsFailure(indexResponse) - if (failureReasons != null) { - actionListener.onFailure( - AlertingException.wrap(OpenSearchStatusException(failureReasons.toString(), indexResponse.status())), - ) - return - } - + val putResponse = sdkClient.putDataObjectAsync(putRequest).await() + val seqNo = putResponse.indexResponse()?.seqNo ?: 0L + val primaryTerm = putResponse.indexResponse()?.primaryTerm ?: 0L actionListener.onResponse( - IndexCommentResponse(indexResponse.id, indexResponse.seqNo, indexResponse.primaryTerm, comment) + IndexCommentResponse(putResponse.id(), seqNo, primaryTerm, comment) ) } catch (t: Exception) { + log.error("Failed to create comment", t) actionListener.onFailure(AlertingException.wrap(t)) } } @@ -240,40 +235,34 @@ constructor( // retains everything from the original comment except content and lastUpdatedTime val requestComment = currentComment.copy(content = request.content, lastUpdatedTime = Instant.now()) - val indexRequest = - IndexRequest(COMMENTS_HISTORY_WRITE_INDEX) - .source(requestComment.toXContentWithUser(XContentFactory.jsonBuilder())) - .id(requestComment.id) - .setIfSeqNo(request.seqNo) - .setIfPrimaryTerm(request.primaryTerm) - .timeout(indexTimeout) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - - log.debug( - "Updating comment, ${currentComment.id}, from: " + - "${currentComment.content} to: " + - requestComment.content, - ) + val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER) + val wrappedComment = ToXContentObject { builder, params -> + requestComment.toXContentWithUser(builder) + } + val putRequest = PutDataObjectRequest.builder() + .index(COMMENTS_HISTORY_WRITE_INDEX) + .id(requestComment.id) + .tenantId(tenantId) + .ifSeqNo(request.seqNo) + .ifPrimaryTerm(request.primaryTerm) + .overwriteIfExists(true) + .dataObject(wrappedComment) + .build() + + log.debug("Updating comment, ${currentComment.id}") try { - val indexResponse: IndexResponse = client.suspendUntil { client.index(indexRequest, it) } - val failureReasons = checkShardsFailure(indexResponse) - if (failureReasons != null) { - actionListener.onFailure( - AlertingException.wrap(OpenSearchStatusException(failureReasons.toString(), indexResponse.status())), - ) - return - } - + val putResponse = sdkClient.putDataObjectAsync(putRequest).await() actionListener.onResponse( IndexCommentResponse( - indexResponse.id, - indexResponse.seqNo, - indexResponse.primaryTerm, + putResponse.id(), + putResponse.indexResponse()?.seqNo ?: 0L, + putResponse.indexResponse()?.primaryTerm ?: 0L, requestComment, ), ) } catch (t: Exception) { + log.error("Failed to update comment ${currentComment.id}", t) actionListener.onFailure(AlertingException.wrap(t)) } } @@ -290,14 +279,21 @@ constructor( .seqNoAndPrimaryTerm(true) .query(queryBuilder) - // search all alerts, since user might want to create a comment - // on a completed alert - val searchRequest = - SearchRequest() - .indices(AlertIndices.ALL_ALERT_INDEX_PATTERN) - .source(searchSourceBuilder) - - val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } + val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER) + val sdkSearchRequest = SearchDataObjectRequest.builder() + .indices(AlertIndices.ALL_ALERT_INDEX_PATTERN) + .tenantId(tenantId) + .searchSourceBuilder(searchSourceBuilder) + .build() + val sdkResponse = sdkClient.searchDataObjectAsync(sdkSearchRequest).await() + val searchResponse = sdkResponse.searchResponse() + if (searchResponse == null) { + log.error("Failed to search for alert ${request.entityId}") + actionListener.onFailure( + AlertingException.wrap(OpenSearchStatusException("Alert not found", RestStatus.NOT_FOUND)) + ) + return null + } val alerts = searchResponse.hits.map { hit -> val xcp = XContentHelper.createParser( NamedXContentRegistry.EMPTY, @@ -339,14 +335,21 @@ constructor( .seqNoAndPrimaryTerm(true) .query(queryBuilder) - // search all alerts, since user might want to create a comment - // on a completed alert - val searchRequest = - SearchRequest() - .indices(CommentsIndices.ALL_COMMENTS_INDEX_PATTERN) - .source(searchSourceBuilder) - - val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } + val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER) + val sdkSearchRequest = SearchDataObjectRequest.builder() + .indices(CommentsIndices.ALL_COMMENTS_INDEX_PATTERN) + .tenantId(tenantId) + .searchSourceBuilder(searchSourceBuilder) + .build() + val sdkResponse = sdkClient.searchDataObjectAsync(sdkSearchRequest).await() + val searchResponse = sdkResponse.searchResponse() + if (searchResponse == null) { + log.error("Failed to search for comment ${request.commentId}") + actionListener.onFailure( + AlertingException.wrap(OpenSearchStatusException("Comment not found", RestStatus.NOT_FOUND)) + ) + return null + } val comments = searchResponse.hits.map { hit -> val xcp = XContentHelper.createParser( NamedXContentRegistry.EMPTY, @@ -375,16 +378,5 @@ constructor( return comments[0] } - - private fun checkShardsFailure(response: IndexResponse): String? { - val failureReasons = StringBuilder() - if (response.shardInfo.failed > 0) { - response.shardInfo.failures.forEach { entry -> - failureReasons.append(entry.reason()) - } - return failureReasons.toString() - } - return null - } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 9d914d676..cab10849e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -19,9 +19,6 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthAction import org.opensearch.action.admin.cluster.health.ClusterHealthRequest import org.opensearch.action.admin.cluster.health.ClusterHealthResponse import org.opensearch.action.admin.indices.create.CreateIndexResponse -import org.opensearch.action.get.GetRequest -import org.opensearch.action.get.GetResponse -import org.opensearch.action.index.IndexRequest import org.opensearch.action.index.IndexResponse import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse @@ -29,6 +26,7 @@ import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.WriteRequest.RefreshPolicy import org.opensearch.action.support.clustermanager.AcknowledgedResponse +import org.opensearch.alerting.AlertingPlugin import org.opensearch.alerting.MonitorMetadataService import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.opensearchapi.suspendUntil @@ -43,6 +41,7 @@ import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST import org.opensearch.alerting.util.DocLevelMonitorQueries import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.addUserBackendRolesFilter +import org.opensearch.alerting.util.await import org.opensearch.alerting.util.getRoleFilterEnabled import org.opensearch.alerting.util.isADMonitor import org.opensearch.alerting.util.use @@ -51,7 +50,6 @@ import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.LoggingDeprecationHandler -import org.opensearch.common.xcontent.XContentFactory.jsonBuilder import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.action.AlertingActions @@ -75,10 +73,13 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject import org.opensearch.index.query.QueryBuilders import org.opensearch.index.reindex.BulkByScrollResponse import org.opensearch.index.reindex.DeleteByQueryAction import org.opensearch.index.reindex.DeleteByQueryRequestBuilder +import org.opensearch.remote.metadata.client.GetDataObjectRequest +import org.opensearch.remote.metadata.client.PutDataObjectRequest import org.opensearch.remote.metadata.client.SdkClient import org.opensearch.rest.RestRequest import org.opensearch.search.builder.SearchSourceBuilder @@ -508,30 +509,33 @@ class TransportIndexMonitorAction @Inject constructor( log.debug("Created monitor's backend roles: $rbacRoles") } - val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX) - .setRefreshPolicy(request.refreshPolicy) - .source(request.monitor.toXContentWithUser(jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) - .setIfSeqNo(request.seqNo) - .setIfPrimaryTerm(request.primaryTerm) - .timeout(indexTimeout) - - log.info( - "Creating new monitor: ${request.monitor.toXContentWithUser( - jsonBuilder(), - ToXContent.MapParams(mapOf("with_type" to "true")) - )}" - ) + log.info("Creating new monitor: ${request.monitor.name}, type: ${request.monitor.monitorType}") + + val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER) + val monitorObj = ToXContentObject { builder, params -> + request.monitor.toXContentWithUser(builder, ToXContent.MapParams(mapOf("with_type" to "true"))) + } + val putRequest = PutDataObjectRequest.builder() + .index(SCHEDULED_JOBS_INDEX) + .tenantId(tenantId) + .dataObject(monitorObj) + .build() try { - val indexResponse: IndexResponse = client.suspendUntil { client.index(indexRequest, it) } - val failureReasons = checkShardsFailure(indexResponse) - if (failureReasons != null) { - log.info(failureReasons.toString()) + val putResponse = sdkClient.putDataObjectAsync(putRequest).await() + if (putResponse.isFailed) { actionListener.onFailure( - AlertingException.wrap(OpenSearchStatusException(failureReasons.toString(), indexResponse.status())) + AlertingException.wrap( + OpenSearchStatusException( + "Failed to create monitor: ${putResponse.cause()?.message}", + putResponse.status() ?: RestStatus.INTERNAL_SERVER_ERROR + ) + ) ) return } + val indexResponse = putResponse.indexResponse() + ?: throw OpenSearchStatusException("No index response from SDK", RestStatus.INTERNAL_SERVER_ERROR) var metadata: MonitorMetadata? try { // delete monitor if metadata creation fails, log the right error and re-throw the error to fail listener request.monitor = request.monitor.copy(id = indexResponse.id) @@ -611,10 +615,16 @@ class TransportIndexMonitorAction @Inject constructor( } private suspend fun updateMonitor() { - val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, request.monitorId) + val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER) + val getRequest = GetDataObjectRequest.builder() + .index(SCHEDULED_JOBS_INDEX) + .id(request.monitorId) + .tenantId(tenantId) + .build() try { - val getResponse: GetResponse = client.suspendUntil { client.get(getRequest, it) } - if (!getResponse.isExists) { + val response = sdkClient.getDataObjectAsync(getRequest).await() + val getResponse = response.getResponse() + if (getResponse == null || !getResponse.isExists) { actionListener.onFailure( AlertingException.wrap( OpenSearchStatusException("Monitor with ${request.monitorId} is not found", RestStatus.NOT_FOUND) @@ -680,30 +690,38 @@ class TransportIndexMonitorAction @Inject constructor( } request.monitor = request.monitor.copy(schemaVersion = IndexUtils.scheduledJobIndexSchemaVersion) - val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX) - .setRefreshPolicy(request.refreshPolicy) - .source(request.monitor.toXContentWithUser(jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) + + log.info("Updating monitor, ${currentMonitor.id}") + + val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER) + val monitorObj = ToXContentObject { builder, params -> + request.monitor.toXContentWithUser(builder, ToXContent.MapParams(mapOf("with_type" to "true"))) + } + val putRequest = PutDataObjectRequest.builder() + .index(SCHEDULED_JOBS_INDEX) .id(request.monitorId) - .setIfSeqNo(request.seqNo) - .setIfPrimaryTerm(request.primaryTerm) - .timeout(indexTimeout) - - log.info( - "Updating monitor, ${currentMonitor.id}, from: ${currentMonitor.toXContentWithUser( - jsonBuilder(), - ToXContent.MapParams(mapOf("with_type" to "true")) - )} \n to: ${request.monitor.toXContentWithUser(jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))}" - ) + .tenantId(tenantId) + .ifSeqNo(request.seqNo) + .ifPrimaryTerm(request.primaryTerm) + .overwriteIfExists(true) + .dataObject(monitorObj) + .build() try { - val indexResponse: IndexResponse = client.suspendUntil { client.index(indexRequest, it) } - val failureReasons = checkShardsFailure(indexResponse) - if (failureReasons != null) { + val putResponse = sdkClient.putDataObjectAsync(putRequest).await() + if (putResponse.isFailed) { actionListener.onFailure( - AlertingException.wrap(OpenSearchStatusException(failureReasons.toString(), indexResponse.status())) + AlertingException.wrap( + OpenSearchStatusException( + "Failed to update monitor: ${putResponse.cause()?.message}", + putResponse.status() ?: RestStatus.INTERNAL_SERVER_ERROR + ) + ) ) return } + val indexResponse = putResponse.indexResponse() + ?: throw OpenSearchStatusException("No index response from SDK", RestStatus.INTERNAL_SERVER_ERROR) var isDocLevelMonitorRestarted = false // Force re-creation of last run context if monitor is of type standard doc-level/threat-intel // And monitor is re-enabled diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/SdkUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/SdkUtils.kt new file mode 100644 index 000000000..8165e97be --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/SdkUtils.kt @@ -0,0 +1,22 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util + +import java.util.concurrent.CompletionStage +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +/** + * Converts a [CompletionStage] to a suspend function, allowing it to be used + * inside coroutines without blocking the thread. + */ +suspend fun CompletionStage.await(): T = suspendCoroutine { cont -> + this.whenComplete { result, error -> + if (error != null) cont.resumeWithException(error) + else cont.resume(result) + } +}