Utilities

exception sage_docbuild.utils.RemoteException(tb: str)[source]

Bases: Exception

Raised if an exception occurred in one of the child processes.

tb: str

class sage_docbuild.utils.RemoteExceptionWrapper(exc: BaseException)[source]

Bases: object

Used by child processes to capture exceptions thrown during execution and report them to the main process, including the correct traceback.

exc: BaseException
tb: str
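The capture-and-report pattern described above can be sketched in plain Python. This is a minimal illustration, not the sage_docbuild implementation: the names ChildError, ChildErrorWrapper, run_in_child and receive_in_parent are hypothetical, and the transfer between processes is only simulated by passing the object directly.

```python
import traceback


class ChildError(Exception):
    """Hypothetical stand-in: carries a child's formatted traceback as text."""

    def __init__(self, tb):
        super().__init__(tb)
        self.tb = tb


class ChildErrorWrapper:
    """Hypothetical stand-in: pairs an exception with its formatted traceback."""

    def __init__(self, exc):
        self.exc = exc
        # Traceback objects cannot be pickled, so keep a string rendering
        # that could be sent to another process.
        self.tb = "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__))

    def unwrap(self):
        # Attach the child's traceback text as the cause of the original error.
        self.exc.__cause__ = ChildError(self.tb)
        return self.exc


def run_in_child(func, arg):
    """Simulated child side: return the result, or a wrapper on failure."""
    try:
        return func(arg)
    except Exception as exc:
        return ChildErrorWrapper(exc)


def receive_in_parent(value):
    """Simulated parent side: re-raise wrapped errors, else return the value."""
    if isinstance(value, ChildErrorWrapper):
        raise value.unwrap()
    return value
```

In this sketch the parent re-raises the original exception type, and the child's traceback text remains available through the exception's __cause__ attribute.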
exception sage_docbuild.utils.WorkerDiedException(message: str | None, original_exception: BaseException | None = None)[source]

Bases: RuntimeError

Raised if a worker process dies unexpectedly.

original_exception: BaseException | None

sage_docbuild.utils.build_many(target, args, processes=None)[source]

Apply the single-argument function target to each argument in args, in parallel, using multiprocessing.cpu_count() (or processes, if given) simultaneous processes.

This is a simplified version of multiprocessing.Pool.map from the Python standard library which avoids a couple of its pitfalls. In particular, it can abort (with a RuntimeError) without hanging if one of the worker processes unexpectedly dies. It also has semantics equivalent to maxtasksperchild=1; that is, one process is started per argument. As such, this is inefficient for processing large numbers of fast tasks, but appropriate for running longer tasks (such as doc builds) which may also require significant cleanup.

It also avoids starting new processes from a pthread, a practice that causes at least one known issue:

  • When PARI is built with multi-threading support, forking a Sage process from a thread leaves the main Pari interface instance broken (see Issue #26608#comment:38).

In the future this may be replaced by a generalized version of the more robust parallel processing implementation from sage.doctest.forker.
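The one-process-per-argument behaviour described above can be sketched with the standard library alone. This is a simplified illustration under stated assumptions, not the code of build_many: the function name run_one_process_per_task is hypothetical, the "fork" start method is assumed to be available (POSIX), processes is assumed to be at least 1, and the sketch waits for the oldest worker rather than for whichever finishes first.

```python
import multiprocessing


def run_one_process_per_task(target, args, processes=2):
    """Hypothetical sketch: call target(arg) in a fresh process per argument."""
    # Assumption: a POSIX system where the "fork" start method is available.
    ctx = multiprocessing.get_context("fork")

    def child(conn, arg):
        conn.send(target(arg))
        conn.close()

    pending = list(args)[::-1]   # reversed so that pop() yields tasks in order
    running = []                 # (argument, process, receiving pipe end)
    results = []
    failure = None

    while running or (pending and failure is None):
        # Start new workers only while no failure has been seen.
        while pending and failure is None and len(running) < processes:
            arg = pending.pop()
            recv_end, send_end = ctx.Pipe(duplex=False)
            proc = ctx.Process(target=child, args=(send_end, arg))
            proc.start()
            send_end.close()     # the parent keeps only the reading end
            running.append((arg, proc, recv_end))

        # Simplification: wait for the oldest worker, not the first to finish.
        arg, proc, recv_end = running.pop(0)
        try:
            value = recv_end.recv()
        except EOFError:         # the worker exited without sending a result
            value = None
        recv_end.close()
        proc.join()
        if proc.exitcode != 0 and failure is None:
            failure = RuntimeError(
                "worker for %r died with non-zero exit code %s"
                % (arg, proc.exitcode))
        results.append(value)

    if failure is not None:
        raise failure
    return results
```

Because each result arrives over its own pipe, a worker that dies closes its end of the pipe and the parent sees EOFError instead of blocking forever. Workers that are already running are joined before the RuntimeError is raised, and no further tasks are started.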

EXAMPLES:

sage: from sage_docbuild.utils import build_many
sage: def target(N):
....:     import time
....:     time.sleep(float(0.1))
....:     print('Processed task %s' % N)
sage: _ = build_many(target, range(8), processes=8)
Processed task ...
Processed task ...
Processed task ...
Processed task ...
Processed task ...
Processed task ...
Processed task ...
Processed task ...
>>> from sage.all import *
>>> from sage_docbuild.utils import build_many
>>> def target(N):
...     import time
...     time.sleep(float(RealNumber('0.1')))
...     print('Processed task %s' % N)
>>> _ = build_many(target, range(Integer(8)), processes=Integer(8))
Processed task ...
Processed task ...
Processed task ...
Processed task ...
Processed task ...
Processed task ...
Processed task ...
Processed task ...

This version can also return a result, and thus can be used as a replacement for multiprocessing.Pool.map (i.e. it still blocks until the result is ready):

sage: def square(N):
....:     return N * N
sage: build_many(square, range(100))
[0, 1, 4, 9, ..., 9604, 9801]
>>> from sage.all import *
>>> def square(N):
...     return N * N
>>> build_many(square, range(Integer(100)))
[0, 1, 4, 9, ..., 9604, 9801]
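Since target receives exactly one argument, a task that needs several parameters has to be adapted first. The following sketch shows two standard ways of doing that. The functions scale and scale_pair are hypothetical, and the built-in map stands in for build_many so that the block runs without Sage. Whether build_many accepts a functools.partial object is an assumption that is not confirmed here.

```python
from functools import partial


def scale(factor, n):
    """Hypothetical two-parameter task."""
    return factor * n


def scale_pair(pair):
    """Single-argument variant: unpack a (factor, n) tuple."""
    factor, n = pair
    return scale(factor, n)


# Technique 1: fix the leading parameter, leaving one free argument.
triple = partial(scale, 3)
tripled = list(map(triple, range(5)))

# Technique 2: pack all parameters of each task into one tuple.
pairs = [(2, 10), (3, 10), (4, 10)]
scaled = list(map(scale_pair, pairs))
```

Tuple packing keeps every task self-describing, which can be convenient when the parameters differ from task to task.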

If the target function raises an exception in any of the workers, build_many raises that exception and all other results are discarded. Any in-progress tasks may still be allowed to complete gracefully before the exception is raised:

sage: def target(N):
....:     import time, os, signal
....:     if N == 4:
....:         # Task 4 is a poison pill
....:         1 / 0
....:     else:
....:         time.sleep(float(0.5))
....:         print('Processed task %s' % N)
>>> from sage.all import *
>>> def target(N):
...     import time, os, signal
...     if N == Integer(4):
...         # Task 4 is a poison pill
...         Integer(1) / Integer(0)
...     else:
...         time.sleep(float(RealNumber('0.5')))
...         print('Processed task %s' % N)

Note: In practice this test might still show output from the other worker processes before the poison-pill is executed. It may also display the traceback from the failing process on stderr. However, due to how the doctest runner works, the doctest will only expect the final exception:

sage: build_many(target, range(8), processes=8)
Traceback (most recent call last):
...
    raise ZeroDivisionError("rational division by zero")
 ZeroDivisionError: rational division by zero
...
    raise worker_exc.original_exception
ZeroDivisionError: rational division by zero
>>> from sage.all import *
>>> build_many(target, range(Integer(8)), processes=Integer(8))
Traceback (most recent call last):
...
    raise ZeroDivisionError("rational division by zero")
 ZeroDivisionError: rational division by zero
...
    raise worker_exc.original_exception
ZeroDivisionError: rational division by zero
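The traceback above ends in raise worker_exc.original_exception, which suggests that a worker failure object carries the task's own exception and that the caller sees that exception rather than the wrapper. A minimal sketch of this unwrapping step follows. WorkerDied is a hypothetical stand-in that only mirrors the documented WorkerDiedException constructor signature, and reraise_worker_failure is a hypothetical helper, not part of sage_docbuild.

```python
class WorkerDied(RuntimeError):
    """Hypothetical stand-in mirroring the documented constructor signature."""

    def __init__(self, message, original_exception=None):
        super().__init__(message)
        self.original_exception = original_exception


def reraise_worker_failure(worker_exc):
    """Raise the task's own exception if there is one, else the worker error."""
    if worker_exc.original_exception is not None:
        raise worker_exc.original_exception
    raise worker_exc
```

With this split, a caller can catch ZeroDivisionError for a failing task and still catch RuntimeError for a worker that died without reporting any exception.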

Similarly, if one of the worker processes dies unexpectedly or otherwise exits with a non-zero exit code (e.g. because it was killed by a signal), any in-progress tasks will be completed gracefully, but then a RuntimeError is raised and pending tasks are not started:

sage: def target(N):
....:     import time, os, signal
....:     if N == 4:
....:         # Task 4 is a poison pill
....:         os.kill(os.getpid(), signal.SIGKILL)
....:     else:
....:         time.sleep(float(0.5))
....:         print('Processed task %s' % N)
sage: build_many(target, range(8), processes=8)
Traceback (most recent call last):
...
WorkerDiedException: worker for 4 died with non-zero exit code -9
>>> from sage.all import *
>>> def target(N):
...     import time, os, signal
...     if N == Integer(4):
...         # Task 4 is a poison pill
...         os.kill(os.getpid(), signal.SIGKILL)
...     else:
...         time.sleep(float(RealNumber('0.5')))
...         print('Processed task %s' % N)
>>> build_many(target, range(Integer(8)), processes=Integer(8))
Traceback (most recent call last):
...
WorkerDiedException: worker for 4 died with non-zero exit code -9
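The exit code -9 in the message above follows the multiprocessing convention that a process terminated by signal N has exit code -N. A small sketch for turning such codes into readable text is given below. The helper name describe_exit_code is hypothetical, and the signal names depend on the platform.

```python
import signal


def describe_exit_code(exitcode):
    """Describe a multiprocessing.Process.exitcode value in words."""
    if exitcode is None:
        return "still running"
    if exitcode == 0:
        return "exited normally"
    if exitcode < 0:
        # A negative value -N means the process was terminated by signal N.
        try:
            name = signal.Signals(-exitcode).name
        except ValueError:
            name = "signal %d" % -exitcode
        return "killed by %s" % name
    return "exited with status %d" % exitcode
```

On Linux, describe_exit_code(-9) returns "killed by SIGKILL", which matches the SIGKILL sent by the poison-pill task in the example.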