From 09296eeb89a900790aa8f1efd1f2761a75124ec6 Mon Sep 17 00:00:00 2001 From: blagoev Date: Tue, 7 Oct 2025 10:50:09 +0300 Subject: [PATCH 1/5] add sharded support --- pytpcc/coordinator.py | 9 ++-- pytpcc/drivers/mongodbdriver.py | 74 +++++++++++++++++++++------------ pytpcc/runtime/executor.py | 10 ++--- pytpcc/tpcc.py | 23 +++++----- pytpcc/util/nurand.py | 17 ++++---- pytpcc/util/results.py | 13 ++++-- pytpcc/worker.py | 22 +++++----- shardColl.js | 53 +++++++++++++++++++++++ shardColl.sh | 43 +++++++++++++++++++ 9 files changed, 195 insertions(+), 69 deletions(-) create mode 100644 shardColl.js create mode 100644 shardColl.sh diff --git a/pytpcc/coordinator.py b/pytpcc/coordinator.py index 412b2c2..1668170 100755 --- a/pytpcc/coordinator.py +++ b/pytpcc/coordinator.py @@ -132,6 +132,8 @@ def startExecution(scaleParameters, args, config,channels): aparser.add_argument('--clientprocs', default=1, type=int, metavar='N', help='Number of processes on each client node.') + aparser.add_argument('--samewh', default=85, type=float, metavar='PP', + help='Percent paying same warehouse') aparser.add_argument('--stop-on-error', action='store_true', help='Stop the transaction execution when the driver throws an exception.') aparser.add_argument('--no-load', action='store_true', @@ -160,7 +162,7 @@ def startExecution(scaleParameters, args, config,channels): ## Load Configuration file if args['config']: logging.debug("Loading configuration file '%s'" % args['config']) - cparser = ConfigParser() + cparser = SafeConfigParser() cparser.read(os.path.realpath(args['config'].name)) config = dict(cparser.items(args['system'])) else: @@ -171,6 +173,7 @@ def startExecution(scaleParameters, args, config,channels): config['load'] = False config['execute'] = False if config['reset']: logging.info("Reseting database") + config['warehouses'] = args['warehouses'] driver.loadConfig(config) logging.info("Initializing TPC-C benchmark using %s" % driver) @@ -208,8 +211,8 @@ def startExecution(scaleParameters, args, config,channels): if not args['no_execute']: results = startExecution(scaleParameters, args, config,channels) assert results - logging.info(results.show(load_time, driver, len(channels))) - print results.show(load_time, driver, len(channels)) + logging.info(results.show(load_time, driver, len(channels), args['samewh'])) + print(results.show(load_time, driver, len(channels), args['samewh'])) ## IF ## MAIN diff --git a/pytpcc/drivers/mongodbdriver.py b/pytpcc/drivers/mongodbdriver.py index df8fe53..9691823 100644 --- a/pytpcc/drivers/mongodbdriver.py +++ b/pytpcc/drivers/mongodbdriver.py @@ -38,14 +38,9 @@ from pprint import pformat from time import sleep import pymongo -from pymongo.client_session import TransactionOptions - -# Import TransactionOptions from pymongo.client_session or -# pymongo.synchronous.client_session depending on the version of pymongo -from pymongo.client_session import TransactionOptions import constants -from .abstractdriver import AbstractDriver +from abstractdriver import AbstractDriver TABLE_COLUMNS = { constants.TABLENAME_ITEM: [ @@ -206,7 +201,8 @@ class MongodbDriver(AbstractDriver): "secondary_reads": ("If true, we will allow secondary reads", True), "retry_writes": ("If true, we will enable retryable writes", True), "causal_consistency": ("If true, we will perform causal reads ", True), - "shards": ("If >1 then sharded", "1") + "no_global_items": ("If true, we will have use only one 'unsharded' items collection", False), + "shards": ("If >0 then sharded", "0") } 
DENORMALIZED_TABLES = [ constants.TABLENAME_ORDERS, @@ -237,7 +233,9 @@ def __init__(self, ddl): self.output = open('results.json','a') self.result_doc = {} self.warehouses = 0 - self.shards = 1 + self.no_global_items = False + self.shards = 0 + self.sshost = None ## Create member mapping to collections for name in constants.ALL_TABLES: @@ -270,6 +268,7 @@ def loadConfig(self, config): self.warehouses = config['warehouses'] self.find_and_modify = config['findandmodify'] == 'True' self.causal_consistency = config['causal_consistency'] == 'True' + self.no_global_items = config['no_global_items'] == 'True' self.retry_writes = config['retry_writes'] == 'True' self.secondary_reads = config['secondary_reads'] == 'True' self.agg = config['agg'] == 'True' @@ -305,12 +304,19 @@ def loadConfig(self, config): real_uri = uri[0:pindex]+userpassword+uri[pindex:] display_uri = uri[0:pindex]+usersecret+uri[pindex:] + # for extra URL to mongos + if userpassword == "" and ':' in uri[pindex:] and '@' in uri[pindex:]: + at = uri.index('@',pindex) + userpassword = uri[(pindex):(at+1)] self.client = pymongo.MongoClient(real_uri, retryWrites=self.retry_writes, readPreference=self.read_preference, readConcernLevel=self.read_concern) self.result_doc['before']=self.get_server_status() + ssURI="mongodb://"+userpassword+self.result_doc['before']['host']+"/test?ssl=true&authSource=admin" + logging.debug("%s %s %s", userpassword, self.result_doc['before']['host'], ssURI) + self.sshost = pymongo.MongoClient(ssURI) # set default writeConcern on the database self.database = self.client.get_database(name=str(config['name']), write_concern=self.write_concern) @@ -402,10 +408,11 @@ def loadTuples(self, tableName, tuples): else: if tableName == constants.TABLENAME_ITEM: tuples3 = [] - if self.shards > 1: - ww = range(1,self.warehouses+1) + if self.shards > 0: + ww = range(1,self.warehouses+1, int(self.warehouses/self.shards)) else: ww = [0] + # print self.shards, self.warehouses, ww for t in tuples: for w in ww: t2 = list(t) @@ -415,18 +422,23 @@ def loadTuples(self, tableName, tuples): for t in tuples: tuple_dicts.append(dict([(columns[i], t[i]) for i in num_columns])) ## FOR - self.database[tableName].insert_many(tuple_dicts) + + self.database[tableName].insert_many(tuple_dicts, ordered=False) ## IF return def loadFinishDistrict(self, w_id, d_id): + logging.debug("LoadFinishDistrict") if self.denormalize: logging.debug("Pushing %d denormalized ORDERS records for WAREHOUSE %d DISTRICT %d into MongoDB", len(self.w_orders), w_id, d_id) - self.database[constants.TABLENAME_ORDERS].insert_many(self.w_orders.values()) + self.database[constants.TABLENAME_ORDERS].insert_many(self.w_orders.values(), ordered=False) self.w_orders.clear() ## IF + def loadFinish(self): + logging.debug("load finish: ") + def executeStart(self): """Optional callback before the execution for each client starts""" return None @@ -614,8 +626,10 @@ def _doNewOrderTxn(self, s, params): d_next_o_id = d["D_NEXT_O_ID"] # fetch matching items and see if they are all valid - if self.shards > 1: i_w_id = w_id + if self.shards > 0: i_w_id = w_id-(w_id-1)%(self.warehouses/self.shards) # get_i_w(w_id) else: i_w_id = 0 + if self.no_global_items: + i_w_id = 1 items = list(self.item.find({"I_ID": {"$in": i_ids}, "I_W_ID": i_w_id, "$comment": comment}, {"_id":0, "I_ID": 1, "I_PRICE": 1, "I_NAME": 1, "I_DATA": 1}, session=s)) @@ -628,8 +642,7 @@ def _doNewOrderTxn(self, s, params): #print constants.INVALID_ITEM_MESSAGE + ", Aborting transaction (ok for 1%)" return None ## IF 
- xxi_ids = tuple(map(lambda o: o['I_ID'], items)) - items = sorted(items, key=lambda x: xxi_ids.index(x['I_ID'])) + items = sorted(items, key=lambda x: i_ids.index(x['I_ID'])) # getWarehouseTaxRate w = self.warehouse.find_one({"W_ID": w_id, "$comment": comment}, {"_id":0, "W_TAX": 1}, session=s) @@ -684,8 +697,7 @@ def _doNewOrderTxn(self, s, params): session=s)) ## IF assert len(all_stocks) == ol_cnt, "all_stocks len %d != ol_cnt %d" % (len(all_stocks), ol_cnt) - xxxi_ids = tuple(map(lambda o: (o['S_I_ID'], o['S_W_ID']), all_stocks)) - all_stocks = sorted(all_stocks, key=lambda x: xxxi_ids.index((x['S_I_ID'], x["S_W_ID"]))) + all_stocks = sorted(all_stocks, key=lambda x: item_w_list.index((x['S_I_ID'], x["S_W_ID"]))) ## ---------------- ## Insert Order Line, Stock Item Information @@ -784,7 +796,7 @@ def _doNewOrderTxn(self, s, params): if self.batch_writes: if not self.denormalize: - self.order_line.insert_many(order_line_writes, session=s) + self.order_line.insert_many(order_line_writes, ordered=False, session=s) self.stock.bulk_write(stock_writes, session=s) ## IF @@ -936,7 +948,7 @@ def _doPaymentTxn(self, s, params): session=s) ## IF - search_fields = {"C_W_ID": w_id, "C_D_ID": d_id, "$comment": comment} + search_fields = {"C_W_ID": c_w_id, "C_D_ID": c_d_id, "$comment": comment} return_fields = {"C_BALANCE": 0, "C_YTD_PAYMENT": 0, "C_PAYMENT_CNT": 0} if c_id != None: @@ -1084,9 +1096,9 @@ def _doStockLevelTxn(self, s, params): ol_ids.add(ol["OL_I_ID"]) ## FOR - result = self.stock.count_documents({"S_W_ID": w_id, + result = self.stock.find({"S_W_ID": w_id, "S_I_ID": {"$in": list(ol_ids)}, - "S_QUANTITY": {"$lt": threshold}, "$comment": comment}) + "S_QUANTITY": {"$lt": threshold}, "$comment": comment}).count() return int(result) @@ -1115,7 +1127,7 @@ def run_transaction(self, txn_callback, session, name, params): # Should we retry txns within the same session or start a new one? 
def run_transaction_with_retries(self, txn_callback, name, params): txn_retry_counter = 0 - to = TransactionOptions( + to = pymongo.client_session.TransactionOptions( read_concern=None, #read_concern=pymongo.read_concern.ReadConcern("snapshot"), write_concern=self.write_concern, @@ -1137,8 +1149,12 @@ def run_transaction_with_retries(self, txn_callback, name, params): sleep(txn_retry_counter * .1) logging.debug("txn retry number for %s: %d", name, txn_retry_counter) ## WHILE - def get_server_status(self): - ss=self.client.admin.command('serverStatus') + + def get_server_status(self, otherClient=None): + if otherClient and self.sshost: + ss=self.sshost.admin.command('serverStatus') + else: + ss=self.client.admin.command('serverStatus') if "$configServerState" in ss: del ss["$configServerState"] if "$gleStats" in ss: @@ -1157,8 +1173,12 @@ def get_server_status(self): def save_result(self, result_doc): self.result_doc.update(result_doc) - self.result_doc['after']=self.get_server_status() - # saving test results and server statuses ('before' and 'after') into MongoDB as a single document - self.client.test.results.insert_one(self.result_doc) + self.result_doc['after']=self.get_server_status(self.sshost) + # save cache size, instance type, version + self.result_doc['version']=self.result_doc['after']['version'][0:3] +# {$trunc:{$divide:["$before.wiredTiger.cache.maximum bytes configured",1024*1024*1024]}},72]}}, {$set:{cacheGB:NumberLong(72) + #self.result_doc['cacheGB']=int(self.result_doc['after']['wiredTiger']['cache']['maximum bytes configured']/1073741824) + #self.result_doc['instance']={18:"M50",36:"M60",72:"M80"}.get(self.result_doc['cacheGB'], 'unknown') + self.client.test.results.save(self.result_doc) ## CLASS diff --git a/pytpcc/runtime/executor.py b/pytpcc/runtime/executor.py index c065e93..be5a5a9 100644 --- a/pytpcc/runtime/executor.py +++ b/pytpcc/runtime/executor.py @@ -44,10 +44,11 @@ class Executor: - def __init__(self, driver, scaleParameters, stop_on_error = False): + def __init__(self, driver, scaleParameters, stop_on_error = False, sameWH = 85): self.driver = driver self.scaleParameters = scaleParameters self.stop_on_error = stop_on_error + self.same_wh = sameWH ## DEF def execute(self, duration): @@ -76,8 +77,7 @@ def execute(self, duration): batch_result.abortTransaction(batch_txn_id) if self.stop_on_error: raise continue - - # This will happen on all failing 1% of the transactions + if val is None: global_result.abortTransaction(global_txn_id, retries) batch_result.abortTransaction(batch_txn_id, retries) @@ -86,7 +86,7 @@ def execute(self, duration): batch_result.stopTransaction(batch_txn_id, retries) global_result.stopTransaction(global_txn_id, retries) - if time.time() - start_batch > 900: # every 15 minutes + if time.time() - start_batch > 1800: # every 30 minutes batch_result.stopBenchmark() logging.info(batch_result.show()) batch_result = results.Results() @@ -221,7 +221,7 @@ def generatePaymentParams(self): h_date = datetime.now() ## 85%: paying through own warehouse (or there is only 1 warehouse) - if self.scaleParameters.warehouses == 1 or x <= 85: + if self.scaleParameters.warehouses == 1 or x <= self.same_wh: c_w_id = w_id c_d_id = d_id ## 15%: paying through another warehouse: diff --git a/pytpcc/tpcc.py b/pytpcc/tpcc.py index 98a8885..3a243e2 100755 --- a/pytpcc/tpcc.py +++ b/pytpcc/tpcc.py @@ -36,7 +36,8 @@ import time import multiprocessing import subprocess -from configparser import ConfigParser +import random +from ConfigParser import SafeConfigParser 
from pprint import pprint, pformat
 from util import results, scaleparameters
@@ -99,7 +100,10 @@ def getDrivers():
 ## DEF
 ## ==============================================
-## startLoading
+## startLoading.
+# This intentionally uses a multiprocessing pool and intentionally starts new processes for each batch,
+# because for long-running, many-hour loads the connection between the child process and the parent process is lost
+# and the parent blocks indefinitely waiting for the result.
 ## ==============================================
 def startLoading(driverClass, scaleParameters, args, config):
     """
@@ -199,10 +203,7 @@ def startExecution(driverClass, scaleParameters, args, config):
     logging.debug("Creating client pool with %d processes", args['clients'])
     pool = multiprocessing.Pool(args['clients'])
     debug = logging.getLogger().isEnabledFor(logging.DEBUG)
-    try:
-        del args['config']
-    except KeyError:
-        print()
+
     worker_results = []
     for _ in range(args['clients']):
         r = pool.apply_async(executorFunc, (driverClass, scaleParameters, args, config, debug,))
@@ -236,7 +237,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug):
         config['reset'] = False
         driver.loadConfig(config)
-        e = executor.Executor(driver, scaleParameters, stop_on_error=args['stop_on_error'])
+        e = executor.Executor(driver, scaleParameters, stop_on_error=args['stop_on_error'], sameWH=args['samewh'])
         driver.executeStart()
         results = e.execute(args['duration'])
         driver.executeFinish()
@@ -257,6 +258,8 @@
                         help='Instruct the driver to reset the contents of the database')
     aparser.add_argument('--scalefactor', default=1, type=float, metavar='SF',
                         help='Benchmark scale factor')
+    aparser.add_argument('--samewh', default=85, type=float, metavar='PP',
+                        help='Percent paying same warehouse')
     aparser.add_argument('--warehouses', default=4, type=int, metavar='W',
                         help='Number of Warehouses')
     aparser.add_argument('--duration', default=60, type=int, metavar='D',
@@ -295,7 +298,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug):
     ## Load Configuration file
     if args['config']:
         logging.debug("Loading configuration file '%s'", args['config'])
-        cparser = ConfigParser()
+        cparser = SafeConfigParser()
         cparser.read(os.path.realpath(args['config'].name))
         config = dict(cparser.items(args['system']))
     else:
@@ -342,7 +345,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug):
     if not args['no_execute']:
         noftifyDsiOfPhaseStart("TPC-C_workload")
         if args['clients'] == 1:
-            e = executor.Executor(driver, scaleParameters, stop_on_error=args['stop_on_error'])
+            e = executor.Executor(driver, scaleParameters, stop_on_error=args['stop_on_error'], sameWH=args['samewh'])
             driver.executeStart()
             results = e.execute(args['duration'])
             driver.executeFinish()
@@ -351,7 +354,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug):
     assert results, "No results from execution for %d client!" % args['clients']
     logging.info("Final Results")
     logging.info("Threads: %d", args['clients'])
-    logging.info(results.show(load_time, driver, args['clients']))
+    logging.info(results.show(load_time, driver, args['clients'], args['samewh']))
     noftifyDsiOfPhaseEnd("TPC-C_workload")
 ## IF
diff --git a/pytpcc/util/nurand.py b/pytpcc/util/nurand.py
index 09fa55c..361cec7 100644
--- a/pytpcc/util/nurand.py
+++ b/pytpcc/util/nurand.py
@@ -29,14 +29,13 @@
 # OTHER DEALINGS IN THE SOFTWARE.
# -----------------------------------------------------------------------
-import random
+import rand
 
 def makeForLoad():
     """Create random NURand constants, appropriate for loading the database."""
-    cLast = random.randint(0, 255)
-    cId = random.randint(0, 1023)
-    orderLineItemId = random.randint(0, 8191)
-    return NURandC(cLast, cId, orderLineItemId)
+    cLast = rand.number(0, 255)
+    cId = rand.number(0, 1023)
+    orderLineItemId = rand.number(0, 8191)
+    return NURandC(cLast, cId, orderLineItemId)
 
 def validCRun(cRun, cLoad):
@@ -46,13 +45,13 @@ def makeForRun(loadC):
     """Create random NURand constants for running TPC-C.
        TPC-C 2.1.6.1. (page 20) specifies the valid range for these constants."""
-    cRun = random.randint(0, 255)
+    cRun = rand.number(0, 255)
     while validCRun(cRun, loadC.cLast) == False:
-        cRun = random.randint(0, 255)
+        cRun = rand.number(0, 255)
     assert validCRun(cRun, loadC.cLast)
 
-    cId = random.randint(0, 1023)
-    orderLineItemId = random.randint(0, 8191)
+    cId = rand.number(0, 1023)
+    orderLineItemId = rand.number(0, 8191)
     return NURandC(cRun, cId, orderLineItemId)
 
 class NURandC:
diff --git a/pytpcc/util/results.py b/pytpcc/util/results.py
index ff24f9f..248491f 100644
--- a/pytpcc/util/results.py
+++ b/pytpcc/util/results.py
@@ -26,6 +26,7 @@
 import logging
 import time
+import os
 from collections import Counter
 
 class Results:
@@ -142,7 +143,7 @@ def append(self, r):
     def __str__(self):
         return self.show()
 
-    def show(self, load_time=None, driver=None, threads=1):
+    def show(self, load_time=None, driver=None, threads=1, samewh=85):
         if not self.start:
             return "Benchmark not started"
         if not self.stop:
@@ -223,15 +224,19 @@ def show(self, load_time=None, driver=None, threads=1):
             result_doc['batch_writes'] = driver.batch_writes
             result_doc['find_and_modify'] = driver.find_and_modify
             result_doc['read_preference'] = driver.read_preference
-            result_doc['write_concern'] = driver.write_concern.document['w']
+            result_doc['write_concern'] = str(driver.write_concern.document['w'])
             result_doc['causal'] = driver.causal_consistency
+            result_doc['no_global_items'] = driver.no_global_items
             result_doc['all_in_one_txn'] = driver.all_in_one_txn
             result_doc['retry_writes'] = driver.retry_writes
             result_doc['read_concern'] = driver.read_concern
+            result_doc['shards'] = driver.shards
             result_doc['total_retries'] = total_retries
+            result_doc['samewh'] = samewh
             result_doc['total'] = total_cnt
             result_doc['aborts'] = total_aborts
-            ret += "\n%s TpmC for %s %s thr %s txn %d WH: %d %d total %d durSec, batch %s %d retries %s%% %s fnM %s p50 %s p75 %s p90 %s p95 %s p99 %s max %s WC %s causal %s 10in1 %s retry %s %d %d" % (
+            result_doc['instance'] = os.getenv('INSTANCE')
+            ret += "\n%s TpmC for %s %s thr %s txn %d WH: %d %d total %d durSec, batch %s %d retries %s%% %s fnM %s p50 %s p75 %s p90 %s p95 %s p99 %s max %s WC %s causal %s 10in1 %s retry %s %d %d correct %d noGlobalItems %s" % (
                 time.strftime("%Y-%m-%d %H:%M:%S"),
                 ("normal", "denorm")[driver.denormalize],
                 threads,
@@ -246,7 +251,7 @@ def show(self, load_time=None, driver=None, threads=1):
                 u"%6.2f" % (1000*lat[int(samples/100.0*99)]),
                 u"%6.2f" % (1000.0*lat[-1]), str(driver.write_concern),
                 ('false', 'true')[driver.causal_consistency],
-                ('false', 'true')[driver.all_in_one_txn], ('false', 'true')[driver.retry_writes],total_cnt,total_aborts)
+                ('false', 'true')[driver.all_in_one_txn], ('false', 'true')[driver.retry_writes],total_cnt,total_aborts, samewh, ('false', 'true')[driver.no_global_items])
 
             driver.save_result(result_doc)
print(result_doc) # PostgreSQL driver returns a shorter version of the summary without extra configuration data diff --git a/pytpcc/worker.py b/pytpcc/worker.py index d689307..6a3e55e 100755 --- a/pytpcc/worker.py +++ b/pytpcc/worker.py @@ -75,7 +75,7 @@ def loaderFunc(driverClass, scaleParameters, args, config, w_ids, debug): driver.loadFinish() except KeyboardInterrupt: return -1 - except (Exception, AssertionError), ex: + except (Exception, AssertionError) as ex: logging.warn("Failed to load data: %s" % (ex)) #if debug: traceback.print_exc(file=sys.stdout) @@ -96,7 +96,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): config['reset'] = False driver.loadConfig(config) - e = executor.Executor(driver, scaleParameters, stop_on_error=args['stop_on_error']) + e = executor.Executor(driver, scaleParameters, stop_on_error=args['stop_on_error'], sameWH=args['samewh']) driver.executeStart() results = e.execute(args['duration']) driver.executeFinish() @@ -116,14 +116,14 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): w_ids=command.data[3] ## Create a handle to the target client driver at the client side - driverClass = createDriverClass(args['system']) - assert driverClass != None, "Failed to find '%s' class" % args['system'] - driver = driverClass(args['ddl']) - assert driver != None, "Failed to create '%s' driver" % args['system'] + driverClass = createDriverClass(args['system']) + assert driverClass != None, "Failed to find '%s' class" % args['system'] + driver = driverClass(args['ddl']) + assert driver != None, "Failed to create '%s' driver" % args['system'] - loaderFunc(driverClass,scaleParameters,args,config,w_ids,True) + loaderFunc(driverClass,scaleParameters,args,config,w_ids,True) m=message.Message(header=message.LOAD_COMPLETED) - channel.send(pickle.dumps(m,-1)) + channel.send(pickle.dumps(m,-1)) elif command.header==message.CMD_EXECUTE: scaleParameters=command.data[0] args=command.data[1] @@ -136,9 +136,9 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): driver = driverClass(args['ddl']) assert driver != None, "Failed to create '%s' driver" % args['system'] - results=executorFunc(driverClass,scaleParameters,args,config,True) - m=message.Message(header=message.EXECUTE_COMPLETED,data=results) - channel.send(pickle.dumps(m,-1)) + results=executorFunc(driverClass,scaleParameters,args,config,True) + m=message.Message(header=message.EXECUTE_COMPLETED,data=results) + channel.send(pickle.dumps(m,-1)) elif command.header==message.CMD_STOP: pass diff --git a/shardColl.js b/shardColl.js new file mode 100644 index 0000000..1206f03 --- /dev/null +++ b/shardColl.js @@ -0,0 +1,53 @@ +// Run this before loading the data +sh.setBalancerState(false); +db.getSiblingDB("_DBNAME_").dropDatabase(); +sleep(10000); +sh.enableSharding("_DBNAME_"); +sh.shardCollection("_DBNAME_.ITEM",{"I_W_ID":1, "I_ID":1},true); +db.getSiblingDB("_DBNAME_").WAREHOUSE.createIndex({"W_ID":1, "W_TAX":1}, {unique:true}); +sh.shardCollection("_DBNAME_.WAREHOUSE",{"W_ID":1}); +db.getSiblingDB("_DBNAME_").DISTRICT.createIndex({"D_W_ID":1, "D_ID":1,"D_NEXT_O_ID" : 1,"D_TAX":1}, {unique:true}); +sh.shardCollection("_DBNAME_.DISTRICT",{"D_W_ID":1,"D_ID":1}); +sh.shardCollection("_DBNAME_.CUSTOMER", {"C_W_ID":1, "C_D_ID":1, "C_ID":1}, true); +sh.shardCollection("_DBNAME_.HISTORY", {"H_W_ID":1}); +sh.shardCollection("_DBNAME_.STOCK", {"S_W_ID":1, "S_I_ID":1},true); +db.getSiblingDB("_DBNAME_").NEW_ORDER.createIndex({"NO_W_ID":1, "NO_D_ID":1, "NO_O_ID":1}, {unique:true}); 
+sh.shardCollection("_DBNAME_.NEW_ORDER", {"NO_W_ID":1, "NO_D_ID":1}); +db.getSiblingDB("_DBNAME_").ORDERS.createIndex({"O_W_ID":1, "O_D_ID":1, "O_ID":1, "O_C_ID":1}, {unique: true} ); +sh.shardCollection("_DBNAME_.ORDERS", {"O_W_ID":1, "O_D_ID":1, "O_ID":1}); +// this is for 6 WH on 3 shards +// make sure that number of WH is a multiple of number of shards +// nWH/nshards=whole number (2 here) +var numShards = _SHARDS_; +if (! numShards >= 0) numShards = db.getSiblingDB("config").shards.count(); +var shards=db.getSiblingDB("config").shards.distinct("_id"); +var numWH = _NUMWAREHOUSES_; /* must be multiple of 3 */ +var whPerShard= numWH/numShards; +print(whPerShard, numShards, numWH); +// do splits +for (i=1+whPerShard; i shardTemp.js +$MONGO $MONGOURI shardTemp.js +echo "Ran shardColl script with $1 $2 $3 - ready to load" From ff8184cb2d63b3852b93d57a4292fc3b3f2ddb9a Mon Sep 17 00:00:00 2001 From: blagoev Date: Fri, 10 Oct 2025 14:49:49 +0300 Subject: [PATCH 2/5] fix bugs, python3 syntax and load code --- README.md | 26 +++++++++ pytpcc/coordinator.py | 19 +++---- pytpcc/drivers/mongodbdriver.py | 41 +++++---------- pytpcc/tpcc.py | 35 ++++++------- pytpcc/util/nurand.py | 2 +- shardColl.js | 93 ++++++++++++++++++++------------- shardColl.sh | 19 ++++--- 7 files changed, 135 insertions(+), 100 deletions(-) diff --git a/README.md b/README.md index 388c0c8..0e6c72f 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,32 @@ The structure of the repo is: All the tests were run using [MongoDB Atlas](https://www.mongodb.com/cloud/atlas?jmp=VLDB2019). Use code `VLDB2019` to get $150 credit to get started with MongoDB Atlas. +## Sharded MongoDB Driver + +1. Define $MONGOURI env variable and point it to your MongoDB server + export MONGOURI="mongodb://username:password@10.2.1.119:27017/admin?ssl=true&tlsAllowInvalidHostnames=true&tlsAllowInvalidCertificates=true" + +2. Define $MONGOBIN env variable and point it to your MongoDB server + export MONGOBIN=/ddata/workdir/bin + +3. Run shardColl.sh + ./shardColl.sh 180 3 + +4. Run pytpcc using --warehouses=XXX + NEVER USE --reset because that will dete the shard configuration by dropping the database. If you need to reset use step 3 + + Only load the data + python ./tpcc.py --no-execute --clients=100 --duration=10 --warehouses=180 --config=mongodb.config mongodb --stop-on-error + + Execute the tests without loading data. 
+ python ./tpcc.py --no-load --clients=100 --duration=10 --warehouses=180 --config=mongodb.config mongodb --stop-on-error + + Execute the tests with loading + python ./tpcc.py --clients=100 --duration=10 --warehouses=180 --config=mongodb.config mongodb --stop-on-error + + + + ## Postgres JSONB Driver diff --git a/pytpcc/coordinator.py b/pytpcc/coordinator.py index 1668170..3425f9c 100755 --- a/pytpcc/coordinator.py +++ b/pytpcc/coordinator.py @@ -38,7 +38,7 @@ import execnet import worker import message -from ConfigParser import SafeConfigParser +from configparser import ConfigParser from pprint import pprint, pformat from util import * @@ -80,7 +80,7 @@ def startLoading(scalParameters,args,config,channels): for w_id in range(scaleParameters.starting_warehouse, scaleParameters.ending_warehouse+1): idx = w_id % procs w_ids[idx].append(w_id) - print w_ids + print(w_ids) load_start=time.time() for i in range(len(channels)): @@ -116,7 +116,7 @@ def startExecution(scaleParameters, args, config,channels): aparser = argparse.ArgumentParser(description='Python implementation of the TPC-C Benchmark') aparser.add_argument('system', choices=getDrivers(), help='Target system driver') - aparser.add_argument('--config', type=file, + aparser.add_argument('--config', type=str, help='Path to driver configuration file') aparser.add_argument('--reset', action='store_true', help='Instruct the driver to reset the contents of the database') @@ -155,15 +155,16 @@ def startExecution(scaleParameters, args, config,channels): assert driver != None, "Failed to create '%s' driver" % args['system'] if args['print_config']: config = driver.makeDefaultConfig() - print driver.formatConfig(config) - print + print(driver.formatConfig(config)) + print() sys.exit(0) ## Load Configuration file - if args['config']: - logging.debug("Loading configuration file '%s'" % args['config']) - cparser = SafeConfigParser() - cparser.read(os.path.realpath(args['config'].name)) + configFilePath = args['config'] + if configFilePath: + logging.debug("Loading configuration file '%s'" % configFilePath) + cparser = ConfigParser() + cparser.read(os.path.realpath(configFilePath)) config = dict(cparser.items(args['system'])) else: logging.debug("Using default configuration for %s" % args['system']) diff --git a/pytpcc/drivers/mongodbdriver.py b/pytpcc/drivers/mongodbdriver.py index 9691823..8b63770 100644 --- a/pytpcc/drivers/mongodbdriver.py +++ b/pytpcc/drivers/mongodbdriver.py @@ -38,9 +38,10 @@ from pprint import pformat from time import sleep import pymongo +from pymongo.client_session import TransactionOptions import constants -from abstractdriver import AbstractDriver +from .abstractdriver import AbstractDriver TABLE_COLUMNS = { constants.TABLENAME_ITEM: [ @@ -235,7 +236,6 @@ def __init__(self, ddl): self.warehouses = 0 self.no_global_items = False self.shards = 0 - self.sshost = None ## Create member mapping to collections for name in constants.ALL_TABLES: @@ -304,19 +304,12 @@ def loadConfig(self, config): real_uri = uri[0:pindex]+userpassword+uri[pindex:] display_uri = uri[0:pindex]+usersecret+uri[pindex:] - # for extra URL to mongos - if userpassword == "" and ':' in uri[pindex:] and '@' in uri[pindex:]: - at = uri.index('@',pindex) - userpassword = uri[(pindex):(at+1)] self.client = pymongo.MongoClient(real_uri, retryWrites=self.retry_writes, readPreference=self.read_preference, readConcernLevel=self.read_concern) self.result_doc['before']=self.get_server_status() - 
ssURI="mongodb://"+userpassword+self.result_doc['before']['host']+"/test?ssl=true&authSource=admin" - logging.debug("%s %s %s", userpassword, self.result_doc['before']['host'], ssURI) - self.sshost = pymongo.MongoClient(ssURI) # set default writeConcern on the database self.database = self.client.get_database(name=str(config['name']), write_concern=self.write_concern) @@ -412,7 +405,7 @@ def loadTuples(self, tableName, tuples): ww = range(1,self.warehouses+1, int(self.warehouses/self.shards)) else: ww = [0] - # print self.shards, self.warehouses, ww + for t in tuples: for w in ww: t2 = list(t) @@ -429,7 +422,6 @@ def loadTuples(self, tableName, tuples): return def loadFinishDistrict(self, w_id, d_id): - logging.debug("LoadFinishDistrict") if self.denormalize: logging.debug("Pushing %d denormalized ORDERS records for WAREHOUSE %d DISTRICT %d into MongoDB", len(self.w_orders), w_id, d_id) self.database[constants.TABLENAME_ORDERS].insert_many(self.w_orders.values(), ordered=False) @@ -437,7 +429,7 @@ def loadFinishDistrict(self, w_id, d_id): ## IF def loadFinish(self): - logging.debug("load finish: ") + logging.debug("Load finished") def executeStart(self): """Optional callback before the execution for each client starts""" @@ -681,7 +673,7 @@ def _doNewOrderTxn(self, s, params): ## If all of the items are at the same warehouse, then we'll issue a single ## request to get their information, otherwise we'll still issue a single request ## ---------------- - item_w_list = zip(i_ids, i_w_ids) + item_w_list = list(zip(i_ids, i_w_ids)) stock_project = {"_id":0, "S_I_ID": 1, "S_W_ID": 1, "S_QUANTITY": 1, "S_DATA": 1, "S_YTD": 1, "S_ORDER_CNT": 1, "S_REMOTE_CNT": 1, s_dist_col: 1} @@ -1096,9 +1088,9 @@ def _doStockLevelTxn(self, s, params): ol_ids.add(ol["OL_I_ID"]) ## FOR - result = self.stock.find({"S_W_ID": w_id, + result = self.stock.count_documents({"S_W_ID": w_id, "S_I_ID": {"$in": list(ol_ids)}, - "S_QUANTITY": {"$lt": threshold}, "$comment": comment}).count() + "S_QUANTITY": {"$lt": threshold}, "$comment": comment}) return int(result) @@ -1127,7 +1119,7 @@ def run_transaction(self, txn_callback, session, name, params): # Should we retry txns within the same session or start a new one? 
def run_transaction_with_retries(self, txn_callback, name, params): txn_retry_counter = 0 - to = pymongo.client_session.TransactionOptions( + to = TransactionOptions( read_concern=None, #read_concern=pymongo.read_concern.ReadConcern("snapshot"), write_concern=self.write_concern, @@ -1150,11 +1142,8 @@ def run_transaction_with_retries(self, txn_callback, name, params): logging.debug("txn retry number for %s: %d", name, txn_retry_counter) ## WHILE - def get_server_status(self, otherClient=None): - if otherClient and self.sshost: - ss=self.sshost.admin.command('serverStatus') - else: - ss=self.client.admin.command('serverStatus') + def get_server_status(self): + ss=self.client.admin.command('serverStatus') if "$configServerState" in ss: del ss["$configServerState"] if "$gleStats" in ss: @@ -1173,12 +1162,8 @@ def get_server_status(self, otherClient=None): def save_result(self, result_doc): self.result_doc.update(result_doc) - self.result_doc['after']=self.get_server_status(self.sshost) - # save cache size, instance type, version - self.result_doc['version']=self.result_doc['after']['version'][0:3] -# {$trunc:{$divide:["$before.wiredTiger.cache.maximum bytes configured",1024*1024*1024]}},72]}}, {$set:{cacheGB:NumberLong(72) - #self.result_doc['cacheGB']=int(self.result_doc['after']['wiredTiger']['cache']['maximum bytes configured']/1073741824) - #self.result_doc['instance']={18:"M50",36:"M60",72:"M80"}.get(self.result_doc['cacheGB'], 'unknown') - self.client.test.results.save(self.result_doc) + self.result_doc['after']=self.get_server_status() + # saving test results and server statuses ('before' and 'after') into MongoDB as a single document + self.client.test.results.insert_one(self.result_doc) ## CLASS diff --git a/pytpcc/tpcc.py b/pytpcc/tpcc.py index 3a243e2..3405037 100755 --- a/pytpcc/tpcc.py +++ b/pytpcc/tpcc.py @@ -37,7 +37,7 @@ import multiprocessing import subprocess import random -from ConfigParser import SafeConfigParser +from configparser import ConfigParser from pprint import pprint, pformat from util import results, scaleparameters @@ -56,12 +56,12 @@ logging.getLogger('').addHandler(console) NOTIFY_PHASE_START_PATH = '/data/workdir/src/flamegraph/notify_phase_start.py' -NOTIFY_PHASE_END_PATH = '/data/workdir/src/flamegraph/notify_phase_start.py' +NOTIFY_PHASE_END_PATH = '/data/workdir/src/flamegraph/notify_phase_end.py' ## ============================================== -## noftifyDsiOfPhaseStart +## notifyDSIOfPhaseStart ## ============================================== -def noftifyDsiOfPhaseStart(phasename): +def notifyDSIOfPhaseStart(phasename): if os.path.isfile(NOTIFY_PHASE_START_PATH): output = subprocess.run(["python3", NOTIFY_PHASE_START_PATH, phasename], capture_output=True) if output.returncode != 0: @@ -69,9 +69,9 @@ def noftifyDsiOfPhaseStart(phasename): ## DEF ## ============================================== -## noftifyDsiOfPhaseStart +## notifyDSIOfPhaseEnd ## ============================================== -def noftifyDsiOfPhaseEnd(phasename): +def notifyDSIOfPhaseEnd(phasename): if os.path.isfile(NOTIFY_PHASE_END_PATH): output = subprocess.run(["python3", NOTIFY_PHASE_END_PATH, phasename], capture_output=True) if output.returncode != 0: @@ -119,10 +119,6 @@ def startLoading(driverClass, scaleParameters, args, config): logging.debug(f"Total warehouses: {total_warehouses}") loader_results = [] - try: - del args['config'] - except KeyError: - logging.warning("Key 'config' not found in args") # Iterate through warehouses, processing them in batches of 'clients' for 
i in range(total_warehouses): @@ -252,7 +248,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): aparser = argparse.ArgumentParser(description='Python implementation of the TPC-C Benchmark') aparser.add_argument('system', choices=getDrivers(), help='Target system driver') - aparser.add_argument('--config', type=open, + aparser.add_argument('--config', type=str, help='Path to driver configuration file') aparser.add_argument('--reset', action='store_true', help='Instruct the driver to reset the contents of the database') @@ -296,10 +292,11 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): sys.exit(0) ## Load Configuration file - if args['config']: - logging.debug("Loading configuration file '%s'", args['config']) - cparser = SafeConfigParser() - cparser.read(os.path.realpath(args['config'].name)) + configFilePath = args['config'] + if configFilePath: + logging.debug("Loading configuration file '%s'", configFilePath) + cparser = ConfigParser() + cparser.read(os.path.realpath(configFilePath)) config = dict(cparser.items(args['system'])) else: logging.debug("Using default configuration for %s", args['system']) @@ -323,7 +320,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): load_time = None if not args['no_load']: logging.info("Loading TPC-C benchmark data using %s", (driver)) - noftifyDsiOfPhaseStart("TPC-C_load") + notifyDSIOfPhaseStart("TPC-C_load") load_start = time.time() if args['clients'] == 1: l = loader.Loader( @@ -338,12 +335,12 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): else: startLoading(driverClass, scaleParameters, args, config) load_time = time.time() - load_start - noftifyDsiOfPhaseEnd("TPC-C_load") + notifyDSIOfPhaseEnd("TPC-C_load") ## IF ## WORKLOAD DRIVER!!! if not args['no_execute']: - noftifyDsiOfPhaseStart("TPC-C_workload") + notifyDSIOfPhaseStart("TPC-C_workload") if args['clients'] == 1: e = executor.Executor(driver, scaleParameters, stop_on_error=args['stop_on_error'], sameWH=args['samewh']) driver.executeStart() @@ -355,7 +352,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): logging.info("Final Results") logging.info("Threads: %d", args['clients']) logging.info(results.show(load_time, driver, args['clients'], args['samewh'])) - noftifyDsiOfPhaseEnd("TPC-C_workload") + notifyDSIOfPhaseEnd("TPC-C_workload") ## IF ## MAIN diff --git a/pytpcc/util/nurand.py b/pytpcc/util/nurand.py index 361cec7..9dc429d 100644 --- a/pytpcc/util/nurand.py +++ b/pytpcc/util/nurand.py @@ -29,7 +29,7 @@ # OTHER DEALINGS IN THE SOFTWARE. 
# -----------------------------------------------------------------------
-import rand
+from . import rand
 
 def makeForLoad():
     """Create random NURand constants, appropriate for loading the database."""
diff --git a/shardColl.js b/shardColl.js
index 1206f03..6e1f31d 100644
--- a/shardColl.js
+++ b/shardColl.js
@@ -3,51 +3,70 @@ sh.setBalancerState(false);
 db.getSiblingDB("_DBNAME_").dropDatabase();
 sleep(10000);
 sh.enableSharding("_DBNAME_");
-sh.shardCollection("_DBNAME_.ITEM",{"I_W_ID":1, "I_ID":1},true);
-db.getSiblingDB("_DBNAME_").WAREHOUSE.createIndex({"W_ID":1, "W_TAX":1}, {unique:true});
-sh.shardCollection("_DBNAME_.WAREHOUSE",{"W_ID":1});
-db.getSiblingDB("_DBNAME_").DISTRICT.createIndex({"D_W_ID":1, "D_ID":1,"D_NEXT_O_ID" : 1,"D_TAX":1}, {unique:true});
-sh.shardCollection("_DBNAME_.DISTRICT",{"D_W_ID":1,"D_ID":1});
-sh.shardCollection("_DBNAME_.CUSTOMER", {"C_W_ID":1, "C_D_ID":1, "C_ID":1}, true);
-sh.shardCollection("_DBNAME_.HISTORY", {"H_W_ID":1});
-sh.shardCollection("_DBNAME_.STOCK", {"S_W_ID":1, "S_I_ID":1},true);
-db.getSiblingDB("_DBNAME_").NEW_ORDER.createIndex({"NO_W_ID":1, "NO_D_ID":1, "NO_O_ID":1}, {unique:true});
-sh.shardCollection("_DBNAME_.NEW_ORDER", {"NO_W_ID":1, "NO_D_ID":1});
-db.getSiblingDB("_DBNAME_").ORDERS.createIndex({"O_W_ID":1, "O_D_ID":1, "O_ID":1, "O_C_ID":1}, {unique: true} );
-sh.shardCollection("_DBNAME_.ORDERS", {"O_W_ID":1, "O_D_ID":1, "O_ID":1});
+sh.shardCollection("_DBNAME_.ITEM", { "I_W_ID": 1, "I_ID": 1 }, true);
+db.getSiblingDB("_DBNAME_").WAREHOUSE.createIndex({ "W_ID": 1, "W_TAX": 1 }, { unique: true });
+sh.shardCollection("_DBNAME_.WAREHOUSE", { "W_ID": 1 });
+db.getSiblingDB("_DBNAME_").DISTRICT.createIndex({ "D_W_ID": 1, "D_ID": 1, "D_NEXT_O_ID": 1, "D_TAX": 1 }, { unique: true });
+sh.shardCollection("_DBNAME_.DISTRICT", { "D_W_ID": 1, "D_ID": 1 });
+sh.shardCollection("_DBNAME_.CUSTOMER", { "C_W_ID": 1, "C_D_ID": 1, "C_ID": 1 }, true);
+sh.shardCollection("_DBNAME_.HISTORY", { "H_W_ID": 1 });
+sh.shardCollection("_DBNAME_.STOCK", { "S_W_ID": 1, "S_I_ID": 1 }, true);
+db.getSiblingDB("_DBNAME_").NEW_ORDER.createIndex({ "NO_W_ID": 1, "NO_D_ID": 1, "NO_O_ID": 1 }, { unique: true });
+sh.shardCollection("_DBNAME_.NEW_ORDER", { "NO_W_ID": 1, "NO_D_ID": 1 });
+db.getSiblingDB("_DBNAME_").ORDERS.createIndex({ "O_W_ID": 1, "O_D_ID": 1, "O_ID": 1, "O_C_ID": 1 }, { unique: true });
+sh.shardCollection("_DBNAME_.ORDERS", { "O_W_ID": 1, "O_D_ID": 1, "O_ID": 1 });
 // this is for 6 WH on 3 shards
 // make sure that number of WH is a multiple of number of shards
 // nWH/nshards=whole number (2 here)
 var numShards = _SHARDS_;
-if (! numShards >= 0) numShards = db.getSiblingDB("config").shards.count();
-var shards=db.getSiblingDB("config").shards.distinct("_id");
+if (numShards <= 0) {
+    print("Warning: Shards argument is not positive. Getting the shard count from the cluster");
+    numShards = db.getSiblingDB("config").shards.count();
+}
+
 var numWH = _NUMWAREHOUSES_; /* must be multiple of 3 */
+if (numWH <= 0) {
+    print("Error: Invalid number of warehouses.
numWH must be > 0");
+    quit(64);
+}
+
+var remainder = numWH % numShards;
+if (remainder !== 0) {
+    print("ERROR: Number of Warehouses (" + numWH + ") is not a multiple of the number of shards (" + numShards + ")");
+    quit(64);
+}
+
+var whPerShard = numWH / numShards;
+print("Using " + whPerShard + " warehouses per shard");
+
 // do splits
-for (i=1+whPerShard; i shardTemp.js
-$MONGO $MONGOURI shardTemp.js
-echo "Ran shardColl script with $1 $2 $3 - ready to load"
+$MONGO $MONGOURI --tls --tlsAllowInvalidHostnames --tlsAllowInvalidCertificates shardTemp.js
\ No newline at end of file
From 05aab04474ec511e162390d27182fa30378a1d3b Mon Sep 17 00:00:00 2001
From: blagoev
Date: Fri, 10 Oct 2025 15:10:39 +0300
Subject: [PATCH 3/5] Update README.md
---
 README.md | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/README.md b/README.md
index 0e6c72f..b5c8391 100644
--- a/README.md
+++ b/README.md
@@ -19,25 +19,25 @@ Use code `VLDB2019` to get $150 credit to get started with MongoDB Atlas.
 ## Sharded MongoDB Driver
 
 1. Define $MONGOURI env variable and point it to your MongoDB server
-   export MONGOURI="mongodb://username:password@10.2.1.119:27017/admin?ssl=true&tlsAllowInvalidHostnames=true&tlsAllowInvalidCertificates=true"
+   export MONGOURI="mongodb://<>:<>@<>:27017/admin?ssl=true&tlsAllowInvalidHostnames=true&tlsAllowInvalidCertificates=true"
 
 2. Define $MONGOBIN env variable and point it to your MongoDB server
    export MONGOBIN=/ddata/workdir/bin
 
 3. Run shardColl.sh
-   ./shardColl.sh 180 3
+   ./shardColl.sh 21 3
 
 4. Run pytpcc using --warehouses=XXX
    NEVER USE --reset because that will dete the shard configuration by dropping the database. If you need to reset use step 3
 
    Only load the data
-   python ./tpcc.py --no-execute --clients=100 --duration=10 --warehouses=180 --config=mongodb.config mongodb --stop-on-error
+   python ./tpcc.py --no-execute --clients=100 --duration=10 --warehouses=21 --config=mongodb.config mongodb --stop-on-error
 
   Execute the tests without loading data.
-   python ./tpcc.py --no-load --clients=100 --duration=10 --warehouses=180 --config=mongodb.config mongodb --stop-on-error
+   python ./tpcc.py --no-load --clients=100 --duration=10 --warehouses=21 --config=mongodb.config mongodb --stop-on-error
 
   Execute the tests with loading
-   python ./tpcc.py --clients=100 --duration=10 --warehouses=180 --config=mongodb.config mongodb --stop-on-error
+   python ./tpcc.py --clients=100 --duration=10 --warehouses=21 --config=mongodb.config mongodb --stop-on-error
@@ -114,4 +114,4 @@ postgres=# \l+
 # For any SQL command first use the database
 \c tpcc;
-```
\ No newline at end of file
+```
From ad1dade7429b39f7872581b36c7007fc96bb18d5 Mon Sep 17 00:00:00 2001
From: blagoev
Date: Fri, 10 Oct 2025 16:17:14 +0300
Subject: [PATCH 4/5] fix readme.md and driver
---
 README.md                       | 88 ++++++++++++++++++++++++++++++---
 pytpcc/drivers/mongodbdriver.py | 22 ++++-----
 2 files changed, 91 insertions(+), 19 deletions(-)

diff --git a/README.md b/README.md
index b5c8391..16fa6fb 100644
--- a/README.md
+++ b/README.md
@@ -18,30 +18,102 @@ Use code `VLDB2019` to get $150 credit to get started with MongoDB Atlas.
 ## Sharded MongoDB Driver
 
+1. Create and activate a Python env.
+
+```bash
+mkdir ~/python_envs
+cd ~/python_envs
+~/python_envs$ python -m venv py-tpcc-env
+source ~/python_envs/py-tpcc-env/bin/activate
+```
+
+2. Print your config.
+
+```bash
+cd ~/py-tpcc/pytpcc
+~/py-tpcc/pytpcc$ python ./tpcc.py --print-config mongodb > mongodb.config
+```
+
+3.
Edit the configuration in mongodb.config.
+   * Change `shards` to the number of shards in your cluster
+   * Change the mongodb connection `uri` string
+   * Change the database `name`
+
+
+```bash
+# MongodbDriver Configuration File
+# Created 2025-10-08 14:18:24.378446
+[mongodb]
+
+# The mongodb connection string or URI
+uri = mongodb://user:pass@10.2.1.119:27017/admin?ssl=true&tlsAllowInvalidHostnames=true&tlsAllowInvalidCertificates=true
+
+# Database name
+name = tpcc
+
+# If true, data will be denormalized using MongoDB schema design best practices
+denormalize = True
+
+# If true, transactions will not be used (benchmarking only)
+notransactions =
+
+# If true, all things to update will be fetched via findAndModify
+findandmodify = True
+
+# If true, aggregation queries will be used
+agg =
+
+# If true, we will allow secondary reads
+secondary_reads = True
+
+# If true, we will enable retryable writes
+retry_writes = True
+
+# If true, we will perform causal reads
+causal_consistency = True
+
+# If true, we will use only one 'unsharded' items collection
+no_global_items =
+
+# If > 0 then sharded
+shards = 3
+```
+
+
 1. Define $MONGOURI env variable and point it to your MongoDB server
-   export MONGOURI="mongodb://<>:<>@<>:27017/admin?ssl=true&tlsAllowInvalidHostnames=true&tlsAllowInvalidCertificates=true"
+
+   ```bash
+   export MONGOURI="mongodb://user:pass@10.2.1.119:27017/admin?ssl=true&tlsAllowInvalidHostnames=true&tlsAllowInvalidCertificates=true"
+   ```
 
 2. Define $MONGOBIN env variable and point it to your MongoDB server
+   ```bash
    export MONGOBIN=/ddata/workdir/bin
+   ```
+
 3. Run shardColl.sh
+   ```bash
    ./shardColl.sh 21 3
+   ```
 
 4. Run pytpcc using --warehouses=XXX
    NEVER USE --reset because that will dete the shard configuration by dropping the database. If you need to reset use step 3
 
-   Only load the data
+   * Only load the data
+   ```bash
    python ./tpcc.py --no-execute --clients=100 --duration=10 --warehouses=21 --config=mongodb.config mongodb --stop-on-error
+   ```
 
-   Execute the tests without loading data.
+   * Execute the tests without loading data.
+   ```bash
    python ./tpcc.py --no-load --clients=100 --duration=10 --warehouses=21 --config=mongodb.config mongodb --stop-on-error
+   ```
 
-   Execute the tests with loading
+   * Execute the tests with loading
+   ```bash
    python ./tpcc.py --clients=100 --duration=10 --warehouses=21 --config=mongodb.config mongodb --stop-on-error
-
-
-
-
+   ```
 
 ## Postgres JSONB Driver
diff --git a/pytpcc/drivers/mongodbdriver.py b/pytpcc/drivers/mongodbdriver.py
index 8b63770..590557b 100644
--- a/pytpcc/drivers/mongodbdriver.py
+++ b/pytpcc/drivers/mongodbdriver.py
@@ -193,17 +193,17 @@
 ## ==============================================
 class MongodbDriver(AbstractDriver):
     DEFAULT_CONFIG = {
-    "uri": ("The mongodb connection string or URI", "mongodb://localhost:27017"),
-    "name": ("Database name", "tpcc"),
-    "denormalize": ("If true, data will be denormalized using MongoDB schema design best practices", True),
-    "notransactions": ("If true, transactions will not be used (benchmarking only)", False),
-    "findandmodify": ("If true, all things to update will be fetched via findAndModify", True),
-    "agg": ("If true, aggregation queries will be used", False),
-    "secondary_reads": ("If true, we will allow secondary reads", True),
-    "retry_writes": ("If true, we will enable retryable writes", True),
-    "causal_consistency": ("If true, we will perform causal reads ", True),
-    "no_global_items": ("If true, we will have use only one 'unsharded' items collection", False),
-    "shards": ("If >0 then sharded", "0")
+        "uri": ("The mongodb connection string or URI", "mongodb://localhost:27017"),
+        "name": ("Database name", "tpcc"),
+        "denormalize": ("If true, data will be denormalized using MongoDB schema design best practices", True),
+        "notransactions": ("If true, transactions will not be used (benchmarking only)", False),
+        "findandmodify": ("If true, all things to update will be fetched via findAndModify", True),
+        "agg": ("If true, aggregation queries will be used", False),
+        "secondary_reads": ("If true, we will allow secondary reads", True),
+        "retry_writes": ("If true, we will enable retryable writes", True),
+        "causal_consistency": ("If true, we will perform causal reads ", True),
+        "no_global_items": ("If true, we will use only one 'unsharded' items collection", False),
+        "shards": ("If > 0 then sharded", "0")
     }
     DENORMALIZED_TABLES = [
         constants.TABLENAME_ORDERS,
From c3a80fded878bda3706aadd2df11272b00f38209 Mon Sep 17 00:00:00 2001
From: blagoev
Date: Mon, 13 Oct 2025 12:35:55 +0300
Subject: [PATCH 5/5] don't reset if sharded
---
 README.md                       | 3 ++-
 pytpcc/drivers/mongodbdriver.py | 5 +++++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 16fa6fb..83d34e0 100644
--- a/README.md
+++ b/README.md
@@ -98,7 +98,8 @@ shards = 3
 ```
 
 4. Run pytpcc using --warehouses=XXX
-   NEVER USE --reset because that will dete the shard configuration by dropping the database. If you need to reset use step 3
+
+   NEVER USE `--reset` because that will delete the shard configuration by dropping the database.
If you need to reset, rerun step 3.
 
    * Only load the data
    ```bash
diff --git a/pytpcc/drivers/mongodbdriver.py b/pytpcc/drivers/mongodbdriver.py
index 590557b..2c5489c 100644
--- a/pytpcc/drivers/mongodbdriver.py
+++ b/pytpcc/drivers/mongodbdriver.py
@@ -315,6 +315,11 @@ def loadConfig(self, config):
         self.database = self.client.get_database(name=str(config['name']), write_concern=self.write_concern)
         if self.denormalize: logging.debug("Using denormalized data model")
+
+        # Don't reset the database if a sharded configuration is in use.
+        if config["reset"] and self.shards > 0:
+            logging.error("Error: resetting the database is not supported with a sharded configuration. Use shardColl.sh instead.")
+            sys.exit(64)
 
         try:
             if config["reset"]:
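
---

A note to accompany the series: the ITEM placement that PATCH 1 and PATCH 2 implement in `loadTuples` and `_doNewOrderTxn` reduces to a single mapping from a warehouse to the "home" warehouse holding its ITEM copy. The sketch below is illustrative only — the function name is ours, not part of the patch — and assumes, as shardColl.js enforces, that the warehouse count is a multiple of the shard count:

```python
# Sketch of the per-shard ITEM placement used by this series (not part of the diff).
def item_home_warehouse(w_id, warehouses, shards, no_global_items=False):
    """Return the I_W_ID whose ITEM copy warehouse w_id should read."""
    if no_global_items:
        return 1    # single "unsharded" ITEM collection is keyed at W_ID 1
    if shards <= 0:
        return 0    # unsharded run: one global ITEM copy stored with I_W_ID 0
    wh_per_shard = warehouses // shards  # shardColl.js enforces warehouses % shards == 0
    # Matches _doNewOrderTxn: i_w_id = w_id - (w_id - 1) % (warehouses / shards),
    # i.e. the first warehouse of each shard-aligned block, which is where
    # loadTuples wrote an ITEM copy (ww = range(1, warehouses + 1, warehouses // shards)).
    return w_id - (w_id - 1) % wh_per_shard

# 6 warehouses on 3 shards: warehouses 1-2 read the copy at W_ID 1, 3-4 at 3, 5-6 at 5,
# so every New Order transaction's ITEM lookup stays local to its own shard.
assert [item_home_warehouse(w, 6, 3) for w in range(1, 7)] == [1, 1, 3, 3, 5, 5]
```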