Source code for bokeh.server.tornado

''' Provides the Bokeh Server Tornado application.

'''
from __future__ import absolute_import, print_function

import logging
log = logging.getLogger(__name__)

# NOTE: needs PyPI backport on Python 2 (https://pypi.python.org/pypi/futures)
from concurrent.futures import ProcessPoolExecutor
import os
from pprint import pformat

from tornado import gen
from tornado.ioloop import PeriodicCallback
from tornado.web import Application as TornadoApplication
from tornado.web import HTTPError
from tornado.web import StaticFileHandler

from bokeh.resources import Resources
from bokeh.settings import settings

from .views.root_handler import RootHandler
from .urls import per_app_patterns, toplevel_patterns
from .connection import ServerConnection
from .application_context import ApplicationContext
from .views.static_handler import StaticHandler


def match_host(host, pattern):
    """ Match host against pattern

    >>> match_host('192.168.0.1:80', '192.168.0.1:80')
    True
    >>> match_host('192.168.0.1:80', '192.168.0.1')
    True
    >>> match_host('192.168.0.1:80', '192.168.0.1:8080')
    False
    >>> match_host('192.168.0.1', '192.168.0.2')
    False
    >>> match_host('192.168.0.1', '192.168.*.*')
    True
    >>> match_host('alice', 'alice')
    True
    >>> match_host('alice:80', 'alice')
    True
    >>> match_host('alice', 'bob')
    False
    >>> match_host('foo.example.com', 'foo.example.com.net')
    False
    >>> match_host('alice', '*')
    True
    >>> match_host('alice', '*:*')
    True
    >>> match_host('alice:80', '*')
    True
    >>> match_host('alice:80', '*:80')
    True
    >>> match_host('alice:8080', '*:80')
    False

    """
    if ':' in host:
        host, host_port = host.rsplit(':', 1)
    else:
        host_port = None

    if ':' in pattern:
        pattern, pattern_port = pattern.rsplit(':', 1)
        if pattern_port == '*':
            pattern_port = None
    else:
        pattern_port = None

    if pattern_port is not None and host_port != pattern_port:
        return False

    host = host.split('.')
    pattern = pattern.split('.')

    if len(pattern) > len(host):
        return False

    for h, p in zip(host, pattern):
        if h == p or p == '*':
            continue
        else:
            return False

    return True
# factored out to be easier to test
def check_whitelist(request_host, whitelist):
    ''' Check a given request host against a whitelist.

    '''
    if ':' not in request_host:
        request_host = request_host + ':80'

    if request_host in whitelist:
        return True

    return any(match_host(request_host, host) for host in whitelist)
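
# --- Illustrative examples (added for this listing, not part of the original module) ---
# check_whitelist() defaults a portless request host to ':80' and then falls back to
# match_host()'s wildcard rules. The whitelist values below are hypothetical:
#
#     check_whitelist('localhost', {'localhost:80'})            # True  (':80' default applied)
#     check_whitelist('localhost:5006', {'localhost:5006'})     # True  (exact match)
#     check_whitelist('example.com:5006', {'*.com:5006'})       # True  (wildcard via match_host)
#     check_whitelist('example.com:80', {'example.com:5006'})   # False (port mismatch)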
def _whitelist(handler_class):
    if hasattr(handler_class.prepare, 'patched'):
        return
    old_prepare = handler_class.prepare
    def _prepare(self, *args, **kw):
        if not check_whitelist(self.request.host, self.application._hosts):
            log.info("Rejected connection from host '%s' because it is not in the --host whitelist" % self.request.host)
            raise HTTPError(403)
        return old_prepare(self, *args, **kw)
    _prepare.patched = True
    handler_class.prepare = _prepare
class BokehTornado(TornadoApplication):
    ''' A Tornado Application used to implement the Bokeh Server.

        The Server class is the main public interface; this class holds the
        Tornado implementation details.

    Args:
        applications (dict of str : bokeh.application.Application) : map from paths to Application instances
            The application is used to create documents for each session.
        extra_patterns (seq[tuple]) : tuples of (str, http or websocket handler)
            Use this argument to add additional endpoints to custom deployments
            of the Bokeh Server.
        prefix (str) : a URL prefix to use for all Bokeh server paths
        hosts (list) : hosts that are valid values for the Host header
        secret_key (str) : secret key for signing session IDs
        sign_sessions (boolean) : whether to sign session IDs
        generate_session_ids (boolean) : whether to generate a session ID when none is provided
        extra_websocket_origins (list) : hosts that can connect to the websocket
            These are in addition to ``hosts``.
        keep_alive_milliseconds (int) : number of milliseconds between keep-alive pings
            Set to 0 to disable pings. Pings keep the websocket open.
        check_unused_sessions_milliseconds (int) : number of milliseconds between checks for unused sessions
        unused_session_lifetime_milliseconds (int) : number of milliseconds for unused session lifetime
        stats_log_frequency_milliseconds (int) : number of milliseconds between logging stats
        use_index (boolean) : True to generate an index of the running apps in the RootHandler

    '''

    def __init__(self, applications, prefix, hosts,
                 extra_websocket_origins,
                 extra_patterns=None,
                 secret_key=settings.secret_key_bytes(),
                 sign_sessions=settings.sign_sessions(),
                 generate_session_ids=True,
                 # heroku, nginx default to 60s timeout, so well less than that
                 keep_alive_milliseconds=37000,
                 # how often to check for unused sessions
                 check_unused_sessions_milliseconds=17000,
                 # how long unused sessions last
                 unused_session_lifetime_milliseconds=15000,
                 # how often to log stats
                 stats_log_frequency_milliseconds=15000,
                 use_index=True,
                 redirect_root=True):

        self._prefix = prefix
        self.use_index = use_index

        if keep_alive_milliseconds < 0:
            # 0 means "disable"
            raise ValueError("keep_alive_milliseconds must be >= 0")

        if check_unused_sessions_milliseconds <= 0:
            raise ValueError("check_unused_sessions_milliseconds must be > 0")

        if unused_session_lifetime_milliseconds <= 0:
            raise ValueError("unused_session_lifetime_milliseconds must be > 0")

        if stats_log_frequency_milliseconds <= 0:
            raise ValueError("stats_log_frequency_milliseconds must be > 0")

        self._hosts = set(hosts)
        self._websocket_origins = self._hosts | set(extra_websocket_origins)
        self._resources = {}
        self._secret_key = secret_key
        self._sign_sessions = sign_sessions
        self._generate_session_ids = generate_session_ids

        log.debug("Allowed Host headers: %r", list(self._hosts))
        log.debug("These host origins can connect to the websocket: %r", list(self._websocket_origins))

        # Wrap applications in ApplicationContext
        self._applications = dict()
        for k, v in applications.items():
            self._applications[k] = ApplicationContext(v)

        extra_patterns = extra_patterns or []
        all_patterns = []
        for key, app in applications.items():
            app_patterns = []
            for p in per_app_patterns:
                if key == "/":
                    route = p[0]
                else:
                    route = key + p[0]
                route = self._prefix + route
                app_patterns.append((route, p[1], { "application_context" : self._applications[key] }))

            websocket_path = None
            for r in app_patterns:
                if r[0].endswith("/ws"):
                    websocket_path = r[0]
            if not websocket_path:
                raise RuntimeError("Couldn't find websocket path")
            for r in app_patterns:
                r[2]["bokeh_websocket_path"] = websocket_path

            all_patterns.extend(app_patterns)

            # add a per-app static path if requested by the application
            if app.static_path is not None:
                if key == "/":
                    route = "/static/(.*)"
                else:
                    route = key + "/static/(.*)"
                route = self._prefix + route
                all_patterns.append((route, StaticFileHandler, { "path" : app.static_path }))

        for p in extra_patterns + toplevel_patterns:
            if p[1] == RootHandler:
                if self.use_index:
                    data = {"applications": self._applications,
                            "prefix": self._prefix,
                            "use_redirect": redirect_root}
                    prefixed_pat = (self._prefix + p[0],) + p[1:] + (data,)
                    all_patterns.append(prefixed_pat)
            else:
                prefixed_pat = (self._prefix + p[0],) + p[1:]
                all_patterns.append(prefixed_pat)

        for pat in all_patterns:
            _whitelist(pat[1])

        log.debug("Patterns are:")
        for line in pformat(all_patterns, width=60).split("\n"):
            log.debug("  " + line)

        super(BokehTornado, self).__init__(all_patterns)

    def initialize(self, io_loop,
                   keep_alive_milliseconds=37000,
                   # how often to check for unused sessions
                   check_unused_sessions_milliseconds=17000,
                   # how long unused sessions last
                   unused_session_lifetime_milliseconds=15000,
                   # how often to log stats
                   stats_log_frequency_milliseconds=15000,
                   **kw):
        self._loop = io_loop

        for app_context in self._applications.values():
            app_context._loop = self._loop

        self._clients = set()

        self._executor = ProcessPoolExecutor(max_workers=4)

        self._stats_job = PeriodicCallback(self.log_stats,
                                           stats_log_frequency_milliseconds,
                                           io_loop=self._loop)

        self._unused_session_linger_milliseconds = unused_session_lifetime_milliseconds
        self._cleanup_job = PeriodicCallback(self.cleanup_sessions,
                                             check_unused_sessions_milliseconds,
                                             io_loop=self._loop)

        if keep_alive_milliseconds > 0:
            self._ping_job = PeriodicCallback(self.keep_alive, keep_alive_milliseconds, io_loop=self._loop)
        else:
            self._ping_job = None

    @property
    def io_loop(self):
        return self._loop

    @property
    def websocket_origins(self):
        return self._websocket_origins

    @property
    def secret_key(self):
        return self._secret_key

    @property
    def sign_sessions(self):
        return self._sign_sessions

    @property
    def generate_session_ids(self):
        return self._generate_session_ids

    def root_url_for_request(self, request):
        return request.protocol + "://" + request.host + self._prefix + "/"

    def websocket_url_for_request(self, request, websocket_path):
        # websocket_path comes from the handler, and already has any
        # prefix included, no need to add here
        protocol = "ws"
        if request.protocol == "https":
            protocol = "wss"
        return protocol + "://" + request.host + websocket_path

    def resources(self, request):
        root_url = self.root_url_for_request(request)
        if root_url not in self._resources:
            self._resources[root_url] = Resources(mode="server",
                                                  root_url=root_url,
                                                  path_versioner=StaticHandler.append_version)
        return self._resources[root_url]

    def start(self):
        ''' Start the Bokeh Server application.

        '''
        self._stats_job.start()
        self._cleanup_job.start()
        if self._ping_job is not None:
            self._ping_job.start()

        for context in self._applications.values():
            context.run_load_hook()

    def stop(self, wait=True):
        ''' Stop the Bokeh Server application.

        Args:
            wait (boolean): whether to wait for orderly cleanup (default: True)

        Returns:
            None

        '''
        # TODO we should probably close all connections and shut
        # down all sessions here
        for context in self._applications.values():
            context.run_unload_hook()

        self._stats_job.stop()
        self._cleanup_job.stop()
        if self._ping_job is not None:
            self._ping_job.stop()

        self._executor.shutdown(wait=wait)
        self._clients.clear()

    @property
    def executor(self):
        return self._executor

    def new_connection(self, protocol, socket, application_context, session):
        connection = ServerConnection(protocol, socket, application_context, session)
        self._clients.add(connection)
        return connection

    def client_lost(self, connection):
        self._clients.discard(connection)
        connection.detach_session()

    def get_session(self, app_path, session_id):
        if app_path not in self._applications:
            raise ValueError("Application %s does not exist on this server" % app_path)
        return self._applications[app_path].get_session(session_id)

    def get_sessions(self, app_path):
        if app_path not in self._applications:
            raise ValueError("Application %s does not exist on this server" % app_path)
        return list(self._applications[app_path].sessions)

    @gen.coroutine
    def cleanup_sessions(self):
        for app in self._applications.values():
            yield app.cleanup_sessions(self._unused_session_linger_milliseconds)
        raise gen.Return(None)

    def log_stats(self):
        if log.getEffectiveLevel() > logging.DEBUG:
            # avoid the work below if we aren't going to log anything
            return
        log.debug("[pid %d] %d clients connected", os.getpid(), len(self._clients))
        for app_path, app in self._applications.items():
            sessions = list(app.sessions)
            unused_count = 0
            for s in sessions:
                if s.connection_count == 0:
                    unused_count += 1
            log.debug("[pid %d] %s has %d sessions with %d unused",
                      os.getpid(), app_path, len(sessions), unused_count)

    def keep_alive(self):
        for c in self._clients:
            c.send_ping()

    @gen.coroutine
    def run_in_background(self, _func, *args, **kwargs):
        """ Run a synchronous function in the background without disrupting
        the main thread. Useful for long-running jobs.

        """
        res = yield self._executor.submit(_func, *args, **kwargs)
        raise gen.Return(res)
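
# --- Usage sketch (added for this listing, not part of Bokeh itself) ---
# In normal use the public bokeh.server.server.Server class constructs and drives
# BokehTornado. The guarded block below is a minimal, hypothetical sketch of doing
# that wiring by hand, using only the constructor and initialize() arguments shown
# above; the application function, host whitelist, and port are made-up examples.
if __name__ == "__main__":
    from tornado.httpserver import HTTPServer
    from tornado.ioloop import IOLoop
    from bokeh.application import Application
    from bokeh.application.handlers import FunctionHandler

    def make_document(doc):
        # A real application would add models to ``doc`` here.
        pass

    bokeh_app = BokehTornado(applications={"/": Application(FunctionHandler(make_document))},
                             prefix="",
                             hosts=["localhost:5006"],
                             extra_websocket_origins=[])
    bokeh_app.initialize(io_loop=IOLoop.current())

    HTTPServer(bokeh_app).listen(5006)
    bokeh_app.start()
    IOLoop.current().start()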