Source code for pypot.dynamixel.controller

import time
import logging

from .io.abstract_io import DxlError
from ..robot.controller import MotorsController

logger = logging.getLogger(__name__)


[docs]class DxlController(MotorsController): def __init__(self, io, motors, sync_freq, synchronous, mode, regname, varname=None): MotorsController.__init__(self, io, motors, sync_freq) self.ids = [m.id for m in self.working_motors] self.synchronous = synchronous self.mode = mode self.regname = regname self.varname = regname if varname is None else varname for m in motors: if mode == 'get': m._read_synchronous[self.varname] = self.synchronous else: m._write_synchronous[self.varname] = self.synchronous @property def working_motors(self): return [m for m in self.motors if not m._broken] @property def synced_motors(self): motors = [m for m in self.working_motors if self.varname in m.registers] if self.synchronous: motors = ([m for m in motors if m._read_synced[self.varname].needed] if self.mode == 'get' else [m for m in motors if m._write_synced[self.varname].needed]) return motors
[docs] def setup(self): if self.mode == 'set': MAX_TRIALS = 25 for _ in range(MAX_TRIALS): if self.get_register(self.working_motors, disable_sync_read=True): break time.sleep(0.1) else: raise IOError('Cannot initialize syncloop for "{}". You need to desactivate sync_read if you use a usb2dynamixel device. '.format( self.regname))
[docs] def update(self): if not self.synced_motors: return return (self.get_register(self.synced_motors) if self.mode == 'get' else self.set_register(self.synced_motors))
[docs] def get_register(self, motors, disable_sync_read=False): """ Gets the value from the specified register and sets it to the :class:`~pypot.dynamixel.motor.DxlMotor`. """ if not motors: return False ids = [m.id for m in motors] getter = getattr(self.io, 'get_{}'.format(self.regname)) values = (sum([list(getter([id])) for id in ids], []) if disable_sync_read else getter(ids)) if not values: return False for m, val in zip(motors, values): m.__dict__[self.varname] = val for m in motors: m._read_synced[self.varname].done() return True
[docs] def set_register(self, motors): """ Gets the value from :class:`~pypot.dynamixel.motor.DxlMotor` and sets it to the specified register. """ if not motors: return ids = [m.id for m in motors] values = (m.__dict__[self.varname] for m in motors) getattr(self.io, 'set_{}'.format(self.regname))(dict(zip(ids, values))) for m in motors: m._write_synced[self.varname].done()
[docs]class AngleLimitRegisterController(DxlController): def __init__(self, io, motors, sync_freq, synchronous): DxlController.__init__(self, io, motors, sync_freq, synchronous, 'get', 'angle_limit') self.varnames = ['lower_limit', 'upper_limit'] for m in motors: for var in self.varnames: m._read_synchronous[var] = self.synchronous @property def synced_motors(self): motors = self.working_motors if self.synchronous: sync_motors = [] for m in motors: for var in self.varnames: if m._read_synced[var].needed: sync_motors.append(m) motors = sync_motors return motors
[docs] def get_register(self, motors): if not motors: return ids = [m.id for m in motors] values = self.io.get_angle_limit(ids) for m, val in zip(motors, values): m.__dict__['lower_limit'], m.__dict__['upper_limit'] = val for m in motors: for var in ['lower_limit', 'upper_limit']: m._read_synced[var].done()
[docs]class PosSpeedLoadDxlController(DxlController): def __init__(self, io, motors, sync_freq): DxlController.__init__(self, io, motors, sync_freq, False, 'get', 'present_position')
[docs] def setup(self): torques = self.io.is_torque_enabled(self.ids) for m, c in zip(self.working_motors, torques): m.compliant = not c self._old_torques = torques self._old_goals = {m.id: 0.0 for m in self.motors} try: values = self.io.get_goal_position_speed_load(self.ids) positions, speeds, loads = zip(*values) except ValueError: raise DxlError("Couldn't initialize pos/speed/load sync loop!") for m, p, s, l in zip(self.working_motors, positions, speeds, loads): m.__dict__['goal_position'] = p m.__dict__['moving_speed'] = s m.__dict__['torque_limit'] = l
[docs] def update(self): self.get_present_position_speed_load(self.working_motors) self.set_goal_position_speed_load(self.working_motors)
[docs] def get_present_position_speed_load(self, motors): ids = [m.id for m in motors] values = self.io.get_present_position_speed_load(ids) if not values: logger.warning('Timeout when getting pos/speed/load from %s', ids) return positions, speeds, loads = zip(*values) for m, p, s, l in zip(motors, positions, speeds, loads): m.__dict__['present_position'] = p m.__dict__['present_speed'] = s m.__dict__['present_load'] = l
[docs] def set_goal_position_speed_load(self, motors): change_torque = {} torques = [not m.compliant for m in motors] for m, t, old_t in zip(motors, torques, self._old_torques): if t != old_t: change_torque[m.id] = t self._old_torques = torques if change_torque: self.io._set_torque_enable(change_torque) rigid_motors = [] for m in motors: # Filter force control motors - only update values if goal_position has changed if getattr(m, "force_control_enable", False) and not m.compliant and self._old_goals[m.id] != m.__dict__['goal_position']: rigid_motors += [m] self._old_goals[m.id] = m.__dict__['goal_position'] # Do not filter motors without force control elif not m.compliant: rigid_motors += [m] ids = tuple(m.id for m in rigid_motors) if not ids: return values = ((m.__dict__['goal_position'], m.__dict__['moving_speed'], m.__dict__['torque_limit']) for m in rigid_motors) self.io.set_goal_position_speed_load(dict(zip(ids, values)))