Source code for bokeh.util.tornado

""" Internal utils related to Tornado

"""
from __future__ import absolute_import, print_function

import logging
log = logging.getLogger(__name__)

from tornado import gen

@gen.coroutine
def yield_for_all_futures(result):
    """ Converts result into a Future by collapsing any futures inside result.

    If result is a Future we yield until it's done, then if the value inside
    the Future is another Future we yield until it's done as well, and so on.

    Args:
        result : any value; yieldable values (as accepted by
            ``gen.convert_yielded``) are waited on repeatedly until a
            non-yieldable value (or ``None``) is produced.

    Returns:
        A Future (this is a ``gen.coroutine``) that resolves to the first
        non-yieldable value found by unwrapping ``result``.

    """
    while True:
        # Newer Tornado releases (4.5+) accept None in convert_yielded and
        # map it to gen.moment instead of raising BadYieldError. Yielding
        # that hands None straight back, so without this explicit check
        # the loop would never terminate once a result collapses to None.
        if result is None:
            break

        try:
            future = gen.convert_yielded(result)
        except gen.BadYieldError:
            # result is not a yieldable thing, we are done
            break
        else:
            result = yield future

    raise gen.Return(result)
class _AsyncPeriodic(object):
    """Like ioloop.PeriodicCallback except the 'func' can be async and
    return a Future, and we wait for func to finish each time
    before we call it again. Plain ioloop.PeriodicCallback can
    "pile up" invocations if they are taking too long.

    Args:
        func : callable invoked with no arguments once per period; it may
            return a plain value or something yieldable (e.g. a Future).
        period : length of one period, in milliseconds.
        io_loop : the loop used for scheduling; ``call_later`` and
            ``add_future`` are the only methods called on it here.

    """

    def __init__(self, func, period, io_loop):
        self._func = func
        self._loop = io_loop
        self._period = period
        self._started = False
        self._stopped = False

    # this is like gen.sleep but uses our IOLoop instead of the
    # current IOLoop
    def sleep(self):
        """Return a Future that our IOLoop resolves to None after one period."""
        f = gen.Future()
        self._loop.call_later(self._period / 1000.0, lambda: f.set_result(None))
        return f

    def start(self):
        """Begin invoking ``func`` once per period, after an initial sleep.

        Raises:
            RuntimeError : if start() has already been called.

        """
        if self._started:
            raise RuntimeError("called start() twice on _AsyncPeriodic")
        self._started = True

        def invoke():
            # important to start the sleep before starting callback
            # so any initial time spent in callback "counts against"
            # the period.
            sleep_future = self.sleep()

            try:
                result = self._func()
            except Exception as error:
                # A synchronous failure must not escape: it would propagate
                # out of on_done() before the next round is scheduled and
                # silently end the periodic callback. Report it the same
                # way as a failed Future and keep the schedule going.
                log.error("Error thrown from periodic callback: %r", error,
                          exc_info=True)
                return sleep_future

            # Plain callbacks return None; there is nothing extra to wait
            # for. (Newer Tornado no longer raises BadYieldError for None.)
            if result is None:
                return sleep_future

            try:
                callback_future = gen.convert_yielded(result)
            except gen.BadYieldError:
                # result is not a yieldable thing
                return sleep_future
            else:
                # wait for both the period and the callback to finish
                return gen.multi([sleep_future, callback_future])

        def on_done(future):
            # schedule the next round first, so an error from the previous
            # round does not interrupt the schedule
            if not self._stopped:
                self._loop.add_future(invoke(), on_done)
            error = future.exception()
            if error is not None:
                log.error("Error thrown from periodic callback: %r", error)

        self._loop.add_future(self.sleep(), on_done)

    def stop(self):
        """Prevent any further invocations from being scheduled."""
        self._stopped = True


