# -*- coding: utf-8 -*-
"""
ZMQ Client Implementation
"""
from __future__ import absolute_import
import weakref
from ..base import ClientService, ClientKeyword
from ..exc import CauldronAPINotImplementedWarning, CauldronAPINotImplemented, DispatcherError, TimeoutError
from .common import zmq_get_address, check_zmq, teardown, zmq_connect_socket
from .thread import ZMQThread
from .protocol import ZMQCauldronMessage, ZMQCauldronErrorResponse, FRAMEBLANK, PrefixMatchError, FrameFailureError
from .tasker import Task, TaskQueue
from .broker import ZMQBroker
from .responder import ZMQDispatcherError
from .. import registry
from ..config import get_configuration, get_timeout
from ..logger import KeywordMessageFilter
from ..compat import WeakSet
import json
import six
import threading
import logging
import warnings
__all__ = ["Service", "Keyword"]
def teardown():
    """Shut down every service still held in this module's service registry.

    Delegates to :func:`_cleanup`, which drains the registry using its
    default argument.
    """
    _cleanup()
# Register this module's teardown() with the client registry under the 'zmq' key.
registry.client.teardown_for('zmq')(teardown)
# WeakSet of Service instances: Service.__init__ adds each new instance here,
# and _cleanup() pops them off one by one to shut them down.
_service_registry = WeakSet()
def _cleanup(_registry=_service_registry):
    """Drain the registry, shutting down each service as it is removed.

    :param _registry: collection to drain; bound at definition time to the
        module-level service registry.
    """
    while True:
        try:
            service = _registry.pop()
        except KeyError:
            # pop() signals an empty registry with KeyError: nothing left to do.
            return
        service.shutdown()
class _ZMQMonitorThread(ZMQThread):
    """A monitoring thread for ZMQ-powered Services which listens for broadcasts.

    The thread opens a SUB socket subscribed to the owning service's name and
    applies each received broadcast to the matching keyword, provided that
    keyword's name is present in :attr:`monitored`.
    """

    def __init__(self, service):
        """Create the monitor for ``service`` without starting it.

        :param service: the owning service; its ``name`` and ``ctx`` are used
            to name the thread and to pick the ZMQ context.
        """
        super(_ZMQMonitorThread, self).__init__(name="ktl.Service.{0}.Broadcasts".format(service.name), context=service.ctx)
        # Only a weak proxy is kept, so the thread never owns the service.
        self.service = weakref.proxy(service)
        # Names of the keywords whose broadcasts should be applied.
        self.monitored = set()
        # Address handed to zmq_connect_socket(); assigned by Service._prepare().
        self.address = None

    def thread_target(self):
        """Run the monitoring thread.

        Polls the signal socket and the subscription socket until
        ``self.running`` is cleared or a ZMQ error ends the loop. Both sockets
        are closed on every exit path.
        """
        zmq = check_zmq()
        try:
            ctx = self.service.ctx
        except weakref.ReferenceError:
            self.log.debug("Can't start ZMQ monitor, service has disappeared.")
            return

        socket = ctx.socket(zmq.SUB)
        signal = self.get_signal_socket()
        poller = zmq.Poller()
        poller.register(signal, zmq.POLLIN)
        poller.register(socket, zmq.POLLIN)
        try:
            zmq_connect_socket(socket, get_configuration(), "subscribe", log=self.log, label='client-monitor', address=self.address)
            # Accept everything belonging to this service.
            socket.setsockopt_string(zmq.SUBSCRIBE, six.text_type(self.service.name))
            self.started.set()
            while self.running.is_set():
                # Bounded poll so the running flag is re-checked regularly.
                ready = dict(poller.poll(timeout=1e3))
                if signal in ready:
                    # Drain the wake-up message, then re-evaluate the loop condition.
                    signal.recv()
                    self.log.trace("Got a signal: .running = {0}".format(self.running.is_set()))
                    continue
                if socket in ready:
                    try:
                        message = ZMQCauldronMessage.parse(socket.recv_multipart())
                        message.verify(self.service)
                        keyword = self.service[message.keyword]
                        f = KeywordMessageFilter(keyword)
                        self.log.addFilter(f)
                        try:
                            if keyword.name in self.monitored:
                                keyword._update(message.unwrap())
                                self.log.trace("{0!r}.monitor({1}={2})".format(self, keyword.name, message.unwrap()))
                            else:
                                self.log.trace("{0!r}.monitor({1}) ignored".format(self, keyword.name))
                        except Exception as e:
                            self.log.exception("{0!r}._update() error: {1!r}".format(keyword, e))
                        finally:
                            self.log.removeFilter(f)
                    except PrefixMatchError:
                        self.log.trace("{0!r}.monitor() ignored".format(self))
                    except ZMQCauldronErrorResponse as e:
                        self.log.error("Broadcast Message Error: {0!r}".format(e))
                    except (zmq.ContextTerminated, zmq.ZMQError):
                        # Transport-level failures end the thread; handled below.
                        raise
                    except Exception as e:
                        # Any other failure affects one message only; keep listening.
                        self.log.exception("Broadcast error: {0!r}".format(e))
        except (zmq.ContextTerminated, zmq.ZMQError) as e:
            self.log.trace("Service shutdown and context terminated, closing broadcast thread. {0}".format(repr(e)))
        finally:
            # Close the subscription socket on every exit path (previously it
            # was only closed after a clean loop exit). Closing is best-effort
            # so that the signal socket below is always released as well.
            try:
                socket.close(linger=0)
            except Exception as e:
                self.log.debug("Ignoring error while closing subscription socket: {0!r}".format(e))
            signal.close(linger=0)
        self.log.debug("Stopped Monitor Thread")
