# -*- coding: utf-8 -*-
"""
Implements the abstract-base Keyword and Service classes used for dispatchers.
This piece of Cauldron is a rough mock of :mod:`DFW`, the dispatcher side interface.
"""
from __future__ import absolute_import
import abc
import six
import weakref
import logging
import warnings
import collections
import time
from ..compat import WeakOrderedSet
from .core import _BaseKeyword, _BaseService
from ..config import read_configuration
from ..exc import CauldronAPINotImplemented, NoWriteNecessary, CauldronXMLWarning, WrongDispatcher, CauldronWarning
from ..utils.helpers import api_not_required, api_not_implemented, api_required, api_override
from ..utils.callbacks import Callbacks
from ..bundled import ktlxml
from ..api import STRICT_KTL_XML
from .. import registry
__all__ = ['Keyword', 'Service']
def get_dispatcher_XML(service, name):
"""Check that the XML for the dispatcher is correct.
Checks that if this service has a dispatcher value, that it
matches the keyword's dispatcher value.
"""
if service.dispatcher != None:
keyword_node = service.xml[name]
dispatcher_node = ktlxml.get.dispatcher(keyword_node)
dispatcher = ktlxml.get.value(dispatcher_node, 'name')
return dispatcher
return None
def get_initial_XML(xml, name):
"""Get the keyword's initial value from the XML configuraton.
:param xml: The XML tree containing keywords.
:param name: Name of the keyword.
"""
keyword_xml = xml[name]
initial = None
for element in keyword_xml.childNodes:
if element.nodeName != 'serverside' and \
element.nodeName != 'server-side':
continue
server_xml = element
for element in server_xml.childNodes:
if element.nodeName != 'initialize':
continue
# TODO: this loop could check to see if
# there is more than one initial value,
# rather than immediately halting the
# iteration.
try:
initial = ktlxml.getValue (element, 'value')
except ValueError: # pragma: no cover
continue
# Found a value, stop the iteration.
break
if initial != None:
# Found a value, stop the iteration.
break
return initial
[docs]class Keyword(_BaseKeyword):
"""A dispatcher-based keyword, which should own its own values."""
_ALLOWED_KEYS = set(['value', 'name', 'readonly', 'writeonly'])
def __init__(self, name, service, initial=None, period=None):
name = str(name).upper()
super(Keyword, self).__init__(name=name, service=service)
if service.get(name, None) is not None:
raise ValueError("keyword named '%s' already exists." % name)
self._acting = False
self._callbacks = Callbacks()
self._history = collections.deque(maxlen=100)
self.writeonly = False
self.readonly = False
self._period = None
try:
if service.xml is not None:
dispatcher = get_dispatcher_XML(service, name)
if dispatcher != service.dispatcher:
if STRICT_KTL_XML:
raise WrongDispatcher("service dispatcher is '%s', dispatcher for %s is '%s'" % (service.dispatcher, name, dispatcher))
warnings.warn(CauldronXMLWarning("Service {0} dispatcher '{1}' does not match keyword {2} dispatcher '{3}'".format(service.name, service.dispatcher, name, dispatcher)))
if initial is None:
initial = get_initial_XML(service.xml, name)
except Exception as e:
if STRICT_KTL_XML:
raise
else:
warnings.warn("XML setup for keyword '{0}' failed. {1}".format(name, e), CauldronXMLWarning)
# Handle XML-specified initial values here.
self.initial = str(initial)
if period is not None:
self.period(period)
service[self.name] = self
def __contains__(self, value):
if self.value == None:
return False
if value in self.value:
return True
return False
def _ktl_readonly(self):
"""Read only key."""
return self.readonly
def _ktl_writeonly(self):
"""Write only key."""
return self.writeonly
[docs] def callback(self, function, remove=False):
"""Register a function to be called whenever this keyword is
set to a new value."""
if remove:
return self._callbacks.discard(function)
self._callbacks.add(function)
@api_override
[docs] def check(self, value):
"""Check that 'value' is appropriate for this keyword. If it is not, raise a value error."""
pass # pragma: no cover
@api_override
[docs] def translate(self, value):
"""Translate a value into a standard representation."""
return value
[docs] def set(self, value, force=False):
"""Set the keyword to the value provided, and broadcast changes.
:param value: The keyword value.
:param bool force: Whether to force the change, or ignore repeatedly setting a keyword to the same value.
"""
value = self.translate(value)
if value == self.value and force is False:
return
if not (isinstance(value, six.string_types) or value is None):
raise TypeError("{0}: Value must be string-like, got {1}".format(self, value))
self.check(value)
self._history.append((value, time.time()))
self.value = value
if value != None:
self._broadcast(value)
self._propogate()
@property
def value(self):
"""Map .value to ._last_value, the cauldron internal location of this variable."""
return self._last_value
@value.setter
def value(self, value):
"""Setter for .value"""
self._last_value = value
@value.deleter
def value(self):
"""Deleter for .value"""
self._last_value = None
@abc.abstractmethod
def _broadcast(self, value):
"""An internal method to be used to actually broadcast the value via the service."""
pass # pragma: no cover
@api_override
[docs] def preread(self):
"""Take some action before this value is read. Called at the start of :meth:`update`."""
pass # pragma: no cover
@api_override
[docs] def prewrite(self, value):
"""Any actions which need to occur before a value is written. Called to adjust the value in :meth:`modify`.
This can raise :exc:`NoWriteNecessary` if a write is not necessary.
"""
if self.value == value:
raise NoWriteNecessary("Value unchanged")
self.check(value)
return value
@api_override
[docs] def write(self, value):
"""Write the value to the authority source. Called to adjust the value in :meth:`modify`."""
pass # pragma: no cover
@api_override
[docs] def read(self):
"""Read the value from the authority source. Called to get the value for :meth:`update`."""
return self.value
@api_not_implemented
[docs] def schedule(self, appointment=None, cancel=False):
"""Schedule an update."""
pass # pragma: no cover
@api_not_implemented
[docs] def period(self, period):
"""How often a keyword should be updated."""
pass # pragma: no cover
def _propogate(self):
"""Propagate the change to any waiting callbacks."""
if not self._acting:
try:
self._acting = True
self._callbacks(self)
finally:
self._acting = False
@api_override
[docs] def postwrite(self, value):
"""Take some action post-write."""
self.set(value)
@api_override
[docs] def postread(self, value):
"""Take some action post-read. Called at the end of :meth:`update`."""
self.set(value)
return value
[docs] def modify(self, value):
"""Modify this keyword's value. This is the public function which should be called to change a keyword value."""
try:
value = self.prewrite(value)
except NoWriteNecessary:
return
self.write(value)
self.postwrite(value)
[docs] def update(self):
"""Update the value by performing a read. This is the public function which should be called when the keyword is read."""
self.preread()
value = self.read()
if value is not None:
return self.postread(value)
return value
[docs]class Service(_BaseService):
"""A dispatcher is a KTL service server-side.
A class encapsulating a basic representation of a complete KTL service. The `name` argument is case sensitive, and will be used to locate (and load) the service's KTLXML representation. The `config` argument specifies the stdiosvc configuration file that will be used when loading the stdiosvc front-end. The `setup` function will be called to properly instantiate all keywords associated with this :class:`Service` instance; it accepts a :class:`Service` instance as its sole argument, and should instantiate :class:`Keyword.Basic` objects directly. If any keywords are not instantiated, they will be given placeholder "cacheing" :class:`Keyword.Basic` instances of the appropriate type (string, integer, etc.). See :func:`setupOrphans` for an example. If `dispatcher` is specified, only keywords corresponding to that dispatcher number will be instantiated.
Parameters
----------
name : str
the Service name.
config : str
the stdiosvc configuration filename, or the Cauldron configuration filename.
setup : callable
a function which will be called to set up the keywords for this service.
dispatcher : str, optional
The name of the dispatcher to use for this service. If not provided, all keywords will be used.
"""
name = None
_DISPATCHER = True
def __init__(self, name, config, setup=None, dispatcher=None):
super(Service, self).__init__(name=name)
self._config = read_configuration(config)
self._configuration_location = config if isinstance(config, six.string_types) else "???"
self.dispatcher = dispatcher if dispatcher else "DEFAULT"
self.log = logging.getLogger("DFW.Service.{0}".format(self.name))
self.log.info("Starting Service '{0}' using backend '{1}'".format(self.name, registry.dispatcher.backend))
self._keywords = {}
self.status_keyword = None
try:
self.xml = ktlxml.Service(self.name)
except Exception as e:
if STRICT_KTL_XML:
raise
warning = CauldronXMLWarning("KTLXML was not loaded correctly. Keywords will not be validated against XML. Exception was {0!s}.".format(e))
warnings.warn(warning)
self.log.warning(str(warning))
self.xml = None
else:
# Implementors will be expected to assign Keyword instances
# for each KTL keyword in this KTL service.
for keyword in self.xml.list():
if self.dispatcher == get_dispatcher_XML(self, keyword):
self._keywords[keyword] = None
self._prepare()
if setup is not None:
setup(self)
self.setupOrphans()
self.begin()
@property
def _Keyword_cls(self):
"""Get the keyword class."""
from Cauldron import DFW
return DFW.Keyword.Keyword
[docs] def keywords(self):
"""The list of available keywords"""
return list(sorted(self._keywords.keys()))
[docs] def setStatusKeyword(self, keyword):
"""Set the status keyword value."""
keyword = self[keyword]
if self.status_keyword == keyword:
return False
self.status_keyword = keyword
return True
[docs] def setupOrphans(self):
"""Set up orphaned keywords, that is keywords which aren't attached to a specific keyword class."""
for name, keyword in self._keywords.items():
if keyword is None:
dispatcher = get_dispatcher_XML(self, name)
if dispatcher == self.dispatcher:
self._setupOrphan(name)
def _setupOrphan(self, name):
"""Set up a single orphan."""
from Cauldron import DFW
try:
xml = self.xml[name]
ktl_type = ktlxml.getValue(xml, 'type')
cls = DFW.Keyword.types[ktl_type]
except Exception as e:
if STRICT_KTL_XML:
raise
warnings.warn(CauldronXMLWarning("XML setup for orphan keyword {0} failed: {1}".format(name, str(e))))
cls = DFW.Keyword.Keyword
try:
cls(name, service=self)
except WrongDispatcher:
pass
else:
warnings.warn(CauldronWarning("Set up an orphaned keyword {0} for service {1} dispatcher {2}".format(
name, self.name, self.dispatcher
)))
@api_override
def _prepare(self):
"""This method is called once the configuration has been read, but before setup. It provides an implementation-dependent way to take action after the system has been configured, but before any keywords are available."""
pass
[docs] def begin(self):
"""Send any queued messages."""
for keyword in self:
if keyword is None:
continue
if keyword.initial is not None:
# Ensure that if this keyword was already written to,
# we don't overwrite the already written value.
if keyword.value is not None:
initial = keyword.value
else:
initial = keyword.initial
try:
keyword.set(initial)
except ValueError as e:
self.log.error("Bad initial value '%s' for '%s'", initial, keyword.name)
self.log.error(str(e))
self._begin()
@abc.abstractmethod
def _begin(self):
"""Implementation-dependent startup tasks should be handled here. This method is called
when :meth:`begin` is done setting initial keyword values."""
pass # pragma: no cover
def __getitem__(self, name):
"""Get a keyword item."""
name = str(name).upper()
try:
keyword = self._keywords[name]
except KeyError:
return self.__missing__(name)
if keyword is None:
return self.__missing__(name)
return keyword
def __missing__(self, key):
"""What to do with missing keys."""
return self._Keyword_cls(key, self)
def __setitem__(self, name, value):
"""Set a keyword instance in this server."""
if not isinstance(value, Keyword):
raise TypeError("value must be a Keyword instance.")
name = str(name).upper()
if name not in self._keywords:
if not STRICT_KTL_XML:
if self.xml is not None:
warnings.warn(CauldronXMLWarning("service '{0}' does not have a keyword '{1}' in XML".format(self.name, name)))
else:
raise KeyError("service '%s' does not have a keyword '%s'" % (self.name, name))
if name in self._keywords and self._keywords[name] is not None:
raise RuntimeError("cannot set keyword '%s' twice" % (name))
self._keywords[name] = value
def __contains__(self, name):
"""Check for name in self"""
if self.xml is not None:
return (str(name).upper() in self.xml and STRICT_KTL_XML) or (str(name).upper() in self._keywords and not STRICT_KTL_XML)
return str(name).upper() in self._keywords
def __iter__(self):
"""Iterator over self."""
return six.itervalues(self._keywords)
[docs] def get(self, name, default=None):
"""Get a keyword."""
return self._keywords.get(str(name).upper(), default)
[docs] def broadcast(self):
"""Called to broadcast all values to ensure that the keyword server matches the keyword."""
for keyword in self:
value = keyword['value']
if value != None:
keyword._broadcast(value)
@api_required
[docs] def shutdown(self):
"""Shutdown this keyword server."""
pass # pragma: no cover
def __del__(self):
"""When this service is deleted, shut it down."""
self.shutdown()