Source code for Cauldron.zmq.dispatcher

# -*- coding: utf-8 -*-
"""
Dispatcher implementation for ZMQ

"""

from .common import zmq_get_address, check_zmq, teardown, zmq_connect_socket
from .microservice import ZMQMicroservice, ZMQCauldronMessage, FRAMEFAIL, FRAMEBLANK
from .broker import ZMQBroker
from ..base import DispatcherService, DispatcherKeyword
from .. import registry
from ..exc import DispatcherError, WrongDispatcher, TimeoutError

import threading
import logging
import weakref
import six
import time
import sys, traceback

__all__ = ["Service", "Keyword"]


registry.dispatcher.teardown_for('zmq')(teardown)

class _ZMQResponder(ZMQMicroservice):
    """A python thread for ZMQ responses."""
    
    _socket = None
    
    def __init__(self, service):
        self.service = weakref.proxy(service)
        address = zmq_get_address(self.service._config, "broker", bind=False)
        super(_ZMQResponder, self).__init__(address=address, use_broker=True,
            context=self.service.ctx, name="DFW.Service.{0:s}.Responder".format(self.service.name))
        
    def check_broker(self, socket, signal):
        """Check for broker liveliness."""
        zmq = check_zmq()
        
        welcome = ZMQCauldronMessage(command="welcome", service=self.service.name, dispatcher=self.service.dispatcher, direction="DBQ")
        socket.send_multipart([b""]+welcome.data)
        self.log.log(5, "Sent broker a welcome message: {0!s}.".format(welcome))
        
        poller = zmq.Poller()
        poller.register(socket, zmq.POLLIN)
        poller.register(signal, zmq.POLLIN)
        
        ready = dict(poller.poll(timeout=self.timeout * 1e3))
        if ready.get(socket) == zmq.POLLIN:
            message = ZMQCauldronMessage.parse(socket.recv_multipart())
            if message.payload != "confirmed":
                raise DispatcherError("Message confirming welcome was malformed! {0!s}".format(message))
            return True
        else:
            self.log.log(5, "No broker response was available.  ")
            return False
        
        
    def greet_broker(self, socket, signal):
        """Send the appropriate greeting to the broker."""
        checks = 1
        attempts = 0
        while checks:
            attempts += 1
            checks -= 1
            if self.check_broker(socket, signal):
                break
            self._b = ZMQBroker.daemon(config = self.service._config)
        else:
            raise TimeoutError("Can't connect to broker after {0:d} attempts.".format(attempts))
        
        ready = ZMQCauldronMessage(command="ready", service=self.service.name, dispatcher=self.service.dispatcher, direction="DBQ")
        socket.send_multipart([b""]+ready.data)
        self.log.log(5, "Sent broker a ready message: {0!s}.".format(ready))
        
    
    def handle_modify(self, message):
        """Handle a modify command."""
        message.verify(self.service)
        keyword = self.service[message.keyword]
        keyword.modify(message.payload)
        return keyword.value
    
    def handle_update(self, message):
        """Handle an update command."""
        message.verify(self.service)
        keyword = self.service[message.keyword]
        return keyword.update()
        
    def handle_identify(self, message):
        """Handle an identify command."""
        message.verify(self.service)
        if message.payload not in self.service:
            self.log.log(5, "Not identifying b/c not in service.")
            return FRAMEBLANK
        # This seems harsh, not using "CONTAINS", etc,
        # but it handles dispatchers correctly.
        try:
            kwd = self.service[message.payload]
        except WrongDispatcher:
            self.log.log(5, "Not identifying b/c wrong dispatcher.")
            return FRAMEBLANK
        else:
            ktl_type = kwd.KTL_TYPE
            if ktl_type is None:
                ktl_type = "basic"
            return ktl_type
        
    def handle_enumerate(self, message):
        """Handle enumerate command."""
        message.verify(self.service)
        return ":".join(self.service.keywords())
        
    def handle_broadcast(self, message):
        """Handle the broadcast command."""
        message.verify(self.service)
        message = ZMQCauldronMessage(command="broadcast", service=self.service.name, dispatcher=self.service.dispatcher, keyword=message.keyword, payload=message.payload, direction="CDB")
        socket = self._get_broadcaster()
        self.log.log(5, "{0!r}.broadcast({1!s})".format(self, message))
        socket.send_multipart(message.data)
        return "success"
        
    def handle_heartbeat(self, message):
        """Heartbeat command does pretty much nothing."""
        self.log.log(5, "{0!r}.beat({1!s})".format(self, message))
        return "{0:.1f}".format(time.time())
        
        
    def connect(self):
        """Connect, and add a broadcaster."""
        self._get_broadcaster(wait=False)
        return super(_ZMQResponder, self).connect()
    
    def _get_broadcaster(self, wait=True):
        """Connect the broadcast socket."""
        zmq = check_zmq()
        if self._socket is not None:
            return self._socket
        self._socket = self.ctx.socket(zmq.PUB)
        try:
            address = zmq_get_address(self.service._config, "publish", bind=False)
            self._socket.connect(address)
        except zmq.ZMQError as e:
            self.log.error("Service can't connect to broadcaster address '{0}' because {1}".format(address, e))
            self._error = e
            raise
        else:
            self.log.log(5, "Broadcaster connected to {0}".format(address))
            if wait:
                time.sleep(0.2)
            return self._socket
    