class _CallbackGroup(object):
    """ A collection of callbacks added to a Tornado IOLoop that we may
    want to remove as a group.

    Args:
        io_loop : the loop the callbacks are added to (required, despite
            the ``None`` default).

    Raises:
        ValueError : if no io_loop is provided.

    """

    def __init__(self, io_loop=None):
        if io_loop is None:
            raise ValueError("must provide an io loop")
        self._loop = io_loop
        # dicts from callback to remove callable. These are
        # separate only because it's allowed to add the same
        # callback as multiple kinds of callback at once.
        self._next_tick_callbacks = {}
        self._timeout_callbacks = {}
        self._periodic_callbacks = {}

    def remove_all_callbacks(self):
        """ Removes all registered callbacks."""
        # iterate over snapshots: every removal mutates the dict
        for cb in list(self._next_tick_callbacks):
            self.remove_next_tick_callback(cb)
        for cb in list(self._timeout_callbacks):
            self.remove_timeout_callback(cb)
        for cb in list(self._periodic_callbacks):
            self.remove_periodic_callback(cb)

    def _error_on_double_remove(self, callback, callbacks):
        """ Raise ValueError if ``callback`` is not registered in ``callbacks``."""
        if callback not in callbacks:
            raise ValueError("Removing a callback twice (or after it's already been run)")

    def _remover(self, callback, callbacks, cleanup):
        """ Drop ``callback`` from ``callbacks``, then call ``cleanup(callback)`` if given."""
        self._error_on_double_remove(callback, callbacks)
        del callbacks[callback]
        if cleanup is not None:
            cleanup(callback)

    def _wrap_next_tick(self, callback, cleanup):
        """ Wrap ``callback`` so it unregisters itself before running, and
        is skipped entirely if it was removed first.

        ``cleanup`` is accepted but not used here; cleanup runs via the
        remover registered in add_next_tick_callback.
        """
        # this 'removed' flag is a hack because Tornado has no way
        # to remove a "next tick" callback added with
        # IOLoop.add_callback. So instead we make our wrapper skip
        # invoking the callback.
        handle = { 'removed' : False }

        def wrapper(*args, **kwargs):
            # read the flag before unregistering: removal sets it to True
            was_removed = handle['removed']
            if not was_removed:
                self.remove_next_tick_callback(callback)
                return callback(*args, **kwargs)
            else:
                return None

        wrapper.handle = handle
        return wrapper

    def add_next_tick_callback(self, callback, cleanup=None):
        """ Adds a callback to be run on the next tick.

        Returns a callable that removes the callback if called.

        Raises ValueError if the callback is already registered as a
        next-tick callback.
        """
        if callback in self._next_tick_callbacks:
            raise ValueError("Next-tick callback added twice")
        wrapper = self._wrap_next_tick(callback, cleanup)

        self._loop.add_callback(wrapper)

        def remover():
            wrapper.handle['removed'] = True
            self._remover(callback, self._next_tick_callbacks, cleanup)

        self._next_tick_callbacks[callback] = remover
        return remover

    def _remove(self, callback, callbacks):
        """ Look up and call the remover registered for ``callback``."""
        self._error_on_double_remove(callback, callbacks)
        callbacks[callback]()

    def remove_next_tick_callback(self, callback):
        """ Removes a callback added with add_next_tick_callback."""
        self._remove(callback, self._next_tick_callbacks)

    def _wrap_timeout(self, callback):
        """ Wrap ``callback`` so it unregisters itself just before running."""
        def wrapper(*args, **kwargs):
            self.remove_timeout_callback(callback)
            return callback(*args, **kwargs)
        return wrapper

    def add_timeout_callback(self, callback, timeout_milliseconds, cleanup=None):
        """ Adds a callback to be run once after timeout_milliseconds.

        Returns a callable that removes the callback if called.

        Raises ValueError if the callback is already registered as a
        timeout callback.
        """
        if callback in self._timeout_callbacks:
            raise ValueError("Callback added as a timeout twice")

        handle = self._loop.call_later(timeout_milliseconds / 1000.0,
                                       self._wrap_timeout(callback))

        def remover():
            self._loop.remove_timeout(handle)
            self._remover(callback, self._timeout_callbacks, cleanup)

        self._timeout_callbacks[callback] = remover
        return remover

    def remove_timeout_callback(self, callback):
        """ Removes a callback added with add_timeout_callback, before it runs."""
        self._remove(callback, self._timeout_callbacks)

    def add_periodic_callback(self, callback, period_milliseconds, cleanup=None):
        """ Adds a callback to be run every period_milliseconds until it is removed.

        Returns a callable that removes the callback if called.

        Raises ValueError if the callback is already registered as a
        periodic callback.
        """
        if callback in self._periodic_callbacks:
            raise ValueError("Callback added as a periodic callback twice")

        cb = _AsyncPeriodic(
            callback, period_milliseconds, io_loop=self._loop
        )

        def remover():
            cb.stop()
            self._remover(callback, self._periodic_callbacks, cleanup)

        self._periodic_callbacks[callback] = remover
        cb.start()
        return remover

    def remove_periodic_callback(self, callback):
        """ Removes a callback added with add_periodic_callback."""
        self._remove(callback, self._periodic_callbacks)