Skip to content

Commit 474acbb

Browse files
committed
update monitor now ignores seqNo and primaryTerm, and eliminating monitor stats v2 api in favor of version parameter in existing monitor stats api
Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
1 parent b7586a5 commit 474acbb

File tree

10 files changed

+59
-118
lines changed

10 files changed

+59
-118
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
3333
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
3434
import org.opensearch.alerting.core.lock.LockService
3535
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
36-
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsV2Handler
3736
import org.opensearch.alerting.core.schedule.JobScheduler
3837
import org.opensearch.alerting.core.settings.AlertingV2Settings
3938
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
@@ -253,8 +252,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
253252
RestDeleteMonitorV2Action(),
254253
RestGetMonitorV2Action(),
255254
RestSearchMonitorV2Action(settings, clusterService),
256-
RestGetAlertsV2Action(),
257-
RestScheduledJobStatsV2Handler()
255+
RestGetAlertsV2Action()
258256
)
259257
}
260258

alerting/src/main/kotlin/org/opensearch/alerting/transportv2/TransportIndexMonitorV2Action.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -745,8 +745,6 @@ class TransportIndexMonitorV2Action @Inject constructor(
745745
.source(newMonitorV2.toXContentWithUser(jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true"))))
746746
.id(indexMonitorRequest.monitorId)
747747
.routing(indexMonitorRequest.monitorId)
748-
.setIfSeqNo(indexMonitorRequest.seqNo)
749-
.setIfPrimaryTerm(indexMonitorRequest.primaryTerm)
750748
.timeout(indexTimeout)
751749

752750
log.info(

alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1618,14 +1618,9 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
16181618
return map[key]
16191619
}
16201620

1621-
fun getAlertingStats(metrics: String = ""): Map<String, Any> {
1622-
val monitorStatsResponse = client().makeRequest("GET", "/_plugins/_alerting/stats$metrics")
1623-
val responseMap = createParser(XContentType.JSON.xContent(), monitorStatsResponse.entity.content).map()
1624-
return responseMap
1625-
}
1626-
1627-
fun getAlertingV2Stats(metrics: String = ""): Map<String, Any> {
1628-
val monitorStatsResponse = client().makeRequest("GET", "/_plugins/_alerting/v2/stats$metrics")
1621+
fun getAlertingStats(metrics: String = "", alertingVersion: String? = null): Map<String, Any> {
1622+
val endpoint = "/_plugins/_alerting/stats$metrics${alertingVersion?.let { "?version=$it" }.orEmpty()}"
1623+
val monitorStatsResponse = client().makeRequest("GET", endpoint)
16291624
val responseMap = createParser(XContentType.JSON.xContent(), monitorStatsResponse.entity.content).map()
16301625
return responseMap
16311626
}

alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorV2RestApiIT.kt

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,30 @@ class MonitorV2RestApiIT : AlertingRestTestCase() {
198198
assertEquals(monitorV2, scheduledJob)
199199
}
200200

201+
fun `test monitor stats v1 and v2 only return stats for their respective monitors`() {
202+
enableScheduledJob()
203+
204+
val monitorV1Id = createMonitor(randomQueryLevelMonitor(enabled = true)).id
205+
val monitorV2Id = createRandomPPLMonitor(randomPPLMonitor(enabled = true)).id
206+
207+
val statsAllResponse = getAlertingStats(alertingVersion = null)
208+
val statsV1Response = getAlertingStats(alertingVersion = "v1")
209+
val statsV2Response = getAlertingStats(alertingVersion = "v2")
210+
211+
logger.info("all stats: $statsAllResponse")
212+
logger.info("v1 stats: $statsV1Response")
213+
logger.info("v2 stats: $statsV2Response")
214+
215+
assertTrue("All stats does not contain V1 Monitor", isMonitorScheduled(monitorV1Id, statsAllResponse))
216+
assertTrue("All stats does not contain V2 Monitor", isMonitorScheduled(monitorV2Id, statsAllResponse))
217+
218+
assertTrue("V1 stats does not contain V1 Monitor", isMonitorScheduled(monitorV1Id, statsV1Response))
219+
assertFalse("V1 stats contains V2 Monitor", isMonitorScheduled(monitorV2Id, statsV1Response))
220+
221+
assertTrue("V2 stats does not contain V2 Monitor", isMonitorScheduled(monitorV2Id, statsV2Response))
222+
assertFalse("V2 stats contains V1 Monitor", isMonitorScheduled(monitorV1Id, statsV2Response))
223+
}
224+
201225
/* Validation Tests */
202226
fun `test create ppl monitor that queries nonexistent index fails`() {
203227
val pplMonitorConfig = randomPPLMonitor(
@@ -497,22 +521,4 @@ class MonitorV2RestApiIT : AlertingRestTestCase() {
497521
assertEquals("Unexpected status", RestStatus.NOT_FOUND, e.response.restStatus())
498522
}
499523
}
500-
501-
fun `test monitor stats v1 and v2 only return stats for their respective monitors`() {
502-
enableScheduledJob()
503-
504-
val monitorV1Id = createMonitor(randomQueryLevelMonitor(enabled = true)).id
505-
val monitorV2Id = createRandomPPLMonitor(randomPPLMonitor(enabled = true)).id
506-
507-
val statsV1Response = getAlertingStats()
508-
val statsV2Response = getAlertingV2Stats()
509-
510-
logger.info("v1 stats: $statsV1Response")
511-
logger.info("v2 stats: $statsV2Response")
512-
513-
assertTrue("V1 stats does not contain V1 Monitor", isMonitorScheduled(monitorV1Id, statsV1Response))
514-
assertTrue("V2 stats does not contain V2 Monitor", isMonitorScheduled(monitorV2Id, statsV2Response))
515-
assertFalse("V2 stats contains V1 Monitor", isMonitorScheduled(monitorV1Id, statsV2Response))
516-
assertFalse("V1 stats contains V2 Monitor", isMonitorScheduled(monitorV2Id, statsV1Response))
517-
}
518524
}

core/src/main/kotlin/org/opensearch/alerting/core/action/node/ScheduledJobsStatsRequest.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,16 @@ import java.io.IOException
1717
class ScheduledJobsStatsRequest : BaseNodesRequest<ScheduledJobsStatsRequest> {
1818
var jobSchedulingMetrics: Boolean = true
1919
var jobsInfo: Boolean = true
20-
var showAlertingV2ScheduledJobs: Boolean = false // show Alerting V2 scheduled jobs if true, Alerting V1 scheduled jobs if false
20+
// show Alerting V2 scheduled jobs if true, Alerting V1 scheduled jobs if false, all scheduled jobs if null
21+
var showAlertingV2ScheduledJobs: Boolean? = null
2122

2223
constructor(si: StreamInput) : super(si) {
2324
jobSchedulingMetrics = si.readBoolean()
2425
jobsInfo = si.readBoolean()
25-
showAlertingV2ScheduledJobs = si.readBoolean()
26+
showAlertingV2ScheduledJobs = si.readOptionalBoolean()
2627
}
2728

28-
constructor(nodeIds: Array<String>, showAlertingV2ScheduledJobs: Boolean) : super(*nodeIds) {
29+
constructor(nodeIds: Array<String>, showAlertingV2ScheduledJobs: Boolean?) : super(*nodeIds) {
2930
this.showAlertingV2ScheduledJobs = showAlertingV2ScheduledJobs
3031
}
3132

@@ -34,7 +35,7 @@ class ScheduledJobsStatsRequest : BaseNodesRequest<ScheduledJobsStatsRequest> {
3435
super.writeTo(out)
3536
out.writeBoolean(jobSchedulingMetrics)
3637
out.writeBoolean(jobsInfo)
37-
out.writeBoolean(showAlertingV2ScheduledJobs)
38+
out.writeOptionalBoolean(showAlertingV2ScheduledJobs)
3839
}
3940

4041
fun all(): ScheduledJobsStatsRequest {

core/src/main/kotlin/org/opensearch/alerting/core/action/node/ScheduledJobsStatsTransportAction.kt

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import org.opensearch.alerting.core.JobSweeperMetrics
1414
import org.opensearch.alerting.core.ScheduledJobIndices
1515
import org.opensearch.alerting.core.schedule.JobScheduler
1616
import org.opensearch.alerting.core.schedule.JobSchedulerMetrics
17-
import org.opensearch.alerting.core.settings.AlertingV2Settings.Companion.ALERTING_V2_ENABLED
1817
import org.opensearch.cluster.health.ClusterIndexHealth
1918
import org.opensearch.cluster.service.ClusterService
2019
import org.opensearch.common.inject.Inject
@@ -95,19 +94,7 @@ class ScheduledJobsStatsTransportAction : TransportNodesAction<ScheduledJobsStat
9594
): ScheduledJobStats {
9695
val jobSweeperMetrics = jobSweeper.getJobSweeperMetrics()
9796

98-
val alertingV2Enabled = clusterService.clusterSettings.get(ALERTING_V2_ENABLED)
99-
if (scheduledJobsStatusRequest.showAlertingV2ScheduledJobs && !alertingV2Enabled) {
100-
throw IllegalArgumentException(
101-
"Alerting V2 is currently disabled, please enable it with the " +
102-
"cluster setting: ${ALERTING_V2_ENABLED.key}."
103-
)
104-
}
105-
106-
val jobSchedulerMetrics = if (scheduledJobsStatusRequest.showAlertingV2ScheduledJobs) { // show V2 scheduled jobs
107-
jobScheduler.getJobSchedulerV2Metric()
108-
} else { // show V1 scheduled jobs
109-
jobScheduler.getJobSchedulerMetric()
110-
}
97+
val jobSchedulerMetrics = jobScheduler.getJobSchedulerMetric(scheduledJobsStatusRequest.showAlertingV2ScheduledJobs)
11198

11299
val status: ScheduledJobStats.ScheduleStatus = evaluateStatus(jobSchedulerMetrics, jobSweeperMetrics)
113100
return ScheduledJobStats(

core/src/main/kotlin/org/opensearch/alerting/core/resthandler/RestScheduledJobStatsHandler.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,14 @@ class RestScheduledJobStatsHandler(private val path: String) : BaseRestHandler()
5858
}
5959

6060
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
61-
val scheduledJobNodesStatsRequest = StatsRequestUtils.getStatsRequest(request, false, this::unrecognized)
61+
val alertingVersion = request.param("version")
62+
if (alertingVersion != null && alertingVersion !in listOf("v1", "v2")) {
63+
throw IllegalArgumentException("Version parameter must be one of v1 or v2")
64+
}
65+
66+
val showV2ScheduledJobs: Boolean? = alertingVersion?.let { it == "v2" }
67+
68+
val scheduledJobNodesStatsRequest = StatsRequestUtils.getStatsRequest(request, showV2ScheduledJobs, this::unrecognized)
6269
return RestChannelConsumer { channel ->
6370
client.execute(
6471
ScheduledJobsStatsAction.INSTANCE,

core/src/main/kotlin/org/opensearch/alerting/core/resthandler/RestScheduledJobStatsV2Handler.kt

Lines changed: 0 additions & 51 deletions
This file was deleted.

core/src/main/kotlin/org/opensearch/alerting/core/resthandler/StatsRequestUtils.kt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@ internal object StatsRequestUtils {
1717

1818
fun getStatsRequest(
1919
request: RestRequest,
20-
showAlertingV2ScheduledJobs: Boolean,
20+
showAlertingV2ScheduledJobs: Boolean?,
2121
unrecognizedFn: (RestRequest, Set<String>, Set<String>, String) -> String
2222
): ScheduledJobsStatsRequest {
2323
val nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId"))
2424
val metrics = Strings.tokenizeByCommaToSet(request.param("metric"))
25-
val scheduledJobsStatsRequest = ScheduledJobsStatsRequest(nodeIds = nodesIds, showAlertingV2ScheduledJobs)
25+
val scheduledJobsStatsRequest = ScheduledJobsStatsRequest(
26+
nodeIds = nodesIds,
27+
showAlertingV2ScheduledJobs = showAlertingV2ScheduledJobs
28+
)
2629
scheduledJobsStatsRequest.timeout(request.param("timeout"))
2730

2831
if (metrics.isEmpty()) {

core/src/main/kotlin/org/opensearch/alerting/core/schedule/JobScheduler.kt

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -197,22 +197,19 @@ class JobScheduler(private val threadPool: ThreadPool, private val jobRunner: Jo
197197
return true
198198
}
199199

200-
fun getJobSchedulerMetric(): List<JobSchedulerMetrics> {
201-
return scheduledJobIdToInfo.entries.filter { it.value.scheduledJob.type != monitorV2Type }
202-
.stream()
203-
.map { entry ->
204-
JobSchedulerMetrics(
205-
entry.value.scheduledJobId,
206-
entry.value.actualPreviousExecutionTime?.toEpochMilli(),
207-
entry.value.scheduledJob.schedule.runningOnTime(entry.value.actualPreviousExecutionTime)
208-
)
209-
}
210-
.collect(Collectors.toList())
211-
}
200+
fun getJobSchedulerMetric(showAlertingV2ScheduledJobs: Boolean?): List<JobSchedulerMetrics> {
201+
val scheduledJobEntries = scheduledJobIdToInfo.entries
202+
203+
val filteredScheduledJobEntries = if (showAlertingV2ScheduledJobs == null) {
204+
// if no alerting version was specified, do not filter
205+
scheduledJobEntries
206+
} else if (showAlertingV2ScheduledJobs) {
207+
scheduledJobEntries.filter { it.value.scheduledJob.type == monitorV2Type }
208+
} else {
209+
scheduledJobEntries.filter { it.value.scheduledJob.type != monitorV2Type }
210+
}
212211

213-
fun getJobSchedulerV2Metric(): List<JobSchedulerMetrics> {
214-
return scheduledJobIdToInfo.entries.filter { it.value.scheduledJob.type == monitorV2Type }
215-
.stream()
212+
return filteredScheduledJobEntries.stream()
216213
.map { entry ->
217214
JobSchedulerMetrics(
218215
entry.value.scheduledJobId,

0 commit comments

Comments
 (0)