Source code for rodario.actors.clusterproxy

""" Cluster Proxy for rodario framework """

# stdlib
import pickle
import types
from multiprocessing import Queue, Event
from threading import Thread
from time import sleep
from uuid import uuid4

# local
from rodario import get_redis_connection
from rodario.future import Future
from rodario.exceptions import EmptyClusterException


# pylint: disable=R0903
[docs]class ClusterProxy(object): """ Proxy object responsible for multiple actors This class is meant to be inherited by child objects which can provide their own API methods for coordinating the Actors in their channel. """ #: Flag for Stopping the message handler thread _stop = None
[docs] def __init__(self, channel): """ Initialize instance of ClusterProxy. :param str channel: The cluster channel to use """ #: Cluster channel self.channel = channel #: Redis connection self._redis = get_redis_connection() #: Redis PubSub client self._pubsub = None #: This proxy object's UUID for creating unique channels self.proxyid = str(uuid4()) #: Response queues for sandboxing method calls self._response_queues = {} #: Response counters for the response queue self._response_counters = {} self._stop = Event() # pylint: disable=E1123 self._pubsub = self._redis.pubsub(ignore_subscribe_messages=True) self._pubsub.subscribe(**{'proxy:%s' % self.proxyid: self._handler}) def pubsub_thread(): """ Call get_message in loop to fire _handler. """ try: while not self._stop.is_set(): self._pubsub.get_message() sleep(0.001) except: # pylint: disable=W0702 pass # fire up the message handler thread as a daemon proc = Thread(target=pubsub_thread) proc.daemon = True proc.start()
def __getattribute__(self, name): """ Return transparent callables for everything and proxy the calls. :param str name: Name of the attribute being requested :rtype: lambda """ def get_lambda(name): """ Create a proxied callable. :param str name: Name of the callable to wrap :rtype: instancemethod """ return lambda _, *args, **kwargs: self._proxy(name, *args, **kwargs) try: return object.__getattribute__(self, name) except AttributeError: return types.MethodType(get_lambda(name), self) def _handler(self, message): """ Handle message response via Queue object. :param tuple message: The message to dissect """ # throw its value in the associated response queue data = pickle.loads(message['data']) if data[1] != False: queue = self._response_queues[data[0]] queue.put(data[1]) self._response_queues[data[0]] = queue self._response_counters[data[0]] -= 1 if self._response_counters[data[0]] <= 0: self._response_queues.pop(data[0])
[docs] def _proxy(self, method_name, *args, **kwargs): """ Proxy a method call to redis pubsub. Use this method in your child objects which inherit from ClusterProxy to provide the proxy with some representation of the public API for the Actors it represents. :paramstr method_name: The method to proxy :param tuple args: The arguments to pass :param dict kwargs: The keyword arguments to pass :rtype: :class:`rodario.future.Future` :returns: A Future whose first value is the number of expected responses """ uuid = str(uuid4()) data = (uuid, self.proxyid, method_name, args, kwargs,) # fire off the method call to the original Actors over pubsub count = self._redis.publish('cluster:%s' % self.channel, pickle.dumps(data)) if count == 0: raise EmptyClusterException() queue = Queue() queue.put(count) self._response_queues[uuid] = queue self._response_counters[uuid] = count return Future(queue)