#-----------------------------------------------------------------------------
# Copyright (c) 2012 - 2019, Anaconda, Inc., and Bokeh Contributors.
# All rights reserved.
#
# The full license is in the file LICENSE.txt, distributed with this software.
#-----------------------------------------------------------------------------
""" Internal utils related to Tornado
"""
#-----------------------------------------------------------------------------
# Boilerplate
#-----------------------------------------------------------------------------
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
log = logging.getLogger(__name__)
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
# Standard library imports
import threading
from collections import defaultdict
from traceback import format_exception
# External imports
import six
from tornado import gen
# Bokeh imports
from ..util.serialization import make_id
#-----------------------------------------------------------------------------
# Globals and constants
#-----------------------------------------------------------------------------
__all__ = (
'yield_for_all_futures',
)
#-----------------------------------------------------------------------------
# General API
#-----------------------------------------------------------------------------
[docs]@gen.coroutine
def yield_for_all_futures(result):
""" Converts result into a Future by collapsing any futures inside result.
If result is a Future we yield until it's done, then if the value inside
the Future is another Future we yield until it's done as well, and so on.
"""
while True:
# This is needed for Tornado >= 4.5 where convert_yielded will no
# longer raise BadYieldError on None
if result is None:
break
try:
future = gen.convert_yielded(result)
except gen.BadYieldError:
# result is not a yieldable thing, we are done
break
else:
result = yield future
raise gen.Return(result)
#-----------------------------------------------------------------------------
# Dev API
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Private API
#-----------------------------------------------------------------------------
class _AsyncPeriodic(object):
"""Like ioloop.PeriodicCallback except the 'func' can be async and
return a Future, and we wait for func to finish each time
before we call it again. Plain ioloop.PeriodicCallback
can "pile up" invocations if they are taking too long.
"""
def __init__(self, func, period, io_loop):
self._func = func
self._loop = io_loop
self._period = period
self._started = False
self._stopped = False
# this is like gen.sleep but uses our IOLoop instead of the
# current IOLoop
def sleep(self):
f = gen.Future()
self._loop.call_later(self._period / 1000.0, lambda: f.set_result(None))
return f
def start(self):
if self._started:
raise RuntimeError("called start() twice on _AsyncPeriodic")
self._started = True
def invoke():
# important to start the sleep before starting callback
# so any initial time spent in callback "counts against"
# the period.
sleep_future = self.sleep()
result = self._func()
# This is needed for Tornado >= 4.5 where convert_yielded will no
# longer raise BadYieldError on None
if result is None:
return sleep_future
try:
callback_future = gen.convert_yielded(result)
except gen.BadYieldError:
# result is not a yieldable thing
return sleep_future
else:
return gen.multi([sleep_future, callback_future])
def on_done(future):
if not self._stopped:
self._loop.add_future(invoke(), on_done)
ex = future.exception()
if ex is not None:
log.error("Error thrown from periodic callback:")
if six.PY2:
lines = format_exception(*future.exc_info())
else:
lines = format_exception(ex.__class__, ex, ex.__traceback__)
log.error("".join(lines))
self._loop.add_future(self.sleep(), on_done)
def stop(self):
self._stopped = True
class _CallbackGroup(object):
""" A collection of callbacks added to a Tornado IOLoop that we may
want to remove as a group. """
def __init__(self, io_loop=None):
if io_loop is None:
raise ValueError("must provide an io loop")
self._loop = io_loop
# dicts from callback to remove callable. These are
# separate only because it's allowed to add the same
# callback as multiple kinds of callback at once.
self._next_tick_callback_removers = {}
self._timeout_callback_removers = {}
self._periodic_callback_removers = {}
self._removers_lock = threading.Lock()
self._next_tick_removers_by_callable = defaultdict(set)
self._timeout_removers_by_callable = defaultdict(set)
self._periodic_removers_by_callable = defaultdict(set)
def remove_all_callbacks(self):
""" Removes all registered callbacks."""
for cb_id in list(self._next_tick_callback_removers.keys()):
self.remove_next_tick_callback(cb_id)
for cb_id in list(self._timeout_callback_removers.keys()):
self.remove_timeout_callback(cb_id)
for cb_id in list(self._periodic_callback_removers.keys()):
self.remove_periodic_callback(cb_id)
def _get_removers_ids_by_callable(self, removers):
if removers is self._next_tick_callback_removers:
return self._next_tick_removers_by_callable
elif removers is self._timeout_callback_removers:
return self._timeout_removers_by_callable
elif removers is self._periodic_callback_removers:
return self._periodic_removers_by_callable
else:
raise RuntimeError('Unhandled removers', removers)
def _assign_remover(self, callback, callback_id, removers, remover):
with self._removers_lock:
if callback_id is None:
callback_id = make_id()
elif callback_id in removers:
raise ValueError("A callback of the same type has already been added with this ID")
removers[callback_id] = remover
return callback_id
def _execute_remover(self, callback_id, removers):
try:
with self._removers_lock:
remover = removers.pop(callback_id)
for cb, cb_ids in list(self._get_removers_ids_by_callable(removers).items()):
try:
cb_ids.remove(callback_id)
if not cb_ids:
del self._get_removers_ids_by_callable(removers)[cb]
except KeyError:
pass
except KeyError:
raise ValueError("Removing a callback twice (or after it's already been run)")
remover()
def add_next_tick_callback(self, callback, callback_id=None):
""" Adds a callback to be run on the next tick.
Returns an ID that can be used with remove_next_tick_callback."""
def wrapper(*args, **kwargs):
# this 'removed' flag is a hack because Tornado has no way
# to remove a "next tick" callback added with
# IOLoop.add_callback. So instead we make our wrapper skip
# invoking the callback.
if not wrapper.removed:
self.remove_next_tick_callback(callback_id)
return callback(*args, **kwargs)
else:
return None
wrapper.removed = False
def remover():
wrapper.removed = True
callback_id = self._assign_remover(callback, callback_id, self._next_tick_callback_removers, remover)
self._loop.add_callback(wrapper)
return callback_id
def remove_next_tick_callback(self, callback_id):
""" Removes a callback added with add_next_tick_callback."""
self._execute_remover(callback_id, self._next_tick_callback_removers)
def add_timeout_callback(self, callback, timeout_milliseconds, callback_id=None):
""" Adds a callback to be run once after timeout_milliseconds.
Returns an ID that can be used with remove_timeout_callback."""
def wrapper(*args, **kwargs):
self.remove_timeout_callback(callback_id)
return callback(*args, **kwargs)
handle = None
def remover():
if handle is not None:
self._loop.remove_timeout(handle)
callback_id = self._assign_remover(callback, callback_id, self._timeout_callback_removers, remover)
handle = self._loop.call_later(timeout_milliseconds / 1000.0, wrapper)
return callback_id
def remove_timeout_callback(self, callback_id):
""" Removes a callback added with add_timeout_callback, before it runs."""
self._execute_remover(callback_id, self._timeout_callback_removers)
def add_periodic_callback(self, callback, period_milliseconds, callback_id=None):
""" Adds a callback to be run every period_milliseconds until it is removed.
Returns an ID that can be used with remove_periodic_callback."""
cb = _AsyncPeriodic(callback, period_milliseconds, io_loop=self._loop)
callback_id = self._assign_remover(callback, callback_id, self._periodic_callback_removers, cb.stop)
cb.start()
return callback_id
def remove_periodic_callback(self, callback_id):
""" Removes a callback added with add_periodic_callback."""
self._execute_remover(callback_id, self._periodic_callback_removers)
#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------