diff --git a/crawler/defaults.py b/crawler/defaults.py index eb564e92..3595eb91 100644 --- a/crawler/defaults.py +++ b/crawler/defaults.py @@ -66,6 +66,7 @@ 'interface': {}, 'cpu': {}, 'load': {}, + 'gpu': {}, 'dockerps': {}, 'dockerhistory': {}, 'dockerinspect': {}, diff --git a/crawler/dockercontainer.py b/crawler/dockercontainer.py index 00f73bca..588db8ce 100644 --- a/crawler/dockercontainer.py +++ b/crawler/dockercontainer.py @@ -56,7 +56,7 @@ def __init__(self, name=None, type=None, source=None, self.host_log_dir = host_log_dir def __str__(self): - return "%s: %s --> %s" % (self.name, self.source, self.dest) + return "%s: %s --> %s" % (self.name, self.source, self.dest) def get_dest(self): if self.host_log_dir: diff --git a/crawler/emitter.py b/crawler/emitter.py index 66933c07..2a57e05a 100644 --- a/crawler/emitter.py +++ b/crawler/emitter.py @@ -29,6 +29,40 @@ # Kafka logs too much logging.getLogger('kafka').addHandler(NullHandler()) +def flatten_obj(obj, prefix=''): + results = {} + def flatten(obj, prefix, results): + if isinstance(obj, dict): + for key, val in obj.iteritems(): + if isinstance(val, dict): + if len(prefix) != 0: + flatten(val, prefix+'.'+key, results) + else: + flatten(val, key, results) + elif isinstance(val, list): + for i, j in enumerate(val): + if len(prefix) != 0: + flatten(j, prefix+'.'+key+'.'+str(i), results) + else: + flatten(j, key+'.'+str(i), results) + else: + if len(prefix) != 0: + results[prefix+'.'+key] = val + else: + results[key] = val + elif isinstance(obj, list): + for i, j in enumerate(obj): + if len(prefix) != 0: + flatten(j, prefix+'.'+str(i), results) + else: + flatten(j, str(i), results) + else: + if len(prefix) != 0: + results[prefix] = obj + + flatten(obj, '', results) + return results + def kafka_send(kurl, temp_fpath, format, topic, queue=None): try: kafka_python_client = kafka_python.KafkaClient(kurl) @@ -122,7 +156,7 @@ def emit_dict_as_graphite( timestamp=None, ): timestamp = int(timestamp or 
time.time()) - items = data.items() + items = flatten_obj(data).items() # this is for issue #343 diff --git a/crawler/features_crawler.py b/crawler/features_crawler.py index 49d26712..5bb89e83 100644 --- a/crawler/features_crawler.py +++ b/crawler/features_crawler.py @@ -9,6 +9,7 @@ import fnmatch import re import time +import subprocess # Additional modules @@ -37,6 +38,7 @@ logger = logging.getLogger('crawlutils') +NVIDIA_SMI = "/usr/bin/nvidia-smi" class FeaturesCrawler: @@ -88,6 +90,7 @@ def __init__( 'config': self.crawl_config_files, 'memory': self.crawl_memory, 'cpu': self.crawl_cpu, + 'gpu': self.crawl_gpu, 'interface': self.crawl_interface, 'load': self.crawl_load, 'dockerps': self.crawl_dockerps, @@ -1298,6 +1301,42 @@ def _crawl_cpuHw(self): feature_key = '{0}-{1}'.format('cpu', cpu.cpu_vendor_id) yield (feature_key, cpu) + + def crawl_gpu(self): + ''' + nvidia-smi returns following: MEMORY, UTILIZATION, ECC, TEMPERATURE, + POWER, CLOCK, COMPUTE, PIDS, PERFORMANCE, SUPPORTED_CLOCKS, PAGE_RETIREMENT, + ACCOUNTING + + currently, following are requested based on dlaas requirements: utilization.gpu + utilization.memory, memory.total, memory.free, memory.used + nvidia-smi --query-gpu=utilization.gpu,utilization.memory,memory.total,memory.free,memory.used --format=csv,noheader,nounits + ''' + + util_attributes = ['gpu','memory'] + memory_attributes = ['total','free','used'] + + if not os.path.exists(NVIDIA_SMI): + return + + nvidia_smi_proc = subprocess.Popen([NVIDIA_SMI, '--query-gpu=utilization.gpu,utilization.memory,memory.total,memory.free,memory.used', + '--format=csv,noheader,nounits' ], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + nvidia_smi_proc_out, nvidia_smi_proc_err = nvidia_smi_proc.communicate() + + if nvidia_smi_proc.returncode != 0: + raise Exception('Unable to get gpu metrics') + + metrics = nvidia_smi_proc_out.split('\n') + for i, val_str in enumerate(metrics): + if len(val_str) != 0: + values = val_str.split(',') + entry = 
{'utilization':{'gpu': values[0], 'memory': values[1]}, + 'memory': {'total':values[2], 'free': values[3], 'used': values[4]}} + key = 'gpu{}'.format(i) + yield (key, entry) + + return + def _crawl_wrapper(self, _function, namespaces=ALL_NAMESPACES, *args): # TODO: add kwargs if self.crawl_mode == Modes.OUTCONTAINER: diff --git a/requirements.txt b/requirements.txt index 0cf60bc6..04dddf2f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ psutil==2.1.3 -requests==2.5.0 +requests==2.7.0 netifaces==0.10.4 kafka-python==0.9.2 pykafka==1.1.0 diff --git a/tests/unit/test_emitter.py b/tests/unit/test_emitter.py index 8ca5be1c..96e7b112 100644 --- a/tests/unit/test_emitter.py +++ b/tests/unit/test_emitter.py @@ -252,6 +252,48 @@ def test_emitter_graphite_simple_stdout(self): assert float(_output[1].split(' ')[1]) == 12345.0 assert float(_output[2].split(' ')[1]) == 12345.0 + def _test_emitter_graphite_nested_dict(self): + metadata = {} + metadata['namespace'] = 'namespace777' + with Emitter(urls=['stdout://'], + emitter_args=metadata, + format='graphite') as emitter: + emitter.emit("gpu0", + {'memory': { + 'total': '11519', + 'used': '55', + 'free': '11464' + }, + 'utilization': { + 'gpu': '0', + 'memory': ' 0' + } + }, + 'gpu') + + def test_emitter_graphite_nested_dict(self): + with Capturing() as _output: + self._test_emitter_graphite_nested_dict() + output = "%s" % _output + # should look like this: + # [ + # 'namespace777.gpu0.utilization.memory 0.000000 1479812320', + # 'namespace777.gpu0.utilization.gpu 0.000000 1479812320', + # 'namespace777.gpu0.memory.total 11519.000000 1479812320', + # 'namespace777.gpu0.memory.free 11464.000000 1479812320', + # 'namespace777.gpu0.memory.used 55.000000 1479812320' + # ] + + assert len(_output) == 5 + assert 'namespace777.gpu0.utilization.memory' in output + assert 'namespace777.gpu0.utilization.gpu' in output + assert 'namespace777.gpu0.memory.total' in output + assert 'namespace777.gpu0.memory.free' in output 
+ assert 'namespace777.gpu0.memory.used' in output + # all fields in graphite format + for line in _output: + assert len(line.split(' ')) == 3 + def test_emitter_unsupported_format(self): metadata = {} metadata['namespace'] = 'namespace777'