Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionRequest
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.WriteRequest.RefreshPolicy
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.service.DeleteMonitorService
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.cluster.service.ClusterService
Expand All @@ -36,6 +34,7 @@ import org.opensearch.commons.utils.recreateObject
import org.opensearch.core.action.ActionListener
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.remote.metadata.client.GetDataObjectRequest
import org.opensearch.remote.metadata.client.SdkClient
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
Expand Down Expand Up @@ -102,6 +101,7 @@ class TransportDeleteMonitorAction @Inject constructor(
IllegalStateException()
)
)
return
} else if (canDelete) {
actionListener.onResponse(
DeleteMonitorService.deleteMonitor(monitor, refreshPolicy)
Expand All @@ -111,28 +111,39 @@ class TransportDeleteMonitorAction @Inject constructor(
AlertingException("Not allowed to delete this monitor!", RestStatus.FORBIDDEN, IllegalStateException())
)
}
} catch (t: OpenSearchStatusException) {
log.error("Failed to delete monitor $monitorId", t)
actionListener.onFailure(t)
} catch (t: Exception) {
log.error("Failed to delete monitor $monitorId", t)
actionListener.onFailure(AlertingException.wrap(t))
}
}

private suspend fun getMonitor(): Monitor {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the behavior if this header is not present?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the header is absent, tenantId is null. In local mode, multi-tenancy is disabled in the SDK, so null tenantId is fine — no tenant filtering is applied. In remote mode, the upstream service always sets the
x-tenant-id header, so it's never absent. If it were absent with multi-tenancy enabled, the SDK would reject the request with a 400 error

val getRequest = GetDataObjectRequest.builder()
.index(ScheduledJob.SCHEDULED_JOBS_INDEX)
.id(monitorId)
.tenantId(tenantId)
.build()

val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) }
if (getResponse.isExists == false) {
actionListener.onFailure(
AlertingException.wrap(
Comment on lines -125 to -126
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we no longer need to notify the action listener or wrap the exception type as alerting with the remote SDK?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still do — getMonitor() throws on failure, which is caught by resolveUserAndStart()'s catch block (L116) where actionListener.onFailure(AlertingException.wrap(t)) is called. The previous code called
onFailure directly in getMonitor() but didn't return after, causing an NPE on the next line. Moving to throw/catch fixes that while keeping the same listener notification and AlertingException wrapping.

OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND)
)
try {
val response = sdkClient.getDataObject(getRequest)
val getResponse = response.getResponse()
if (getResponse == null || !getResponse.isExists) {
throw OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND)
}
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
getResponse.sourceAsBytesRef, XContentType.JSON
)
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
} catch (e: Exception) {
if (e is OpenSearchStatusException && e.status() == RestStatus.NOT_FOUND) throw e
log.error("GetMonitor operation failed for $monitorId", e)
throw OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND)
}
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
getResponse.sourceAsBytesRef, XContentType.JSON
)
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
package org.opensearch.alerting.transport

import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.action.GetDestinationsAction
import org.opensearch.alerting.action.GetDestinationsRequest
import org.opensearch.alerting.action.GetDestinationsResponse
Expand All @@ -34,6 +33,8 @@ import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
import org.opensearch.remote.metadata.client.SdkClient
import org.opensearch.remote.metadata.client.SearchDataObjectRequest
import org.opensearch.remote.metadata.common.SdkClientUtils
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.fetch.subphase.FetchSourceContext
import org.opensearch.search.sort.SortBuilders
Expand All @@ -42,7 +43,6 @@ import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import org.opensearch.transport.client.Client
import java.io.IOException

private val log = LogManager.getLogger(TransportGetDestinationsAction::class.java)

