Source code for zproc.tools

import multiprocessing
import os
from typing import Union, Callable, Optional

import zmq

from zproc import util
from zproc.server import Server
from zproc.constants import Msgs, Commands


[docs]def start_server( server_address: str = None, *, backend: Callable = multiprocessing.Process, secret_key: str = None ): """ Start a new zproc server. :param server_address: The zproc server's address. If it is set to ``None``, then a random address will be generated. Please read :ref:`server-address-spec` for a detailed explanation. :param backend: The backend to use for launching the server process. For example, you may use ``threading.Thread`` as the backend. .. warning:: Not guaranteed to work well with anything other than ``multiprocessing.Process``. :return: ``tuple``, containing a ``multiprocessing.Process`` object for server and the server address. """ zmq_ctx = util.create_zmq_ctx() sock = zmq_ctx.socket(zmq.PULL) pull_address = util.bind_to_random_address(sock) serializer = util.get_serializer(secret_key) server_process = backend( target=lambda *args, **kwargs: Server(*args, **kwargs).main(), args=[server_address, pull_address, secret_key], daemon=True, ) server_process.start() try: server_address = util.recv(sock, serializer) except zmq.ZMQError as e: if e.errno == 98: raise ConnectionError( "Encountered - %s. Perhaps the server is already running?" % repr(e) ) if e.errno == 22: raise ValueError( "Encountered - %s. `server_address` must be a string containing a valid endpoint." % repr(e) ) raise finally: sock.close() util.close_zmq_ctx(zmq_ctx) return server_process, server_address
[docs]def ping( server_address: str, *, timeout: Optional[Union[float, int]] = None, sent_payload: Optional[Union[bytes]] = None, secret_key: str = None ) -> Optional[int]: """ Ping the zproc server. This can be used to easily detect if a server is alive and running, with the aid of a suitable ``timeout``. :param server_address: The zproc server's address. Please read :ref:`server-address-spec` for a detailed explanation. :param timeout: The timeout in seconds. If this is set to ``None``, then it will block forever, until the zproc server replies. For all other values, it will wait for a reply, for that amount of time before returning with a ``TimeoutError``. By default it is set to ``None``. :param sent_payload: payload that will be sent to the server. If it is set to None, then ``os.urandom(56)`` (56 random bytes) will be used. (No real reason for the ``56`` magic number.) :return: The zproc server's **pid** if the ping was successful, else ``None`` If this returns ``None``, then it probably means there is some fault in communication with the server. """ if sent_payload is None: sent_payload = os.urandom(56) serializer = util.get_serializer(secret_key) zmq_ctx = util.create_zmq_ctx() sock = zmq_ctx.socket(zmq.DEALER) sock.connect(server_address) if timeout is not None: sock.setsockopt(zmq.RCVTIMEO, int(timeout * 1000)) sock.send(serializer.dumps({Msgs.cmd: Commands.ping, Msgs.info: sent_payload})) try: response = serializer.loads(sock.recv()) except zmq.error.Again: raise TimeoutError("Timed-out waiting while for the ZProc server to respond.") else: recv_payload, pid = response[Msgs.info] if recv_payload == sent_payload: return pid else: return None finally: sock.close()