Source code for Cauldron.local.dispatcher

# -*- coding: utf-8 -*-
"""
Local dispatcher.

The local interface is process-local. It is lightweight, and good for testing environments, but doesn't handle anything that wouldn't normally be process local.

"""

from ..base import DispatcherService, DispatcherKeyword
from ..utils.callbacks import Callbacks
from .. import registry

import weakref

__all__ = ['Service', 'Keyword']

_registry = weakref.WeakValueDictionary()

@registry.dispatcher.teardown_for("local")
def clear():
    """Clear the registry."""
    _registry.clear()

@registry.dispatcher.service_for("local")
[docs]class Service(DispatcherService): @classmethod
[docs] def get_service(cls, name): """Get a dispatcher for a service.""" #TODO: Support inverse client startup ordering. name = str(name).lower() return _registry[name]
def __init__(self, name, config, setup=None, dispatcher=None): if str(name).lower() in _registry: raise ValueError("Cannot have two services with name '{0}' in local registry.".format(name)) super(Service, self).__init__(name, config, setup, dispatcher) def _begin(self): """Indicate that this service is ready to act, by inserting it into the local registry.""" _registry[self.name] = self
[docs] def shutdown(self): """To shutdown this service, delete it.""" pass
@registry.dispatcher.keyword_for("local")
[docs]class Keyword(DispatcherKeyword): def __init__(self, name, service, initial=None, period=None): super(Keyword, self).__init__(name, service, initial, period) self._consumers = Callbacks() def _broadcast(self, value): """Notify consumers that this value has changed.""" self._consumers(value)