API

Functions

zproc.ping(server_address: str, *, timeout: Union[float, int, None] = None, sent_payload: Optional[bytes] = None, secret_key: str = None) → Optional[int][source]

Ping the zproc server.

This can be used to easily detect if a server is alive and running, with the aid of a suitable timeout.

Parameters:
  • server_address

    The zproc server’s address.

    Please read The server address spec for a detailed explanation.

  • timeout

    The timeout in seconds.

    If this is set to None, then it will block forever, until the zproc server replies.

    For all other values, it will wait for a reply, for that amount of time before returning with a TimeoutError.

    By default it is set to None.

  • sent_payload

    payload that will be sent to the server.

    If it is set to None, then os.urandom(56) (56 random bytes) will be used.

    (No real reason for the 56 magic number.)

Returns:

The zproc server’s pid if the ping was successful, else None

If this returns None, then it probably means there is some fault in communication with the server.

zproc.atomic(fn: Callable) → Callable[source]

Wraps a function, to create an atomic operation out of it.

No Process shall access the state while fn is running.

Note

  • The first argument to the wrapped function must be a State object.

  • The wrapped function receives a frozen version (snapshot) of state;
    a dict object, not a State object.

Please read Atomicity and race conditions for a detailed explanation.

Parameters:fn – The function to be wrapped, as an atomic function.
Returns:A wrapper function.

The “wrapper” function returns the value returned by the “wrapped” function.

>>> import zproc
>>>
>>> @zproc.atomic
... def increment(snapshot):
...     return snapshot['count'] + 1
...
>>>
>>> ctx = zproc.Context()
>>> ctx.state['count'] = 0
>>>
>>> increment(ctx.state)
1
zproc.start_server(server_address: str = None, *, backend: Callable = <class 'multiprocessing.context.Process'>, secret_key: str = None)[source]

Start a new zproc server.

Parameters:
  • server_address

    The zproc server’s address.

    If it is set to None, then a random address will be generated.

    Please read The server address spec for a detailed explanation.

  • backend

    The backend to use for launching the server process.

    For example, you may use threading.Thread as the backend.

    Warning

    Not guaranteed to work well with anything other than multiprocessing.Process.

Returns:

tuple, containing a multiprocessing.Process object for server and the server address.

zproc.signal_to_exception(sig: signal.Signals)[source]

Convert a signal.Signals to a SignalException.

This allows for a natural, pythonic signal handing with the use of try-except blocks.

Exceptions

exception zproc.ProcessWaitError(message, exitcode, process=None)[source]
exception zproc.RemoteException(exc_info=None)[source]
exception zproc.SignalException(sig, frame)[source]

Context

class zproc.Context(server_address: str = None, *, wait: bool = False, cleanup: bool = True, server_backend: Callable = <class 'multiprocessing.context.Process'>, namespace: str = 'default', secret_key: str = None, **process_kwargs)[source]

Provides a high level interface to State and Process.

Primarily used to manage and launch processes.

All processes launched using a Context, share the same state.

Don’t share a Context object between Processes / Threads. A Context object is not thread-safe.

Parameters:
  • server_address

    The address of the server.

    If it is set to None, then a new server is started and a random address will be generated.

    Otherwise, it will connect to an existing server with the address provided.

    Caution

    If you provide a “server_address”, be sure to manually start the server, as described here - Starting the server manually.

    Please read The server address spec for a detailed explanation.

  • wait

    Wait for all running process to finish their work before exiting.

    Alternative to manually calling wait_all() at exit.

  • cleanup

    Whether to cleanup the process tree before exiting.

    Registers a signal handler for SIGTERM, and an atexit handler.

  • server_backend – Passed on to start_server() as backend.
  • **process_kwargs

    Keyword arguments that Process takes, except server_address and target.

    If provided, these will be used while creating processes using this Context.

Variables:
  • state – A State instance.
  • process_list – A list of child Process(s) created under this Context.
  • worker_list – A list of worker Process(s) created under this Context. Used for Context.process_map().
  • server_process – A multiprocessing.Process object for the server, or None.
  • server_address – The server’s address as a 2 element tuple.
  • namespace – Passed on from the constructor. This is read-only.
