# -*- coding: utf-8 -*-
"""
A single message broker service to distribute messages between clients and dispatchers.
"""
from __future__ import absolute_import
import threading
import time
import logging
import collections
import binascii
import weakref
from ..config import read_configuration
from .microservice import ZMQCauldronMessage, ZMQCauldronErrorResponse, FRAMEBLANK, FRAMEFAIL, DIRECTIONS
from .common import zmq_get_address, check_zmq, teardown, zmq_connect_socket
from ..exc import DispatcherError
__all__ = ['ZMQBroker', 'NoResponseNecessary', 'NoDispatcherAvailable', 'MultipleDispatchersFound']
def logger_getChild(logger, child):
"""Get a child logger of a parent."""
name = "{0}.{1}".format(logger.name, child)
return logging.getLogger(name)
[docs]class NoResponseNecessary(Exception):
"""NoResponseNecessary"""
pass
[docs]class NoDispatcherAvailable(DispatcherError):
"""Raised when no dispatcher is available."""
pass
[docs]class MultipleDispatchersFound(DispatcherError):
"""Raised when too many dispatchers are identified for a keyword."""
pass
class FanMessage(object):
"""An identification message request"""
def __init__(self, service, client, message, timeout=10.0):
super(FanMessage, self).__init__()
self.message = message # The original message.
self.service = service # The parent service.
self.client = client # The original identifier.
self.id = message.identifier # A human-readable binary identifier.
self.timeout = time.time() + timeout
# Store the results.
self.pending = set()
self.responses = dict()
self.valid = False # Did we message at least one dispatcher?
def __repr__(self):
"""A message representation."""
return "<FanMessage {0:s} pending={1:d} lifetime={2:.0f} responses={3:d}>".format(
binascii.hexlify(self.id).decode('utf-8')[:6], len(self.pending), self.timeout - time.time(), len(self.responses)
)
@property
def done(self):
"""Is this fan complete?"""
return (not len(self.pending)) or (time.time() > self.timeout)
def generate_message(self, dispatcher):
"""Generate a message for a specific dispatcher."""
message = self.message.copy()
message.direction = "SDQ"
self.pending.add(dispatcher.id)
self.valid = True
message.prefix = [self.client.id, b""]
return message
def add(self, dispatcher, message):
"""Add item to the fan message."""
self.pending.remove(dispatcher.id)
if message.payload not in (FRAMEBLANK.decode('utf-8'), FRAMEFAIL.decode('utf-8')) and not DIRECTIONS.iserror(message.direction):
self.responses[dispatcher.name] = message.payload
self.client.log.log(5, "{0!r}.add({1!r}) success".format(self, message))
else:
self.client.log.log(5, "{0!r}.add({1!r}) ignore".format(self, message))
def resolve(self):
"""Fan message responses."""
if not self.valid:
response = self.message.error_response("No dispatchers for '{0}'".format(self.message.service))
self.client.log.log(5, "{0!r}.resolve() no dispatcher".format(self))
return response
elif len(self.responses) == 1:
dispatcher, payload = next(iter(self.responses.items()))
response = self.message.response(payload)
response.dispatcher = dispatcher
self.client.log.log(5, "{0!r}.resolve() single response {1}".format(self, payload))
return response
elif len(self.responses):
self.client.log.log(5, "{0!r}.resolve() multiple response {1!r}".format(self, self.responses))
return self.message.response(":".join(self.responses.values()))
else:
self.client.log.log(5, "{0!r}.resolve() failure".format(self))
return self.message.response(FRAMEFAIL)
def send(self, socket):
"""Send the final response"""
response = self.resolve()
self.service.scrape(response)
self.client.send(response, socket)
class MessageReciept(object):
"""A simple message receipt object."""
__slots__ = ('message', 'sent')
def __init__(self, message):
super(MessageReciept, self).__init__()
self.message = message
self.sent = time.time()
class Lifetime(object):
"""An object with a lifetime"""
def __init__(self, service):
super(Lifetime, self).__init__()
self.service = weakref.proxy(service)
self._message_pool = dict()
self.beat()
def beat(self):
"""Mark a heartbeat"""
self._expiration = time.time() + self.service.broker.timeout
@property
def alive(self):
"""Is this object alive?"""
return time.time() < (self._expiration + 4 * self.service.broker.timeout)
@property
def shouldbeat(self):
"""Should this thing ask for a heartbeat."""
return time.time() > self._expiration
@property
def lifetime(self):
"""Return the lifetime of this object."""
return (self._expiration + 4 * self.service.broker.timeout) - time.time()
@property
def active(self):
"""Is this object active?"""
return len(self._message_pool)
def activate(self, message):
"""Base-class method to record a message as sent."""
self._message_pool[message.identifier] = MessageReciept(message)
def deactivate(self, message):
"""Record a message reciept. Generally means the source was alive!"""
value = self._message_pool.pop(message.identifier, None)
self.beat()
return value
def expire(self):
"""Expire, by returning a list of messages to deactivate."""
while len(self._message_pool):
key, value = self._message_pool.popitem()
yield value
def clear(self):
"""Clear the message pool"""
self._message_pool.clear()
def __repr__(self):
return "<{0} name='{1}' lifetime={2:.0f} open={3:d}>".format(self.__class__.__name__, self.name, self.lifetime, self.active)
class Client(Lifetime):
"""A simple representation of a client connection."""
def __init__(self, client_id, service):
super(Client, self).__init__(service)
self.id = client_id
self.name = binascii.hexlify(self.id).decode('utf-8')
self.log = logger_getChild(service.log, "Client.{0}".format(binascii.hexlify(self.id)))
def send(self, message, socket):
"""Send a message to this client."""
message.prefix = [self.id, b""]
self.log.log(5, "{0!r}.send({1!r})".format(self, message))
socket.send_multipart(message.data)
self.deactivate(message)
class Dispatcher(Lifetime):
"""A simple representation of a dispatcher."""
def __init__(self, name, dispatcher_id, service):
super(Dispatcher, self).__init__(service)
self.id = dispatcher_id
self.name = name
self.service = weakref.proxy(service)
self.log = logger_getChild(self.service.log,self.name)
self.message = None
self.keywords = dict()
def send(self, message, socket):
"""Send a message to this dispatcher."""
message.prefix = [self.id, b""] + message.prefix
self.log.log(5, "{0!r}.send({1!r})".format(self, message))
socket.send_multipart(message.data)
self.activate(message)
handlers = {}
def handler(code):
"""Mark a new handler."""
def _(func):
"""Decorator with bound arguments."""
handlers[code] = func.__name__
return func
return _
class Service(object):
"""A simple representation of a service"""
def __init__(self, name, broker):
super(Service, self).__init__()
self.name = name.upper()
self.keywords = {}
self.dispatchers = {}
self.clients = {}
self.broker = weakref.proxy(broker)
self.log = logger_getChild(self.broker.log, "Service.{0}".format(self.name))
self._fans = {}
def __repr__(self):
"""Represent this object."""
return "<{0} name={1} dispatchers={2:d} clients={3:d}>".format(self.__class__.__name__, self.name, len(self.dispatchers), len(self.clients))
def get_dispatcher(self, message, recv=True):
"""Get a dispatcher object from a message."""
try:
dispatcher_object = self.dispatchers[message.dispatcher]
if dispatcher_object.id != message.dispatcher_id:
dispatcher_object.clear()
dispatcher_object.id = message.dispatcher_id
except KeyError:
try:
dispatcher_object = self.dispatchers[message.dispatcher] = Dispatcher(message.dispatcher, message.dispatcher_id, self)
except ValueError:
raise DispatcherError("No dispatcher available for {0}".format(message.dispatcher))
if recv:
self.log.log(5, "{0!r}.recv({1})".format(dispatcher_object, message))
dispatcher_object.deactivate(message)
return dispatcher_object
def get_client(self, message, recv=True):
"""Retrieve a client object by client ID."""
try:
client_object = self.clients[message.client_id]
except KeyError:
client_object = self.clients[message.client_id] = Client(message.client_id, self)
if recv:
self.log.log(5, "{0!r}.recv({1})".format(client_object, message))
return client_object
def scrape(self, message):
"""Scrape a message for dispatcher information."""
# Cache dispatcher identities for easy lookup later.
if message.dispatcher != FRAMEBLANK.decode('utf-8'):
if message.isvalid:
# If this message can identify the disptacher, save it for later.
self.keywords[message.keyword.upper()] = message.dispatcher
if message.command == "identify" and message.isvalid:
self.dispatchers[message.dispatcher].keywords[message.keyword.upper()] = message.payload
def paste(self, message):
"""Opposite of scrape, paste the dispatcher back into the message."""
if message.dispatcher == FRAMEBLANK.decode('utf-8'):
# Locate the correct dispatcher.
if message.keyword != FRAMEBLANK.decode('utf-8'):
try:
dispatcher_name = self.keywords[message.keyword.upper()]
dispatcher = self.dispatchers[dispatcher_name]
except KeyError:
raise NoDispatcherAvailable("No dispatcher is available for '{0}'".format(message.service))
message.dispatcher = dispatcher.name
else:
raise DispatcherError("Ambiguous dispatcher specification in message {0!s}".format(message))
try:
return self.dispatchers[message.dispatcher]
except KeyError:
raise NoDispatcherAvailable("Dispatcher '{0}' is not available for '{1}'".format(message.dispatcher, message.service))
def finish_fan_messages(self, socket):
"""Finish a fan message"""
for fmessage in list(self._fans.values()):
if fmessage.done:
del self._fans[fmessage.id]
fmessage.send(socket)
def expire(self, socket):
"""Expire messages."""
for name in list(self.dispatchers.keys()):
dispatcher = self.dispatchers[name]
if dispatcher.alive:
continue
self.log.debug("{0!r} expiring".format(dispatcher))
for reciept in dispatcher.expire():
response = reciept.message.error_response("Dispatcher Timed Out")
self.handle(response, socket)
del self.dispatchers[name]
def beat(self, socket):
"""Start heartbeat messages where necssary."""
for name in self.dispatchers.keys():
dispatcher = self.dispatchers[name]
if not dispatcher.shouldbeat:
continue
if dispatcher.message is not None:
msg = dispatcher.message.response("beat")
msg.command = "heartbeat"
dispatcher.send(msg, socket)
def handle(self, message, socket):
"""Handle"""
try:
method = getattr(self, handlers[message.direction])
method(message, socket)
except KeyError as e:
raise
except Exception as e:
self.log.exception("Handling exception {0}".format(e))
socket.send_multipart(message.error_response(repr(e)))
else:
self.finish_fan_messages(socket)
@handler("DBE")
def handle_dispatcher_broker_error(self, message, socket):
"""This is an unusual case which can happen during cleanup."""
self.log.log(5, "Discarding {0!r}".format(message))
@handler("DBQ")
def handle_dispatcher_broker_query(self, message, socket):
"""Handle a query from a dispatcher to a broker."""
dispatcher = self.get_dispatcher(message)
if message.command == "welcome":
message.prefix = []
dispatcher.send(message.response("confirmed"), socket)
elif message.command == "ready":
dispatcher.message = message
self.log.info("{0!r} is ready.".format(dispatcher))
elif message.command == "heartbeat":
pass
else:
dispatcher.send(message.error_response("unknown command"), socket)
@handler("SDE")
@handler("SDP")
def handle_service_dispathcer_reply(self, message, socket):
"""Handle a dispatcher fan-out response.
| Dispatcher -> Broker | --> Client
"""
dispatcher = self.get_dispatcher(message)
client = self.get_client(message, recv=False)
try:
self._fans[message.identifier].add(dispatcher, message)
except KeyError:
# Nothing to do, the message was probably disposed much earlier.
pass
@handler("CSQ")
def handle_client_service_query(self, message, socket):
"""Handle the start of a fan message.
| Dispatcher <- Broker <- Client |
"""
client = self.get_client(message)
client.activate(message)
try:
if message.command == "identify":
dispatcher_name = self.keywords[message.keyword.upper()]
dispatcher = self.dispatchers[dispatcher_name]
ktl_type = dispatcher.keywords[message.keyword.upper()]
else:
raise KeyError
except KeyError:
fmessage = FanMessage(self, client, message)
self._fans[fmessage.id] = fmessage
for dispatcher in self.dispatchers.values():
dispatcher.send(fmessage.generate_message(dispatcher), socket)
self.log.log(5, "{0!r}.fan()".format(fmessage))
else:
response = message.response(ktl_type)
response.dispatcher = dispatcher_name
client.send(response, socket)
@handler("CDE")
@handler("CDP")
def handle_client_dispathcer_reply(self, message, socket):
"""Handle dispatcher reply
| Dispatcher -> Broker -> Client |
"""
client = self.get_client(message, recv=False)
dispatcher = self.get_dispatcher(message)
self.scrape(message)
client.send(message, socket)
@handler("CDQ")
def handle_client_dispatcher_query(self, message, socket):
"""Handle a dispathcer request.
| Dispatcher <- Broker <- Client |
"""
client = self.get_client(message)
client.activate(message)
try:
dispatcher = self.paste(message)
dispatcher.send(message, socket)
except DispatcherError as e:
client.send(message.error_response(e), socket)
@handler("CBQ")
def handle_client_broker_query(self, message, socket):
"""Handle the client asking the broker for something."""
client = self.get_client(message)
client.activate(message)
if message.command == "lookup":
# The client has asked for the subscription address, we should send that to them.
response = message.response(self.broker._mon_address)
elif message.command == "locate":
# The client has asked us if a service is locatable.
response = message.response("yes" if len(self.dispatchers) else "no")
else:
response = message.error_response("unknown command")
client.send(response, socket)
[docs]class ZMQBroker(threading.Thread):
"""A broker object for handling ZMQ Messaging patterns"""
def __init__(self, name, address, pub_address, sub_address, mon_address, context=None, timeout=1.0):
super(ZMQBroker, self).__init__(name=name)
import zmq
self.context = context or zmq.Context.instance()
self.running = threading.Event()
self.log = logging.getLogger("DFW.Broker." + name )
self._address = address
self._pub_address = pub_address
self._sub_address = sub_address
self._mon_address = mon_address
self.timeout = float(timeout)
self._local = threading.local()
self._error = None
self.services = dict()
@classmethod
[docs] def from_config(cls, config, name="ConfiguredBroker"):
"""Make a new item from a configuration."""
config = read_configuration(config)
address = zmq_get_address(config, "broker", bind=True)
sub_address = zmq_get_address(config, "publish", bind=True)
pub_address = zmq_get_address(config, "subscribe", bind=True)
mon_address = zmq_get_address(config, "subscribe", bind=False)
timeout = config.getfloat("zmq", "timeout")
return cls(name, address, pub_address, sub_address, mon_address, timeout=timeout)
@classmethod
[docs] def serve(cls, config, name="ServerBroker"):
"""Make a broker which serves."""
obj = cls.from_config(config, name)
obj.run()
return obj
@classmethod
[docs] def daemon(cls, config=None, daemon=True):
"""Serve in a process."""
import multiprocessing as mp
proc = mp.Process(target=cls.serve, args=(config,), name="ZMQBroker")
proc.daemon = daemon
proc.start()
return proc
@classmethod
[docs] def thread(cls, config=None, daemon=True):
"""Serve in a thread."""
obj = cls.from_config(config)
obj.daemon = daemon
obj.start()
return obj
@classmethod
[docs] def setup(cls, config=None, timeout=2.0):
"""Ensure a broker is set up to start."""
if not cls.check(timeout=timeout):
b = cls.thread(config=config, daemon=True)
b.running.wait(timeout=min([timeout, 2.0]))
if not b.running.is_set():
msg = "Couldn't start ZMQ broker."
if b._error is not None:
msg += " Error: " + repr(b._error)
raise RuntimeError(msg)
return b
else:
return None
@classmethod
[docs] def check(cls, config=None, timeout=2.0, ctx=None, address=None):
"""Check for the existence of a router at the configured address."""
import zmq
ctx = ctx or zmq.Context.instance()
log = logging.getLogger(__name__ + "Broker.check")
socket = ctx.socket(zmq.REQ)
zmq_connect_socket(socket, read_configuration(config), "broker", log=log, label="broker-check", address=address)
message = ZMQCauldronMessage(command="check", direction="UBQ")
socket.send_multipart(message.data)
if socket.poll(timeout * 1e3):
response = ZMQCauldronMessage.parse(socket.recv_multipart())
print(response)
if response.payload == "Broker Alive":
return True
return False
[docs] def connect(self, address, mode="ROUTER"):
"""Connect the frontend and backend sockets."""
import zmq
socket = self.context.socket(getattr(zmq, mode))
try:
socket.bind(address)
except zmq.ZMQError as e:
self.log.error("Can't bind to address '{0}' because {1}".format(address, e))
self._error = e
raise
else:
self.log.debug("Broker bound {0} to address '{1}'".format(mode, address))
return socket
[docs] def get_service(self, service):
"""Try to get a service."""
try:
service_object = self.services[service.upper()]
except KeyError:
service_object = self.services[service.upper()] = Service(service, self)
self.log.debug("Registered new service '{0}'".format(service))
return service_object
[docs] def respond_inquiry(self, message, socket):
"""Respond to an inquiry."""
try:
socket.send_multipart(message.response("Broker Alive").data)
except Exception as e:
socket.send_multipart(message.error_response(repr(e)).data)
[docs] def cleanup(self, socket):
"""docstring for cleanup"""
for service in self.services.values():
service.finish_fan_messages(socket)
service.expire(socket)
service.beat(socket)
[docs] def prepare(self):
"""Thread local way to prepare connections."""
import zmq
socket = self._local.socket = self.connect(self._address)
xpub = self._local.xpub = self.connect(self._pub_address, 'XPUB')
xsub = self._local.xsub = self.connect(self._sub_address, 'XSUB')
signal = self._local.signal = self.connect("inproc://{0}".format(hex(id(self))), "PULL")
self._local.poller = zmq.Poller()
self._local.poller.register(socket, zmq.POLLIN)
self._local.poller.register(xsub, zmq.POLLIN)
self._local.poller.register(xpub, zmq.POLLIN)
self._local.poller.register(signal, zmq.POLLIN)
[docs] def close(self):
"""Close thread-local sockets."""
import zmq
if not hasattr(self._local, 'socket'):
return
self._local.socket.close(linger=0)
self._local.xpub.close(linger=0)
self._local.xsub.close(linger=0)
signal = self._local.signal
if signal.closed:
pass
elif signal.poll(timeout=1):
signal.recv(flags=zmq.DONTWAIT)
signal.close(linger=0)
del self._local.socket, self._local.xpub, self._local.xsub, self._local.signal
[docs] def respond(self):
"""Respond to a single message on each socket."""
import zmq
poller = self._local.poller
socket = self._local.socket
xpub = self._local.xpub
xsub = self._local.xsub
signal = self._local.signal
try:
sockets = dict(poller.poll(timeout=self.timeout))
if sockets.get(signal) == zmq.POLLIN:
self.running.clear()
return
if sockets.get(socket) == zmq.POLLIN:
request = socket.recv_multipart()
if len(request) > 3:
message = ZMQCauldronMessage.parse(request)
if message.direction[0:2] == "UB":
self.respond_inquiry(message, socket)
else:
service = self.get_service(message.service)
service.handle(message, socket)
else:
self.log.log(5, "Malofrmed request: |{0}|".format("|".join(map(binascii.hexlify,request))))
self.cleanup(socket)
if sockets.get(xsub) == zmq.POLLIN:
request = xsub.recv_multipart()
xpub.send_multipart(request)
if sockets.get(xpub) == zmq.POLLIN:
request = xpub.recv_multipart()
xsub.send_multipart(request)
except zmq.ZMQError as e:
self.log.log(6, "Ending .respond(). {0!r}".format(e))
self.running.clear()
[docs] def stop(self):
"""Stop the responder."""
import zmq
if not self.isAlive():
return
if self.running.is_set() and not self.context.closed:
signal = self.context.socket(zmq.PUSH)
signal.connect("inproc://{0:s}".format(hex(id(self))))
self.running.clear()
signal.send(b"", flags=zmq.NOBLOCK)
signal.close(linger=0)
else:
self.running.clear()
self.join()
self.log.debug("Joined Broker")
if self._error is not None:
raise RuntimeError(self._error)
[docs] def run(self):
"""Run method for threads."""
try:
self.prepare()
self.running.set()
self.log.debug("Broker running. Timeout={0}".format(self.timeout))
while self.running.is_set():
self.respond()
self.close()
except Exception as e:
self._error = e
raise
else:
self.log.debug("Broker done. running={0}".format(self.running.is_set()))
def _setup_logging(verbose):
"""Try to use lumberjack to enable logging when in a subprocess."""
try:
import lumberjack
lumberjack.setup_logging("DFW.Broker", mode='stream', level=30 - 10 * verbose)
lumberjack.setup_warnings_logger("DFW.Broker")
except:
pass
def main():
"""Run the router from the command line."""
import argparse, six
parser = argparse.ArgumentParser(description="A broker to connect ZMQ clients to services.")
parser.add_argument("-c", "--config", type=six.text_type, help="set the Cauldron-zmq configuration filename", default=None)
parser.add_argument("-v", "--verbose", action="count", help="use verbose messaging.")
parser.add_argument("-D", dest='debug', action='store_true', help="Debug - print keyboard interrupts.")
opt = parser.parse_args()
if opt.verbose:
_setup_logging(opt.verbose)
print("^C to stop broker.")
try:
ZMQBroker.serve(read_configuration(opt.config))
except KeyboardInterrupt:
if opt.debug:
raise
print("\nShutting down.")
return 0