# Source code for bokeh.document.events

#-----------------------------------------------------------------------------
# Copyright (c) Anaconda, Inc., and Bokeh Contributors.
# All rights reserved.
#
# The full license is in the file LICENSE.txt, distributed with this software.
#-----------------------------------------------------------------------------
''' Provide events that represent various changes to Bokeh Documents.

These events are used internally to signal changes to Documents. For
information about user-facing (e.g. UI or tool) events, see the reference
for :ref:`bokeh.events`.

These events are employed for incoming and outgoing websocket messages and
internally for triggering callbacks. For example, the sequence of events that
happens when a user calls a Document API or sets a property resulting in a
"patch event" to the Document:

.. code-block::

    user invokes Document API
        -> Document API triggers event objects
        -> registered callbacks are executed
        -> Session callback generates JSON message from event object
        -> Session sends JSON message over websocket

But events may also be triggered from the client, and arrive as JSON messages
over the transport layer, which is why the JSON handling and Document API must
be separated. Consider the alternative sequence of events:

.. code-block::

    Session receives JSON message over websocket
        -> Document calls event.handle_json
        -> handle_json invokes appropriate Document API
        -> Document API triggers event objects
        -> registered callbacks are executed
        -> Session callback suppresses outgoing event

As a final note, message "ping-pong" is avoided by recording a "setter" when
event objects are created. If the session callback notes that the event setter
is itself, then no further action (e.g. sending an outgoing change event
identical to the incoming event it just processed) is taken.

'''

#-----------------------------------------------------------------------------
# Boilerplate
#-----------------------------------------------------------------------------
from __future__ import annotations

import logging # isort:skip
log = logging.getLogger(__name__)

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

# Standard library imports
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    TypeAlias,
    cast,
)

# Bokeh imports
from ..core.serialization import Serializable, Serializer
from .json import (
    ColumnDataChanged,
    ColumnsPatched,
    ColumnsStreamed,
    DocumentPatched,
    MessageSent,
    ModelChanged,
    RootAdded,
    RootRemoved,
    TitleChanged,
)

if TYPE_CHECKING:
    import pandas as pd

    from ..core.has_props import Setter
    from ..model import Model
    from ..models.sources import DataDict
    from ..protocol.message import BufferRef
    from ..server.callbacks import SessionCallback
    from .document import Document
    from .json import Patches

#-----------------------------------------------------------------------------
# Globals and constants
#-----------------------------------------------------------------------------

# Names exported by ``from bokeh.document.events import *``. Only the event
# classes are listed; the receiver mixins and type aliases defined in this
# module are not part of this tuple.
__all__ = (
    'ColumnDataChangedEvent',
    'ColumnsStreamedEvent',
    'ColumnsPatchedEvent',
    'DocumentChangedEvent',
    'DocumentPatchedEvent',
    'ModelChangedEvent',
    'RootAddedEvent',
    'RootRemovedEvent',
    'SessionCallbackAdded',
    'SessionCallbackRemoved',
    'TitleChangedEvent',
    'MessageSentEvent',
)

#-----------------------------------------------------------------------------
# General API
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Dev API
#-----------------------------------------------------------------------------

# These aliases exist only for static type checkers; they are never bound at
# runtime, so they may only appear inside (lazily evaluated) annotations.
if TYPE_CHECKING:
    # Either a list of BufferRef objects or None.
    Buffers: TypeAlias = list[BufferRef] | None

    # Annotation used for the ``callback_invoker`` arguments of the events
    # below; currently any callable is accepted.
    Invoker: TypeAlias = Callable[..., Any] # TODO

# The classes below describe the optional hooks that event ``dispatch`` methods
# look for on a receiver. Each hook is a no-op stub here; the event classes
# only ``cast`` to these types for the benefit of static type checkers.

class DocumentChangedMixin:
    ''' Receiver interface for ``DocumentChangedEvent.dispatch``. '''

    def _document_changed(self, event: DocumentChangedEvent) -> None:
        ''' Hook invoked with a document changed event (stub, does nothing). '''


