# -*- coding: utf-8 -*-
import numpy
import itertools
from collections import namedtuple
# Label for this module's protocol flavour.
# NOTE(review): presumably "Dynamixel protocol version 1" - confirm against callers.
name = 'v1'
# Id written into packets that do not target a single motor in this module
# (reset, sync read and sync write all use it); 254 == 0xFE.
DxlBroadcast = 254
class DxlInstruction(object):
    """ Instruction codes written into the INSTRUCTION byte of a packet.

    The values are plain class attributes so they can be used directly as
    integers when a packet is serialised.
    """

    PING = 0x01
    READ_DATA = 0x02
    WRITE_DATA = 0x03
    RESET = 0x06
    SYNC_WRITE = 0x83
    SYNC_READ = 0x84
# MARK: - Packet Header
# MARK: - Instruction Packet
class DxlInstructionPacket(namedtuple('DxlInstructionPacket',
                                      ('id', 'instruction', 'parameters'))):
    """ This class is used to represent a dynamixel instruction packet.

    An instruction packet is constructed as follows:
    [0xFF, 0xFF, ID, LENGTH, INSTRUCTION, PARAM 1, PARAM 2, ..., PARAM N, CHECKSUM]

    (for more details see http://support.robotis.com/en/product/dxl_main.htm)

    Fields:
        id: value written into the ID byte.
        instruction: value written into the INSTRUCTION byte.
        parameters: iterable of integers written as PARAM 1..N.
    """

    def to_array(self):
        """ Serialise the packet into a ``bytearray``.

        The bytes are, in order: ``DxlPacketHeader.marker`` (defined outside
        this block), id, length, instruction, every parameter, checksum.
        """
        return bytearray(itertools.chain(DxlPacketHeader.marker,
                                         (self.id, self.length, self.instruction),
                                         self.parameters,
                                         (self.checksum, )))

    def to_string(self):
        """ Return the serialised packet (see :meth:`to_array`) as ``bytes``. """
        return bytes(self.to_array())

    @property
    def length(self):
        """ Value of the LENGTH byte: number of parameters plus 2.

        This matches the number of bytes that follow LENGTH in the layout
        above (INSTRUCTION, the N parameters and CHECKSUM).
        """
        return len(self.parameters) + 2

    @property
    def checksum(self):
        """ Value of the CHECKSUM byte.

        Computed as 255 minus the low byte of
        ``id + length + instruction + sum(parameters)``.
        """
        return int(255 - ((self.id + self.length + self.instruction +
                           sum(self.parameters)) % 256))
class DxlPingPacket(DxlInstructionPacket):
    """ This class is used to represent ping packet.

    It is an instruction packet carrying ``DxlInstruction.PING`` and no
    parameters.
    """

    def __new__(cls, id):
        """ Build a ping packet addressed to motor ``id``. """
        return DxlInstructionPacket.__new__(cls, id, DxlInstruction.PING, ())

    def __repr__(self):
        return 'DxlPingPacket(id={})'.format(self.id)
class DxlResetPacket(DxlInstructionPacket):
    """ This class is used to represent reset packet.

    It is an instruction packet carrying ``DxlInstruction.RESET`` and no
    parameters; its id is always ``DxlBroadcast``.
    """

    def __new__(cls):
        """ Build a reset packet (takes no argument). """
        return DxlInstructionPacket.__new__(cls, DxlBroadcast,
                                            DxlInstruction.RESET, ())
class DxlReadDataPacket(DxlInstructionPacket):
    """ This class is used to represent read data packet (to read value).

    The packet parameters are ``(address, length)``.
    """

    def __new__(cls, id, address, length):
        """ Build a read packet for motor ``id``.

        ``address`` and ``length`` are stored, in that order, as the two
        packet parameters.
        """
        return DxlInstructionPacket.__new__(cls, id,
                                            DxlInstruction.READ_DATA,
                                            (address, length))

    def __repr__(self):
        return ('DxlReadDataPacket(id={}, address={}'
                ', length={})'.format(self.id,
                                      self.parameters[0],
                                      self.parameters[1]))
class DxlSyncReadPacket(DxlInstructionPacket):
    """ This class is used to represent sync read packet (to synchronously read values).

    The packet id is ``DxlBroadcast`` and the parameters are
    ``(address, length, id 1, id 2, ..., id N)``.
    """

    def __new__(cls, ids, address, length):
        """ Build a sync read packet.

        ``ids`` is an iterable of motor ids; they are appended after
        ``address`` and ``length`` in the packet parameters.
        """
        return DxlInstructionPacket.__new__(cls, DxlBroadcast,
                                            DxlInstruction.SYNC_READ,
                                            tuple(itertools.chain((address, length),
                                                                  ids)))

    def __repr__(self):
        # The label reads "DxlSyncReadDataPacket" although the class is named
        # DxlSyncReadPacket; it is kept unchanged so existing output is stable.
        return ('DxlSyncReadDataPacket(ids={}, '
                'address={}, length={})'.format(self.parameters[2:],
                                                self.parameters[0],
                                                self.parameters[1]))
