Source code for Cauldron.zmq.client

# -*- coding: utf-8 -*-
"""
ZMQ Client Implementation
"""

from __future__ import absolute_import

import weakref
from ..base import ClientService, ClientKeyword
from ..exc import CauldronAPINotImplementedWarning, CauldronAPINotImplemented, DispatcherError, TimeoutError
from .common import zmq_get_address, check_zmq, teardown, zmq_connect_socket
from .microservice import ZMQCauldronMessage, ZMQCauldronErrorResponse, FRAMEBLANK, FRAMEFAIL
from .tasker import Task, TaskQueue
from .. import registry
from ..config import get_configuration, get_timeout

import six
import threading
import logging
import warnings

__all__ = ["Service", "Keyword"]

registry.client.teardown_for('zmq')(teardown)

class _ZMQMonitorThread(threading.Thread):
    """A monitoring thread for ZMQ-powered Services which listens for broadcasts."""
    def __init__(self, service):
        super(_ZMQMonitorThread, self).__init__(name="ktl.Service.{0}.Broadcasts".format(service.name))
        self.service = weakref.proxy(service)
        self.shutdown = threading.Event()
        self.log = logging.getLogger(self.name)
        self.monitored = set()
        self.daemon = True
        self.address = None
        
    def stop(self):
        """Stop this thread."""
        self.shutdown.set()
        if not self.isAlive():
            return
        self.log.debug("Joining {0}".format(self.name))
        self.join()
        
    def run(self):
        """Run the monitoring thread."""
        zmq = check_zmq()
        try:
            ctx = self.service.ctx
        except weakref.ReferenceError:
            self.log.log(5, "Can't start ZMQ monitor, service has disappeared.")
            return
        socket = ctx.socket(zmq.SUB)
        try:
            zmq_connect_socket(socket, get_configuration(), "subscribe", log=self.log, label='client-monitor', address=self.address)
            # Accept everything belonging to this service.
            socket.setsockopt_string(zmq.SUBSCRIBE, six.text_type(self.service.name))
            
            while not self.shutdown.isSet():
                if socket.poll(timeout=1):
                    try:
                        message = ZMQCauldronMessage.parse(socket.recv_multipart())
                        message.verify(self.service)
                        keyword = self.service[message.keyword]
                        if keyword.name in self.monitored:
                            keyword._update(message.payload)
                            self.log.log(5, "{0!r}.monitor({1}={2})".format(self, keyword.name, message.payload))
                        else:
                            self.log.log(5, "{0!r}.monitor({1}) ignored".format(self, keyword.name))
                    except ZMQCauldronErrorResponse as e:
                        self.log.error("Broadcast Message Error: {0!r}".format(e))
                    except (zmq.ContextTerminated, zmq.ZMQError):
                        raise
                    except Exception as e:
                        self.log.error("Broadcast Error: {0!r}".format(e))
        except (zmq.ContextTerminated, zmq.ZMQError) as e:
            self.log.log(6, "Service shutdown and context terminated, closing broadcast thread. {0}".format(repr(e)))
        else:
            try:
                socket.setsockopt(zmq.LINGER, 0)
                socket.close()
            except:
                pass
        finally:
            self.log.debug("Stopped Monitor Thread")
            

