Skip to content

Commit 1fabc5f

Browse files
authored
Linter fixes for plugins/inputs/[c]* (influxdata#9194)
* Linter fixes for plugins/inputs/[c]* * Linter fixes for plugins/inputs/[c]* Co-authored-by: Pawel Zak <Pawel Zak>
1 parent 5256f91 commit 1fabc5f

24 files changed

+446
-398
lines changed

plugins/inputs/cassandra/README.md

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,26 @@ Cassandra plugin produces one or more measurements for each metric configured, a
1919
Given a configuration like:
2020

2121
```toml
22+
# Read Cassandra metrics through Jolokia
2223
[[inputs.cassandra]]
24+
## DEPRECATED: The cassandra plugin has been deprecated. Please use the
25+
## jolokia2 plugin instead.
26+
##
27+
## see https://github.com/influxdata/telegraf/tree/master/plugins/inputs/jolokia2
28+
2329
context = "/jolokia/read"
24-
servers = [":8778"]
25-
metrics = ["/java.lang:type=Memory/HeapMemoryUsage"]
30+
## List of cassandra servers exposing jolokia read service
31+
servers = ["myuser:[email protected]:8778","10.10.10.2:8778",":8778"]
32+
## List of metrics collected on above servers
33+
## Each metric consists of a jmx path.
34+
## This will collect all heap memory usage metrics from the jvm and
35+
## ReadLatency metrics for all keyspaces and tables.
36+
## "type=Table" in the query works with Cassandra3.0. Older versions might
37+
## need to use "type=ColumnFamily"
38+
metrics = [
39+
"/java.lang:type=Memory/HeapMemoryUsage",
40+
"/org.apache.cassandra.metrics:type=Table,keyspace=*,scope=*,name=ReadLatency"
41+
]
2642
```
2743

2844
The collected metrics will be:

plugins/inputs/cassandra/cassandra.go

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,11 @@ type jmxMetric interface {
4949
addTagsFields(out map[string]interface{})
5050
}
5151

52-
func newJavaMetric(host string, metric string,
53-
acc telegraf.Accumulator) *javaMetric {
52+
func newJavaMetric(acc telegraf.Accumulator, host string, metric string) *javaMetric {
5453
return &javaMetric{host: host, metric: metric, acc: acc}
5554
}
5655

57-
func newCassandraMetric(host string, metric string,
58-
acc telegraf.Accumulator) *cassandraMetric {
56+
func newCassandraMetric(acc telegraf.Accumulator, host string, metric string) *cassandraMetric {
5957
return &cassandraMetric{host: host, metric: metric, acc: acc}
6058
}
6159

@@ -72,13 +70,15 @@ func addValuesAsFields(values map[string]interface{}, fields map[string]interfac
7270
func parseJmxMetricRequest(mbean string) map[string]string {
7371
tokens := make(map[string]string)
7472
classAndPairs := strings.Split(mbean, ":")
75-
if classAndPairs[0] == "org.apache.cassandra.metrics" {
73+
switch classAndPairs[0] {
74+
case "org.apache.cassandra.metrics":
7675
tokens["class"] = "cassandra"
77-
} else if classAndPairs[0] == "java.lang" {
76+
case "java.lang":
7877
tokens["class"] = "java"
79-
} else {
78+
default:
8079
return tokens
8180
}
81+
8282
pairs := strings.Split(classAndPairs[1], ",")
8383
for _, pair := range pairs {
8484
p := strings.Split(pair, "=")
@@ -147,22 +147,21 @@ func (c cassandraMetric) addTagsFields(out map[string]interface{}) {
147147
// maps in the json response
148148
if (tokens["type"] == "Table" || tokens["type"] == "ColumnFamily") && (tokens["keyspace"] == "*" ||
149149
tokens["scope"] == "*") {
150-
if valuesMap, ok := out["value"]; ok {
151-
for k, v := range valuesMap.(map[string]interface{}) {
152-
addCassandraMetric(k, c, v.(map[string]interface{}))
153-
}
154-
} else {
150+
valuesMap, ok := out["value"]
151+
if !ok {
155152
c.acc.AddError(fmt.Errorf("missing key 'value' in '%s' output response: %v", c.metric, out))
156153
return
157154
}
155+
for k, v := range valuesMap.(map[string]interface{}) {
156+
addCassandraMetric(k, c, v.(map[string]interface{}))
157+
}
158158
} else {
159-
if values, ok := out["value"]; ok {
160-
addCassandraMetric(r.(map[string]interface{})["mbean"].(string),
161-
c, values.(map[string]interface{}))
162-
} else {
159+
values, ok := out["value"]
160+
if !ok {
163161
c.acc.AddError(fmt.Errorf("missing key 'value' in '%s' output response: %v", c.metric, out))
164162
return
165163
}
164+
addCassandraMetric(r.(map[string]interface{})["mbean"].(string), c, values.(map[string]interface{}))
166165
}
167166
}
168167

@@ -277,10 +276,10 @@ func (c *Cassandra) Gather(acc telegraf.Accumulator) error {
277276

278277
var m jmxMetric
279278
if strings.HasPrefix(metric, "/java.lang:") {
280-
m = newJavaMetric(serverTokens["host"], metric, acc)
279+
m = newJavaMetric(acc, serverTokens["host"], metric)
281280
} else if strings.HasPrefix(metric,
282281
"/org.apache.cassandra.metrics:") {
283-
m = newCassandraMetric(serverTokens["host"], metric, acc)
282+
m = newCassandraMetric(acc, serverTokens["host"], metric)
284283
} else {
285284
// unsupported metric type
286285
acc.AddError(fmt.Errorf("unsupported Cassandra metric [%s], skipping", metric))

plugins/inputs/ceph/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ the cluster. The currently supported commands are:
4545
### Configuration:
4646

4747
```toml
48-
# Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster.
48+
# Collects performance metrics from the MON, OSD, MDS and RGW nodes in a Ceph storage cluster.
4949
[[inputs.ceph]]
5050
## This is the recommended interval to poll. Too frequent and you will lose
5151
## data points due to timeouts during rebalancing and recovery

plugins/inputs/ceph/ceph.go

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/json"
66
"fmt"
77
"io/ioutil"
8-
"log"
98
"os/exec"
109
"path/filepath"
1110
"strings"
@@ -28,17 +27,19 @@ const (
2827
)
2928

3029
type Ceph struct {
31-
CephBinary string
32-
OsdPrefix string
33-
MonPrefix string
34-
MdsPrefix string
35-
RgwPrefix string
36-
SocketDir string
37-
SocketSuffix string
38-
CephUser string
39-
CephConfig string
40-
GatherAdminSocketStats bool
41-
GatherClusterStats bool
30+
CephBinary string `toml:"ceph_binary"`
31+
OsdPrefix string `toml:"osd_prefix"`
32+
MonPrefix string `toml:"mon_prefix"`
33+
MdsPrefix string `toml:"mds_prefix"`
34+
RgwPrefix string `toml:"rgw_prefix"`
35+
SocketDir string `toml:"socket_dir"`
36+
SocketSuffix string `toml:"socket_suffix"`
37+
CephUser string `toml:"ceph_user"`
38+
CephConfig string `toml:"ceph_config"`
39+
GatherAdminSocketStats bool `toml:"gather_admin_socket_stats"`
40+
GatherClusterStats bool `toml:"gather_cluster_stats"`
41+
42+
Log telegraf.Logger `toml:"-"`
4243
}
4344

4445
func (c *Ceph) Description() string {
@@ -67,7 +68,14 @@ var sampleConfig = `
6768
## suffix used to identify socket files
6869
socket_suffix = "asok"
6970
70-
## Ceph user to authenticate as
71+
## Ceph user to authenticate as, ceph will search for the corresponding keyring
72+
## e.g. client.admin.keyring in /etc/ceph, or the explicit path defined in the
73+
## client section of ceph.conf for example:
74+
##
75+
## [client.telegraf]
76+
## keyring = /etc/ceph/client.telegraf.keyring
77+
##
78+
## Consult the ceph documentation for more detail on keyring generation.
7179
ceph_user = "client.admin"
7280
7381
## Ceph configuration to use to locate the cluster
@@ -76,7 +84,8 @@ var sampleConfig = `
7684
## Whether to gather statistics via the admin socket
7785
gather_admin_socket_stats = true
7886
79-
## Whether to gather statistics via ceph commands
87+
## Whether to gather statistics via ceph commands, requires ceph_user and ceph_config
88+
## to be specified
8089
gather_cluster_stats = false
8190
`
8291

@@ -112,14 +121,14 @@ func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error {
112121
acc.AddError(fmt.Errorf("error reading from socket '%s': %v", s.socket, err))
113122
continue
114123
}
115-
data, err := parseDump(dump)
124+
data, err := c.parseDump(dump)
116125
if err != nil {
117126
acc.AddError(fmt.Errorf("error parsing dump from socket '%s': %v", s.socket, err))
118127
continue
119128
}
120129
for tag, metrics := range data {
121130
acc.AddFields(measurement,
122-
map[string]interface{}(metrics),
131+
metrics,
123132
map[string]string{"type": s.sockType, "id": s.sockID, "collection": tag})
124133
}
125134
}
@@ -138,7 +147,7 @@ func (c *Ceph) gatherClusterStats(acc telegraf.Accumulator) error {
138147

139148
// For each job, execute against the cluster, parse and accumulate the data points
140149
for _, job := range jobs {
141-
output, err := c.exec(job.command)
150+
output, err := c.execute(job.command)
142151
if err != nil {
143152
return fmt.Errorf("error executing command: %v", err)
144153
}
@@ -171,15 +180,17 @@ func init() {
171180

172181
var perfDump = func(binary string, socket *socket) (string, error) {
173182
cmdArgs := []string{"--admin-daemon", socket.socket}
174-
if socket.sockType == typeOsd {
183+
184+
switch socket.sockType {
185+
case typeOsd:
175186
cmdArgs = append(cmdArgs, "perf", "dump")
176-
} else if socket.sockType == typeMon {
187+
case typeMon:
177188
cmdArgs = append(cmdArgs, "perfcounters_dump")
178-
} else if socket.sockType == typeMds {
189+
case typeMds:
179190
cmdArgs = append(cmdArgs, "perf", "dump")
180-
} else if socket.sockType == typeRgw {
191+
case typeRgw:
181192
cmdArgs = append(cmdArgs, "perf", "dump")
182-
} else {
193+
default:
183194
return "", fmt.Errorf("ignoring unknown socket type: %s", socket.sockType)
184195
}
185196

@@ -268,23 +279,23 @@ type taggedMetricMap map[string]metricMap
268279

269280
// Parses a raw JSON string into a taggedMetricMap
270281
// Delegates the actual parsing to newTaggedMetricMap(..)
271-
func parseDump(dump string) (taggedMetricMap, error) {
282+
func (c *Ceph) parseDump(dump string) (taggedMetricMap, error) {
272283
data := make(map[string]interface{})
273284
err := json.Unmarshal([]byte(dump), &data)
274285
if err != nil {
275286
return nil, fmt.Errorf("failed to parse json: '%s': %v", dump, err)
276287
}
277288

278-
return newTaggedMetricMap(data), nil
289+
return c.newTaggedMetricMap(data), nil
279290
}
280291

281292
// Builds a TaggedMetricMap out of a generic string map.
282293
// The top-level key is used as a tag and all sub-keys are flattened into metrics
283-
func newTaggedMetricMap(data map[string]interface{}) taggedMetricMap {
294+
func (c *Ceph) newTaggedMetricMap(data map[string]interface{}) taggedMetricMap {
284295
tmm := make(taggedMetricMap)
285296
for tag, datapoints := range data {
286297
mm := make(metricMap)
287-
for _, m := range flatten(datapoints) {
298+
for _, m := range c.flatten(datapoints) {
288299
mm[m.name()] = m.value
289300
}
290301
tmm[tag] = mm
@@ -296,7 +307,7 @@ func newTaggedMetricMap(data map[string]interface{}) taggedMetricMap {
296307
// Nested keys are flattened into ordered slices associated with a metric value.
297308
// The key slices are treated as stacks, and are expected to be reversed and concatenated
298309
// when passed as metrics to the accumulator. (see (*metric).name())
299-
func flatten(data interface{}) []*metric {
310+
func (c *Ceph) flatten(data interface{}) []*metric {
300311
var metrics []*metric
301312

302313
switch val := data.(type) {
@@ -305,20 +316,20 @@ func flatten(data interface{}) []*metric {
305316
case map[string]interface{}:
306317
metrics = make([]*metric, 0, len(val))
307318
for k, v := range val {
308-
for _, m := range flatten(v) {
319+
for _, m := range c.flatten(v) {
309320
m.pathStack = append(m.pathStack, k)
310321
metrics = append(metrics, m)
311322
}
312323
}
313324
default:
314-
log.Printf("I! [inputs.ceph] ignoring unexpected type '%T' for value %v", val, val)
325+
c.Log.Infof("ignoring unexpected type '%T' for value %v", val, val)
315326
}
316327

317328
return metrics
318329
}
319330

320-
// exec executes the 'ceph' command with the supplied arguments, returning JSON formatted output
321-
func (c *Ceph) exec(command string) (string, error) {
331+
// execute executes the 'ceph' command with the supplied arguments, returning JSON formatted output
332+
func (c *Ceph) execute(command string) (string, error) {
322333
cmdArgs := []string{"--conf", c.CephConfig, "--name", c.CephUser, "--format", "json"}
323334
cmdArgs = append(cmdArgs, strings.Split(command, " ")...)
324335

plugins/inputs/ceph/ceph_test.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ import (
99
"strings"
1010
"testing"
1111

12-
"github.com/influxdata/telegraf/testutil"
1312
"github.com/stretchr/testify/require"
13+
14+
"github.com/influxdata/telegraf/testutil"
1415
)
1516

1617
const (
@@ -29,28 +30,32 @@ func TestParseSockId(t *testing.T) {
2930
}
3031

3132
func TestParseMonDump(t *testing.T) {
32-
dump, err := parseDump(monPerfDump)
33+
c := &Ceph{Log: testutil.Logger{}}
34+
dump, err := c.parseDump(monPerfDump)
3335
require.NoError(t, err)
3436
require.InEpsilon(t, int64(5678670180), dump["cluster"]["osd_kb_used"], epsilon)
3537
require.InEpsilon(t, 6866.540527000, dump["paxos"]["store_state_latency.sum"], epsilon)
3638
}
3739

3840
func TestParseOsdDump(t *testing.T) {
39-
dump, err := parseDump(osdPerfDump)
41+
c := &Ceph{Log: testutil.Logger{}}
42+
dump, err := c.parseDump(osdPerfDump)
4043
require.NoError(t, err)
4144
require.InEpsilon(t, 552132.109360000, dump["filestore"]["commitcycle_interval.sum"], epsilon)
4245
require.Equal(t, float64(0), dump["mutex-FileJournal::finisher_lock"]["wait.avgcount"])
4346
}
4447

4548
func TestParseMdsDump(t *testing.T) {
46-
dump, err := parseDump(mdsPerfDump)
49+
c := &Ceph{Log: testutil.Logger{}}
50+
dump, err := c.parseDump(mdsPerfDump)
4751
require.NoError(t, err)
4852
require.InEpsilon(t, 2408386.600934982, dump["mds"]["reply_latency.sum"], epsilon)
4953
require.Equal(t, float64(0), dump["throttle-write_buf_throttle"]["wait.avgcount"])
5054
}
5155

5256
func TestParseRgwDump(t *testing.T) {
53-
dump, err := parseDump(rgwPerfDump)
57+
c := &Ceph{Log: testutil.Logger{}}
58+
dump, err := c.parseDump(rgwPerfDump)
5459
require.NoError(t, err)
5560
require.InEpsilon(t, 0.002219876, dump["rgw"]["get_initial_lat.sum"], epsilon)
5661
require.Equal(t, float64(0), dump["rgw"]["put_initial_lat.avgcount"])

plugins/inputs/cgroup/README.md

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,19 @@ All measurements have the following tags:
4444
### Configuration:
4545

4646
```toml
47+
# Read specific statistics per cgroup
4748
# [[inputs.cgroup]]
49+
## Directories in which to look for files, globs are supported.
50+
## Consider restricting paths to the set of cgroups you really
51+
## want to monitor if you have a large number of cgroups, to avoid
52+
## any cardinality issues.
4853
# paths = [
49-
# "/sys/fs/cgroup/memory", # root cgroup
50-
# "/sys/fs/cgroup/memory/child1", # container cgroup
51-
# "/sys/fs/cgroup/memory/child2/*", # all children cgroups under child2, but not child2 itself
54+
# "/sys/fs/cgroup/memory",
55+
# "/sys/fs/cgroup/memory/child1",
56+
# "/sys/fs/cgroup/memory/child2/*",
5257
# ]
58+
## cgroup stat fields, as file names, globs are supported.
59+
## these file names are appended to each path from above.
5360
# files = ["memory.*usage*", "memory.limit_in_bytes"]
5461
```
5562

plugins/inputs/cgroup/cgroup_linux.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@ func (g *CGroup) Gather(acc telegraf.Accumulator) error {
2525
acc.AddError(dir.err)
2626
continue
2727
}
28-
if err := g.gatherDir(dir.path, acc); err != nil {
28+
if err := g.gatherDir(acc, dir.path); err != nil {
2929
acc.AddError(err)
3030
}
3131
}
3232

3333
return nil
3434
}
3535

36-
func (g *CGroup) gatherDir(dir string, acc telegraf.Accumulator) error {
36+
func (g *CGroup) gatherDir(acc telegraf.Accumulator, dir string) error {
3737
fields := make(map[string]interface{})
3838

3939
list := make(chan pathInfo)
@@ -72,8 +72,8 @@ type pathInfo struct {
7272
err error
7373
}
7474

75-
func isDir(path string) (bool, error) {
76-
result, err := os.Stat(path)
75+
func isDir(pathToCheck string) (bool, error) {
76+
result, err := os.Stat(pathToCheck)
7777
if err != nil {
7878
return false, err
7979
}

0 commit comments

Comments
 (0)