Skip to content

Commit e09421b

Browse files
committed
Added shard operations collector and optimised node stats collector
Signed-off-by: nishchay21 <nishcha@amazon.com>
1 parent 1495803 commit e09421b

File tree

2 files changed

+35
-33
lines changed

2 files changed

+35
-33
lines changed

src/main/java/org/opensearch/performanceanalyzer/collectors/NodeStatsAllShardsMetricsCollector.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class NodeStatsAllShardsMetricsCollector extends PerformanceAnalyzerMetri
5858
private static final int KEYS_PATH_LENGTH = 2;
5959
private static final Logger LOG =
6060
LogManager.getLogger(NodeStatsAllShardsMetricsCollector.class);
61-
private HashMap<ShardId, ShardStats> prevPerShardStats;
61+
private Map<ShardId, ShardStats> prevPerShardStats;
6262
private final PerformanceAnalyzerController controller;
6363

6464
public NodeStatsAllShardsMetricsCollector(final PerformanceAnalyzerController controller) {
@@ -143,10 +143,10 @@ public void collectMetrics(long startTime) {
143143

144144
Map<ShardId, ShardStats> currentPerShardStats = populatePerShardStats(indicesService);
145145

146-
for (HashMap.Entry currentShard : currentPerShardStats.entrySet()) {
147-
ShardId shardId = (ShardId) currentShard.getKey();
148-
ShardStats currentShardStats = (ShardStats) currentShard.getValue();
149-
if (prevPerShardStats.size() == 0) {
146+
for (HashMap.Entry<ShardId, ShardStats> currentShard : currentPerShardStats.entrySet()) {
147+
ShardId shardId = currentShard.getKey();
148+
ShardStats currentShardStats = currentShard.getValue();
149+
if (prevPerShardStats.isEmpty() || !prevPerShardStats.containsKey(shardId)) {
150150
// Populating value for the first run.
151151
populateMetricValue(
152152
currentShardStats, startTime, shardId.getIndexName(), shardId.id());
@@ -167,6 +167,7 @@ public void collectMetrics(long startTime) {
167167
populateDiffMetricValue(
168168
prevValue, currValue, startTime, shardId.getIndexName(), shardId.id());
169169
}
170+
prevPerShardStats = currentPerShardStats;
170171
}
171172

172173
// - Separated to have a unit test; and catch any code changes around this field
@@ -180,8 +181,8 @@ public Map<ShardId, ShardStats> populatePerShardStats(IndicesService indicesServ
180181
// Populate the shard stats per shard.
181182
HashMap<ShardId, IndexShard> currentShards = Utils.getShards();
182183
Map<ShardId, ShardStats> currentPerShardStats = new HashMap<>(Collections.emptyMap());
183-
for (HashMap.Entry currentShard : currentShards.entrySet()) {
184-
IndexShard currentIndexShard = (IndexShard) currentShard.getValue();
184+
for (HashMap.Entry<ShardId, IndexShard> currentShard : currentShards.entrySet()) {
185+
IndexShard currentIndexShard = currentShard.getValue();
185186
IndexShardStats currentIndexShardStats =
186187
Utils.indexShardStats(
187188
indicesService,
@@ -190,21 +191,24 @@ public Map<ShardId, ShardStats> populatePerShardStats(IndicesService indicesServ
190191
CommonStatsFlags.Flag.QueryCache,
191192
CommonStatsFlags.Flag.FieldData,
192193
CommonStatsFlags.Flag.RequestCache));
193-
for (ShardStats shardStats : currentIndexShardStats.getShards()) {
194-
currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats);
194+
if (currentIndexShardStats != null) {
195+
for (ShardStats shardStats : currentIndexShardStats.getShards()) {
196+
currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats);
197+
}
195198
}
196199
}
197200
return currentPerShardStats;
198201
}
199202

200203
public void populateMetricValue(
201204
ShardStats shardStats, long startTime, String IndexName, int ShardId) {
202-
StringBuilder value = new StringBuilder();
203-
value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds());
204-
// Populate the result with cache specific metrics only.
205-
value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
206-
.append(new NodeStatsMetricsAllShardsPerCollectionStatus(shardStats).serialize());
207-
saveMetricValues(value.toString(), startTime, IndexName, String.valueOf(ShardId));
205+
String value =
206+
PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds()
207+
+
208+
// Populate the result with cache specific metrics only.
209+
PerformanceAnalyzerMetrics.sMetricNewLineDelimitor
210+
+ new NodeStatsMetricsAllShardsPerCollectionStatus(shardStats).serialize();
211+
saveMetricValues(value, startTime, IndexName, String.valueOf(ShardId));
208212
}
209213

210214
public void populateDiffMetricValue(

src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public RTFNodeStatsAllShardsMetricsCollector(
105105
public void collectMetrics(long startTime) {
106106
if (performanceAnalyzerController.isCollectorDisabled(
107107
configOverridesWrapper, getCollectorName())) {
108-
LOG.info("RTFDisksCollector is disabled. Skipping collection.");
108+
LOG.info("RTFNodeStatsMetricsCollector is disabled. Skipping collection.");
109109
return;
110110
}
111111
IndicesService indicesService = OpenSearchResources.INSTANCE.getIndicesService();
@@ -126,29 +126,27 @@ configOverridesWrapper, getCollectorName())) {
126126
for (Map.Entry<ShardId, ShardStats> currentShard : currentPerShardStats.entrySet()) {
127127
ShardId shardId = currentShard.getKey();
128128
ShardStats currentShardStats = currentShard.getValue();
129-
if (prevPerShardStats.isEmpty()) {
130-
// Populating value for the first run.
129+
if (prevPerShardStats.isEmpty() || !prevPerShardStats.containsKey(shardId)) {
130+
// Populating value for the first run of shard.
131131
recordMetrics(
132132
new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats),
133133
shardId);
134134
continue;
135135
}
136-
if (prevPerShardStats.containsKey(shardId)) {
137-
ShardStats prevShardStats = prevPerShardStats.get(shardId);
138-
if (prevShardStats == null) {
139-
// Populate value for shards which are new and were not present in the previous
140-
// run.
141-
recordMetrics(
142-
new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats),
143-
shardId);
144-
continue;
145-
}
146-
NodeStatsMetricsAllShardsPerCollectionStatus prevValue =
147-
new NodeStatsMetricsAllShardsPerCollectionStatus(prevShardStats);
148-
NodeStatsMetricsAllShardsPerCollectionStatus currValue =
149-
new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats);
150-
populateDiffMetricValue(prevValue, currValue, shardId);
136+
ShardStats prevShardStats = prevPerShardStats.get(shardId);
137+
if (prevShardStats == null) {
138+
// Populate value for shards which are new and were not present in the previous
139+
// run.
140+
recordMetrics(
141+
new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats),
142+
shardId);
143+
continue;
151144
}
145+
NodeStatsMetricsAllShardsPerCollectionStatus prevValue =
146+
new NodeStatsMetricsAllShardsPerCollectionStatus(prevShardStats);
147+
NodeStatsMetricsAllShardsPerCollectionStatus currValue =
148+
new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats);
149+
populateDiffMetricValue(prevValue, currValue, shardId);
152150
}
153151
prevPerShardStats = currentPerShardStats;
154152
}

0 commit comments

Comments
 (0)