# Source code for rodario.actors.proxy
""" Actor proxy for rodario framework """
# stdlib
import types
import pickle
from multiprocessing import Queue
from threading import Thread
from uuid import uuid4
from time import sleep
# local
from rodario import get_redis_connection
from rodario.future import Future
from rodario.exceptions import InvalidActorException, InvalidProxyException
class ActorProxy(object):  # pylint: disable=R0903

    """ Proxy object that fires calls to an actor over redis pubsub """

    def __init__(self, actor=None, uuid=None):
        """
        Initialize instance of ActorProxy.

        Accepts either an Actor object to clone or a UUID. If both are
        given, the Actor object takes precedence.

        :param rodario.actors.Actor actor: Actor to clone
        :param str uuid: UUID of Actor to clone
        :raises InvalidProxyException: If neither a valid Actor nor a
            string UUID is provided
        """

        # avoid cyclic import
        actor_module = __import__('rodario.actors', fromlist=('Actor',))
        proxying_actor = isinstance(actor, actor_module.Actor)

        # Validate the arguments before subscribing or starting the polling
        # thread: the thread closes over `self`, so bailing out afterwards
        # would leave it (and the subscription) running forever.
        if not proxying_actor and not isinstance(uuid, str):
            raise InvalidProxyException('No actor or UUID provided')

        #: Redis connection
        self._redis = get_redis_connection()
        #: This proxy object's UUID for creating unique channels
        self.proxyid = str(uuid4())
        #: Response queues for sandboxing method calls, keyed by call id
        self._response_queues = {}
        #: Redis PubSub client, subscribed to this proxy's private channel
        # pylint: disable=E1123
        self._pubsub = self._redis.pubsub(ignore_subscribe_messages=True)
        self._pubsub.subscribe(**{'proxy:%s' % self.proxyid: self._handler})

        def pubsub_thread():
            """ Call get_message in loop to fire _handler. """

            # Best-effort poller: any error ends the thread quietly instead
            # of surfacing in the (daemon) thread's traceback.
            try:
                while self._pubsub:
                    self._pubsub.get_message()
                    sleep(0.001)
            except Exception:  # pylint: disable=W0703
                pass

        # fire up the message handler thread as a daemon
        proc = Thread(target=pubsub_thread)
        proc.daemon = True
        proc.start()

        if proxying_actor:
            # proxying an Actor directly
            self.uuid = actor.uuid
            methods = actor._get_methods()  # pylint: disable=W0212
        else:
            # proxying by UUID; get actor methods over pubsub
            self.uuid = uuid
            methods = self._proxy('_get_methods').get()

        def get_lambda(name):
            """
            Generate a lambda function to proxy the given method.

            A factory is used so that each lambda binds its own `name`
            rather than the loop variable below.

            :param str name: Name of the method to proxy
            :rtype: :expression:`lambda`
            """

            return lambda _, *args, **kwargs: self._proxy(name, *args,
                                                          **kwargs)

        # create proxy methods for each public method of the original Actor
        for name in methods:
            setattr(self, name, types.MethodType(get_lambda(name), self))

    def _handler(self, message):
        """
        Handle message response via Queue object.

        The payload is a pickled sequence whose first item is the call id
        and whose second item is the value to deliver. Responses whose call
        id is not (or no longer) registered are ignored.

        :param dict message: The pubsub message to dissect; its ``'data'``
            entry holds the pickled payload
        """

        # NOTE(security): pickle.loads executes arbitrary code from the
        # payload; this is only safe if everything publishing to the redis
        # instance is trusted.
        data = pickle.loads(message['data'])

        # Remove and fetch in one step. An unknown id must not raise here:
        # the exception would propagate into the polling thread and stop it.
        queue = self._response_queues.pop(data[0], None)

        if queue is not None:
            # throw its value in the associated response queue
            queue.put(data[1])

    def _proxy(self, method_name, *args, **kwargs):
        """
        Proxy a method call to redis pubsub.

        This method is not meant to be called directly. Instead, it is used
        by the proxy's self-generated methods to provide the proxy with the
        same public API as the actor it represents.

        :param str method_name: The method to proxy
        :param tuple args: The arguments to pass
        :param dict kwargs: The keyword arguments to pass
        :raises InvalidActorException: If nothing is subscribed to the
            actor's channel
        :rtype: :class:`rodario.future.Future`
        """

        call_id = str(uuid4())
        queue = Queue()

        # Register the response queue *before* publishing so that a reply
        # arriving immediately on the polling thread can always be routed.
        self._response_queues[call_id] = queue

        try:
            # fire off the method call to the original Actor over pubsub
            count = self._redis.publish('actor:%s' % self.uuid,
                                        pickle.dumps((call_id, self.proxyid,
                                                      method_name, args,
                                                      kwargs,)))

            if count == 0:
                raise InvalidActorException('No such actor')
        except Exception:
            # no reply will ever come for this call; don't leak its queue
            self._response_queues.pop(call_id, None)
            raise

        return Future(queue)