class DocumentPatchedMixin:
    ''' Receiver interface for ``DocumentPatchedEvent.dispatch``. '''

    def _document_patched(self, event: DocumentPatchedEvent) -> None:
        ''' Hook invoked with a document patched event (stub, does nothing). '''


class DocumentMessageSentMixin:
    ''' Receiver interface for ``MessageSentEvent.dispatch``. '''

    def _document_message_sent(self, event: MessageSentEvent) -> None:
        ''' Hook invoked with a message sent event (stub, does nothing). '''


class DocumentModelChangedMixin:
    ''' Receiver interface for ``ModelChangedEvent.dispatch``. '''

    def _document_model_changed(self, event: ModelChangedEvent) -> None:
        ''' Hook invoked with a model changed event (stub, does nothing). '''


class ColumnDataChangedMixin:
    ''' Receiver interface for ``ColumnDataChangedEvent.dispatch``. '''

    def _column_data_changed(self, event: ColumnDataChangedEvent) -> None:
        ''' Hook invoked with a column data changed event (stub, does nothing). '''


class ColumnsStreamedMixin:
    ''' Receiver interface for ``ColumnsStreamedEvent.dispatch``. '''

    def _columns_streamed(self, event: ColumnsStreamedEvent) -> None:
        ''' Hook invoked with a columns streamed event (stub, does nothing). '''


class ColumnsPatchedMixin:
    ''' Receiver interface for ``ColumnsPatchedEvent.dispatch``. '''

    def _columns_patched(self, event: ColumnsPatchedEvent) -> None:
        ''' Hook invoked with a columns patched event (stub, does nothing). '''


class SessionCallbackAddedMixin:
    ''' Receiver interface for ``SessionCallbackAdded.dispatch``. '''

    def _session_callback_added(self, event: SessionCallbackAdded) -> None:
        ''' Hook invoked with a session callback added event (stub, does nothing). '''


class SessionCallbackRemovedMixin:
    ''' Receiver interface for ``SessionCallbackRemoved.dispatch``. '''

    def _session_callback_removed(self, event: SessionCallbackRemoved) -> None:
        ''' Hook invoked with a session callback removed event (stub, does nothing). '''

class DocumentChangedEvent:
    ''' Common ancestor of all internal events that represent a change to a
    Bokeh Document.

    '''

    document: Document
    setter: Setter | None
    callback_invoker: Invoker | None

    def __init__(
        self,
        document: Document,
        setter: Setter | None = None,
        callback_invoker: Invoker | None = None,
    ) -> None:
        '''

        Args:
            document (Document) :
                A Bokeh document that is to be updated.

            setter (ClientSession or ServerSession or None, optional) :
                This is used to prevent "boomerang" updates to Bokeh apps.
                (default: None)

                In the context of a Bokeh server application, incoming updates
                to properties will be annotated with the session that is
                doing the updating. This value is propagated through any
                subsequent change notifications that the update triggers.
                The session can compare the event setter to itself, and
                suppress any updates that originate from itself.

            callback_invoker (callable, optional) :
                A callable that will invoke any Model callbacks that should
                be executed in response to the change that triggered this
                event. (default: None)

        '''
        self.document, self.setter, self.callback_invoker = document, setter, callback_invoker

    def combine(self, event: DocumentChangedEvent) -> bool:
        ''' Try to fold ``event`` into this event.

        The base implementation never combines anything.

        Returns:
            bool : always ``False``

        '''
        return False

    def dispatch(self, receiver: Any) -> None:
        ''' Dispatch handling of this event to a receiver.

        ``receiver._document_changed`` is called with this event when the
        receiver has such an attribute; otherwise nothing happens.

        '''
        if not hasattr(receiver, '_document_changed'):
            return
        cast(DocumentChangedMixin, receiver)._document_changed(self)
