Source code for bokeh.server.views.ws

''' Provide a web socket handler for the Bokeh Server application.

'''
from __future__ import absolute_import, print_function

import logging
log = logging.getLogger(__name__)

import codecs

from six.moves.urllib.parse import urlparse

from tornado import gen, locks
from tornado.websocket import WebSocketHandler, WebSocketClosedError

from ..protocol_handler import ProtocolHandler
from ...protocol import Protocol
from ...protocol.exceptions import MessageError, ProtocolError, ValidationError
from ...protocol.message import Message
from ...protocol.receiver import Receiver

from bokeh.util.session_id import check_session_id_signature

[docs]class WSHandler(WebSocketHandler): ''' Implements a custom Tornado WebSocketHandler for the Bokeh Server. ''' def __init__(self, tornado_app, *args, **kw): self.receiver = None self.handler = None self.connection = None self.application_context = kw['application_context'] self.latest_pong = -1 # write_lock allows us to lock the connection to send multiple # messages atomically. self.write_lock = locks.Lock() # Note: tornado_app is stored as self.application super(WSHandler, self).__init__(tornado_app, *args, **kw) def initialize(self, application_context, bokeh_websocket_path): pass def check_origin(self, origin): from ..util import check_whitelist parsed_origin = urlparse(origin) origin_host = parsed_origin.netloc.lower() allowed_hosts = self.application.websocket_origins allowed = check_whitelist(origin_host, allowed_hosts) if allowed: return True else: log.error("Refusing websocket connection from Origin '%s'; \ use --allow-websocket-origin=%s to permit this; currently we allow origins %r", origin, origin_host, allowed_hosts) return False
[docs] def open(self): ''' Initialize a connection to a client. ''' log.info('WebSocket connection opened') proto_version = self.get_argument("bokeh-protocol-version", default=None) if proto_version is None: self.close() raise ProtocolError("No bokeh-protocol-version specified") session_id = self.get_argument("bokeh-session-id", default=None) if session_id is None: self.close() raise ProtocolError("No bokeh-session-id specified") if not check_session_id_signature(session_id, signed=self.application.sign_sessions, secret_key=self.application.secret_key): log.error("Session id had invalid signature: %r", session_id) raise ProtocolError("Invalid session ID") def on_fully_opened(future): e = future.exception() if e is not None: # this isn't really an error (unless we have a # bug), it just means a client disconnected # immediately, most likely. log.debug("Failed to fully open connection %r", e) future = self._async_open(session_id, proto_version) self.application.io_loop.add_future(future, on_fully_opened)
@gen.coroutine def _async_open(self, session_id, proto_version): try: yield self.application_context.create_session_if_needed(session_id, self.request) session = self.application_context.get_session(session_id) protocol = Protocol(proto_version) self.receiver = Receiver(protocol) log.debug("Receiver created for %r", protocol) self.handler = ProtocolHandler() log.debug("ProtocolHandler created for %r", protocol) self.connection = self.application.new_connection(protocol, self, self.application_context, session) log.info("ServerConnection created") except ProtocolError as e: log.error("Could not create new server session, reason: %s", e) self.close() raise e msg = self.connection.protocol.create('ACK') yield self.send_message(msg) raise gen.Return(None) @gen.coroutine
[docs] def on_message(self, fragment): ''' Process an individual wire protocol fragment. The websocket RFC specifies opcodes for distinguishing text frames from binary frames. Tornado passes us either a text or binary string depending on that opcode, we have to look at the type of the fragment to see what we got. Args: fragment (unicode or bytes) : wire fragment to process ''' # We shouldn't throw exceptions from on_message because # the caller is just Tornado and it doesn't know what to # do with them other than report them as an unhandled # Future try: message = yield self._receive(fragment) except Exception as e: # If you go look at self._receive, it's catching the # expected error types... here we have something weird. log.error("Unhandled exception receiving a message: %r: %r", e, fragment, exc_info=True) self._internal_error("server failed to parse a message") try: if message: work = yield self._handle(message) if work: yield self._schedule(work) except Exception as e: log.error("Handler or its work threw an exception: %r: %r", e, message, exc_info=True) self._internal_error("server failed to handle a message") raise gen.Return(None)
def on_pong(self, data): # if we get an invalid integer or utf-8 back, either we # sent a buggy ping or the client is evil/broken. try: self.latest_pong = int(codecs.decode(data, 'utf-8')) except UnicodeDecodeError: log.error("received invalid unicode in pong %r", data, exc_info=True) except ValueError: log.error("received invalid integer in pong %r", data, exc_info=True) @gen.coroutine
[docs] def send_message(self, message): ''' Send a Bokeh Server protocol message to the connected client. Args: message (Message) : a message to send ''' try: yield message.send(self) except WebSocketClosedError: # on_close() is / will be called anyway log.warn("Failed sending message as connection was closed") raise gen.Return(None)
@gen.coroutine
[docs] def write_message(self, message, binary=False, locked=True): ''' Override parent write_message with a version that acquires a write lock before writing. ''' def write_message_unlocked(): future = super(WSHandler, self).write_message(message, binary) # don't yield this future or we're blocking on ourselves! raise gen.Return(future) if locked: with (yield self.write_lock.acquire()): write_message_unlocked() else: write_message_unlocked()
[docs] def on_close(self): ''' Clean up when the connection is closed. ''' log.info('WebSocket connection closed: code=%s, reason=%r', self.close_code, self.close_reason) if self.connection is not None: self.application.client_lost(self.connection)
@gen.coroutine def _receive(self, fragment): # Receive fragments until a complete message is assembled try: message = yield self.receiver.consume(fragment) raise gen.Return(message) except (MessageError, ProtocolError, ValidationError) as e: self._protocol_error(str(e)) raise gen.Return(None) @gen.coroutine def _handle(self, message): # Handle the message, possibly resulting in work to do try: work = yield self.handler.handle(message, self.connection) raise gen.Return(work) except (MessageError, ProtocolError, ValidationError) as e: # TODO (other exceptions?) self._internal_error(str(e)) raise gen.Return(None) @gen.coroutine def _schedule(self, work): if isinstance(work, Message): yield self.send_message(work) else: self._internal_error("expected a Message not " + repr(work)) raise gen.Return(None) def _internal_error(self, message): log.error("Bokeh Server internal error: %s, closing connection", message) self.close(10000, message) def _protocol_error(self, message): log.error("Bokeh Server protocol error: %s, closing connection", message) self.close(10001, message)