@registry.dispatcher.service_for("zmq")
[docs]class Service(DispatcherService): # A ZMQ-based service. def __init__(self, name, config, setup=None, dispatcher=None): zmq = check_zmq() self.ctx = zmq.Context.instance() self._sockets = threading.local() super(Service, self).__init__(name, config, setup, dispatcher) @property def socket(self): """A thread-local ZMQ socket for sending commands to the responder thread.""" # Short out if we already have a socket. if hasattr(self._sockets, 'socket'): return self._sockets.socket zmq = check_zmq() socket = self.ctx.socket(zmq.REQ) zmq_connect_socket(socket, self._config, "broker", log=self.log, label='dispatcher') self._sockets.socket = socket return socket def _prepare(self): """Begin this service.""" self._thread = _ZMQResponder(self) self._message_queue = [] def _begin(self): """Allow command responses to start.""" if not self._thread.is_alive(): self._thread.start() self.log.debug("Started ZMQ Responder Thread.") self._thread.check_alive(timeout=10) while len(self._message_queue): self.socket.send_multipart(self._message_queue.pop().data) response = ZMQCauldronMessage.parse(self.socket.recv_multipart()) response.verify(self)
[docs] def shutdown(self): zmq = check_zmq() if hasattr(self, '_thread') and self._thread.is_alive(): self._thread.stop()
def _synchronous_command(self, command, payload, keyword=None, timeout=None): """Execute a synchronous command.""" message = ZMQCauldronMessage(command, service=self.name, dispatcher=self.dispatcher, keyword=keyword.name if keyword else FRAMEBLANK, payload=payload, direction="CDQ") if not self._thread.running.is_set(): self.log.log(5, "{0!r}.queue({1!s})".format(self, message)) return self._message_queue.append(message) elif threading.current_thread() == self._thread: self.log.log(5, "{0!r}.handle({1!s})".format(self, message)) response = self._thread.handle(message) else: self.log.log(5, "{0!r}.send({0!s})".format(message)) self.socket.send_multipart(message.data) if timeout: if not self.socket.poll(timeout * 1e3): raise TimeoutError("Dispatcher timed out.") response = ZMQCauldronMessage.parse(self.socket.recv_multipart()) response.verify(self) return response
@registry.dispatcher.keyword_for("zmq")
[docs]class Keyword(DispatcherKeyword): # A keyword object for ZMQ Cauldron backends. def _broadcast(self, value): """Broadcast this keyword value.""" self.service._synchronous_command("broadcast", value, self)