Skip to content

task processing rate falls with number of unique tasks passed to wiji eventloop #73

@komuw

Description

@komuw

look at the benchmarks in;

wiji/tests/test_worker.py

Lines 534 to 847 in a315334

class TestWorkerBenchmark(TestCase):
"""
run tests as:
python -m unittest discover -v -s .
run one testcase as:
python -m unittest -v tests.test_worker.TestWorkerBenchmark.test_something
"""
def setUp(self):
self.BROKER = wiji.broker.InMemoryBroker()
def tearDown(self):
pass
def broker_path(self):
return self.BROKER.__module__ + "." + self.BROKER.__class__.__name__
@staticmethod
def _run(*coros):
loop = asyncio.get_event_loop()
async_tasks = asyncio.gather(*coros, loop=loop)
return loop.run_until_complete(async_tasks)
def test_one_task(self):
"""
processing rate for 1 task using an InMemoryBroker.
results: 7520 tasks/second
"""
class AdderTask(wiji.task.Task):
the_broker = self.BROKER
queue_name = "AdderTask"
loglevel = "NOTSET"
now = time.monotonic()
num_per_tasks = 0
async def run(self, a, b):
res = a + b
if self.num_per_tasks == 0:
self.now = time.monotonic()
self.num_per_tasks = self.num_per_tasks + 1
end = time.monotonic()
time_taken = end - self.now
process_rate = self.num_per_tasks / time_taken
print("\n\t process_rate:{0}".format(self.queue_name))
print(process_rate)
return res
t = AdderTask()
worker = wiji.Worker(the_task=t)
for i in range(0, 10_000):
t.synchronous_delay(a=21, b=i)
self._run(worker.consume_tasks())
def test_two_tasks(self):
"""
processing rate for 2 tasks using an InMemoryBroker.
results: 4237 tasks/second
"""
class TaskOne(wiji.task.Task):
the_broker = self.BROKER
queue_name = "TaskOne"
loglevel = "NOTSET"
now = time.monotonic()
num_per_tasks = 0
async def run(self, a, b):
res = a + b
if self.num_per_tasks == 0:
self.now = time.monotonic()
self.num_per_tasks = self.num_per_tasks + 1
end = time.monotonic()
time_taken = end - self.now
process_rate = self.num_per_tasks / time_taken
print("\n\t process_rate:{0}".format(self.queue_name))
print(process_rate)
return res
class TaskTwo(TaskOne):
queue_name = "TaskTwo"
t1 = TaskOne()
w1 = wiji.Worker(the_task=t1)
for i in range(0, 10_000):
t1.synchronous_delay(a=21, b=i)
t2 = TaskTwo()
w2 = wiji.Worker(the_task=t2)
for i in range(0, 10_000):
t2.synchronous_delay(a=21, b=i)
self._run(w1.consume_tasks(), w2.consume_tasks())
def test_three_tasks(self):
"""
processing rate for 3 tasks using an InMemoryBroker.
results: 2816 tasks/second
"""
class TaskOne(wiji.task.Task):
the_broker = self.BROKER
queue_name = "TaskOne"
loglevel = "NOTSET"
now = time.monotonic()
num_per_tasks = 0
async def run(self, a, b):
res = a + b
if self.num_per_tasks == 0:
self.now = time.monotonic()
self.num_per_tasks = self.num_per_tasks + 1
end = time.monotonic()
time_taken = end - self.now
process_rate = self.num_per_tasks / time_taken
print("\n\t process_rate:{0}".format(self.queue_name))
print(process_rate)
return res
class TaskTwo(TaskOne):
queue_name = "TaskTwo"
class TaskThree(TaskOne):
queue_name = "TaskThree"
t1 = TaskOne()
w1 = wiji.Worker(the_task=t1)
for i in range(0, 10_000):
t1.synchronous_delay(a=21, b=i)
t2 = TaskTwo()
w2 = wiji.Worker(the_task=t2)
for i in range(0, 10_000):
t2.synchronous_delay(a=21, b=i)
t3 = TaskThree()
w3 = wiji.Worker(the_task=t3)
for i in range(0, 10_000):
t3.synchronous_delay(a=21, b=i)
self._run(w1.consume_tasks(), w2.consume_tasks(), w3.consume_tasks())
def test_four_tasks(self):
"""
processing rate for 4 tasks using an InMemoryBroker.
results: 2157 tasks/second
"""
class TaskOne(wiji.task.Task):
the_broker = self.BROKER
queue_name = "TaskOne"
loglevel = "NOTSET"
now = time.monotonic()
num_per_tasks = 0
async def run(self, a, b):
res = a + b
if self.num_per_tasks == 0:
self.now = time.monotonic()
self.num_per_tasks = self.num_per_tasks + 1
end = time.monotonic()
time_taken = end - self.now
process_rate = self.num_per_tasks / time_taken
print("\n\t process_rate:{0}".format(self.queue_name))
print(process_rate)
return res
class TaskTwo(TaskOne):
queue_name = "TaskTwo"
class TaskThree(TaskOne):
queue_name = "TaskThree"
class TaskFour(TaskOne):
queue_name = "TaskFour"
t1 = TaskOne()
w1 = wiji.Worker(the_task=t1)
for i in range(0, 10_000):
t1.synchronous_delay(a=21, b=i)
t2 = TaskTwo()
w2 = wiji.Worker(the_task=t2)
for i in range(0, 10_000):
t2.synchronous_delay(a=21, b=i)
t3 = TaskThree()
w3 = wiji.Worker(the_task=t3)
for i in range(0, 10_000):
t3.synchronous_delay(a=21, b=i)
t4 = TaskFour()
w4 = wiji.Worker(the_task=t4)
for i in range(0, 10_000):
t4.synchronous_delay(a=21, b=i)
self._run(w1.consume_tasks(), w2.consume_tasks(), w3.consume_tasks(), w4.consume_tasks())
def test_eight_tasks(self):
"""
processing rate for 8 tasks using an InMemoryBroker.
results: 1120 tasks/second
"""
class TaskOne(wiji.task.Task):
the_broker = self.BROKER
queue_name = "TaskOne"
loglevel = "NOTSET"
now = time.monotonic()
num_per_tasks = 0
async def run(self, a, b):
res = a + b
if self.num_per_tasks == 0:
self.now = time.monotonic()
self.num_per_tasks = self.num_per_tasks + 1
end = time.monotonic()
time_taken = end - self.now
process_rate = self.num_per_tasks / time_taken
print("\n\t process_rate:{0}".format(self.queue_name))
print(process_rate)
return res
class TaskTwo(TaskOne):
queue_name = "TaskTwo"
class TaskThree(TaskOne):
queue_name = "TaskThree"
class TaskFour(TaskOne):
queue_name = "TaskFour"
class TaskFive(TaskOne):
queue_name = "TaskFive"
class TaskSix(TaskOne):
queue_name = "TaskSix"
class TaskSeven(TaskOne):
queue_name = "TaskSeven"
class TaskEight(TaskOne):
queue_name = "TaskEight"
t1 = TaskOne()
w1 = wiji.Worker(the_task=t1)
for i in range(0, 10_000):
t1.synchronous_delay(a=21, b=i)
t2 = TaskTwo()
w2 = wiji.Worker(the_task=t2)
for i in range(0, 10_000):
t2.synchronous_delay(a=21, b=i)
t3 = TaskThree()
w3 = wiji.Worker(the_task=t3)
for i in range(0, 10_000):
t3.synchronous_delay(a=21, b=i)
t4 = TaskFour()
w4 = wiji.Worker(the_task=t4)
for i in range(0, 10_000):
t4.synchronous_delay(a=21, b=i)
t5 = TaskFive()
w5 = wiji.Worker(the_task=t5)
for i in range(0, 10_000):
t5.synchronous_delay(a=21, b=i)
t6 = TaskSix()
w6 = wiji.Worker(the_task=t6)
for i in range(0, 10_000):
t6.synchronous_delay(a=21, b=i)
t7 = TaskSeven()
w7 = wiji.Worker(the_task=t7)
for i in range(0, 10_000):
t7.synchronous_delay(a=21, b=i)
t8 = TaskEight()
w8 = wiji.Worker(the_task=t8)
for i in range(0, 10_000):
t8.synchronous_delay(a=21, b=i)
self._run(
w1.consume_tasks(),
w2.consume_tasks(),
w3.consume_tasks(),
w4.consume_tasks(),
w5.consume_tasks(),
w6.consume_tasks(),
w7.consume_tasks(),
w8.consume_tasks(),

When we only have one unique task class in eventloop the processing rate using an InMemoryBroker is 7520 tasks/second

When we have two uniqe task classes the rate drops to; 4237 tasks/second

for three the rate is; 2816 tasks/second
for four; 2157 tasks/second
for eight; 1120 tasks/second

Metadata

Metadata

Assignees

No one assigned

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions