Source code for bokeh.protocol.receiver
#-----------------------------------------------------------------------------
# Copyright (c) 2012 - 2024, Anaconda, Inc., and Bokeh Contributors.
# All rights reserved.
#
# The full license is in the file LICENSE.txt, distributed with this software.
#-----------------------------------------------------------------------------
''' Assemble WebSocket wire message fragments into complete Bokeh Server
message objects that can be processed.
'''
#-----------------------------------------------------------------------------
# Boilerplate
#-----------------------------------------------------------------------------
from __future__ import annotations
import logging # isort:skip
log = logging.getLogger(__name__)
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
# Standard library imports
import json
from typing import (
TYPE_CHECKING,
Any,
Callable,
Union,
cast,
)
# Bokeh imports
from .exceptions import ValidationError
from .message import BufferHeader, Message
if TYPE_CHECKING:
from typing_extensions import TypeAlias
from . import Protocol
#-----------------------------------------------------------------------------
# Globals and constants
#-----------------------------------------------------------------------------
__all__ = (
'Receiver',
)
#-----------------------------------------------------------------------------
# General API
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Dev API
#-----------------------------------------------------------------------------
Fragment: TypeAlias = Union[str, bytes]
[docs]
class Receiver:
''' Receive wire message fragments and assemble complete Bokeh server
message objects.
On ``MessageError`` or ``ValidationError``, the receiver will reset its
state and attempt to consume a new message.
The *fragment* received can be either bytes or unicode, depending on
the transport's semantics (WebSocket allows both).
.. code-block:: python
[
# these are required
b'{header}', # serialized header dict
b'{metadata}', # serialized metadata dict
b'{content}, # serialized content dict
# these are optional, and come in pairs; header contains num_buffers
b'{buf_header}', # serialized buffer header dict
b'array' # raw buffer payload data
...
]
The ``header`` fragment will have the form:
.. code-block:: python
header = {
# these are required
'msgid' : <str> # a unique id for the message
'msgtype' : <str> # a message type, e.g. 'ACK', 'PATCH-DOC', etc
# these are optional
'num_buffers' : <int> # the number of additional buffers, if any
}
The ``metadata`` fragment may contain any arbitrary information. It is not
processed by Bokeh for any purpose, but may be useful for external
monitoring or instrumentation tools.
The ``content`` fragment is defined by the specific message type.
'''
_current_consumer: Callable[[Fragment], None]
_fragments: list[Fragment]
_message: Message[Any] | None
_buf_header: BufferHeader | None
_partial: Message[Any] | None
[docs]
def __init__(self, protocol: Protocol) -> None:
''' Configure a Receiver with a specific Bokeh protocol.
Args:
protocol (Protocol) :
A Bokeh protocol object to use to assemble collected message
fragments.
'''
self._protocol = protocol
self._current_consumer = self._HEADER
self._message = None
self._partial = None
self._buf_header = None
[docs]
async def consume(self, fragment: Fragment) -> Message[Any]|None:
''' Consume individual protocol message fragments.
Args:
fragment (``JSON``) :
A message fragment to assemble. When a complete message is
assembled, the receiver state will reset to begin consuming a
new message.
'''
self._current_consumer(fragment)
return self._message
def _HEADER(self, fragment: Fragment) -> None:
self._message = None
self._partial = None
self._fragments = [self._assume_text(fragment)]
self._current_consumer = self._METADATA
def _METADATA(self, fragment: Fragment) -> None:
metadata = self._assume_text(fragment)
self._fragments.append(metadata)
self._current_consumer = self._CONTENT
def _CONTENT(self, fragment: Fragment) -> None:
content = self._assume_text(fragment)
self._fragments.append(content)
header_json, metadata_json, content_json = (self._assume_text(x) for x in self._fragments[:3])
self._partial = self._protocol.assemble(header_json, metadata_json, content_json)
self._check_complete()
def _BUFFER_HEADER(self, fragment: Fragment) -> None:
header = json.loads(self._assume_text(fragment))
if set(header) != { "id" }:
raise ValidationError(f"Malformed buffer header {header!r}")
self._buf_header = header
self._current_consumer = self._BUFFER_PAYLOAD
def _BUFFER_PAYLOAD(self, fragment: Fragment) -> None:
payload = self._assume_binary(fragment)
if self._buf_header is None:
raise ValidationError("Consuming a buffer payload, but current buffer header is None")
header = BufferHeader(id=self._buf_header["id"])
cast(Message[Any], self._partial).assemble_buffer(header, payload)
self._check_complete()
def _check_complete(self) -> None:
if self._partial and self._partial.complete:
self._message = self._partial
self._current_consumer = self._HEADER
else:
self._current_consumer = self._BUFFER_HEADER
def _assume_text(self, fragment: Fragment) -> str:
if not isinstance(fragment, str):
raise ValidationError(f"expected text fragment but received binary fragment for {self._current_consumer.__name__}")
return fragment
def _assume_binary(self, fragment: Fragment) -> bytes:
if not isinstance(fragment, bytes):
raise ValidationError(f"expected binary fragment but received text fragment for {self._current_consumer.__name__}")
return fragment
#-----------------------------------------------------------------------------
# Private API
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------