#-----------------------------------------------------------------------------# Copyright (c) Anaconda, Inc., and Bokeh Contributors.# All rights reserved.## The full license is in the file LICENSE.txt, distributed with this software.#-----------------------------------------------------------------------------''' Assemble WebSocket wire message fragments into complete Bokeh Servermessage objects that can be processed.'''#-----------------------------------------------------------------------------# Boilerplate#-----------------------------------------------------------------------------from__future__importannotationsimportlogging# isort:skiplog=logging.getLogger(__name__)#-----------------------------------------------------------------------------# Imports#-----------------------------------------------------------------------------# Standard library importsimportjsonfromtypingimport(TYPE_CHECKING,Any,Callable,Union,cast,)# Bokeh importsfrom.exceptionsimportValidationErrorfrom.messageimportBufferHeader,MessageifTYPE_CHECKING:fromtyping_extensionsimportTypeAliasfrom.importProtocol#-----------------------------------------------------------------------------# Globals and constants#-----------------------------------------------------------------------------__all__=('Receiver',)#-----------------------------------------------------------------------------# General API#-----------------------------------------------------------------------------#-----------------------------------------------------------------------------# Dev API#-----------------------------------------------------------------------------Fragment:TypeAlias=Union[str,bytes]
class Receiver:
    ''' Receive wire message fragments and assemble complete Bokeh server
    message objects.

    If consuming a fragment raises (for example ``MessageError`` or
    ``ValidationError``), the receiver resets its state so that the next
    fragment is treated as the header of a new message, and the error is
    re-raised to the caller.

    The *fragment* received can be either bytes or unicode, depending on
    the transport's semantics (WebSocket allows both).

    .. code-block:: python

        [
            # these are required
            b'{header}',        # serialized header dict
            b'{metadata}',      # serialized metadata dict
            b'{content}',       # serialized content dict

            # these are optional, and come in pairs; header contains num_buffers
            b'{buf_header}',    # serialized buffer header dict
            b'array'            # raw buffer payload data
            ...
        ]

    The ``header`` fragment will have the form:

    .. code-block:: python

        header = {
            # these are required
            'msgid'       : <str> # a unique id for the message
            'msgtype'     : <str> # a message type, e.g. 'ACK', 'PATCH-DOC', etc

            # these are optional
            'num_buffers' : <int> # the number of additional buffers, if any
        }

    The ``metadata`` fragment may contain any arbitrary information. It is not
    processed by Bokeh for any purpose, but may be useful for external
    monitoring or instrumentation tools.

    The ``content`` fragment is defined by the specific message type.

    '''

    # State-machine handler that will process the next fragment.
    _current_consumer: Callable[[Fragment], None]
    # Header, metadata and content fragments collected for the message in progress.
    _fragments: list[Fragment]
    # Most recently completed message; cleared when a new header is consumed.
    _message: Message[Any] | None
    # Decoded buffer header awaiting its payload fragment.
    _buf_header: BufferHeader | None
    # Assembled message that may still be waiting for buffers.
    _partial: Message[Any] | None

    def __init__(self, protocol: Protocol) -> None:
        ''' Configure a Receiver with a specific Bokeh protocol.

        Args:
            protocol (Protocol) :
                A Bokeh protocol object to use to assemble collected message
                fragments.

        '''
        self._protocol = protocol
        self._reset()

    async def consume(self, fragment: Fragment) -> Message[Any] | None:
        ''' Consume individual protocol message fragments.

        Args:
            fragment (``JSON``) :
                A message fragment to assemble. When a complete message is
                assembled, the receiver state will reset to begin consuming a
                new message.

        Returns:
            The completed message if this fragment finished one, otherwise
            ``None``.

        Raises:
            Whatever the active consumer raises (e.g. ``ValidationError`` for
            a fragment of the wrong kind). The receiver is reset to expect a
            new header before the exception propagates.

        '''
        try:
            self._current_consumer(fragment)
        except Exception:
            # Abandon the half-assembled message so the next fragment is not
            # interpreted as a continuation of the broken one.
            self._reset()
            raise
        return self._message

    def _reset(self) -> None:
        ''' Drop any in-progress message and expect a header fragment next. '''
        self._current_consumer = self._HEADER
        self._fragments = []
        self._message = None
        self._partial = None
        self._buf_header = None

    def _HEADER(self, fragment: Fragment) -> None:
        ''' Start a new message: record the header and expect metadata next. '''
        self._message = None
        self._partial = None
        self._fragments = [self._assume_text(fragment)]
        self._current_consumer = self._METADATA

    def _METADATA(self, fragment: Fragment) -> None:
        ''' Record the metadata fragment and expect content next. '''
        metadata = self._assume_text(fragment)
        self._fragments.append(metadata)
        self._current_consumer = self._CONTENT

    def _CONTENT(self, fragment: Fragment) -> None:
        ''' Record the content fragment and assemble the (partial) message. '''
        content = self._assume_text(fragment)
        self._fragments.append(content)

        # Re-checking with _assume_text narrows the stored fragments to str.
        header_json, metadata_json, content_json = (
            self._assume_text(x) for x in self._fragments[:3]
        )

        self._partial = self._protocol.assemble(header_json, metadata_json, content_json)

        self._check_complete()

    def _BUFFER_HEADER(self, fragment: Fragment) -> None:
        ''' Decode a buffer header and expect its binary payload next.

        Raises:
            ValidationError: if the fragment is not text, is not valid JSON,
                or is not a JSON object with exactly the key ``"id"``.

        '''
        text = self._assume_text(fragment)
        try:
            header = json.loads(text)
        except ValueError as err:  # json.JSONDecodeError is a ValueError
            raise ValidationError(f"Malformed buffer header {text!r}") from err

        # Require a dict explicitly: set(["id"]) also equals {"id"}, and
        # set() of a JSON number would raise TypeError.
        if not isinstance(header, dict) or set(header) != {"id"}:
            raise ValidationError(f"Malformed buffer header {header!r}")

        self._buf_header = header
        self._current_consumer = self._BUFFER_PAYLOAD

    def _BUFFER_PAYLOAD(self, fragment: Fragment) -> None:
        ''' Attach a binary payload to the partial message.

        Raises:
            ValidationError: if the fragment is not binary, or if no buffer
                header is pending.

        '''
        payload = self._assume_binary(fragment)
        if self._buf_header is None:
            raise ValidationError("Consuming a buffer payload, but current buffer header is None")

        header = BufferHeader(id=self._buf_header["id"])
        # Each header pairs with exactly one payload; clear it so a stale
        # header can never be reused.
        self._buf_header = None
        cast(Message[Any], self._partial).assemble_buffer(header, payload)

        self._check_complete()

    def _check_complete(self) -> None:
        ''' Publish the message if it is complete, else expect another buffer. '''
        if self._partial and self._partial.complete:
            self._message = self._partial
            self._current_consumer = self._HEADER
        else:
            self._current_consumer = self._BUFFER_HEADER

    def _assume_text(self, fragment: Fragment) -> str:
        ''' Return *fragment* unchanged if it is ``str``, else raise ``ValidationError``. '''
        if not isinstance(fragment, str):
            raise ValidationError(f"expected text fragment but received binary fragment for {self._current_consumer.__name__}")
        return fragment

    def _assume_binary(self, fragment: Fragment) -> bytes:
        ''' Return *fragment* unchanged if it is ``bytes``, else raise ``ValidationError``. '''
        if not isinstance(fragment, bytes):
            raise ValidationError(f"expected binary fragment but received text fragment for {self._current_consumer.__name__}")
        return fragment