import subprocess
import common
import settings
import monitoring
import os, sys
import time
import threading
import lxml.etree as ET
import re

from cluster.ceph import Ceph
from benchmark import Benchmark

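# COSBench benchmark driver for CBT: builds a COSBench workload XML from the
# YAML config, submits it to the COSBench controller, runs it against a Ceph
# RadosGW endpoint (swift storage type with swauth), and archives the results.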
class Cosbench(Benchmark):

    def __init__(self, cluster, config):
        super(Cosbench, self).__init__(cluster, config)

        config = self.parse_conf(config)

        self.op_size = config["obj_size"]
        self.total_procs = config["workers"]
        self.containers = config["containers_max"]
        self.objects = config["objects_max"]
        self.mode = config["mode"]

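        # Encode the benchmark parameters (readahead, object size, workers,
        # containers, objects, mode) in the run and archive paths so results
        # from different configurations never collide.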
        self.run_dir = '%s/osd_ra-%08d/op_size-%s/concurrent_procs-%03d/containers-%05d/objects-%05d/%s' % (self.run_dir, int(self.osd_ra), self.op_size, int(self.total_procs), int(self.containers), int(self.objects), self.mode)
        self.out_dir = '%s/osd_ra-%08d/op_size-%s/concurrent_procs-%03d/containers-%05d/objects-%05d/%s' % (self.archive_dir, int(self.osd_ra), self.op_size, int(self.total_procs), int(self.containers), int(self.objects), self.mode)

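    # Pre-flight checks: make sure the COSBench controller is reachable and
    # that the RadosGW endpoint accepts the configured credentials.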
    def prerun_check(self):
        #1. check cosbench
        if not self.check_workload_status():
            sys.exit()
        #2. check rgw
        cosconf = {}
        for param in self.config["auth"]["config"].split(';'):
            try:
                key, value = param.split('=')
                cosconf[key] = value
            except ValueError:
                pass
        print cosconf
        if "username" in cosconf and "password" in cosconf and "url" in cosconf:
            stdout, stderr = common.pdsh(self.config["controller"], "curl -D - -H 'X-Auth-User: %s' -H 'X-Auth-Key: %s' %s" % (cosconf["username"], cosconf["password"], cosconf["url"])).communicate()
        else:
            print "[ERROR] Auth configuration in the YAML file is not in the correct format"
            sys.exit()
        if re.search('(refused|error)', stderr):
            print "[ERROR] Cosbench failed to connect to RadosGW"
            print stderr
            sys.exit()
        if re.search("AccessDenied", stdout):
            print "[ERROR] Cosbench authentication against RadosGW failed"
            print stdout
            sys.exit()

    def exists(self):
        if os.path.exists(self.out_dir):
            print 'Skipping existing test in %s.' % self.out_dir
            return True
        return False

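    # Workload templates are nested dicts mirroring the COSBench XML layout.
    # Only the "default" template is defined: an "init" stage that creates the
    # containers, followed by a "main" stage that runs the configured operation.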
    def choose_template(self, temp_name, conf):
        template = {
            "default": {
                "description": conf["mode"],
                "name": "%s_%scon_%sobj_%s_%dw" % (conf["mode"], conf["containers_max"], conf["objects_max"], conf["obj_size"], conf["workers"]),
                "storage": {"type": "swift", "config": "timeout=300000"},
                "auth": {"type": "swauth", "config": "%s" % (conf["auth"]["config"])},
                "workflow": {
                    "workstage": [{
                        "name": "init",
                        "work": {"type": "init", "workers": conf["workers"], "config": "containers=r(1,%s);cprefix=%s-%s" % (conf["containers_max"], conf["obj_size"], conf["mode"])}
                    }, {
                        "name": "main",
                        "work": {"rampup": conf["rampup"], "rampdown": conf["rampdown"], "name": conf["obj_size"], "workers": conf["workers"], "runtime": conf["runtime"],
                            "operation": {
                                "config": "containers=%s;objects=%s;cprefix=%s-%s;sizes=c(%s)%s" % (conf["containers"], conf["objects"], conf["obj_size"], conf["mode"], conf["obj_size_num"], conf["obj_size_unit"]),
                                "ratio": conf["ratio"],
                                "type": conf["mode"]
                            }
                        }
                    }]
                }
            }
        }
        if temp_name in template:
            return template[temp_name]

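    # Expand shorthand config values: "containers" and "objects" expressions
    # such as "r(1,100)" are split into method/min/max, and "obj_size" values
    # such as "128KB" are split into a numeric part and a unit.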
    def parse_conf(self, conf):
        if "containers" in conf:
            m = re.findall(r"(\w{1})\((\d+),(\d+)\)", conf["containers"])
            if m:
                conf["containers_method"] = m[0][0]
                conf["containers_min"] = m[0][1]
                conf["containers_max"] = m[0][2]
        if "objects" in conf:
            m = re.findall(r"(\w{1})\((\d+),(\d+)\)", conf["objects"])
            if m:
                conf["objects_method"] = m[0][0]
                conf["objects_min"] = m[0][1]
                conf["objects_max"] = m[0][2]
        if "obj_size" in conf:
            m = re.findall(r"(\d+)(\w+)", conf["obj_size"])
            if m:
                conf["obj_size_num"] = m[0][0]
                conf["obj_size_unit"] = m[0][1]
        return conf

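    # Standard CBT initialization: run the pre-flight checks, wait for
    # scrubbing to finish while monitoring, capture a 60s idle baseline, then
    # generate the workload XML from the selected template.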
    def initialize(self):
        super(Cosbench, self).initialize()

        print 'Running cosbench and radosgw check.'
        self.prerun_check()

        print 'Running scrub monitoring.'
        monitoring.start("%s/scrub_monitoring" % self.run_dir)
        self.cluster.check_scrub()
        monitoring.stop()

        print 'Pausing for 60s for idle monitoring.'
        monitoring.start("%s/idle_monitoring" % self.run_dir)
        time.sleep(60)
        monitoring.stop()

        common.sync_files('%s' % self.run_dir, self.out_dir)

        # Create the run directory
        common.make_remote_dir(self.run_dir)

        conf = self.config
        if not self.config.get("template"):
            self.config["template"] = "default"
        self.config["workload"] = self.choose_template("default", conf)
        self.prepare_xml(self.config["workload"])
        return True

    # prepare_xml and add_leaf_to_tree together turn the workload dict into the
    # COSBench XML job description.
    def prepare_xml(self, leaves):
        conf = self.config
        root = ET.Element("workload")
        parent = root
        self.add_leaf_to_tree(leaves, parent)
        self.config["xml_name"] = leaves["name"]
        tree = ET.ElementTree(root)
        tree.write("%s/%s.xml" % (conf["cosbench_xml_dir"], leaves["name"]), pretty_print=True)
        print "Wrote XML workload to %s/%s.xml" % (conf["cosbench_xml_dir"], leaves["name"])

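    # Recursively convert the nested dict into XML: scalar values become
    # attributes on the current element, dicts become child elements, and lists
    # become repeated child elements with the same tag.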
    def add_leaf_to_tree(self, leaves, parent):
        for leaf, leaf_content in leaves.iteritems():
            if isinstance(leaf_content, (str, int)):
                parent.set(leaf, str(leaf_content))
            elif isinstance(leaf_content, list):
                for item in leaf_content:
                    self.add_leaf_to_tree(item, ET.SubElement(parent, leaf))
            else:
                self.add_leaf_to_tree(leaf_content, ET.SubElement(parent, leaf))

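    # One benchmark pass: drop caches, dump the cluster configuration, start
    # monitoring, submit the COSBench job, wait for it to finish, then collect
    # the run directory and the COSBench archive for this run ID.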
    def run(self):
        super(Cosbench, self).run()
        self.dropcaches()
        self.cluster.dump_config(self.run_dir)
        monitoring.start(self.run_dir)

        # Run cosbench test
        try:
            self._run()
        except KeyboardInterrupt:
            print "[WARNING] Received keyboard interrupt, cancelling this run"
            conf = self.config
            stdout, stderr = common.pdsh(conf["controller"], 'sh %s/cli.sh cancel %s' % (conf["cosbench_dir"], self.runid)).communicate()
            print "[LOG]%s" % stdout

        self.check_workload_status()

        time.sleep(5)

        monitoring.stop(self.run_dir)
        self.cluster.dump_historic_ops(self.run_dir)
        common.sync_files('%s/*' % self.run_dir, self.out_dir)
        common.sync_files('%s/archive/%s*' % (self.config["cosbench_dir"], self.runid), self.out_dir)

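    # Poll the controller until the current job (if any) leaves the PROCESSING
    # state, then print the controller's job list. Returns False if the
    # COSBench daemon is not reachable on the controller node.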
    def check_workload_status(self):
        wait = hasattr(self, 'runid')
        while wait:
            stdout, stderr = common.pdsh(self.config["controller"], "sh %s/cli.sh info | grep %s | awk '{print $8}'" % (self.config["cosbench_dir"], self.runid)).communicate()
            if stderr:
                print "[ERROR] Cosbench daemon is not running on %s" % self.config["controller"]
                return False
            try:
                status = stdout.split(':')[1]
                if status.strip() != 'PROCESSING':
                    wait = False
            except IndexError:
                wait = False
            time.sleep(1)
        stdout, stderr = common.pdsh(self.config["controller"], "sh %s/cli.sh info " % (self.config["cosbench_dir"])).communicate()
        print stdout
        return True

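    # Submit the generated XML to the COSBench controller, record the run ID it
    # reports, and sleep for the expected job duration (rampup + runtime +
    # rampdown) before returning.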
    def _run(self):
        conf = self.config
        stdout, stderr = common.pdsh(conf["controller"], 'sh %s/cli.sh submit %s/%s.xml' % (conf["cosbench_dir"], conf["cosbench_xml_dir"], conf["xml_name"])).communicate()
        m = re.findall(r'Accepted with ID:\s*(\w+)', stdout)
        if not m:
            print "[ERROR] Cosbench job submission failed with error: %s" % stderr
            sys.exit()
        self.runid = m[0]
        print "[LOG] Cosbench job started, run ID %s" % self.runid
        wait_time = conf["rampup"] + conf["rampdown"] + conf["runtime"]
        print "====== cosbench job: %s started ======" % (conf["xml_name"])
        print "Waiting %d secs for the test to finish" % (wait_time)
        print "You can monitor the runtime status and results on http://localhost:19088/controller"
        time.sleep(wait_time)

    def __str__(self):
        return "%s\n%s\n%s" % (self.run_dir, self.out_dir, super(Cosbench, self).__str__())