Introduction to ZProc

The whole architecture of zproc is built around a State object.

Context is provided as a convenient wrapper over Process and State.

It’s the most obvious way to launch processes with zproc.

Each Context object is associated with a State, which is accessible to all of its processes.

Here’s how you create a Context.

import zproc

ctx = zproc.Context()

Launching a Process

Let’s launch a process that does nothing.

def my_process(state):
    pass

ctx.process(my_process)

process() launches a new process and passes it the state as its first argument.

If you like to be cool, you can use it as a decorator instead (process() works both as a function and as a decorator).

@ctx.process
def my_process(state):
    pass

The state is a dict-like object.

It’s dict-like because it’s not exactly a dict: it provides a dict interface, but under the hood it is just passing messages.

You cannot mutate the underlying dict directly. It’s protected by a Process whose sole job is to manage it.

You can also access it from the Context itself using ctx.state.

state['apples'] = 5

state.get('apples')

state.setdefault('apples', 10)

...
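To make the “just passing messages” idea concrete, here is a toy, in-process imitation built only from the standard library: a single thread owns a plain dict, and a dict-like proxy turns every operation into a message to that thread. ToyStateServer and ToyState are hypothetical names, and this is not how zproc is implemented (its state lives in a separate process); it only illustrates the shape of the design.

```python
import queue
import threading


class ToyStateServer:
    """Owns a plain dict; only the server thread ever touches it."""

    def __init__(self):
        self._data = {}
        self.requests = queue.Queue()
        threading.Thread(target=self._serve, daemon=True).start()

    def _serve(self):
        while True:
            # Each message is (dict method name, args, reply queue).
            method, args, reply = self.requests.get()
            try:
                reply.put((True, getattr(self._data, method)(*args)))
            except Exception as exc:  # ship errors back to the caller
                reply.put((False, exc))


class ToyState:
    """Dict-like proxy: every operation becomes a message to the server."""

    def __init__(self, server):
        self._server = server

    def _call(self, method, *args):
        reply = queue.Queue(maxsize=1)
        self._server.requests.put((method, args, reply))
        ok, result = reply.get()  # block until the server answers
        if not ok:
            raise result
        return result

    def __getitem__(self, key):
        return self._call("__getitem__", key)

    def __setitem__(self, key, value):
        self._call("__setitem__", key, value)

    def get(self, key, default=None):
        return self._call("get", key, default)

    def setdefault(self, key, default=None):
        return self._call("setdefault", key, default)


state = ToyState(ToyStateServer())
state["apples"] = 5
print(state.get("apples"))             # 5
print(state.setdefault("apples", 10))  # 5, because the key already exists
```

Because everything here stays in one process, values are not copied on the way back; a state that really lives in another process has to serialize them, which matters for the section on mutating objects inside state.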

Providing arguments to a Process

To provide some initial values to a Process, pass them through the args and kwargs parameters.

def my_process(state, num, exp):
    print(num, exp)  # 2 4

ctx.process(my_process, args=[2], kwargs={'exp': 4})
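In the example above, 2 arrives as num and 4 as exp, so the values from args and kwargs end up after state in the call. A minimal sketch of that forwarding convention follows; call_with_state is a hypothetical helper, not part of zproc, and it calls the function directly instead of launching a process.

```python
def call_with_state(target, state, args=(), kwargs=None):
    """Hypothetical helper: state first, then positional args, then keyword args."""
    return target(state, *args, **(kwargs or {}))


def my_process(state, num, exp):
    return num ** exp


print(call_with_state(my_process, {}, args=[2], kwargs={"exp": 4}))  # 16
```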

Waiting for a Process

Once you’ve launched a Process, you can wait for it to complete and get its return value like this:

from time import sleep


@ctx.process
def my_process(state):
    sleep(5)
    return 'Hello There!'


print(my_process.wait())   # Hello There!
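If you know the standard library’s concurrent.futures, wait() plays a role similar to Future.result(): launching returns immediately, and asking for the result blocks until the work is done. The rough analogy below uses threads only to keep it short (concurrent.futures.ProcessPoolExecutor offers the same submit()/result() interface); it says nothing about how zproc implements wait().

```python
import time
from concurrent.futures import ThreadPoolExecutor


def my_task():
    time.sleep(0.5)
    return "Hello There!"


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(my_task)  # returns immediately, like ctx.process()
    result = future.result()       # blocks until my_task returns, like .wait()

print(result)  # Hello There!
```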

Process Factory

Process Map

Python’s built-in multiprocessing.Pool lets you use the familiar map() function in a parallel way.

However, it gets quite finicky to use for anything serious.

That’s why ZProc provides a more powerful construct, process_map(), for mapping iterables to processes.

Works similarly to map()

def square(num):
    return num * num

# [1, 4, 9, 16]
list(ctx.process_map(square, [1, 2, 3, 4]))

Common Arguments.

def power(num, exp):
    return num ** exp

# [0, 1, 8, 27, 64, ... 941192, 970299]
list(
    ctx.process_map(
        power,
        range(100),
        args=[3],
        count=10  # distribute among 10 workers.
    )
)

Mapped Positional Arguments.

def power(num, exp):
    return num ** exp

# [4, 9, 36, 256]
list(
    ctx.process_map(
        power,
        map_args=[(2, 2), (3, 2), (6, 2), (2, 8)]
    )
)

Mapped Keyword Arguments.

def my_thingy(seed, num, exp):
    return seed + num ** exp

# [1007, 3132, 32, 736, 132, 65543, 8]
list(
    ctx.process_map(
        my_thingy,
        args=[7],
        map_kwargs=[
            {'num': 10, 'exp': 3},
            {'num': 5, 'exp': 5},
            {'num': 5, 'exp': 2},
            {'num': 9, 'exp': 3},
            {'num': 5, 'exp': 3},
            {'num': 4, 'exp': 8},
            {'num': 1, 'exp': 4},
        ],
        count=5
    )
)
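The three examples above imply how the different argument sources are combined for each call: a mapped item comes first, common args follow it (power(num, 3)), and with map_kwargs the common args fill the leading positional parameters (my_thingy(7, num=..., exp=...)). The sequential, single-process sketch below only makes those rules explicit. process_map_reference is a hypothetical name; the ordering when several mapped sources are combined, the zip-like stopping rule, and letting mapped kwargs override common kwargs are assumptions, not zproc behaviour taken from its source. It also omits count, since nothing is distributed.

```python
def process_map_reference(target, map_iter=None, *, args=(), kwargs=None,
                          map_args=None, map_kwargs=None):
    """Sequential stand-in that lazily yields results in input order.

    Per call i, arguments are assembled as (inferred from the examples):
        target(map_iter[i], *map_args[i], *args, **kwargs, **map_kwargs[i])
    """
    kwargs = kwargs or {}
    # Normalise each mapped source into a list of per-call contributions.
    items = [(x,) for x in map_iter] if map_iter is not None else None
    mapped_args = [tuple(a) for a in map_args] if map_args is not None else None
    mapped_kwargs = [dict(k) for k in map_kwargs] if map_kwargs is not None else None

    given = [s for s in (items, mapped_args, mapped_kwargs) if s is not None]
    if not given:
        raise ValueError("provide map_iter, map_args or map_kwargs")
    # Assumption: stop at the shortest mapped source, like zip().
    for i in range(min(len(s) for s in given)):
        call_args = (
            (items[i] if items is not None else ())
            + (mapped_args[i] if mapped_args is not None else ())
            + tuple(args)
        )
        extra = mapped_kwargs[i] if mapped_kwargs is not None else {}
        yield target(*call_args, **{**kwargs, **extra})


def power(num, exp):
    return num ** exp


print(list(process_map_reference(power, range(5), args=[3])))        # [0, 1, 8, 27, 64]
print(list(process_map_reference(power, map_args=[(2, 2), (2, 8)])))  # [4, 256]
```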

What’s really cool about the process map is that it returns a generator.

As soon as you call it, it distributes the tasks among count workers.

It then returns a generator, which in turn pulls in the results from these workers and yields them in the same order as the input.

>>> import zproc
>>> import time
>>> ctx = zproc.Context()
>>> def my_blocking_thingy(x):
...     time.sleep(5)
...
...     return x * x
...
>>> res = ctx.process_map(my_blocking_thingy, range(10))  # returns immediately
>>> res
<generator object Context._pull_results_for_task at 0x7fef735e6570>
>>> next(res)  # might block
0
>>> next(res)  # might block
1
>>> next(res)  # might block
4
>>> next(res)  # might block
9
>>> next(res)  # might block
16
...

Note that the computation continues in the background while the main process is running.

As a result, the time each next(res) call blocks changes over time: results that have already been computed come back almost immediately, while the rest still have to be waited for.
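The standard library’s concurrent.futures.Executor.map() behaves in a comparable way, which may help build intuition: it submits every task as soon as it is called and returns an iterator that yields results in input order, blocking only when the next result is not ready yet. The analogy below uses threads, with max_workers roughly playing the role of count; it is not how zproc implements process_map().

```python
import time
from concurrent.futures import ThreadPoolExecutor


def my_blocking_thingy(x):
    time.sleep(0.2)
    return x * x


results = []
with ThreadPoolExecutor(max_workers=4) as pool:
    start = time.monotonic()
    res = pool.map(my_blocking_thingy, range(10))  # all tasks submitted right away
    for value in res:  # each step blocks only if that result isn't ready yet
        results.append(value)
        print(f"{value:>2} after {time.monotonic() - start:.1f}s")
```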

Reactive programming with zproc

This is the part where you really start to see the benefits of a smart state. The state knows when it’s being mutated, and does the job of notifying everyone.

I like to call it “the magic of state watching”.

State watching allows you to react to some change in the state in an efficient way.

Let’s say you want to wait for the number of “cookies” to reach 5.

Normally, you might do it with something like this:

while True:
    if cookies == 5:
        print('done!')
        break

But then you find out that this eats too much CPU, so you put in some sleep.

from time import sleep

while True:
    if cookies == 5:
        print('done!')
        break
    sleep(1)

From there on, you keep tuning how long your application sleeps, trying to arrive at a sweet spot between wasted CPU and slow reactions.

zproc provides an elegant, easy to use solution to this problem.

def my_process(state):
    state.get_when_equal('cookies', 5)
    print('done with zproc!')

This uses little to no CPU, and is fast enough for almost everyone’s needs.

You must realise that this doesn’t do any of that expensive “busy” waiting. Under the covers, it’s actually just a socket waiting for a request.

If you want, you can even provide a function:

def my_process(state):
    state.get_when(lambda state: state.get('cookies') == 5)

The function you provide is called again on each state update; get_when() returns once that call produces a truthy value.
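To see why this style of waiting costs almost no CPU, here is an in-process toy that mimics it with the standard library’s threading.Condition. WatchedState is a hypothetical class, not part of zproc, and zproc coordinates separate processes over sockets rather than threads over a shared lock; the sketch only shows the idea that waiters sleep until an update wakes them, then re-check their predicate.

```python
import threading
import time


class WatchedState:
    """Toy dict wrapper whose writers wake up everyone waiting on a condition."""

    def __init__(self):
        self._data = {}
        self._changed = threading.Condition()

    def __setitem__(self, key, value):
        with self._changed:
            self._data[key] = value
            self._changed.notify_all()  # every update wakes the watchers

    def get(self, key, default=None):
        with self._changed:
            return self._data.get(key, default)

    def get_when(self, predicate):
        """Sleep until predicate(snapshot) is truthy; re-checked only after updates."""
        with self._changed:
            self._changed.wait_for(lambda: predicate(dict(self._data)))
            return dict(self._data)

    def get_when_equal(self, key, value):
        return self.get_when(lambda snapshot: snapshot.get(key) == value)


state = WatchedState()


def baker():
    for cookies in range(1, 6):
        time.sleep(0.1)
        state["cookies"] = cookies


threading.Thread(target=baker).start()
snapshot = state.get_when_equal("cookies", 5)  # blocks without polling
print("done!", snapshot)  # done! {'cookies': 5}
```

The toy also shows why the time-based predicate in the caution below cannot work: the predicate only runs again when someone writes to the state.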

Caution

You can’t do things like this:

from time import time

t = time()
state.get_when(lambda state: time() > t + 5)  # wrong!

The State only re-checks your function when the state changes. The passage of time is not a state update, so the check above may never run again.

Mutating objects inside state

You must remember that you can’t mutate (update) objects deep inside the state.

state['numbers'] = [1, 2, 3]  # works

state['numbers'].append(4)  # doesn't work
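Since the state lives in another process and only exchanges messages with yours, state['numbers'] can only hand you a copy of the list, so appending to it changes nothing on the other side. The snippet below imitates that round trip with pickle; fetch() and server_side are hypothetical stand-ins, and whether zproc serializes with pickle specifically is an assumption.

```python
import pickle

# Stands in for the dict owned by the state-managing process.
server_side = {"numbers": [1, 2, 3]}


def fetch(key):
    """Hypothetical read: the value crosses a process boundary as bytes."""
    return pickle.loads(pickle.dumps(server_side[key]))


local = fetch("numbers")
local.append(4)  # mutates only the local copy
print(server_side["numbers"])  # [1, 2, 3]
print(local)                   # [1, 2, 3, 4]
```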

While this might look like a flaw in zproc (and to some extent it is), you can also see it as a feature. It keeps you from

  1. over-complicating your state (keeping the state as flat as possible is generally a good idea).
  2. introducing race conditions (think about whether state['numbers'].append(4) could ever be atomic).

The correct way to mutate objects inside the state is to do it atomically, which is to say, using the atomic() decorator.

@zproc.atomic
def add_a_number(state, to_add):
    state['numbers'].append(to_add)


def my_process(state):
    add_a_number(state, 4)

Read more about Atomicity and race conditions.
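Conceptually, atomic() ensures that the whole read-modify-write inside the decorated function happens without any other update slipping in between. The in-process toy below gets the same effect with a single threading.Lock; toy_atomic and the plain-dict state are hypothetical stand-ins, and zproc’s atomic() has to work across processes, which a thread lock alone cannot do.

```python
import functools
import threading

_state_lock = threading.Lock()


def toy_atomic(fn):
    """Run fn while holding one global lock, so its updates never interleave."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        with _state_lock:
            return fn(*args, **kwargs)
    return wrapper


@toy_atomic
def add_a_number(state, to_add):
    numbers = state["numbers"]             # read
    state["numbers"] = numbers + [to_add]  # modify, then write back


state = {"numbers": []}
threads = [threading.Thread(target=add_a_number, args=(state, i)) for i in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(state["numbers"]))  # 100: no update was lost
```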

Something to keep in mind

Absolutely none of the classes in ZProc are Process/Thread safe. You must never attempt to share a Context/State between multiple processes.

Create a new one for each Process/Thread. Communicate and synchronize using the State at all times.

This is also very good practice in general.

Never attempt to directly share python objects between Processes, and the multitasking gods will reward you :).