100 changes: 99 additions & 1 deletion README.md
@@ -16,6 +16,104 @@ The structure of the repo is:
All the tests were run using [MongoDB Atlas](https://www.mongodb.com/cloud/atlas?jmp=VLDB2019).
Use code `VLDB2019` for $150 of credit to get started with MongoDB Atlas.

## Sharded MongoDB Driver

1. Create and activate a Python virtual environment.

```bash
mkdir ~/python_envs
cd ~/python_envs
python -m venv py-tpcc-env
source ~/python_envs/py-tpcc-env/bin/activate
```

2. Print the default MongoDB driver configuration to a file.

```bash
cd ~/py-tpcc/pytpcc
python ./tpcc.py --print-config mongodb > mongodb.config
```

3. Edit the MongoDB configuration in `mongodb.config`. A sample config is shown below, followed by a sketch of how the main flags map onto pymongo options.
* Set `shards` to the number of shards in the cluster
* Set the MongoDB connection `uri` string
* Set the database `name`


```ini
# MongodbDriver Configuration File
# Created 2025-10-08 14:18:24.378446
[mongodb]

# The mongodb connection string or URI
uri = mongodb://user:[email protected]:27017/admin?ssl=true&tlsAllowInvalidHostnames=true&tlsAllowInvalidCertificates=true

# Database name
name = tpcc

# If true, data will be denormalized using MongoDB schema design best practices
denormalize = True

# If true, transactions will not be used (benchmarking only)
notransactions =

# If true, all things to update will be fetched via findAndModify
findandmodify = True

# If true, aggregation queries will be used
agg =

# If true, we will allow secondary reads
secondary_reads = True

# If true, we will enable retryable writes
retry_writes = True

# If true, we will perform causal reads
causal_consistency = True

# If true, we will use only one 'unsharded' items collection
no_global_items =

# If > 0 then sharded
shards = 3
```
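
For orientation, here is a minimal pymongo sketch of how the main flags above typically map onto client, database, and session options. It is an illustration only, not the driver's actual code; the URI, database name, and collection name are sample values from this walkthrough:

```python
from pymongo import MongoClient
from pymongo.read_preferences import ReadPreference

# Illustration only: how the config flags above map onto pymongo options.
client = MongoClient(
    "mongodb://localhost:27017",   # uri
    retryWrites=True,              # retry_writes = True
)
db = client.get_database(
    "tpcc",                                              # name
    read_preference=ReadPreference.SECONDARY_PREFERRED,  # secondary_reads = True
)

# causal_consistency = True: reads in a session observe that session's writes.
with client.start_session(causal_consistency=True) as session:
    # findandmodify = True roughly corresponds to fetching documents for
    # update via find_one_and_update() instead of separate find() + update().
    warehouse = db["WAREHOUSE"].find_one({"W_ID": 1}, session=session)
    print(warehouse)
```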


4. Define the $MONGOURI env variable and point it at your MongoDB server.

```bash
export MONGOURI="mongodb://user:[email protected]:27017/admin?ssl=true&tlsAllowInvalidHostnames=true&tlsAllowInvalidCertificates=true"
```

5. Define the $MONGOBIN env variable and point it at the directory containing your MongoDB binaries.
```bash
export MONGOBIN=/ddata/workdir/bin
```


6. Run shardColl.sh to shard the collections, passing the warehouse count and shard count (21 and 3 here, matching the config above); a hypothetical sketch of such a script follows the command.
```bash
./shardColl.sh 21 3
```
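
The script itself is not part of this diff. As a rough, hypothetical sketch (not the repo's actual shardColl.sh), a script like this would enable sharding on the `tpcc` database and shard each collection on its warehouse-id field; the collection and shard-key names below are assumptions based on the driver code:

```bash
#!/bin/bash
# Hypothetical sketch only -- the repo's actual shardColl.sh may differ.
# Enables sharding on the tpcc database and shards each collection on its
# warehouse-id field. Assumes mongosh is in $MONGOBIN and $MONGOURI is set.
WAREHOUSES=${1:?usage: shardColl.sh <warehouses> <shards>}
SHARDS=${2:?usage: shardColl.sh <warehouses> <shards>}
# $WAREHOUSES/$SHARDS could drive chunk pre-splitting; omitted in this sketch.

"$MONGOBIN/mongosh" "$MONGOURI" --quiet --eval "
  sh.enableSharding('tpcc');
  sh.shardCollection('tpcc.WAREHOUSE', {W_ID: 1});
  sh.shardCollection('tpcc.DISTRICT',  {D_W_ID: 1});
  sh.shardCollection('tpcc.CUSTOMER',  {C_W_ID: 1});
  sh.shardCollection('tpcc.ORDERS',    {O_W_ID: 1});
  sh.shardCollection('tpcc.STOCK',     {S_W_ID: 1});
  sh.shardCollection('tpcc.ITEM',      {I_W_ID: 1});
"
```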

7. Run pytpcc using `--warehouses=XXX`.
**Never use `--reset`**: it destroys the shard configuration by dropping the database. If you need to reset, rerun step 6 instead. A quick sanity check for a completed load is sketched after the command list below.

* Load the data only:
```bash
python ./tpcc.py --no-execute --clients=100 --duration=10 --warehouses=21 --config=mongodb.config mongodb --stop-on-error
```

* Execute the tests without loading data:
```bash
python ./tpcc.py --no-load --clients=100 --duration=10 --warehouses=21 --config=mongodb.config mongodb --stop-on-error
```

* Load the data and execute the tests:
```bash
python ./tpcc.py --clients=100 --duration=10 --warehouses=21 --config=mongodb.config mongodb --stop-on-error
```
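
After a load-only run, the document counts can be sanity-checked before executing the tests; a hypothetical one-liner, assuming mongosh is available in $MONGOBIN:

```bash
"$MONGOBIN/mongosh" "$MONGOURI" --quiet --eval "
  const tpcc = db.getSiblingDB('tpcc');
  tpcc.getCollectionNames().forEach(function(c) {
    print(c, tpcc.getCollection(c).estimatedDocumentCount());
  });
"
```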

## Postgres JSONB Driver

@@ -88,4 +186,4 @@ postgres=# \l+

# For any SQL command first use the database
\c tpcc;
```
```
24 changes: 14 additions & 10 deletions pytpcc/coordinator.py
@@ -38,7 +38,7 @@
import execnet
import worker
import message
from ConfigParser import SafeConfigParser
from configparser import ConfigParser
from pprint import pprint, pformat

from util import *
@@ -80,7 +80,7 @@ def startLoading(scalParameters,args,config,channels):
for w_id in range(scaleParameters.starting_warehouse, scaleParameters.ending_warehouse+1):
idx = w_id % procs
w_ids[idx].append(w_id)
print w_ids
print(w_ids)

load_start=time.time()
for i in range(len(channels)):
@@ -116,7 +116,7 @@ def startExecution(scaleParameters, args, config,channels):
aparser = argparse.ArgumentParser(description='Python implementation of the TPC-C Benchmark')
aparser.add_argument('system', choices=getDrivers(),
help='Target system driver')
aparser.add_argument('--config', type=file,
aparser.add_argument('--config', type=str,
help='Path to driver configuration file')
aparser.add_argument('--reset', action='store_true',
help='Instruct the driver to reset the contents of the database')
@@ -132,6 +132,8 @@ def startExecution(scaleParameters, args, config,channels):
aparser.add_argument('--clientprocs', default=1, type=int, metavar='N',
help='Number of processes on each client node.')

aparser.add_argument('--samewh', default=85, type=float, metavar='PP',
help='Percent paying same warehouse')
aparser.add_argument('--stop-on-error', action='store_true',
help='Stop the transaction execution when the driver throws an exception.')
aparser.add_argument('--no-load', action='store_true',
Expand All @@ -153,15 +155,16 @@ def startExecution(scaleParameters, args, config,channels):
assert driver != None, "Failed to create '%s' driver" % args['system']
if args['print_config']:
config = driver.makeDefaultConfig()
print driver.formatConfig(config)
print
print(driver.formatConfig(config))
print()
sys.exit(0)

## Load Configuration file
if args['config']:
logging.debug("Loading configuration file '%s'" % args['config'])
configFilePath = args['config']
if configFilePath:
logging.debug("Loading configuration file '%s'" % configFilePath)
cparser = ConfigParser()
cparser.read(os.path.realpath(args['config'].name))
cparser.read(os.path.realpath(configFilePath))
config = dict(cparser.items(args['system']))
else:
logging.debug("Using default configuration for %s" % args['system'])
@@ -171,6 +174,7 @@ def startExecution(scaleParameters, args, config,channels):
config['load'] = False
config['execute'] = False
if config['reset']: logging.info("Reseting database")
config['warehouses'] = args['warehouses']
driver.loadConfig(config)
logging.info("Initializing TPC-C benchmark using %s" % driver)

@@ -208,8 +212,8 @@ def startExecution(scaleParameters, args, config,channels):
if not args['no_execute']:
results = startExecution(scaleParameters, args, config,channels)
assert results
logging.info(results.show(load_time, driver, len(channels)))
print results.show(load_time, driver, len(channels))
logging.info(results.show(load_time, driver, len(channels), args['samewh']))
print(results.show(load_time, driver, len(channels), args['samewh']))
## IF

## MAIN
59 changes: 32 additions & 27 deletions pytpcc/drivers/mongodbdriver.py
@@ -40,10 +40,6 @@
import pymongo
from pymongo.client_session import TransactionOptions

# Import TransactionOptions from pymongo.client_session or
# pymongo.synchronous.client_session depending on the version of pymongo
from pymongo.client_session import TransactionOptions

import constants
from .abstractdriver import AbstractDriver

@@ -197,16 +193,17 @@
## ==============================================
class MongodbDriver(AbstractDriver):
DEFAULT_CONFIG = {
"uri": ("The mongodb connection string or URI", "mongodb://localhost:27017"),
"name": ("Database name", "tpcc"),
"denormalize": ("If true, data will be denormalized using MongoDB schema design best practices", True),
"notransactions": ("If true, transactions will not be used (benchmarking only)", False),
"findandmodify": ("If true, all things to update will be fetched via findAndModify", True),
"agg": ("If true, aggregation queries will be used", False),
"secondary_reads": ("If true, we will allow secondary reads", True),
"retry_writes": ("If true, we will enable retryable writes", True),
"causal_consistency": ("If true, we will perform causal reads ", True),
"shards": ("If >1 then sharded", "1")
"uri": ("The mongodb connection string or URI", "mongodb://localhost:27017"),
"name": ("Database name", "tpcc"),
"denormalize": ("If true, data will be denormalized using MongoDB schema design best practices", True),
"notransactions": ("If true, transactions will not be used (benchmarking only)", False),
"findandmodify": ("If true, all things to update will be fetched via findAndModify", True),
"agg": ("If true, aggregation queries will be used", False),
"secondary_reads": ("If true, we will allow secondary reads", True),
"retry_writes": ("If true, we will enable retryable writes", True),
"causal_consistency": ("If true, we will perform causal reads ", True),
"no_global_items": ("If true, we will have use only one 'unsharded' items collection", False),
"shards": ("If > 0 then sharded", "0")
}
DENORMALIZED_TABLES = [
constants.TABLENAME_ORDERS,
@@ -237,7 +234,8 @@ def __init__(self, ddl):
self.output = open('results.json','a')
self.result_doc = {}
self.warehouses = 0
self.shards = 1
self.no_global_items = False
self.shards = 0

## Create member mapping to collections
for name in constants.ALL_TABLES:
@@ -270,6 +268,7 @@ def loadConfig(self, config):
self.warehouses = config['warehouses']
self.find_and_modify = config['findandmodify'] == 'True'
self.causal_consistency = config['causal_consistency'] == 'True'
self.no_global_items = config['no_global_items'] == 'True'
self.retry_writes = config['retry_writes'] == 'True'
self.secondary_reads = config['secondary_reads'] == 'True'
self.agg = config['agg'] == 'True'
@@ -402,10 +401,11 @@ def loadTuples(self, tableName, tuples):
else:
if tableName == constants.TABLENAME_ITEM:
tuples3 = []
if self.shards > 1:
ww = range(1,self.warehouses+1)
if self.shards > 0:
ww = range(1,self.warehouses+1, int(self.warehouses/self.shards))
else:
ww = [0]

for t in tuples:
for w in ww:
t2 = list(t)
Expand All @@ -415,18 +415,22 @@ def loadTuples(self, tableName, tuples):
for t in tuples:
tuple_dicts.append(dict([(columns[i], t[i]) for i in num_columns]))
## FOR
self.database[tableName].insert_many(tuple_dicts)

self.database[tableName].insert_many(tuple_dicts, ordered=False)
## IF

return

def loadFinishDistrict(self, w_id, d_id):
if self.denormalize:
logging.debug("Pushing %d denormalized ORDERS records for WAREHOUSE %d DISTRICT %d into MongoDB", len(self.w_orders), w_id, d_id)
self.database[constants.TABLENAME_ORDERS].insert_many(self.w_orders.values())
self.database[constants.TABLENAME_ORDERS].insert_many(self.w_orders.values(), ordered=False)
self.w_orders.clear()
## IF

def loadFinish(self):
logging.debug("Load finished")

def executeStart(self):
"""Optional callback before the execution for each client starts"""
return None
@@ -614,8 +618,10 @@ def _doNewOrderTxn(self, s, params):
d_next_o_id = d["D_NEXT_O_ID"]

# fetch matching items and see if they are all valid
if self.shards > 1: i_w_id = w_id
if self.shards > 0: i_w_id = w_id-(w_id-1)%(self.warehouses/self.shards) # get_i_w(w_id)
else: i_w_id = 0
if self.no_global_items:
i_w_id = 1
items = list(self.item.find({"I_ID": {"$in": i_ids}, "I_W_ID": i_w_id, "$comment": comment},
{"_id":0, "I_ID": 1, "I_PRICE": 1, "I_NAME": 1, "I_DATA": 1},
session=s))
@@ -628,8 +634,7 @@
#print constants.INVALID_ITEM_MESSAGE + ", Aborting transaction (ok for 1%)"
return None
## IF
xxi_ids = tuple(map(lambda o: o['I_ID'], items))
items = sorted(items, key=lambda x: xxi_ids.index(x['I_ID']))
items = sorted(items, key=lambda x: i_ids.index(x['I_ID']))

# getWarehouseTaxRate
w = self.warehouse.find_one({"W_ID": w_id, "$comment": comment}, {"_id":0, "W_TAX": 1}, session=s)
@@ -668,7 +673,7 @@ def _doNewOrderTxn(self, s, params):
## If all of the items are at the same warehouse, then we'll issue a single
## request to get their information, otherwise we'll still issue a single request
## ----------------
item_w_list = zip(i_ids, i_w_ids)
item_w_list = list(zip(i_ids, i_w_ids))
stock_project = {"_id":0, "S_I_ID": 1, "S_W_ID": 1,
"S_QUANTITY": 1, "S_DATA": 1, "S_YTD": 1,
"S_ORDER_CNT": 1, "S_REMOTE_CNT": 1, s_dist_col: 1}
@@ -684,8 +689,7 @@
session=s))
## IF
assert len(all_stocks) == ol_cnt, "all_stocks len %d != ol_cnt %d" % (len(all_stocks), ol_cnt)
xxxi_ids = tuple(map(lambda o: (o['S_I_ID'], o['S_W_ID']), all_stocks))
all_stocks = sorted(all_stocks, key=lambda x: xxxi_ids.index((x['S_I_ID'], x["S_W_ID"])))
all_stocks = sorted(all_stocks, key=lambda x: item_w_list.index((x['S_I_ID'], x["S_W_ID"])))

## ----------------
## Insert Order Line, Stock Item Information
@@ -784,7 +788,7 @@ def _doNewOrderTxn(self, s, params):

if self.batch_writes:
if not self.denormalize:
self.order_line.insert_many(order_line_writes, session=s)
self.order_line.insert_many(order_line_writes, ordered=False, session=s)
self.stock.bulk_write(stock_writes, session=s)
## IF

@@ -936,7 +940,7 @@ def _doPaymentTxn(self, s, params):
session=s)
## IF

search_fields = {"C_W_ID": w_id, "C_D_ID": d_id, "$comment": comment}
search_fields = {"C_W_ID": c_w_id, "C_D_ID": c_d_id, "$comment": comment}
return_fields = {"C_BALANCE": 0, "C_YTD_PAYMENT": 0, "C_PAYMENT_CNT": 0}

if c_id != None:
@@ -1137,6 +1141,7 @@ def run_transaction_with_retries(self, txn_callback, name, params):
sleep(txn_retry_counter * .1)
logging.debug("txn retry number for %s: %d", name, txn_retry_counter)
## WHILE

def get_server_status(self):
ss=self.client.admin.command('serverStatus')
if "$configServerState" in ss:
10 changes: 5 additions & 5 deletions pytpcc/runtime/executor.py
@@ -44,10 +44,11 @@

class Executor:

def __init__(self, driver, scaleParameters, stop_on_error = False):
def __init__(self, driver, scaleParameters, stop_on_error = False, sameWH = 85):
self.driver = driver
self.scaleParameters = scaleParameters
self.stop_on_error = stop_on_error
self.same_wh = sameWH
## DEF

def execute(self, duration):
@@ -76,8 +77,7 @@ def execute(self, duration):
batch_result.abortTransaction(batch_txn_id)
if self.stop_on_error: raise
continue

# This will happen on all failing 1% of the transactions

if val is None:
global_result.abortTransaction(global_txn_id, retries)
batch_result.abortTransaction(batch_txn_id, retries)
@@ -86,7 +86,7 @@ def execute(self, duration):
batch_result.stopTransaction(batch_txn_id, retries)
global_result.stopTransaction(global_txn_id, retries)

if time.time() - start_batch > 900: # every 15 minutes
if time.time() - start_batch > 1800: # every 30 minutes
batch_result.stopBenchmark()
logging.info(batch_result.show())
batch_result = results.Results()
@@ -221,7 +221,7 @@ def generatePaymentParams(self):
h_date = datetime.now()

## 85%: paying through own warehouse (or there is only 1 warehouse)
if self.scaleParameters.warehouses == 1 or x <= 85:
if self.scaleParameters.warehouses == 1 or x <= self.same_wh:
c_w_id = w_id
c_d_id = d_id
## 15%: paying through another warehouse: