# Copyright (c) 2012 - 2022, Anaconda, Inc., and Bokeh Contributors.
# All rights reserved.
# The full license is in the file LICENSE.txt, distributed with this software.
''' Provides the ``ServerSession`` class.

'''

from __future__ import annotations

import logging # isort:skip
log = logging.getLogger(__name__)

import inspect
import time
from copy import copy
from functools import wraps
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    List,
    Set,
    TypeVar,
)

from tornado import locks

if TYPE_CHECKING:
    from tornado.ioloop import IOLoop

from ..util.token import generate_jwt_token
from .callbacks import DocumentCallbackGroup

if TYPE_CHECKING:
    from ..core.types import ID
    from ..document.document import Document
    from ..document.events import DocumentPatchedEvent
    from ..protocol import messages as msg
    from .callbacks import Callback, SessionCallback
    from .connection import ServerConnection

# Return type of the callable handed to ``ServerSession.with_document_locked``.
T = TypeVar("T")
# Type of the callable decorated by ``_needs_document_lock``; any callable qualifies.
F = TypeVar("F", bound=Callable[..., Any])

def _needs_document_lock(func: F) -> F:
    '''Decorator that adds the necessary locking and post-processing
       to manipulate the session's document. Expects to decorate a
       method on ServerSession and transforms it into a coroutine
       if it wasn't already.

    Args:
        func : the method to wrap. It may be a plain function or return an
            awaitable; an awaitable result is awaited while the lock is
            still held.

    Returns:
        An ``async`` wrapper around ``func``. Awaiting it yields the result
        of ``func``, or ``None`` if the session was already destroyed when
        the wrapper started.

    Raises:
        RuntimeError: (from the wrapper) if ``_pending_writes`` is not
            ``None`` at the moment the lock is acquired.

    '''
    @wraps(func)
    async def _needs_document_lock_wrapper(self: ServerSession, *args, **kwargs):
        if self.destroyed:
            log.debug("Ignoring locked callback on already-destroyed session.")
            return None

        # while we wait for and hold the lock, prevent the session
        # from being discarded. This avoids potential weirdness
        # with the session vanishing in the middle of some async
        # task.
        self.block_expiration()
        try:
            with await self._lock.acquire():
                if self._pending_writes is not None:
                    raise RuntimeError("internal class invariant violated: _pending_writes "
                                       "should be None if lock is not held")
                self._pending_writes = []
                try:
                    result = func(self, *args, **kwargs)
                    if inspect.isawaitable(result):
                        # Note that this must not be outside of the critical section.
                        # Otherwise, the async callback will be ran without document locking.
                        result = await result
                finally:
                    # we want to be very sure we reset this or we'll
                    # keep hitting the RuntimeError above as soon as
                    # any callback goes wrong
                    pending_writes = self._pending_writes
                    self._pending_writes = None
                # Flush the writes queued while the callback ran; this only
                # happens when the callback completed without raising.
                for p in pending_writes:
                    await p
            return result
        finally:
            # Always pair with block_expiration() above, even on error.
            self.unblock_expiration()
    return _needs_document_lock_wrapper

def current_time() -> float:
    '''Return a monotonic timestamp in milliseconds as a floating point number.

    The value is ``time.monotonic()`` scaled to milliseconds. The reference
    point of the monotonic clock is undefined, so only the difference between
    two results is meaningful; it is not a wall-clock time since the epoch.

    '''
    return time.monotonic() * 1000
