# -*- coding: utf-8 -*-
from __future__ import absolute_import
import weakref
import warnings
import logging
import threading
from six.moves import queue
from .dispatcher import Service as Dispatcher
from ..base import ClientService, ClientKeyword
from ..base.core import Task as _BaseTask
from ..exc import CauldronAPINotImplementedWarning, CauldronAPINotImplemented, ServiceNotStarted, DispatcherError
from .. import registry
__all__ = ['Service', 'Keyword']
class LocalTask(_BaseTask):
"""A simple object to mock asynchronous operations."""
def __call__(self):
"""Make sure that errors are properly set."""
super(LocalTask, self).__call__()
if self.error is not None:
self.error = DispatcherError(str(self.error))
class LocalTaskQueue(threading.Thread):
def __init__(self, name, log=None):
super(LocalTaskQueue, self).__init__(name="Task Queue for {0:s}".format(name))
self.queue = queue.Queue()
self.log = log or logging.getLogger("ktl.local.TaskQueue.{0:s}".format(name))
self.daemon = True
self.shutdown = threading.Event()
def run(self):
"""Run the task queue thread."""
while not self.shutdown.isSet():
try:
task = self.queue.get()
if task is None:
raise queue.Empty
task()
self.queue.task_done()
except queue.Empty:
pass
def stop(self):
"""Stop the task-queue thread."""
self.shutdown.set()
try:
self.queue.put(None, block=False)
except queue.Full:
pass
self.join()
@registry.client.keyword_for("local")
[docs]class Keyword(ClientKeyword):
@property
def source(self):
"""The source of knowledge about this keyword."""
return self.service._dispatcher[self.name]
def _ktl_reads(self):
"""Is this keyword readable?"""
return not self.source.writeonly
def _ktl_writes(self):
"""Is this keyword writable?"""
return not self.source.readonly
def _ktl_monitored(self):
"""Determine if this keyword is monitored."""
return self._update in self.source._consumers
[docs] def monitor(self, start=True, prime=True, wait=True):
if prime:
self.read(wait=wait)
if start:
self.source._consumers.add(self._update)
else:
self.source._consumers.discard(self._update)
def _read_task(self, unused):
result = self.source.update()
self._update(result)
[docs] def read(self, binary=False, both=False, wait=True, timeout=None):
if not self['reads']:
raise ValueError("Keyword '{0}' does not support reads, it is write-only.".format(self.name))
task = LocalTask(None, self._read_task, timeout)
self.service._thread.queue.put(task)
if wait:
result = task.get(timeout=timeout)
return self._current_value(binary=binary, both=both)
else:
return task
def _write_task(self, value):
self.source.modify(value)
self._update(self.source.value)
return self._current_value()
[docs] def write(self, value, wait=True, binary=False, timeout=None):
_call_msg = lambda : "{0!r}.write({1}, wait={2}, timeout={3})".format(self, value, wait, timeout)
if not self['writes']:
raise ValueError("Keyword '{0}' does not support writes, it is read-only.".format(self.name))
# User-facing convenience to make writes smoother.
try:
value = self.cast(value)
except (TypeError, ValueError): #pragma: no cover
pass
task = LocalTask(value, self._write_task, timeout)
self.service._thread.queue.put(task)
if wait:
self.service.log.debug("{0} waiting.".format(_call_msg()))
result = task.get(timeout=timeout)
self.service.log.debug("{0} complete.".format(_call_msg()))
return
else:
return task
[docs] def wait(self, timeout=None, operator=None, value=None, sequence=None, reset=False, case=False):
if sequence is not None:
return sequence.wait()
raise CauldronAPINotImplemented("Asynchronous operations are not supported for Cauldron.local")
@registry.client.service_for("local")
[docs]class Service(ClientService):
def __init__(self, name, populate=False):
try:
self._dispatcher = Dispatcher.get_service(name)
except KeyError:
raise ServiceNotStarted("Service '{0!s}' is not started.".format(name))
super(Service, self).__init__(name, populate)
self._thread = LocalTaskQueue(name, self.log)
self._thread.start()
[docs] def shutdown(self):
"""Shutdown this client."""
if hasattr(self, '_thread'):
self._thread.stop()
super(Service, self).shutdown()
[docs] def has_keyword(self, name):
"""Check for the existence of a keyword."""
return name in self._dispatcher._keywords
[docs] def keywords(self):
"""Return the list of all available keywords in this service instance."""
return self._dispatcher.keywords()
def _ktl_type(self, key):
"""Return the KTL type of a named keyword."""
return self._dispatcher[key].KTL_TYPE