process(target: Optional[Callable] = None, **process_kwargs) → Union[zproc.process.Process, Callable][source]

Produce a child process bound to this context.

Can be used both as a function and decorator:

Usage
@zproc.process()  # you may pass some arguments here
def my_process1(state):
        print('hello')


@zproc.process  # or not...
def my_process2(state):
    print('hello')


def my_process3(state):
    print('hello')

zproc.process(my_process3)  # or just use as a good ol' function
Parameters:
  • target

    Passed on to the Process constructor.

    SHOULD be omitted when using this as a decorator.

  • **process_kwargs

    Keyword arguments that Process takes, except server_address and target.

    If provided, these will have a precedence over the one’s provided in Context’s constructor.

Returns:

The Process instance produced.

process_factory(*targets, count: int = 1, **process_kwargs)[source]

Produce multiple child process(s) bound to this context.

Parameters:
  • *targets – Passed on to the Process constructor, one at a time.
  • count – The number of processes to spawn for each item in targets.
  • **process_kwargs

    Keyword arguments that Process takes, except server_address and target.

    If provided, these will have a precedence over the one’s provided in Context’s constructor.

Returns:

A list of the Process instance(s) produced.

pull_results_for_task(task_detail: zproc.context.TaskDetail) → Generator[[Any, None], None][source]

PULL “count” results from the process pool. Also arranges the results in-order.

process_map(target: Callable, map_iter: Sequence[Any] = None, *, map_args: Sequence[Sequence[Any]] = None, args: Sequence = None, map_kwargs: Sequence[Mapping[str, Any]] = None, kwargs: Mapping = None, count: int = None, stateful: bool = False, new: bool = False, return_task: bool = False) → Union[zproc.context.TaskDetail, Generator[[Any, None], None]][source]

Functional equivalent of map() in-built function, but executed in a parallel fashion.

Distributes the iterables provided in the map_* arguments to count no of worker Process(s).

(Aforementioned worker processes are visible here: Context.worker_list)

The idea is to:
  1. Split the the iterables provided in the map_* arguments into count number of equally sized chunks.
  2. Send these chunks to count number of worker Process(s).
  3. Wait for all these worker Process(s) to finish their task(s).
  4. Combine the acquired results in the same sequence as provided in the map_* arguments.
  5. Return the combined results.

Steps 3-5 are done lazily, on the fly with the help of a generator

Note

This function won’t spawn new worker Process(s), each time it is called.

Existing workers will be used if a sufficient amount is available. If the workers are busy, then this will wait for them to finish up their current work.

Use the new=True Keyword Argument to spawn new workers, irregardless of existing ones.

You need not worry about shutting down workers. ZProc will take care of that automatically.

Note

This method doesn’t have a way to pass Keyword Arguments to Process.

This was done, to prevent weird behavior due to the re-use of workers done by ZProc.

Use the Context’s constructor to workaround this problem.

Parameters:
  • target

    The Callable to be invoked inside a Process.

    It is invoked with the following signature:

    target(state, map_iter[i], *map_args[i], *args, **map_kwargs[i], **kwargs)

    Where:

    • state is a State instance. (Disabled by default. Use the stateful Keyword Argument to enable)
    • i is the index of nth element of the Iterable(s) provided in the map_* arguments.
    • args and kwargs are passed from the **process_kwargs.

    P.S. The stateful Keyword Argument of Process allows you to omit the state arg.

  • map_iter – A sequence whose elements are supplied as the first positional argument (after state) to the target.
  • map_args – A sequence whose elements are supplied as positional arguments (*args) to the target.
  • map_kwargs – A sequence whose elements are supplied as keyword arguments (**kwargs) to the target.
  • args

    The argument tuple for target, supplied after map_iter and map_args.

    By default, it is an empty tuple.

  • kwargs

    A dictionary of keyword arguments for target.

    By default, it is an empty dict.

  • stateful

    Weather this process needs to access the state.

    If this is set to False, then the state argument won’t be provided to the target.

    If this is set to True, then a State object is provided as the first Argument to the target.

    Unlike Process it is set to False by default. (To retain a similar API to in-built map())

  • new

    Weather to spawn new workers.

    If it is set to True, then it will spawn new workers, irregardless of existing ones.

    If it is set to False, then size - len(Context.worker_list) will be spawned.

    Un-used workers are thrashed automatically.

  • count

    The number of worker Process (s) to use.

    By default, it is set to multiprocessing.cpu_count() (The number of CPU cores on your system)

  • return_task

    Return a TaskDetail namedtuple object, instead of a Generator that yields the results of the computation.

    The TaskDetail returned can be passed to Context.pull_results_for_task(), which will fetch the results for you.

    This is useful in situations where the results are required at a later time, and since a Generator object is not easily serializable, things get a little tricky. On the other hand, a namedtuple can be serialized to JSON, pretty easily.

