# Source code for bokeh.util.tornado

""" Internal utils related to Tornado

"""
from __future__ import absolute_import, print_function

import logging
import threading
from collections import defaultdict

log = logging.getLogger(__name__)

from traceback import format_exception

from tornado import gen
from ..util.serialization import make_id
from ..util import deprecation


@gen.coroutine
def yield_for_all_futures(result):
    """ Collapse nested futures in ``result`` down to a final value.

    As long as ``result`` can be converted to something yieldable, it is
    yielded and replaced by whatever it resolves to. The loop ends as soon
    as the current value is ``None`` or is not yieldable, and that value
    becomes the result of this coroutine.

    """
    # The explicit None test is needed for Tornado >= 4.5, where
    # convert_yielded no longer raises BadYieldError on None.
    while result is not None:
        try:
            pending = gen.convert_yielded(result)
        except gen.BadYieldError:
            # not a yieldable thing, so this is the final value
            break
        result = yield pending

    raise gen.Return(result)
class _AsyncPeriodic(object):
    """ Like ioloop.PeriodicCallback except the 'func' can be async and
    return a Future, and we wait for func to finish each time before we
    call it again. Plain ioloop.PeriodicCallback can "pile up" invocations
    if they are taking too long.

    Args:
        func : callable invoked with no arguments once per period; it may
            return ``None``, a yieldable (e.g. a Future), or any other value
        period : time between invocations, in milliseconds
        io_loop : the IOLoop used for all scheduling

    """

    def __init__(self, func, period, io_loop):
        self._func = func
        self._loop = io_loop
        self._period = period
        self._started = False
        self._stopped = False

    def sleep(self):
        """ Return a Future that resolves to ``None`` after one period.

        This is like gen.sleep but uses our IOLoop instead of the
        current IOLoop.

        """
        f = gen.Future()
        # the period is in milliseconds, call_later takes seconds
        self._loop.call_later(self._period / 1000.0, lambda: f.set_result(None))
        return f

    def start(self):
        """ Begin invoking the function once per period.

        The first invocation happens after one full period has elapsed.

        Raises:
            RuntimeError : if start() was already called on this object

        """
        if self._started:
            raise RuntimeError("called start() twice on _AsyncPeriodic")
        self._started = True

        def invoke():
            # important to start the sleep before starting callback
            # so any initial time spent in callback "counts against"
            # the period.
            sleep_future = self.sleep()
            result = self._func()

            # This is needed for Tornado >= 4.5 where convert_yielded will no
            # longer raise BadYieldError on None
            if result is None:
                return sleep_future

            try:
                callback_future = gen.convert_yielded(result)
            except gen.BadYieldError:
                # result is not a yieldable thing
                return sleep_future
            else:
                # wait for both the period and the callback to finish
                return gen.multi([sleep_future, callback_future])

        def on_done(future):
            # schedule the next round before inspecting the outcome of the
            # previous one, so a failed callback does not end the cycle
            if not self._stopped:
                self._loop.add_future(invoke(), on_done)

            error = future.exception()
            if error is not None:
                log.error("Error thrown from periodic callback:")
                # Future.exc_info() only exists on older Tornado Futures
                # (asyncio-based Futures do not provide it), so fall back to
                # building the triple from the exception object itself.
                get_exc_info = getattr(future, "exc_info", None)
                if get_exc_info is not None:
                    exc_info = get_exc_info()
                else:
                    exc_info = (type(error), error, getattr(error, "__traceback__", None))
                log.error("".join(format_exception(*exc_info)))

        self._loop.add_future(self.sleep(), on_done)

    def stop(self):
        """ Prevent any further invocations from being scheduled. """
        self._stopped = True