class DocumentPatchedEvent(DocumentChangedEvent, Serializable):
    ''' A base class for events that represent updating Bokeh Models and
    their properties.

    Every subclass must define a ``kind`` class attribute. Subclasses are
    recorded under that name at class-creation time so that
    :meth:`handle_event` can find the class for an incoming event.

    '''

    kind: ClassVar[str]

    # Registry mapping each ``kind`` string to the event class handling it.
    _handlers: ClassVar[dict[str, type[DocumentPatchedEvent]]] = {}

    def __init_subclass__(cls):
        # Register the subclass; a later class with the same ``kind``
        # replaces the earlier registry entry.
        cls._handlers[cls.kind] = cls

    def dispatch(self, receiver: Any) -> None:
        ''' Dispatch handling of this event to a receiver.

        After the base class dispatch, this method will invoke
        ``receiver._document_patched`` if it exists.

        '''
        super().dispatch(receiver)
        if hasattr(receiver, '_document_patched'):
            cast(DocumentPatchedMixin, receiver)._document_patched(self)

    def to_serializable(self, serializer: Serializer) -> DocumentPatched:
        ''' Create a JSON representation of this event suitable for sending
        to clients.

        *Sub-classes must implement this method.*

        Args:
            serializer (Serializer):

        Raises:
            NotImplementedError : always, in this base class

        '''
        raise NotImplementedError()

    @staticmethod
    def handle_event(doc: Document, event_rep: DocumentPatched, setter: Setter | None) -> None:
        ''' Build the event described by ``event_rep`` and apply it to ``doc``.

        The ``"kind"`` entry of ``event_rep`` selects the registered event
        class. The remaining entries are passed to that class as keyword
        arguments, together with ``document=doc`` and ``setter=setter``. The
        resulting event is then passed to the class's ``_handle_event``.

        ``event_rep`` itself is left unmodified.

        Args:
            doc (Document) :
                The document to apply the event to.

            event_rep (DocumentPatched) :
                The event representation, including its ``"kind"``.

            setter (ClientSession or ServerSession or None) :
                Passed through to the created event.

        Raises:
            KeyError : if ``event_rep`` has no ``"kind"`` entry
            RuntimeError : if no event class is registered for the kind

        '''
        # Work on a shallow copy so that popping "kind" does not mutate the
        # caller's representation.
        rep = dict(event_rep)
        event_kind = rep.pop("kind")

        event_cls = DocumentPatchedEvent._handlers.get(event_kind, None)
        if event_cls is None:
            raise RuntimeError(f"unknown patch event type '{event_kind!r}'")

        event = event_cls(document=doc, setter=setter, **rep)
        event_cls._handle_event(doc, event)

    @staticmethod
    def _handle_event(doc: Document, event: DocumentPatchedEvent) -> None:
        # Concrete event classes override this to apply ``event`` to ``doc``.
        raise NotImplementedError()
class MessageSentEvent(DocumentPatchedEvent):
    ''' A concrete event carrying a message, identified by a message type,
    together with its data.

    '''

    kind = "MessageSent"

    def __init__(self, document: Document, msg_type: str, msg_data: Any | bytes,
            setter: Setter | None = None, callback_invoker: Invoker | None = None):
        '''

        Args:
            document (Document) :
                A Bokeh document that is to be updated.

            msg_type (str) :
                The type of the message; used to look up message callbacks.

            msg_data (object or bytes) :
                The message payload handed to the callbacks.

            setter (ClientSession or ServerSession or None, optional) :
                See :class:`~bokeh.document.events.DocumentChangedEvent`
                for more details. (default: None)

            callback_invoker (callable, optional) :
                See :class:`~bokeh.document.events.DocumentChangedEvent`
                for more details. (default: None)

        '''
        super().__init__(document, setter, callback_invoker)
        self.msg_type, self.msg_data = msg_type, msg_data

    def dispatch(self, receiver: Any) -> None:
        ''' Dispatch handling of this event to a receiver.

        After the base class dispatch, this method will invoke
        ``receiver._document_message_sent`` if it exists.

        '''
        super().dispatch(receiver)
        if not hasattr(receiver, "_document_message_sent"):
            return
        cast(DocumentMessageSentMixin, receiver)._document_message_sent(self)

    def to_serializable(self, serializer: Serializer) -> MessageSent:
        ''' Create a JSON representation of this event suitable for sending
        to clients.

        The message data is encoded with ``serializer``; the kind and message
        type are included as they are.

        Args:
            serializer (Serializer):

        '''
        encoded = serializer.encode(self.msg_data)
        return MessageSent(kind=self.kind, msg_type=self.msg_type, msg_data=encoded)

    @staticmethod
    def _handle_event(doc: Document, event: MessageSentEvent) -> None:
        # Call every callback registered on the document for this message
        # type (none registered means nothing to do).
        for callback in doc.callbacks._message_callbacks.get(event.msg_type, []):
            callback(event.msg_data)
