""" Internal utils related to Tornado
"""
from __future__ import absolute_import, print_function
import logging
log = logging.getLogger(__name__)
from tornado import gen
@gen.coroutine
def yield_for_all_futures(result):
    """ Converts result into a Future by collapsing any futures inside result.

    If result is a Future we yield until it's done, then if the value inside
    the Future is another Future we yield until it's done as well, and so on.

    Args:
        result: any value. It is repeatedly handed to
            ``gen.convert_yielded`` and yielded for as long as that
            conversion succeeds.

    Returns:
        The first value in the chain that is ``None`` or that
        ``gen.convert_yielded`` rejects with ``gen.BadYieldError``
        (delivered to the caller through ``gen.Return``).
    """
    while True:
        # Tornado >= 4.5 no longer raises BadYieldError for None: it is
        # converted to ``gen.moment``, and yielding that produces None
        # again, so without this check the loop would never terminate.
        if result is None:
            break

        try:
            future = gen.convert_yielded(result)
        except gen.BadYieldError:
            # result is not a yieldable thing, we are done
            break
        else:
            result = yield future

    raise gen.Return(result)
class _AsyncPeriodic(object):
    """Like ioloop.PeriodicCallback except the 'func' can be async and
    return a Future, and we wait for func to finish each time
    before we call it again.  Plain ioloop.PeriodicCallback
    can "pile up" invocations if they are taking too long.

    Args:
        func: callable invoked with no arguments once per period. It may
            return a plain value or anything ``gen.convert_yielded`` accepts.
        period: time between invocations, in milliseconds.
        io_loop: loop object used for all scheduling (``call_later`` and
            ``add_future`` are called on it).
    """

    def __init__(self, func, period, io_loop):
        self._func = func
        self._loop = io_loop
        self._period = period
        self._started = False
        self._stopped = False

    # this is like gen.sleep but uses our IOLoop instead of the
    # current IOLoop
    def sleep(self):
        """Return a Future that our loop resolves to None after one period."""
        f = gen.Future()
        # the period is in milliseconds; call_later is given seconds
        self._loop.call_later(self._period / 1000.0, lambda: f.set_result(None))
        return f

    def start(self):
        """Begin invoking ``func``; the first call happens after one period.

        Raises:
            RuntimeError: if start() has already been called on this object.
        """
        if self._started:
            raise RuntimeError("called start() twice on _AsyncPeriodic")
        self._started = True

        def invoke():
            # important to start the sleep before starting callback
            # so any initial time spent in callback "counts against"
            # the period.
            # NOTE(review): if func raises synchronously the exception
            # propagates out of invoke() (and so out of on_done) before
            # add_future is reached, so nothing further is scheduled here
            # -- confirm that is the intended behavior.
            sleep_future = self.sleep()
            result = self._func()

            # A plain synchronous callback returns None.  Tornado >= 4.5
            # no longer raises BadYieldError for None, so handle it
            # explicitly instead of relying on the except branch below.
            if result is None:
                return sleep_future

            try:
                callback_future = gen.convert_yielded(result)
            except gen.BadYieldError:
                # result is not a yieldable thing
                return sleep_future
            else:
                # wait for both the period and the callback to finish
                return gen.multi([sleep_future, callback_future])

        def on_done(future):
            # Re-arm first, so a failed callback future does not end the
            # cycle; the failure is only logged.
            if not self._stopped:
                self._loop.add_future(invoke(), on_done)
            error = future.exception()
            if error is not None:
                log.error("Error thrown from periodic callback: %r", error)

        self._loop.add_future(self.sleep(), on_done)

    def stop(self):
        """Prevent any further invocation from being scheduled.

        Only sets a flag that is checked the next time a pending future
        completes; nothing already scheduled on the loop is cancelled.
        """
        self._stopped = True
