# -*- coding: utf-8 -*-
import serial
import logging
import operator
import itertools
import threading
from collections import namedtuple, OrderedDict
from contextlib import contextmanager
from ..robot.io import AbstractIO
from .conversion import *
from .packet import *
logger = logging.getLogger(__name__)
# With this logger you should always provide as extra:
# - the port
# - the baudrate
# - the timeout
_DxlControl = namedtuple('_DxlControl', ('name',
'address', 'length', 'nb_elem',
'access',
'models',
'dxl_to_si', 'si_to_dxl',
'getter_name', 'setter_name'))
class _DxlAccess(object):
readonly, writeonly, readwrite = range(3)
[docs]class DxlIO(AbstractIO):
""" Low-level class to handle the serial communication with the robotis motors. """
__used_ports = set()
__controls = []
# MARK: - Open, Close and Flush the communication
def __init__(self,
port, baudrate=1000000, timeout=0.05,
use_sync_read=False,
error_handler_cls=None,
convert=True):
""" At instanciation, it opens the serial port and sets the communication parameters.
:param string port: the serial port to use (e.g. Unix (/dev/tty...), Windows (COM...)).
:param int baudrate: default for new motors: 57600, for PyPot motors: 1000000
:param float timeout: read timeout in seconds
:param bool use_sync_read: whether or not to use the SYNC_READ instruction
:param error_handler: set a handler that will receive the different errors
:type error_handler: :py:class:`~pypot.dynamixel.error.DxlErrorHandler`
:param bool convert: whether or not convert values to units expressed in the standard system
:raises: :py:exc:`~pypot.dynamixel.io.DxlError` if the port is already used.
"""
self._known_models = {}
self._known_mode = {}
self._sync_read = use_sync_read
self._error_handler = error_handler_cls() if error_handler_cls else None
self._convert = convert
self._serial_lock = threading.Lock()
self.open(port, baudrate, timeout)
def __enter__(self):
return self
def __del__(self):
self.close()
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def __repr__(self):
return ('<DXL IO: closed={self.closed}, '
'port="{self.port}", '
'baudrate={self.baudrate}, '
'timeout={self.timeout}>').format(self=self)
[docs] def open(self, port, baudrate=1000000, timeout=0.05):
""" Opens a new serial communication (closes the previous communication if needed).
:raises: :py:exc:`~pypot.dynamixel.io.DxlError` if the port is already used.
"""
self._open(port, baudrate, timeout)
logger.info("Opening port '%s'", self.port,
extra={'port': port,
'baudrate': baudrate,
'timeout': timeout})
def _open(self, port, baudrate, timeout, max_recursion=500):
# Tries to connect to port until it succeeds to ping any motor on the bus.
# This is used to circumvent a bug with the driver for the USB2AX on Mac.
# Warning: If no motor is connected on the bus, this will run forever!!!
import platform
# import time
import pypot.utils.pypot_time as time
for i in range(max_recursion):
self._known_models.clear()
self._known_mode.clear()
with self._serial_lock:
self.close(_force_lock=True)
if port in self.__used_ports:
raise DxlError('port already used {}'.format(port))
# Dirty walkaround to fix a strange bug.
# Observed with the USB2AX on Linux with pyserial 2.7
# We have to first open/close the port in order to make it work
# at 1Mbauds
if platform.system() == 'Linux' and self._sync_read:
self._serial = serial.Serial(port, 9600)
self._serial.close()
self._serial = serial.Serial(port, baudrate, timeout=timeout)
self.__used_ports.add(port)
if platform.system() == 'Darwin' and self._sync_read:
if not self.ping(DxlBroadcast):
self.close()
continue
else:
time.sleep(self.timeout)
self.flush()
break
else:
raise DxlError('could not connect to the port {}'.format(self.port))
[docs] def close(self, _force_lock=False):
""" Closes the serial communication if opened. """
if not self.closed:
with self.__force_lock(_force_lock) or self._serial_lock:
self._serial.close()
self.__used_ports.remove(self.port)
logger.info("Closing port '%s'", self.port,
extra={'port': self.port,
'baudrate': self.baudrate,
'timeout': self.timeout})
[docs] def flush(self, _force_lock=False):
""" Flushes the serial communication (both input and output). """
if self.closed:
raise DxlError('attempt to flush a closed serial communication')
with self.__force_lock(_force_lock) or self._serial_lock:
self._serial.flushInput()
self._serial.flushOutput()
def __force_lock(self, condition):
@contextmanager
def with_True():
yield True
return with_True() if condition else False
# MARK: Properties of the serial communication
@property
def port(self):
""" Port used by the :class:`~pypot.dynamixel.io.DxlIO`. If set, will re-open a new connection. """
return self._serial.port
@port.setter
def port(self, value):
self.open(value, self.baudrate, self.timeout)
@property
def baudrate(self):
""" Baudrate used by the :class:`~pypot.dynamixel.io.DxlIO`. If set, will re-open a new connection. """
return self._serial.baudrate
@baudrate.setter
def baudrate(self, value):
self.open(self.port, value, self.timeout)
@property
def timeout(self):
""" Timeout used by the :class:`~pypot.dynamixel.io.DxlIO`. If set, will re-open a new connection. """
return self._serial.timeout
@timeout.setter
def timeout(self, value):
self.open(self.port, self.baudrate, value)
@property
def closed(self):
""" Checks if the connection is closed. """
return not (hasattr(self, '_serial') and self._serial.isOpen())
# MARK: - Motor discovery
[docs] def ping(self, id):
""" Pings the motor with the specified id.
.. note:: The motor id should always be included in [0, 253]. 254 is used for broadcast.
"""
pp = DxlPingPacket(id)
try:
self._send_packet(pp, error_handler=None)
return True
except DxlTimeoutError:
return False
[docs] def scan(self, ids=xrange(254)):
""" Pings all ids within the specified list, by default it finds all the motors connected to the bus. """
return [id for id in ids if self.ping(id)]
# MARK: - Specific Getter / Setter
[docs] def get_model(self, ids):
""" Gets the model for the specified motors. """
to_get_ids = [i for i in ids if i not in self._known_models]
models = [dxl_to_model(m) for m in self._get_model(to_get_ids, convert=False)]
self._known_models.update(zip(to_get_ids, models))
return tuple(self._known_models[id] for id in ids)
[docs] def change_id(self, new_id_for_id):
""" Changes the id of the specified motors (each id must be unique on the bus). """
if len(set(new_id_for_id.values())) < len(new_id_for_id):
raise ValueError('each id must be unique.')
for new_id in new_id_for_id.itervalues():
if self.ping(new_id):
raise ValueError('id {} is already used.'.format(new_id))
self._change_id(new_id_for_id)
for motor_id, new_id in new_id_for_id.iteritems():
if motor_id in self._known_models:
self._known_models[new_id] = self._known_models[motor_id]
del self._known_models[motor_id]
if motor_id in self._known_mode:
self._known_mode[new_id] = self._known_mode[motor_id]
del self._known_mode[motor_id]
[docs] def change_baudrate(self, baudrate_for_ids):
""" Changes the baudrate of the specified motors. """
self._change_baudrate(baudrate_for_ids)
for motor_id in baudrate_for_ids:
if motor_id in self._known_models:
del self._known_models[motor_id]
if motor_id in self._known_mode:
del self._known_mode[motor_id]
[docs] def get_status_return_level(self, ids, **kwargs):
""" Gets the status level for the specified motors. """
convert = kwargs['convert'] if 'convert' in kwargs else self._convert
srl = []
for id in ids:
try:
srl.extend(self._get_status_return_level((id, ),
error_handler=None, convert=convert))
except DxlTimeoutError as e:
if self.ping(id):
srl.append('never' if convert else 0)
else:
if self._error_handler:
self._error_handler.handle_timeout(e)
return ()
else:
raise e
return tuple(srl)
[docs] def set_status_return_level(self, srl_for_id, **kwargs):
""" Sets status return level to the specified motors. """
convert = kwargs['convert'] if 'convert' in kwargs else self._convert
if convert:
srl_for_id = dict(zip(srl_for_id.keys(),
[('never', 'read', 'always').index(s) for s in srl_for_id.values()]))
self._set_status_return_level(srl_for_id, convert=False)
[docs] def get_mode(self, ids):
""" Gets the mode ('joint' or 'wheel') for the specified motors. """
to_get_ids = [id for id in ids if id not in self._known_mode]
limits = self.get_angle_limit(to_get_ids, convert=False)
modes = ('wheel' if limit == (0, 0) else 'joint' for limit in limits)
self._known_mode.update(zip(to_get_ids, modes))
return tuple(self._known_mode[id] for id in ids)
[docs] def set_wheel_mode(self, ids):
""" Sets the specified motors to wheel mode. """
self._set_mode(dict(zip(ids, itertools.repeat('wheel'))))
[docs] def set_joint_mode(self, ids):
""" Sets the specified motors to joint mode. """
self._set_mode(dict(zip(ids, itertools.repeat('joint'))))
def _set_mode(self, mode_for_id):
models = ['MX' if m.startswith('MX') else '*' for m in self.get_model(list(mode_for_id.keys()))]
pos_max = [position_range[m][0] for m in models]
limits = ((0, 0) if mode == 'wheel' else (0, pos_max[i] - 1)
for i, mode in enumerate(mode_for_id.itervalues()))
self._set_angle_limit(dict(zip(mode_for_id.keys(), limits)), convert=False)
self._known_mode.update(mode_for_id.items())
[docs] def set_angle_limit(self, limit_for_id, **kwargs):
""" Sets the angle limit to the specified motors. """
convert = kwargs['convert'] if 'convert' in kwargs else self._convert
if 'wheel' in self.get_mode(limit_for_id.keys()):
raise ValueError('can not change the angle limit of a motor in wheel mode')
if (0, 0) in limit_for_id.values():
raise ValueError('can not set limit to (0, 0)')
self._set_angle_limit(limit_for_id, convert=convert)
[docs] def switch_led_on(self, ids):
""" Switches on the LED of the motors with the specified ids. """
self._set_LED(dict(zip(ids, itertools.repeat(True))))
[docs] def switch_led_off(self, ids):
""" Switches off the LED of the motors with the specified ids. """
self._set_LED(dict(zip(ids, itertools.repeat(False))))
[docs] def enable_torque(self, ids):
""" Enables torque of the motors with the specified ids. """
self._set_torque_enable(dict(zip(ids, itertools.repeat(True))))
[docs] def disable_torque(self, ids):
""" Disables torque of the motors with the specified ids. """
self._set_torque_enable(dict(zip(ids, itertools.repeat(False))))
[docs] def get_pid_gain(self, ids, **kwargs):
""" Gets the pid gain for the specified motors. """
return tuple([tuple(reversed(t)) for t in self._get_pid_gain(ids, **kwargs)])
[docs] def set_pid_gain(self, pid_for_id, **kwargs):
""" Sets the pid gain to the specified motors. """
pid_for_id = dict(itertools.izip(pid_for_id.iterkeys(),
[tuple(reversed(t)) for t in pid_for_id.values()]))
self._set_pid_gain(pid_for_id, **kwargs)
# MARK: - Generic Getter / Setter
[docs] def get_control_table(self, ids, **kwargs):
""" Gets the full control table for the specified motors.
..note:: This function requires the model for each motor to be known. Querring this additional information might add some extra delay.
"""
error_handler = kwargs['error_handler'] if ('error_handler' in kwargs) else self._error_handler
convert = kwargs['convert'] if ('convert' in kwargs) else self._convert
bl = ('goal position speed load', 'present position speed load')
controls = [c for c in self._DxlIO__controls if c.name not in bl]
res = []
for id, model in zip(ids, self.get_model(ids)):
controls = [c for c in controls if model in c.models]
controls = sorted(controls, key=lambda c: c.address)
address = controls[0].address
length = controls[-1].address + controls[-1].nb_elem * controls[-1].length
rp = DxlReadDataPacket(id, address, length)
sp = self._send_packet(rp, error_handler=error_handler)
d = OrderedDict()
for c in controls:
v = dxl_decode_all(sp.parameters[c.address:c.address + c.nb_elem * c.length], c.nb_elem)
d[c.name] = c.dxl_to_si(v, model) if convert else v
res.append(d)
return tuple(res)
@classmethod
def _generate_accessors(cls, control):
cls.__controls.append(control)
if control.access in (_DxlAccess.readonly, _DxlAccess.readwrite):
def my_getter(self, ids, **kwargs):
return self._get_control_value(control, ids, **kwargs)
func_name = control.getter_name if control.getter_name else 'get_{}'.format(control.name.replace(' ', '_'))
func_name = '_{}'.format(func_name) if hasattr(cls, func_name) else func_name
my_getter.__doc__ = 'Gets {} from the specified motors.'.format(control.name)
my_getter.__name__ = func_name
setattr(cls, func_name, my_getter)
if control.access in (_DxlAccess.writeonly, _DxlAccess.readwrite):
def my_setter(self, value_for_id, **kwargs):
self._set_control_value(control, value_for_id, **kwargs)
func_name = control.setter_name if control.setter_name else 'set_{}'.format(control.name.replace(' ', '_'))
func_name = '_{}'.format(func_name) if hasattr(cls, func_name) else func_name
my_setter.__doc__ = 'Sets {} to the specified motors.'.format(control.name)
my_setter.__name__ = func_name
setattr(cls, func_name, my_setter)
def _get_control_value(self, control, ids, **kwargs):
if not ids:
return ()
error_handler = kwargs['error_handler'] if ('error_handler' in kwargs) else self._error_handler
convert = kwargs['convert'] if ('convert' in kwargs) else self._convert
if self._sync_read and len(ids) > 1:
rp = DxlSyncReadPacket(ids, control.address, control.length * control.nb_elem)
sp = self._send_packet(rp, error_handler=error_handler)
if not sp:
return ()
values = sp.parameters
else:
values = []
for motor_id in ids:
rp = DxlReadDataPacket(motor_id, control.address, control.length * control.nb_elem)
sp = self._send_packet(rp, error_handler=error_handler)
if not sp:
return ()
values.extend(sp.parameters)
values = list(itertools.izip(*([iter(values)] * control.length * control.nb_elem)))
values = [dxl_decode_all(value, control.nb_elem) for value in values]
if not values:
return ()
# when using SYNC_READ instead of getting a timeout
# a non existing motor will "return" the maximum value
if self._sync_read:
max_val = 2 ** (8 * control.length) - 1
if max_val in (itertools.chain(*values) if control.nb_elem > 1 else values):
lost_ids = []
for i, v in enumerate(itertools.chain(*values) if control.nb_elem > 1 else values):
if v == max_val:
lost_ids.append(ids[i // control.nb_elem])
e = DxlTimeoutError(self, rp, list(set(lost_ids)))
if self._error_handler:
self._error_handler.handle_timeout(e)
return ()
else:
raise e
if convert:
models = self.get_model(ids)
if not models:
return ()
values = [control.dxl_to_si(v, m) for v, m in zip(values, models)]
return tuple(values)
def _set_control_value(self, control, value_for_id, **kwargs):
if not value_for_id:
return
convert = kwargs['convert'] if ('convert' in kwargs) else self._convert
if convert:
models = self.get_model(value_for_id.keys())
if not models:
return
value_for_id = dict(zip(value_for_id.keys(),
map(control.si_to_dxl, value_for_id.values(), models)))
data = []
for motor_id, value in value_for_id.iteritems():
data.extend(itertools.chain((motor_id, ),
dxl_code_all(value, control.length, control.nb_elem)))
wp = DxlSyncWritePacket(control.address, control.length * control.nb_elem, data)
self._send_packet(wp, wait_for_status_packet=False)
# MARK: - Send/Receive packet
def __real_send(self, instruction_packet, wait_for_status_packet, _force_lock):
if self.closed:
raise DxlError('try to send a packet on a closed serial communication')
logger.debug('Sending %s', instruction_packet,
extra={'port': self.port,
'baudrate': self.baudrate,
'timeout': self.timeout})
with self.__force_lock(_force_lock) or self._serial_lock:
self.flush(_force_lock=True)
data = instruction_packet.to_string()
nbytes = self._serial.write(data)
if len(data) != nbytes:
raise DxlCommunicationError(self,
'instruction packet not entirely sent',
instruction_packet)
if not wait_for_status_packet:
return
data = self._serial.read(DxlPacketHeader.length)
if not data:
raise DxlTimeoutError(self, instruction_packet, instruction_packet.id)
try:
header = DxlPacketHeader.from_string(data)
data += self._serial.read(header.packet_length)
status_packet = DxlStatusPacket.from_string(data)
except ValueError:
msg = 'could not parse received data {}'.format(bytearray(data))
raise DxlCommunicationError(self, msg, instruction_packet)
logger.debug('Receiving %s', status_packet,
extra={'port': self.port,
'baudrate': self.baudrate,
'timeout': self.timeout})
return status_packet
def _send_packet(self,
instruction_packet, wait_for_status_packet=True,
error_handler=None,
_force_lock=False):
if not error_handler:
return self.__real_send(instruction_packet, wait_for_status_packet, _force_lock)
try:
sp = self.__real_send(instruction_packet, wait_for_status_packet, _force_lock)
if sp and sp.error:
errors = decode_error(sp.error)
for e in errors:
handler_name = 'handle_{}'.format(e.lower().replace(' ', '_'))
f = operator.methodcaller(handler_name, instruction_packet)
f(error_handler)
return sp
except DxlTimeoutError as e:
error_handler.handle_timeout(e)
except DxlCommunicationError as e:
error_handler.handle_communication_error(e)
# MARK: - Dxl Errors
[docs]class DxlError(Exception):
""" Base class for all errors encountered using :class:`~pypot.dynamixel.io.DxlIO`. """
pass
[docs]class DxlCommunicationError(DxlError):
""" Base error for communication error encountered when using :class:`~pypot.dynamixel.io.DxlIO`. """
def __init__(self, dxl_io, message, instruction_packet):
self.dxl_io = dxl_io
self.message = message
self.instruction_packet = instruction_packet
def __str__(self):
return '{self.message} after sending {self.instruction_packet}'.format(self=self)
[docs]class DxlTimeoutError(DxlCommunicationError):
""" Timeout error encountered when using :class:`~pypot.dynamixel.io.DxlIO`. """
def __init__(self, dxl_io, instruction_packet, ids):
DxlCommunicationError.__init__(self, dxl_io, 'timeout occured', instruction_packet)
self.ids = ids
def __str__(self):
return 'motors {} did not respond after sending {}'.format(self.ids, self.instruction_packet)
# MARK: - Generate the accessors
def _add_control(name,
address, length=2, nb_elem=1,
access=_DxlAccess.readwrite,
models=set(dynamixelModels.values()),
dxl_to_si=lambda val, model: val,
si_to_dxl=lambda val, model: val,
getter_name=None,
setter_name=None):
control = _DxlControl(name,
address, length, nb_elem,
access,
models,
dxl_to_si, si_to_dxl,
getter_name, setter_name)
DxlIO._generate_accessors(control)
_add_control('model',
address=0x00,
access=_DxlAccess.readonly,
dxl_to_si=dxl_to_model)
_add_control('firmware',
address=0x02, length=1,
access=_DxlAccess.readonly)
_add_control('id',
address=0x03, length=1,
access=_DxlAccess.writeonly,
setter_name='change_id')
_add_control('baudrate',
address=0x04, length=1,
access=_DxlAccess.writeonly,
setter_name='change_baudrate',
si_to_dxl=baudrate_to_dxl)
_add_control('return delay time',
address=0x05, length=1,
dxl_to_si=dxl_to_rdt,
si_to_dxl=rdt_to_dxl)
_add_control('angle limit',
address=0x06, nb_elem=2,
dxl_to_si=lambda value, model: (dxl_to_degree(value[0], model),
dxl_to_degree(value[1], model)),
si_to_dxl=lambda value, model: (degree_to_dxl(value[0], model),
degree_to_dxl(value[1], model)))
_add_control('drive mode',
address=0x0A, length=1,
access=_DxlAccess.readwrite,
models=('MX-106', ),
dxl_to_si=dxl_to_drive_mode,
si_to_dxl=drive_mode_to_dxl)
_add_control('highest temperature limit',
address=0x0B, length=1,
dxl_to_si=dxl_to_temperature,
si_to_dxl=temperature_to_dxl)
_add_control('voltage limit',
address=0x0C, length=1, nb_elem=2,
dxl_to_si=lambda value, model: (dxl_to_voltage(value[0], model),
dxl_to_voltage(value[1], model)),
si_to_dxl=lambda value, model: (voltage_to_dxl(value[0], model),
voltage_to_dxl(value[1], model)))
_add_control('max torque',
address=0x0E,
dxl_to_si=dxl_to_torque,
si_to_dxl=torque_to_dxl)
_add_control('status return level',
address=0x10, length=1,
dxl_to_si=dxl_to_status,
si_to_dxl=status_to_dxl)
_add_control('alarm LED',
address=0x11, length=1,
dxl_to_si=dxl_to_alarm,
si_to_dxl=alarm_to_dxl)
_add_control('alarm shutdown',
address=0x12, length=1,
dxl_to_si=dxl_to_alarm,
si_to_dxl=alarm_to_dxl)
_add_control('torque_enable',
address=0x18, length=1,
dxl_to_si=dxl_to_bool,
si_to_dxl=bool_to_dxl,
getter_name='is_torque_enabled',
setter_name='_set_torque_enable')
_add_control('LED',
address=0x19, length=1,
dxl_to_si=dxl_to_bool,
si_to_dxl=bool_to_dxl,
setter_name='_set_LED',
getter_name='is_led_on')
_add_control('pid gain',
address=0x1A, length=1, nb_elem=3,
models=('MX-12', 'MX-28', 'MX-64', 'MX-106'),
dxl_to_si=dxl_to_pid,
si_to_dxl=pid_to_dxl)
_add_control('compliance margin',
address=0x1A, length=1, nb_elem=2,
models=('AX-12', 'AX-18', 'RX-28', 'RX-64'))
_add_control('compliance slope',
address=0x1C, length=1, nb_elem=2,
models=('AX-12', 'AX-18', 'RX-28', 'RX-64'))
_add_control('goal position',
address=0x1E,
dxl_to_si=dxl_to_degree,
si_to_dxl=degree_to_dxl)
_add_control('moving speed',
address=0x20,
dxl_to_si=dxl_to_speed,
si_to_dxl=speed_to_dxl)
_add_control('torque limit',
address=0x22,
dxl_to_si=dxl_to_torque,
si_to_dxl=torque_to_dxl)
_add_control('goal position speed load',
address=0x1E, nb_elem=3,
dxl_to_si=lambda value, model: (dxl_to_degree(value[0], model),
dxl_to_speed(value[1], model),
dxl_to_load(value[2], model)),
si_to_dxl=lambda value, model: (degree_to_dxl(value[0], model),
speed_to_dxl(value[1], model),
torque_to_dxl(value[2], model)))
_add_control('present position',
address=0x24,
access=_DxlAccess.readonly,
dxl_to_si=dxl_to_degree)
_add_control('present speed',
address=0x26,
access=_DxlAccess.readonly,
dxl_to_si=dxl_to_speed)
_add_control('present load',
address=0x28,
access=_DxlAccess.readonly,
dxl_to_si=dxl_to_load)
_add_control('present position speed load',
address=0x24, nb_elem=3,
access=_DxlAccess.readonly,
dxl_to_si=lambda value, model: (dxl_to_degree(value[0], model),
dxl_to_speed(value[1], model),
dxl_to_load(value[2], model)))
_add_control('present voltage',
address=0x2A, length=1,
access=_DxlAccess.readonly,
dxl_to_si=dxl_to_voltage)
_add_control('present temperature',
address=0x2B, length=1,
access=_DxlAccess.readonly,
dxl_to_si=dxl_to_temperature)
_add_control('moving',
address=0x2E, length=1,
access=_DxlAccess.readonly,
dxl_to_si=dxl_to_bool,
getter_name='is_moving')