# Source code for pypot.vrep.io

import os
import time
import ctypes

from threading import Lock

from .remoteApiBindings import vrep as remote_api
from ..robot.io import AbstractIO


# Short human-readable label for every return code / error flag of the
# V-REP remote API, keyed by the matching ``simx_return_*`` constant.
vrep_error = dict((
    (remote_api.simx_return_ok, 'Ok'),
    (remote_api.simx_return_novalue_flag, 'No value'),
    (remote_api.simx_return_timeout_flag, 'Timeout'),
    (remote_api.simx_return_illegal_opmode_flag, 'Opmode error'),
    (remote_api.simx_return_remote_error_flag, 'Remote error'),
    (remote_api.simx_return_split_progress_flag, 'Progress error'),
    (remote_api.simx_return_local_error_flag, 'Local error'),
    (remote_api.simx_return_initialize_error_flag, 'Init error'),
))

# Remote API operation mode associated with each mode name
# ('normal', 'streaming' or 'sending').
vrep_mode = dict(
    normal=remote_api.simx_opmode_oneshot_wait,
    streaming=remote_api.simx_opmode_streaming,
    sending=remote_api.simx_opmode_oneshot,
)


class VrepIO(AbstractIO):
    """ This class is used to get/set values from/to a V-REP scene.

    It is based on V-REP remote API
    (http://www.coppeliarobotics.com/helpFiles/en/remoteApiOverview.htm).

    Every remote call goes through :meth:`call_remote_api`, which serializes
    the calls with a lock, retries failing calls and turns the returned error
    flags into a :class:`VrepIOErrors` exception.

    """
    # Maximum number of attempts made for a single remote API call.
    MAX_ITER = 5
    # Time slept (in seconds) after a failed attempt.
    TIMEOUT = 0.4

    def __init__(self, vrep_host='127.0.0.1', vrep_port=19997, scene=None, start=False):
        """ Starts the connection with the V-REP remote API server.

        :param str vrep_host: V-REP remote API server host
        :param int vrep_port: V-REP remote API server port
        :param str scene: path to a V-REP scene file
        :param bool start: whether to start the scene after loading it

        :raises VrepConnectionError: if the connection cannot be established

        .. warning:: Only one connection can be established with the V-REP remote server API. So before trying to connect make sure that all previously started connections have been closed (see :func:`~pypot.vrep.io.close_all_connections`)

        """
        # Cache of the handles already retrieved, indexed by name.
        self._object_handles = {}
        # Serializes every access to the remote API.
        self._lock = Lock()

        # The connection settings are kept so that open_io can be called
        # again later with the same parameters.
        self.vrep_host = vrep_host
        self.vrep_port = vrep_port
        self.scene = scene
        self.start = start

        self.open_io()

    def open_io(self):
        """ Opens the connection using the settings given to the constructor.

        Stores the resulting client id in ``self.client_id`` and, when a
        scene was specified, loads it (and starts it if ``self.start``).

        :raises VrepConnectionError: if ``simxStart`` returns -1

        """
        self.client_id = remote_api.simxStart(
            self.vrep_host, self.vrep_port, True, True, 5000, 5)

        if self.client_id == -1:
            msg = ('Could not connect to V-REP server on {}:{}. '
                   'This could also means that you still have '
                   'a previously opened connection running! '
                   '(try pypot.vrep.close_all_connections())')
            raise VrepConnectionError(
                msg.format(self.vrep_host, self.vrep_port))

        if self.scene is not None:
            self.load_scene(self.scene, self.start)

    def close(self):
        """ Closes the current connection. """
        with self._lock:
            remote_api.simxFinish(self.client_id)

    def load_scene(self, scene_path, start=False):
        """ Loads a scene on the V-REP server.

        :param str scene_path: path to a V-REP scene file
        :param bool start: whether to directly start the simulation after loading the scene

        :raises IOError: if ``scene_path`` does not exist locally

        .. note:: The scene file must exist on the client side: its path is checked locally and it is sent to ``simxLoadScene`` with the options argument set.

        """
        # Fail fast: do not stop a running simulation for a scene that
        # cannot be loaded anyway.
        if not os.path.exists(scene_path):
            raise IOError("No such file or directory: '{}'".format(scene_path))

        self.stop_simulation()

        self.call_remote_api('simxLoadScene', scene_path, True)

        if start:
            self.start_simulation()

    def start_simulation(self):
        """ Starts the simulation.

        .. note:: Do nothing if the simulation is already started.

        .. warning:: if you start the simulation just after stopping it, the simulation will likely not be started. Use :meth:`~pypot.vrep.io.VrepIO.restart_simulation` instead.

        """
        self.call_remote_api('simxStartSimulation')

        # Forced pause after the start request: per the original author's
        # note, skipping it "may cause troubles".
        time.sleep(0.5)

    def restart_simulation(self):
        """ Re-starts the simulation. """
        self.stop_simulation()
        # Forced pause: otherwise the simulation is not restarted.
        time.sleep(0.5)
        self.start_simulation()

    def stop_simulation(self):
        """ Stops the simulation. """
        self.call_remote_api('simxStopSimulation')

    def pause_simulation(self):
        """ Pauses the simulation. """
        self.call_remote_api('simxPauseSimulation')

    def resume_simulation(self):
        """ Resumes the simulation (same as :meth:`start_simulation`). """
        self.start_simulation()

    def get_motor_position(self, motor_name):
        """ Gets the motor current position.

        :param str motor_name: name of the joint in the scene
        :return: the value returned by ``simxGetJointPosition``

        """
        return self.call_remote_api('simxGetJointPosition',
                                    self.get_object_handle(motor_name),
                                    streaming=True)

    def set_motor_position(self, motor_name, position):
        """ Sets the motor target position.

        :param str motor_name: name of the joint in the scene
        :param position: target position passed to ``simxSetJointTargetPosition``

        """
        self.call_remote_api('simxSetJointTargetPosition',
                             self.get_object_handle(motor_name),
                             position,
                             sending=True)

    def get_motor_force(self, motor_name):
        """ Retrieves the force or torque applied to a joint along/about its active axis.

        :param str motor_name: name of the joint in the scene
        :return: the value returned by ``simxGetJointForce``

        """
        return self.call_remote_api('simxGetJointForce',
                                    self.get_object_handle(motor_name),
                                    streaming=True)

    def set_motor_force(self, motor_name, force):
        """ Sets the maximum force or torque that a joint can exert.

        :param str motor_name: name of the joint in the scene
        :param force: value passed to ``simxSetJointForce``

        """
        self.call_remote_api('simxSetJointForce',
                             self.get_object_handle(motor_name),
                             force,
                             sending=True)

    def get_object_position(self, object_name, relative_to_object=None):
        """ Gets the object position.

        :param str object_name: name of the object in the scene
        :param str relative_to_object: name of the reference object; when None the handle -1 is used instead
        :return: the value returned by ``simxGetObjectPosition``

        """
        h = self.get_object_handle(object_name)
        relative_handle = (-1 if relative_to_object is None
                           else self.get_object_handle(relative_to_object))

        return self.call_remote_api('simxGetObjectPosition',
                                    h, relative_handle,
                                    streaming=True)

    def set_object_position(self, object_name, position=[0, 0, 0]):
        """ Sets the object position.

        :param str object_name: name of the object in the scene
        :param position: position passed to ``simxSetObjectPosition`` (with -1 as reference handle); the default list is never modified here

        """
        h = self.get_object_handle(object_name)

        return self.call_remote_api('simxSetObjectPosition',
                                    h, -1, position,
                                    sending=True)

    def get_object_orientation(self, object_name, relative_to_object=None):
        """ Gets the object orientation.

        :param str object_name: name of the object in the scene
        :param str relative_to_object: name of the reference object; when None the handle -1 is used instead
        :return: the value returned by ``simxGetObjectOrientation``

        """
        h = self.get_object_handle(object_name)
        relative_handle = (-1 if relative_to_object is None
                           else self.get_object_handle(relative_to_object))

        return self.call_remote_api('simxGetObjectOrientation',
                                    h, relative_handle,
                                    streaming=True)

    def _get_object_handle(self, obj):
        """ Asks the server for the handle of `obj`, bypassing the cache. """
        return self.call_remote_api('simxGetObjectHandle', obj)

    def get_object_handle(self, obj):
        """ Gets the vrep object handle.

        The handle is requested from the server on first use and then
        served from the local cache.

        """
        if obj not in self._object_handles:
            self._object_handles[obj] = self._get_object_handle(obj=obj)

        return self._object_handles[obj]

    def get_collision_state(self, collision_name):
        """ Gets the collision state.

        :param str collision_name: name of the collision object in the scene
        :return: the value returned by ``simxReadCollision``

        """
        return self.call_remote_api('simxReadCollision',
                                    self.get_collision_handle(collision_name),
                                    streaming=True)

    def _get_collision_handle(self, collision):
        """ Asks the server for the handle of `collision`, bypassing the cache. """
        return self.call_remote_api('simxGetCollisionHandle', collision)

    def get_collision_handle(self, collision):
        """ Gets a vrep collisions handle.

        .. note:: Collision handles are stored in the same cache as the object handles, indexed by name.

        """
        if collision not in self._object_handles:
            h = self._get_collision_handle(collision)
            self._object_handles[collision] = h

        return self._object_handles[collision]

    def get_simulation_current_time(self, timer='CurrentTime'):
        """ Gets the simulation current time.

        :param str timer: name of the float signal to read
        :return: the signal value, or 0.0 if reading it raised :class:`VrepIOErrors`

        """
        try:
            return self.call_remote_api('simxGetFloatSignal', timer,
                                        streaming=True)
        except VrepIOErrors:
            return 0.0

    def add_cube(self, name, position, sizes, mass):
        """ Adds a cube to the scene.

        :param str name: name given to the new object
        :param position: position of the new object
        :param sizes: the three sizes of the shape (indexed 0 to 2)
        :param mass: mass of the shape

        """
        # The shape is created under the name "Cuboid", moved, then renamed.
        self._create_pure_shape(0, 239, sizes, mass, [0, 0])
        self.set_object_position("Cuboid", position)
        self.change_object_name("Cuboid", name)

    def add_sphere(self, name, position, sizes, mass, precision=[10, 10]):
        """ Adds a sphere to the scene.

        :param str name: name given to the new object
        :param position: position of the new object
        :param sizes: the three sizes of the shape (indexed 0 to 2)
        :param mass: mass of the shape
        :param precision: two values forwarded to ``simCreatePureShape``

        """
        # The shape is created under the name "Sphere", moved, then renamed.
        self._create_pure_shape(1, 239, sizes, mass, precision)
        self.set_object_position("Sphere", position)
        self.change_object_name("Sphere", name)

    def add_cylinder(self, name, position, sizes, mass, precision=[10, 10]):
        """ Adds a cylinder to the scene.

        :param str name: name given to the new object
        :param position: position of the new object
        :param sizes: the three sizes of the shape (indexed 0 to 2)
        :param mass: mass of the shape
        :param precision: two values forwarded to ``simCreatePureShape``

        """
        # The shape is created under the name "Cylinder", moved, then renamed.
        self._create_pure_shape(2, 239, sizes, mass, precision)
        self.set_object_position("Cylinder", position)
        self.change_object_name("Cylinder", name)

    def add_cone(self, name, position, sizes, mass, precision=[10, 10]):
        """ Adds a cone to the scene.

        :param str name: name given to the new object
        :param position: position of the new object
        :param sizes: the three sizes of the shape (indexed 0 to 2)
        :param mass: mass of the shape
        :param precision: two values forwarded to ``simCreatePureShape``

        """
        self._create_pure_shape(3, 239, sizes, mass, precision)
        # NOTE(review): the new cone is looked up as "Cylinder" -- presumably
        # the name V-REP gives to a freshly created cone; confirm.
        self.set_object_position("Cylinder", position)
        self.change_object_name("Cylinder", name)

    def change_object_name(self, old_name, new_name):
        """ Changes the name of an object of the scene.

        :param str old_name: current name of the object
        :param str new_name: new name of the object

        .. warning:: `new_name` is inserted unescaped between single quotes in the Lua code sent to the server.

        """
        # The handle is always fetched from the server (not from the cache).
        h = self._get_object_handle(old_name)
        # The cached entry is indexed by the old name: drop it.
        self._object_handles.pop(old_name, None)

        lua_code = "simSetObjectName({}, '{}')".format(h, new_name)
        self._inject_lua_code(lua_code)

    def _create_pure_shape(self, primitive_type, options, sizes, mass, precision):
        """ Creates a pure shape by sending a ``simCreatePureShape`` Lua call.

        :param primitive_type: first argument of ``simCreatePureShape``
        :param options: second argument of ``simCreatePureShape``
        :param sizes: sequence whose items 0 to 2 are sent as the sizes table
        :param mass: mass argument of ``simCreatePureShape``
        :param precision: sequence whose items 0 and 1 are sent as the precision table

        """
        lua_code = "simCreatePureShape({}, {}, {{{}, {}, {}}}, {}, {{{}, {}}})".format(
            primitive_type, options, sizes[0], sizes[1], sizes[2], mass, precision[0], precision[1])
        self._inject_lua_code(lua_code)

    def _inject_lua_code(self, lua_code):
        """ Sends raw lua code and evaluate it without any checking!

        The code is written to the 'my_lua_code' string stream.

        """
        # Size the buffer from the encoded bytes, not from the number of
        # characters: they differ as soon as the code is not pure ASCII and
        # the payload would otherwise be truncated.
        payload = lua_code.encode()
        msg = (ctypes.c_ubyte * len(payload)).from_buffer_copy(payload)
        self.call_remote_api('simxWriteStringStream', 'my_lua_code', msg)

    def call_remote_api(self, func_name, *args, **kwargs):
        """ Calls any remote API func in a thread_safe way.

        :param str func_name: name of the remote API func to call
        :param args: args to pass to the remote API call
        :param kwargs: args to pass to the remote API call

        :return: None when the call only returns an error code, the single returned value when there is exactly one, otherwise all the returned values
        :raises VrepIOErrors: if an error flag is still set after :attr:`MAX_ITER` attempts

        .. note:: You can add an extra keyword to specify if you want to use the streaming or sending mode. The oneshot_wait mode is used by default (see `here <http://www.coppeliarobotics.com/helpFiles/en/remoteApiConstants.htm#operationModes>`_ for details about possible modes).

        .. note:: With the extra keyword ``_force=True`` the call is made exactly once and its result (errors included) is discarded: None is returned.

        .. warning:: You should not pass the clientId and the operationMode as arguments. They will be automatically added.

        As an example you can retrieve all joints name using the following call::

            vrep_io.call_remote_api('simxGetObjectGroupData',
                                    vrep_io.remote_api.sim_object_joint_type,
                                    0,
                                    streaming=True)

        """
        f = getattr(remote_api, func_name)

        mode = self._extract_mode(kwargs)
        kwargs['operationMode'] = vrep_mode[mode]
        # Honour the value of the keyword, not its mere presence.
        _force = kwargs.pop('_force', False)

        # Defined up-front so that they exist even if no attempt is made.
        err, res = [], None

        for _ in range(VrepIO.MAX_ITER):
            with self._lock:
                ret = f(self.client_id, *args, **kwargs)

            if _force:
                return

            # A bare int is an error code alone; otherwise the error code
            # comes first, followed by the returned value(s).
            if mode == 'sending' or isinstance(ret, int):
                err, res = ret, None
            else:
                err, res = ret[0], ret[1:]
                res = res[0] if len(res) == 1 else res

            # Decode the bit-coded error: one boolean per bit.
            err = [bool((err >> i) & 1) for i in range(len(vrep_error))]

            # Try again as long as any error flag is raised.
            if not any(err):
                break

            time.sleep(VrepIO.TIMEOUT)

        if any(err):
            # Bits without a known label must not hide the real error
            # behind a KeyError.
            msg = ' '.join(vrep_error.get(2 ** i, 'Unknown error')
                           for i, e in enumerate(err) if e)
            raise VrepIOErrors(msg)

        return res

    def _extract_mode(self, kwargs):
        """ Removes the mode keywords from `kwargs` and returns the mode name.

        :return: 'streaming' or 'sending' when the matching keyword is set to a true value (streaming has priority), 'normal' otherwise

        """
        # Both keywords are always removed so that neither of them is
        # forwarded to the remote API function.
        streaming = kwargs.pop('streaming', False)
        sending = kwargs.pop('sending', False)

        if streaming:
            return 'streaming'
        if sending:
            return 'sending'
        return 'normal'


def close_all_connections():
    """ Closes all opened connection to V-REP remote API server. """
    # simxFinish is called with -1 instead of a specific client id.
    every_client = -1
    remote_api.simxFinish(every_client)


# V-REP Errors
class VrepIOError(Exception):
    """ Base class for V-REP IO Errors.

    The exception message combines the error code, its label taken from
    the ``vrep_error`` table and the given message.

    """
    def __init__(self, error_code, message):
        label = vrep_error[error_code]
        full_message = 'V-REP error code {} ({}): "{}"'.format(
            error_code, label, message)
        super(VrepIOError, self).__init__(full_message)


class VrepIOErrors(Exception):
    """ Error raised with the labels of the error flags of a remote API call. """


class VrepConnectionError(Exception):
    """ Base class for V-REP connection Errors. """