Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ jobs:
exclude:
- os: windows-11-arm
py: "3.10"

runs-on: ${{ matrix.os }}
name: Run test with Python ${{ matrix.py }} on ${{ matrix.os }}

Expand All @@ -33,22 +34,41 @@ jobs:
run: |
python -m pip install -r requirements.txt pytest

- name: Install pytest-run-parallel under free-threading
if: contains(matrix.py, 't')
run: |
pip install pytest-run-parallel

- name: Build
shell: bash
run: |
make cython
pip install .

- name: Test (C extension)
if: ${{ ! contains(matrix.py, 't') }}
shell: bash
run: |
pytest -v test

- name: Test (pure Python fallback)
if: ${{ ! contains(matrix.py, 't') }}
shell: bash
run: |
MSGPACK_PUREPYTHON=1 pytest -v test

- name: Test (C extension) in parallel under free-threading
if: contains(matrix.py, 't')
shell: bash
run: |
pytest -v --parallel-threads=auto --iterations=20 test

- name: Test (pure Python fallback) in parallel under free-threading
if: contains(matrix.py, 't')
shell: bash
run: |
MSGPACK_PUREPYTHON=1 pytest -v --parallel-threads=auto --iterations=20 test

- name: build packages
shell: bash
run: |
Expand Down
51 changes: 51 additions & 0 deletions test/uneeded_test_multithreading.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/usr/bin/env python3
import threading
from concurrent.futures import ThreadPoolExecutor

from msgpack import Packer


def run_threaded(
func,
num_threads=8,
pass_count=False,
pass_barrier=False,
outer_iterations=1,
prepare_args=None,
):
"""Runs a function many times in parallel"""
for _ in range(outer_iterations):
with ThreadPoolExecutor(max_workers=num_threads) as tpe:
if prepare_args is None:
args = []
else:
args = prepare_args()
if pass_barrier:
barrier = threading.Barrier(num_threads)
args.append(barrier)
if pass_count:
all_args = [(func, i, *args) for i in range(num_threads)]
else:
all_args = [(func, *args) for i in range(num_threads)]
try:
futures = []
for arg in all_args:
futures.append(tpe.submit(*arg))
finally:
if len(futures) < num_threads and pass_barrier:
barrier.abort()
for f in futures:
f.result()


def test_multithread_packing():
output = []
test_data = "abcd" * 10_000_000
packer = Packer()

def closure(b):
data = packer.pack(test_data)
output.append(data)
b.wait()

run_threaded(closure, num_threads=10, pass_barrier=True, pass_count=False)