Skip to content

Commit c622dc4

Browse files
manaswini1920Manaswini Ragamouni
andauthored
Use SdkClient for persistence in 4 transport actions (#2060)
* Use SdkClient for persistence in Delete, Search Monitor, GetDestinations, SearchComments Replace direct client.get/search calls with sdkClient equivalents: - TransportDeleteMonitorAction: sdkClient.getDataObject() in getMonitor() - TransportSearchMonitorAction: sdkClient.searchDataObjectAsync() in search() - TransportGetDestinationsAction: sdkClient.searchDataObjectAsync() in search() - TransportSearchAlertingCommentAction: sdkClient.searchDataObjectAsync() in search() Walk full cause chain for IndexNotFoundException handling. Add unit tests for SearchMonitor and GetDestinations SDK paths. Signed-off-by: Manaswini Ragamouni <ragamanu@amazon.com> * chore: retrigger CI Signed-off-by: Manaswini Ragamouni <ragamanu@amazon.com> --------- Signed-off-by: Manaswini Ragamouni <ragamanu@amazon.com> Co-authored-by: Manaswini Ragamouni <ragamanu@amazon.com>
1 parent 1655ae2 commit c622dc4

8 files changed

Lines changed: 675 additions & 76 deletions

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,10 @@ import kotlinx.coroutines.launch
1111
import org.apache.logging.log4j.LogManager
1212
import org.opensearch.OpenSearchStatusException
1313
import org.opensearch.action.ActionRequest
14-
import org.opensearch.action.get.GetRequest
15-
import org.opensearch.action.get.GetResponse
1614
import org.opensearch.action.support.ActionFilters
1715
import org.opensearch.action.support.HandledTransportAction
1816
import org.opensearch.action.support.WriteRequest.RefreshPolicy
19-
import org.opensearch.alerting.opensearchapi.suspendUntil
17+
import org.opensearch.alerting.AlertingPlugin
2018
import org.opensearch.alerting.service.DeleteMonitorService
2119
import org.opensearch.alerting.settings.AlertingSettings
2220
import org.opensearch.cluster.service.ClusterService
@@ -36,6 +34,7 @@ import org.opensearch.commons.utils.recreateObject
3634
import org.opensearch.core.action.ActionListener
3735
import org.opensearch.core.rest.RestStatus
3836
import org.opensearch.core.xcontent.NamedXContentRegistry
37+
import org.opensearch.remote.metadata.client.GetDataObjectRequest
3938
import org.opensearch.remote.metadata.client.SdkClient
4039
import org.opensearch.tasks.Task
4140
import org.opensearch.transport.TransportService
@@ -102,6 +101,7 @@ class TransportDeleteMonitorAction @Inject constructor(
102101
IllegalStateException()
103102
)
104103
)
104+
return
105105
} else if (canDelete) {
106106
actionListener.onResponse(
107107
DeleteMonitorService.deleteMonitor(monitor, refreshPolicy)
@@ -111,28 +111,39 @@ class TransportDeleteMonitorAction @Inject constructor(
111111
AlertingException("Not allowed to delete this monitor!", RestStatus.FORBIDDEN, IllegalStateException())
112112
)
113113
}
114+
} catch (t: OpenSearchStatusException) {
115+
log.error("Failed to delete monitor $monitorId", t)
116+
actionListener.onFailure(t)
114117
} catch (t: Exception) {
115118
log.error("Failed to delete monitor $monitorId", t)
116119
actionListener.onFailure(AlertingException.wrap(t))
117120
}
118121
}
119122

