diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 29c5db2..47a2b67 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -6,6 +6,8 @@ on: push: tags: - '*' + workflow_dispatch: + # Allow to run manually jobs: build: diff --git a/.gitignore b/.gitignore index da57033..735456f 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,7 @@ cypari2/handle_error.c cypari2/pari_instance.c cypari2/stack.c cypari2/string_utils.c +cypari2/threads.c # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/cypari2/__init__.py b/cypari2/__init__.py index 1700c12..f6b17e2 100644 --- a/cypari2/__init__.py +++ b/cypari2/__init__.py @@ -1,3 +1,4 @@ from .pari_instance import Pari from .handle_error import PariError from .gen import Gen +from .threads import PariThreadPool diff --git a/cypari2/stack.pyx b/cypari2/stack.pyx index 95a7f1b..af9fd01 100644 --- a/cypari2/stack.pyx +++ b/cypari2/stack.pyx @@ -209,7 +209,10 @@ cdef Gen new_gen_noclear(GEN x): elif isclone(x): gclone_refc(x) return Gen_new(x, x) - raise SystemError("new_gen() argument not on PARI stack, not on PARI heap and not a universal constant") + else: + # NOTE: it might be the case that x belongs to a local stack of a thread + # In that case we copy it in the main stack + x = gcopy(x) z = Gen_stack_new(x) diff --git a/cypari2/threads.pxd b/cypari2/threads.pxd new file mode 100644 index 0000000..bf3bdc2 --- /dev/null +++ b/cypari2/threads.pxd @@ -0,0 +1,6 @@ +from .types cimport * + +cdef class PariThreadPool: + cdef size_t nbthreads + cdef pari_thread * pths + cdef size_t ithread diff --git a/cypari2/threads.pyx b/cypari2/threads.pyx new file mode 100644 index 0000000..9eea9fd --- /dev/null +++ b/cypari2/threads.pyx @@ -0,0 +1,82 @@ +r""" +Multithreading from Python +************************** +""" + +#***************************************************************************** +# Copyright (C) 2022 Vincent Delecroix +# +# Distributed under the terms of the GNU General Public License (GPL) +# as published by the Free Software Foundation; either version 2 of +# the License, or (at your option) any later version. +# http://www.gnu.org/licenses/ +#***************************************************************************** + +from libc.stdlib cimport malloc, calloc, free + +from .types cimport * +from .paridecl cimport * +from gen cimport Gen, objtogen + +cdef class PariThreadPool: + r""" + Pari thread allocator + + This class is intended to be used in conjunction with the multithreading + capabilities of the ``ThreadPoolExecutor`` from the ``concurrent.futures`` + Python library. + + Examples: + + >>> from concurrent.futures import ThreadPoolExecutor, as_completed + >>> from cypari2 import Pari, PariThreadPool + >>> pari = Pari() + >>> pari.default('nbthreads', 1) + >>> max_workers = 4 + >>> pari_pool = PariThreadPool(max_workers) + >>> square_free = [] + >>> with ThreadPoolExecutor(max_workers=max_workers, initializer=pari_pool.initializer) as executor: + ... futures = {executor.submit(pari.issquarefree, n): n for n in range(10**6, 10**6 + 1000)} + ... for future in as_completed(futures): + ... n = futures[future] + ... if future.result(): + ... square_free.append(n) + >>> square_free.sort() + >>> square_free + [1000001, 1000002, 1000003, 1000005, 1000006, ..., 1000994, 1000995, 1000997, 1000999] + """ + def __init__(self, size_t nbthreads, size_t size=8000000, size_t sizemax=0): + r""" + INPUT: + + - ``nbthreads`` -- the number of threads to allocate + + - ``size`` -- (default: 8000000) the number of bytes for the + initial PARI stack (see notes below) + + - ``sizemax`` -- (default: 0) the maximal number of bytes for the + dynamically increasing PARI stack. + """ + cdef size_t i + size = max(size, pari_mainstack.rsize) + sizemax = max(max(size, pari_mainstack.vsize), sizemax) + self.pths = calloc(nbthreads, sizeof(pari_thread)) + for i in range(nbthreads): + pari_thread_valloc(self.pths + i, size, sizemax, NULL) + self.ithread = 0 + self.nbthreads = nbthreads + + def __dealloc__(self): + cdef size_t i + for i in range(self.ithread): + pari_thread_free(self.pths + i) + free(self.pths) + + def __repr__(self): + return 'Pari thread pool with {} threads'.format(self.nbthreads) + + def initializer(self): + if self.ithread >= self.nbthreads: + raise ValueError('no more thread available') + pari_thread_start(self.pths + self.ithread) + self.ithread += 1 diff --git a/cypari2/types.pxd b/cypari2/types.pxd index b0c9e43..cb62e13 100644 --- a/cypari2/types.pxd +++ b/cypari2/types.pxd @@ -125,7 +125,8 @@ cdef extern from "pari/pari.h": struct pariFILE struct pari_mt struct pari_stack - struct pari_thread + struct pari_thread: + pass struct pari_timer struct GENbin struct hashentry diff --git a/docs/source/index.rst b/docs/source/index.rst index 251ebe7..ad5a1f9 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -16,6 +16,7 @@ Welcome to CyPari2's documentation! closure handle_error convert + threads Indices and tables diff --git a/docs/source/threads.rst b/docs/source/threads.rst new file mode 100644 index 0000000..5ea3acf --- /dev/null +++ b/docs/source/threads.rst @@ -0,0 +1,2 @@ +.. automodule:: cypari2.threads + :members: diff --git a/tests/rundoctest.py b/tests/rundoctest.py index cde35b6..72f8ee9 100755 --- a/tests/rundoctest.py +++ b/tests/rundoctest.py @@ -29,7 +29,7 @@ attempted = 0 for mod in [cypari2.closure, cypari2.convert, cypari2.gen, cypari2.handle_error, cypari2.pari_instance, cypari2.stack, - cypari2.string_utils, + cypari2.string_utils, cypari2.threads, autogen.doc, autogen.generator, autogen.parser, autogen.paths]: