Source code for pypot.vrep
import json
import logging
from functools import partial
from collections import OrderedDict
from .io import (VrepIO, close_all_connections,
VrepIOError, VrepConnectionError)
from .controller import VrepController, VrepObjectTracker
from .controller import VrepCollisionTracker, VrepCollisionDetector
from ..robot import Robot
from ..robot.sensor import ObjectTracker
from ..robot.config import motor_from_confignode, make_alias
import pypot.utils.pypot_time as pypot_time
import time as sys_time
logger = logging.getLogger(__name__)
[docs]class vrep_time():
def __init__(self, vrep_io):
self.io = vrep_io
[docs] def get_time(self, trial=0):
t = self.io.get_simulation_current_time()
if t == 0:
sys_time.sleep(.5)
return self.get_time(trial + 1)
if trial > 10:
raise EnvironmentError('Could not get current simulation time. Make sure the V-REP simulation is running. And that you have added the "time" child script to your scene.')
return t
[docs] def sleep(self, t):
if t > 1000: # That's probably due to an error in get_time
logger.warning('Big vrep sleep: {}'.format(t))
t = 1
t0 = self.get_time()
while (self.get_time() - t0) < t:
if self.get_time() < t0:
break
sys_time.sleep(0.01)
[docs]def from_vrep(config, vrep_host='127.0.0.1', vrep_port=19997, scene=None,
tracked_objects=[], tracked_collisions=[],
id=None, shared_vrep_io=None):
""" Create a robot from a V-REP instance.
:param config: robot configuration (either the path to the json or directly the dictionary)
:type config: str or dict
:param str vrep_host: host of the V-REP server
:param int vrep_port: port of the V-REP server
:param str scene: path to the V-REP scene to load and start
:param list tracked_objects: list of V-REP dummy object to track
:param list tracked_collisions: list of V-REP collision to track
:param int id: robot id in simulator (useful when using a scene with multiple robots)
:param vrep_io: use an already connected VrepIO (useful when using a scene with multiple robots)
:type vrep_io: :class:`~pypot.vrep.io.VrepIO`
This function tries to connect to a V-REP instance and expects to find motors with names corresponding as the ones found in the config.
.. note:: The :class:`~pypot.robot.robot.Robot` returned will also provide a convenience reset_simulation method which resets the simulation and the robot position to its intial stance.
.. note:: Using the same configuration, you should be able to switch from a real to a simulated robot just by switching from :func:`~pypot.robot.config.from_config` to :func:`~pypot.vrep.from_vrep`.
For instance::
import json
with open('my_config.json') as f:
config = json.load(f)
from pypot.robot import from_config
from pypot.vrep import from_vrep
real_robot = from_config(config)
simulated_robot = from_vrep(config, '127.0.0.1', 19997, 'poppy.ttt')
"""
if shared_vrep_io is None:
vrep_io = VrepIO(vrep_host, vrep_port)
else:
vrep_io = shared_vrep_io
vreptime = vrep_time(vrep_io)
pypot_time.time = vreptime.get_time
pypot_time.sleep = vreptime.sleep
if isinstance(config, basestring):
with open(config) as f:
config = json.load(f, object_pairs_hook=OrderedDict)
motors = [motor_from_confignode(config, name)
for name in config['motors'].keys()]
vc = VrepController(vrep_io, scene, motors, id=id)
vc._init_vrep_streaming()
sensor_controllers = []
if tracked_objects:
sensors = [ObjectTracker(name) for name in tracked_objects]
vot = VrepObjectTracker(vrep_io, sensors)
sensor_controllers.append(vot)
if tracked_collisions:
sensors = [VrepCollisionDetector(name) for name in tracked_collisions]
vct = VrepCollisionTracker(vrep_io, sensors)
sensor_controllers.append(vct)
robot = Robot(motor_controllers=[vc],
sensor_controllers=sensor_controllers)
for m in robot.motors:
m.goto_behavior = 'minjerk'
init_pos = {m: m.goal_position for m in robot.motors}
make_alias(config, robot)
def start_simu():
vrep_io.start_simulation()
for m, p in init_pos.iteritems():
m.goal_position = p
vc.start()
if tracked_objects:
vot.start()
if tracked_collisions:
vct.start()
while vrep_io.get_simulation_current_time() < 1.:
sys_time.sleep(0.1)
def stop_simu():
if tracked_objects:
vot.stop()
if tracked_collisions:
vct.stop()
vc.stop()
vrep_io.stop_simulation()
def reset_simu():
stop_simu()
sys_time.sleep(0.5)
start_simu()
robot.start_simulation = start_simu
robot.stop_simulation = stop_simu
robot.reset_simulation = reset_simu
def current_simulation_time(robot):
return robot._controllers[0].io.get_simulation_current_time()
Robot.current_simulation_time = property(lambda robot: current_simulation_time(robot))
def get_object_position(robot, object, relative_to_object=None):
return vrep_io.get_object_position(object, relative_to_object)
Robot.get_object_position = partial(get_object_position, robot)
def get_object_orientation(robot, object, relative_to_object=None):
return vrep_io.get_object_orientation(object, relative_to_object)
Robot.get_object_orientation = partial(get_object_orientation, robot)
return robot