#-----------------------------------------------------------------------------# Copyright (c) Anaconda, Inc., and Bokeh Contributors.# All rights reserved.## The full license is in the file LICENSE.txt, distributed with this software.#-----------------------------------------------------------------------------''' Provide a web socket handler for the Bokeh Server application.'''#-----------------------------------------------------------------------------# Boilerplate#-----------------------------------------------------------------------------from__future__importannotationsimportlogging# isort:skiplog=logging.getLogger(__name__)#-----------------------------------------------------------------------------# Imports#-----------------------------------------------------------------------------# Standard library importsimportcalendarimportdatetimeasdtfromtypingimportTYPE_CHECKING,Any,castfromurllib.parseimporturlparse# External importsfromtornadoimportlocks,webfromtornado.websocketimportWebSocketClosedError,WebSocketHandler# Bokeh importsfrombokeh.settingsimportsettingsfrombokeh.util.tokenimportcheck_token_signature,get_session_id,get_token_payload# Bokeh importsfrom...protocolimportProtocolfrom...protocol.exceptionsimportMessageError,ProtocolError,ValidationErrorfrom...protocol.messageimportMessagefrom...protocol.receiverimportReceiverfrom...util.dataclassesimportdataclassfrom..protocol_handlerimportProtocolHandlerfrom.auth_request_handlerimportAuthRequestHandlerifTYPE_CHECKING:from..connectionimportServerConnection#-----------------------------------------------------------------------------# Globals and constants#-----------------------------------------------------------------------------__all__=('WSHandler',)#-----------------------------------------------------------------------------# General API#-----------------------------------------------------------------------------#-----------------------------------------------------------------------------# Dev 
API#-----------------------------------------------------------------------------
class WSHandler(AuthRequestHandler, WebSocketHandler):
    ''' Implements a custom Tornado WebSocketHandler for the Bokeh Server.

    The handler validates the session token when the socket is opened,
    creates a ``Receiver`` / ``ProtocolHandler`` / ``ServerConnection`` for
    the session, and then feeds every incoming wire fragment through them.

    '''

    connection: ServerConnection | None

    def __init__(self, tornado_app, *args, **kw) -> None:
        ''' Set up per-connection state and initialize the Tornado handler.

        ``application_context`` is read from ``kw`` (and left in place), while
        ``compression_level`` and ``mem_level`` are removed from ``kw`` before
        the remaining arguments are forwarded to the parent initializer.

        '''
        self.receiver = None
        self.handler = None
        self.connection = None
        self.application_context = kw['application_context']
        self.latest_pong = -1
        # write_lock allows us to lock the connection to send multiple
        # messages atomically.
        self.write_lock = locks.Lock()

        self._token = None

        self._compression_level = kw.pop('compression_level', None)
        self._mem_level = kw.pop('mem_level', None)

        # Note: tornado_app is stored as self.application
        super().__init__(tornado_app, *args, **kw)

    def check_origin(self, origin: str) -> bool:
        ''' Implement a check_origin policy for Tornado to call.

        The supplied origin will be compared to the Bokeh server allowlist. If
        the origin is not allowed, an error will be logged and ``False`` will
        be returned.

        Args:
            origin (str) :
                The URL of the connection origin

        Returns:
            bool, True if the connection is allowed, False otherwise

        '''
        from ..util import check_allowlist

        parsed_origin = urlparse(origin)
        origin_host = parsed_origin.netloc.lower()

        # Origins configured through settings take precedence over the ones
        # configured on the application.
        allowed_hosts = self.application.websocket_origins
        settings_origins = settings.allowed_ws_origin()
        if settings_origins:
            allowed_hosts = set(settings_origins)

        if check_allowlist(origin_host, allowed_hosts):
            return True

        # Adjacent literals (rather than a backslash inside the string) keep
        # stray escape characters / indentation out of the logged message.
        log.error(
            "Refusing websocket connection from Origin '%s'; "
            "use --allow-websocket-origin=%s or set BOKEH_ALLOW_WS_ORIGIN=%s to permit this; "
            "currently we allow origins %r",
            origin, origin_host, origin_host, allowed_hosts,
        )
        return False

    @web.authenticated
    def open(self) -> None:
        ''' Initialize a connection to a client.

        The token stored on the handler is validated (subprotocol, presence,
        expiry, signature) and, if acceptable, the remainder of the setup is
        scheduled on the application's IO loop.

        Returns:
            None

        Raises:
            ProtocolError :
                if the subprotocol is not ``'bokeh'``, no token was received,
                the token has no expiry or is expired, or the token signature
                is invalid. The socket is closed before raising.

        '''
        log.info('WebSocket connection opened')
        token = self._token

        if self.selected_subprotocol != 'bokeh':
            self.close()
            raise ProtocolError("Subprotocol header is not 'bokeh'")
        if token is None:
            self.close()
            raise ProtocolError("No token received in subprotocol header")

        now = calendar.timegm(dt.datetime.now(tz=dt.timezone.utc).timetuple())
        payload = get_token_payload(token)
        if 'session_expiry' not in payload:
            self.close()
            raise ProtocolError("Session expiry has not been provided")
        if now >= payload['session_expiry']:
            self.close()
            raise ProtocolError("Token is expired. Configure the app with a larger value for --session-token-expiration if necessary")
        if not check_token_signature(token,
                                     signed=self.application.sign_sessions,
                                     secret_key=self.application.secret_key):
            session_id = get_session_id(token)
            log.error("Token for session %r had invalid signature", session_id)
            # Close explicitly, like every other rejection path above.
            self.close()
            raise ProtocolError("Invalid token signature")

        try:
            self.application.io_loop.add_callback(self._async_open, token)
        except Exception as e:
            # this isn't really an error (unless we have a
            # bug), it just means a client disconnected
            # immediately, most likely.
            log.debug("Failed to fully open connection %r", e)

    async def _async_open(self, token: str) -> None:
        ''' Perform the specific steps needed to open a connection to a Bokeh
        session.

        Specifically, this method coordinates:

        * Getting a session for a session ID (creating a new one if needed)
        * Creating a protocol receiver and handler
        * Opening a new ServerConnection and sending it an ACK

        Args:
            token (str) :
                The session token. The session ID to connect to is extracted
                from it; if no session exists with that ID, a new session is
                made.

        Returns:
            None

        Raises:
            ProtocolError :
                re-raised, after closing the socket, if session or connection
                setup fails with a protocol error.

        '''
        try:
            session_id = get_session_id(token)
            await self.application_context.create_session_if_needed(session_id, self.request, token)
            session = self.application_context.get_session(session_id)

            protocol = Protocol()
            self.receiver = Receiver(protocol)
            log.debug("Receiver created for %r", protocol)

            self.handler = ProtocolHandler()
            log.debug("ProtocolHandler created for %r", protocol)

            self.connection = self.application.new_connection(protocol, self, self.application_context, session)
            log.info("ServerConnection created")

        except ProtocolError as e:
            log.error("Could not create new server session, reason: %s", e)
            self.close()
            raise

        msg = self.connection.protocol.create('ACK')
        await self.send_message(msg)

        return None

    async def on_message(self, fragment: str | bytes) -> None:
        ''' Process an individual wire protocol fragment.

        The websocket RFC specifies opcodes for distinguishing text frames
        from binary frames. Tornado passes us either a text or binary string
        depending on that opcode, we have to look at the type of the fragment
        to see what we got.

        Args:
            fragment (unicode or bytes) :
                wire fragment to process

        Returns:
            None

        '''
        # We shouldn't throw exceptions from on_message because the caller is
        # just Tornado and it doesn't know what to do with them other than
        # report them as an unhandled Future
        try:
            message = await self._receive(fragment)
        except Exception as e:
            # If you go look at self._receive, it's catching the
            # expected error types... here we have something weird.
            log.error("Unhandled exception receiving a message: %r: %r", e, fragment, exc_info=True)
            self._internal_error("server failed to parse a message")
            message = None

        try:
            if message:
                # Test-only hook: record the assembled message if a port is installed.
                if _message_test_port is not None:
                    _message_test_port.received.append(message)
                work = await self._handle(message)
                if work:
                    await self._schedule(work)
        except Exception as e:
            log.error("Handler or its work threw an exception: %r: %r", e, message, exc_info=True)
            self._internal_error("server failed to handle a message")

        return None

    def on_pong(self, data: bytes) -> None:
        ''' Record the integer carried by a pong frame in ``latest_pong``.

        Payloads that are not valid UTF-8, or do not parse as an integer, are
        logged at trace level and otherwise ignored.

        Args:
            data (bytes) :
                payload of the pong frame

        '''
        # if we get an invalid integer or utf-8 back, either we
        # sent a buggy ping or the client is evil/broken.
        try:
            self.latest_pong = int(data.decode("utf-8"))
        except UnicodeDecodeError:
            # Must precede ValueError: UnicodeDecodeError is a ValueError subclass.
            log.trace("received invalid unicode in pong %r", data, exc_info=True)
        except ValueError:
            log.trace("received invalid integer in pong %r", data, exc_info=True)

    async def send_message(self, message: Message[Any]) -> None:
        ''' Send a Bokeh Server protocol message to the connected client.

        A ``WebSocketClosedError`` raised while sending is logged as a warning
        and not propagated.

        Args:
            message (Message) :
                a message to send

        Returns:
            None

        '''
        try:
            # Test-only hook: record the outgoing message if a port is installed.
            if _message_test_port is not None:
                _message_test_port.sent.append(message)
            await message.send(self)
        except WebSocketClosedError:
            # on_close() is / will be called anyway
            log.warning("Failed sending message as connection was closed")
        return None

    async def write_message(self, message: bytes | str | dict[str, Any],
            binary: bool = False, locked: bool = True) -> None:
        ''' Override parent write_message with a version that acquires a
        write lock before writing.

        Args:
            message (bytes or str or dict) :
                payload forwarded to the parent ``write_message``

            binary (bool, optional) :
                forwarded to the parent ``write_message`` (default: False)

            locked (bool, optional) :
                whether to hold ``write_lock`` for the duration of the write
                (default: True). When False the write is performed without
                acquiring the lock.

        Returns:
            None

        '''
        if locked:
            # The lock is released on exit, including when the write raises.
            async with self.write_lock:
                await super().write_message(message, binary)
        else:
            await super().write_message(message, binary)

    def on_close(self) -> None:
        ''' Clean up when the connection is closed.

        If a ServerConnection was established, its session is notified that
        the connection was lost and the application is told the client is gone.

        '''
        log.info('WebSocket connection closed: code=%s, reason=%r', self.close_code, self.close_reason)
        if self.connection is not None:
            self.connection.session.notify_connection_lost()
            self.application.client_lost(self.connection)

    async def _receive(self, fragment: str | bytes) -> Message[Any] | None:
        ''' Feed one fragment to the receiver.

        Returns whatever the receiver's ``consume`` returns, or None after
        reporting a protocol error if consuming the fragment raised a
        MessageError, ProtocolError or ValidationError.

        '''
        # Receive fragments until a complete message is assembled
        try:
            message = await self.receiver.consume(fragment)
            return message
        except (MessageError, ProtocolError, ValidationError) as e:
            self._protocol_error(str(e))
            return None

    async def _handle(self, message: Message[Any]) -> Any | None:
        ''' Dispatch a message to the protocol handler.

        Returns the handler's result, or None after reporting an internal
        error if handling raised a MessageError, ProtocolError or
        ValidationError.

        '''
        # Handle the message, possibly resulting in work to do
        try:
            work = await self.handler.handle(message, self.connection)
            return work
        except (MessageError, ProtocolError, ValidationError) as e: # TODO (other exceptions?)
            self._internal_error(str(e))
            return None

    async def _schedule(self, work: Any) -> None:
        ''' Send ``work`` to the client if it is a Message; anything else is
        reported as an internal error.

        '''
        if isinstance(work, Message):
            await self.send_message(cast(Message[Any], work))
        else:
            self._internal_error(f"expected a Message not {work!r}")

        return None

    def _internal_error(self, message: str) -> None:
        ''' Log an internal error and close the socket with code 10000. '''
        log.error("Bokeh Server internal error: %s, closing connection", message)
        self.close(10000, message)

    def _protocol_error(self, message: str) -> None:
        ''' Log a protocol error and close the socket with code 10001. '''
        log.error("Bokeh Server protocol error: %s, closing connection", message)
        self.close(10001, message)
#-----------------------------------------------------------------------------
# Private API
#-----------------------------------------------------------------------------

# Undocumented, test-only hook for harvesting low level messages. The testing
# machinery installs an instance when it needs one; it should not be used for
# any other purpose.
@dataclass
class MessageTestPort:
    # messages appended by WSHandler.send_message
    sent: list[Message[Any]]
    # messages appended by WSHandler.on_message
    received: list[Message[Any]]

# None unless a test has installed a port.
_message_test_port: MessageTestPort | None = None

#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------