class ModelChangedEvent(DocumentPatchedEvent):
    ''' A concrete event representing updating an attribute and value of a
    specific Bokeh Model.

    '''

    kind = "ModelChanged"

    def __init__(self, document: Document, model: Model, attr: str, new: Any,
            setter: Setter | None = None, callback_invoker: Invoker | None = None):
        '''

        Args:
            document (Document) :
                A Bokeh document that is to be updated.

            model (Model) :
                A Model to update

            attr (str) :
                The name of the attribute to update on the model.

            new (object) :
                The new value of the attribute

            setter (ClientSession or ServerSession or None, optional) :
                This is used to prevent "boomerang" updates to Bokeh apps.
                (default: None)

                See :class:`~bokeh.document.events.DocumentChangedEvent`
                for more details.

            callback_invoker (callable, optional) :
                A callable that will invoke any Model callbacks that should
                be executed in response to the change that triggered this
                event. (default: None)

        '''
        super().__init__(document, setter, callback_invoker)
        self.model, self.attr, self.new = model, attr, new

    def combine(self, event: DocumentChangedEvent) -> bool:
        ''' Absorb a later change to the same model attribute.

        When ``event`` is a ``ModelChangedEvent`` with the same setter,
        document, model and attribute, this event takes over its ``new``
        value and ``callback_invoker``.

        Returns:
            bool : ``True`` if ``event`` was combined into this event

        '''
        if not isinstance(event, ModelChangedEvent):
            return False

        # If these are not true something weird is going on, maybe updates from
        # Python bokeh.client, don't try to combine
        if self.setter != event.setter or self.document != event.document:
            return False

        same_target = (self.model == event.model) and (self.attr == event.attr)
        if not same_target:
            return False

        self.new = event.new
        self.callback_invoker = event.callback_invoker
        return True

    def dispatch(self, receiver: Any) -> None:
        ''' Dispatch handling of this event to a receiver.

        After the base class dispatch, this method will invoke
        ``receiver._document_model_changed`` if it exists.

        '''
        super().dispatch(receiver)
        if not hasattr(receiver, '_document_model_changed'):
            return
        cast(DocumentModelChangedMixin, receiver)._document_model_changed(self)

    def to_serializable(self, serializer: Serializer) -> ModelChanged:
        ''' Create a JSON representation of this event suitable for sending
        to clients.

        The model is included by reference and the new value is encoded with
        ``serializer``.

        Args:
            serializer (Serializer):

        '''
        model_ref = self.model.ref
        encoded_value = serializer.encode(self.new)
        return ModelChanged(kind=self.kind, model=model_ref, attr=self.attr, new=encoded_value)

    @staticmethod
    def _handle_event(doc: Document, event: ModelChangedEvent) -> None:
        # Apply the new value to the model attribute, passing the setter along.
        event.model.set_from_json(event.attr, event.new, setter=event.setter)
