Source code for pypot.primitive.manager
import logging
import numpy
from collections import defaultdict
from functools import partial
from threading import Lock
from ..utils.stoppablethread import StoppableLoopThread
logger = logging.getLogger(__name__)
[docs]class PrimitiveManager(StoppableLoopThread):
""" Combines all :class:`~pypot.primitive.primitive.Primitive` orders and affect them to the real motors.
At a predefined frequency, the manager gathers all the orders sent by the primitive to the "fake" motors, combined them thanks to the filter function and affect them to the "real" motors.
.. note:: The primitives are automatically added (resp. removed) to the manager when they are started (resp. stopped).
"""
def __init__(self, motors, freq=50, filter=partial(numpy.mean, axis=0)):
"""
:param motors: list of real motors used by the attached primitives
:type motors: list of :class:`~pypot.dynamixel.motor.DxlMotor`
:param int freq: update frequency
:param func filter: function used to combine the different request (default mean)
"""
StoppableLoopThread.__init__(self, freq)
self._prim = []
self._motors = motors
self._filter = filter
self.syncing = Lock()
[docs] def add(self, p):
""" Add a primitive to the manager. The primitive automatically attached itself when started. """
self._prim.append(p)
[docs] def remove(self, p):
""" Remove a primitive from the manager. The primitive automatically remove itself when stopped. """
self._prim.remove(p)
@property
def primitives(self):
""" List of all attached :class:`~pypot.primitive.primitive.Primitive`. """
return self._prim
[docs] def update(self):
""" Combined at a predefined frequency the request orders and affect them to the real motors. """
with self.syncing:
for m in self._motors:
to_set = defaultdict(list)
for p in self._prim:
for key, val in getattr(p.robot, m.name)._to_set.iteritems():
to_set[key].append(val)
for key, val in to_set.iteritems():
if key == 'led':
colors = set(val)
if len(colors) > 1:
colors -= {'off'}
filtred_val = colors.pop()
else:
filtred_val = self._filter(val)
logger.debug('Combined %s.%s from %s to %s',
m.name, key, val, filtred_val)
setattr(m, key, filtred_val)
[p._synced.set() for p in self._prim]
[docs] def stop(self):
""" Stop the primitive manager. """
for p in self.primitives[:]:
p.stop()
StoppableLoopThread.stop(self)