class ServerSession:
    ''' Hosts an application "instance" (an instantiated Document) for one
    or more connections.

    .. autoclasstoc::

    '''

    # Connections currently subscribed to this session.
    _subscribed_connections: Set[ServerConnection]
    # Connection whose PATCH-DOC is being applied right now, if any.
    _current_patch_connection: ServerConnection | None
    # Writes queued while the document lock is held; None when it is not held.
    _pending_writes: List[Awaitable[None]] | None

    def __init__(self, session_id: ID, document: Document, io_loop: IOLoop | None = None, token: str | None = None) -> None:
        ''' Create a session hosting ``document``.

        Args:
            session_id : identifier of the session; must not be ``None``
            document : the document to host; must not be ``None``
            io_loop : loop handed to the session's ``DocumentCallbackGroup``
            token : pre-made token; when falsy one is generated on demand

        Raises:
            ValueError: if ``session_id`` or ``document`` is ``None``

        '''
        if session_id is None:
            raise ValueError("Sessions must have an id")
        if document is None:
            raise ValueError("Sessions must have a document")
        self._id = session_id
        self._token = token
        self._document = document
        self._loop = io_loop
        self._subscribed_connections = set()
        self._last_unsubscribe_time = current_time()
        self._lock = locks.Lock()
        self._current_patch_connection = None
        self._document.callbacks.on_change_dispatch_to(self)
        self._callbacks = DocumentCallbackGroup(io_loop)
        self._pending_writes = None
        self._destroyed = False
        self._expiration_requested = False
        self._expiration_blocked_count = 0

        # Session callbacks already on the document are re-registered here,
        # wrapped so that they run with the document lock held.
        wrapped_callbacks = [self._wrap_session_callback(cb) for cb in self._document.session_callbacks]
        self._callbacks.add_session_callbacks(wrapped_callbacks)

    @property
    def document(self) -> Document:
        ''' The document hosted by this session. '''
        return self._document

    @property
    def id(self) -> ID:
        ''' The session id given at construction. '''
        return self._id

    @property
    def token(self) -> str:
        ''' A JWT token to authenticate the session. '''
        if self._token:
            return self._token
        # No token was supplied: generate one for this session's id.
        return generate_jwt_token(self.id)

    @property
    def destroyed(self) -> bool:
        ''' Whether ``destroy()`` has been called. '''
        return self._destroyed

    @property
    def expiration_requested(self) -> bool:
        ''' Whether ``request_expiration()`` has been called. '''
        return self._expiration_requested

    @property
    def expiration_blocked(self) -> bool:
        ''' Whether at least one ``block_expiration()`` is outstanding. '''
        return self._expiration_blocked_count > 0

    @property
    def expiration_blocked_count(self) -> int:
        ''' Number of outstanding ``block_expiration()`` calls. '''
        return self._expiration_blocked_count

    def destroy(self) -> None:
        ''' Mark the session destroyed and drop its document and callbacks.

        After this call the ``_document`` and ``_callbacks`` attributes no
        longer exist on the instance.

        '''
        self._destroyed = True
        self._document.destroy(self)
        del self._document
        self._callbacks.remove_all_callbacks()
        del self._callbacks

    def request_expiration(self) -> None:
        """ Used in test suite for now. Forces immediate expiration if no connections."""
        self._expiration_requested = True

    def block_expiration(self) -> None:
        ''' Increment the count of outstanding expiration blocks. '''
        self._expiration_blocked_count += 1

    def unblock_expiration(self) -> None:
        ''' Decrement the count of outstanding expiration blocks.

        Raises:
            RuntimeError: if there is no outstanding ``block_expiration()``

        '''
        if self._expiration_blocked_count <= 0:
            raise RuntimeError("mismatched block_expiration / unblock_expiration")
        self._expiration_blocked_count -= 1

    def subscribe(self, connection: ServerConnection) -> None:
        """This should only be called by ``ServerConnection.subscribe_session`` or our book-keeping will be broken"""
        self._subscribed_connections.add(connection)

    def unsubscribe(self, connection: ServerConnection) -> None:
        """This should only be called by ``ServerConnection.unsubscribe_session`` or our book-keeping will be broken"""
        self._subscribed_connections.discard(connection)
        self._last_unsubscribe_time = current_time()

    @property
    def connection_count(self) -> int:
        ''' Number of currently subscribed connections. '''
        return len(self._subscribed_connections)

    @property
    def milliseconds_since_last_unsubscribe(self) -> float:
        ''' Milliseconds elapsed since the last ``unsubscribe()`` (or since
        construction if there has been none).

        '''
        return current_time() - self._last_unsubscribe_time

    @_needs_document_lock
    def with_document_locked(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
        ''' Asynchronously locks the document and runs the function with it locked.

        Because of the ``_needs_document_lock`` decorator, calling this
        method returns a coroutine; awaiting it yields the result of
        ``func(*args, **kwargs)``.

        '''
        return func(*args, **kwargs)

    def _wrap_document_callback(self, callback: Callback) -> Callback:
        ''' Return ``callback`` wrapped to run with the document locked.

        Callbacks carrying a truthy ``nolock`` attribute are returned as-is.

        '''
        if getattr(callback, "nolock", False):
            return callback
        def wrapped_callback(*args: Any, **kwargs: Any):
            return self.with_document_locked(callback, *args, **kwargs)
        return wrapped_callback

    def _wrap_session_callback(self, callback: SessionCallback) -> SessionCallback:
        ''' Return a shallow copy of ``callback`` whose underlying function
        is wrapped by ``_wrap_document_callback``.

        '''
        wrapped = copy(callback)
        wrapped._callback = self._wrap_document_callback(callback.callback)
        return wrapped

    def _document_patched(self, event: DocumentPatchedEvent) -> None:
        ''' Queue a patch message for every subscribed connection.

        The connection whose patch is currently being applied is skipped
        when this session is the setter of the event.

        Raises:
            RuntimeError: if called while ``_pending_writes`` is ``None``

        '''
        may_suppress = event.setter is self

        if self._pending_writes is None:
            raise RuntimeError("_pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes")

        # TODO (havocp): our "change sync" protocol is flawed because if both
        # sides change the same attribute at the same time, they will each end
        # up with the state of the other and their final states will differ.
        for connection in self._subscribed_connections:
            if may_suppress and connection is self._current_patch_connection:
                continue
            self._pending_writes.append(connection.send_patch_document(event))

    @_needs_document_lock
    def _handle_pull(self, message: msg.pull_doc_req, connection: ServerConnection) -> msg.pull_doc_reply:
        ''' Build the PULL-DOC-REPLY for ``message`` from this session's document. '''
        log.debug(f"Sending pull-doc-reply from session {self.id!r}")
        return connection.protocol.create('PULL-DOC-REPLY', message.header['msgid'], self.document)

    def _session_callback_added(self, event: SessionCallback) -> None:
        ''' Register the lock-wrapped version of a newly added session callback. '''
        wrapped = self._wrap_session_callback(event.callback)
        self._callbacks.add_session_callback(wrapped)

    def _session_callback_removed(self, event) -> None:
        ''' Unregister a session callback that was removed. '''
        self._callbacks.remove_session_callback(event.callback)

    @classmethod
    def pull(cls, message: msg.pull_doc_req, connection: ServerConnection) -> msg.pull_doc_reply:
        ''' Handle a PULL-DOC, return a Future with work to be scheduled. '''
        return connection.session._handle_pull(message, connection)

    @_needs_document_lock
    def _handle_push(self, message: msg.push_doc, connection: ServerConnection) -> msg.ok:
        ''' Push the content of ``message`` into this session's document and
        return the connection's OK reply.

        '''
        log.debug(f"pushing doc to session {self.id!r}")
        message.push_to_document(self.document)
        return connection.ok(message)

    @classmethod
    def push(cls, message: msg.push_doc, connection: ServerConnection) -> msg.ok:
        ''' Handle a PUSH-DOC, return a Future with work to be scheduled. '''
        return connection.session._handle_push(message, connection)

    @_needs_document_lock
    def _handle_patch(self, message: msg.patch_doc, connection: ServerConnection) -> msg.ok:
        ''' Apply the patch in ``message`` to this session's document and
        return the connection's OK reply.

        '''
        # Remember the originating connection while the patch is applied so
        # that _document_patched can skip echoing the change back to it.
        self._current_patch_connection = connection
        try:
            message.apply_to_document(self.document, self)
        finally:
            self._current_patch_connection = None

        return connection.ok(message)

    @classmethod
    def patch(cls, message: msg.patch_doc, connection: ServerConnection) -> msg.ok:
        ''' Handle a PATCH-DOC, return a Future with work to be scheduled. '''
        return connection.session._handle_patch(message, connection)