@registry.client.service_for("zmq")
class Service(ClientService):
    """Client service object for use with ZMQ."""

    def __init__(self, name, populate=False):
        """Set up ZMQ state, register the instance for teardown, then defer to the base class.

        :param name: service name, forwarded to ``ClientService.__init__``.
        :param populate: forwarded unchanged to ``ClientService.__init__``.
        """
        zmq = check_zmq()
        self.ctx = zmq.Context.instance()
        self._sockets = threading.local()
        self._monitor = None
        self._tasker = None
        self._lock = threading.RLock()
        # Maps upper-cased keyword name -> KTL type string.
        self._type_ktl_cache = {}
        # Registered before the base initializer runs so teardown can find
        # this instance even if initialization fails part-way.
        _service_registry.add(self)
        super(Service, self).__init__(name, populate)

    def _prepare(self):
        """Prepare step.

        Starts the task queue, asks for the subscription address through a
        synchronous "lookup" command, and starts the broadcast monitor on it.

        :raises ZMQDispatcherError: when ``ZMQBroker.check`` fails.
        """
        if not ZMQBroker.check(ctx=self.ctx):
            raise ZMQDispatcherError("Can't locate a suitable dispatcher for {0}".format(self.name))
        self._monitor = _ZMQMonitorThread(self)
        self._tasker = TaskQueue("ktl.Service.{0:s}.Tasks".format(self.name), ctx=self.ctx, log=self.log)
        # The tasker must be running before any command can be issued.
        self._tasker.start()
        address = self._synchronous_command("lookup", "subscribe", direction="CBQ")
        self._monitor.address = address
        self._monitor.start()

    def _ktl_type(self, key):
        """Get the KTL type of a specific keyword.

        The name is upper-cased; the cache is consulted first and the
        dispatcher is only queried on a miss.

        :raises KeyError: propagated from :meth:`_lookup_ktl_type`.
        """
        name = key.upper()
        with self._lock:
            try:
                ktl_type = self._type_ktl_cache[name]
            except KeyError:
                ktl_type = self._lookup_ktl_type(name)
        return ktl_type

    def _lookup_ktl_type(self, name):
        """Lookup the KTL type for a key and store it in the cache.

        :raises KeyError: when the "identify" command fails with
            ``FrameFailureError``, or when the reply names more than one
            distinct type.
        """
        try:
            message = self._synchronous_command("identify", payload=name, keyword=name, direction="CSQ")
        except FrameFailureError:
            raise KeyError("Keyword '{0}' does not exist.".format(name))
        else:
            # The reply is ':'-separated; duplicates are collapsed so that
            # repeated identical answers count as a single type.
            items = list(set(message.split(":")))
            if len(items) == 1:
                ktl_type = items[0]
            else:
                raise KeyError("Keyword '{0}' is multiply defined.".format(name))
        self._type_ktl_cache[name] = ktl_type
        return ktl_type

    def __del__(self):
        """On delete, try to shutdown."""
        self.shutdown()

    @staticmethod
    def _zmq_thread_alive(thread):
        """Return True when ``thread`` is not None and reports itself alive."""
        if thread is None:
            return False
        # Thread.isAlive() was removed in Python 3.9; prefer is_alive() and
        # only fall back to the legacy spelling when that is all the object has.
        probe = getattr(thread, "is_alive", None)
        if probe is None:
            probe = thread.isAlive
        return probe()

    def shutdown(self):
        """Stop the monitor thread and the task queue if they are running.

        Safe to call on a partially-initialized instance: missing or unset
        thread attributes are skipped.
        """
        monitor = getattr(self, '_monitor', None)
        if self._zmq_thread_alive(monitor):
            self.log.trace("Stopping monitor")
            monitor.stop()
            self.log.trace("Stopped monitor")
        tasker = getattr(self, '_tasker', None)
        if self._zmq_thread_alive(tasker):
            self.log.trace("Stopping tasker")
            tasker.stop()
            self.log.trace("Stopped tasker")

    def _has_keyword(self, name):
        """Return True when a KTL type can be resolved for ``name``."""
        try:
            self._ktl_type(name.upper())
        except KeyError:
            return False
        return True

    def keywords(self):
        """Return the reply to the "enumerate" command, split on ':'."""
        message = self._synchronous_command("enumerate", FRAMEBLANK, direction="CSQ")
        return message.split(":")

    def _handle_response(self, message):
        """Handle a response, and return the payload.

        :raises DispatcherError: when the message is flagged as an error.
        """
        self.log.msg("{0!r}.recv({1!s})".format(self, message))
        if message.iserror:
            raise DispatcherError("Dispatcher error on command: {0}".format(message.payload))
        message.verify(self)
        return message.unwrap()

    def _asynchronous_command(self, command, payload, keyword=None, direction="CDQ", timeout=None, callback=None):
        """Run an asynchronous command through the task queue.

        ``callback`` defaults to :meth:`_handle_response`; the return value is
        whatever the task queue's ``asynchronous_command`` returns.
        """
        callback = callback or self._handle_response
        return self._tasker.asynchronous_command(command, payload, self, keyword, direction, timeout, callback)

    def _synchronous_command(self, command, payload, keyword=None, direction="CDQ", timeout=None, callback=None):
        """Execute a synchronous command through the task queue.

        ``callback`` defaults to :meth:`_handle_response`; the return value is
        whatever the task queue's ``synchronous_command`` returns.
        """
        callback = callback or self._handle_response
        return self._tasker.synchronous_command(command, payload, self, keyword, direction, timeout, callback)
