Skip to content

Commit d38a1ca

Browse files
author
Manaswini Ragamouni
committed
Use SdkClient for persistence in IndexAlertingComment, IndexMonitor
Signed-off-by: Manaswini Ragamouni <ragamanu@amazon.com>
1 parent c622dc4 commit d38a1ca

File tree

3 files changed

+152
-120
lines changed

3 files changed

+152
-120
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexAlertingCommentAction.kt

Lines changed: 70 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -11,28 +11,23 @@ import kotlinx.coroutines.launch
1111
import org.apache.logging.log4j.LogManager
1212
import org.opensearch.OpenSearchStatusException
1313
import org.opensearch.action.ActionRequest
14-
import org.opensearch.action.index.IndexRequest
15-
import org.opensearch.action.index.IndexResponse
16-
import org.opensearch.action.search.SearchRequest
17-
import org.opensearch.action.search.SearchResponse
1814
import org.opensearch.action.support.ActionFilters
1915
import org.opensearch.action.support.HandledTransportAction
20-
import org.opensearch.action.support.WriteRequest
16+
import org.opensearch.alerting.AlertingPlugin
2117
import org.opensearch.alerting.alerts.AlertIndices
2218
import org.opensearch.alerting.comments.CommentsIndices
2319
import org.opensearch.alerting.comments.CommentsIndices.Companion.COMMENTS_HISTORY_WRITE_INDEX
24-
import org.opensearch.alerting.opensearchapi.suspendUntil
2520
import org.opensearch.alerting.settings.AlertingSettings
2621
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERTING_COMMENTS_ENABLED
2722
import org.opensearch.alerting.settings.AlertingSettings.Companion.COMMENTS_MAX_CONTENT_SIZE
2823
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
2924
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_COMMENTS_PER_ALERT
3025
import org.opensearch.alerting.util.CommentsUtils
26+
import org.opensearch.alerting.util.await
3127
import org.opensearch.cluster.service.ClusterService
3228
import org.opensearch.common.inject.Inject
3329
import org.opensearch.common.settings.Settings
3430
import org.opensearch.common.xcontent.LoggingDeprecationHandler
35-
import org.opensearch.common.xcontent.XContentFactory
3631
import org.opensearch.common.xcontent.XContentHelper
3732
import org.opensearch.common.xcontent.XContentType
3833
import org.opensearch.commons.alerting.action.AlertingActions
@@ -47,10 +42,13 @@ import org.opensearch.core.action.ActionListener
4742
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
4843
import org.opensearch.core.rest.RestStatus
4944
import org.opensearch.core.xcontent.NamedXContentRegistry
45+
import org.opensearch.core.xcontent.ToXContentObject
5046
import org.opensearch.core.xcontent.XContentParser
5147
import org.opensearch.core.xcontent.XContentParserUtils
5248
import org.opensearch.index.query.QueryBuilders
49+
import org.opensearch.remote.metadata.client.PutDataObjectRequest
5350
import org.opensearch.remote.metadata.client.SdkClient
51+
import org.opensearch.remote.metadata.client.SearchDataObjectRequest
5452
import org.opensearch.rest.RestRequest
5553
import org.opensearch.search.builder.SearchSourceBuilder
5654
import org.opensearch.tasks.Task
@@ -191,30 +189,27 @@ constructor(
191189
user = user
192190
)
193191

194-
val indexRequest =
195-
IndexRequest(COMMENTS_HISTORY_WRITE_INDEX)
196-
.source(comment.toXContentWithUser(XContentFactory.jsonBuilder()))
197-
.setIfSeqNo(request.seqNo)
198-
.setIfPrimaryTerm(request.primaryTerm)
199-
.timeout(indexTimeout)
200-
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
192+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
193+
val wrappedComment = ToXContentObject { builder, params ->
194+
comment.toXContentWithUser(builder)
195+
}
196+
val putRequest = PutDataObjectRequest.builder()
197+
.index(COMMENTS_HISTORY_WRITE_INDEX)
198+
.tenantId(tenantId)
199+
.dataObject(wrappedComment)
200+
.build()
201201

202-
log.debug("Creating new comment: ${comment.toXContentWithUser(XContentFactory.jsonBuilder())}")
202+
log.debug("Creating new comment")
203203

