''' Provide a web socket handler for the Bokeh Server application.
'''
from __future__ import absolute_import, print_function
import logging
log = logging.getLogger(__name__)
import codecs
from six.moves.urllib.parse import urlparse
from tornado import gen, locks
from tornado.websocket import WebSocketHandler, WebSocketClosedError
from ..protocol_handler import ProtocolHandler
from ...protocol import Protocol
from ...protocol.exceptions import MessageError, ProtocolError, ValidationError
from ...protocol.message import Message
from ...protocol.receiver import Receiver
from bokeh.util.session_id import check_session_id_signature
[docs]class WSHandler(WebSocketHandler):
    ''' Implements a custom Tornado WebSocketHandler for the Bokeh Server.
    '''
    def __init__(self, tornado_app, *args, **kw):
        self.receiver = None
        self.handler = None
        self.connection = None
        self.application_context = kw['application_context']
        self.latest_pong = -1
        # write_lock allows us to lock the connection to send multiple
        # messages atomically.
        self.write_lock = locks.Lock()
        # Note: tornado_app is stored as self.application
        super(WSHandler, self).__init__(tornado_app, *args, **kw)
    def initialize(self, application_context, bokeh_websocket_path):
        pass
    def check_origin(self, origin):
        from ..util import check_whitelist
        parsed_origin = urlparse(origin)
        origin_host = parsed_origin.netloc.lower()
        allowed_hosts = self.application.websocket_origins
        allowed = check_whitelist(origin_host, allowed_hosts)
        if allowed:
            return True
        else:
            log.error("Refusing websocket connection from Origin '%s'; \
                      use --allow-websocket-origin=%s to permit this; currently we allow origins %r",
                      origin, origin_host, allowed_hosts)
            return False
[docs]    def open(self):
        ''' Initialize a connection to a client.
        '''
        log.info('WebSocket connection opened')
        proto_version = self.get_argument("bokeh-protocol-version", default=None)
        if proto_version is None:
            self.close()
            raise ProtocolError("No bokeh-protocol-version specified")
        session_id = self.get_argument("bokeh-session-id", default=None)
        if session_id is None:
            self.close()
            raise ProtocolError("No bokeh-session-id specified")
        if not check_session_id_signature(session_id,
                                          signed=self.application.sign_sessions,
                                          secret_key=self.application.secret_key):
            log.error("Session id had invalid signature: %r", session_id)
            raise ProtocolError("Invalid session ID")
        def on_fully_opened(future):
            e = future.exception()
            if e is not None:
                # this isn't really an error (unless we have a
                # bug), it just means a client disconnected
                # immediately, most likely.
                log.debug("Failed to fully open connection %r", e)
        future = self._async_open(session_id, proto_version)
        self.application.io_loop.add_future(future,
                                            on_fully_opened) 
    @gen.coroutine
    def _async_open(self, session_id, proto_version):
        try:
            yield self.application_context.create_session_if_needed(session_id, self.request)
            session = self.application_context.get_session(session_id)
            protocol = Protocol(proto_version)
            self.receiver = Receiver(protocol)
            log.debug("Receiver created for %r", protocol)
            self.handler = ProtocolHandler()
            log.debug("ProtocolHandler created for %r", protocol)
            self.connection = self.application.new_connection(protocol, self, self.application_context, session)
            log.info("ServerConnection created")
        except ProtocolError as e:
            log.error("Could not create new server session, reason: %s", e)
            self.close()
            raise e
        msg = self.connection.protocol.create('ACK')
        yield self.send_message(msg)
        raise gen.Return(None)
    @gen.coroutine
[docs]    def on_message(self, fragment):
        ''' Process an individual wire protocol fragment.
            The websocket RFC specifies opcodes for distinguishing
            text frames from binary frames. Tornado passes us either
            a text or binary string depending on that opcode, we have
            to look at the type of the fragment to see what we got.
        Args:
            fragment (unicode or bytes) : wire fragment to process
        '''
        # We shouldn't throw exceptions from on_message because
        # the caller is just Tornado and it doesn't know what to
        # do with them other than report them as an unhandled
        # Future
        try:
            message = yield self._receive(fragment)
        except Exception as e:
            # If you go look at self._receive, it's catching the
            # expected error types... here we have something weird.
            log.error("Unhandled exception receiving a message: %r: %r", e, fragment, exc_info=True)
            self._internal_error("server failed to parse a message")
        try:
            if message:
                work = yield self._handle(message)
                if work:
                    yield self._schedule(work)
        except Exception as e:
            log.error("Handler or its work threw an exception: %r: %r", e, message, exc_info=True)
            self._internal_error("server failed to handle a message")
        raise gen.Return(None) 
    def on_pong(self, data):
        # if we get an invalid integer or utf-8 back, either we
        # sent a buggy ping or the client is evil/broken.
        try:
            self.latest_pong = int(codecs.decode(data, 'utf-8'))
        except UnicodeDecodeError:
            log.error("received invalid unicode in pong %r", data, exc_info=True)
        except ValueError:
            log.error("received invalid integer in pong %r", data, exc_info=True)
    @gen.coroutine
[docs]    def send_message(self, message):
        ''' Send a Bokeh Server protocol message to the connected client.
        Args:
            message (Message) : a message to send
        '''
        try:
            yield message.send(self)
        except WebSocketClosedError:
            # on_close() is / will be called anyway
            log.warn("Failed sending message as connection was closed")
        raise gen.Return(None) 
    @gen.coroutine
[docs]    def write_message(self, message, binary=False, locked=True):
        ''' Override parent write_message with a version that acquires a
        write lock before writing.
        '''
        def write_message_unlocked():
            future = super(WSHandler, self).write_message(message, binary)
            # don't yield this future or we're blocking on ourselves!
            raise gen.Return(future)
        if locked:
            with (yield self.write_lock.acquire()):
                write_message_unlocked()
        else:
            write_message_unlocked() 
[docs]    def on_close(self):
        ''' Clean up when the connection is closed.
        '''
        log.info('WebSocket connection closed: code=%s, reason=%r',
                 self.close_code, self.close_reason)
        if self.connection is not None:
            self.application.client_lost(self.connection) 
    @gen.coroutine
    def _receive(self, fragment):
        # Receive fragments until a complete message is assembled
        try:
            message = yield self.receiver.consume(fragment)
            raise gen.Return(message)
        except (MessageError, ProtocolError, ValidationError) as e:
            self._protocol_error(str(e))
            raise gen.Return(None)
    @gen.coroutine
    def _handle(self, message):
        # Handle the message, possibly resulting in work to do
        try:
            work = yield self.handler.handle(message, self.connection)
            raise gen.Return(work)
        except (MessageError, ProtocolError, ValidationError) as e: # TODO (other exceptions?)
            self._internal_error(str(e))
            raise gen.Return(None)
    @gen.coroutine
    def _schedule(self, work):
        if isinstance(work, Message):
            yield self.send_message(work)
        else:
            self._internal_error("expected a Message not " + repr(work))
        raise gen.Return(None)
    def _internal_error(self, message):
        log.error("Bokeh Server internal error: %s, closing connection", message)
        self.close(10000, message)
    def _protocol_error(self, message):
        log.error("Bokeh Server protocol error: %s, closing connection", message)
        self.close(10001, message)