@registry.client.keyword_for("zmq")
class Keyword(ClientKeyword):
    """Client keyword object for use with ZMQ."""

    def _prepare(self):
        """Prepare this keyword for use.

        Enumerated keywords request their units up front, without waiting for
        the answer.
        """
        if self.KTL_TYPE == 'enumerated':
            self._async_units()

    def _ktl_reads(self):
        """Is this keyword readable?"""
        return True

    def _ktl_writes(self):
        """Is this keyword writable?"""
        return True

    def _ktl_monitored(self):
        """Is this keyword monitored."""
        return self.name in self.service._monitor.monitored

    def _ktl_units(self):
        """Get KTL units.

        Waits for a pending (or freshly issued) units request when no units
        have been received yet.

        :returns: the units, or ``''`` when the received units are ``None``.
        :raises DispatcherError: when no answer arrives within the timeout.
        """
        if getattr(self, '_units', None) is None:
            timeout = get_timeout(None)
            # Only issue a request if none has ever been started.
            if not hasattr(self, '_got_units'):
                self._async_units(timeout)
            self._got_units.wait(timeout)
            if not self._got_units.is_set():
                raise DispatcherError("Dispatcher error on command 'units'.")
        return '' if self._units is None else self._units

    def _async_units(self, timeout=None):
        """Asynchronously request units.

        Creates a fresh event that :meth:`_handle_units` sets on success.
        """
        self._got_units = threading.Event()
        return self._asynchronous_command("units", "", timeout=timeout, callback=self._handle_units)

    def _handle_units(self, message):
        """Handle a message response which has units.

        The payload is decoded as JSON and stored before the event is set.

        :raises DispatcherError: when the message is flagged as an error.
        """
        self.log.msg("{0!r}.recv({1!s})".format(self, message))
        if message.iserror:
            raise DispatcherError("Dispatcher error on command: {0}".format(message.payload))
        message.verify(self.service)
        self._units = json.loads(message.unwrap())
        self._got_units.set()
        return self._units

    def _handle_response(self, message):
        """Handle a response: update this keyword and return its current value.

        :raises DispatcherError: when the message is flagged as an error.
        """
        self.log.msg("{0!r}.recv({1!s})".format(self, message))
        if message.iserror:
            self.log.error("Dispatcher error: {0}".format(message))
            raise DispatcherError("Dispatcher error on command: {0}".format(message.payload))
        message.verify(self.service)
        self._update(message.unwrap())
        return self._current_value(binary=False, both=False)

    def _asynchronous_command(self, command, payload, timeout=None, callback=None):
        """Execute an asynchronous command for this keyword via the service.

        ``callback`` defaults to :meth:`_handle_response`.
        """
        return self.service._asynchronous_command(command, payload, self.name, timeout=timeout, callback=callback or self._handle_response)

    def _synchronous_command(self, command, payload, timeout=None):
        """Execute a synchronous command for this keyword via the service."""
        return self.service._synchronous_command(command, payload, self.name, timeout=timeout)

    def wait(self, timeout=None, operator=None, value=None, sequence=None, reset=False, case=False):
        """Wait on ``sequence``; expression-based waits are not supported.

        :param sequence: object whose ``wait(timeout=...)`` is called and
            whose result is returned.
        :raises CauldronAPINotImplemented: when ``sequence`` is ``None``.
        """
        if sequence is not None:
            return sequence.wait(timeout=get_timeout(timeout))
        raise CauldronAPINotImplemented("Asynchronous expression operations are not supported for Cauldron.zmq")

    def monitor(self, start=True, prime=True, wait=True):
        """Start or stop applying broadcasts to this keyword.

        :param start: add this keyword to the monitored set when true,
            remove it otherwise.
        :param prime: when starting, issue a read first.
        :param wait: forwarded to :meth:`read` for the priming read.
        """
        if start:
            if prime:
                self.read(wait=wait)
            self.service._monitor.monitored.add(self.name)
        else:
            # discard() rather than remove(): stopping a keyword that was
            # never monitored is a no-op instead of a KeyError.
            self.service._monitor.monitored.discard(self.name)

    def _await(self, task, timeout, _call_msg=None):
        """Await an asynchronous task.

        :param _call_msg: zero-argument callable producing the text used in
            log and error messages; a generic one is built when omitted.
        :raises TimeoutError: when ``task.get`` times out.
        """
        if _call_msg is None:
            _call_msg = lambda : "{0!r}_await(task={1!r}, timeout={2!r})".format(self, task, timeout)
        self.service.log.trace("{0} waiting.".format(_call_msg()))
        try:
            result = task.get(timeout=timeout)
        except TimeoutError:
            raise TimeoutError("{0} timed out.".format(_call_msg()))
        else:
            self.service.log.trace("{0} complete.".format(_call_msg()))
        return result

    def read(self, binary=False, both=False, wait=True, timeout=None):
        """Request an update of this keyword.

        :returns: the current value (per ``binary``/``both``) when ``wait`` is
            true, otherwise the pending task.
        :raises NotImplementedError: when the keyword does not support reads.
        """
        _call_msg = lambda : "{0!r}.read(wait={1}, timeout={2})".format(self, wait, timeout)
        if not self['reads']:
            raise NotImplementedError("Keyword '{0}' does not support reads.".format(self.name))
        task = self._asynchronous_command("update", "", timeout=timeout)
        if wait:
            self._await(task, timeout, _call_msg)
            return self._current_value(binary=binary, both=both)
        else:
            return task

    def write(self, value, wait=True, binary=False, timeout=None):
        """Send a "modify" command carrying ``value``.

        :returns: the awaited result when ``wait`` is true, otherwise the
            pending task.
        :raises NotImplementedError: when the keyword does not support writes.
        """
        _call_msg = lambda : "{0!r}.write(wait={1}, timeout={2})".format(self, wait, timeout)
        if not self['writes']:
            raise NotImplementedError("Keyword '{0}' does not support writes.".format(self.name))
        # User-facing convenience to make writes smoother: a value that
        # cannot be cast is sent as given.
        try:
            value = self.cast(value)
        except (TypeError, ValueError):
            pass
        self.service.log.trace("{0} = {1}".format(_call_msg(), value))
        task = self._asynchronous_command("modify", value, timeout=timeout)
        if wait:
            return self._await(task, timeout, _call_msg)
        else:
            return task