diff --git a/README.md b/README.md
index 37a04c7..a52f2af 100644
--- a/README.md
+++ b/README.md
@@ -1,30 +1,42 @@
-# Elasticsearch statsd plugin
+# Elasticsearch StatsD Plugin
-This plugin creates a little push service, which regularly updates a statsd host with indices stats and nodes stats. In case you are running a cluster, these datas are always only pushed from the master node.
+This plugin creates a little push service, which regularly updates a StatsD host with indices stats and nodes stats.
+Index stats that apply across the entire cluster is only pushed from the elected master which node level stats are pushed from every node.
-The data sent to the statsd server tries to be roughly equivalent to [Indices Stats API](http://www.elasticsearch.org/guide/reference/api/admin-indices-stats.html) and [Nodes Stats Api](http://www.elasticsearch.org/guide/reference/api/admin-cluster-nodes-stats.html)
+The data sent to the StatsD server tries to be roughly equivalent to the [Indices Stats API](http://www.elasticsearch.org/guide/reference/api/admin-indices-stats.html) and [Nodes Stats Api](http://www.elasticsearch.org/guide/reference/api/admin-cluster-nodes-stats.html).
## Installation
-As plugins (except site plugins) cannot be automatically installed from github currently you need to build the plugin yourself (takes half a minute including an integrations test).
+To install a prepackaged plugin use the following command:
```
-git clone http://github.com/swoop-inc/elasticsearch-statsd-plugin.git
+bin/plugin -install statsd -url https://github.com/Automattic/elasticsearch-statsd-plugin/releases/download/v0.3.3/elasticsearch-statsd-0.3.3.zip
+```
+
+You can also build your own by doing the following:
+
+```
+git clone http://github.com/Automattic/elasticsearch-statsd-plugin.git
cd elasticsearch-statsd-plugin
mvn package
-/path/to/elasticsearch/bin/plugin -install statsd -url file:///absolute/path/to/current/dir/target/releases/elasticsearch-statsd-0.2-SNAPSHOT.zip
+bin/plugin -install statsd -url file:///absolute/path/to/current/dir/target/releases/elasticsearch-statsd-0.3.3.zip
```
## Configuration
-Configuration is possible via three parameters:
+Configuration is possible via these parameters:
* `metrics.statsd.host`: The statsd host to connect to (default: none)
* `metrics.statsd.port`: The port to connect to (default: 8125)
* `metrics.statsd.every`: The interval to push data (default: 1m)
* `metrics.statsd.prefix`: The metric prefix that's sent with metric names (default: elasticsearch.your_cluster_name)
+* `metrics.statsd.node_name`: Override the name for node used in the stat keys (default: the ES node name)
+* `metrics.statsd.report.node_indices`: If per node index sums should be reported (default: false)
+* `metrics.statsd.report.indices`: If index level sums should be reported (default: true)
+* `metrics.statsd.report.shards`: If shard level stats should be reported (default: false)
+* `metrics.statsd.report.fs_details`: If nodes should break down the FS by device instead of total disk (default: false)
Check your elasticsearch log file for a line like this after adding the configuration parameters below to the configuration file
@@ -33,19 +45,30 @@ Check your elasticsearch log file for a line like this after adding the configur
```
+## Stats Key Formats
+
+This plugin reports both node level and cluster level stats, the StatsD keys will be in the formats:
+
+* `{PREFIX}.node.{NODE_NAME}.{STAT_KEY}`: Node level stats (CPU / JVM / etc.)
+* `{PREFIX}.node.{NODE_NAME}.indices.{STAT_KEY}`: Index stats summed across the node (off by default)
+* `{PREFIX}.indices.{STAT_KEY}`: Index stats summed across the entire cluster
+* `{PREFIX}.index.{INDEX_NAME}.total.{STAT_KEY}`: Index stats summed per index across all shards
+* `{PREFIX}.index.{INDEX_NAME}.{SHARD_ID}.{STAT_KEY}` -- Index stats per shard (off by default)
+
+
## Bugs/TODO
-* No really nice cluster support yet (needed it for a single instance system)
* Not extensively tested
* In case of a master node failover, counts are starting from 0 again (in case you are wondering about spikes)
## Credits
-Heavily inspired by the excellent [metrics library](http://metrics.codehale.com) by Code Hale and its [GraphiteReporter add-on](http://metrics.codahale.com/manual/graphite/).
+This is a fork of the [Swoop plugin](https://github.com/swoop-inc/elasticsearch-statsd-plugin) for multi-node clusters on ES 1.x.
+
+Heavily inspired by the excellent [metrics library](http://metrics.codahale.com) by Coda Hale and its [GraphiteReporter add-on](http://metrics.codahale.com/manual/graphite/).
## License
See LICENSE
-
diff --git a/pom.xml b/pom.xml
index 951fceb..34ce60c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,21 +6,22 @@
4.0.0
de.spinscale.elasticsearch
elasticsearch-statsd
- 0.2-SNAPSHOT
+ 0.3.3
jar
- Statsd monitoring plugin for Elasticsearch
- https://github.com/swoop-inc/elasticsearch-statsd-plugin/
+ StatsD monitoring plugin for Elasticsearch
+ https://github.com/Automattic/elasticsearch-statsd-plugin/
- 0.90.3
+ 1.0.1
1.3
+ UTF-8
com.timgroup
java-statsd-client
- 2.0.0
+ 3.0.2
diff --git a/src/main/java/org/elasticsearch/plugin/statsd/StatsdPlugin.java b/src/main/java/org/elasticsearch/plugin/statsd/StatsdPlugin.java
index d8ef2eb..029e90d 100644
--- a/src/main/java/org/elasticsearch/plugin/statsd/StatsdPlugin.java
+++ b/src/main/java/org/elasticsearch/plugin/statsd/StatsdPlugin.java
@@ -17,7 +17,7 @@ public String name()
public String description()
{
- return "Statsd Monitoring Plugin";
+ return "StatsD Monitoring Plugin";
}
@SuppressWarnings("rawtypes")
diff --git a/src/main/java/org/elasticsearch/service/statsd/StatsdReporter.java b/src/main/java/org/elasticsearch/service/statsd/StatsdReporter.java
index 58a1919..5afa50e 100644
--- a/src/main/java/org/elasticsearch/service/statsd/StatsdReporter.java
+++ b/src/main/java/org/elasticsearch/service/statsd/StatsdReporter.java
@@ -1,429 +1,62 @@
package org.elasticsearch.service.statsd;
-import com.timgroup.statsd.StatsDClient;
-
-import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
-import org.elasticsearch.http.HttpStats;
-import org.elasticsearch.index.cache.filter.FilterCacheStats;
-import org.elasticsearch.index.cache.id.IdCacheStats;
-import org.elasticsearch.index.flush.FlushStats;
-import org.elasticsearch.index.get.GetStats;
-import org.elasticsearch.index.indexing.IndexingStats;
-import org.elasticsearch.index.merge.MergeStats;
-import org.elasticsearch.index.refresh.RefreshStats;
-import org.elasticsearch.index.search.stats.SearchStats;
-import org.elasticsearch.index.shard.DocsStats;
-import org.elasticsearch.index.shard.service.IndexShard;
-import org.elasticsearch.index.store.StoreStats;
-import org.elasticsearch.index.warmer.WarmerStats;
-import org.elasticsearch.indices.NodeIndicesStats;
-import org.elasticsearch.monitor.fs.FsStats;
-import org.elasticsearch.monitor.jvm.JvmStats;
-import org.elasticsearch.monitor.network.NetworkStats;
-import org.elasticsearch.monitor.os.OsStats;
-import org.elasticsearch.monitor.process.ProcessStats;
-import org.elasticsearch.threadpool.ThreadPoolStats;
-import org.elasticsearch.transport.TransportStats;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-public class StatsdReporter {
+import com.timgroup.statsd.StatsDClient;
- private static final String DEFAULT_JOINER = ".";
- private static final ESLogger logger = ESLoggerFactory.getLogger(StatsdReporter.class.getName());
- private List indexShards;
- private NodeStats nodeStats;
- private final NodeIndicesStats nodeIndicesStats;
- private final StatsDClient statsdClient;
- private String joiner = DEFAULT_JOINER;
+public abstract class StatsdReporter {
- public StatsdReporter(NodeIndicesStats nodeIndicesStats, List indexShards, NodeStats nodeStats,
- StatsDClient statsdClient)
- {
+ private static final String DEFAULT_JOINER = ".";
+ private static final ESLogger logger = ESLoggerFactory.getLogger(StatsdReporter.class.getName());
+ private StatsDClient statsdClient;
- this.indexShards = indexShards;
- this.nodeStats = nodeStats;
- this.nodeIndicesStats = nodeIndicesStats;
+ public StatsdReporter setStatsDClient(StatsDClient statsdClient) {
this.statsdClient = statsdClient;
+ return this;
}
- public void run()
- {
- try {
- sendNodeIndicesStats();
- sendIndexShardStats();
- sendNodeStats();
- }
- catch (Exception e) {
- logException(e);
- }
- }
+ public abstract void run();
- private void sendNodeStats()
- {
- sendNodeFsStats(nodeStats.getFs());
- sendNodeHttpStats(nodeStats.getHttp());
- sendNodeJvmStats(nodeStats.getJvm());
- sendNodeNetworkStats(nodeStats.getNetwork());
- sendNodeOsStats(nodeStats.getOs());
- sendNodeProcessStats(nodeStats.getProcess());
- sendNodeTransportStats(nodeStats.getTransport());
- sendNodeThreadPoolStats(nodeStats.getThreadPool());
+ protected void sendGauge(String name, String valueName, long value) {
+ this.statsdClient.gauge(this.join(name, valueName), value);
}
- private void sendNodeThreadPoolStats(ThreadPoolStats threadPoolStats)
- {
- String type = buildMetricName("node.threadpool");
- Iterator statsIterator = threadPoolStats.iterator();
- while (statsIterator.hasNext()) {
- ThreadPoolStats.Stats stats = statsIterator.next();
- String id = type + "." + stats.getName();
-
- sendGauge(id, "threads", stats.getThreads());
- sendGauge(id, "queue", stats.getQueue());
- sendGauge(id, "active", stats.getActive());
- sendGauge(id, "rejected", stats.getRejected());
- sendGauge(id, "largest", stats.getLargest());
- sendGauge(id, "completed", stats.getCompleted());
- }
+ protected void sendGauge(String name, String valueName, double value) {
+ this.statsdClient.gauge(this.join(name, valueName), value);
}
- private void sendNodeTransportStats(TransportStats transportStats)
- {
- String type = buildMetricName("node.transport");
- sendGauge(type, "serverOpen", transportStats.serverOpen());
- sendCount(type, "rxCount", transportStats.rxCount());
- sendCount(type, "rxSizeBytes", transportStats.rxSize().bytes());
- sendCount(type, "txCount", transportStats.txCount());
- sendCount(type, "txSizeBytes", transportStats.txSize().bytes());
+ protected void sendCount(String name, String valueName, long value) {
+ this.statsdClient.count(this.join(name, valueName), value);
}
- private void sendNodeProcessStats(ProcessStats processStats)
- {
- String type = buildMetricName("node.process");
-
- sendGauge(type, "openFileDescriptors", processStats.openFileDescriptors());
- if (processStats.cpu() != null) {
- sendGauge(type + ".cpu", "percent", processStats.cpu().percent());
- sendGauge(type + ".cpu", "sysSeconds", processStats.cpu().sys().seconds());
- sendGauge(type + ".cpu", "totalSeconds", processStats.cpu().total().seconds());
- sendGauge(type + ".cpu", "userSeconds", processStats.cpu().user().seconds());
- }
-
- if (processStats.mem() != null) {
- sendGauge(type + ".mem", "totalVirtual", processStats.mem().totalVirtual().bytes());
- sendGauge(type + ".mem", "resident", processStats.mem().resident().bytes());
- sendGauge(type + ".mem", "share", processStats.mem().share().bytes());
- }
+ protected void sendTime(String name, String valueName, long value) {
+ this.statsdClient.time(this.join(name, valueName), value);
}
- private void sendNodeOsStats(OsStats osStats)
- {
- String type = buildMetricName("node.os");
-
- if (osStats.cpu() != null) {
- sendGauge(type + ".cpu", "sys", osStats.cpu().sys());
- sendGauge(type + ".cpu", "idle", osStats.cpu().idle());
- sendGauge(type + ".cpu", "user", osStats.cpu().user());
- }
-
- if (osStats.mem() != null) {
- sendGauge(type + ".mem", "freeBytes", osStats.mem().free().bytes());
- sendGauge(type + ".mem", "usedBytes", osStats.mem().used().bytes());
- sendGauge(type + ".mem", "freePercent", osStats.mem().freePercent());
- sendGauge(type + ".mem", "usedPercent", osStats.mem().usedPercent());
- sendGauge(type + ".mem", "actualFreeBytes", osStats.mem().actualFree().bytes());
- sendGauge(type + ".mem", "actualUsedBytes", osStats.mem().actualUsed().bytes());
- }
-
- if (osStats.swap() != null) {
- sendGauge(type + ".swap", "freeBytes", osStats.swap().free().bytes());
- sendGauge(type + ".swap", "usedBytes", osStats.swap().used().bytes());
- }
- }
-
- private void sendNodeNetworkStats(NetworkStats networkStats)
- {
- String type = buildMetricName("node.network.tcp");
- NetworkStats.Tcp tcp = networkStats.tcp();
-
- // might be null, if sigar isnt loaded
- if (tcp != null) {
- sendGauge(type, "activeOpens", tcp.activeOpens());
- sendGauge(type, "passiveOpens", tcp.passiveOpens());
- sendGauge(type, "attemptFails", tcp.attemptFails());
- sendGauge(type, "estabResets", tcp.estabResets());
- sendGauge(type, "currEstab", tcp.currEstab());
- sendGauge(type, "inSegs", tcp.inSegs());
- sendGauge(type, "outSegs", tcp.outSegs());
- sendGauge(type, "retransSegs", tcp.retransSegs());
- sendGauge(type, "inErrs", tcp.inErrs());
- sendGauge(type, "outRsts", tcp.outRsts());
- }
- }
-
- private void sendNodeJvmStats(JvmStats jvmStats)
- {
- String type = buildMetricName("node.jvm");
- sendGauge(type, "uptime", jvmStats.uptime().seconds());
-
- // mem
- sendGauge(type + ".mem", "heapCommitted", jvmStats.mem().heapCommitted().bytes());
- sendGauge(type + ".mem", "heapUsed", jvmStats.mem().heapUsed().bytes());
- sendGauge(type + ".mem", "nonHeapCommitted", jvmStats.mem().nonHeapCommitted().bytes());
- sendGauge(type + ".mem", "nonHeapUsed", jvmStats.mem().nonHeapUsed().bytes());
-
- Iterator memoryPoolIterator = jvmStats.mem().iterator();
- while (memoryPoolIterator.hasNext()) {
- JvmStats.MemoryPool memoryPool = memoryPoolIterator.next();
- String memoryPoolType = type + ".mem.pool." + memoryPool.name();
-
- sendGauge(memoryPoolType, "max", memoryPool.max().bytes());
- sendGauge(memoryPoolType, "used", memoryPool.used().bytes());
- sendGauge(memoryPoolType, "peakUsed", memoryPool.peakUsed().bytes());
- sendGauge(memoryPoolType, "peakMax", memoryPool.peakMax().bytes());
- }
-
- // threads
- sendGauge(type + ".threads", "count", jvmStats.threads().count());
- sendGauge(type + ".threads", "peakCount", jvmStats.threads().peakCount());
-
- // garbage collectors
- sendCount(type + ".gc", "collectionCount", jvmStats.gc().collectionCount());
- sendTime(type + ".gc", "collectionTimeSeconds", jvmStats.gc().collectionTime().seconds());
- for (JvmStats.GarbageCollector collector : jvmStats.gc().collectors()) {
- String id = type + ".gc." + collector.name();
- sendCount(id, "collectionCount", collector.collectionCount());
- sendTime(id, "collectionTimeSeconds", collector.collectionTime().seconds());
-
- JvmStats.GarbageCollector.LastGc lastGc = collector.lastGc();
- String lastGcType = type + ".lastGc";
- if (lastGc != null) {
- sendTime(lastGcType, "time", lastGc.endTime() - lastGc.startTime());
- sendGauge(lastGcType, "max", lastGc.max().bytes());
- sendGauge(lastGcType, "beforeUsed", lastGc.beforeUsed().bytes());
- sendGauge(lastGcType, "afterUsed", lastGc.afterUsed().bytes());
- sendGauge(lastGcType, "durationSeconds", lastGc.duration().seconds());
- }
- }
- }
-
- private void sendNodeHttpStats(HttpStats httpStats)
- {
- String type = buildMetricName("node.http");
- sendGauge(type, "serverOpen", httpStats.getServerOpen());
- sendGauge(type, "totalOpen", httpStats.getTotalOpen());
- }
-
- private void sendNodeFsStats(FsStats fs)
- {
- Iterator infoIterator = fs.iterator();
- int i = 0;
- while (infoIterator.hasNext()) {
- String type = buildMetricName("node.fs") + i;
- FsStats.Info info = infoIterator.next();
- sendGauge(type, "available", info.getAvailable().bytes());
- sendGauge(type, "total", info.getTotal().bytes());
- sendGauge(type, "free", info.getFree().bytes());
- sendCount(type, "diskReads", info.getDiskReads());
- sendCount(type, "diskReadsInBytes", info.getDiskReadSizeInBytes());
- sendCount(type, "diskWrites", info.getDiskWrites());
- sendCount(type, "diskWritesInBytes", info.getDiskWriteSizeInBytes());
- sendGauge(type, "diskQueue", (long) info.getDiskQueue());
- sendGauge(type, "diskService", (long) info.getDiskServiceTime());
- i++;
- }
- }
-
- private void sendIndexShardStats()
- {
- for (IndexShard indexShard : indexShards) {
- String type = buildMetricName("indexes.") + indexShard.shardId().index().name() + ".id." + indexShard.shardId().id();
- sendIndexShardStats(type, indexShard);
- }
- }
-
- private void sendIndexShardStats(String type, IndexShard indexShard)
- {
- sendSearchStats(type + ".search", indexShard.searchStats());
- sendGetStats(type + ".get", indexShard.getStats());
- sendDocsStats(type + ".docs", indexShard.docStats());
- sendRefreshStats(type + ".refresh", indexShard.refreshStats());
- sendIndexingStats(type + ".indexing", indexShard.indexingStats("_all"));
- sendMergeStats(type + ".merge", indexShard.mergeStats());
- sendWarmerStats(type + ".warmer", indexShard.warmerStats());
- sendStoreStats(type + ".store", indexShard.storeStats());
- }
-
- private void sendStoreStats(String type, StoreStats storeStats)
- {
- sendGauge(type, "sizeInBytes", storeStats.sizeInBytes());
- sendGauge(type, "throttleTimeInNanos", storeStats.throttleTime().getNanos());
- }
-
- private void sendWarmerStats(String type, WarmerStats warmerStats)
- {
- sendGauge(type, "current", warmerStats.current());
- sendGauge(type, "total", warmerStats.total());
- sendTime(type, "totalTimeInMillis", warmerStats.totalTimeInMillis());
- }
-
- private void sendMergeStats(String type, MergeStats mergeStats)
- {
- sendGauge(type, "total", mergeStats.getTotal());
- sendTime(type, "totalTimeInMillis", mergeStats.getTotalTimeInMillis());
- sendGauge(type, "totalNumDocs", mergeStats.getTotalNumDocs());
- sendGauge(type, "current", mergeStats.getCurrent());
- sendGauge(type, "currentNumDocs", mergeStats.getCurrentNumDocs());
- sendGauge(type, "currentSizeInBytes", mergeStats.getCurrentSizeInBytes());
- }
-
- private void sendNodeIndicesStats()
- {
- String type = buildMetricName("node");
- sendFilterCacheStats(type + ".filtercache", nodeIndicesStats.getFilterCache());
- sendIdCacheStats(type + ".idcache", nodeIndicesStats.getIdCache());
- sendDocsStats(type + ".docs", nodeIndicesStats.getDocs());
- sendFlushStats(type + ".flush", nodeIndicesStats.getFlush());
- sendGetStats(type + ".get", nodeIndicesStats.getGet());
- sendIndexingStats(type + ".indexing", nodeIndicesStats.getIndexing());
- sendRefreshStats(type + ".refresh", nodeIndicesStats.getRefresh());
- sendSearchStats(type + ".search", nodeIndicesStats.getSearch());
- }
-
- private void sendSearchStats(String type, SearchStats searchStats)
- {
- SearchStats.Stats totalSearchStats = searchStats.getTotal();
- sendSearchStatsStats(type + "._all", totalSearchStats);
-
- if (searchStats.getGroupStats() != null) {
- for (Map.Entry statsEntry : searchStats.getGroupStats().entrySet()) {
- sendSearchStatsStats(type + "." + statsEntry.getKey(), statsEntry.getValue());
- }
- }
- }
-
- private void sendSearchStatsStats(String group, SearchStats.Stats searchStats)
- {
- String type = buildMetricName("search.stats.") + group;
- sendCount(type, "queryCount", searchStats.getQueryCount());
- sendCount(type, "queryTimeInMillis", searchStats.getQueryTimeInMillis());
- sendGauge(type, "queryCurrent", searchStats.getQueryCurrent());
- sendCount(type, "fetchCount", searchStats.getFetchCount());
- sendCount(type, "fetchTimeInMillis", searchStats.getFetchTimeInMillis());
- sendGauge(type, "fetchCurrent", searchStats.getFetchCurrent());
- }
-
- private void sendRefreshStats(String type, RefreshStats refreshStats)
- {
- sendCount(type, "total", refreshStats.getTotal());
- sendCount(type, "totalTimeInMillis", refreshStats.getTotalTimeInMillis());
- }
-
- private void sendIndexingStats(String type, IndexingStats indexingStats)
- {
- IndexingStats.Stats totalStats = indexingStats.getTotal();
- sendStats(type + "._all", totalStats);
-
- Map typeStats = indexingStats.getTypeStats();
- if (typeStats != null) {
- for (Map.Entry statsEntry : typeStats.entrySet()) {
- sendStats(type + "." + statsEntry.getKey(), statsEntry.getValue());
- }
- }
- }
-
- private void sendStats(String type, IndexingStats.Stats stats)
- {
- sendCount(type, "indexCount", stats.getIndexCount());
- sendCount(type, "indexTimeInMillis", stats.getIndexTimeInMillis());
- sendGauge(type, "indexCurrent", stats.getIndexCount());
- sendCount(type, "deleteCount", stats.getDeleteCount());
- sendCount(type, "deleteTimeInMillis", stats.getDeleteTimeInMillis());
- sendGauge(type, "deleteCurrent", stats.getDeleteCurrent());
- }
-
- private void sendGetStats(String type, GetStats getStats)
- {
- sendCount(type, "existsCount", getStats.getExistsCount());
- sendCount(type, "existsTimeInMillis", getStats.getExistsTimeInMillis());
- sendCount(type, "missingCount", getStats.getMissingCount());
- sendCount(type, "missingTimeInMillis", getStats.getMissingTimeInMillis());
- sendGauge(type, "current", getStats.current());
- }
-
- private void sendFlushStats(String type, FlushStats flush)
- {
- sendCount(type, "total", flush.getTotal());
- sendCount(type, "totalTimeInMillis", flush.getTotalTimeInMillis());
- }
-
- private void sendDocsStats(String name, DocsStats docsStats)
- {
- sendCount(name, "count", docsStats.getCount());
- sendCount(name, "deleted", docsStats.getDeleted());
- }
-
- private void sendIdCacheStats(String name, IdCacheStats idCache)
- {
- sendGauge(name, "memorySizeInBytes", idCache.getMemorySizeInBytes());
- }
-
- private void sendFilterCacheStats(String name, FilterCacheStats filterCache)
- {
- sendGauge(name, "memorySizeInBytes", filterCache.getMemorySizeInBytes());
- sendGauge(name, "evictions", filterCache.getEvictions());
- }
-
- protected void sendGauge(String name, String valueName, long value)
- {
- statsdClient.gauge(join(name, valueName), (int) value);
- }
-
- protected void sendCount(String name, String valueName, long value)
- {
- statsdClient.count(join(name, valueName), (int) value);
- }
-
- protected void sendTime(String name, String valueName, long value)
- {
- statsdClient.time(join(name, valueName), (int) value);
- }
-
- protected String sanitizeString(String s)
- {
+ protected String sanitizeString(String s) {
return s.replace(' ', '-');
}
- protected String buildMetricName(String name)
- {
- return sanitizeString(name);
+ protected String buildMetricName(String name) {
+ return this.sanitizeString(name);
}
- private String join(String... parts)
- {
- if (parts == null) return null;
+ private String join(String... parts) {
+ if (parts == null) {
+ return null;
+ }
StringBuilder builder = new StringBuilder();
for (int i = 0; i < parts.length; i++) {
builder.append(parts[i]);
if (i < parts.length - 1) {
- builder.append(joiner);
+ builder.append(this.DEFAULT_JOINER);
}
}
return builder.toString();
}
- private void logException(Exception e)
- {
- if (logger.isDebugEnabled()) {
- logger.debug("Error writing to Statsd", e);
- }
- else {
- logger.warn("Error writing to Statsd: {}", e.getMessage());
- }
+ protected void logException(Exception e) {
+ this.logger.warn("Error writing to StatsD", e);
}
}
diff --git a/src/main/java/org/elasticsearch/service/statsd/StatsdReporterIndexStats.java b/src/main/java/org/elasticsearch/service/statsd/StatsdReporterIndexStats.java
new file mode 100644
index 0000000..fdf08b1
--- /dev/null
+++ b/src/main/java/org/elasticsearch/service/statsd/StatsdReporterIndexStats.java
@@ -0,0 +1,153 @@
+package org.elasticsearch.service.statsd;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.elasticsearch.indices.NodeIndicesStats;
+import org.elasticsearch.index.cache.filter.FilterCacheStats;
+import org.elasticsearch.index.cache.id.IdCacheStats;
+import org.elasticsearch.index.flush.FlushStats;
+import org.elasticsearch.index.get.GetStats;
+import org.elasticsearch.index.indexing.IndexingStats;
+import org.elasticsearch.index.refresh.RefreshStats;
+import org.elasticsearch.index.search.stats.SearchStats;
+import org.elasticsearch.index.shard.DocsStats;
+import org.elasticsearch.index.store.StoreStats;
+import org.elasticsearch.index.fielddata.FieldDataStats;
+import org.elasticsearch.index.merge.MergeStats;
+import org.elasticsearch.index.percolator.stats.PercolateStats;
+import org.elasticsearch.search.suggest.completion.CompletionStats;
+import org.elasticsearch.index.engine.SegmentsStats;
+import org.elasticsearch.index.warmer.WarmerStats;
+
+public abstract class StatsdReporterIndexStats extends StatsdReporter {
+
+ protected void sendDocsStats(String name, DocsStats docsStats) {
+ if (null == docsStats) return;
+ this.sendGauge(name, "count", docsStats.getCount());
+ this.sendGauge(name, "deleted", docsStats.getDeleted());
+ }
+
+ protected void sendStoreStats(String name, StoreStats storeStats) {
+ if (null == storeStats) return;
+ this.sendGauge(name, "size_in_bytes", storeStats.sizeInBytes());
+ this.sendGauge(name, "throttle_time_in_millis", storeStats.getThrottleTime().millis());
+ }
+
+ protected void sendIndexingStats(String name, IndexingStats indexingStats) {
+ if (null == indexingStats) return;
+ IndexingStats.Stats totalStats = indexingStats.getTotal();
+ this.sendIndexingStatsStats(name, totalStats);
+
+ // TODO: Maybe print out stats to shards level?
+ }
+
+ protected void sendGetStats(String name, GetStats getStats) {
+ if (null == getStats) return;
+ this.sendGauge(name, "total", getStats.getCount());
+ this.sendGauge(name, "time_in_millis", getStats.getTimeInMillis());
+ this.sendGauge(name, "exists_total", getStats.getExistsCount());
+ this.sendGauge(name, "exists_time_in_millis", getStats.getExistsTimeInMillis());
+ this.sendGauge(name, "missing_total", getStats.getMissingCount());
+ this.sendGauge(name, "missing_time_in_millis", getStats.getMissingTimeInMillis());
+ this.sendGauge(name, "current", getStats.current());
+ }
+
+ protected void sendSearchStats(String name, SearchStats searchStats) {
+ if (null == searchStats) return;
+ SearchStats.Stats totalSearchStats = searchStats.getTotal();
+ this.sendSearchStatsStats(name, totalSearchStats);
+
+ // TODO: Maybe print out stats to shards level?
+ }
+
+ protected void sendMergeStats(String name, MergeStats mergeStats) {
+ if (null == mergeStats) return;
+ this.sendGauge(name, "current", mergeStats.getCurrent());
+ this.sendGauge(name, "current_docs", mergeStats.getCurrentNumDocs());
+ this.sendGauge(name, "current_size_in_bytes", mergeStats.getCurrentSizeInBytes());
+ this.sendGauge(name, "total", mergeStats.getTotal());
+ this.sendGauge(name, "total_time_in_millis", mergeStats.getTotalTimeInMillis());
+ this.sendGauge(name, "total_docs", mergeStats.getTotalNumDocs());
+ this.sendGauge(name, "total_size_in_bytes", mergeStats.getTotalSizeInBytes());
+ }
+
+ protected void sendRefreshStats(String name, RefreshStats refreshStats) {
+ if (null == refreshStats) return;
+ this.sendGauge(name, "total", refreshStats.getTotal());
+ this.sendGauge(name, "total_time_in_millis", refreshStats.getTotalTimeInMillis());
+ }
+
+ protected void sendFlushStats(String name, FlushStats flushStats) {
+ if (null == flushStats) return;
+ this.sendGauge(name, "total", flushStats.getTotal());
+ this.sendGauge(name, "total_time_in_millis", flushStats.getTotalTimeInMillis());
+ }
+
+ protected void sendWarmerStats(String name, WarmerStats warmerStats) {
+ if (null == warmerStats) return;
+ this.sendGauge(name, "current", warmerStats.current());
+ this.sendGauge(name, "total", warmerStats.total());
+ this.sendGauge(name, "total_time_in_millis", warmerStats.totalTimeInMillis());
+ }
+
+ protected void sendFilterCacheStats(String name, FilterCacheStats filterCacheStats) {
+ if (null == filterCacheStats) return;
+ this.sendGauge(name, "memory_size_in_bytes", filterCacheStats.getMemorySizeInBytes());
+ this.sendGauge(name, "evictions", filterCacheStats.getEvictions());
+ }
+
+ protected void sendIdCacheStats(String name, IdCacheStats idCacheStats) {
+ if (null == idCacheStats) return;
+ this.sendGauge(name, "memory_size_in_bytes", idCacheStats.getMemorySizeInBytes());
+ }
+
+ protected void sendFielddataCacheStats(String name, FieldDataStats fielddataStats) {
+ if (null == fielddataStats) return;
+ this.sendGauge(name, "memory_size_in_bytes", fielddataStats.getMemorySizeInBytes());
+ this.sendGauge(name, "evictions", fielddataStats.getEvictions());
+ }
+
+ protected void sendPercolateStats(String name, PercolateStats percolateStats) {
+ if (null == percolateStats) return;
+ this.sendGauge(name, "total", percolateStats.getCount());
+ this.sendGauge(name, "time_in_millis", percolateStats.getTimeInMillis());
+ this.sendGauge(name, "current", percolateStats.getCurrent());
+ this.sendGauge(name, "queries", percolateStats.getNumQueries());
+
+ if (percolateStats.getMemorySizeInBytes() != -1)
+ this.sendGauge(name, "memory_size_in_bytes", percolateStats.getMemorySizeInBytes());
+ }
+
+ protected void sendCompletionStats(String name, CompletionStats completionStats) {
+ if (null == completionStats) return;
+ this.sendGauge(name, "size_in_bytes", completionStats.getSizeInBytes());
+ }
+
+ protected void sendSegmentsStats(String name, SegmentsStats segmentsStats) {
+ if (null == segmentsStats) return;
+ this.sendGauge(name, "count", segmentsStats.getCount());
+ this.sendGauge(name, "memory_in_bytes", segmentsStats.getMemoryInBytes());
+ }
+
+ protected void sendIndexingStatsStats(String name, IndexingStats.Stats indexingStatsStats) {
+ if (null == indexingStatsStats) return;
+ this.sendGauge(name, "index_total", indexingStatsStats.getIndexCount());
+ this.sendGauge(name, "index_time_in_millis", indexingStatsStats.getIndexTimeInMillis());
+ this.sendGauge(name, "index_current", indexingStatsStats.getIndexCount());
+ this.sendGauge(name, "delete_total", indexingStatsStats.getDeleteCount());
+ this.sendGauge(name, "delete_time_in_millis", indexingStatsStats.getDeleteTimeInMillis());
+ this.sendGauge(name, "delete_current", indexingStatsStats.getDeleteCurrent());
+ }
+
+ protected void sendSearchStatsStats(String name, SearchStats.Stats searchStatsStats) {
+ if (null == searchStatsStats) return;
+ this.sendGauge(name, "query_total", searchStatsStats.getQueryCount());
+ this.sendGauge(name, "query_time_in_millis", searchStatsStats.getQueryTimeInMillis());
+ this.sendGauge(name, "query_current", searchStatsStats.getQueryCurrent());
+ this.sendGauge(name, "fetch_total", searchStatsStats.getFetchCount());
+ this.sendGauge(name, "fetch_time_in_millis", searchStatsStats.getFetchTimeInMillis());
+ this.sendGauge(name, "fetch_current", searchStatsStats.getFetchCurrent());
+ }
+}
diff --git a/src/main/java/org/elasticsearch/service/statsd/StatsdReporterIndices.java b/src/main/java/org/elasticsearch/service/statsd/StatsdReporterIndices.java
new file mode 100644
index 0000000..7543334
--- /dev/null
+++ b/src/main/java/org/elasticsearch/service/statsd/StatsdReporterIndices.java
@@ -0,0 +1,72 @@
+package org.elasticsearch.service.statsd;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.elasticsearch.action.admin.indices.stats.*;
+
+public class StatsdReporterIndices extends StatsdReporterIndexStats {
+
+ private final IndicesStatsResponse indicesStatsResponse;
+ private final Boolean reportIndices;
+ private final Boolean reportShards;
+
+ public StatsdReporterIndices(IndicesStatsResponse indicesStatsResponse, Boolean reportIndices, Boolean reportShards) {
+ this.indicesStatsResponse = indicesStatsResponse;
+ this.reportIndices = reportIndices;
+ this.reportShards = reportShards;
+ }
+
+ public void run() {
+ try {
+ // First report totals
+ this.sendCommonStats(
+ this.buildMetricName("indices"),
+ this.indicesStatsResponse.getTotal()
+ );
+
+ if (this.reportIndices) {
+ for (IndexStats indexStats : this.indicesStatsResponse.getIndices().values()) {
+ String indexPrefix = "index." + indexStats.getIndex();
+
+ this.sendCommonStats(
+ this.buildMetricName(indexPrefix + ".total"),
+ indexStats.getTotal()
+ );
+
+ if (this.reportShards) {
+ for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) {
+ this.sendCommonStats(
+ this.buildMetricName(indexPrefix + "." + indexShardStats.getShardId().id()),
+ indexShardStats.getTotal()
+ );
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ this.logException(e);
+ }
+ }
+
+ private void sendCommonStats(String prefix, CommonStats stats) {
+ this.sendDocsStats(prefix + ".docs", stats.getDocs());
+ this.sendStoreStats(prefix + ".store", stats.getStore());
+ this.sendIndexingStats(prefix + ".indexing", stats.getIndexing());
+ this.sendGetStats(prefix + ".get", stats.getGet());
+ this.sendSearchStats(prefix + ".search", stats.getSearch());
+ this.sendMergeStats(prefix + ".merges", stats.getMerge());
+ this.sendRefreshStats(prefix + ".refresh", stats.getRefresh());
+ this.sendFlushStats(prefix + ".flush", stats.getFlush());
+ this.sendWarmerStats(prefix + ".warmer", stats.getWarmer());
+ this.sendFilterCacheStats(prefix + ".filter_cache", stats.getFilterCache());
+ this.sendIdCacheStats(prefix + ".id_cache", stats.getIdCache());
+ this.sendFielddataCacheStats(prefix + ".fielddata", stats.getFieldData());
+ this.sendPercolateStats(prefix + ".percolate", stats.getPercolate());
+ this.sendCompletionStats(prefix + ".completion", stats.getCompletion());
+ this.sendSegmentsStats(prefix + ".segments", stats.getSegments());
+ //TODO: getTranslog
+ //TODO: getSuggest
+ }
+}
diff --git a/src/main/java/org/elasticsearch/service/statsd/StatsdReporterNodeIndicesStats.java b/src/main/java/org/elasticsearch/service/statsd/StatsdReporterNodeIndicesStats.java
new file mode 100644
index 0000000..3f16c72
--- /dev/null
+++ b/src/main/java/org/elasticsearch/service/statsd/StatsdReporterNodeIndicesStats.java
@@ -0,0 +1,40 @@
+package org.elasticsearch.service.statsd;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.elasticsearch.indices.NodeIndicesStats;
+
+public class StatsdReporterNodeIndicesStats extends StatsdReporterIndexStats {
+
+ private final NodeIndicesStats nodeIndicesStats;
+ private final String nodeName;
+
+ public StatsdReporterNodeIndicesStats(NodeIndicesStats nodeIndicesStats, String nodeName) {
+ this.nodeIndicesStats = nodeIndicesStats;
+ this.nodeName = nodeName;
+ }
+
+ public void run() {
+ try {
+ String prefix = this.buildMetricName( "node." + this.nodeName + ".indices" );
+ this.sendDocsStats(prefix + ".docs", this.nodeIndicesStats.getDocs());
+ this.sendStoreStats(prefix + ".store", this.nodeIndicesStats.getStore());
+ this.sendIndexingStats(prefix + ".indexing", this.nodeIndicesStats.getIndexing());
+ this.sendGetStats(prefix + ".get", this.nodeIndicesStats.getGet());
+ this.sendSearchStats(prefix + ".search", this.nodeIndicesStats.getSearch());
+ this.sendMergeStats(prefix + ".merges", this.nodeIndicesStats.getMerge());
+ this.sendRefreshStats(prefix + ".refresh", this.nodeIndicesStats.getRefresh());
+ this.sendFlushStats(prefix + ".flush", this.nodeIndicesStats.getFlush());
+ this.sendFilterCacheStats(prefix + ".filter_cache", this.nodeIndicesStats.getFilterCache());
+ this.sendIdCacheStats(prefix + ".id_cache", this.nodeIndicesStats.getIdCache());
+ this.sendFielddataCacheStats(prefix + ".fielddata", this.nodeIndicesStats.getFieldData());
+ this.sendPercolateStats(prefix + ".percolate", this.nodeIndicesStats.getPercolate());
+ this.sendCompletionStats(prefix + ".completion", this.nodeIndicesStats.getCompletion());
+ this.sendSegmentsStats(prefix + ".segments", this.nodeIndicesStats.getSegments());
+ } catch (Exception e) {
+ this.logException(e);
+ }
+ }
+}
diff --git a/src/main/java/org/elasticsearch/service/statsd/StatsdReporterNodeStats.java b/src/main/java/org/elasticsearch/service/statsd/StatsdReporterNodeStats.java
new file mode 100644
index 0000000..62668b1
--- /dev/null
+++ b/src/main/java/org/elasticsearch/service/statsd/StatsdReporterNodeStats.java
@@ -0,0 +1,234 @@
+package org.elasticsearch.service.statsd;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
+import org.elasticsearch.monitor.fs.FsStats;
+import org.elasticsearch.monitor.jvm.JvmStats;
+import org.elasticsearch.monitor.jvm.JvmStats.GarbageCollector;
+import org.elasticsearch.monitor.network.NetworkStats;
+import org.elasticsearch.monitor.os.OsStats;
+import org.elasticsearch.monitor.process.ProcessStats;
+import org.elasticsearch.http.HttpStats;
+import org.elasticsearch.transport.TransportStats;
+import org.elasticsearch.threadpool.ThreadPoolStats;
+
+public class StatsdReporterNodeStats extends StatsdReporter {
+
+ private final NodeStats nodeStats;
+ private final String nodeName;
+ private final Boolean statsdReportFsDetails;
+
+ public StatsdReporterNodeStats(NodeStats nodeStats, String nodeName, Boolean statsdReportFsDetails) {
+ this.nodeStats = nodeStats;
+ this.nodeName = nodeName;
+ this.statsdReportFsDetails = statsdReportFsDetails;
+ }
+
+ public void run() {
+ try {
+ this.sendNodeFsStats(this.nodeStats.getFs());
+ this.sendNodeJvmStats(this.nodeStats.getJvm());
+ this.sendNodeNetworkStats(this.nodeStats.getNetwork());
+ this.sendNodeOsStats(this.nodeStats.getOs());
+ this.sendNodeProcessStats(this.nodeStats.getProcess());
+ this.sendNodeHttpStats(this.nodeStats.getHttp());
+ this.sendNodeTransportStats(this.nodeStats.getTransport());
+ this.sendNodeThreadPoolStats(this.nodeStats.getThreadPool());
+ } catch (Exception e) {
+ this.logException(e);
+ }
+ }
+
+ private void sendNodeThreadPoolStats(ThreadPoolStats threadPoolStats) {
+ String prefix = this.getPrefix("thread_pool");
+ Iterator statsIterator = threadPoolStats.iterator();
+ while (statsIterator.hasNext()) {
+ ThreadPoolStats.Stats stats = statsIterator.next();
+ String threadPoolType = prefix + "." + stats.getName();
+
+ this.sendGauge(threadPoolType, "threads", stats.getThreads());
+ this.sendGauge(threadPoolType, "queue", stats.getQueue());
+ this.sendGauge(threadPoolType, "active", stats.getActive());
+ this.sendGauge(threadPoolType, "rejected", stats.getRejected());
+ this.sendGauge(threadPoolType, "largest", stats.getLargest());
+ this.sendGauge(threadPoolType, "completed", stats.getCompleted());
+ }
+ }
+
+ private void sendNodeTransportStats(TransportStats transportStats) {
+ String prefix = this.getPrefix("transport");
+ this.sendGauge(prefix, "server_open", transportStats.serverOpen());
+ this.sendGauge(prefix, "rx_count", transportStats.rxCount());
+ this.sendGauge(prefix, "rx_size_in_bytes", transportStats.rxSize().bytes());
+ this.sendGauge(prefix, "tx_count", transportStats.txCount());
+ this.sendGauge(prefix, "tx_size_in_bytes", transportStats.txSize().bytes());
+ }
+
+ private void sendNodeProcessStats(ProcessStats processStats) {
+ String prefix = this.getPrefix("process");
+
+ this.sendGauge(prefix, "open_file_descriptors", processStats.openFileDescriptors());
+
+ if (processStats.cpu() != null) {
+ this.sendGauge(prefix + ".cpu", "percent", processStats.cpu().percent());
+ this.sendGauge(prefix + ".cpu", "sys_in_millis", processStats.cpu().sys().millis());
+ this.sendGauge(prefix + ".cpu", "user_in_millis", processStats.cpu().user().millis());
+ this.sendGauge(prefix + ".cpu", "total_in_millis", processStats.cpu().total().millis());
+ }
+
+ if (processStats.mem() != null) {
+ this.sendGauge(prefix + ".mem", "resident_in_bytes", processStats.mem().resident().bytes());
+ this.sendGauge(prefix + ".mem", "share_in_bytes", processStats.mem().share().bytes());
+ this.sendGauge(prefix + ".mem", "total_virtual_in_bytes", processStats.mem().totalVirtual().bytes());
+ }
+ }
+
+ private void sendNodeOsStats(OsStats osStats) {
+ String prefix = this.getPrefix("os");
+
+ double[] loadAverage = osStats.getLoadAverage();
+ if (loadAverage.length > 0) {
+ this.sendGauge(prefix + ".load_average", "1m", loadAverage[0]);
+ this.sendGauge(prefix + ".load_average", "5m", loadAverage[1]);
+ this.sendGauge(prefix + ".load_average", "15m", loadAverage[2]);
+ }
+
+ if (osStats.cpu() != null) {
+ this.sendGauge(prefix + ".cpu", "sys", osStats.cpu().sys());
+ this.sendGauge(prefix + ".cpu", "user", osStats.cpu().user());
+ this.sendGauge(prefix + ".cpu", "idle", osStats.cpu().idle());
+ this.sendGauge(prefix + ".cpu", "stolen", osStats.cpu().stolen());
+ }
+
+ if (osStats.mem() != null) {
+ this.sendGauge(prefix + ".mem", "free_in_bytes", osStats.mem().free().bytes());
+ this.sendGauge(prefix + ".mem", "used_in_bytes", osStats.mem().used().bytes());
+ this.sendGauge(prefix + ".mem", "free_percent", osStats.mem().freePercent());
+ this.sendGauge(prefix + ".mem", "used_percent", osStats.mem().usedPercent());
+ this.sendGauge(prefix + ".mem", "actual_free_in_bytes", osStats.mem().actualFree().bytes());
+ this.sendGauge(prefix + ".mem", "actual_used_in_bytes", osStats.mem().actualUsed().bytes());
+ }
+
+ if (osStats.swap() != null) {
+ this.sendGauge(prefix + ".swap", "free_in_bytes", osStats.swap().free().bytes());
+ this.sendGauge(prefix + ".swap", "used_in_bytes", osStats.swap().used().bytes());
+ }
+ }
+
+ private void sendNodeNetworkStats(NetworkStats networkStats) {
+ String prefix = this.getPrefix("network.tcp");
+ NetworkStats.Tcp tcp = networkStats.tcp();
+
+ // might be null, if sigar isnt loaded
+ if (tcp != null) {
+ this.sendGauge(prefix, "active_opens", tcp.getActiveOpens());
+ this.sendGauge(prefix, "passive_opens", tcp.getPassiveOpens());
+ this.sendGauge(prefix, "curr_estab", tcp.getCurrEstab());
+ this.sendGauge(prefix, "in_segs", tcp.inSegs());
+ this.sendGauge(prefix, "out_segs", tcp.outSegs());
+ this.sendGauge(prefix, "retrans_segs", tcp.retransSegs());
+ this.sendGauge(prefix, "estab_resets", tcp.estabResets());
+ this.sendGauge(prefix, "attempt_fails", tcp.attemptFails());
+ this.sendGauge(prefix, "in_errs", tcp.inErrs());
+ this.sendGauge(prefix, "out_rsts", tcp.outRsts());
+ }
+ }
+
+ private void sendNodeJvmStats(JvmStats jvmStats) {
+ String prefix = this.getPrefix("jvm");
+
+ // mem
+ this.sendGauge(prefix + ".mem", "heap_used_percent", jvmStats.mem().heapUsedPercent());
+ this.sendGauge(prefix + ".mem", "heap_used_in_bytes", jvmStats.mem().heapUsed().bytes());
+ this.sendGauge(prefix + ".mem", "heap_committed_in_bytes", jvmStats.mem().heapCommitted().bytes());
+ this.sendGauge(prefix + ".mem", "non_heap_used_in_bytes", jvmStats.mem().nonHeapUsed().bytes());
+ this.sendGauge(prefix + ".mem", "non_heap_committed_in_bytes", jvmStats.mem().nonHeapCommitted().bytes());
+ for (JvmStats.MemoryPool memoryPool : jvmStats.mem()) {
+ String memoryPoolType = prefix + ".mem.pools." + memoryPool.name();
+
+ this.sendGauge(memoryPoolType, "max_in_bytes", memoryPool.max().bytes());
+ this.sendGauge(memoryPoolType, "used_in_bytes", memoryPool.used().bytes());
+ this.sendGauge(memoryPoolType, "peak_used_in_bytes", memoryPool.peakUsed().bytes());
+ this.sendGauge(memoryPoolType, "peak_max_in_bytes", memoryPool.peakMax().bytes());
+ }
+
+ // threads
+ this.sendGauge(prefix + ".threads", "count", jvmStats.threads().count());
+ this.sendGauge(prefix + ".threads", "peak_count", jvmStats.threads().peakCount());
+
+ // garbage collectors
+ for (JvmStats.GarbageCollector collector : jvmStats.gc()) {
+ String gcCollectorType = prefix + ".gc.collectors." + collector.name();
+
+ this.sendGauge(gcCollectorType, "collection_count", collector.collectionCount());
+ this.sendGauge(gcCollectorType, "collection_time_in_millis", collector.collectionTime().millis());
+ }
+
+ // TODO: buffer pools
+ }
+
+ private void sendNodeHttpStats(HttpStats httpStats) {
+ String prefix = this.getPrefix("http");
+ this.sendGauge(prefix, "current_open", httpStats.getServerOpen());
+ this.sendGauge(prefix, "total_opened", httpStats.getTotalOpen());
+ }
+
+ private void sendNodeFsStats(FsStats fs) {
+ // Send total
+ String prefix = this.getPrefix("fs");
+ this.sendNodeFsStatsInfo(prefix + ".total", fs.total());
+
+ // Maybe send details
+ if (this.statsdReportFsDetails) {
+ Iterator infoIterator = fs.iterator();
+ while (infoIterator.hasNext()) {
+ FsStats.Info info = infoIterator.next();
+ this.sendNodeFsStatsInfo(prefix + ".data", info);
+ }
+ }
+ }
+
+ private void sendNodeFsStatsInfo(String prefix, FsStats.Info info) {
+ // Construct detailed path
+ String prefixAppend = "";
+ if (info.getPath() != null)
+ prefixAppend += "." + info.getPath();
+ if (info.getMount() != null)
+ prefixAppend += "." + info.getMount();
+ if (info.getDev() != null)
+ prefixAppend += "." + info.getDev();
+
+ if (info.getAvailable().bytes() != -1)
+ this.sendGauge(prefix + prefixAppend, "available_in_bytes", info.getAvailable().bytes());
+ if (info.getTotal().bytes() != -1)
+ this.sendGauge(prefix + prefixAppend, "total_in_bytes", info.getTotal().bytes());
+ if (info.getFree().bytes() != -1)
+ this.sendGauge(prefix + prefixAppend, "free_in_bytes", info.getFree().bytes());
+
+ // disk_io_op is sum of reads and writes (use graphite functions)
+ if (info.getDiskReads() != -1)
+ this.sendGauge(prefix + prefixAppend, "disk_reads", info.getDiskReads());
+ if (info.getDiskWrites() != -1)
+ this.sendGauge(prefix + prefixAppend, "disk_writes", info.getDiskWrites());
+
+ // disk_io_size_in_bytes is sum of reads and writes (use graphite functions)
+ if (info.getDiskReadSizeInBytes() != -1)
+ this.sendGauge(prefix + prefixAppend, "disk_read_size_in_bytes", info.getDiskReadSizeInBytes());
+ if (info.getDiskWriteSizeInBytes() != -1)
+ this.sendGauge(prefix + prefixAppend, "disk_write_size_in_bytes", info.getDiskWriteSizeInBytes());
+
+ /** TODO: Find out if these stats are useful.
+ if (info.getDiskQueue() != -1)
+ this.sendGauge(prefix + prefixAppend, "disk_queue", (long) info.getDiskQueue());
+ if (info.getDiskServiceTime() != -1)
+ this.sendGauge(prefix + prefixAppend, "disk_service_time", (long) info.getDiskServiceTime());
+ */
+ }
+
+ private String getPrefix(String prefix) {
+ return this.buildMetricName( "node." + this.nodeName + "." + prefix );
+ }
+}
diff --git a/src/main/java/org/elasticsearch/service/statsd/StatsdService.java b/src/main/java/org/elasticsearch/service/statsd/StatsdService.java
index e1917df..791f4f2 100644
--- a/src/main/java/org/elasticsearch/service/statsd/StatsdService.java
+++ b/src/main/java/org/elasticsearch/service/statsd/StatsdService.java
@@ -1,135 +1,187 @@
package org.elasticsearch.service.statsd;
-import com.timgroup.statsd.NonBlockingStatsDClient;
-import com.timgroup.statsd.StatsDClient;
-
-import org.elasticsearch.ElasticSearchException;
-import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
+import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.index.service.IndexService;
-import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
-import org.elasticsearch.indices.NodeIndicesStats;
import org.elasticsearch.node.service.NodeService;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.client.Client;
-import java.util.List;
-
-public class StatsdService extends AbstractLifecycleComponent
-{
-
- private final ClusterService clusterService;
- private final IndicesService indicesService;
- private NodeService nodeService;
- private final String statsdHost;
- private final Integer statsdPort;
- private final TimeValue statsdRefreshInternal;
- private final String statsdPrefix;
- private final StatsDClient statsdClient;
+import com.timgroup.statsd.NonBlockingStatsDClient;
+import com.timgroup.statsd.StatsDClient;
- private volatile Thread statsdReporterThread;
- private volatile boolean closed;
+public class StatsdService extends AbstractLifecycleComponent {
+
+ private final Client client;
+ private final ClusterService clusterService;
+ private final IndicesService indicesService;
+ private final NodeService nodeService;
+ private final String statsdHost;
+ private final Integer statsdPort;
+ private final TimeValue statsdRefreshInternal;
+ private final String statsdPrefix;
+ private final String statsdNodeName;
+ private final Boolean statsdReportNodeIndices;
+ private final Boolean statsdReportIndices;
+ private final Boolean statsdReportShards;
+ private final Boolean statsdReportFsDetails;
+ private final StatsDClient statsdClient;
+
+ private volatile Thread statsdReporterThread;
+ private volatile boolean closed;
@Inject
- public StatsdService(Settings settings, ClusterService clusterService, IndicesService indicesService,
- NodeService nodeService)
- {
+ public StatsdService(Settings settings, Client client, ClusterService clusterService, IndicesService indicesService, NodeService nodeService) {
super(settings);
+ this.client = client;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.nodeService = nodeService;
- this.statsdRefreshInternal = settings.getAsTime("metrics.statsd.every", TimeValue.timeValueMinutes(1));
- this.statsdHost = settings.get("metrics.statsd.host");
- this.statsdPort = settings.getAsInt("metrics.statsd.port", 8125);
- this.statsdPrefix = settings.get("metrics.statsd.prefix", "elasticsearch" + "." + settings.get("cluster.name"));
- this.statsdClient = new NonBlockingStatsDClient(statsdPrefix, statsdHost, statsdPort);
+ this.statsdRefreshInternal = settings.getAsTime(
+ "metrics.statsd.every", TimeValue.timeValueMinutes(1)
+ );
+ this.statsdHost = settings.get(
+ "metrics.statsd.host"
+ );
+ this.statsdPort = settings.getAsInt(
+ "metrics.statsd.port", 8125
+ );
+ this.statsdPrefix = settings.get(
+ "metrics.statsd.prefix", "elasticsearch" + "." + settings.get("cluster.name")
+ );
+ this.statsdNodeName = settings.get(
+ "metrics.statsd.node_name"
+ );
+ this.statsdReportNodeIndices = settings.getAsBoolean(
+ "metrics.statsd.report.node_indices", false
+ );
+ this.statsdReportIndices = settings.getAsBoolean(
+ "metrics.statsd.report.indices", true
+ );
+ this.statsdReportShards = settings.getAsBoolean(
+ "metrics.statsd.report.shards", false
+ );
+ this.statsdReportFsDetails = settings.getAsBoolean(
+ "metrics.statsd.report.fs_details", false
+ );
+ this.statsdClient = new NonBlockingStatsDClient(this.statsdPrefix, this.statsdHost, this.statsdPort);
}
@Override
- protected void doStart() throws ElasticSearchException
- {
- if (statsdHost != null && statsdHost.length() > 0) {
- statsdReporterThread = EsExecutors.daemonThreadFactory(settings, "statsd_reporter").newThread(
- new StatsdReporterThread());
- statsdReporterThread.start();
- logger.info("Statsd reporting triggered every [{}] to host [{}:{}] with metric prefix [{}]",
- statsdRefreshInternal, statsdHost, statsdPort, statsdPrefix);
- }
- else {
- logger.error("Statsd reporting disabled, no statsd host configured");
+ protected void doStart() throws ElasticsearchException {
+ if (this.statsdHost != null && this.statsdHost.length() > 0) {
+ this.statsdReporterThread = EsExecutors
+ .daemonThreadFactory(this.settings, "statsd_reporter")
+ .newThread(new StatsdReporterThread());
+ this.statsdReporterThread.start();
+ this.logger.info(
+ "StatsD reporting triggered every [{}] to host [{}:{}] with metric prefix [{}]",
+ this.statsdRefreshInternal, this.statsdHost, this.statsdPort, this.statsdPrefix
+ );
+ } else {
+ this.logger.error(
+ "StatsD reporting disabled, no StatsD host configured"
+ );
}
}
@Override
- protected void doStop() throws ElasticSearchException
- {
- if (closed) {
+ protected void doStop() throws ElasticsearchException {
+ if (this.closed) {
return;
}
- if (statsdReporterThread != null) {
- statsdReporterThread.interrupt();
+ if (this.statsdReporterThread != null) {
+ this.statsdReporterThread.interrupt();
}
- closed = true;
- logger.info("Statsd reporter stopped");
+ this.closed = true;
+ this.logger.info("StatsD reporter stopped");
}
@Override
- protected void doClose() throws ElasticSearchException
- {
+ protected void doClose() throws ElasticsearchException {
}
- public class StatsdReporterThread implements Runnable
- {
-
- public void run()
- {
- while (!closed) {
- DiscoveryNode node = clusterService.localNode();
- boolean isClusterStarted = clusterService.lifecycleState().equals(Lifecycle.State.STARTED);
-
- if (isClusterStarted && node != null && node.isMasterNode()) {
- NodeIndicesStats nodeIndicesStats = indicesService.stats(false);
- CommonStatsFlags commonStatsFlags = new CommonStatsFlags().clear();
- NodeStats nodeStats = nodeService.stats(commonStatsFlags, true, true, true, true, true, true, true, true);
- List indexShards = getIndexShards(indicesService);
+ public class StatsdReporterThread implements Runnable {
+
+ @Override
+ public void run() {
+ while (!StatsdService.this.closed) {
+ DiscoveryNode node = StatsdService.this.clusterService.localNode();
+ ClusterState state = StatsdService.this.clusterService.state();
+ boolean isClusterStarted = StatsdService.this.clusterService
+ .lifecycleState()
+ .equals(Lifecycle.State.STARTED);
+
+ if (node != null && state != null && isClusterStarted) {
+ String statsdNodeName = StatsdService.this.statsdNodeName;
+ if (null == statsdNodeName) statsdNodeName = node.getName();
+
+ // Report node stats -- runs for all nodes
+ StatsdReporter nodeStatsReporter = new StatsdReporterNodeStats(
+ StatsdService.this.nodeService.stats(
+ new CommonStatsFlags().clear(), // indices
+ true, // os
+ true, // process
+ true, // jvm
+ true, // threadPool
+ true, // network
+ true, // fs
+ true, // transport
+ true, // http
+ false // circuitBreaker
+ ),
+ statsdNodeName,
+ StatsdService.this.statsdReportFsDetails
+ );
+ nodeStatsReporter
+ .setStatsDClient(StatsdService.this.statsdClient)
+ .run();
+
+ // Maybe report index stats per node
+ if (StatsdService.this.statsdReportNodeIndices && node.isDataNode()) {
+ StatsdReporter nodeIndicesStatsReporter = new StatsdReporterNodeIndicesStats(
+ StatsdService.this.indicesService.stats(
+ false // includePrevious
+ ),
+ statsdNodeName
+ );
+ nodeIndicesStatsReporter
+ .setStatsDClient(StatsdService.this.statsdClient)
+ .run();
+ }
- StatsdReporter statsdReporter = new StatsdReporter(nodeIndicesStats, indexShards, nodeStats, statsdClient);
- statsdReporter.run();
- }
- else {
- if (node != null) {
- logger.debug("[{}]/[{}] is not master node, not triggering update", node.getId(), node.getName());
+ // Master node is the only one allowed to send cluster wide sums / stats
+ if (state.nodes().localNodeMaster()) {
+ StatsdReporter indicesReporter = new StatsdReporterIndices(
+ StatsdService.this.client
+ .admin() // AdminClient
+ .indices() // IndicesAdminClient
+ .prepareStats() // IndicesStatsRequestBuilder
+ .all() // IndicesStatsRequestBuilder
+ .get(), // IndicesStatsResponse
+ StatsdService.this.statsdReportIndices,
+ StatsdService.this.statsdReportShards
+ );
+ indicesReporter
+ .setStatsDClient(StatsdService.this.statsdClient)
+ .run();
}
}
try {
- Thread.sleep(statsdRefreshInternal.millis());
- }
- catch (InterruptedException e1) {
+ Thread.sleep(StatsdService.this.statsdRefreshInternal.millis());
+ } catch (InterruptedException e1) {
continue;
}
}
}
-
- private List getIndexShards(IndicesService indicesService)
- {
- List indexShards = Lists.newArrayList();
- String[] indices = indicesService.indices().toArray(new String[] {});
- for (String indexName : indices) {
- IndexService indexService = indicesService.indexServiceSafe(indexName);
- for (int shardId : indexService.shardIds()) {
- indexShards.add(indexService.shard(shardId));
- }
- }
- return indexShards;
- }
}
}
diff --git a/src/test/java/org/elasticsearch/module/statsd/test/StatsdPluginIntegrationTest.java b/src/test/java/org/elasticsearch/module/statsd/test/StatsdPluginIntegrationTest.java
index d33f00d..6c6197b 100644
--- a/src/test/java/org/elasticsearch/module/statsd/test/StatsdPluginIntegrationTest.java
+++ b/src/test/java/org/elasticsearch/module/statsd/test/StatsdPluginIntegrationTest.java
@@ -23,56 +23,69 @@ public class StatsdPluginIntegrationTest
private String clusterName = RandomStringGenerator.randomAlphabetic(10);
private String index = RandomStringGenerator.randomAlphabetic(6).toLowerCase();
private String type = RandomStringGenerator.randomAlphabetic(6).toLowerCase();
- private Node node;
+ private Node node_1;
+ private Node node_2;
+ private Node node_3;
@Before
public void startStatsdMockServerAndNode() throws Exception
{
statsdMockServer = new StatsdMockServer(STATSD_SERVER_PORT);
statsdMockServer.start();
- node = createNode(clusterName, 1, STATSD_SERVER_PORT, "1s");
+ node_1 = createNode(clusterName, 4, STATSD_SERVER_PORT, "1s");
+ node_2 = createNode(clusterName, 4, STATSD_SERVER_PORT, "1s");
+ node_3 = createNode(clusterName, 4, STATSD_SERVER_PORT, "1s");
}
@After
public void stopStatsdServer() throws Exception
{
statsdMockServer.close();
- if (!node.isClosed()) {
- node.close();
+ if (!node_1.isClosed()) {
+ node_1.close();
+ }
+ if (!node_2.isClosed()) {
+ node_2.close();
+ }
+ if (!node_3.isClosed()) {
+ node_3.close();
}
}
@Test
public void testThatIndexingResultsInMonitoring() throws Exception
{
- IndexResponse indexResponse = indexElement(node, index, type, "value");
+ IndexResponse indexResponse = indexElement(node_1, index, type, "value");
assertThat(indexResponse.getId(), is(notNullValue()));
- Thread.sleep(2000);
+ //Index some more docs
+ this.indexSomeDocs(101);
+
+ Thread.sleep(4000);
ensureValidKeyNames();
- assertStatsdMetricIsContained("elasticsearch." + clusterName + ".indexes." + index + ".id.0.indexing._all.indexCount:1|c");
- assertStatsdMetricIsContained("elasticsearch." + clusterName + ".indexes." + index + ".id.0.indexing." + type + ".indexCount:1|c");
- assertStatsdMetricIsContained("elasticsearch." + clusterName + ".node.jvm.threads.peakCount:");
+ assertStatsdMetricIsContained("elasticsearch." + clusterName + ".index." + index + ".shard.0.indexing.index_total:51|c");
+ assertStatsdMetricIsContained("elasticsearch." + clusterName + ".index." + index + ".shard.1.indexing.index_total:51|c");
+ assertStatsdMetricIsContained(".jvm.threads.peak_count:");
}
@Test
public void masterFailOverShouldWork() throws Exception
{
String clusterName = RandomStringGenerator.randomAlphabetic(10);
- IndexResponse indexResponse = indexElement(node, index, type, "value");
+ IndexResponse indexResponse = indexElement(node_1, index, type, "value");
assertThat(indexResponse.getId(), is(notNullValue()));
- Node origNode = node;
- node = createNode(clusterName, 1, STATSD_SERVER_PORT, "1s");
+ Node origNode = node_1;
+ node_1 = createNode(clusterName, 1, STATSD_SERVER_PORT, "1s");
statsdMockServer.content.clear();
origNode.stop();
- indexResponse = indexElement(node, index, type, "value");
+ indexResponse = indexElement(node_1, index, type, "value");
assertThat(indexResponse.getId(), is(notNullValue()));
// wait for master fail over and writing to graph reporter
- Thread.sleep(2000);
- assertStatsdMetricIsContained("elasticsearch." + clusterName + ".indexes." + index + ".id.0.indexing._all.indexCount:1|c");
+ Thread.sleep(4000);
+ assertStatsdMetricIsContained("elasticsearch." + clusterName + ".index." + index + ".shard.0.indexing.index_total:1|c");
}
// the stupid hamcrest matchers have compile erros depending whether they run on java6 or java7, so I rolled my own version
@@ -96,4 +109,12 @@ private IndexResponse indexElement(Node node, String index, String type, String
{
return node.client().prepareIndex(index, type).setSource("field", fieldValue).execute().actionGet();
}
+
+ private void indexSomeDocs(int docs)
+ {
+ while( docs > 0 ) {
+ indexElement(node_1, index, type, "value " + docs);
+ docs--;
+ }
+ }
}