class TransportGetDestinationsAction @Inject constructor(
Expand Down Expand Up @@ -136,34 +136,42 @@ class TransportGetDestinationsAction @Inject constructor(
}

fun search(searchSourceBuilder: SearchSourceBuilder, actionListener: ActionListener<GetDestinationsResponse>) {
val searchRequest = SearchRequest()
.source(searchSourceBuilder)
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
val sdkSearchRequest = SearchDataObjectRequest.builder()
.indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
client.search(
searchRequest,
object : ActionListener<SearchResponse> {
override fun onResponse(response: SearchResponse) {
val totalDestinationCount = response.hits.totalHits?.value?.toInt()
val destinations = mutableListOf<Destination>()
for (hit in response.hits) {
val id = hit.id
val version = hit.version
val seqNo = hit.seqNo.toInt()
val primaryTerm = hit.primaryTerm.toInt()
val xcp = XContentType.JSON.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.sourceAsString)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
destinations.add(Destination.parse(xcp, id, version, seqNo, primaryTerm))
}
actionListener.onResponse(GetDestinationsResponse(RestStatus.OK, totalDestinationCount, destinations))
.tenantId(tenantId)
.searchSourceBuilder(searchSourceBuilder)
.build()

sdkClient.searchDataObjectAsync(sdkSearchRequest).whenComplete { response, throwable ->
if (throwable != null) {
actionListener.onFailure(AlertingException.wrap(SdkClientUtils.unwrapAndConvertToException(throwable)))
return@whenComplete
}
try {
val searchResponse = response.searchResponse()
if (searchResponse == null) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would the search response be null? Wondering if this should be an error case rather than defaulting to an empty list

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SDK's SearchDataObjectResponse.searchResponse() returns nullable. It can be null if the SDK fails to deserialize the response from the storage backend. Returning empty is defensive. Happy to change to an error if you think that's better for debugging.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it - that sounds like an error case to me but we can reevaluate later

Given it's just a list of destinations it should be fine to have an empty list for now

actionListener.onResponse(GetDestinationsResponse(RestStatus.OK, 0, emptyList()))
return@whenComplete
}

override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
val totalDestinationCount = searchResponse.hits.totalHits?.value?.toInt()
val destinations = mutableListOf<Destination>()
for (hit in searchResponse.hits) {
val id = hit.id
val version = hit.version
val seqNo = hit.seqNo.toInt()
val primaryTerm = hit.primaryTerm.toInt()
val xcp = XContentType.JSON.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.sourceAsString)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
destinations.add(Destination.parse(xcp, id, version, seqNo, primaryTerm))
}
actionListener.onResponse(GetDestinationsResponse(RestStatus.OK, totalDestinationCount, destinations))
} catch (e: Exception) {
actionListener.onFailure(AlertingException.wrap(e))
}
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
Expand All @@ -41,12 +42,13 @@ import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.remote.metadata.client.SdkClient
import org.opensearch.remote.metadata.client.SearchDataObjectRequest
import org.opensearch.remote.metadata.common.SdkClientUtils
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import org.opensearch.transport.client.Client
import java.io.IOException

private val log = LogManager.getLogger(TransportSearchAlertingCommentAction::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

Expand Down Expand Up @@ -137,18 +139,29 @@ class TransportSearchAlertingCommentAction @Inject constructor(
}

fun search(searchRequest: SearchRequest, actionListener: ActionListener<SearchResponse>) {
client.search(
searchRequest,
object : ActionListener<SearchResponse> {
override fun onResponse(response: SearchResponse) {
actionListener.onResponse(response)
}

override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
}
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
val sdkSearchRequest = SearchDataObjectRequest.builder()
.indices(*searchRequest.indices())
.tenantId(tenantId)
.searchSourceBuilder(searchRequest.source())
.build()

sdkClient.searchDataObjectAsync(sdkSearchRequest).whenComplete { response, throwable ->
if (throwable != null) {
actionListener.onFailure(AlertingException.wrap(SdkClientUtils.unwrapAndConvertToException(throwable)))
return@whenComplete
}
)
val searchResponse = response.searchResponse()
if (searchResponse != null) {
actionListener.onResponse(searchResponse)
} else {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException("Failed to search comments", RestStatus.INTERNAL_SERVER_ERROR)
)
)
}
}
}

// retrieve the IDs of all Alerts after filtering by current User's
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.opensearch.action.search.SearchResponse.Clusters
import org.opensearch.action.search.ShardSearchFailure
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.opensearchapi.addFilter
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.use
Expand All @@ -37,17 +38,17 @@ import org.opensearch.index.query.ExistsQueryBuilder
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.remote.metadata.client.SdkClient
import org.opensearch.remote.metadata.client.SearchDataObjectRequest
import org.opensearch.remote.metadata.common.SdkClientUtils
import org.opensearch.search.SearchHits
import org.opensearch.search.aggregations.InternalAggregations
import org.opensearch.search.internal.InternalSearchResponse
import org.opensearch.search.profile.SearchProfileShardResults
import org.opensearch.search.suggest.Suggest
import org.opensearch.tasks.Task
import org.opensearch.transport.RemoteTransportException
import org.opensearch.transport.TransportService
import org.opensearch.transport.client.Client
import java.util.Collections

private val log = LogManager.getLogger(TransportSearchMonitorAction::class.java)

class TransportSearchMonitorAction @Inject constructor(
Expand Down Expand Up @@ -139,34 +140,40 @@ class TransportSearchMonitorAction @Inject constructor(

// Checks if the exception is caused by an IndexNotFoundException (directly or nested).
private fun isIndexNotFoundException(e: Exception): Boolean {
if (e is IndexNotFoundException) return true
if (e is RemoteTransportException) {
val cause = e.cause
var cause: Throwable? = e
while (cause != null) {
if (cause is IndexNotFoundException) return true
cause = cause.cause
}
return false
}

fun search(searchRequest: SearchRequest, actionListener: ActionListener<SearchResponse>) {
client.search(
searchRequest,
object : ActionListener<SearchResponse> {
override fun onResponse(response: SearchResponse) {
actionListener.onResponse(response)
}
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
val sdkSearchRequest = SearchDataObjectRequest.builder()
.indices(*searchRequest.indices())
.tenantId(tenantId)
.searchSourceBuilder(searchRequest.source())
.build()

override fun onFailure(ex: Exception) {
if (isIndexNotFoundException(ex)) {
log.error("Index not found while searching monitor", ex)
val emptyResponse = getEmptySearchResponse()
actionListener.onResponse(emptyResponse)
} else {
log.error("Unexpected error while searching monitor", ex)
actionListener.onFailure(AlertingException.wrap(ex))
}
sdkClient.searchDataObjectAsync(sdkSearchRequest).whenComplete { response, throwable ->
if (throwable != null) {
val cause = SdkClientUtils.unwrapAndConvertToException(throwable)
if (isIndexNotFoundException(cause)) {
actionListener.onResponse(getEmptySearchResponse())
} else {
log.error("Unexpected error while searching monitor", cause)
actionListener.onFailure(AlertingException.wrap(cause))
}
return@whenComplete
}
)
val searchResponse = response.searchResponse()
if (searchResponse != null) {
actionListener.onResponse(searchResponse)
} else {
actionListener.onResponse(getEmptySearchResponse())
}
}
}

private fun addOwnerFieldIfNotExists(searchRequest: SearchRequest) {
Expand Down
Loading
Loading