Source code for pypot.utils
from collections import namedtuple
from operator import attrgetter
from threading import Event
from .stoppablethread import StoppableThread, StoppableLoopThread
from .pypot_time import time
from .flushed_print import flushed_print
Point2D = namedtuple('Point2D', ('x', 'y'))
Point3D = namedtuple('Point3D', ('x', 'y', 'z'))
Point = Point3D
Vector3D = namedtuple('Vector3D', ('x', 'y', 'z'))
Vector = Vector3D
Quaternion = namedtuple('Quaternion', ('x', 'y', 'z', 'w'))
[docs]def attrsetter(item):
def resolve_attr(obj, attr):
if not attr:
return obj
for name in attr.split('.'):
obj = getattr(obj, name)
return obj
def g(obj, value):
var_path, _, var_name = item.rpartition('.')
setattr(resolve_attr(obj, var_path), var_name, value)
return g
[docs]class SyncEvent(object):
def __init__(self, period=.1):
self._event = Event()
self._needed = False
self._last_sync = 0.
self.period = period
[docs] def request(self):
self._needed = True
self._event.wait()
self._event.clear()
[docs] def done(self):
self._event.set()
self._last_sync = time()
self._needed = False
@property
def is_recent(self):
return (time() - self._last_sync) < self.period
@property
def needed(self):
return self._needed and not self.is_recent