120123
private suspend fun getMonitor(): Monitor {
121-
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
124+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
125+
val getRequest = GetDataObjectRequest.builder()
126+
.index(ScheduledJob.SCHEDULED_JOBS_INDEX)
127+
.id(monitorId)
128+
.tenantId(tenantId)
129+
.build()
122130

123-
val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) }
124-
if (getResponse.isExists == false) {
125-
actionListener.onFailure(
126-
AlertingException.wrap(
127-
OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND)
128-
)
131+
try {
132+
val response = sdkClient.getDataObject(getRequest)
133+
val getResponse = response.getResponse()
134+
if (getResponse == null || !getResponse.isExists) {
135+
throw OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND)
136+
}
137+
val xcp = XContentHelper.createParser(
138+
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
139+
getResponse.sourceAsBytesRef, XContentType.JSON
129140
)
141+
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
142+
} catch (e: Exception) {
143+
if (e is OpenSearchStatusException && e.status() == RestStatus.NOT_FOUND) throw e
144+
log.error("GetMonitor operation failed for $monitorId", e)
145+
throw OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND)
130146
}
131-
val xcp = XContentHelper.createParser(
132-
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
133-
getResponse.sourceAsBytesRef, XContentType.JSON
134-
)
135-
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
136147
}
137148
}
138149
}

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsAction.kt

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,9 @@
66
package org.opensearch.alerting.transport
77

88
import org.apache.logging.log4j.LogManager
9-
import org.opensearch.action.search.SearchRequest
10-
import org.opensearch.action.search.SearchResponse
119
import org.opensearch.action.support.ActionFilters
1210
import org.opensearch.action.support.HandledTransportAction
11+
import org.opensearch.alerting.AlertingPlugin
1312
import org.opensearch.alerting.action.GetDestinationsAction
1413
import org.opensearch.alerting.action.GetDestinationsRequest
1514
import org.opensearch.alerting.action.GetDestinationsResponse
@@ -34,6 +33,8 @@ import org.opensearch.core.xcontent.XContentParserUtils
3433
import org.opensearch.index.query.Operator
3534
import org.opensearch.index.query.QueryBuilders
3635
import org.opensearch.remote.metadata.client.SdkClient
36+
import org.opensearch.remote.metadata.client.SearchDataObjectRequest
37+
import org.opensearch.remote.metadata.common.SdkClientUtils
3738
import org.opensearch.search.builder.SearchSourceBuilder
3839
import org.opensearch.search.fetch.subphase.FetchSourceContext
3940
import org.opensearch.search.sort.SortBuilders
@@ -42,7 +43,6 @@ import org.opensearch.tasks.Task
4243
import org.opensearch.transport.TransportService
4344
import org.opensearch.transport.client.Client
4445
import java.io.IOException
45-
4646
private val log = LogManager.getLogger(TransportGetDestinationsAction::class.java)
4747

