Skip to content

Client Asynchronous API

Client API.

Client

Majordomo Protocol Client API Synchronous version.

Implements the MDP/Worker spec at http:#rfc.zeromq.org/spec:7.

__init__(self, broker, verbose=False) special

Initialize the client.

@param broker: address of the broker to connect to @param verbose: verbose logging, defaults to False.

Source code in loudify/client_async_api.py
def __init__(self, broker, verbose=False):
    """
    Initialize the client.

    @param broker: address of the broker to connect to
    @param verbose: verbose logging, defaults to False.
    """
    self.broker = broker
    self.verbose = verbose
    self.ctx = zmq.Context()
    self.poller = zmq.Poller()
    self.reconnect_to_broker()

reconnect_to_broker(self)

Connect or reconnect to broker.

Source code in loudify/client_async_api.py
def reconnect_to_broker(self):
    """Connect or reconnect to broker."""
    if self.client:
        self.poller.unregister(self.client)
        self.client.close()
    self.client = self.ctx.socket(zmq.DEALER)
    self.client.linger = 0
    self.client.connect(self.broker)
    self.poller.register(self.client, zmq.POLLIN)
    if self.verbose:
        _logger.info("I: connecting to broker at %s...", self.broker)

recv(self)

Return the reply message or None if there was no reply.

Source code in loudify/client_async_api.py
def recv(self):
    """Return the reply message or None if there was no reply."""
    try:
        items = self.poller.poll(self.timeout)
    except KeyboardInterrupt:
        return  # interrupted

    if items:
        # if we got a reply, process it
        msg = self.client.recv_multipart()
        if self.verbose:
            _logger.info("I: received reply:")
            zhelpers.dump(msg)

        if len(msg) < 4:
            _logger.error("E: client msg is to short %s", len(msg))
        empty = msg.pop(0)
        if empty != b"":
            _logger.error("E: client empty msg is not empty %s", empty)
        header = msg.pop(0)
        if definitions.C_CLIENT != header:
            _logger.error("E: Client header is incorrect %s", header)

        # service = msg.pop(0)
        return msg
    else:
        _logger.warning("W: permanent error, abandoning request")
        return -1

send(self, service, request, **flowgraph_vars)

Send request to broker.

@param service: service that is requested @param request: input data for the request @param flowgraph_vars: dict containing all flowgraph values @return:

Source code in loudify/client_async_api.py
def send(self, service, request, **flowgraph_vars):
    """
    Send request to broker.

    @param service: service that is requested
    @param request: input data for the request
    @param flowgraph_vars: dict containing all flowgraph values
    @return:
    """
    if not isinstance(request, list):
        request = [request]

    # Prefix request with protocol frames
    # Frame 0: empty (REQ emulation)
    # Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
    # Frame 2: Service name (printable string)
    if flowgraph_vars:
        request = (
            [b"", definitions.C_CLIENT, service]
            + request
            + [str(flowgraph_vars).encode("ascii")]
        )
    else:
        request = [b"", definitions.C_CLIENT, service] + request

    if self.verbose:
        _logger.info("I: send request to '%s' service: ", service)
        zhelpers.dump(request)
    self.client.send_multipart(request)