Skip to content

Client Synchronous API

Synchronus Client API.

Client

Majordomo Protocol Client API Synchronous version.

Implements the MDP/Worker spec at http:#rfc.zeromq.org/spec:7.

__init__(self, broker, verbose=False) special

Initialize the client.

@param broker: address of the broker to connect to @param verbose: verbose logging, defaults to False.

Source code in loudify/client_sync_api.py
def __init__(self, broker, verbose=False):
    """
    Initialize the client.

    @param broker: address of the broker to connect to
    @param verbose: verbose logging, defaults to False.
    """
    self.broker = broker
    self.verbose = verbose
    self.ctx = zmq.Context()
    self.poller = zmq.Poller()
    self.reconnect_to_broker()

reconnect_to_broker(self)

Connect or reconnect to broker.

Source code in loudify/client_sync_api.py
def reconnect_to_broker(self):
    """Connect or reconnect to broker."""
    if self.client:
        self.poller.unregister(self.client)
        self.client.close()
    self.client = self.ctx.socket(zmq.REQ)
    self.client.linger = 0
    self.client.connect(self.broker)
    self.poller.register(self.client, zmq.POLLIN)
    if self.verbose:
        _logger.info("I: connecting to broker at %s...", self.broker)

send(self, service, request, flowgraph_vars=None)

Send request to broker and get reply by hook or crook.

Takes ownership of request message and destroys it when sent. Returns the reply message or None if there was no reply.

@param service: service that is requested @param request: input data for the request @param flowgraph_vars: dict containing all flowgraph values @return:

Source code in loudify/client_sync_api.py
def send(self, service, request, flowgraph_vars=None):
    """
    Send request to broker and get reply by hook or crook.

    Takes ownership of request message and destroys it when sent.
    Returns the reply message or None if there was no reply.

    @param service: service that is requested
    @param request: input data for the request
    @param flowgraph_vars: dict containing all flowgraph values
    @return:
    """

    if not isinstance(request, list):
        request = [request]

    if flowgraph_vars is not None:
        request = (
            [definitions.C_CLIENT, service] + request + [str(flowgraph_vars).encode("ascii")]
        )
    else:
        request = [definitions.C_CLIENT, service] + request

    if self.verbose:
        _logger.info("I: send request to '%s' service: ", service)
        zhelpers.dump(request)
    reply = None

    retries = self.retries
    while retries > 0:
        self.client.send_multipart(request)
        try:
            items = self.poller.poll(self.timeout)
        except KeyboardInterrupt:
            break  # interrupted

        if items:
            msg = self.client.recv_multipart()
            if self.verbose:
                _logger.info("I: received reply:")
                zhelpers.dump(msg)

            if len(msg) < 3:
                _logger.warning("E: client msg is to short %s", len(msg))

            header = msg.pop(0)
            if header != definitions.C_CLIENT:
                _logger.error("E: Client header is incorrect %s", header)

            reply_service = msg.pop(0)

            if service != reply_service:
                _logger.warning(
                    "E: worker  reply service not the same as internal service of worker %s",
                    service,
                )

            reply = msg
            break
        else:
            if retries:
                logging.warning("W: no reply, reconnecting...")
                self.reconnect_to_broker()
            else:
                logging.warning("W: permanent error, abandoning")
                break
            retries -= 1

    return reply