4848
class TransportGetDestinationsAction @Inject constructor(
@@ -136,34 +136,42 @@ class TransportGetDestinationsAction @Inject constructor(
136136
}
137137

138138
fun search(searchSourceBuilder: SearchSourceBuilder, actionListener: ActionListener<GetDestinationsResponse>) {
139-
val searchRequest = SearchRequest()
140-
.source(searchSourceBuilder)
139+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
140+
val sdkSearchRequest = SearchDataObjectRequest.builder()
141141
.indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
142-
client.search(
143-
searchRequest,
144-
object : ActionListener<SearchResponse> {
145-
override fun onResponse(response: SearchResponse) {
146-
val totalDestinationCount = response.hits.totalHits?.value?.toInt()
147-
val destinations = mutableListOf<Destination>()
148-
for (hit in response.hits) {
149-
val id = hit.id
150-
val version = hit.version
151-
val seqNo = hit.seqNo.toInt()
152-
val primaryTerm = hit.primaryTerm.toInt()
153-
val xcp = XContentType.JSON.xContent()
154-
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.sourceAsString)
155-
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
156-
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp)
157-
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
158-
destinations.add(Destination.parse(xcp, id, version, seqNo, primaryTerm))
159-
}
160-
actionListener.onResponse(GetDestinationsResponse(RestStatus.OK, totalDestinationCount, destinations))
142+
.tenantId(tenantId)
143+
.searchSourceBuilder(searchSourceBuilder)
144+
.build()
145+
146+
sdkClient.searchDataObjectAsync(sdkSearchRequest).whenComplete { response, throwable ->
147+
if (throwable != null) {
148+
actionListener.onFailure(AlertingException.wrap(SdkClientUtils.unwrapAndConvertToException(throwable)))
149+
return@whenComplete
150+
}
151+
try {
152+
val searchResponse = response.searchResponse()
153+
if (searchResponse == null) {
154+
actionListener.onResponse(GetDestinationsResponse(RestStatus.OK, 0, emptyList()))
155+
return@whenComplete
161156
}
162-
163-
override fun onFailure(t: Exception) {
164-
actionListener.onFailure(AlertingException.wrap(t))
157+
val totalDestinationCount = searchResponse.hits.totalHits?.value?.toInt()
158+
val destinations = mutableListOf<Destination>()
159+
for (hit in searchResponse.hits) {
160+
val id = hit.id
161+
val version = hit.version
162+
val seqNo = hit.seqNo.toInt()
163+
val primaryTerm = hit.primaryTerm.toInt()
164+
val xcp = XContentType.JSON.xContent()
165+
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.sourceAsString)
166+
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
167+
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp)
168+
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
169+
destinations.add(Destination.parse(xcp, id, version, seqNo, primaryTerm))
165170
}
171+
actionListener.onResponse(GetDestinationsResponse(RestStatus.OK, totalDestinationCount, destinations))
172+
} catch (e: Exception) {
173+
actionListener.onFailure(AlertingException.wrap(e))
166174
}
167-
)
175+
}
168176
}
169177
}

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchAlertingCommentAction.kt

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import org.opensearch.action.search.SearchRequest
1515
import org.opensearch.action.search.SearchResponse
1616
import org.opensearch.action.support.ActionFilters
1717
import org.opensearch.action.support.HandledTransportAction
18+
import org.opensearch.alerting.AlertingPlugin
1819
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
1920
import org.opensearch.alerting.opensearchapi.suspendUntil
2021
import org.opensearch.alerting.settings.AlertingSettings
@@ -41,12 +42,13 @@ import org.opensearch.core.xcontent.XContentParserUtils
4142
import org.opensearch.index.query.BoolQueryBuilder
4243
import org.opensearch.index.query.QueryBuilders
4344
import org.opensearch.remote.metadata.client.SdkClient
45+
import org.opensearch.remote.metadata.client.SearchDataObjectRequest
46+
import org.opensearch.remote.metadata.common.SdkClientUtils
4447
import org.opensearch.search.builder.SearchSourceBuilder
4548
import org.opensearch.tasks.Task
4649
import org.opensearch.transport.TransportService
4750
import org.opensearch.transport.client.Client
4851
import java.io.IOException
49-
5052
private val log = LogManager.getLogger(TransportSearchAlertingCommentAction::class.java)
5153
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
5254

@@ -137,18 +139,29 @@ class TransportSearchAlertingCommentAction @Inject constructor(
137139
}
138140

