This docs on this page refers to a PREVIOUS VERSION. For the latest stable release, go to https://docs.bokeh.org/

Archived docs for versions <= 1.0.4 have had to be modified from their original published configuration, and may be missing some features (e.g. source listing)

All users are encourage to update to version 1.1 or later, as soon as they are able.

bokeh.util.tornado — Bokeh 0.12.5 documentation

Source code for bokeh.util.tornado

""" Internal utils related to Tornado

"""
from __future__ import absolute_import, print_function

import logging
log = logging.getLogger(__name__)

from tornado import gen

@gen.coroutine
[docs]def yield_for_all_futures(result): """ Converts result into a Future by collapsing any futures inside result. If result is a Future we yield until it's done, then if the value inside the Future is another Future we yield until it's done as well, and so on. """ while True: try: future = gen.convert_yielded(result) except gen.BadYieldError: # result is not a yieldable thing, we are done break else: result = yield future raise gen.Return(result)
class _AsyncPeriodic(object): """Like ioloop.PeriodicCallback except the 'func' can be async and return a Future, and we wait for func to finish each time before we call it again. Plain ioloop.PeriodicCallback can "pile up" invocations if they are taking too long. """ def __init__(self, func, period, io_loop): self._func = func self._loop = io_loop self._period = period self._started = False self._stopped = False # this is like gen.sleep but uses our IOLoop instead of the # current IOLoop def sleep(self): f = gen.Future() self._loop.call_later(self._period / 1000.0, lambda: f.set_result(None)) return f def start(self): if self._started: raise RuntimeError("called start() twice on _AsyncPeriodic") self._started = True def invoke(): # important to start the sleep before starting callback # so any initial time spent in callback "counts against" # the period. sleep_future = self.sleep() result = self._func() try: callback_future = gen.convert_yielded(result) except gen.BadYieldError: # result is not a yieldable thing return sleep_future else: return gen.multi([sleep_future, callback_future]) def on_done(future): if not self._stopped: self._loop.add_future(invoke(), on_done) if future.exception() is not None: log.error("Error thrown from periodic callback: %r", future.exception()) self._loop.add_future(self.sleep(), on_done) def stop(self): self._stopped = True class _CallbackGroup(object): """ A collection of callbacks added to a Tornado IOLoop that we may want to remove as a group. """ def __init__(self, io_loop=None): if io_loop is None: raise ValueError("must provide an io loop") self._loop = io_loop # dicts from callback to remove callable. These are # separate only because it's allowed to add the same # callback as multiple kinds of callback at once. self._next_tick_callbacks = {} self._timeout_callbacks = {} self._periodic_callbacks = {} def remove_all_callbacks(self): """ Removes all registered callbacks.""" for cb in list(self._next_tick_callbacks.keys()): self.remove_next_tick_callback(cb) for cb in list(self._timeout_callbacks.keys()): self.remove_timeout_callback(cb) for cb in list(self._periodic_callbacks.keys()): self.remove_periodic_callback(cb) def _error_on_double_remove(self, callback, callbacks): if callback not in callbacks: raise ValueError("Removing a callback twice (or after it's already been run)") def _remover(self, callback, callbacks, cleanup): self._error_on_double_remove(callback, callbacks) del callbacks[callback] if cleanup is not None: cleanup(callback) def _wrap_next_tick(self, callback, cleanup): # this 'removed' flag is a hack because Tornado has no way # to remove a "next tick" callback added with # IOLoop.add_callback. So instead we make our wrapper skip # invoking the callback. handle = { 'removed' : False } def wrapper(*args, **kwargs): was_removed = handle['removed'] if not was_removed: self.remove_next_tick_callback(callback) return callback(*args, **kwargs) else: return None wrapper.handle = handle return wrapper def add_next_tick_callback(self, callback, cleanup=None): """ Adds a callback to be run on the next tick. Returns a callable that removes the callback if called.""" if callback in self._next_tick_callbacks: raise ValueError("Next-tick callback added twice") wrapper = self._wrap_next_tick(callback, cleanup) self._loop.add_callback(wrapper) def remover(): wrapper.handle['removed'] = True self._remover(callback, self._next_tick_callbacks, cleanup) self._next_tick_callbacks[callback] = remover return remover def _remove(self, callback, callbacks): self._error_on_double_remove(callback, callbacks) callbacks[callback]() def remove_next_tick_callback(self, callback): """ Removes a callback added with add_next_tick_callback.""" self._remove(callback, self._next_tick_callbacks) def _wrap_timeout(self, callback): def wrapper(*args, **kwargs): self.remove_timeout_callback(callback) return callback(*args, **kwargs) return wrapper def add_timeout_callback(self, callback, timeout_milliseconds, cleanup=None): """ Adds a callback to be run once after timeout_milliseconds. Returns a callable that removes the callback if called.""" if callback in self._timeout_callbacks: raise ValueError("Callback added as a timeout twice") handle = self._loop.call_later(timeout_milliseconds / 1000.0, self._wrap_timeout(callback)) def remover(): self._loop.remove_timeout(handle) self._remover(callback, self._timeout_callbacks, cleanup) self._timeout_callbacks[callback] = remover return remover def remove_timeout_callback(self, callback): """ Removes a callback added with add_timeout_callback, before it runs.""" self._remove(callback, self._timeout_callbacks) def add_periodic_callback(self, callback, period_milliseconds, cleanup=None): """ Adds a callback to be run every period_milliseconds until it is removed.""" if callback in self._periodic_callbacks: raise ValueError("Callback added as a periodic callback twice") cb = _AsyncPeriodic( callback, period_milliseconds, io_loop=self._loop ) def remover(): cb.stop() self._remover(callback, self._periodic_callbacks, cleanup) self._periodic_callbacks[callback] = remover cb.start() return remover def remove_periodic_callback(self, callback): """ Removes a callback added with add_periodic_callback.""" self._remove(callback, self._periodic_callbacks)