Returns:

The result is quite similar to map() in-built function.

It returns a generator whose elements are the return value of the target function, when applied to every item of the Iterables provided in the map_* arguments.

The actual “processing” starts as soon as you call this function.

The returned generator fetches the results from the worker processes, one-by-one.

Warning

  • If len(map_iter) != len(maps_args) != len(map_kwargs), then the results will be cut-off at the shortest Sequence.

See Process Map for Examples.

call_when_change(*keys, exclude: bool = False, live: bool = False, **process_kwargs)[source]

Decorator version of get_when_change().

Spawns a new Process, and then calls the wrapped function inside of that new process.

The wrapped function is run with the following signature:

target(snapshot, state, *args, **kwargs)

Where:

  • target is the wrapped function.

  • snapshot is a dict containing a copy of the state.

    Its serves as a snapshot of the state, corresponding to the state-change for which the wrapped function is being called.

  • state is a State instance.

  • *args and **kwargs are passed on from **process_kwargs.

Parameters:
  • *keys

    Watch for changes on these keys in the state dict.

    If this is not provided, then all state-changes are respected. (default)

  • exclude

    Reverse the lookup logic i.e.,

    Watch for all changes in the state except in *keys.

    If *keys is not provided, then this has no effect. (default)

  • live

    Whether to get live updates.

    Please read Live-ness of events for a detailed explanation.

  • timeout

    Sets the timeout in seconds.

    If the value is None, it will block until an update is available.

    For all other values (>=0), it will wait for a state-change, for that amount of time before returning with a TimeoutError.

  • duplicate_okay

    Whether it’s okay to process duplicate updates.

    Please read Duplicate-ness of events for a detailed explanation.

  • **process_kwargs

    Keyword arguments that Process takes, except server_address and target.

    If provided, these will have a precedence over the one’s provided in Context’s constructor.

Returns:A decorator function

The decorator function will return the Process instance created.

Example
import zproc

ctx = zproc.Context()

@ctx.call_when_change('gold')
def test(snapshot, state):
    print(snapshot['gold'], state)
call_when(test_fn: Callable, *, live: bool = False, **process_kwargs)[source]

Decorator version of get_when().

Spawns a new Process, and then calls the wrapped function inside of that new process.

The wrapped function is run with the following signature:

target(snapshot, state, *args, **kwargs)

Where:

  • target is the wrapped function.

  • snapshot is a dict containing a copy of the state.

    Its serves as a snapshot of the state, corresponding to the state-change for which the wrapped function is being called.

  • state is a State instance.

  • *args and **kwargs are passed on from **process_kwargs.

Parameters:
  • test_fn – A Callable, which is called on each state-change.
  • live

    Whether to get live updates.

    Please read Live-ness of events for a detailed explanation.

  • timeout

    Sets the timeout in seconds.

    If the value is None, it will block until an update is available.

    For all other values (>=0), it will wait for a state-change, for that amount of time before returning with a TimeoutError.

  • duplicate_okay

    Whether it’s okay to process duplicate updates.

    Please read Duplicate-ness of events for a detailed explanation.

  • **process_kwargs

    Keyword arguments that Process takes, except server_address and target.

    If provided, these will have a precedence over the one’s provided in Context’s constructor.

Returns:A decorator function

The decorator function will return the Process instance created.

Example
import zproc

ctx = zproc.Context()

@ctx.get_state_when(lambda state: state['trees'] == 5)
def test(snapshot, state):
    print(snapshot['trees'], state)
call_when_equal(key: collections.abc.Hashable, value: Any, *, live: bool = False, **process_kwargs)[source]

Decorator version of get_when_equal().

Spawns a new Process, and then calls the wrapped function inside of that new process.

The wrapped function is run with the following signature:

target(snapshot, state, *args, **kwargs)

Where:

  • target is the wrapped function.

  • snapshot is a dict containing a copy of the state.

    Its serves as a snapshot of the state, corresponding to the state-change for which the wrapped function is being called.

  • state is a State instance.

  • *args and **kwargs are passed on from **process_kwargs.

Parameters:
  • key – Some key in the state dict.
  • value – The value corresponding to the key in state dict.
  • live

    Whether to get live updates.

    Please read Live-ness of events for a detailed explanation.

  • timeout

    Sets the timeout in seconds.

    If the value is None, it will block until an update is available.

    For all other values (>=0), it will wait for a state-change, for that amount of time before returning with a TimeoutError.

  • duplicate_okay

    Whether it’s okay to process duplicate updates.

    Please read Duplicate-ness of events for a detailed explanation.

  • **process_kwargs

    Keyword arguments that Process takes, except server_address and target.

    If provided, these will have a precedence over the one’s provided in Context’s constructor.

Returns:A decorator function

The decorator function will return the Process instance created.

Example
import zproc

ctx = zproc.Context()

@ctx.call_when_equal('oranges', 5)
def test(snapshot, state):
    print(snapshot['oranges'], state)
call_when_not_equal(key: collections.abc.Hashable, value: Any, *, live: bool = False, **process_kwargs)[source]

Decorator version of get_when_not_equal().

Spawns a new Process, and then calls the wrapped function inside of that new process.

The wrapped function is run with the following signature:

target(snapshot, state, *args, **kwargs)

Where:

  • target is the wrapped function.

  • snapshot is a dict containing a copy of the state.

    Its serves as a snapshot of the state, corresponding to the state-change for which the wrapped function is being called.

  • state is a State instance.

  • *args and **kwargs are passed on from **process_kwargs.

Parameters:
  • key – Some key in the state dict.
  • value – The value corresponding to the key in state dict.
  • live

    Whether to get live updates.

    Please read Live-ness of events for a detailed explanation.

  • timeout

    Sets the timeout in seconds.

    If the value is None, it will block until an update is available.

    For all other values (>=0), it will wait for a state-change, for that amount of time before returning with a TimeoutError.

  • duplicate_okay

    Whether it’s okay to process duplicate updates.

    Please read Duplicate-ness of events for a detailed explanation.

  • **process_kwargs

    Keyword arguments that Process takes, except server_address and target.

    If provided, these will have a precedence over the one’s provided in Context’s constructor.

Returns:A decorator function

The decorator function will return the Process instance created.

Example
import zproc

ctx = zproc.Context()

@ctx.call_when_not_equal('apples', 5)
def test(snapshot, state):
    print(snapshot['apples'], state)
call_when_none(key: collections.abc.Hashable, *, live: bool = False, **process_kwargs)[source]

Decorator version of get_when_none().

Spawns a new Process, and then calls the wrapped function inside of that new process.

The wrapped function is run with the following signature:

target(snapshot, state, *args, **kwargs)

Where:

  • target is the wrapped function.

  • snapshot is a dict containing a copy of the state.

    Its serves as a snapshot of the state, corresponding to the state-change for which the wrapped function is being called.

  • state is a State instance.

  • *args and **kwargs are passed on from **process_kwargs.

Parameters:
  • key – Some key in the state dict.
  • value – The value corresponding to the key in state dict.
  • live

    Whether to get live updates.

    Please read Live-ness of events for a detailed explanation.

  • timeout

    Sets the timeout in seconds.

    If the value is None, it will block until an update is available.

    For all other values (>=0), it will wait for a state-change, for that amount of time before returning with a TimeoutError.

  • duplicate_okay

    Whether it’s okay to process duplicate updates.

    Please read Duplicate-ness of events for a detailed explanation.

  • **process_kwargs

    Keyword arguments that Process takes, except server_address and target.

    If provided, these will have a precedence over the one’s provided in Context’s constructor.

Returns:A decorator function

The decorator function will return the Process instance created.

call_when_not_none(key: collections.abc.Hashable, *, live: bool = False, **process_kwargs)[source]

Decorator version of get_when_not_none().

Spawns a new Process, and then calls the wrapped function inside of that new process.

The wrapped function is run with the following signature:

target(snapshot, state, *args, **kwargs)

Where:

  • target is the wrapped function.

  • snapshot is a dict containing a copy of the state.

    Its serves as a snapshot of the state, corresponding to the state-change for which the wrapped function is being called.

  • state is a State instance.

  • *args and **kwargs are passed on from **process_kwargs.

Parameters:
  • key – Some key in the state dict.
  • value – The value corresponding to the key in state dict.
  • live

    Whether to get live updates.

    Please read Live-ness of events for a detailed explanation.

  • timeout

    Sets the timeout in seconds.

    If the value is None, it will block until an update is available.

    For all other values (>=0), it will wait for a state-change, for that amount of time before returning with a TimeoutError.

  • duplicate_okay

    Whether it’s okay to process duplicate updates.

    Please read Duplicate-ness of events for a detailed explanation.

  • **process_kwargs

    Keyword arguments that Process takes, except server_address and target.

    If provided, these will have a precedence over the one’s provided in Context’s constructor.

Returns:A decorator function

The decorator function will return the Process instance created.

call_when_available(key: collections.abc.Hashable, *, live: bool = False, **process_kwargs)[source]

Decorator version of get_when_available().

Spawns a new Process, and then calls the wrapped function inside of that new process.

The wrapped function is run with the following signature:

target(snapshot, state, *args, **kwargs)

Where:

  • target is the wrapped function.

  • snapshot is a dict containing a copy of the state.

    Its serves as a snapshot of the state, corresponding to the state-change for which the wrapped function is being called.

  • state is a State instance.

  • *args and **kwargs are passed on from **process_kwargs.

Parameters:
  • key – Some key in the state dict.
  • value – The value corresponding to the key in state dict.
  • live

    Whether to get live updates.

    Please read Live-ness of events for a detailed explanation.

  • timeout

    Sets the timeout in seconds.

    If the value is None, it will block until an update is available.

    For all other values (>=0), it will wait for a state-change, for that amount of time before returning with a TimeoutError.

  • duplicate_okay

    Whether it’s okay to process duplicate updates.

    Please read Duplicate-ness of events for a detailed explanation.

  • **process_kwargs

    Keyword arguments that Process takes, except server_address and target.

    If provided, these will have a precedence over the one’s provided in Context’s constructor.

Returns:A decorator function

The decorator function will return the Process instance created.

wait_all(timeout: Union[float, int, None] = None, safe: bool = False) → List[Union[Any, Exception]][source]

Call wait() on all the child processes of this Context. (Excluding the worker processes)

Retains the same order as Context.process_list.

Parameters:
  • timeout

    Same as wait().

    This parameter controls the timeout for all the Processes combined, not a single wait() call.

  • safe

    Suppress any errors that occur while waiting for a Process.

    The return value of failed wait() calls are substituted with the Exception that occurred.

Returns:

A list containing the values returned by child Processes of this Context.

start_all()[source]

Call start() on all the child processes of this Context

Ignores if a Process is already started, unlike start(), which throws an AssertionError.

stop_all()[source]

Call stop() on all the child processes of this Context

Retains the same order as Context.process_list.

Returns:A list containing the exitcodes of the child Processes of this Context.
ping(**kwargs)[source]

Ping the zproc server.

Parameters:**kwargs – Keyword arguments that ping() takes, except server_address.
Returns:Same as ping()
close()[source]

Close this context and stop all processes associated with it.

Once closed, you shouldn’t use this Context again.

Process

class zproc.Process(target: Callable, server_address: str, *, stateful: bool = True, pass_context: bool = False, args: Sequence = None, kwargs: Mapping = None, retry_for: Sequence[Union[signal.Signals, Exception]] = (), retry_delay: Union[int, float] = 5, max_retries: Optional[bool] = None, retry_args: Optional[tuple] = None, retry_kwargs: Optional[dict] = None, start: bool = True, backend: Callable = <class 'multiprocessing.context.Process'>, namespace: str = 'default', secret_key: Optional[str] = None)[source]

