Skip to content

Worker API

Worker API.

Worker

Worker API.

Implements the MDP/Worker spec at http:#rfc.zeromq.org/spec:7.

__init__(self, broker, service, verbose=False) special

Initializer the Worker class.

@param broker: @param service: @param verbose:

Source code in loudify/worker_api.py
def __init__(self, broker, service, verbose=False):
    """
    Initializer the Worker class.

    @param broker:
    @param service:
    @param verbose:
    """

    self.broker = broker
    self.service = service
    self.verbose = verbose
    self.ctx = zmq.Context()
    self.poller = zmq.Poller()
    self.reconnect_to_broker()

destroy(self)

Destroy zmq contex.

Source code in loudify/worker_api.py
def destroy(self):
    """Destroy zmq contex."""
    # context.destroy depends on pyzmq >= 2.1.10
    self.ctx.destroy(0)

reconnect_to_broker(self)

Connect or reconnect to broker.

Source code in loudify/worker_api.py
def reconnect_to_broker(self):
    """Connect or reconnect to broker."""
    if self.worker:
        self.poller.unregister(self.worker)
        self.worker.close()
    self.worker = self.ctx.socket(zmq.DEALER)
    self.worker.linger = 0
    self.worker.connect(self.broker)
    self.poller.register(self.worker, zmq.POLLIN)
    if self.verbose:
        _logger.info("I: connecting to broker at %s...", self.broker)

    # Register service with broker
    self.send_to_broker(definitions.W_READY, self.service, [])

    # If liveness hits zero, queue is considered disconnected
    self.liveness = self.HEARTBEAT_LIVENESS
    self.heartbeat_at = time.time() + 1e-3 * self.heartbeat

recv(self, reply=None)

Send reply, if any, to broker and wait for next request.

@param reply: @return:

Source code in loudify/worker_api.py
def recv(self, reply=None):
    """
    Send reply, if any, to broker and wait for next request.

    @param reply:
    @return:
    """

    try:
        # Format and send the reply if we were provided one
        if reply is None or reply is self.expect_reply:
            _logger.warning("E: Reply is wrong %s", reply)

        if reply is not None:
            if self.reply_to is None:
                _logger.error("E: reply address is None, invalid msg")
            reply = [self.reply_to, b""] + reply
            self.send_to_broker(definitions.W_REPLY, msg=reply)
    except KeyboardInterrupt:
        exit(0)

    self.expect_reply = True

    while True:
        # Poll socket for a reply, with timeout
        try:
            items = self.poller.poll(self.timeout)
        except KeyboardInterrupt:
            exit(0)
            _logger.warning("W: interrupt received, killing worker...")
            break  # Interrupted
        if items:
            msg = self.worker.recv_multipart()
            if self.verbose:
                _logger.info("I: received message from broker: ")
                zhelpers.dump(msg)

            self.liveness = self.HEARTBEAT_LIVENESS
            # Don't try to handle errors, just assert noisily
            if len(msg) < 3:
                _logger.warning("E: msg length is invalid")

            empty = msg.pop(0)
            if empty != b"":
                logging.error("E: invalid empty space in message")

            header = msg.pop(0)
            if header != definitions.W_WORKER:
                _logger.warning("E: header does not eqaul worker definition")

            command = msg.pop(0)
            if command == definitions.W_REQUEST:
                # We should pop and save as many addresses as there are
                # up to a null part, but for now, just save one...
                self.reply_to = msg.pop(0)
                # pop empty
                empty = msg.pop(0)
                if empty != b"":
                    _logger.warning("E: empty space in msg is not empty, invalid msg")
                return msg  # We have a request to process
            elif command == definitions.W_HEARTBEAT:
                # Do nothing for heartbeats
                pass
            elif command == definitions.W_DISCONNECT:
                self.reconnect_to_broker()
            else:
                _logger.error("E: invalid input message: ")
                zhelpers.dump(msg)

        else:
            self.liveness -= 1
            if self.liveness == 0:
                if self.verbose:
                    _logger.warning("W: disconnected from broker - retrying...")
                try:
                    time.sleep(1e-3 * self.reconnect)
                except KeyboardInterrupt:
                    break
                self.reconnect_to_broker()

        # Send HEARTBEAT if it's time
        if time.time() > self.heartbeat_at:
            self.send_to_broker(definitions.W_HEARTBEAT)
            self.heartbeat_at = time.time() + 1e-3 * self.heartbeat


    return None

send_to_broker(self, command, option=None, msg=None)

Send message to broker.

If no msg is provided, creates one internally

@param command: @param option: @param msg: @return:

Source code in loudify/worker_api.py
def send_to_broker(self, command, option=None, msg=None):
    """
    Send message to broker.

    If no msg is provided, creates one internally

    @param command:
    @param option:
    @param msg:
    @return:
    """

    if msg is None:
        msg = []
    elif not isinstance(msg, list):
        msg = [msg]

    if option:
        msg = [option] + msg

    msg = [b"", definitions.W_WORKER, command] + msg
    if self.verbose:
        _logger.info("I: sending %s to broker", command)
        zhelpers.dump(msg)
    self.worker.send_multipart(msg)