class ColumnDataChangedEvent(DocumentPatchedEvent):
    ''' A concrete event representing efficiently replacing *all*
    existing data for a :class:`~bokeh.models.sources.ColumnDataSource`

    '''

    kind = "ColumnDataChanged"

    def __init__(self, document: Document, model: Model, attr: str, data: DataDict | None = None,
            cols: list[str] | None = None, setter: Setter | None = None, callback_invoker: Invoker | None = None):
        '''

        Args:
            document (Document) :
                A Bokeh document that is to be updated.

            model (Model) :
                The model whose data is being replaced.

            attr (str) :
                The name of the attribute on ``model`` that holds the data.

            data (DataDict, optional) :
                The new data. If None, the current value of ``attr`` on
                ``model`` is read when the event is serialized.
                (default: None)

            cols (list[str]) :
                optional explicit list of column names to update. If None, all
                columns will be updated (default: None)

            setter (ClientSession or ServerSession or None, optional) :
                This is used to prevent "boomerang" updates to Bokeh apps.
                (default: None)

                See :class:`~bokeh.document.events.DocumentChangedEvent`
                for more details.

            callback_invoker (callable, optional) :
                A callable that will invoke any Model callbacks that should
                be executed in response to the change that triggered this
                event. (default: None)

        '''
        super().__init__(document, setter, callback_invoker)
        self.model, self.attr = model, attr
        self.data, self.cols = data, cols

    def dispatch(self, receiver: Any) -> None:
        ''' Dispatch handling of this event to a receiver.

        After the base class dispatch, this method will invoke
        ``receiver._column_data_changed`` if it exists.

        '''
        super().dispatch(receiver)
        if not hasattr(receiver, '_column_data_changed'):
            return
        cast(ColumnDataChangedMixin, receiver)._column_data_changed(self)

    def to_serializable(self, serializer: Serializer) -> ColumnDataChanged:
        ''' Create a JSON representation of this event suitable for sending
        to clients.

        .. code-block:: python

            {
                'kind'  : 'ColumnDataChanged'
                'model' : <reference to the model>
                'attr'  : <name of the data attribute>
                'data'  : <new data for the attribute>
                'cols'  : <specific columns to update>
            }

        Args:
            serializer (Serializer):

        '''
        # Fall back to the model's current value when no data was supplied.
        payload = self.data
        if payload is None:
            payload = getattr(self.model, self.attr)

        wanted = self.cols
        if wanted is not None:
            # Keep only the requested columns, skipping any that are absent
            # (or None) in the data.
            subset = {}
            for name in wanted:
                column = payload.get(name)
                if column is not None:
                    subset[name] = column
            payload = subset

        model_ref = self.model.ref
        encoded_data = serializer.encode(payload)
        encoded_cols = serializer.encode(wanted)
        return ColumnDataChanged(
            kind=self.kind,
            model=model_ref,
            attr=self.attr,
            data=encoded_data,
            cols=encoded_cols,
        )

    @staticmethod
    def _handle_event(doc: Document, event: ColumnDataChangedEvent) -> None:
        # Replace the attribute value on the model with the event's data.
        event.model.set_from_json(event.attr, event.data, setter=event.setter)