Provides a higher level interface to multiprocessing.Process.

Please don’t share a Process object between Processes / Threads. A Process object is not thread-safe.

Parameters:
  • server_address

    The address of zproc server.

    If you are using a Context, then this is automatically provided.

    Please read The server address spec for a detailed explanation.

  • target

    The Callable to be invoked inside a new process.

    The ``target`` is invoked with the following signature:

    target(state, *args, **kwargs)

    Where:

    • state is a State instance.
    • args and kwargs are passed from the constructor.
  • pass_context

    Weather to pass a Context to this process.

    If this is set to True, then the first argument to target will be a Context object in-place of the default - State.

    In other words, The target is invoked with the following signature:

    target(ctx, *args, **kwargs)

    Where:

    • ctx is a Context object.
    • args and kwargs are passed from the constructor.

    Note

    The Context object provided here, will be different than the one used to create this process.

    The new Context object can be used to create nested processes that share the same state.

  • stateful

    Weather this process needs to access the state.

    If this is set to False, then the state argument won’t be provided to the target.

    In other words, The target is invoked with the following signature:

    target(*args, **kwargs)

    Where:

    • args and kwargs are passed from the constructor.

    Has no effect if pass_context is set to True.

  • start – Automatically call start() on the process.
  • retry_for

    Retry only when one of these Exception/signal.Signals is raised.

    Example
    import signal
    
    # retry if a ConnectionError, ValueError or signal.SIGTERM is received.
    ctx.process(
        my_process,
        retry_for=(ConnectionError, ValueError, signal.SIGTERM)
    )
    

    To retry for any Exception - retry_for=(Exception, )

    The items of this sequence MUST be a subclass of BaseException or of type signal.Signals.

  • retry_delay – The delay in seconds, before retrying.
  • max_retries

    Give up after this many attempts.

    A value of None will result in an infinite number of retries.

    After “max_tries”, any Exception / Signal will exhibit default behavior.

  • args

    The argument tuple for target.

    By default, it is an empty tuple.

  • kwargs

    A dictionary of keyword arguments for target.

    By default, it is an empty dict.

  • retry_args

    Used in place of args when retrying.

    If set to None, then it has no effect.

  • retry_kwargs

    Used in place of kwargs when retrying.

    If set to None, then it has no effect.

  • backend

    The backend to use for launching the process(s).

    For example, you may use threading.Thread as the backend.

    Warning

    Not guaranteed to work well with anything other than multiprocessing.Process.

Variables:
  • child – A multiprocessing.Process instance for the child process.
  • server_address – Passed on from the constructor.
  • target – Passed on from the constructor.
  • namespace – Passed on from the constructor. This is read-only.
start()[source]

Start this Process

If the child has already been started once, it will return with an AssertionError.

Returns:the process PID
stop()[source]

Stop this process.

Once closed, it should not, and cannot be used again.

Returns:exitcode.
wait(timeout: Union[float, int, None] = None)[source]

Wait until this process finishes execution, then return the value returned by the target.

Parameters:timeout

The timeout in seconds.

If the value is None, it will block until the zproc server replies.

For all other values, it will wait for a reply, for that amount of time before returning with a TimeoutError.

Returns:The value returned by the target function.

If the child finishes with a non-zero exitcode, or there is some error in retrieving the value returned by the target, a ProcessWaitError is raised.

is_alive

Whether the child process is alive.

Roughly, a process object is alive; from the moment the start() method returns, until the child process is stopped manually (using stop()) or naturally exits

pid

The process ID.

Before the process is started, this will be None.

exitcode

The child’s exit code.

This will be None if the process has not yet terminated. A negative value -N indicates that the child was terminated by signal N.

State

class zproc.State(server_address: str, *, namespace: str = 'default', secret_key: Optional[str] = None)[source]

Allows accessing state stored on the zproc server, through a dict-like API.

Communicates to the zproc server using the ZMQ sockets.

Please don’t share a State object between Processes/Threads. A State object is not thread-safe.

