Source code for rodario.actors.actor

""" Actor for rodario framework """

# stdlib
import atexit
from uuid import uuid4
from time import sleep
from threading import Thread, Event
import pickle
import inspect

# local
from rodario import get_redis_connection
from rodario.registry import Registry
from rodario.exceptions import UUIDInUseException

REGISTRY = Registry()


# pylint: disable=E1101
[docs]class Actor(object): """ Base Actor class """ #: Threading Event to tell the message handling loop to die # (needed in __del__ so must be defined here) _stop = None #: Redis PubSub client _pubsub = None
[docs] def __init__(self, uuid=None): """ Initialize the Actor object. :param str uuid: Optionally-provided UUID """ atexit.register(self.__del__) self._stop = Event() #: Separate Thread for handling messages self._proc = None #: Redis connection self._redis = get_redis_connection() # pylint: disable=E1123 self._pubsub = self._redis.pubsub(ignore_subscribe_messages=True) if uuid: self.uuid = uuid else: self.uuid = str(uuid4()) if not REGISTRY.exists(self.uuid): REGISTRY.register(self.uuid) else: self.uuid = None raise UUIDInUseException('UUID is already taken')
def __del__(self): """ Clean up. """ if hasattr(self, 'uuid'): REGISTRY.unregister(self.uuid) self.stop() @property def is_alive(self): """ Return True if this Actor is still alive. :rtype: :class:`bool` """ return not self._stop.is_set() def _handler(self, message): """ Send proxied method call results back through pubsub. :param tuple message: The message to dissect """ data = pickle.loads(message['data']) if not data[2]: # empty method call; bail out return # call the function and respond to the proxy object with return value uuid = data[0] proxy = data[1] func = getattr(self, data[2]) result = (uuid, func(*data[3], **data[4])) self._redis.publish('proxy:%s' % proxy, pickle.dumps(result)) def _get_methods(self): """ List all of this Actor's methods (for creating remote proxies). :rtype: :class:`list` """ methods = inspect.getmembers(self, predicate=callable) method_list = set() for name, _ in methods: if (name in ('proxy', 'start', 'stop', 'part', 'join',) or name[0] == '_'): continue method_list.add(name) return method_list
[docs] def join(self, channel, func=None): """ Join this Actor to a pubsub cluster channel. :param str channel: The channel to join :param callable func: The message handler function """ self._pubsub.subscribe(**{'cluster:%s' % channel: func if func is not None else self._handler})
[docs] def part(self, channel): """ Remove this Actor from a pubsub cluster channel. :param str channel: The channel to part """ self._pubsub.unsubscribe('cluster:%s' % channel)
[docs] def proxy(self): """ Wrap this Actor in an ActorProxy object. :rtype: :class:`rodario.actors.ActorProxy` """ # avoid cyclic import proxy_module = __import__('rodario.actors', fromlist=('ActorProxy',)) return proxy_module.ActorProxy(self)
[docs] def start(self): """ Fire up the message handler thread. """ def pubsub_thread(): """ Call get_message in loop to fire _handler. """ while not self._stop.is_set(): self._pubsub.get_message() sleep(0.01) # subscribe to personal channel and fire up the message handler self._pubsub.subscribe(**{'actor:%s' % self.uuid: self._handler}) self._proc = Thread(target=pubsub_thread) self._proc.daemon = True self._proc.start()
[docs] def stop(self): """ Kill the message handler thread. """ self._stop.set()