class ColumnsStreamedEvent(DocumentPatchedEvent):
    ''' A concrete event representing efficiently streaming new data
    to a :class:`~bokeh.models.sources.ColumnDataSource`

    '''

    kind = "ColumnsStreamed"

    data: DataDict

    def __init__(self, document: Document, model: Model, attr: str, data: DataDict | pd.DataFrame,
            rollover: int | None = None, setter: Setter | None = None, callback_invoker: Invoker | None = None):
        '''

        Args:
            document (Document) :
                A Bokeh document that is to be updated.

            model (Model) :
                The data source to stream new data to.

            attr (str) :
                The name of the attribute on ``model`` that holds the data.

            data (dict or DataFrame) :
                New data to stream.

                If a DataFrame, will be stored as ``{c: df[c] for c in df.columns}``

            rollover (int, optional) :
                A rollover limit. If the data source columns exceed this
                limit, earlier values will be discarded to maintain the
                column length under the limit.

            setter (ClientSession or ServerSession or None, optional) :
                This is used to prevent "boomerang" updates to Bokeh apps.
                (default: None)

                See :class:`~bokeh.document.events.DocumentChangedEvent`
                for more details.

            callback_invoker (callable, optional) :
                A callable that will invoke any Model callbacks that should
                be executed in response to the change that triggered this
                event. (default: None)

        '''
        super().__init__(document, setter, callback_invoker)
        self.model = model
        self.attr = attr

        # A dict can never be a DataFrame, so only import pandas for other
        # inputs. This keeps the common dict case free of the pandas import.
        if not isinstance(data, dict):
            try:
                import pandas as pd
            except ImportError:
                # Without pandas installed, data cannot be a DataFrame.
                pass
            else:
                if isinstance(data, pd.DataFrame):
                    data = {c: data[c] for c in data.columns}

        self.data = data
        self.rollover = rollover

    def dispatch(self, receiver: Any) -> None:
        ''' Dispatch handling of this event to a receiver.

        After the base class dispatch, this method will invoke
        ``receiver._columns_streamed`` if it exists.

        '''
        super().dispatch(receiver)
        if hasattr(receiver, '_columns_streamed'):
            cast(ColumnsStreamedMixin, receiver)._columns_streamed(self)

    def to_serializable(self, serializer: Serializer) -> ColumnsStreamed:
        ''' Create a JSON representation of this event suitable for sending
        to clients.

        .. code-block:: python

            {
                'kind'     : 'ColumnsStreamed'
                'model'    : <reference to the model>
                'attr'     : <name of the data attribute>
                'data'     : <new data to stream to the model>
                'rollover' : <rollover limit>
            }

        Args:
            serializer (Serializer):

        '''
        return ColumnsStreamed(
            kind     = self.kind,
            model    = self.model.ref,
            attr     = self.attr,
            data     = serializer.encode(self.data),
            rollover = self.rollover,
        )

    @staticmethod
    def _handle_event(doc: Document, event: ColumnsStreamedEvent) -> None:
        model = event.model
        attr = event.attr
        # Streaming is only applied to the "data" attribute.
        # NOTE(review): this check disappears under ``python -O``.
        assert attr == "data"
        data = event.data
        rollover = event.rollover
        model._stream(data, rollover, event.setter)
class ColumnsPatchedEvent(DocumentPatchedEvent):
    ''' A concrete event representing efficiently applying data patches
    to a :class:`~bokeh.models.sources.ColumnDataSource`

    '''

    kind = "ColumnsPatched"

    def __init__(self, document: Document, model: Model, attr: str, patches: Patches,
            setter: Setter | None = None, callback_invoker: Invoker | None = None):
        '''

        Args:
            document (Document) :
                A Bokeh document that is to be updated.

            model (Model) :
                The data source to apply patches to.

            attr (str) :
                The name of the attribute on ``model`` that holds the data.

            patches (Patches) :
                The patches to apply.

            setter (ClientSession or ServerSession or None, optional) :
                This is used to prevent "boomerang" updates to Bokeh apps.
                (default: None)

                See :class:`~bokeh.document.events.DocumentChangedEvent`
                for more details.

            callback_invoker (callable, optional) :
                A callable that will invoke any Model callbacks that should
                be executed in response to the change that triggered this
                event. (default: None)

        '''
        super().__init__(document, setter, callback_invoker)
        self.model, self.attr, self.patches = model, attr, patches

    def dispatch(self, receiver: Any) -> None:
        ''' Dispatch handling of this event to a receiver.

        After the base class dispatch, this method will invoke
        ``receiver._columns_patched`` if it exists.

        '''
        super().dispatch(receiver)
        if not hasattr(receiver, '_columns_patched'):
            return
        cast(ColumnsPatchedMixin, receiver)._columns_patched(self)

    def to_serializable(self, serializer: Serializer) -> ColumnsPatched:
        ''' Create a JSON representation of this event suitable for sending
        to clients.

        .. code-block:: python

            {
                'kind'    : 'ColumnsPatched'
                'model'   : <reference to the model>
                'attr'    : <name of the data attribute>
                'patches' : <patches to apply to the model>
            }

        Args:
            serializer (Serializer):

        '''
        model_ref = self.model.ref
        encoded_patches = serializer.encode(self.patches)
        return ColumnsPatched(kind=self.kind, model=model_ref, attr=self.attr, patches=encoded_patches)

    @staticmethod
    def _handle_event(doc: Document, event: ColumnsPatchedEvent) -> None:
        target = event.model
        # Patching is only applied to the "data" attribute.
        assert event.attr == "data"
        target.patch(event.patches, event.setter)