Boasts the following dict-like members, for accessing the state:

  • Magic methods:
    __contains__(), __delitem__(), __eq__(), __getitem__(), __iter__(), __len__(), __ne__(), __setitem__()
  • Methods:
    clear(), copy(), get(), items(), keys(), pop(), popitem(), setdefault(), update(), values()
Parameters:server_address

The address of zproc server.

If you are using a Context, then this is automatically provided.

Please read The server address spec for a detailed explanation.

Variables:server_address – Passed on from constructor.
fork(server_address: Optional[str] = None, *, namespace: Optional[str] = None, secret_key: Optional[str] = None) → zproc.state.State[source]

“Forks” this State object.

Takes the same args as the State constructor, except that they automatically default to the values provided during the creation of this State object.

If no args are provided to this function, then it shall create a new State object that follows the exact same semantics as this one.

This is preferred over copying a State object.

Useful when one needs to access 2 or more namespaces on the same server.

set(value: dict)[source]

Set the state, completely over-writing the previous value.

copy()[source]

Return a deep-copy of the state.

Unlike the shallow-copy returned by dict.copy().

go_live()[source]

Clear the outstanding queue (or buffer), thus clearing any past events that were stored.

Internally, this re-opens a socket, which in-turn clears the queue.

Please read Live-ness of events for a detailed explanation.

get_raw_update(live: bool = False, timeout: Union[float, int, None] = None, duplicate_okay: bool = False) → Tuple[dict, dict, bool][source]

A low-level hook that emits each and every state update.

Parameters:
  • live

    Whether to get live updates.

    Please read Live-ness of events for a detailed explanation.

  • timeout

    Sets the timeout in seconds.

    If the value is None, it will block until an update is available.

    For all other values (>=0), it will wait for a state-change, for that amount of time before returning with a TimeoutError.

  • duplicate_okay

    Whether it’s okay to process duplicate updates.

    Please read Duplicate-ness of events for a detailed explanation.

get_when_change(*keys, exclude: bool = False, live: bool = False, timeout: Union[float, int, None] = None, duplicate_okay: bool = False) → dict[source]

Block until a change is observed, and then return a copy of the state.

Parameters:
  • *keys

    Watch for changes on these keys in the state dict.

    If this is not provided, then all state-changes are respected. (default)

  • exclude

    Reverse the lookup logic i.e.,

    Watch for all changes in the state except in *keys.

    If *keys is not provided, then this has no effect. (default)

  • live

    Whether to get live updates.

    Please read Live-ness of events for a detailed explanation.

  • timeout

    Sets the timeout in seconds.

    If the value is None, it will block until an update is available.

    For all other values (>=0), it will wait for a state-change, for that amount of time before returning with a TimeoutError.

Returns:A dict containing a copy of the state.

This copy serves as a snapshot of the state, corresponding to the state-change for which this state watcher was triggered.

get_when(test_fn, *, live: bool = False, timeout: Union[float, int, None] = None, duplicate_okay: bool = False) → dict[source]

Block until test_fn(snapshot) returns a “truthy” value, and then return a copy of the state.

Where-

snapshot is a dict, containing a copy of the state.

Parameters:
  • test_fn – A Callable, which is called on each state-change.
  • live

    Whether to get live updates.

    Please read Live-ness of events for a detailed explanation.

  • timeout

    Sets the timeout in seconds.

    If the value is None, it will block until an update is available.

    For all other values (>=0), it will wait for a state-change, for that amount of time before returning with a TimeoutError.

  • duplicate_okay

    Whether it’s okay to process duplicate updates.

    Please read Duplicate-ness of events for a detailed explanation.

Returns:A dict containing a copy of the state.

This copy serves as a snapshot of the state, corresponding to the state-change for which this state watcher was triggered.

get_when_equal(key: collections.abc.Hashable, value: Any, *, live: bool = False, timeout: Union[float, int, None] = None, duplicate_okay: bool = False) → dict[source]

Block until state[key] == value, and then return a copy of the state.

