Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
xContentRegistry,
settings
)
MonitorMetadataService.sdkClient = sdkClient
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why aren't we adding it in the constructor?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because MonitorMetadataService is a Kotlin object (singleton), not a class — it doesn't have a constructor. It's initialized via MonitorMetadataService.initialize(client, clusterService, xContentRegistry, settings) which sets fields individually. The
sdkClient follows the same pattern.

You could add it as a parameter to initialize() instead of setting it separately. That would be cleaner:

kotlin

fun initialize(
    client: Client,
    clusterService: ClusterService,
    xContentRegistry: NamedXContentRegistry,
    settings: Settings,
    sdkClient: SdkClient
) {
    ...
    this.sdkClient = sdkClient
}


i can take it as follow up on the next PR


WorkflowMetadataService.initialize(
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,19 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchSecurityException
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.DocWriteResponse
import org.opensearch.action.admin.indices.get.GetIndexRequest
import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.action.admin.indices.stats.IndicesStatsAction
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.await
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
Expand All @@ -44,9 +37,13 @@ import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.remote.metadata.client.GetDataObjectRequest
import org.opensearch.remote.metadata.client.PutDataObjectRequest
import org.opensearch.remote.metadata.client.SdkClient
import org.opensearch.transport.RemoteTransportException
import org.opensearch.transport.client.Client

Expand All @@ -59,6 +56,7 @@ object MonitorMetadataService :
private lateinit var xContentRegistry: NamedXContentRegistry
private lateinit var clusterService: ClusterService
private lateinit var settings: Settings
lateinit var sdkClient: SdkClient

@Volatile
private lateinit var indexTimeout: TimeValue
Expand All @@ -77,49 +75,43 @@ object MonitorMetadataService :
this.clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.INDEX_TIMEOUT) { indexTimeout = it }
}

private fun getTenantId(): String? =
client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)

@Suppress("ComplexMethod", "ReturnCount")
suspend fun upsertMetadata(metadata: MonitorMetadata, updating: Boolean): MonitorMetadata {
try {
if (clusterService.state().routingTable.hasIndex(ScheduledJob.SCHEDULED_JOBS_INDEX)) {
val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(
metadata.toXContent(
XContentFactory.jsonBuilder(),
ToXContent.MapParams(mapOf("with_type" to "true"))
)
)
val metadataObj = ToXContentObject { builder, _ ->
metadata.toXContent(builder, ToXContent.MapParams(mapOf("with_type" to "true")))
}
val putRequestBuilder = PutDataObjectRequest.builder()
.index(ScheduledJob.SCHEDULED_JOBS_INDEX)
.id(metadata.id)
.routing(metadata.monitorId)
.setIfSeqNo(metadata.seqNo)
.setIfPrimaryTerm(metadata.primaryTerm)
.timeout(indexTimeout)
.tenantId(getTenantId())
.dataObject(metadataObj)

if (updating) {
indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm)
putRequestBuilder.ifSeqNo(metadata.seqNo).ifPrimaryTerm(metadata.primaryTerm)
.overwriteIfExists(true)
} else {
indexRequest.opType(DocWriteRequest.OpType.CREATE)
putRequestBuilder.overwriteIfExists(false)
}
val response: IndexResponse = client.suspendUntil { index(indexRequest, it) }
when (response.result) {
DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> {
val failureReason =
"The upsert metadata call failed with a ${response.result?.lowercase} result"
log.error(failureReason)
throw AlertingException(
failureReason,
RestStatus.INTERNAL_SERVER_ERROR,
IllegalStateException(failureReason)
)
}

DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> {
log.debug("Successfully upserted MonitorMetadata:${metadata.id} ")
}
val putResponse = sdkClient.putDataObjectAsync(putRequestBuilder.build()).await()
if (putResponse.isFailed) {
val failureReason = "The upsert metadata call failed: ${putResponse.cause()?.message}"
log.error(failureReason)
throw AlertingException(
failureReason,
putResponse.status() ?: RestStatus.INTERNAL_SERVER_ERROR,
putResponse.cause() ?: IllegalStateException(failureReason)
)
}
log.debug("Successfully upserted MonitorMetadata:${metadata.id} ")
return metadata.copy(
seqNo = response.seqNo,
primaryTerm = response.primaryTerm
seqNo = putResponse.indexResponse()?.seqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO,
primaryTerm = putResponse.indexResponse()?.primaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM
)
} else {
val failureReason = "Job index ${ScheduledJob.SCHEDULED_JOBS_INDEX} does not exist to update monitor metadata"
Expand Down Expand Up @@ -180,10 +172,15 @@ object MonitorMetadataService :
suspend fun getMetadata(monitor: Monitor, workflowMetadataId: String? = null): MonitorMetadata? {
try {
val metadataId = MonitorMetadata.getId(monitor, workflowMetadataId)
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, metadataId).routing(monitor.id)
val getRequest = GetDataObjectRequest.builder()
.index(ScheduledJob.SCHEDULED_JOBS_INDEX)
.id(metadataId)
.tenantId(getTenantId())
.build()

val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) }
return if (getResponse.isExists) {
val response = sdkClient.getDataObjectAsync(getRequest).await()
val getResponse = response.getResponse()
return if (getResponse != null && getResponse.isExists) {
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
getResponse.sourceAsBytesRef, XContentType.JSON
Expand All @@ -194,14 +191,23 @@ object MonitorMetadataService :
null
}
} catch (e: Exception) {
if (e.message?.contains("no such index") == true) {
if (isIndexNotFoundException(e)) {
return null
} else {
throw AlertingException.wrap(e)
}
}
}

private fun isIndexNotFoundException(e: Throwable): Boolean {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isthis a common method used every where. can we move it to commons and add an extra log line

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

var cause: Throwable? = e
while (cause != null) {
if (cause.message?.contains("no such index") == true) return true
cause = cause.cause
}
return false
}

suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata {
try {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR.value)
Expand Down
Loading