class TitleChangedEvent(DocumentPatchedEvent):
    ''' A concrete event representing a change to the title of a Bokeh
    Document.

    '''

    kind = "TitleChanged"

    def __init__(self, document: Document, title: str,
            setter: Setter | None = None, callback_invoker: Invoker | None = None):
        '''

        Args:
            document (Document) :
                A Bokeh document that is to be updated.

            title (str) :
                The new title to set on the Document

            setter (ClientSession or ServerSession or None, optional) :
                This is used to prevent "boomerang" updates to Bokeh apps.
                (default: None)

                See :class:`~bokeh.document.events.DocumentChangedEvent`
                for more details.

            callback_invoker (callable, optional) :
                A callable that will invoke any Model callbacks that should
                be executed in response to the change that triggered this
                event. (default: None)

        '''
        super().__init__(document, setter, callback_invoker)
        self.title = title

    def combine(self, event: DocumentChangedEvent) -> bool:
        ''' Absorb a later title change.

        When ``event`` is a ``TitleChangedEvent`` with the same setter and
        document, this event takes over its ``title`` and
        ``callback_invoker``.

        Returns:
            bool : ``True`` if ``event`` was combined into this event

        '''
        if not isinstance(event, TitleChangedEvent):
            return False

        # If these are not true something weird is going on, maybe updates from
        # Python bokeh.client, don't try to combine
        if self.setter != event.setter or self.document != event.document:
            return False

        self.title, self.callback_invoker = event.title, event.callback_invoker
        return True

    def to_serializable(self, serializer: Serializer) -> TitleChanged:
        ''' Create a JSON representation of this event suitable for sending
        to clients.

        .. code-block:: python

            {
                'kind'  : 'TitleChanged'
                'title' : <new title to set>
            }

        Args:
            serializer (Serializer):

        '''
        return TitleChanged(kind=self.kind, title=self.title)

    @staticmethod
    def _handle_event(doc: Document, event: TitleChangedEvent) -> None:
        # Set the new title on the document, passing the setter along.
        new_title = event.title
        doc.set_title(new_title, event.setter)
class RootAddedEvent(DocumentPatchedEvent):
    ''' A concrete event representing a change to add a new Model to a
    Document's collection of "root" models.

    '''

    kind = "RootAdded"

    def __init__(
        self,
        document: Document,
        model: Model,
        setter: Setter | None = None,
        callback_invoker: Invoker | None = None,
    ) -> None:
        '''

        Args:
            document (Document) :
                A Bokeh document that is to be updated.

            model (Model) :
                The Bokeh Model to add as a Document root.

            setter (ClientSession or ServerSession or None, optional) :
                This is used to prevent "boomerang" updates to Bokeh apps.
                (default: None)

                See :class:`~bokeh.document.events.DocumentChangedEvent`
                for more details.

            callback_invoker (callable, optional) :
                A callable that will invoke any Model callbacks that should
                be executed in response to the change that triggered this
                event. (default: None)

        '''
        super().__init__(document, setter, callback_invoker)
        self.model = model

    def to_serializable(self, serializer: Serializer) -> RootAdded:
        ''' Create a JSON representation of this event suitable for sending
        to clients.

        .. code-block:: python

            {
                'kind'  : 'RootAdded'
                'model' : <the Model, encoded by the serializer>
            }

        Args:
            serializer (Serializer):

        '''
        encoded_model = serializer.encode(self.model)
        return RootAdded(kind=self.kind, model=encoded_model)

    @staticmethod
    def _handle_event(doc: Document, event: RootAddedEvent) -> None:
        # Add the event's model as a root of the document.
        doc.add_root(event.model, event.setter)
