Source code for bokeh.client.websocket

#-----------------------------------------------------------------------------
# Copyright (c) 2012 - 2022, Anaconda, Inc., and Bokeh Contributors.
# All rights reserved.
#
# The full license is in the file LICENSE.txt, distributed with this software.
#-----------------------------------------------------------------------------
''' Provide a low-level wrapper for Tornado Websockets that adds locking
and smooths some compatibility issues.

'''

#-----------------------------------------------------------------------------
# Boilerplate
#-----------------------------------------------------------------------------
from __future__ import annotations

import logging # isort:skip
log = logging.getLogger(__name__)

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

# Standard library imports
from typing import Any, Awaitable, Callable

# External imports
from tornado import locks
from tornado.websocket import WebSocketClientConnection

#-----------------------------------------------------------------------------
# Globals and constants
#-----------------------------------------------------------------------------

__all__ = (
    'WebSocketClientConnectionWrapper',
)

#-----------------------------------------------------------------------------
# General API
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Dev API
#-----------------------------------------------------------------------------

[docs]class WebSocketClientConnectionWrapper: ''' Used for compatibility across Tornado versions and to add write_lock''' def __init__(self, socket: WebSocketClientConnection) -> None: self._socket = socket # write_lock allows us to lock the connection to send multiple # messages atomically. self.write_lock = locks.Lock() # Internal methods --------------------------------------------------------
[docs] async def write_message(self, message: str | bytes, binary: bool = False, locked: bool = True) -> None: ''' Write a message to the websocket after obtaining the appropriate Bokeh Document lock. ''' if locked: with await self.write_lock.acquire(): self._socket.write_message(message, binary) else: self._socket.write_message(message, binary)
[docs] def close(self, code: int | None = None, reason: str | None = None) -> None: ''' Close the websocket. ''' return self._socket.close(code, reason)
[docs] def read_message(self, callback: Callable[..., Any] | None = None) -> Awaitable[None | str | bytes]: ''' Read a message from websocket and execute a callback. ''' return self._socket.read_message(callback)
#----------------------------------------------------------------------------- # Private API #----------------------------------------------------------------------------- #----------------------------------------------------------------------------- # Code #-----------------------------------------------------------------------------