Source code for pypot.vrep.controller

from numpy import rad2deg, deg2rad
from collections import deque

from ..robot.controller import MotorsController, SensorsController
from ..dynamixel.conversion import torque_max
from ..robot.sensor import Sensor
from .io import remote_api


[docs]class VrepController(MotorsController): """ V-REP motors controller. """ def __init__(self, vrep_io, scene, motors, sync_freq=50., id=None): """ :param vrep_io: vrep io instance :type vrep_io: :class:`~pypot.vrep.io.VrepIO` :param str scene: path to the V-REP scene file to start :param list motors: list of motors attached to the controller :param float sync_freq: synchronization frequency :param int id: robot id in simulator (useful when using a scene with multiple robots) """ MotorsController.__init__(self, vrep_io, motors, sync_freq) self.id = id if scene is not None: vrep_io.load_scene(scene, start=True)
[docs] def setup(self): """ Setups the controller by reading/setting position for all motors. """ self._init_vrep_streaming() # Init lifo for temperature spoofing for m in self.motors: m.__dict__['_load_fifo'] = deque(200 * [1], maxlen=200) self.update()
[docs] def update(self): """ Synchronization update loop. At each update all motor position are read from vrep and set to the motors. The motors target position are also send to v-rep. """ # Read all the angle limits h, _, l, _ = self.io.call_remote_api('simxGetObjectGroupData', remote_api.sim_object_joint_type, 16, streaming=True) limits4handle = {hh: (ll, lr) for hh, ll, lr in zip(h, l[::2], l[1::2])} for m in self.motors: tmax = torque_max[m.model] # Read values from V-REP and set them to the Motor p = round( rad2deg(self.io.get_motor_position(motor_name=self._motor_name(m))), 1) m.__dict__['present_position'] = p l = 100. * self.io.get_motor_force(motor_name=self._motor_name(m)) / tmax m.__dict__['present_load'] = l m.__dict__['_load_fifo'].append(abs(l)) m.__dict__['present_temperature'] = 25 + \ round(2.5 * sum(m.__dict__['_load_fifo']) / len(m.__dict__['_load_fifo']), 1) ll, lr = limits4handle[self.io._object_handles[self._motor_name(m)]] m.__dict__['lower_limit'] = rad2deg(ll) m.__dict__['upper_limit'] = rad2deg(ll) + rad2deg(lr) # Send new values from Motor to V-REP p = deg2rad(round(m.__dict__['goal_position'], 1)) self.io.set_motor_position(motor_name=self._motor_name(m), position=p) t = m.__dict__['torque_limit'] * tmax / 100. if m.__dict__['compliant']: t = 0. self.io.set_motor_force(motor_name=self._motor_name(m), force=t)
def _init_vrep_streaming(self): # While the code below may look redundant and that # it could be simplified. It is written as such to # speed-up the initialization of the streaming process. # Here, we initalized all streaming and then wait for # them to be ready at once. # Prepare streaming for getting position for each motor for m in self.motors: for vrep_call in ['simxGetJointPosition', 'simxGetJointForce']: self.io.call_remote_api(vrep_call, self.io.get_object_handle(self._motor_name(m)), streaming=True, _force=True) # Now actually retrieves all values pos = [self.io.get_motor_position(self._motor_name(m)) for m in self.motors] # Prepare streaming for setting position for each motor for m, p in zip(self.motors, pos): self.io.call_remote_api('simxSetJointTargetPosition', self.io.get_object_handle(self._motor_name(m)), p, sending=True, _force=True) for m in self.motors: self.io.call_remote_api('simxSetJointForce', self.io.get_object_handle(self._motor_name(m)), torque_max[m.model], sending=True, _force=True) # Prepare streaming for the angle limit self.io.call_remote_api('simxGetObjectGroupData', remote_api.sim_object_joint_type, 16, streaming=True, _force=True) # And actually affect them for m, p in zip(self.motors, pos): self.io.set_motor_position(self._motor_name(m), p) m.__dict__['goal_position'] = rad2deg(p) for m in self.motors: self.io.set_motor_force(self._motor_name(m), torque_max[m.model]) m.__dict__['torque_limit'] = 100. m.__dict__['compliant'] = False def _motor_name(self, m): if self.id is None: return m.name else: return '{}{}'.format(m.name, self.id)
[docs]class VrepObjectTracker(SensorsController): """ Tracks the 3D position and orientation of a V-REP object. """
[docs] def setup(self): """ Forces a first update to trigger V-REP streaming. """ self.update()
[docs] def update(self): """ Updates the position and orientation of the tracked objects. """ for s in self.sensors: s.position = self.io.get_object_position(object_name=s.name) s.orientation = self.io.get_object_orientation(object_name=s.name)
[docs]class VrepCollisionDetector(Sensor): def __init__(self, name): Sensor.__init__(self, name) self._colliding = False @property def colliding(self): return self._colliding @colliding.setter def colliding(self, new_state): self._colliding = new_state
[docs]class VrepCollisionTracker(SensorsController): """ Tracks collision state. """
[docs] def setup(self): """ Forces a first update to trigger V-REP streaming. """ self.update()
[docs] def update(self): """ Update the state of the collision detectors. """ for s in self.sensors: s.colliding = self.io.get_collision_state(collision_name=s.name)