diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6b1664ae..61291af9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,6 +14,7 @@ jobs: exclude: - os: windows-11-arm py: "3.10" + runs-on: ${{ matrix.os }} name: Run test with Python ${{ matrix.py }} on ${{ matrix.os }} @@ -33,6 +34,11 @@ jobs: run: | python -m pip install -r requirements.txt pytest + - name: Install pytest-run-parallel under free-threading + if: contains(matrix.py, 't') + run: | + pip install pytest-run-parallel + - name: Build shell: bash run: | @@ -40,15 +46,29 @@ jobs: pip install . - name: Test (C extension) + if: ${{ ! contains(matrix.py, 't') }} shell: bash run: | pytest -v test - name: Test (pure Python fallback) + if: ${{ ! contains(matrix.py, 't') }} shell: bash run: | MSGPACK_PUREPYTHON=1 pytest -v test + - name: Test (C extension) in parallel under free-threading + if: contains(matrix.py, 't') + shell: bash + run: | + pytest -v --parallel-threads=auto --iterations=20 test + + - name: Test (pure Python fallback) in parallel under free-threading + if: contains(matrix.py, 't') + shell: bash + run: | + MSGPACK_PUREPYTHON=1 pytest -v --parallel-threads=auto --iterations=20 test + - name: build packages shell: bash run: | diff --git a/test/uneeded_test_multithreading.py b/test/uneeded_test_multithreading.py new file mode 100644 index 00000000..6694fdc6 --- /dev/null +++ b/test/uneeded_test_multithreading.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +import threading +from concurrent.futures import ThreadPoolExecutor + +from msgpack import Packer + + +def run_threaded( + func, + num_threads=8, + pass_count=False, + pass_barrier=False, + outer_iterations=1, + prepare_args=None, +): + """Runs a function many times in parallel""" + for _ in range(outer_iterations): + with ThreadPoolExecutor(max_workers=num_threads) as tpe: + if prepare_args is None: + args = [] + else: + args = prepare_args() + if pass_barrier: + barrier = threading.Barrier(num_threads) + args.append(barrier) + if pass_count: + all_args = [(func, i, *args) for i in range(num_threads)] + else: + all_args = [(func, *args) for i in range(num_threads)] + try: + futures = [] + for arg in all_args: + futures.append(tpe.submit(*arg)) + finally: + if len(futures) < num_threads and pass_barrier: + barrier.abort() + for f in futures: + f.result() + + +def test_multithread_packing(): + output = [] + test_data = "abcd" * 10_000_000 + packer = Packer() + + def closure(b): + data = packer.pack(test_data) + output.append(data) + b.wait() + + run_threaded(closure, num_threads=10, pass_barrier=True, pass_count=False)