#-----------------------------------------------------------------------------
# Copyright (c) 2012 - 2021, Anaconda, Inc., and Bokeh Contributors.
# All rights reserved.
#
# The full license is in the file LICENSE.txt, distributed with this software.
#-----------------------------------------------------------------------------
''' Provides the ``ServerSession`` class.
'''
#-----------------------------------------------------------------------------
# Boilerplate
#-----------------------------------------------------------------------------
from __future__ import annotations
import logging # isort:skip
log = logging.getLogger(__name__)
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
# Standard library imports
import inspect
import time
from copy import copy
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
List,
Set,
TypeVar,
)
# External imports
from tornado import locks
if TYPE_CHECKING:
from tornado.ioloop import IOLoop
# Bokeh imports
from ..util.token import generate_jwt_token
from .callbacks import DocumentCallbackGroup
if TYPE_CHECKING:
from ..core.types import ID
from ..document.document import Document
from ..document.events import DocumentPatchedEvent
from ..protocol import messages as msg
from .callbacks import Callback, SessionCallback
from .connection import ServerConnection
#-----------------------------------------------------------------------------
# Globals and constants
#-----------------------------------------------------------------------------
__all__ = (
'current_time',
'ServerSession',
)
#-----------------------------------------------------------------------------
# Private API
#-----------------------------------------------------------------------------
T = TypeVar("T")
F = TypeVar("F", bound=Callable[..., Any])
def _needs_document_lock(func: F) -> F:
'''Decorator that adds the necessary locking and post-processing
to manipulate the session's document. Expects to decorate a
method on ServerSession and transforms it into a coroutine
if it wasn't already.
'''
async def _needs_document_lock_wrapper(self: ServerSession, *args, **kwargs):
# while we wait for and hold the lock, prevent the session
# from being discarded. This avoids potential weirdness
# with the session vanishing in the middle of some async
# task.
if self.destroyed:
log.debug("Ignoring locked callback on already-destroyed session.")
return None
self.block_expiration()
try:
with await self._lock.acquire():
if self._pending_writes is not None:
raise RuntimeError("internal class invariant violated: _pending_writes " + \
"should be None if lock is not held")
self._pending_writes = []
try:
result = func(self, *args, **kwargs)
if inspect.isawaitable(result):
# Note that this must not be outside of the critical section.
# Otherwise, the async callback will be ran without document locking.
result = await result
finally:
# we want to be very sure we reset this or we'll
# keep hitting the RuntimeError above as soon as
# any callback goes wrong
pending_writes = self._pending_writes
self._pending_writes = None
for p in pending_writes:
await p
return result
finally:
self.unblock_expiration()
return _needs_document_lock_wrapper
#-----------------------------------------------------------------------------
# General API
#-----------------------------------------------------------------------------
[docs]def current_time() -> float:
'''Return the time in milliseconds since the epoch as a floating
point number.
'''
return time.monotonic() * 1000
[docs]class ServerSession:
''' Hosts an application "instance" (an instantiated Document) for one or more connections.
.. autoclasstoc::
'''
_subscribed_connections: Set[ServerConnection]
_current_patch_connection: ServerConnection | None
_pending_writes: List[Awaitable[None]] | None
def __init__(self, session_id: ID, document: Document, io_loop: IOLoop | None = None, token: str | None = None) -> None:
if session_id is None:
raise ValueError("Sessions must have an id")
if document is None:
raise ValueError("Sessions must have a document")
self._id = session_id
self._token = token
self._document = document
self._loop = io_loop
self._subscribed_connections = set()
self._last_unsubscribe_time = current_time()
self._lock = locks.Lock()
self._current_patch_connection = None
self._document.callbacks.on_change_dispatch_to(self)
self._callbacks = DocumentCallbackGroup(io_loop)
self._pending_writes = None
self._destroyed = False
self._expiration_requested = False
self._expiration_blocked_count = 0
wrapped_callbacks = [self._wrap_session_callback(cb) for cb in self._document.session_callbacks]
self._callbacks.add_session_callbacks(wrapped_callbacks)
@property
def document(self) -> Document:
return self._document
@property
def id(self) -> ID:
return self._id
@property
def token(self) -> str:
''' A JWT token to authenticate the session. '''
if self._token:
return self._token
return generate_jwt_token(self.id)
@property
def destroyed(self) -> bool:
return self._destroyed
@property
def expiration_requested(self) -> bool:
return self._expiration_requested
@property
def expiration_blocked(self) -> bool:
return self._expiration_blocked_count > 0
@property
def expiration_blocked_count(self) -> int:
return self._expiration_blocked_count
def destroy(self) -> None:
self._destroyed = True
self._document.destroy(self)
del self._document
self._callbacks.remove_all_callbacks()
del self._callbacks
[docs] def request_expiration(self) -> None:
""" Used in test suite for now. Forces immediate expiration if no connections."""
self._expiration_requested = True
def block_expiration(self) -> None:
self._expiration_blocked_count += 1
def unblock_expiration(self) -> None:
if self._expiration_blocked_count <= 0:
raise RuntimeError("mismatched block_expiration / unblock_expiration")
self._expiration_blocked_count -= 1
[docs] def subscribe(self, connection: ServerConnection) -> None:
"""This should only be called by ``ServerConnection.subscribe_session`` or our book-keeping will be broken"""
self._subscribed_connections.add(connection)
[docs] def unsubscribe(self, connection: ServerConnection) -> None:
"""This should only be called by ``ServerConnection.unsubscribe_session`` or our book-keeping will be broken"""
self._subscribed_connections.discard(connection)
self._last_unsubscribe_time = current_time()
@property
def connection_count(self) -> int:
return len(self._subscribed_connections)
@property
def milliseconds_since_last_unsubscribe(self) -> float:
return current_time() - self._last_unsubscribe_time
@_needs_document_lock
def with_document_locked(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
''' Asynchronously locks the document and runs the function with it locked.'''
return func(*args, **kwargs)
def _wrap_document_callback(self, callback: Callback) -> Callback:
if getattr(callback, "nolock", False):
return callback
def wrapped_callback(*args: Any, **kwargs: Any):
return self.with_document_locked(callback, *args, **kwargs)
return wrapped_callback
def _wrap_session_callback(self, callback: SessionCallback) -> SessionCallback:
wrapped = copy(callback)
wrapped._callback = self._wrap_document_callback(callback.callback)
return wrapped
def _document_patched(self, event: DocumentPatchedEvent) -> None:
may_suppress = event.setter is self
if self._pending_writes is None:
raise RuntimeError("_pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes")
# TODO (havocp): our "change sync" protocol is flawed because if both
# sides change the same attribute at the same time, they will each end
# up with the state of the other and their final states will differ.
for connection in self._subscribed_connections:
if may_suppress and connection is self._current_patch_connection:
continue
self._pending_writes.append(connection.send_patch_document(event))
@_needs_document_lock
def _handle_pull(self, message: msg.pull_doc_req, connection: ServerConnection) -> msg.pull_doc_reply:
log.debug(f"Sending pull-doc-reply from session {self.id!r}")
return connection.protocol.create('PULL-DOC-REPLY', message.header['msgid'], self.document)
def _session_callback_added(self, event: SessionCallback):
wrapped = self._wrap_session_callback(event.callback)
self._callbacks.add_session_callback(wrapped)
def _session_callback_removed(self, event):
self._callbacks.remove_session_callback(event.callback)
[docs] @classmethod
def pull(cls, message: msg.pull_doc_req, connection: ServerConnection) -> msg.pull_doc_reply:
''' Handle a PULL-DOC, return a Future with work to be scheduled. '''
return connection.session._handle_pull(message, connection)
@_needs_document_lock
def _handle_push(self, message: msg.push_doc, connection: ServerConnection) -> msg.ok:
log.debug(f"pushing doc to session {self.id!r}")
message.push_to_document(self.document)
return connection.ok(message)
[docs] @classmethod
def push(cls, message: msg.push_doc, connection: ServerConnection) -> msg.ok:
''' Handle a PUSH-DOC, return a Future with work to be scheduled. '''
return connection.session._handle_push(message, connection)
@_needs_document_lock
def _handle_patch(self, message: msg.patch_doc, connection: ServerConnection) -> msg.ok:
self._current_patch_connection = connection
try:
message.apply_to_document(self.document, self)
finally:
self._current_patch_connection = None
return connection.ok(message)
[docs] @classmethod
def patch(cls, message: msg.patch_doc, connection: ServerConnection) -> msg.ok:
''' Handle a PATCH-DOC, return a Future with work to be scheduled. '''
return connection.session._handle_patch(message, connection)
#-----------------------------------------------------------------------------
# Dev API
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------