from __future__ import division
import json
import time
import logging
import numpy as np
from .primitive import LoopPrimitive
from pypot.utils.interpolation import KDTreeDict
logger = logging.getLogger(__name__)
[docs]class Move(object):
""" Simple class used to represent a movement.
This class simply wraps a sequence of positions of specified motors. The sequence must be recorded at a predefined frequency. This move can be recorded through the :class:`~pypot.primitive.move.MoveRecorder` class and played thanks to a :class:`~pypot.primitive.move.MovePlayer`.
"""
def __init__(self, freq):
self._framerate = freq
self._timed_positions = KDTreeDict()
def __repr__(self):
return '<Move framerate={} #keyframes={}>'.format(self.framerate,
len(self.positions()))
def __getitem__(self, i):
return list(self._timed_positions.items())[i]
@property
def framerate(self):
return self._framerate
[docs] def add_position(self, pos, time):
""" Add a new position to the movement sequence.
Each position is typically stored as a dict of (time, (motor_name,motor_position)).
"""
self._timed_positions[time] = pos
[docs] def iterpositions(self):
""" Returns an iterator on the stored positions. """
return self._timed_positions.items()
[docs] def positions(self):
""" Returns a copy of the stored positions. """
return self._timed_positions
[docs] def plot(self, ax):
pos = self.positions()
if not pos:
return
motors = pos[0].keys()
n = len(pos)
t = np.linspace(0, n / self.framerate, n)
pos = self.positions()
p = {}
for name in motors:
p[name] = []
for tt in t:
for k, v in pos[float(tt)].items():
p[k].append(v[0])
for name in motors:
ax.plot(t, p[name])
ax.legend(motors)
ax.set_xlabel('Time (in s)')
ax.set_ylabel('Position (in degree)')
[docs] def save(self, file):
""" Saves the :class:`~pypot.primitive.move.Move` to a json file.
.. note:: The format used to store the :class:`~pypot.primitive.move.Move` is extremely verbose and should be obviously optimized for long moves.
"""
d = {
'framerate': self.framerate,
'positions': self._timed_positions,
}
json.dump(d, file, indent=2)
[docs] @classmethod
def create(cls, d):
""" Create a :class:`~pypot.primitive.move.Move` from a dictionary. """
move = cls(d['framerate'])
move._timed_positions.update(d['positions'])
return move
[docs] @classmethod
def load(cls, file):
""" Loads a :class:`~pypot.primitive.move.Move` from a json file. """
d = json.load(file)
return cls.create(d)
[docs] @classmethod
def loads(cls, str):
""" Loads a :class:`~pypot.primitive.move.Move` from a json string. """
d = json.loads(str)
return cls.create(d)
[docs]class MoveRecorder(LoopPrimitive):
""" Primitive used to record a :class:`~pypot.primitive.move.Move`.
The recording can be :meth:`~pypot.primitive.primitive.Primitive.start` and :meth:`~pypot.primitive.primitive.Primitive.stop` by using the :class:`~pypot.primitive.primitive.LoopPrimitive` methods.
.. note:: Re-starting the recording will create a new :class:`~pypot.primitive.move.Move` losing all the previously stored data.
"""
def __init__(self, robot, freq, tracked_motors):
LoopPrimitive.__init__(self, robot, freq)
self.freq = freq
self.tracked_motors = map(self.get_mockup_motor, tracked_motors)
[docs] def setup(self):
self._move = Move(self.freq)
[docs] def update(self):
position = dict([(m.name, (m.present_position, m.present_speed))
for m in self.tracked_motors])
self._move.add_position(position, self.elapsed_time)
@property
def move(self):
""" Returns the currently recorded :class:`~pypot.primitive.move.Move`. """
return self._move
[docs] def add_tracked_motors(self, tracked_motors):
"""Add new motors to the recording"""
new_mockup_motors = map(self.get_mockup_motor, tracked_motors)
self.tracked_motors = list(set(self.tracked_motors + new_mockup_motors))
[docs]class MovePlayer(LoopPrimitive):
""" Primitive used to play a :class:`~pypot.primitive.move.Move`.
The playing can be :meth:`~pypot.primitive.primitive.Primitive.start` and :meth:`~pypot.primitive.primitive.Primitive.stop` by using the :class:`~pypot.primitive.primitive.LoopPrimitive` methods.
.. warning:: the primitive is run automatically the same framerate than the move record.
The play_speed attribute change only time lockup/interpolation
"""
def __init__(self, robot, move=None, play_speed=1.0, move_filename=None, start_max_speed=50, **kwargs):
self.move = move
self.backwards = False
if move_filename is not None:
with open(move_filename, 'r') as f:
self.move = Move.load(f)
self.play_speed = play_speed if play_speed != 0 and isinstance(play_speed, float) else 1.0
framerate = self.move.framerate if self.move is not None else 50.0
self.start_max_speed = start_max_speed if start_max_speed != 0 else np.inf
for key, value in kwargs.items():
setattr(self, key, value)
LoopPrimitive.__init__(self, robot, framerate)
[docs] def setup(self):
if self.move is None:
raise AttributeError("Attribute move is not defined")
self.period = 1.0 / self.move.framerate
self.positions = self.move.positions()
self.__duration = self.duration()
if self.play_speed < 0:
self.play_speed = - self.play_speed
self.backwards = not self.backwards
if self.play_speed == 0:
self.play_speed = 1.0
# Quick fix for limiting too fast movements at the play start
max_goto_time = 0
if self.backwards:
position = self.positions[self.__duration]
else:
position = self.positions[0]
for motor, value in position.iteritems():
motor = getattr(self.robot, motor)
motor.compliant = False
delta_angle = abs(motor.present_position - value[0])
if delta_angle > 5:
goto_time = delta_angle / self.start_max_speed
motor.goto_position(value[0], goto_time)
max_goto_time = goto_time if goto_time > max_goto_time else max_goto_time
time.sleep(max_goto_time)
[docs] def update(self):
if self.elapsed_time < self.__duration:
if self.backwards:
position = self.positions[(self.__duration - self.elapsed_time) * self.play_speed]
else:
position = self.positions[self.elapsed_time * self.play_speed]
for motor, value in position.iteritems():
# TODO: Ask pierre if its not a fgi to turn off the compliance
getattr(self.robot, motor).compliant = False
getattr(self.robot, motor).goal_position = value[0]
else:
self.stop()
[docs] def duration(self):
if self.move is not None:
return (len(self.move.positions()) / self.move.framerate) / self.play_speed
else:
return 1.0