#-----------------------------------------------------------------------------# Copyright (c) Anaconda, Inc., and Bokeh Contributors.# All rights reserved.## The full license is in the file LICENSE.txt, distributed with this software.#-----------------------------------------------------------------------------''' Implements a very low level facility for communicating with a BokehServer.Users will always want to use :class:`~bokeh.client.session.ClientSession`instead for standard usage.'''#-----------------------------------------------------------------------------# Boilerplate#-----------------------------------------------------------------------------from__future__importannotationsimportlogging# isort:skiplog=logging.getLogger(__name__)#-----------------------------------------------------------------------------# Imports#-----------------------------------------------------------------------------# Standard library importsfromtypingimportTYPE_CHECKING,Any,Callable# External importsfromtornado.httpclientimportHTTPClientError,HTTPRequestfromtornado.ioloopimportIOLoopfromtornado.websocketimportWebSocketError,websocket_connect# Bokeh importsfrom..core.typesimportIDfrom..protocolimportProtocolfrom..protocol.exceptionsimportMessageError,ProtocolError,ValidationErrorfrom..protocol.receiverimportReceiverfrom..util.stringsimportformat_url_query_argumentsfrom..util.tornadoimportfixup_windows_event_loop_policyfrom.statesimport(CONNECTED_AFTER_ACK,CONNECTED_BEFORE_ACK,DISCONNECTED,NOT_YET_CONNECTED,WAITING_FOR_REPLY,ErrorReason,State,)from.websocketimportWebSocketClientConnectionWrapperifTYPE_CHECKING:from..documentimportDocumentfrom..document.eventsimportDocumentChangedEventfrom..protocol.messageimportMessagefrom..protocol.messages.server_info_replyimportServerInfofrom.sessionimportClientSession#-----------------------------------------------------------------------------# Globals and constants#-----------------------------------------------------------------------------__all__=('ClientConnection',)#-----------------------------------------------------------------------------# General API#-----------------------------------------------------------------------------#-----------------------------------------------------------------------------# Dev API#-----------------------------------------------------------------------------
[docs]classClientConnection:''' A low-level class used to connect to a Bokeh server. '''_state:State_loop:IOLoop_until_predicate:Callable[[],bool]|None
[docs]def__init__(self,session:ClientSession,websocket_url:str,io_loop:IOLoop|None=None,arguments:dict[str,str]|None=None,max_message_size:int=20*1024*1024)->None:''' Opens a websocket connection to the server. '''self._url=websocket_urlself._session=sessionself._arguments=argumentsself._max_message_size=max_message_sizeself._protocol=Protocol()self._receiver=Receiver(self._protocol)self._socket=Noneself._state=NOT_YET_CONNECTED()# We can't use IOLoop.current because then we break# when running inside a notebook since ipython also uses itself._loop=io_loopifio_loopisnotNoneelseIOLoop()self._until_predicate=Noneself._server_info=None
# Properties --------------------------------------------------------------@propertydefconnected(self)->bool:''' Whether we've connected the Websocket and have exchanged initial handshake messages. '''returnisinstance(self._state,CONNECTED_AFTER_ACK)@propertydefio_loop(self)->IOLoop:''' The Tornado ``IOLoop`` this connection is using. '''returnself._loop@propertydefurl(self)->str:''' The URL of the websocket this Connection is to. '''returnself._url@propertydeferror_reason(self)->ErrorReason|None:''' The reason of the connection loss encoded as a ``DISCONNECTED.ErrorReason`` enum value '''ifnotisinstance(self._state,DISCONNECTED):returnNonereturnself._state.error_reason@propertydeferror_code(self)->int|None:''' If there was an error that caused a disconnect, this property holds the error code. None otherwise. '''ifnotisinstance(self._state,DISCONNECTED):return0returnself._state.error_code@propertydeferror_detail(self)->str:''' If there was an error that caused a disconnect, this property holds the error detail. Empty string otherwise. '''ifnotisinstance(self._state,DISCONNECTED):return""returnself._state.error_detail# Internal methods --------------------------------------------------------defconnect(self)->None:defconnected_or_closed()->bool:# we should be looking at the same state here as the 'connected' property above, so connected# means both connected and that we did our initial message exchangereturnisinstance(self._state,CONNECTED_AFTER_ACK|DISCONNECTED)self._loop_until(connected_or_closed)
[docs]defclose(self,why:str="closed")->None:''' Close the Websocket connection. '''ifself._socketisnotNone:self._socket.close(1000,why)
[docs]defforce_roundtrip(self)->None:''' Force a round-trip request/reply to the server, sometimes needed to avoid race conditions. Mostly useful for testing. Outside of test suites, this method hurts performance and should not be needed. Returns: None '''self._send_request_server_info()
[docs]defloop_until_closed(self)->None:''' Execute a blocking loop that runs and executes event callbacks until the connection is closed (e.g. by hitting Ctrl-C). While this method can be used to run Bokeh application code "outside" the Bokeh server, this practice is HIGHLY DISCOURAGED for any real use case. '''ifisinstance(self._state,NOT_YET_CONNECTED):# we don't use self._transition_to_disconnected here# because _transition is a coroutineself._tell_session_about_disconnect()self._state=DISCONNECTED()else:defclosed()->bool:returnisinstance(self._state,DISCONNECTED)self._loop_until(closed)
[docs]defpull_doc(self,document:Document)->None:''' Pull a document from the server, overwriting the passed-in document Args: document : (Document) The document to overwrite with server content. Returns: None '''msg=self._protocol.create('PULL-DOC-REQ')reply=self._send_message_wait_for_reply(msg)ifreplyisNone:raiseRuntimeError("Connection to server was lost")elifreply.header['msgtype']=='ERROR':raiseRuntimeError("Failed to pull document: "+reply.content['text'])else:reply.push_to_document(document)
[docs]defpush_doc(self,document:Document)->Message[Any]:''' Push a document to the server, overwriting any existing server-side doc. Args: document : (Document) A Document to push to the server Returns: The server reply '''msg=self._protocol.create('PUSH-DOC',document)reply=self._send_message_wait_for_reply(msg)ifreplyisNone:raiseRuntimeError("Connection to server was lost")elifreply.header['msgtype']=='ERROR':raiseRuntimeError("Failed to push document: "+reply.content['text'])else:returnreply
[docs]defrequest_server_info(self)->ServerInfo:''' Ask for information about the server. Returns: A dictionary of server attributes. '''ifself._server_infoisNone:self._server_info=self._send_request_server_info()returnself._server_info
asyncdefsend_message(self,message:Message[Any])->None:ifself._socketisNone:log.info("We're disconnected, so not sending message %r",message)else:try:sent=awaitmessage.send(self._socket)log.debug("Sent %r [%d bytes]",message,sent)exceptWebSocketErrorase:# A thing that happens is that we detect the# socket closing by getting a None from# read_message, but the network socket can be down# with many messages still in the read buffer, so# we'll process all those incoming messages and# get write errors trying to send change# notifications during that processing.# this is just debug level because it's completely normal# for it to happen when the socket shuts down.log.debug("Error sending message to server: %r",e)# error is almost certainly because# socket is already closed, but be sure,# because once we fail to send a message# we can't recoverself.close(why="received error while sending")# don't re-throw the error - there's nothing to# do about it.returnNone# Private methods ---------------------------------------------------------asyncdef_connect_async(self)->None:formatted_url=format_url_query_arguments(self._url,self._arguments)request=HTTPRequest(formatted_url)try:socket=awaitwebsocket_connect(request,subprotocols=["bokeh",self._session.token],max_message_size=self._max_message_size)self._socket=WebSocketClientConnectionWrapper(socket)exceptHTTPClientErrorase:awaitself._transition_to_disconnected(DISCONNECTED(ErrorReason.HTTP_ERROR,e.code,e.message))returnexceptExceptionase:log.info("Failed to connect to server: %r",e)ifself._socketisNone:awaitself._transition_to_disconnected(DISCONNECTED(ErrorReason.NETWORK_ERROR,None,"Socket invalid."))else:awaitself._transition(CONNECTED_BEFORE_ACK())asyncdef_handle_messages(self)->None:message=awaitself._pop_message()ifmessageisNone:awaitself._transition_to_disconnected(DISCONNECTED(ErrorReason.HTTP_ERROR,500,"Internal server error."))else:ifmessage.msgtype=='PATCH-DOC':log.debug("Got PATCH-DOC, applying to session")self._session._handle_patch(message)else:log.debug("Ignoring %r",message)# we don't know about whatever message we got, ignore it.awaitself._next()def_loop_until(self,predicate:Callable[[],bool])->None:self._until_predicate=predicatetry:# this runs self._next ONE time, but# self._next re-runs itself until# the predicate says to quit.self._loop.add_callback(self._next)self._loop.start()exceptKeyboardInterrupt:self.close("user interruption")asyncdef_next(self)->None:ifself._until_predicateisnotNoneandself._until_predicate():log.debug("Stopping client loop in state %s due to True from %s",self._state.__class__.__name__,self._until_predicate.__name__)self._until_predicate=Noneself._loop.stop()returnNoneelse:log.debug("Running state "+self._state.__class__.__name__)awaitself._state.run(self)asyncdef_pop_message(self)->Message[Any]|None:whileTrue:ifself._socketisNone:returnNone# log.debug("Waiting for fragment...")fragment=Nonetry:fragment=awaitself._socket.read_message()exceptExceptionase:# this happens on close, so debug level since it's "normal"log.debug("Error reading from socket %r",e)# log.debug("... got fragment %r", fragment)iffragmentisNone:# XXX Tornado doesn't give us the code and reasonlog.info("Connection closed by server")returnNonetry:message=awaitself._receiver.consume(fragment)ifmessageisnotNone:log.debug(f"Received message {message!r}")returnmessageexcept(MessageError,ProtocolError,ValidationError)ase:log.error("%r",e,exc_info=True)self.close(why="error parsing message from server")def_send_message_wait_for_reply(self,message:Message[Any])->Message[Any]|None:waiter=WAITING_FOR_REPLY(message.header['msgid'])self._state=waitersend_result:list[None]=[]asyncdefhandle_message(message:Message[Any],send_result:list[None])->None:result=awaitself.send_message(message)send_result.append(result)self._loop.add_callback(handle_message,message,send_result)defhave_send_result_or_disconnected()->bool:returnlen(send_result)>0orself._state!=waiterself._loop_until(have_send_result_or_disconnected)defhave_reply_or_disconnected()->bool:returnself._state!=waiterorwaiter.replyisnotNoneself._loop_until(have_reply_or_disconnected)returnwaiter.replydef_send_patch_document(self,session_id:ID,event:DocumentChangedEvent)->None:msg=self._protocol.create('PATCH-DOC',[event])self._loop.add_callback(self.send_message,msg)def_send_request_server_info(self)->ServerInfo:msg=self._protocol.create('SERVER-INFO-REQ')reply=self._send_message_wait_for_reply(msg)ifreplyisNone:raiseRuntimeError("Did not get a reply to server info request before disconnect")returnreply.contentdef_tell_session_about_disconnect(self)->None:ifself._session:self._session._notify_disconnected()asyncdef_transition(self,new_state:State)->None:log.debug(f"transitioning to state {new_state.__class__.__name__}")self._state=new_stateawaitself._next()asyncdef_transition_to_disconnected(self,dis_state:DISCONNECTED)->None:self._tell_session_about_disconnect()awaitself._transition(dis_state)asyncdef_wait_for_ack(self)->None:message=awaitself._pop_message()ifmessageandmessage.msgtype=='ACK':log.debug(f"Received {message!r}")awaitself._transition(CONNECTED_AFTER_ACK())elifmessageisNone:awaitself._transition_to_disconnected(DISCONNECTED(ErrorReason.HTTP_ERROR,500,"Internal server error."))else:raiseProtocolError(f"Received {message!r} instead of ACK")