Source code for pypot.server.rest

import os

from operator import attrgetter
from pypot.primitive.move import MovePlayer, MoveRecorder, Move


[docs]class RESTRobot(object): """ REST API for a Robot. Through the REST API you can currently access: * the motors list (and the aliases) * the registers list for a specific motor * read/write a value from/to a register of a specific motor * the sensors list * the registers list for a specific motor * read/write a value from/to a register of a specific motor * the primitives list (and the active) * start/stop primitives """ def __init__(self, robot): self.robot = robot # Access motor related values
[docs] def get_motors_list(self, alias='motors'): return [m.name for m in getattr(self.robot, alias)]
[docs] def get_motor_registers_list(self, motor): return self._get_register_value(motor, 'registers')
# alias to above method
[docs] def get_registers_list(self, motor): return self.get_motor_registers_list(motor)
[docs] def get_motor_register_value(self, motor, register): return self._get_register_value(motor, register)
# alias to above method
[docs] def get_register_value(self, motor, register): return self.get_motor_register_value(motor, register)
[docs] def set_motor_register_value(self, motor, register, value): self._set_register_value(motor, register, value)
# alias to above method
[docs] def set_register_value(self, motor, register, value): self.set_motor_register_value(motor, register, value)
[docs] def get_motors_alias(self): return self.robot.alias
[docs] def set_goto_position_for_motor(self, motor, position, duration): m = getattr(self.robot, motor) m.goto_position(position, duration, wait=False)
# Access sensor related values
[docs] def get_sensors_list(self): return [s.name for s in self.robot.sensors]
[docs] def get_sensors_registers_list(self, sensor): return self._get_register_value(sensor, 'registers')
[docs] def get_sensor_register_value(self, sensor, register): return self._get_register_value(sensor, register)
[docs] def set_sensor_register_value(self, sensor, register, value): return self._set_register_value(sensor, register, value)
# Access primitive related values
[docs] def get_primitives_list(self): return [p.name for p in self.robot.primitives]
[docs] def get_running_primitives_list(self): return [p.name for p in self.robot.active_primitives if hasattr(p, 'name')]
[docs] def start_primitive(self, primitive): self._call_primitive_method(primitive, 'start')
[docs] def stop_primitive(self, primitive): self._call_primitive_method(primitive, 'stop')
[docs] def pause_primitive(self, primitive): self._call_primitive_method(primitive, 'pause')
[docs] def resume_primitive(self, primitive): self._call_primitive_method(primitive, 'resume')
[docs] def get_primitive_properties_list(self, primitive): return getattr(self.robot, primitive).properties
[docs] def get_primitive_property(self, primitive, property): return self._get_register_value(primitive, property)
[docs] def set_primitive_property(self, primitive, property, value): self._set_register_value(primitive, property, value)
[docs] def get_primitive_methods_list(self, primitive): return getattr(self.robot, primitive).methods
[docs] def call_primitive_method(self, primitive, method, kwargs): self._call_primitive_method(primitive, method, **kwargs)
def _set_register_value(self, object, register, value): o = getattr(self.robot, object) setattr(o, register, value) def _get_register_value(self, object, register): return attrgetter('{}.{}'.format(object, register))(self.robot) def _call_primitive_method(self, primitive, method_name, *args, **kwargs): p = getattr(self.robot, primitive) f = getattr(p, method_name) return f(*args, **kwargs) # TODO (Theo) : change names with a dic instead of ugly format
[docs] def start_move_recorder(self, move_name, motors_name=None): if not hasattr(self.robot, '_{}_recorder'.format(move_name)): if motors_name is not None: motors = [getattr(self.robot, m) for m in motors_name] else: motors = getattr(self.robot, 'motors') recorder = MoveRecorder(self.robot, 50, motors) self.robot.attach_primitive(recorder, '_{}_recorder'.format(move_name)) recorder.start() else: recorder = getattr(self.robot, '_{}_recorder'.format(move_name)) recorder.start()
[docs] def attach_move_recorder(self, move_name, motors_name): motors = [getattr(self.robot, m) for m in motors_name] recorder = MoveRecorder(self.robot, 50, motors) self.robot.attach_primitive(recorder, '_{}_recorder'.format(move_name))
[docs] def get_move_recorder_motors(self, move_name): try: recorder = getattr(self.robot, '_{}_recorder'.format(move_name)) return [str(m.name) for m in recorder.tracked_motors] except AttributeError: return None
[docs] def stop_move_recorder(self, move_name): """Allow more easily than stop_primitive() to save in a filename the recorded move""" recorder = getattr(self.robot, '_{}_recorder'.format(move_name)) recorder.stop() with open('{}.record'.format(move_name), 'w') as f: recorder.move.save(f) # Stop player if running : to discuss # Recording a playing move can produce strange outputs, but could be a good feature try: player = getattr(self.robot, '_{}_player'.format(move_name)) if player.running: player.stop() except AttributeError: pass
[docs] def start_move_player(self, move_name, speed=1.0, backwards=False): """Move player need to have a move file <move_name.record> in the working directory to play it""" # check if running try: player = getattr(self.robot, '_{}_player'.format(move_name)) if player.running: return except AttributeError: pass # if not running, override the play primitive with open('{}.record'.format(move_name)) as f: loaded_move = Move.load(f) player = MovePlayer(self.robot, loaded_move, play_speed=speed, backwards=backwards) self.robot.attach_primitive(player, '_{}_player'.format(move_name)) player.start() return player.duration()
[docs] def get_available_record_list(self): """Get list of json recorded movement files""" return [f.split('.record')[0] for f in os.listdir('.') if f.endswith('.record')]
[docs] def remove_move_record(self, move_name): """Remove the json recorded movement file""" return os.remove('{}.record'.format(move_name))