class DxlWriteDataPacket(DxlInstructionPacket):
    """ This class is used to represent write data packet (to write value).

    The packet parameters are ``(address, value 1, ..., value N)``.
    """

    def __new__(cls, id, address, coded_value):
        """ Build a write packet for motor ``id``.

        ``coded_value`` is an iterable of integers; its items are appended
        after ``address`` in the packet parameters.
        """
        return DxlInstructionPacket.__new__(cls, id,
                                            DxlInstruction.WRITE_DATA,
                                            tuple(itertools.chain((address,),
                                                                  coded_value)))

    def __repr__(self):
        return ('DxlWriteDataPacket(id={}, '
                'address={}, value={})'.format(self.id,
                                               self.parameters[0],
                                               self.parameters[1:]))
class DxlSyncWritePacket(DxlInstructionPacket):
    """ This class is used to represent sync write packet (to synchronously write values).

    The packet id is ``DxlBroadcast`` and the parameters are
    ``(address, length)`` followed by the items of ``id_value_couples``.
    """

    def __new__(cls, address, length, id_value_couples):
        """ Build a sync write packet.

        ``id_value_couples`` is an iterable whose items are appended as-is
        after ``address`` and ``length``. ``__repr__`` reads it back as
        consecutive groups of ``length + 1`` integers (an id followed by
        ``length`` values), so it is expected to be a flat sequence laid out
        that way.
        """
        return DxlInstructionPacket.__new__(cls, DxlBroadcast,
                                            DxlInstruction.SYNC_WRITE,
                                            tuple(itertools.chain((address, length),
                                                                  id_value_couples)))

    def __repr__(self):
        address = self.parameters[0]
        length = self.parameters[1]

        # One row per motor: column 0 is the id, the remaining `length`
        # columns are the values written to that motor.
        a = numpy.array(self.parameters[2:]).reshape((-1, length + 1))
        ids = a[:, 0]
        # tolist() converts numpy scalars to plain Python numbers; without it
        # the tuples render as "(np.int64(3), np.int64(4))" under NumPy >= 2
        # instead of "(3, 4)".
        values = [tuple(v) for v in a[:, 1:].tolist()]

        # The label reads "DxlSyncWriteDataPacket" although the class is named
        # DxlSyncWritePacket; it is kept unchanged so existing output is stable.
        return ('DxlSyncWriteDataPacket(ids={}, '
                'address={}, length={}, values={})'.format(ids,
                                                           address,
                                                           length,
                                                           values))
# MARK: - Status Packet
class DxlStatusPacket(namedtuple('DxlStatusPacket', ('id', 'error', 'parameters'))):
    """ This class is used to represent a dynamixel status packet.

    A status packet is constructed as follows:
    [0xFF, 0xFF, ID, LENGTH, ERROR, PARAM 1, PARAM 2, ..., PARAM N, CHECKSUM]

    (for more details see http://support.robotis.com/en/product/dxl_main.htm)
    """

    @classmethod
    def from_string(cls, data):
        """ Parse raw bytes into a status packet.

        The first four bytes are handed to ``DxlPacketHeader.from_string``
        (defined outside this block). Byte 4 becomes ``error`` and bytes 5 up
        to, but excluding, the last one become ``parameters``.

        Raises:
            ValueError: if the total size differs from
                ``DxlPacketHeader.length + header.packet_length`` or if the
                last byte does not match the computed checksum.
        """
        packet = bytearray(data)

        header = DxlPacketHeader.from_string(packet[:4])

        # Size is checked first, so the checksum is only computed on packets
        # of the announced length.
        if len(packet) != DxlPacketHeader.length + header.packet_length \
                or cls._checksum(packet) != packet[-1]:
            raise ValueError('try to parse corrupted data ({})'.format(packet))

        return cls(header.id, packet[4], tuple(packet[5:-1]))

    @classmethod
    def _checksum(cls, packet):
        """ Return the checksum expected for ``packet``.

        Computed as 255 minus the low byte of the sum of every byte except
        the first two and the last one.
        """
        return int(255 - (sum(packet[2:-1]) % 256))