204204
try {
205-
val indexResponse: IndexResponse = client.suspendUntil { client.index(indexRequest, it) }
206-
val failureReasons = checkShardsFailure(indexResponse)
207-
if (failureReasons != null) {
208-
actionListener.onFailure(
209-
AlertingException.wrap(OpenSearchStatusException(failureReasons.toString(), indexResponse.status())),
210-
)
211-
return
212-
}
213-
205+
val putResponse = sdkClient.putDataObjectAsync(putRequest).await()
206+
val seqNo = putResponse.indexResponse()?.seqNo ?: 0L
207+
val primaryTerm = putResponse.indexResponse()?.primaryTerm ?: 0L
214208
actionListener.onResponse(
215-
IndexCommentResponse(indexResponse.id, indexResponse.seqNo, indexResponse.primaryTerm, comment)
209+
IndexCommentResponse(putResponse.id(), seqNo, primaryTerm, comment)
216210
)
217211
} catch (t: Exception) {
212+
log.error("Failed to create comment", t)
218213
actionListener.onFailure(AlertingException.wrap(t))
219214
}
220215
}
@@ -240,40 +235,34 @@ constructor(
240235
// retains everything from the original comment except content and lastUpdatedTime
241236
val requestComment = currentComment.copy(content = request.content, lastUpdatedTime = Instant.now())
242237

243-
val indexRequest =
244-
IndexRequest(COMMENTS_HISTORY_WRITE_INDEX)
245-
.source(requestComment.toXContentWithUser(XContentFactory.jsonBuilder()))
246-
.id(requestComment.id)
247-
.setIfSeqNo(request.seqNo)
248-
.setIfPrimaryTerm(request.primaryTerm)
249-
.timeout(indexTimeout)
250-
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
251-
252-
log.debug(
253-
"Updating comment, ${currentComment.id}, from: " +
254-
"${currentComment.content} to: " +
255-
requestComment.content,
256-
)
238+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
239+
val wrappedComment = ToXContentObject { builder, params ->
240+
requestComment.toXContentWithUser(builder)
241+
}
242+
val putRequest = PutDataObjectRequest.builder()
243+
.index(COMMENTS_HISTORY_WRITE_INDEX)
244+
.id(requestComment.id)
245+
.tenantId(tenantId)
246+
.ifSeqNo(request.seqNo)
247+
.ifPrimaryTerm(request.primaryTerm)
248+
.overwriteIfExists(true)
249+
.dataObject(wrappedComment)
250+
.build()
251+
252+
log.debug("Updating comment, ${currentComment.id}")
257253

258254
try {
259-
val indexResponse: IndexResponse = client.suspendUntil { client.index(indexRequest, it) }
260-
val failureReasons = checkShardsFailure(indexResponse)
261-
if (failureReasons != null) {
262-
actionListener.onFailure(
263-
AlertingException.wrap(OpenSearchStatusException(failureReasons.toString(), indexResponse.status())),
264-
)
265-
return
266-
}
267-
255+
val putResponse = sdkClient.putDataObjectAsync(putRequest).await()
268256
actionListener.onResponse(
269257
IndexCommentResponse(
270-
indexResponse.id,
271-
indexResponse.seqNo,
272-
indexResponse.primaryTerm,
258+
putResponse.id(),
259+
putResponse.indexResponse()?.seqNo ?: 0L,
260+
putResponse.indexResponse()?.primaryTerm ?: 0L,
273261
requestComment,
274262
),
275263
)
276264
} catch (t: Exception) {
265+
log.error("Failed to update comment ${currentComment.id}", t)
277266
actionListener.onFailure(AlertingException.wrap(t))
278267
}
279268
}
@@ -290,14 +279,21 @@ constructor(
290279
.seqNoAndPrimaryTerm(true)
291280
.query(queryBuilder)
292281

293-
// search all alerts, since user might want to create a comment
294-
// on a completed alert
295-
val searchRequest =
296-
SearchRequest()
297-
.indices(AlertIndices.ALL_ALERT_INDEX_PATTERN)
298-
.source(searchSourceBuilder)
299-
300-
val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }
282+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
283+
val sdkSearchRequest = SearchDataObjectRequest.builder()
284+
.indices(AlertIndices.ALL_ALERT_INDEX_PATTERN)
285+
.tenantId(tenantId)
286+
.searchSourceBuilder(searchSourceBuilder)
287+
.build()
288+
val sdkResponse = sdkClient.searchDataObjectAsync(sdkSearchRequest).await()
289+
val searchResponse = sdkResponse.searchResponse()
290+
if (searchResponse == null) {
291+
log.error("Failed to search for alert ${request.entityId}")
292+
actionListener.onFailure(
293+
AlertingException.wrap(OpenSearchStatusException("Alert not found", RestStatus.NOT_FOUND))
294+
)
295+
return null
296+
}
301297
val alerts = searchResponse.hits.map { hit ->
302298
val xcp = XContentHelper.createParser(
303299
NamedXContentRegistry.EMPTY,
@@ -339,14 +335,21 @@ constructor(
339335
.seqNoAndPrimaryTerm(true)
340336
.query(queryBuilder)
341337

342-
// search all alerts, since user might want to create a comment
343-
// on a completed alert
344-
val searchRequest =
345-
SearchRequest()
346-
.indices(CommentsIndices.ALL_COMMENTS_INDEX_PATTERN)
347-
.source(searchSourceBuilder)
348-
349-
val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }
338+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
339+
val sdkSearchRequest = SearchDataObjectRequest.builder()
340+
.indices(CommentsIndices.ALL_COMMENTS_INDEX_PATTERN)
341+
.tenantId(tenantId)
342+
.searchSourceBuilder(searchSourceBuilder)
343+
.build()
344+
val sdkResponse = sdkClient.searchDataObjectAsync(sdkSearchRequest).await()
345+
val searchResponse = sdkResponse.searchResponse()
346+
if (searchResponse == null) {
347+
log.error("Failed to search for comment ${request.commentId}")
348+
actionListener.onFailure(
349+
AlertingException.wrap(OpenSearchStatusException("Comment not found", RestStatus.NOT_FOUND))
350+
)
351+
return null
352+
}
350353
val comments = searchResponse.hits.map { hit ->
351354
val xcp = XContentHelper.createParser(
352355
NamedXContentRegistry.EMPTY,
@@ -375,16 +378,5 @@ constructor(
375378

376379
return comments[0]
377380
}
378-
379-
private fun checkShardsFailure(response: IndexResponse): String? {
380-
val failureReasons = StringBuilder()
381-
if (response.shardInfo.failed > 0) {
382-
response.shardInfo.failures.forEach { entry ->
383-
failureReasons.append(entry.reason())
384-
}
385-
return failureReasons.toString()
386-
}
387-
return null
388-
}
389381
}
390382
}

0 commit comments

Comments
 (0)