Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,23 @@ import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.comments.CommentsIndices
import org.opensearch.alerting.comments.CommentsIndices.Companion.COMMENTS_HISTORY_WRITE_INDEX
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERTING_COMMENTS_ENABLED
import org.opensearch.alerting.settings.AlertingSettings.Companion.COMMENTS_MAX_CONTENT_SIZE
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_COMMENTS_PER_ALERT
import org.opensearch.alerting.util.CommentsUtils
import org.opensearch.alerting.util.await
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.action.AlertingActions
Expand All @@ -47,10 +42,13 @@ import org.opensearch.core.action.ActionListener
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.index.query.QueryBuilders
import org.opensearch.remote.metadata.client.PutDataObjectRequest
import org.opensearch.remote.metadata.client.SdkClient
import org.opensearch.remote.metadata.client.SearchDataObjectRequest
import org.opensearch.rest.RestRequest
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.tasks.Task
Expand Down Expand Up @@ -191,30 +189,27 @@ constructor(
user = user
)

val indexRequest =
IndexRequest(COMMENTS_HISTORY_WRITE_INDEX)
.source(comment.toXContentWithUser(XContentFactory.jsonBuilder()))
.setIfSeqNo(request.seqNo)
.setIfPrimaryTerm(request.primaryTerm)
.timeout(indexTimeout)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
val wrappedComment = ToXContentObject { builder, params ->
comment.toXContentWithUser(builder)
}
val putRequest = PutDataObjectRequest.builder()
.index(COMMENTS_HISTORY_WRITE_INDEX)
.tenantId(tenantId)
.dataObject(wrappedComment)
.build()

log.debug("Creating new comment: ${comment.toXContentWithUser(XContentFactory.jsonBuilder())}")
log.debug("Creating new comment")

try {
val indexResponse: IndexResponse = client.suspendUntil { client.index(indexRequest, it) }
val failureReasons = checkShardsFailure(indexResponse)
if (failureReasons != null) {
actionListener.onFailure(
AlertingException.wrap(OpenSearchStatusException(failureReasons.toString(), indexResponse.status())),
)
return
}

val putResponse = sdkClient.putDataObjectAsync(putRequest).await()
val seqNo = putResponse.indexResponse()?.seqNo ?: 0L
val primaryTerm = putResponse.indexResponse()?.primaryTerm ?: 0L
actionListener.onResponse(
IndexCommentResponse(indexResponse.id, indexResponse.seqNo, indexResponse.primaryTerm, comment)
IndexCommentResponse(putResponse.id(), seqNo, primaryTerm, comment)
)
} catch (t: Exception) {
log.error("Failed to create comment", t)
actionListener.onFailure(AlertingException.wrap(t))
}
}
Expand All @@ -240,40 +235,34 @@ constructor(
// retains everything from the original comment except content and lastUpdatedTime
val requestComment = currentComment.copy(content = request.content, lastUpdatedTime = Instant.now())

val indexRequest =
IndexRequest(COMMENTS_HISTORY_WRITE_INDEX)
.source(requestComment.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(requestComment.id)
.setIfSeqNo(request.seqNo)
.setIfPrimaryTerm(request.primaryTerm)
.timeout(indexTimeout)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)

log.debug(
"Updating comment, ${currentComment.id}, from: " +
"${currentComment.content} to: " +
requestComment.content,
)
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
val wrappedComment = ToXContentObject { builder, params ->
requestComment.toXContentWithUser(builder)
}
val putRequest = PutDataObjectRequest.builder()
.index(COMMENTS_HISTORY_WRITE_INDEX)
.id(requestComment.id)
.tenantId(tenantId)
.ifSeqNo(request.seqNo)
.ifPrimaryTerm(request.primaryTerm)
.overwriteIfExists(true)
.dataObject(wrappedComment)
.build()

log.debug("Updating comment, ${currentComment.id}")

try {
val indexResponse: IndexResponse = client.suspendUntil { client.index(indexRequest, it) }
val failureReasons = checkShardsFailure(indexResponse)
if (failureReasons != null) {
actionListener.onFailure(
AlertingException.wrap(OpenSearchStatusException(failureReasons.toString(), indexResponse.status())),
)
return
}

val putResponse = sdkClient.putDataObjectAsync(putRequest).await()
actionListener.onResponse(
IndexCommentResponse(
indexResponse.id,
indexResponse.seqNo,
indexResponse.primaryTerm,
putResponse.id(),
putResponse.indexResponse()?.seqNo ?: 0L,
putResponse.indexResponse()?.primaryTerm ?: 0L,
requestComment,
),
)
} catch (t: Exception) {
log.error("Failed to update comment ${currentComment.id}", t)
actionListener.onFailure(AlertingException.wrap(t))
}
}
Expand All @@ -290,14 +279,21 @@ constructor(
.seqNoAndPrimaryTerm(true)
.query(queryBuilder)

// search all alerts, since user might want to create a comment
// on a completed alert
val searchRequest =
SearchRequest()
.indices(AlertIndices.ALL_ALERT_INDEX_PATTERN)
.source(searchSourceBuilder)

val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
val sdkSearchRequest = SearchDataObjectRequest.builder()
.indices(AlertIndices.ALL_ALERT_INDEX_PATTERN)
.tenantId(tenantId)
.searchSourceBuilder(searchSourceBuilder)
.build()
val sdkResponse = sdkClient.searchDataObjectAsync(sdkSearchRequest).await()
val searchResponse = sdkResponse.searchResponse()
if (searchResponse == null) {
log.error("Failed to search for alert ${request.entityId}")
actionListener.onFailure(
AlertingException.wrap(OpenSearchStatusException("Alert not found", RestStatus.NOT_FOUND))
)
return null
}
val alerts = searchResponse.hits.map { hit ->
val xcp = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
Expand Down Expand Up @@ -339,14 +335,21 @@ constructor(
.seqNoAndPrimaryTerm(true)
.query(queryBuilder)

// search all alerts, since user might want to create a comment
// on a completed alert
val searchRequest =
SearchRequest()
.indices(CommentsIndices.ALL_COMMENTS_INDEX_PATTERN)
.source(searchSourceBuilder)

val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
val sdkSearchRequest = SearchDataObjectRequest.builder()
.indices(CommentsIndices.ALL_COMMENTS_INDEX_PATTERN)
.tenantId(tenantId)
.searchSourceBuilder(searchSourceBuilder)
.build()
val sdkResponse = sdkClient.searchDataObjectAsync(sdkSearchRequest).await()
val searchResponse = sdkResponse.searchResponse()
if (searchResponse == null) {
log.error("Failed to search for comment ${request.commentId}")
actionListener.onFailure(
AlertingException.wrap(OpenSearchStatusException("Comment not found", RestStatus.NOT_FOUND))
)
return null
}
val comments = searchResponse.hits.map { hit ->
val xcp = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
Expand Down Expand Up @@ -375,16 +378,5 @@ constructor(

return comments[0]
}

private fun checkShardsFailure(response: IndexResponse): String? {
val failureReasons = StringBuilder()
if (response.shardInfo.failed > 0) {
response.shardInfo.failures.forEach { entry ->
failureReasons.append(entry.reason())
}
return failureReasons.toString()
}
return null
}
}
}
Loading
Loading