139141
fun search(searchRequest: SearchRequest, actionListener: ActionListener<SearchResponse>) {
140-
client.search(
141-
searchRequest,
142-
object : ActionListener<SearchResponse> {
143-
override fun onResponse(response: SearchResponse) {
144-
actionListener.onResponse(response)
145-
}
146-
147-
override fun onFailure(t: Exception) {
148-
actionListener.onFailure(AlertingException.wrap(t))
149-
}
142+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
143+
val sdkSearchRequest = SearchDataObjectRequest.builder()
144+
.indices(*searchRequest.indices())
145+
.tenantId(tenantId)
146+
.searchSourceBuilder(searchRequest.source())
147+
.build()
148+
149+
sdkClient.searchDataObjectAsync(sdkSearchRequest).whenComplete { response, throwable ->
150+
if (throwable != null) {
151+
actionListener.onFailure(AlertingException.wrap(SdkClientUtils.unwrapAndConvertToException(throwable)))
152+
return@whenComplete
150153
}
151-
)
154+
val searchResponse = response.searchResponse()
155+
if (searchResponse != null) {
156+
actionListener.onResponse(searchResponse)
157+
} else {
158+
actionListener.onFailure(
159+
AlertingException.wrap(
160+
OpenSearchStatusException("Failed to search comments", RestStatus.INTERNAL_SERVER_ERROR)
161+
)
162+
)
163+
}
164+
}
152165
}
153166

154167
// retrieve the IDs of all Alerts after filtering by current User's

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import org.opensearch.action.search.SearchResponse.Clusters
1515
import org.opensearch.action.search.ShardSearchFailure
1616
import org.opensearch.action.support.ActionFilters
1717
import org.opensearch.action.support.HandledTransportAction
18+
import org.opensearch.alerting.AlertingPlugin
1819
import org.opensearch.alerting.opensearchapi.addFilter
1920
import org.opensearch.alerting.settings.AlertingSettings
2021
import org.opensearch.alerting.util.use
@@ -37,17 +38,17 @@ import org.opensearch.index.query.ExistsQueryBuilder
3738
import org.opensearch.index.query.MatchQueryBuilder
3839
import org.opensearch.index.query.QueryBuilders
3940
import org.opensearch.remote.metadata.client.SdkClient
41+
import org.opensearch.remote.metadata.client.SearchDataObjectRequest
42+
import org.opensearch.remote.metadata.common.SdkClientUtils
4043
import org.opensearch.search.SearchHits
4144
import org.opensearch.search.aggregations.InternalAggregations
4245
import org.opensearch.search.internal.InternalSearchResponse
4346
import org.opensearch.search.profile.SearchProfileShardResults
4447
import org.opensearch.search.suggest.Suggest
4548
import org.opensearch.tasks.Task
46-
import org.opensearch.transport.RemoteTransportException
4749
import org.opensearch.transport.TransportService
4850
import org.opensearch.transport.client.Client
4951
import java.util.Collections
50-
5152
private val log = LogManager.getLogger(TransportSearchMonitorAction::class.java)
5253

5354
class TransportSearchMonitorAction @Inject constructor(
@@ -139,34 +140,40 @@ class TransportSearchMonitorAction @Inject constructor(
139140

140141
// Checks if the exception is caused by an IndexNotFoundException (directly or nested).
141142
private fun isIndexNotFoundException(e: Exception): Boolean {
142-
if (e is IndexNotFoundException) return true
143-
if (e is RemoteTransportException) {
144-
val cause = e.cause
143+
var cause: Throwable? = e
144+
while (cause != null) {
145145
if (cause is IndexNotFoundException) return true
146+
cause = cause.cause
146147
}
147148
return false
148149
}
149150

150151
fun search(searchRequest: SearchRequest, actionListener: ActionListener<SearchResponse>) {
151-
client.search(
152-
searchRequest,
153-
object : ActionListener<SearchResponse> {
154-
override fun onResponse(response: SearchResponse) {
155-
actionListener.onResponse(response)
156-
}
152+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
153+
val sdkSearchRequest = SearchDataObjectRequest.builder()
154+
.indices(*searchRequest.indices())
155+
.tenantId(tenantId)
156+
.searchSourceBuilder(searchRequest.source())
157+
.build()
157158

158-
override fun onFailure(ex: Exception) {
159-
if (isIndexNotFoundException(ex)) {
160-
log.error("Index not found while searching monitor", ex)
161-
val emptyResponse = getEmptySearchResponse()
162-
actionListener.onResponse(emptyResponse)
163-
} else {
164-
log.error("Unexpected error while searching monitor", ex)
165-
actionListener.onFailure(AlertingException.wrap(ex))
166-
}
159+
sdkClient.searchDataObjectAsync(sdkSearchRequest).whenComplete { response, throwable ->
160+
if (throwable != null) {
161+
val cause = SdkClientUtils.unwrapAndConvertToException(throwable)
162+
if (isIndexNotFoundException(cause)) {
163+
actionListener.onResponse(getEmptySearchResponse())
164+
} else {
165+
log.error("Unexpected error while searching monitor", cause)
166+
actionListener.onFailure(AlertingException.wrap(cause))
167167
}
168+
return@whenComplete
168169
}
169-
)
170+
val searchResponse = response.searchResponse()
171+
if (searchResponse != null) {
172+
actionListener.onResponse(searchResponse)
173+
} else {
174+
actionListener.onResponse(getEmptySearchResponse())
175+
}
176+
}
170177
}
171178

172179
private fun addOwnerFieldIfNotExists(searchRequest: SearchRequest) {

0 commit comments

Comments
 (0)