Source code for zproc.context

import atexit
import collections
import functools
import multiprocessing
import signal
import time
from typing import (
    Callable,
    Union,
    Hashable,
    Any,
    List,
    Mapping,
    Generator,
    Sequence,
    Dict,
    Optional,
)

import zmq

from zproc import processdef, tools, util
from zproc.process import Process
from zproc.state import State

# holds the details for a task
TaskDetail = collections.namedtuple("TaskDetail", ["task_id", "chunk_count"])


class Context(util.SecretKeyHolder):
    def __init__(
        self,
        server_address: str = None,
        *,
        wait: bool = False,
        cleanup: bool = True,
        server_backend: Callable = multiprocessing.Process,
        namespace: str = "default",
        secret_key: str = None,
        **process_kwargs
    ) -> None:
        """
        Provides a high level interface to :py:class:`State` and :py:class:`Process`.

        Primarily used to manage and launch processes.

        All processes launched using a Context share the same state.

        Don't share a Context object between Processes / Threads.
        A Context object is not thread-safe.

        :param server_address:
            The address of the server.

            If it is set to ``None``, then a new server is started and a random address will be generated.

            Otherwise, it will connect to an existing server with the address provided.

            .. caution::

                If you provide a "server_address", be sure to manually start the server,
                as described here - :ref:`start-server`.

            Please read :ref:`server-address-spec` for a detailed explanation.

        :param wait:
            Wait for all running processes to finish their work before exiting.

            Alternative to manually calling :py:meth:`~Context.wait_all` at exit.

        :param cleanup:
            Whether to cleanup the process tree before exiting.

            Registers a signal handler for ``SIGTERM``, and an ``atexit`` handler.

        :param server_backend:
            Passed on to :py:func:`start_server` as ``backend``.

        :param \*\*process_kwargs:
            Keyword arguments that :py:class:`~Process` takes, except ``server_address`` and ``target``.

            If provided, these will be used while creating processes using this Context.

        :ivar state:
            A :py:class:`State` instance.

        :ivar process_list:
            A list of child ``Process``\ (s) created under this Context.

        :ivar worker_list:
            A list of worker ``Process``\ (s) created under this Context.
            Used for :py:meth:`Context.process_map`.

        :ivar server_process:
            A ``multiprocessing.Process`` object for the server, or None.

        :ivar server_address:
            The server's address as a 2-element ``tuple``.

        :ivar namespace:
            Passed on from the constructor. This is read-only.
        """
        super().__init__(secret_key)

        if server_address is None:
            self.server_process, self.server_address = tools.start_server(
                server_address, backend=server_backend, secret_key=secret_key
            )
        else:
            self.server_process, self.server_address = None, server_address

        self.namespace = namespace
        self.state = State(
            self.server_address, namespace=self.namespace, secret_key=secret_key
        )

        self.process_list = []  # type: List[Process]
        self.worker_list = []  # type: List[Process]

        self._push_sock = self.state._zmq_ctx.socket(zmq.PUSH)
        self._push_address = util.bind_to_random_address(self._push_sock)

        self._pull_sock = self.state._zmq_ctx.socket(zmq.PULL)
        self._pull_address = util.bind_to_random_address(self._pull_sock)

        self._process_kwargs = process_kwargs
        self._process_kwargs["namespace"] = self.namespace
        self._process_kwargs["secret_key"] = self.secret_key

        self._worker_kwargs = {
            **self._process_kwargs,
            # must be in reverse order for this to work
            # i.e. first pull addr, then push addr.
            "args": (self._pull_address, self._push_address, self.secret_key),
            # worker can't work without the state!
            "stateful": True,
            "start": True,
        }

        self._task_counter = 0
        self._task_chunk_results = collections.defaultdict(
            dict
        )  # type: Dict[TaskDetail, Dict[int, Any]]

        # Register cleanup before wait, so that wait runs before cleanup.
        # (atexit handlers run in the reverse order of registration.)
        if cleanup:
            atexit.register(util.clean_process_tree)
            if util.is_main_thread():
                signal.signal(signal.SIGTERM, util.clean_process_tree)
        if wait:
            atexit.register(self.wait_all)

    def __repr__(self):
        return "<{}>".format(self.__str__())

    def __str__(self):
        return "{} for {}".format(Context.__qualname__, self.state)
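
    # Illustrative usage sketch (not part of this module): a Context starts a
    # server (unless given an address) and every process launched through it
    # shares that server's state. Roughly:
    #
    #   import zproc
    #
    #   ctx = zproc.Context(wait=True)      # wait for children at exit
    #   ctx.state["greeting"] = "hello"     # state is shared with children
    #
    #   @ctx.process
    #   def child(state):
    #       return state["greeting"]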

    def process(
        self, target: Optional[Callable] = None, **process_kwargs
    ) -> Union[Process, Callable]:
        """
        Produce a child process bound to this context.

        Can be used both as a function and decorator:

        .. code-block:: python
            :caption: Usage

            @zproc.process()  # you may pass some arguments here
            def my_process1(state):
                print('hello')

            @zproc.process  # or not...
            def my_process2(state):
                print('hello')

            def my_process3(state):
                print('hello')

            zproc.process(my_process3)  # or just use as a good ol' function

        :param target:
            Passed on to the :py:class:`Process` constructor.

            SHOULD be omitted when using this as a decorator.

        :param \*\*process_kwargs:
            .. include:: /api/context/params/process_kwargs.rst

        :return: The :py:class:`Process` instance produced.
        """
        if target is None:

            def decorator(fn):
                return self.process(fn, **process_kwargs)

            return decorator

        process = Process(
            target, self.server_address, **{**self._process_kwargs, **process_kwargs}
        )
        self.process_list.append(process)
        return process

    def process_factory(self, *targets: Callable, count: int = 1, **process_kwargs):
        """
        Produce multiple child process(s) bound to this context.

        :param \*targets:
            Passed on to the :py:class:`Process` constructor, one at a time.

        :param count:
            The number of processes to spawn for each item in ``targets``.

        :param \*\*process_kwargs:
            .. include:: /api/context/params/process_kwargs.rst

        :return: A ``list`` of the :py:class:`Process` instance(s) produced.
        """
        return [
            self.process(target, **process_kwargs)
            for target in targets
            for _ in range(count)
        ]
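
    # Illustrative sketch (not part of this module): process_factory() is a
    # shorthand for launching several targets, optionally multiple copies each.
    #
    #   def ping(state): ...
    #   def pong(state): ...
    #
    #   procs = ctx.process_factory(ping, pong, count=2)  # 4 Process objects
    #   ctx.wait_all()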

    def _repopulate_workers(self, size: int, new: bool):
        if not new:
            size -= len([worker for worker in self.worker_list if worker.is_alive])

        if size > 0:
            for _ in range(size):
                self.worker_list.append(
                    self.process(processdef.process_map_worker, **self._worker_kwargs)
                )
        elif size < 0:
            # Notify "size" no. of workers to finish up, and close shop.
            for _ in range(-size):
                util.send(None, self._push_sock, self._serializer)

    def _pull_results(self):
        response = util.recv(self._pull_sock, self._serializer)
        # print(response)
        task_detail, list_index, chunk_result = response
        self._task_chunk_results[task_detail][list_index] = chunk_result

    def pull_results_for_task(
        self, task_detail: TaskDetail
    ) -> Generator[Any, None, None]:
        """
        PULL the ``chunk_count`` chunk results of a task from the worker pool,
        and yield the individual results in their original order.
        """
        task_chunk_results = self._task_chunk_results[task_detail]
        for list_index in range(task_detail.chunk_count):
            while list_index not in task_chunk_results:
                self._pull_results()
            yield from task_chunk_results[list_index]
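
    # Illustrative sketch (not part of this module): pairing ``return_task=True``
    # with pull_results_for_task() lets you fetch results later, since the
    # TaskDetail namedtuple (unlike a generator) is trivially serializable.
    #
    #   task = ctx.process_map(pow, range(100), args=[2], return_task=True)
    #   ...  # do something else in the meantime
    #   results = list(ctx.pull_results_for_task(task))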

    def process_map(
        self,
        target: Callable,
        map_iter: Sequence[Any] = None,
        *,
        map_args: Sequence[Sequence[Any]] = None,
        args: Sequence = None,
        map_kwargs: Sequence[Mapping[str, Any]] = None,
        kwargs: Mapping = None,
        count: int = None,
        stateful: bool = False,
        new: bool = False,
        return_task: bool = False
    ) -> Union[TaskDetail, Generator[Any, None, None]]:
        """
        Functional equivalent of the built-in ``map()``, but executed in a parallel fashion.

        Distributes the iterables provided in the ``map_*`` arguments to ``count``
        worker :py:class:`Process`\ (s).

        (Aforementioned worker processes are visible here: :py:attr:`Context.worker_list`)

        The idea is to:

        1. Split the iterables provided in the ``map_*`` arguments into ``count`` number of equally sized chunks.
        2. Send these chunks to ``count`` number of worker :py:class:`Process`\ (s).
        3. Wait for all these worker :py:class:`Process`\ (s) to finish their task(s).
        4. Combine the acquired results in the same sequence as provided in the ``map_*`` arguments.
        5. Return the combined results.

        *Steps 3-5 are done lazily, on the fly with the help of a* ``generator``

        .. note::
            This function won't spawn new worker :py:class:`Process`\ (s) each time it is called.

            Existing workers will be used if a sufficient amount is available.
            If the workers are busy, then this will wait for them to finish up their current work.

            Use the ``new=True`` Keyword Argument to spawn new workers, regardless of existing ones.

            You need not worry about shutting down workers. ZProc will take care of that automatically.

        .. note::
            This method doesn't have a way to pass Keyword Arguments to :py:class:`Process`.

            This was done to prevent weird behavior due to the re-use of workers done by ZProc.

            Use the :py:class:`Context`\ 's constructor to work around this problem.

        :param target:
            The ``Callable`` to be invoked inside a :py:class:`Process`.

            *It is invoked with the following signature:*

                ``target(state, map_iter[i], *map_args[i], *args, **map_kwargs[i], **kwargs)``

            *Where:*

            - ``state`` is a :py:class:`State` instance.
              (Disabled by default. Use the ``stateful`` Keyword Argument to enable)

            - ``i`` is the index of the n\ :sup:`th` element of the Iterable(s) provided in the ``map_*`` arguments.

            - ``args`` and ``kwargs`` are the same as the ones supplied to this method.

            P.S. The ``stateful`` Keyword Argument of :py:class:`Process` allows you to omit the ``state`` arg.

        :param map_iter:
            A sequence whose elements are supplied as the *first* positional argument (after ``state``) to the ``target``.

        :param map_args:
            A sequence whose elements are supplied as positional arguments (``*args``) to the ``target``.

        :param map_kwargs:
            A sequence whose elements are supplied as keyword arguments (``**kwargs``) to the ``target``.

        :param args:
            The argument tuple for ``target``, supplied after ``map_iter`` and ``map_args``.

            By default, it is an empty ``tuple``.

        :param kwargs:
            A dictionary of keyword arguments for ``target``.

            By default, it is an empty ``dict``.

        :param stateful:
            Whether this process needs to access the state.

            If this is set to ``False``, then the ``state`` argument won't be provided to the ``target``.

            If this is set to ``True``, then a :py:class:`State` object is provided as the first Argument to the ``target``.

            Unlike :py:class:`Process`, it is set to ``False`` by default.
            (To retain a similar API to the built-in ``map()``)

        :param new:
            Whether to spawn new workers.

            If it is set to ``True``, then it will spawn new workers, regardless of existing ones.

            If it is set to ``False``, then only ``count - <number of alive workers>`` new workers will be spawned.
            Surplus workers are shut down automatically.

        :param count:
            The number of worker :py:class:`.Process` (s) to use.

            By default, it is set to ``multiprocessing.cpu_count()``
            (The number of CPU cores on your system)

        :param return_task:
            Return a ``TaskDetail`` namedtuple object, instead of a Generator that yields the results of the computation.

            The ``TaskDetail`` returned can be passed to :py:meth:`Context.pull_results_for_task`,
            which will fetch the results for you.

            This is useful in situations where the results are required at a later time,
            and since a Generator object is not easily serializable, things get a little tricky.

            On the other hand, a namedtuple can be serialized to JSON pretty easily.

        :return:
            The result is quite similar to the built-in ``map()``.

            It returns a ``generator`` whose elements are the return value of the ``target`` function,
            when applied to every item of the Iterables provided in the ``map_*`` arguments.

            The actual "processing" starts as soon as you call this function.

            The returned ``generator`` fetches the results from the worker processes, one-by-one.

            .. warning::
                - If ``len(map_iter) != len(map_args) != len(map_kwargs)``,
                  then the results will be cut-off at the shortest Sequence.

        See :ref:`process_map` for Examples.
        """
        if count is None:
            count = multiprocessing.cpu_count()
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}

        task_detail = TaskDetail(task_id=self._task_counter, chunk_count=count)
        self._task_counter += 1

        self._repopulate_workers(count, new)

        lengths = [len(i) for i in (map_iter, map_args, map_kwargs) if i is not None]
        assert (
            lengths
        ), 'At least one of "map_iter", "map_args", or "map_kwargs" must be provided as a non-empty sequence.'

        chunk_size, extra = divmod(min(lengths), count)
        if extra:
            chunk_size += 1

        chunks = (
            util.chunk_gen(map_iter, chunk_size, count),
            util.chunk_gen(map_args, chunk_size, count),
            util.chunk_gen(map_kwargs, chunk_size, count),
        )
        # print(smallest, chunk_size, count, chunks)

        for chunk_id in range(count):
            params = [
                None if chunks[0] is None else chunks[0][chunk_id],
                None if chunks[1] is None else chunks[1][chunk_id],
                args,
                None if chunks[2] is None else chunks[2][chunk_id],
                kwargs,
            ]
            batch = [task_detail, chunk_id, stateful, target, params]
            util.send(batch, self._push_sock, self._serializer)

        if return_task:
            return task_detail
        return self.pull_results_for_task(task_detail)
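
    # Illustrative sketch (not part of this module): process_map() behaves like
    # the built-in map(), but the work is split across worker processes.
    #
    #   import zproc
    #
    #   ctx = zproc.Context()
    #   squares = ctx.process_map(pow, range(10 ** 5), args=[2], count=4)
    #   print(sum(squares))  # the generator is consumed lazily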

    def _create_call_when_xxx_decorator(
        self,
        get_when_xxx_fn_name: str,
        process_kwargs: dict,
        *state_watcher_args,
        **state_watcher_kwargs
    ):
        # can't work without the state!
        stateful = process_kwargs.pop("stateful", True)

        def decorator(wrapped_fn):
            if stateful:

                def watcher_process(state, *args, **kwargs):
                    get_when_xxx_fn = getattr(state, get_when_xxx_fn_name)

                    while True:
                        wrapped_fn(
                            get_when_xxx_fn(
                                *state_watcher_args, **state_watcher_kwargs
                            ),
                            state,
                            *args,
                            **kwargs
                        )

            else:

                def watcher_process(state, *args, **kwargs):
                    get_when_xxx_fn = getattr(state, get_when_xxx_fn_name)

                    while True:
                        wrapped_fn(
                            get_when_xxx_fn(
                                *state_watcher_args, **state_watcher_kwargs
                            ),
                            *args,
                            **kwargs
                        )

            watcher_process = self.process(watcher_process, **process_kwargs)
            functools.update_wrapper(watcher_process.target, wrapped_fn)

            return watcher_process

        return decorator

    def call_when_change(
        self, *keys: Hashable, exclude: bool = False, live: bool = False, **process_kwargs
    ):
        """
        Decorator version of :py:meth:`~State.get_when_change()`.

        .. include:: /api/context/call_when_change.rst

        .. code-block:: python
            :caption: Example

            import zproc

            ctx = zproc.Context()

            @ctx.call_when_change('gold')
            def test(snapshot, state):
                print(snapshot['gold'], state)
        """
        return self._create_call_when_xxx_decorator(
            "get_when_change", process_kwargs, *keys, exclude=exclude, live=live
        )

    def call_when(self, test_fn: Callable, *, live: bool = False, **process_kwargs):
        """
        Decorator version of :py:meth:`~State.get_when()`.

        .. include:: /api/context/call_when.rst

        .. code-block:: python
            :caption: Example

            import zproc

            ctx = zproc.Context()

            @ctx.call_when(lambda state: state['trees'] == 5)
            def test(snapshot, state):
                print(snapshot['trees'], state)
        """
        return self._create_call_when_xxx_decorator(
            "get_when", process_kwargs, test_fn, live=live
        )

    def call_when_equal(
        self, key: Hashable, value: Any, *, live: bool = False, **process_kwargs
    ):
        """
        Decorator version of :py:meth:`~State.get_when_equal()`.

        .. include:: /api/context/call_when_equality.rst

        .. code-block:: python
            :caption: Example

            import zproc

            ctx = zproc.Context()

            @ctx.call_when_equal('oranges', 5)
            def test(snapshot, state):
                print(snapshot['oranges'], state)
        """
        return self._create_call_when_xxx_decorator(
            "get_when_equal", process_kwargs, key, value, live=live
        )

    def call_when_not_equal(
        self, key: Hashable, value: Any, *, live: bool = False, **process_kwargs
    ):
        """
        Decorator version of :py:meth:`~State.get_when_not_equal()`.

        .. include:: /api/context/call_when_equality.rst

        .. code-block:: python
            :caption: Example

            import zproc

            ctx = zproc.Context()

            @ctx.call_when_not_equal('apples', 5)
            def test(snapshot, state):
                print(snapshot['apples'], state)
        """
        return self._create_call_when_xxx_decorator(
            "get_when_not_equal", process_kwargs, key, value, live=live
        )

    def call_when_none(self, key: Hashable, *, live: bool = False, **process_kwargs):
        """
        Decorator version of :py:meth:`~State.get_when_none()`.

        .. include:: /api/context/call_when_equality.rst
        """
        return self._create_call_when_xxx_decorator(
            "get_when_none", process_kwargs, key, live=live
        )

    def call_when_not_none(
        self, key: Hashable, *, live: bool = False, **process_kwargs
    ):
        """
        Decorator version of :py:meth:`~State.get_when_not_none()`.

        .. include:: /api/context/call_when_equality.rst
        """
        return self._create_call_when_xxx_decorator(
            "get_when_not_none", process_kwargs, key, live=live
        )

    def call_when_available(
        self, key: Hashable, *, live: bool = False, **process_kwargs
    ):
        """
        Decorator version of :py:meth:`~State.get_when_available()`.

        .. include:: /api/context/call_when_equality.rst
        """
        return self._create_call_when_xxx_decorator(
            "get_when_available", process_kwargs, key, live=live
        )

    @staticmethod
    def _wait_or_catch_exc(process: Process, *args, **kwargs) -> Union[Exception, Any]:
        try:
            return process.wait(*args, **kwargs)
        except Exception as e:
            return e

    def wait_all(
        self, timeout: Optional[Union[int, float]] = None, safe: bool = False
    ) -> List[Union[Any, Exception]]:
        """
        Call :py:meth:`~Process.wait()` on all the child processes of this Context.
        (Excluding the worker processes)

        Retains the same order as ``Context.process_list``.

        :param timeout:
            Same as :py:meth:`~Process.wait()`.

            This parameter controls the timeout for all the Processes combined,
            not a single :py:meth:`~Process.wait()` call.

        :param safe:
            Suppress any errors that occur while waiting for a Process.

            The return values of failed :py:meth:`~Process.wait()` calls are substituted
            with the ``Exception`` that occurred.

        :return:
            A ``list`` containing the values returned by child Processes of this Context.
        """
        if safe:
            _wait = self._wait_or_catch_exc
        else:
            _wait = Process.wait

        if timeout is None:
            return [_wait(process) for process in self.process_list]
        else:
            final = time.time() + timeout
            return [
                _wait(process, final - time.time()) for process in self.process_list
            ]
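
    # Illustrative sketch (not part of this module): with ``safe=True``, a
    # failing child doesn't raise here; its Exception shows up in the results.
    #
    #   results = ctx.wait_all(timeout=10, safe=True)
    #   for proc, result in zip(ctx.process_list, results):
    #       if isinstance(result, Exception):
    #           print(proc, "failed with", result)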

    def start_all(self):
        """
        Call :py:meth:`~Process.start()` on all the child processes of this Context.

        Ignores Processes that have already been started, unlike :py:meth:`~Process.start()`,
        which throws an ``AssertionError``.
        """
        for process in self.process_list:
            try:
                process.start()
            except AssertionError:
                pass

    def stop_all(self):
        """
        Call :py:meth:`~Process.stop()` on all the child processes of this Context.

        Retains the same order as ``Context.process_list``.

        :return:
            A ``list`` containing the exitcodes of the child Processes of this Context.
        """
        return [proc.stop() for proc in self.process_list]

    def ping(self, **kwargs):
        """
        Ping the zproc server.

        :param \*\*kwargs:
            Keyword arguments that :py:func:`ping` takes, except ``server_address``.

        :return:
            Same as :py:func:`ping`
        """
        return tools.ping(self.server_address, **kwargs)

    def close(self):
        """
        Close this context and stop all processes associated with it.

        Once closed, you shouldn't use this Context again.
        """
        self.stop_all()

        if self.server_process is not None:
            self.server_process.terminate()

        self._push_sock.close()
        self._pull_sock.close()
        self.state.close()  # The zmq Context will be implicitly closed here.
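
# Illustrative end-to-end sketch (not part of this module), tying the pieces
# together: launch a child through a Context, collect its return value, clean up.
#
#   import zproc
#
#   ctx = zproc.Context()
#   ctx.state["msg"] = "hello"
#
#   @ctx.process
#   def child(state):
#       return state["msg"] + " world"
#
#   print(ctx.wait_all())   # roughly: ['hello world']
#   ctx.close()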