Source code for zproc.process

import multiprocessing
import os
import signal
from typing import Callable, Union, Sequence, Mapping, Any, Optional

import zmq

from . import util, exceptions
from .processdef import child_process


[docs]class Process(util.SecretKeyHolder): def __init__( self, target: Callable, server_address: str, *, stateful: bool = True, pass_context: bool = False, args: Sequence = None, kwargs: Mapping = None, retry_for: Sequence[Union[signal.Signals, Exception]] = (), retry_delay: Union[int, float] = 5, max_retries: Optional[bool] = None, retry_args: Optional[tuple] = None, retry_kwargs: Optional[dict] = None, start: bool = True, backend: Callable = multiprocessing.Process, namespace: str = "default", secret_key: Optional[str] = None ) -> None: """ Provides a higher level interface to ``multiprocessing.Process``. Please don't share a Process object between Processes / Threads. A Process object is not thread-safe. :param server_address: The address of zproc server. If you are using a :py:class:`Context`, then this is automatically provided. Please read :ref:`server-address-spec` for a detailed explanation. :param target: The Callable to be invoked inside a new process. *The ``target`` is invoked with the following signature:* ``target(state, *args, **kwargs)`` *Where:* - ``state`` is a :py:class:`State` instance. - ``args`` and ``kwargs`` are passed from the constructor. :param pass_context: Weather to pass a :py:class:`Context` to this process. If this is set to ``True``, then the first argument to ``target`` will be a :py:class:`Context` object in-place of the default - :py:class:`State`. In other words, The ``target`` is invoked with the following signature: ``target(ctx, *args, **kwargs)`` Where: - ``ctx`` is a :py:class:`Context` object. - ``args`` and ``kwargs`` are passed from the constructor. .. note:: The :py:class:`Context` object provided here, will be different than the one used to create this process. The new :py:class:`Context` object can be used to create nested processes that share the same state. :param stateful: Weather this process needs to access the state. If this is set to ``False``, then the ``state`` argument won't be provided to the ``target``. In other words, The ``target`` is invoked with the following signature: ``target(*args, **kwargs)`` Where: - ``args`` and ``kwargs`` are passed from the constructor. Has no effect if ``pass_context`` is set to ``True``. :param start: Automatically call :py:meth:`.start()` on the process. :param retry_for: Retry only when one of these ``Exception``/``signal.Signals`` is raised. .. code-block:: python :caption: Example import signal # retry if a ConnectionError, ValueError or signal.SIGTERM is received. ctx.process( my_process, retry_for=(ConnectionError, ValueError, signal.SIGTERM) ) To retry for *any* Exception - ``retry_for=(Exception, )`` The items of this sequence MUST be a subclass of ``BaseException`` or of type ``signal.Signals``. :param retry_delay: The delay in seconds, before retrying. :param max_retries: Give up after this many attempts. A value of ``None`` will result in an *infinite* number of retries. After "max_tries", any Exception / Signal will exhibit default behavior. :param args: The argument tuple for ``target``. By default, it is an empty ``tuple``. :param kwargs: A dictionary of keyword arguments for ``target``. By default, it is an empty ``dict``. :param retry_args: Used in place of ``args`` when retrying. If set to ``None``, then it has no effect. :param retry_kwargs: Used in place of ``kwargs`` when retrying. If set to ``None``, then it has no effect. :param backend: The backend to use for launching the process(s). For example, you may use ``threading.Thread`` as the backend. .. warning:: Not guaranteed to work well with anything other than ``multiprocessing.Process``. :ivar child: A ``multiprocessing.Process`` instance for the child process. :ivar server_address: Passed on from the constructor. :ivar target: Passed on from the constructor. :ivar namespace: Passed on from the constructor. This is read-only. """ assert callable(target), '"target" must be a `Callable`, not `{}`'.format( type(target) ) super().__init__(secret_key) self.server_address = server_address self.namespace = namespace self.target = target self._zmq_ctx = util.create_zmq_ctx() self._result_sock = self._zmq_ctx.socket(zmq.PULL) # The result socket is meant to be used only after the process completes (after join()). # That implies -- we shouldn't need to wait for the result message. self._result_sock.setsockopt(zmq.RCVTIMEO, 0) result_address = util.bind_to_random_address(self._result_sock) self.child = backend( target=child_process, args=[ self.server_address, self.target, self.__repr__(), self.namespace, secret_key, stateful, pass_context, args, kwargs, retry_for, retry_delay, max_retries, retry_args, retry_kwargs, result_address, ], ) if start: self.child.start() def __repr__(self): try: pid = self.pid except AttributeError: pid = None try: exitcode = self.exitcode except AttributeError: exitcode = None try: is_alive = self.is_alive except AttributeError: is_alive = False return "<%s pid: %r target: %r ppid: %r is_alive: %r exitcode: %r>" % ( Process.__qualname__, pid, self.target.__module__ + "." + self.target.__qualname__, os.getpid(), is_alive, exitcode, )
[docs] def start(self): """ Start this Process If the child has already been started once, it will return with an :py:exc:`AssertionError`. :return: the process PID """ self.child.start() return self.pid
[docs] def stop(self): """ Stop this process. Once closed, it should not, and cannot be used again. :return: :py:attr:`~exitcode`. """ self.child.terminate() self._result_sock.close() util.close_zmq_ctx(self._zmq_ctx) return self.child.exitcode
[docs] def wait(self, timeout: Optional[Union[int, float]] = None): """ Wait until this process finishes execution, then return the value returned by the ``target``. :param timeout: The timeout in seconds. If the value is ``None``, it will block until the zproc server replies. For all other values, it will wait for a reply, for that amount of time before returning with a ``TimeoutError``. :return: The value returned by the ``target`` function. If the child finishes with a non-zero exitcode, or there is some error in retrieving the value returned by the ``target``, a :py:exc:`.ProcessWaitError` is raised. """ # try to fetch the cached result. try: return self._return_value except AttributeError: pass self.child.join(timeout) if self.is_alive: raise TimeoutError( "Timed-out while waiting for Process to return. -- %s" % repr(self) ) exitcode = self.exitcode if exitcode != 0: raise exceptions.ProcessWaitError( "Process finished with a non-zero exitcode (%d)." % exitcode, exitcode, self, ) try: self._return_value = util.recv( self._result_sock, self._serializer ) # type: Any except zmq.error.Again: raise exceptions.ProcessWaitError( "The Process died before sending its return value. " "It probably crashed, got killed, or exited without warning.", exitcode, ) self._result_sock.close() util.close_zmq_ctx(self._zmq_ctx) return self._return_value
@property def is_alive(self): """ Whether the child process is alive. Roughly, a process object is alive; from the moment the :py:meth:`start` method returns, until the child process is stopped manually (using :py:meth:`stop`) or naturally exits """ return self.child.is_alive() @property def pid(self): """ The process ID. Before the process is started, this will be None. """ return self.child.pid @property def exitcode(self): """ The child’s exit code. This will be None if the process has not yet terminated. A negative value ``-N`` indicates that the child was terminated by signal ``N``. """ return self.child.exitcode