#-----------------------------------------------------------------------------# Copyright (c) Anaconda, Inc., and Bokeh Contributors.# All rights reserved.## The full license is in the file LICENSE.txt, distributed with this software.#-----------------------------------------------------------------------------''' Provide a base class for all Bokeh Server Protocol message types.Boker messages are comprised of a sequence of JSON fragments. Specified asPython JSON-like data, messages have the general form:.. code-block:: python [ # these are required b'{header}', # serialized header dict b'{metadata}', # serialized metadata dict b'{content}', # serialized content dict # these are optional, and come in pairs; header contains num_buffers b'{buf_header}', # serialized buffer header dict b'array' # raw buffer payload data ... ]The ``header`` fragment will have the form:.. code-block:: python header = { # these are required 'msgid' : <str> # a unique id for the message 'msgtype' : <str> # a message type, e.g. 'ACK', 'PATCH-DOC', etc # these are optional 'num_buffers' : <int> # the number of additional buffers, if any }The ``metadata`` fragment may contain any arbitrary information. It is notprocessed by Bokeh for any purpose, but may be useful for externalmonitoring or instrumentation tools.The ``content`` fragment is defined by the specific message type.'''#-----------------------------------------------------------------------------# Boilerplate#-----------------------------------------------------------------------------from__future__importannotationsimportlogging# isort:skiplog=logging.getLogger(__name__)#-----------------------------------------------------------------------------# Imports#-----------------------------------------------------------------------------# Standard library importsimportjsonfromtypingimport(TYPE_CHECKING,Any,ClassVar,Generic,TypeAlias,TypedDict,TypeVar,)# Bokeh importsimportbokeh.util.serializationasbkserial# Bokeh importsfrom..core.json_encoderimportserialize_jsonfrom..core.serializationimportBuffer,Serializedfrom..core.typesimportIDfrom.exceptionsimportMessageError,ProtocolErrorifTYPE_CHECKING:fromtyping_extensionsimportNotRequiredfrom..client.websocketimportWebSocketClientConnectionWrapper#-----------------------------------------------------------------------------# Globals and constants#-----------------------------------------------------------------------------__all__=('Message',)#-----------------------------------------------------------------------------# General API#-----------------------------------------------------------------------------#-----------------------------------------------------------------------------# Dev API#-----------------------------------------------------------------------------classHeader(TypedDict):msgid:IDmsgtype:strreqid:NotRequired[ID]num_buffers:NotRequired[int]classBufferHeader(TypedDict):id:IDContent=TypeVar("Content")Metadata:TypeAlias=dict[str,Any]BufferRef:TypeAlias=tuple[BufferHeader,bytes]classEmpty(TypedDict):pass
[docs]classMessage(Generic[Content]):''' The Message base class encapsulates creating, assembling, and validating the integrity of Bokeh Server messages. Additionally, it provide hooks '''msgtype:ClassVar[str]_header:Header_header_json:str|None_content:Content_content_json:str|None_metadata:Metadata_metadata_json:str|None_buffers:list[Buffer]
[docs]def__init__(self,header:Header,metadata:Metadata,content:Content)->None:''' Initialize a new message from header, metadata, and content dictionaries. To assemble a message from existing JSON fragments, use the ``assemble`` method. To create new messages with automatically generated headers, use subclass ``create`` methods. Args: header (JSON-like) : metadata (JSON-like) : content (JSON-like) : '''self.header=headerself.metadata=metadataself.content=contentself._buffers=[]
[docs]@classmethoddefassemble(cls,header_json:str,metadata_json:str,content_json:str)->Message[Content]:''' Creates a new message, assembled from JSON fragments. Args: header_json (``JSON``) : metadata_json (``JSON``) : content_json (``JSON``) : Returns: Message subclass Raises: MessageError '''try:header=json.loads(header_json)exceptValueError:raiseMessageError("header could not be decoded")try:metadata=json.loads(metadata_json)exceptValueError:raiseMessageError("metadata could not be decoded")try:content=json.loads(content_json)exceptValueError:raiseMessageError("content could not be decoded")msg=cls(header,metadata,content)msg._header_json=header_jsonmsg._metadata_json=metadata_jsonmsg._content_json=content_jsonreturnmsg
[docs]defadd_buffer(self,buffer:Buffer)->None:''' Associate a buffer header and payload with this message. Args: buf_header (``JSON``) : a buffer header buf_payload (``JSON`` or bytes) : a buffer payload Returns: None Raises: MessageError '''if'num_buffers'inself._header:self._header['num_buffers']+=1else:self._header['num_buffers']=1self._header_json=Noneself._buffers.append(buffer)
[docs]defassemble_buffer(self,buf_header:BufferHeader,buf_payload:bytes)->None:''' Add a buffer header and payload that we read from the socket. This differs from add_buffer() because we're validating vs. the header's num_buffers, instead of filling in the header. Args: buf_header (``JSON``) : a buffer header buf_payload (``JSON`` or bytes) : a buffer payload Returns: None Raises: ProtocolError '''num_buffers=self.header.get("num_buffers",0)ifnum_buffers<=len(self._buffers):raiseProtocolError(f"too many buffers received expecting {num_buffers}")self._buffers.append(Buffer(buf_header["id"],buf_payload))
[docs]asyncdefwrite_buffers(self,conn:WebSocketClientConnectionWrapper,locked:bool=True)->int:''' Write any buffer headers and payloads to the given connection. Args: conn (object) : May be any object with a ``write_message`` method. Typically, a Tornado ``WSHandler`` or ``WebSocketClientConnection`` locked (bool) : Returns: int : number of bytes sent '''ifconnisNone:raiseValueError("Cannot write_buffers to connection None")sent=0forbufferinself._buffers:header=json.dumps(buffer.ref)payload=buffer.to_bytes()awaitconn.write_message(header,locked=locked)awaitconn.write_message(payload,binary=True,locked=locked)sent+=len(header)+len(payload)returnsent
[docs]@classmethoddefcreate_header(cls,request_id:ID|None=None)->Header:''' Return a message header fragment dict. Args: request_id (str or None) : Message ID of the message this message replies to Returns: dict : a message header '''header=Header(msgid=bkserial.make_id(),msgtype=cls.msgtype,)ifrequest_idisnotNone:header['reqid']=request_idreturnheader
[docs]asyncdefsend(self,conn:WebSocketClientConnectionWrapper)->int:''' Send the message on the given connection. Args: conn (WebSocketHandler) : a WebSocketHandler to send messages Returns: int : number of bytes sent '''ifconnisNone:raiseValueError("Cannot send to connection None")withawaitconn.write_lock.acquire():sent=0awaitconn.write_message(self.header_json,locked=False)sent+=len(self.header_json)# uncomment this to make it a lot easier to reproduce lock-related bugs#await asyncio.sleep(0.1)awaitconn.write_message(self.metadata_json,locked=False)sent+=len(self.metadata_json)# uncomment this to make it a lot easier to reproduce lock-related bugs#await asyncio.sleep(0.1)awaitconn.write_message(self.content_json,locked=False)sent+=len(self.content_json)sent+=awaitself.write_buffers(conn,locked=False)returnsent
@propertydefcomplete(self)->bool:''' Returns whether all required parts of a message are present. Returns: bool : True if the message is complete, False otherwise '''returnself.headerisnotNoneand \
self.metadataisnotNoneand \
self.contentisnotNoneand \
self.header.get('num_buffers',0)==len(self._buffers)@propertydefpayload(self)->Serialized[Content]:returnSerialized(self.content,self.buffers)# header fragment properties@propertydefheader(self)->Header:returnself._header@header.setterdefheader(self,value:Header)->None:self._header=valueself._header_json=None@propertydefheader_json(self)->str:ifnotself._header_json:self._header_json=json.dumps(self.header)returnself._header_json# content fragment properties@propertydefcontent(self)->Content:returnself._content@content.setterdefcontent(self,value:Content)->None:self._content=valueself._content_json=None@propertydefcontent_json(self)->str:ifnotself._content_json:self._content_json=serialize_json(self.payload)returnself._content_json# metadata fragment properties@propertydefmetadata(self)->Metadata:returnself._metadata@metadata.setterdefmetadata(self,value:Metadata)->None:self._metadata=valueself._metadata_json=None@propertydefmetadata_json(self)->str:ifnotself._metadata_json:self._metadata_json=json.dumps(self.metadata)returnself._metadata_json# buffer properties@propertydefbuffers(self)->list[Buffer]:returnlist(self._buffers)