Utilities
- exception sage_docbuild.utils.RemoteException(tb: str)[source]
Bases: Exception
Raised if an exception occurred in one of the child processes.
- tb: str
- class sage_docbuild.utils.RemoteExceptionWrapper(exc: BaseException)[source]
Bases: object
Used by child processes to capture exceptions thrown during execution and report them to the main process, including the correct traceback.
- exc: BaseException
- tb: str
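The sketch below illustrates, in plain multiprocessing terms, the general pattern this wrapper supports: the child catches an exception, formats its traceback into a string, and ships both back to the parent, which re-raises with that context. This is only an illustration of the idea, not the actual implementation used by build_many; the helper names here are made up.

import traceback
from multiprocessing import Process, Queue

def child(job, results):
    # Hypothetical worker: report either the result or the exception
    # together with its formatted traceback (pass job=0 to see the error path).
    try:
        results.put(('ok', 100 // job))
    except BaseException as exc:
        results.put(('error', exc, traceback.format_exc()))

if __name__ == '__main__':
    results = Queue()
    proc = Process(target=child, args=(3, results))
    proc.start()
    proc.join()
    outcome = results.get()
    if outcome[0] == 'error':
        _, exc, tb = outcome
        # Re-raise in the main process, preserving the child's traceback text.
        raise RuntimeError('exception in child process:\n%s' % tb) from exc
    print(outcome[1])  # prints 33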
- exception sage_docbuild.utils.WorkerDiedException(message: str | None, original_exception: BaseException | None = None)[source]
Bases: RuntimeError
Raised if a worker process dies unexpectedly.
- original_exception: BaseException | None
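Since this is a RuntimeError subclass and (per the attribute above) may carry the worker's own exception in original_exception, callers of build_many can catch it explicitly. A minimal sketch, assuming a hypothetical per-document task:

from sage_docbuild.utils import WorkerDiedException, build_many

def build_one(doc):
    # Hypothetical task; stands in for a real, longer-running doc build.
    return len(str(doc))

try:
    results = build_many(build_one, range(8))
except WorkerDiedException as exc:
    # original_exception is None when the worker died without reporting one
    # (e.g. it was killed by a signal).
    print('worker died:', exc)
    if exc.original_exception is not None:
        print('caused by:', exc.original_exception)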
- sage_docbuild.utils.build_many(target, args, processes=None)[source]
Map a list of arguments in args to a single-argument target function target in parallel, using multiprocessing.cpu_count() (or processes if given) simultaneous processes.
This is a simplified version of multiprocessing.Pool.map from the Python standard library which avoids a couple of its pitfalls. In particular, it can abort (with a RuntimeError) without hanging if one of the worker processes unexpectedly dies. It also has semantics equivalent to maxtasksperchild=1; that is, one process is started per argument. As such, this is inefficient for processing large numbers of fast tasks, but appropriate for running longer tasks (such as doc builds) which may also require significant cleanup.
It also avoids starting new processes from a pthread, which results in at least one known issue: when PARI is built with multi-threading support, forking a Sage process from a thread leaves the main PARI interface instance broken (see Issue #26608#comment:38).
In the future this may be replaced by a generalized version of the more robust parallel processing implementation from sage.doctest.forker.
EXAMPLES:
sage: from sage_docbuild.utils import build_many
sage: def target(N):
....:     import time
....:     time.sleep(float(0.1))
....:     print('Processed task %s' % N)
sage: _ = build_many(target, range(8), processes=8)
Processed task ...
Processed task ...
Processed task ...
Processed task ...
Processed task ...
Processed task ...
Processed task ...
Processed task ...
This version can also return a result, and thus can be used as a replacement for multiprocessing.Pool.map (i.e. it still blocks until the result is ready):
sage: def square(N):
....:     return N * N
sage: build_many(square, range(100))
[0, 1, 4, 9, ..., 9604, 9801]
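For comparison, the maxtasksperchild=1 semantics described above can be approximated with the standard-library pool; this rough sketch is not an exact equivalent, since Pool.map lacks build_many's guarantee of aborting cleanly when a worker dies:

from multiprocessing import Pool

def square(N):
    return N * N

if __name__ == '__main__':
    # One task per worker process, similar in spirit to build_many,
    # but prone to hanging if a worker is killed outright.
    with Pool(processes=8, maxtasksperchild=1) as pool:
        print(pool.map(square, range(100))[:5])  # [0, 1, 4, 9, 16]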
If the target function raises an exception in any of the workers, build_many raises that exception and all other results are discarded. Any in-progress tasks may still be allowed to complete gracefully before the exception is raised:
sage: def target(N):
....:     import time, os, signal
....:     if N == 4:
....:         # Task 4 is a poison pill
....:         1 / 0
....:     else:
....:         time.sleep(float(0.5))
....:         print('Processed task %s' % N)
Note: In practice this test might still show output from the other worker processes before the poison-pill is executed. It may also display the traceback from the failing process on stderr. However, due to how the doctest runner works, the doctest will only expect the final exception:
sage: build_many(target, range(8), processes=8)
Traceback (most recent call last):
...
    raise ZeroDivisionError("rational division by zero")
ZeroDivisionError: rational division by zero
...
    raise worker_exc.original_exception
ZeroDivisionError: rational division by zero
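Because the worker's exception propagates out of build_many, a caller that wants to recover from a bad task can simply wrap the call; a minimal sketch reusing the poison-pill target above:

try:
    build_many(target, range(8), processes=8)
except ZeroDivisionError as exc:
    # Results from the other tasks are discarded; only the error survives.
    print('a task failed:', exc)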
Similarly, if one of the worker processes dies unexpectedly or otherwise exits non-zero (e.g. is killed by a signal), any in-progress tasks will be completed gracefully, but then a RuntimeError is raised and pending tasks are not started:
sage: def target(N):
....:     import time, os, signal
....:     if N == 4:
....:         # Task 4 is a poison pill
....:         os.kill(os.getpid(), signal.SIGKILL)
....:     else:
....:         time.sleep(float(0.5))
....:         print('Processed task %s' % N)
sage: build_many(target, range(8), processes=8)
Traceback (most recent call last):
...
WorkerDiedException: worker for 4 died with non-zero exit code -9