@registry.client.service_for("zmq")
[docs]class Service(ClientService): # Client service object for use with ZMQ. def __init__(self, name, populate=False): zmq = check_zmq() self.ctx = zmq.Context.instance() self._sockets = threading.local() self._monitor = None self._tasker = None self._lock = threading.RLock() self._type_ktl_cache = {} super(Service, self).__init__(name, populate) def _prepare(self): """Prepare step.""" self._monitor = _ZMQMonitorThread(self) self._tasker = TaskQueue(self.name, ctx=self.ctx, log=self.log) self._tasker.start() address = self._synchronous_command("lookup", "subscribe", direction="CBQ") self._monitor.address = address self._monitor.start() def _ktl_type(self, key): """Get the KTL type of a specific keyword.""" name = key.upper() with self._lock: if name in self._type_ktl_cache: return self._type_ktl_cache[name] message = self._synchronous_command("identify", payload=name, keyword=name, direction="CSQ") items = list(set(message.split(":"))) if len(items) == 1 and (items[0] not in (FRAMEBLANK.decode('utf-8'), FRAMEFAIL.decode('utf-8'))): ktl_type = items[0] else: raise KeyError("Keyword '{0}' does not exist.".format(name)) self._type_ktl_cache[name] = ktl_type return ktl_type def __del__(self): """On delete, try to shutdown.""" self.shutdown()
[docs] def shutdown(self): if hasattr(self, '_monitor'): self._monitor.stop() if hasattr(self, '_tasker'): self._tasker.stop()
[docs] def has_keyword(self, name): assert name.upper() != self.name.upper() name = name.upper() try: ktype = self._ktl_type(name) except KeyError as e: return False else: return True
[docs] def keywords(self): message = self._synchronous_command("enumerate", FRAMEBLANK, direction="CSQ") return message.split(":")
def _handle_response(self, message): """Handle a response, and return the payload.""" self.log.log(5, "{0!r}.recv({1!s})".format(self, message)) if message.iserror: raise DispatcherError("Dispatcher error on command: {0}".format(message.payload)) message.verify(self) return message.payload def _asynchronous_command(self, command, payload, keyword=None, direction="CDQ", timeout=None, callback=None): """Run an asynchronous command.""" request = ZMQCauldronMessage(command, direction=direction, service=self.name, dispatcher=FRAMEBLANK, keyword=keyword if keyword else FRAMEBLANK, payload=payload if payload else FRAMEBLANK) callback = callback or self._handle_response task = Task(request, callback, get_timeout(timeout)) self._tasker.put(task) return task def _synchronous_command(self, command, payload, keyword=None, direction="CDQ", timeout=None, callback=None): """Execute a synchronous command.""" timeout = get_timeout(timeout) task = self._asynchronous_command(command, payload, keyword, direction, timeout, callback) return task.get(timeout=timeout)
@registry.client.keyword_for("zmq")
[docs]class Keyword(ClientKeyword): # Client keyword object for use with ZMQ. def _ktl_reads(self): """Is this keyword readable?""" return True def _ktl_writes(self): """Is this keyword writable?""" return True def _ktl_monitored(self): """Is this keyword monitored.""" return self.name in self.service._monitor.monitored def _handle_response(self, message): """Handle a response, and return the payload.""" self.service.log.log(5, "{0!r}.recv({1!s})".format(self, message)) if message.iserror: raise DispatcherError("Dispatcher error on command: {0}".format(message.payload)) message.verify(self.service) self._update(message.payload) return self._current_value(binary=False, both=False) def _asynchronous_command(self, command, payload, timeout=None, callback=None): """Execute a synchronous command.""" return self.service._asynchronous_command(command, payload, self.name, timeout=timeout, callback=self._handle_response) def _synchronous_command(self, command, payload, timeout=None): """Execute a synchronous command.""" return self.service._synchronous_command(command, payload, self.name, timeout=timeout)
[docs] def wait(self, timeout=None, operator=None, value=None, sequence=None, reset=False, case=False): if sequence is not None: return sequence.wait(timeout=get_timeout(timeout)) raise CauldronAPINotImplemented("Asynchronous expression operations are not supported for Cauldron.zmq")
[docs] def monitor(self, start=True, prime=True, wait=True): if start: if prime: self.read(wait=wait) self.service._monitor.monitored.add(self.name) else: self.service._monitor.monitored.remove(self.name)
[docs] def read(self, binary=False, both=False, wait=True, timeout=None): _call_msg = lambda : "{0!r}.read(wait={1}, timeout={2})".format(self, wait, timeout) if not self['reads']: raise NotImplementedError("Keyword '{0}' does not support reads.".format(self.name)) task = self._asynchronous_command("update", "", timeout=timeout) if wait: self.service.log.debug("{0} waiting.".format(_call_msg())) try: task.get(timeout=timeout) except TimeoutError: raise TimeoutError("{0} timed out.".format(_call_msg())) else: self.service.log.debug("{0} complete.".format(_call_msg())) return self._current_value(binary=binary, both=both) else: return task
[docs] def write(self, value, wait=True, binary=False, timeout=None): _call_msg = lambda : "{0!r}.write(wait={1}, timeout={2})".format(self, wait, timeout) if not self['writes']: raise NotImplementedError("Keyword '{0}' does not support writes.".format(self.name)) # User-facing convenience to make writes smoother. try: value = self.cast(value) except (TypeError, ValueError): pass task = self._asynchronous_command("modify", value, timeout=timeout) if wait: self.service.log.debug("{0} waiting.".format(_call_msg())) try: result = task.get(timeout=timeout) except TimeoutError: raise TimeoutError("{0} timed out.".format(_call_msg())) else: self.service.log.debug("{0} complete.".format(_call_msg())) else: return task