Parameters:
  • key – Some key in the state dict.
  • value – The value corresponding to the key in state dict.
  • live

    Whether to get live updates.

    Please read Live-ness of events for a detailed explanation.

  • timeout

    Sets the timeout in seconds.

    If the value is None, it will block until an update is available.

    For all other values (>=0), it will wait for a state-change, for that amount of time before returning with a TimeoutError.

  • duplicate_okay

    Whether it’s okay to process duplicate updates.

    Please read Duplicate-ness of events for a detailed explanation.

Returns:A dict containing a copy of the state.

This copy serves as a snapshot of the state, corresponding to the state-change for which this state watcher was triggered.

get_when_not_equal(key: collections.abc.Hashable, value: Any, *, live: bool = False, timeout: Union[float, int, None] = None, duplicate_okay: bool = False) → dict[source]

Block until state[key] != value, and then return a copy of the state.

Parameters:
  • key – Some key in the state dict.
  • value – The value corresponding to the key in state dict.
  • live

    Whether to get live updates.

    Please read Live-ness of events for a detailed explanation.

  • timeout

    Sets the timeout in seconds.

    If the value is None, it will block until an update is available.

    For all other values (>=0), it will wait for a state-change, for that amount of time before returning with a TimeoutError.

  • duplicate_okay

    Whether it’s okay to process duplicate updates.

    Please read Duplicate-ness of events for a detailed explanation.

Returns:A dict containing a copy of the state.

This copy serves as a snapshot of the state, corresponding to the state-change for which this state watcher was triggered.

get_when_none(key: collections.abc.Hashable, *, live: bool = False, timeout: Union[float, int, None] = None, duplicate_okay: bool = False) → dict[source]

Block until state[key] is None, and then return a copy of the state.

Parameters:
  • key – Some key in the state dict.
  • value – The value corresponding to the key in state dict.
  • live

    Whether to get live updates.

    Please read Live-ness of events for a detailed explanation.

  • timeout

    Sets the timeout in seconds.

    If the value is None, it will block until an update is available.

    For all other values (>=0), it will wait for a state-change, for that amount of time before returning with a TimeoutError.

  • duplicate_okay

    Whether it’s okay to process duplicate updates.

    Please read Duplicate-ness of events for a detailed explanation.

Returns:A dict containing a copy of the state.

This copy serves as a snapshot of the state, corresponding to the state-change for which this state watcher was triggered.

get_when_not_none(key: collections.abc.Hashable, *, live: bool = False, timeout: Union[float, int, None] = None, duplicate_okay: bool = False) → dict[source]

Block until state[key] is not None, and then return a copy of the state.

Parameters:
  • key – Some key in the state dict.
  • value – The value corresponding to the key in state dict.
  • live

    Whether to get live updates.

    Please read Live-ness of events for a detailed explanation.

  • timeout

    Sets the timeout in seconds.

    If the value is None, it will block until an update is available.

    For all other values (>=0), it will wait for a state-change, for that amount of time before returning with a TimeoutError.

  • duplicate_okay

    Whether it’s okay to process duplicate updates.

    Please read Duplicate-ness of events for a detailed explanation.

Returns:A dict containing a copy of the state.

This copy serves as a snapshot of the state, corresponding to the state-change for which this state watcher was triggered.

get_when_available(key: collections.abc.Hashable, *, live: bool = False, timeout: Union[float, int, None] = None, duplicate_okay: bool = False)[source]

Block until key in state, and then return a copy of the state.

Parameters:
  • key – Some key in the state dict.
  • value – The value corresponding to the key in state dict.
  • live

    Whether to get live updates.

    Please read Live-ness of events for a detailed explanation.

  • timeout

    Sets the timeout in seconds.

    If the value is None, it will block until an update is available.

    For all other values (>=0), it will wait for a state-change, for that amount of time before returning with a TimeoutError.

  • duplicate_okay

    Whether it’s okay to process duplicate updates.

    Please read Duplicate-ness of events for a detailed explanation.

Returns:A dict containing a copy of the state.

This copy serves as a snapshot of the state, corresponding to the state-change for which this state watcher was triggered.

ping(**kwargs)[source]

Ping the zproc server corresponding to this State’s Context

Parameters:kwargs – Keyword arguments that ping() takes, except server_address.
Returns:Same as ping()
close()[source]

Close this State and disconnect with the Server.