Source code for rodario.actors.proxy

""" Actor proxy for rodario framework """

# stdlib
import types
import pickle
from multiprocessing import Queue
from threading import Thread
from uuid import uuid4
from time import sleep

# 3rd party
import redis

# local
from rodario.future import Future
from rodario.exceptions import InvalidActorException, InvalidProxyException


[docs]class ActorProxy(object): # pylint: disable=R0903 """ Proxy object that fires calls to an actor over redis pubsub """
[docs] def __init__(self, actor=None, uuid=None): """ Initialize instance of ActorProxy. Accepts either an Actor object to clone or a UUID, but not both. :param rodario.actors.Actor actor: Actor to clone :param str uuid: UUID of Actor to clone """ #: Redis connection self._redis = redis.StrictRedis() #: Redis PubSub client self._pubsub = None #: This proxy object's UUID for creating unique channels self.proxyid = str(uuid4()) #: Dict of response queues for sandboxing method calls self._response_queues = {} # avoid cyclic import actor_module = __import__('rodario.actors', fromlist=('Actor',)) # pylint: disable=E1123 self._pubsub = self._redis.pubsub(ignore_subscribe_messages=True) self._pubsub.subscribe(**{'proxy:%s' % self.proxyid: self._handler}) # list of blocking methods self._blocking_methods = set() methods = set() def pubsub_thread(): """ Call get_message in loop to fire _handler. """ try: while self._pubsub: self._pubsub.get_message() sleep(0.001) except: # pylint: disable=W0702 pass # fire up the message handler thread as a daemon proc = Thread(target=pubsub_thread) proc.daemon = True proc.start() if isinstance(actor, actor_module.Actor): # proxying an Actor directly self.uuid = actor.uuid methods = actor._get_methods() # pylint: disable=W0212 elif isinstance(uuid, str): # proxying by UUID; get actor methods over pubsub self.uuid = uuid methods = self._proxy('_get_methods').get() else: raise InvalidProxyException('No actor or UUID provided') def get_lambda(name): """ Generate a lambda function to proxy the given method. :param str name: Name of the method to proxy :rtype: :expression:`lambda` """ return lambda _, *args, **kwargs: self._proxy(name, *args, **kwargs) # create proxy methods for each public method of the original Actor for name in methods: name_split = name.split(':') for attr in name_split[1:]: if attr == 'blocking': self._blocking_methods.add(name_split[0]) setattr(self, name_split[0], types.MethodType(get_lambda(name_split[0]), self))
def _handler(self, message): """ Handle message response via Queue object. :param tuple message: The message to dissect """ # throw its value in the associated response queue data = pickle.loads(message['data']) queue = data[0] self._response_queues[queue].put(data[1]) self._response_queues.pop(queue, None)
[docs] def _proxy(self, method_name, *args, **kwargs): """ Proxy a method call to redis pubsub. This method is not meant to be called directly. Instead, it is used by the proxy's self-generated methods to provide the proxy with the same public API as the actor it represents. :param str method_name: The method to proxy :param tuple args: The arguments to pass :param dict kwargs: The keyword arguments to pass :rtype: :class:`multiprocessing.Queue` """ # create a unique response queue for retrieving the return value async queue = str(uuid4()) # fire off the method call to the original Actor over pubsub count = self._redis.publish('actor:%s' % self.uuid, pickle.dumps((self.proxyid, queue, method_name, args, kwargs,))) if count == 0: raise InvalidActorException('No such actor') self._response_queues[queue] = Queue() if method_name in self._blocking_methods: return self._response_queues[queue].get() return Future(self._response_queues[queue])