class _CallbackGroup(object):
    """ A collection of callbacks added to a Tornado IOLoop that we may
    want to remove as a group.

    Each ``add_*`` method returns a zero-argument "remover" callable and
    also records it, keyed by the callback, so the callback can later be
    removed either through that callable or through the matching
    ``remove_*`` method.
    """

    def __init__(self, io_loop=None):
        """
        Args:
            io_loop: the loop callbacks are scheduled on. Required, despite
                the ``None`` default.

        Raises:
            ValueError: if ``io_loop`` is None.
        """
        if io_loop is None:
            raise ValueError("must provide an io loop")
        self._loop = io_loop
        # dicts from callback to remove callable. These are
        # separate only because it's allowed to add the same
        # callback as multiple kinds of callback at once.
        self._next_tick_callbacks = {}
        self._timeout_callbacks = {}
        self._periodic_callbacks = {}

    def remove_all_callbacks(self):
        """ Removes all registered callbacks."""
        # iterate over snapshots: every removal mutates the dict
        for cb in list(self._next_tick_callbacks):
            self.remove_next_tick_callback(cb)
        for cb in list(self._timeout_callbacks):
            self.remove_timeout_callback(cb)
        for cb in list(self._periodic_callbacks):
            self.remove_periodic_callback(cb)

    def _error_on_double_remove(self, callback, callbacks):
        """Raise ValueError unless ``callback`` is a key of ``callbacks``."""
        if callback not in callbacks:
            raise ValueError("Removing a callback twice (or after it's already been run)")

    def _remover(self, callback, callbacks, cleanup):
        """Drop ``callback`` from ``callbacks``, then call ``cleanup(callback)``
        if a cleanup was supplied.

        Raises:
            ValueError: if ``callback`` is not currently registered.
        """
        self._error_on_double_remove(callback, callbacks)
        del callbacks[callback]
        if cleanup is not None:
            cleanup(callback)

    def _wrap_next_tick(self, callback, cleanup):
        """Build the function actually handed to the loop for ``callback``.

        ``cleanup`` is accepted but not used here; cleanup is performed by
        the remover created in add_next_tick_callback.
        """
        # this 'removed' flag is a hack because Tornado has no way
        # to remove a "next tick" callback added with
        # IOLoop.add_callback. So instead we make our wrapper skip
        # invoking the callback.
        handle = {'removed': False}

        def wrapper(*args, **kwargs):
            if handle['removed']:
                return None
            # unregister first (this also flips the flag and runs any
            # cleanup), then run the callback
            self.remove_next_tick_callback(callback)
            return callback(*args, **kwargs)

        # exposed so the remover can flip the flag
        wrapper.handle = handle
        return wrapper

    def add_next_tick_callback(self, callback, cleanup=None):
        """ Adds a callback to be run on the next tick.
        Returns a callable that removes the callback if called.

        Args:
            callback: callable to schedule; also used as the registry key.
            cleanup: optional callable invoked with ``callback`` once it
                has been removed (which includes just before it runs).

        Raises:
            ValueError: if ``callback`` is already registered as a
                next-tick callback.
        """
        if callback in self._next_tick_callbacks:
            raise ValueError("Next-tick callback added twice")

        wrapper = self._wrap_next_tick(callback, cleanup)

        def remover():
            wrapper.handle['removed'] = True
            self._remover(callback, self._next_tick_callbacks, cleanup)

        # Register before handing the wrapper to the loop: the wrapper
        # unregisters itself when it runs, so it must find this entry even
        # if the loop executes it before this method returns (for example
        # when the loop is running on another thread).
        self._next_tick_callbacks[callback] = remover
        self._loop.add_callback(wrapper)
        return remover

    def _remove(self, callback, callbacks):
        """Look up and invoke the remover stored for ``callback``.

        Raises:
            ValueError: if ``callback`` is not currently registered.
        """
        self._error_on_double_remove(callback, callbacks)
        callbacks[callback]()

    def remove_next_tick_callback(self, callback):
        """ Removes a callback added with add_next_tick_callback."""
        self._remove(callback, self._next_tick_callbacks)

    def _wrap_timeout(self, callback):
        """Build the function handed to the loop for a timeout callback; it
        unregisters ``callback`` and then runs it."""
        def wrapper(*args, **kwargs):
            self.remove_timeout_callback(callback)
            return callback(*args, **kwargs)
        return wrapper

    def add_timeout_callback(self, callback, timeout_milliseconds, cleanup=None):
        """ Adds a callback to be run once after timeout_milliseconds.
        Returns a callable that removes the callback if called.

        Args:
            callback: callable to schedule; also used as the registry key.
            timeout_milliseconds: delay before the callback runs.
            cleanup: optional callable invoked with ``callback`` once it
                has been removed (which includes just before it runs).

        Raises:
            ValueError: if ``callback`` is already registered as a timeout.
        """
        if callback in self._timeout_callbacks:
            raise ValueError("Callback added as a timeout twice")

        # call_later is given seconds
        handle = self._loop.call_later(timeout_milliseconds / 1000.0,
                                       self._wrap_timeout(callback))

        def remover():
            self._loop.remove_timeout(handle)
            self._remover(callback, self._timeout_callbacks, cleanup)

        self._timeout_callbacks[callback] = remover
        return remover

    def remove_timeout_callback(self, callback):
        """ Removes a callback added with add_timeout_callback, before it runs."""
        self._remove(callback, self._timeout_callbacks)

    def add_periodic_callback(self, callback, period_milliseconds, cleanup=None):
        """ Adds a callback to be run every period_milliseconds until it is removed.
        Returns a callable that removes the callback if called.

        Args:
            callback: callable to schedule; also used as the registry key.
            period_milliseconds: interval between invocations.
            cleanup: optional callable invoked with ``callback`` once it
                has been removed.

        Raises:
            ValueError: if ``callback`` is already registered as a
                periodic callback.
        """
        if callback in self._periodic_callbacks:
            raise ValueError("Callback added as a periodic callback twice")

        cb = _AsyncPeriodic(
            callback, period_milliseconds, io_loop=self._loop
        )

        def remover():
            cb.stop()
            self._remover(callback, self._periodic_callbacks, cleanup)

        # register first, then start the periodic schedule
        self._periodic_callbacks[callback] = remover
        cb.start()
        return remover

    def remove_periodic_callback(self, callback):
        """ Removes a callback added with add_periodic_callback."""
        self._remove(callback, self._periodic_callbacks)