Source code for pypot.utils.stoppablethread

import threading

from . import pypot_time as time


class StoppableThread(object):
    """ Stoppable version of python Thread.

    This class provides the following mechanism on top of "classical" python Thread:
        * you can stop the thread (if you defined your run method accordingly).
        * you can restart a thread (stop it and re-run it)
        * you can pause/resume a thread

    .. warning:: It is up to the subclass to correctly respond to the stop, pause/resume signals (see :meth:`~pypot.utils.stoppablethread.StoppableThread.run` for details).

    """
    def __init__(self, setup=None, target=None, teardown=None):
        """
        :param func setup: specific setup function to use (otherwise self.setup)
        :param func target: specific target function to use (otherwise self.run)
        :param func teardown: specific teardown function to use (otherwise self.teardown)

        """
        # _started: set once setup has completed (or once the thread crashed).
        # _running: set while the target function is executing.
        # _resume: cleared while a pause is requested.
        self._started = threading.Event()
        self._running = threading.Event()
        self._resume = threading.Event()

        self._setup = self.setup if setup is None else setup
        self._target = self.run if target is None else target
        self._teardown = self.teardown if teardown is None else teardown

        # Set to True by the worker when setup/target/teardown raised.
        self._crashed = False

    def start(self):
        """ Start the run method as a new thread.

        It will first stop the thread if it is already running.

        """
        if self.running:
            self.stop()

        # A crash recorded by a previous run must not be reported for this
        # new run, otherwise wait_to_start() would raise after a successful
        # restart.
        self._crashed = False

        self._thread = threading.Thread(target=self._wrapped_target)
        self._thread.daemon = True
        self._thread.start()

    def stop(self, wait=True):
        """ Stop the thread.

        More precisely, sends the stopping signal to the thread. It is then up to the run method to correctly responds.

        :param bool wait: if True, block until the thread has terminated (ignored when called from the thread itself)

        """
        if self.started:
            self._running.clear()
            # Wake the thread up if it is paused so it can see the stop signal.
            self._resume.set()

            # We cannot wait for ourself
            if wait and (threading.current_thread() is not self._thread):
                while self._thread.is_alive():
                    # Re-send the signals on each round in case the thread
                    # set them again in the meantime (e.g. still starting).
                    self._running.clear()
                    self._resume.set()
                    self._thread.join(timeout=1.0)

            self._started.clear()
            self._resume.clear()

    def join(self):
        """ Wait for the thread termination.

        :raises RuntimeError: if the thread has not been started yet

        """
        if not self.started:
            raise RuntimeError('cannot join thread before it is started')
        self._thread.join()

    @property
    def running(self):
        """ Whether the thread is running. """
        return self._running.is_set()

    @property
    def started(self):
        """ Whether the thread has been started. """
        return self._started.is_set()

    def wait_to_start(self, allow_failure=False):
        """ Wait for the thread to actually starts.

        :param bool allow_failure: if True, do not raise when the thread crashed
        :raises RuntimeError: if the thread crashed and allow_failure is False

        """
        self._started.wait()

        if self._crashed and not allow_failure:
            # Let the thread finish (and print its traceback) before raising.
            self._thread.join()
            raise RuntimeError('Setup failed, see {} Traceback '
                               'for details.'.format(self._thread.name))

    def should_stop(self):
        """ Signals if the thread should be stopped or not. """
        return not self.running

    def wait_to_stop(self):
        """ Wait for the thread to terminate. """
        if not self.started:
            self.wait_to_start()

        self.join()

    def setup(self):
        """ Setup method call just before the run. """
        pass

    def run(self):
        """ Run method of the thread.

        .. note:: In order to be stoppable (resp. pausable), this method has to check the running property - as often as possible to improve responsivness - and terminate when :meth:`~pypot.utils.stoppablethread.StoppableThread.should_stop` (resp. :meth:`~pypot.utils.stoppablethread.StoppableThread.should_pause`) becomes True.
            For instance::

                while not self.should_stop():
                    do_atom_work()
                    ...

        """
        pass

    def teardown(self):
        """ Teardown method call just after the run. """
        pass

    def _wrapped_target(self):
        """ Body of the worker thread: setup, then target, then teardown.

        If any of the three raises, teardown is not (re)run; the events are
        reset so that waiters are released, and the exception is re-raised.

        """
        try:
            self._setup()

            self._started.set()

            self._resume.set()
            self._running.set()
            self._target()
            self._running.clear()

            self._teardown()

        # In case something goes wrong within the thread
        # we try/catch the exceptions
        # clear all Condition locks (to avoid blocking main thread)
        # and re-raise exception for backtrace
        except Exception:
            self._crashed = True
            self._started.set()
            self._running.clear()
            self._resume.clear()
            raise

    def should_pause(self):
        """ Signals if the thread should be paused or not. """
        return self.paused

    @property
    def paused(self):
        """ Whether a pause is currently requested (resume event not set). """
        return not self._resume.is_set()

    def pause(self):
        """ Requests the thread to pause. """
        self._resume.clear()

    def resume(self):
        """ Requests the thread to resume. """
        self._resume.set()

    def wait_to_resume(self):
        """ Waits until the thread is resumed. """
        self._resume.wait()
def make_update_loop(thread, update_func):
    """ Makes a run loop which calls an update function at a predefined frequency.

    :param thread: object providing should_stop(), should_pause(), wait_to_resume() and a period attribute; if it has an _updated event, the event is cleared before and set after each update
    :param func update_func: function called without argument at each iteration

    """
    while not thread.should_stop():
        if thread.should_pause():
            thread.wait_to_resume()
            # The wait also returns when a stop is requested while paused:
            # re-evaluate the loop condition instead of running one more
            # update after the stop signal.
            continue

        start = time.time()
        if hasattr(thread, '_updated'):
            thread._updated.clear()
        update_func()
        if hasattr(thread, '_updated'):
            thread._updated.set()
        end = time.time()

        # Only sleep for what remains of the period after the update.
        dt = thread.period - (end - start)

        if dt > 0:
            time.sleep(dt)
class StoppableLoopThread(StoppableThread):
    """ Stoppable thread whose run method repeatedly calls an update function.

    .. note:: The requested frequency is only a target: it is approximately followed (depending for instance on CPU load) and can only be reached when one update takes less time than the loop period.

    """
    def __init__(self, frequency, update=None):
        """
        :param float frequency: frequency at which the update function is called (the loop period is 1.0 / frequency)
        :param func update: specific update function to use (otherwise self.update)

        """
        StoppableThread.__init__(self)

        self.period = 1.0 / frequency

        # Fall back on the overridable update method when no function is given.
        if update is None:
            self._update = self.update
        else:
            self._update = update

        # Event handed to the update loop, which clears it before and sets it
        # after each update call.
        self._updated = threading.Event()

    def run(self):
        """ Run the update loop, calling the update function at the pre-defined frequency. """
        make_update_loop(self, self._update)

    def update(self):
        """ Update method called at the pre-defined frequency (does nothing by default). """
        pass