#-----------------------------------------------------------------------------# Copyright (c) Anaconda, Inc., and Bokeh Contributors.# All rights reserved.## The full license is in the file LICENSE.txt, distributed with this software.#-----------------------------------------------------------------------------''' Provides the ``ServerSession`` class.'''#-----------------------------------------------------------------------------# Boilerplate#-----------------------------------------------------------------------------from__future__importannotationsimportlogging# isort:skiplog=logging.getLogger(__name__)#-----------------------------------------------------------------------------# Imports#-----------------------------------------------------------------------------# Standard library importsimportinspectimporttimefromcopyimportcopyfromfunctoolsimportwrapsfromtypingimport(TYPE_CHECKING,Any,Awaitable,Callable,TypeVar,)# External importsfromtornadoimportlocksifTYPE_CHECKING:fromtornado.ioloopimportIOLoop# Bokeh importsfrom..eventsimportConnectionLostfrom..util.tokenimportgenerate_jwt_tokenfrom.callbacksimportDocumentCallbackGroupifTYPE_CHECKING:from..core.typesimportIDfrom..document.documentimportDocumentfrom..document.eventsimportDocumentPatchedEventfrom..protocolimportmessagesasmsgfrom.callbacksimportCallback,SessionCallbackfrom.connectionimportServerConnection#-----------------------------------------------------------------------------# Globals and constants#-----------------------------------------------------------------------------__all__=('current_time','ServerSession',)#-----------------------------------------------------------------------------# Private API#-----------------------------------------------------------------------------T=TypeVar("T")F=TypeVar("F",bound=Callable[...,Any])def_needs_document_lock(func:F)->F:'''Decorator that adds the necessary locking and post-processing to manipulate the session's document. Expects to decorate a method on ServerSession and transforms it into a coroutine if it wasn't already. '''@wraps(func)asyncdef_needs_document_lock_wrapper(self:ServerSession,*args,**kwargs):# while we wait for and hold the lock, prevent the session# from being discarded. This avoids potential weirdness# with the session vanishing in the middle of some async# task.ifself.destroyed:log.debug("Ignoring locked callback on already-destroyed session.")returnNoneself.block_expiration()try:withawaitself._lock.acquire():ifself._pending_writesisnotNone:raiseRuntimeError("internal class invariant violated: _pending_writes "+ \
"should be None if lock is not held")self._pending_writes=[]try:result=func(self,*args,**kwargs)ifinspect.isawaitable(result):# Note that this must not be outside of the critical section.# Otherwise, the async callback will be ran without document locking.result=awaitresultfinally:# we want to be very sure we reset this or we'll# keep hitting the RuntimeError above as soon as# any callback goes wrongpending_writes=self._pending_writesself._pending_writes=Noneforpinpending_writes:awaitpreturnresultfinally:self.unblock_expiration()return_needs_document_lock_wrapper#-----------------------------------------------------------------------------# General API#-----------------------------------------------------------------------------
[docs]defcurrent_time()->float:'''Return the time in milliseconds since the epoch as a floating point number. '''returntime.monotonic()*1000
[docs]classServerSession:''' Hosts an application "instance" (an instantiated Document) for one or more connections. '''_subscribed_connections:set[ServerConnection]_current_patch_connection:ServerConnection|None_pending_writes:list[Awaitable[None]]|Nonedef__init__(self,session_id:ID,document:Document,io_loop:IOLoop|None=None,token:str|None=None)->None:ifsession_idisNone:raiseValueError("Sessions must have an id")ifdocumentisNone:raiseValueError("Sessions must have a document")self._id=session_idself._token=tokenself._document=documentself._loop=io_loopself._subscribed_connections=set()self._last_unsubscribe_time=current_time()self._lock=locks.Lock()self._current_patch_connection=Noneself._document.callbacks.on_change_dispatch_to(self)self._callbacks=DocumentCallbackGroup(io_loop)self._pending_writes=Noneself._destroyed=Falseself._expiration_requested=Falseself._expiration_blocked_count=0wrapped_callbacks=[self._wrap_session_callback(cb)forcbinself._document.session_callbacks]self._callbacks.add_session_callbacks(wrapped_callbacks)@propertydefdocument(self)->Document:returnself._document@propertydefid(self)->ID:returnself._id@propertydeftoken(self)->str:''' A JWT token to authenticate the session. '''ifself._token:returnself._tokenreturngenerate_jwt_token(self.id)@propertydefdestroyed(self)->bool:returnself._destroyed@propertydefexpiration_requested(self)->bool:returnself._expiration_requested@propertydefexpiration_blocked(self)->bool:returnself._expiration_blocked_count>0@propertydefexpiration_blocked_count(self)->int:returnself._expiration_blocked_countdefdestroy(self)->None:self._destroyed=Trueself._document.destroy(self)delself._documentself._callbacks.remove_all_callbacks()delself._callbacks
[docs]defrequest_expiration(self)->None:""" Used in test suite for now. Forces immediate expiration if no connections."""self._expiration_requested=True
[docs]defsubscribe(self,connection:ServerConnection)->None:"""This should only be called by ``ServerConnection.subscribe_session`` or our book-keeping will be broken"""self._subscribed_connections.add(connection)
[docs]defunsubscribe(self,connection:ServerConnection)->None:"""This should only be called by ``ServerConnection.unsubscribe_session`` or our book-keeping will be broken"""self._subscribed_connections.discard(connection)self._last_unsubscribe_time=current_time()
[docs]@_needs_document_lockdefwith_document_locked(self,func:Callable[...,T],*args:Any,**kwargs:Any)->T:''' Asynchronously locks the document and runs the function with it locked.'''returnfunc(*args,**kwargs)
def_wrap_document_callback(self,callback:Callback)->Callback:ifgetattr(callback,"nolock",False):returncallbackdefwrapped_callback(*args:Any,**kwargs:Any):returnself.with_document_locked(callback,*args,**kwargs)returnwrapped_callbackdef_wrap_session_callback(self,callback:SessionCallback)->SessionCallback:wrapped=copy(callback)wrapped._callback=self._wrap_document_callback(callback.callback)returnwrappeddef_document_patched(self,event:DocumentPatchedEvent)->None:may_suppress=event.setterisselfifself._pending_writesisNone:raiseRuntimeError("_pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes")# TODO (havocp): our "change sync" protocol is flawed because if both# sides change the same attribute at the same time, they will each end# up with the state of the other and their final states will differ.forconnectioninself._subscribed_connections:ifmay_suppressandconnectionisself._current_patch_connection:continueself._pending_writes.append(connection.send_patch_document(event))@_needs_document_lockdef_handle_pull(self,message:msg.pull_doc_req,connection:ServerConnection)->msg.pull_doc_reply:log.debug(f"Sending pull-doc-reply from session {self.id!r}")returnconnection.protocol.create('PULL-DOC-REPLY',message.header['msgid'],self.document)def_session_callback_added(self,event:SessionCallback):wrapped=self._wrap_session_callback(event.callback)self._callbacks.add_session_callback(wrapped)def_session_callback_removed(self,event):self._callbacks.remove_session_callback(event.callback)
[docs]@classmethoddefpull(cls,message:msg.pull_doc_req,connection:ServerConnection)->msg.pull_doc_reply:''' Handle a PULL-DOC, return a Future with work to be scheduled. '''returnconnection.session._handle_pull(message,connection)
@_needs_document_lockdef_handle_push(self,message:msg.push_doc,connection:ServerConnection)->msg.ok:log.debug(f"pushing doc to session {self.id!r}")message.push_to_document(self.document)returnconnection.ok(message)
[docs]@classmethoddefpush(cls,message:msg.push_doc,connection:ServerConnection)->msg.ok:''' Handle a PUSH-DOC, return a Future with work to be scheduled. '''returnconnection.session._handle_push(message,connection)
[docs]@classmethoddefpatch(cls,message:msg.patch_doc,connection:ServerConnection)->msg.ok:''' Handle a PATCH-DOC, return a Future with work to be scheduled. '''returnconnection.session._handle_patch(message,connection)
[docs]defnotify_connection_lost(self)->None:''' Notify the document that the connection was lost. '''self.document.callbacks.trigger_event(ConnectionLost())
#-----------------------------------------------------------------------------# Dev API#-----------------------------------------------------------------------------#-----------------------------------------------------------------------------# Code#-----------------------------------------------------------------------------