class RootRemovedEvent(DocumentPatchedEvent):
    ''' A concrete event representing a change to remove an existing Model
    from a Document's collection of "root" models.

    '''

    kind = "RootRemoved"

    def __init__(
        self,
        document: Document,
        model: Model,
        setter: Setter | None = None,
        callback_invoker: Invoker | None = None,
    ) -> None:
        '''

        Args:
            document (Document) :
                A Bokeh document that is to be updated.

            model (Model) :
                The Bokeh Model to remove as a Document root.

            setter (ClientSession or ServerSession or None, optional) :
                This is used to prevent "boomerang" updates to Bokeh apps.
                (default: None)

                See :class:`~bokeh.document.events.DocumentChangedEvent`
                for more details.

            callback_invoker (callable, optional) :
                A callable that will invoke any Model callbacks that should
                be executed in response to the change that triggered this
                event. (default: None)

        '''
        super().__init__(document, setter, callback_invoker)
        self.model = model

    def to_serializable(self, serializer: Serializer) -> RootRemoved:
        ''' Create a JSON representation of this event suitable for sending
        to clients.

        .. code-block:: python

            {
                'kind'  : 'RootRemoved'
                'model' : <reference to a Model>
            }

        Args:
            serializer (Serializer):

        '''
        model_ref = self.model.ref
        return RootRemoved(kind=self.kind, model=model_ref)

    @staticmethod
    def _handle_event(doc: Document, event: RootRemovedEvent) -> None:
        # Remove the event's model from the roots of the document.
        doc.remove_root(event.model, event.setter)
class SessionCallbackAdded(DocumentChangedEvent):
    ''' A concrete event representing a change to add a new callback (e.g.
    periodic, timeout, or "next tick") to a Document.

    '''

    def __init__(self, document: Document, callback: SessionCallback) -> None:
        '''

        Args:
            document (Document) :
                A Bokeh document that is to be updated.

            callback (SessionCallback) :
                The callback to add

        '''
        # No setter or callback invoker is recorded for this kind of event.
        super().__init__(document)
        self.callback = callback

    def dispatch(self, receiver: Any) -> None:
        ''' Dispatch handling of this event to a receiver.

        After the base class dispatch, this method will invoke
        ``receiver._session_callback_added`` if it exists.

        '''
        super().dispatch(receiver)
        if not hasattr(receiver, '_session_callback_added'):
            return
        cast(SessionCallbackAddedMixin, receiver)._session_callback_added(self)
class SessionCallbackRemoved(DocumentChangedEvent):
    ''' A concrete event representing a change to remove an existing callback
    (e.g. periodic, timeout, or "next tick") from a Document.

    '''

    def __init__(self, document: Document, callback: SessionCallback) -> None:
        '''

        Args:
            document (Document) :
                A Bokeh document that is to be updated.

            callback (SessionCallback) :
                The callback to remove

        '''
        # No setter or callback invoker is recorded for this kind of event.
        super().__init__(document)
        self.callback = callback

    def dispatch(self, receiver: Any) -> None:
        ''' Dispatch handling of this event to a receiver.

        After the base class dispatch, this method will invoke
        ``receiver._session_callback_removed`` if it exists.

        '''
        super().dispatch(receiver)
        if not hasattr(receiver, '_session_callback_removed'):
            return
        cast(SessionCallbackRemovedMixin, receiver)._session_callback_removed(self)
# Type of a callable that accepts a single DocumentChangedEvent and returns
# nothing.
DocumentChangeCallback = Callable[[DocumentChangedEvent], None]

#-----------------------------------------------------------------------------
# Private API
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------