class _CallbackGroup(object):
    """ A collection of callbacks added to a Tornado IOLoop that we may
    want to remove as a group.

    Args:
        io_loop : the IOLoop that callbacks are added to (required)

    Raises:
        ValueError : if no io_loop is provided

    """

    def __init__(self, io_loop=None):
        if io_loop is None:
            raise ValueError("must provide an io loop")
        self._loop = io_loop
        # dicts from callback ID to a "remover" callable that cancels that
        # callback. These are separate only because it's allowed to add the
        # same callback as multiple kinds of callback at once.
        self._next_tick_callback_removers = {}
        self._timeout_callback_removers = {}
        self._periodic_callback_removers = {}
        # guards the remover dicts and the by-callable indexes below
        self._removers_lock = threading.Lock()

        # dicts from callback function to the set of IDs it was added under,
        # used only to support the deprecated removal-by-function path
        self._next_tick_removers_by_callable = defaultdict(set)
        self._timeout_removers_by_callable = defaultdict(set)
        self._periodic_removers_by_callable = defaultdict(set)

    def remove_all_callbacks(self):
        """ Removes all registered callbacks."""
        # iterate over snapshots, since removal mutates the dicts
        for cb_id in list(self._next_tick_callback_removers.keys()):
            self.remove_next_tick_callback(cb_id)
        for cb_id in list(self._timeout_callback_removers.keys()):
            self.remove_timeout_callback(cb_id)
        for cb_id in list(self._periodic_callback_removers.keys()):
            self.remove_periodic_callback(cb_id)

    def _get_removers_ids_by_callable(self, removers):
        """ Return the by-callable index that corresponds to ``removers``.

        ``removers`` must be (by identity) one of the three remover dicts
        of this object, otherwise RuntimeError is raised.

        """
        if removers is self._next_tick_callback_removers:
            return self._next_tick_removers_by_callable
        elif removers is self._timeout_callback_removers:
            return self._timeout_removers_by_callable
        elif removers is self._periodic_callback_removers:
            return self._periodic_removers_by_callable
        else:
            raise RuntimeError('Unhandled removers', removers)

    def _assign_deprecated_remover(self, callback, callback_id, removers):
        """ Record that ``callback`` was added under ``callback_id``. """
        self._get_removers_ids_by_callable(removers)[callback].add(callback_id)

    def _assign_remover(self, callback, callback_id, removers, remover):
        """ Register ``remover`` under ``callback_id`` and return the ID.

        A new ID is generated when ``callback_id`` is None.

        Raises:
            ValueError : if ``callback_id`` is already present in ``removers``

        """
        with self._removers_lock:
            if callback_id is None:
                callback_id = make_id()
            elif callback_id in removers:
                raise ValueError("A callback of the same type has already been added with this ID")
            removers[callback_id] = remover
            self._assign_deprecated_remover(callback, callback_id, removers)
            return callback_id

    def _create_deprecated_remover(self, callback, removers):
        """ Build a remover for every ID that ``callback`` was added under.

        Emits a deprecation warning. The affected IDs are dropped from
        ``removers`` right away; the returned function runs their removers.
        A KeyError propagates if ``callback`` has no entry in the index.

        """
        deprecation.deprecated((0, 12, 15),
                               'The ability to remove a callback function using its value',
                               'a value returned from the function that adds a callback')
        callback_ids = self._get_removers_ids_by_callable(removers).pop(callback)
        to_run = [removers.pop(i) for i in callback_ids]

        def remover():
            for f in to_run:
                f()

        return remover

    def _execute_remover(self, callback_id, removers):
        """ Unregister ``callback_id`` from ``removers`` and run its remover.

        ``callback_id`` may also be the callback function itself, which is
        the deprecated way of removing a callback.

        Raises:
            ValueError : if there is nothing registered to remove

        """
        try:
            with self._removers_lock:
                if callable(callback_id):
                    remover = self._create_deprecated_remover(callback_id, removers)
                else:
                    remover = removers.pop(callback_id)
                    # also forget the ID in the by-callable index
                    for cb_ids in self._get_removers_ids_by_callable(removers).values():
                        cb_ids.discard(callback_id)
        except KeyError:
            raise ValueError("Removing a callback twice (or after it's already been run)")
        # run the remover outside of the lock
        remover()

    def add_next_tick_callback(self, callback, callback_id=None):
        """ Adds a callback to be run on the next tick.
        Returns an ID that can be used with remove_next_tick_callback."""
        def wrapper(*args, **kwargs):
            # this 'removed' flag is a hack because Tornado has no way
            # to remove a "next tick" callback added with
            # IOLoop.add_callback. So instead we make our wrapper skip
            # invoking the callback.
            if not wrapper.removed:
                # callback_id has been rebound to the assigned ID by the
                # time the wrapper runs
                self.remove_next_tick_callback(callback_id)
                return callback(*args, **kwargs)
            else:
                return None

        wrapper.removed = False

        def remover():
            wrapper.removed = True

        callback_id = self._assign_remover(callback, callback_id, self._next_tick_callback_removers, remover)
        self._loop.add_callback(wrapper)
        return callback_id

    def remove_next_tick_callback(self, callback_id):
        """ Removes a callback added with add_next_tick_callback."""
        self._execute_remover(callback_id, self._next_tick_callback_removers)

    def add_timeout_callback(self, callback, timeout_milliseconds, callback_id=None):
        """ Adds a callback to be run once after timeout_milliseconds.
        Returns an ID that can be used with remove_timeout_callback."""
        def wrapper(*args, **kwargs):
            # unregister first, so the callback is no longer tracked once
            # it has run
            self.remove_timeout_callback(callback_id)
            return callback(*args, **kwargs)

        # rebound below once the timeout has actually been scheduled
        handle = None

        def remover():
            if handle is not None:
                self._loop.remove_timeout(handle)

        callback_id = self._assign_remover(callback, callback_id, self._timeout_callback_removers, remover)
        # timeout is in milliseconds, call_later takes seconds
        handle = self._loop.call_later(timeout_milliseconds / 1000.0, wrapper)
        return callback_id

    def remove_timeout_callback(self, callback_id):
        """ Removes a callback added with add_timeout_callback, before it runs."""
        self._execute_remover(callback_id, self._timeout_callback_removers)

    def add_periodic_callback(self, callback, period_milliseconds, callback_id=None):
        """ Adds a callback to be run every period_milliseconds until it is removed.
        Returns an ID that can be used with remove_periodic_callback."""
        cb = _AsyncPeriodic(callback, period_milliseconds, io_loop=self._loop)
        callback_id = self._assign_remover(callback, callback_id, self._periodic_callback_removers, cb.stop)
        cb.start()
        return callback_id

    def remove_periodic_callback(self, callback_id):
        """ Removes a callback added with add_periodic_callback."""
        self._execute_remover(callback_id, self._periodic_callback_removers)