#-----------------------------------------------------------------------------
# Copyright (c) Anaconda, Inc., and Bokeh Contributors.
# All rights reserved.
#
# The full license is in the file LICENSE.txt, distributed with this software.
#-----------------------------------------------------------------------------
''' Provides the Application, Server, and Session context classes.
'''
#-----------------------------------------------------------------------------
# Boilerplate
#-----------------------------------------------------------------------------
from __future__ import annotations
import logging # isort:skip
log = logging.getLogger(__name__)
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
# Standard library imports
import weakref
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    Iterable,
)
# External imports
from tornado import gen
if TYPE_CHECKING:
    from tornado.httputil import HTTPServerRequest
    from tornado.ioloop import IOLoop
# Bokeh imports
from ..application.application import ServerContext, SessionContext
from ..document import Document
from ..protocol.exceptions import ProtocolError
from ..util.token import get_token_payload
from .session import ServerSession
if TYPE_CHECKING:
    from ..application.application import Application
    from ..core.types import ID
    from ..util.token import TokenPayload
#-----------------------------------------------------------------------------
# Globals and constants
#-----------------------------------------------------------------------------
__all__ = (
    'ApplicationContext',
    'BokehServerContext',
    'BokehSessionContext',
)
#-----------------------------------------------------------------------------
# Setup
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# General API
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Dev API
#-----------------------------------------------------------------------------
class BokehServerContext(ServerContext):
    ''' Server context that exposes the sessions of one ``ApplicationContext``.

    Only a weak reference to the application context is stored, so holding
    on to this object does not keep the application context alive.

    '''

    def __init__(self, application_context: ApplicationContext) -> None:
        self._application_context = weakref.ref(application_context)

    @property
    def application_context(self) -> ApplicationContext | None:
        ''' The referenced application context, or ``None`` if the weak
        reference is no longer alive.

        '''
        return self._application_context()

    @property
    def sessions(self) -> list[ServerSession]:
        ''' A new list with the sessions of the application context.

        The list is empty when the application context is no longer alive.

        '''
        context = self.application_context
        if context is None:
            return []
        return list(context.sessions)
class BokehSessionContext(SessionContext):
    ''' Session context that is attached to a ``ServerSession`` once one exists.

    Until ``_set_session`` has been called (i.e. while the session creation
    hooks are running) there is no session, and the context operates directly
    on the document it was constructed with.

    '''

    _session: ServerSession | None
    _request: _RequestProxy | None
    _token: str | None

    def __init__(self, session_id: ID, server_context: ServerContext,
            document: Document, logout_url: str | None = None) -> None:
        self._document = document
        self._session = None
        self._logout_url = logout_url
        super().__init__(server_context, session_id)
        # request arguments used to instantiate this session
        self._request = None
        self._token = None

    def _set_session(self, session: ServerSession) -> None:
        ''' Attach the server session backing this context. '''
        self._session = session

    async def with_locked_document(self, func: Callable[[Document], Awaitable[None]]) -> None:
        ''' Await ``func`` on this context's document.

        Once a session is attached, the call is delegated to the session's
        ``with_document_locked``. Before that, ``func`` is awaited directly.

        '''
        if self._session is None:
            # this means we are in on_session_created, so no locking yet,
            # we have exclusive access
            await func(self._document)
        else:
            await self._session.with_document_locked(func, self._document)

    @property
    def destroyed(self) -> bool:
        ''' Whether the attached session reports itself as destroyed.

        Always ``False`` while no session is attached.

        '''
        if self._session is None:
            # this means we are in on_session_created
            return False
        return self._session.destroyed

    @property
    def logout_url(self) -> str | None:
        ''' The logout URL this context was constructed with, if any. '''
        return self._logout_url

    @property
    def request(self) -> _RequestProxy | None:
        ''' The request proxy recorded for this session, or ``None`` if
        none was set.

        '''
        return self._request

    @property
    def token_payload(self) -> TokenPayload:
        ''' The payload decoded from the session token.

        The token must have been set before this property is read.

        '''
        assert self._token is not None
        return get_token_payload(self._token)

    @property
    def session(self) -> ServerSession | None:
        ''' The attached server session, or ``None`` if not attached yet. '''
        return self._session
class ApplicationContext:
    ''' Server-side holder for ``bokeh.application.Application`` plus any associated data.

    This holds data that's global to all sessions, while ``ServerSession`` holds
    data specific to an "instance" of the application.

    '''

    _sessions: dict[ID, ServerSession]
    _pending_sessions: dict[ID, gen.Future[ServerSession]]
    _session_contexts: dict[ID, SessionContext]
    _server_context: BokehServerContext

    def __init__(self, application: Application, io_loop: IOLoop | None = None,
            url: str | None = None, logout_url: str | None = None) -> None:
        self._application = application
        self._loop = io_loop
        self._sessions = {}
        self._pending_sessions = {}
        self._session_contexts = {}
        self._server_context = BokehServerContext(self)
        self._url = url
        self._logout_url = logout_url

    @property
    def io_loop(self) -> IOLoop | None:
        ''' The IO loop passed at construction, if any. '''
        return self._loop

    @property
    def application(self) -> Application:
        ''' The application this context holds. '''
        return self._application

    @property
    def url(self) -> str | None:
        ''' The URL passed at construction, if any. '''
        return self._url

    @property
    def server_context(self) -> BokehServerContext:
        ''' The server context created for this application context. '''
        return self._server_context

    @property
    def sessions(self) -> Iterable[ServerSession]:
        ''' A live view of the currently stored sessions. '''
        return self._sessions.values()

    def run_load_hook(self) -> None:
        ''' Run the application's ``on_server_loaded`` hook.

        Exceptions raised by the hook are logged and not propagated.

        '''
        try:
            self._application.on_server_loaded(self.server_context)
        except Exception as e:
            log.error(f"Error in server loaded hook {e!r}", exc_info=True)

    def run_unload_hook(self) -> None:
        ''' Run the application's ``on_server_unloaded`` hook.

        Exceptions raised by the hook are logged and not propagated.

        '''
        try:
            self._application.on_server_unloaded(self.server_context)
        except Exception as e:
            log.error(f"Error in server unloaded hook {e!r}", exc_info=True)

    async def create_session_if_needed(self, session_id: ID, request: HTTPServerRequest | None = None,
            token: str | None = None) -> ServerSession:
        ''' Return the session for ``session_id``, creating it first if necessary.

        If another call is already creating the same session, this call waits
        for that creation to finish instead of starting a second one.

        Args:
            session_id: the ID of the session; must not be empty
            request: the request that triggered session creation, if any
            token: the session token, if any

        Returns:
            the new or already existing session

        Raises:
            ProtocolError: if ``session_id`` is empty

        '''
        # this is because empty session_ids would be "falsey" and
        # potentially open up a way for clients to confuse us
        if len(session_id) == 0:
            raise ProtocolError("Session ID must not be empty")

        if (session_id not in self._sessions
                and session_id not in self._pending_sessions):
            future = self._pending_sessions[session_id] = gen.Future()

            try:
                doc = Document()

                session_context = BokehSessionContext(session_id,
                                                      self.server_context,
                                                      doc,
                                                      logout_url=self._logout_url)
                if request is not None:
                    payload = get_token_payload(token) if token else {}
                    if ('cookies' in payload and 'headers' in payload
                        and 'Cookie' not in payload['headers']):
                        # Restore Cookie header from cookies dictionary
                        payload['headers']['Cookie'] = '; '.join([
                            f'{k}={v}' for k, v in payload['cookies'].items()
                        ])
                    # using private attr so users only have access to a read-only property
                    session_context._request = _RequestProxy(request,
                                                             arguments=payload.get('arguments'),
                                                             cookies=payload.get('cookies'),
                                                             headers=payload.get('headers'))
                session_context._token = token

                # expose the session context to the document
                # use the _attribute to set the public property .session_context
                doc._session_context = weakref.ref(session_context)

                try:
                    await self._application.on_session_created(session_context)
                except Exception as e:
                    log.error("Failed to run session creation hooks %r", e, exc_info=True)

                self._application.initialize_document(doc)

                session = ServerSession(session_id, doc, io_loop=self._loop, token=token)
            except Exception as e:
                # Creation failed: drop the placeholder so that later requests
                # for this ID can retry, and wake up anyone already waiting on
                # it, instead of leaving them blocked on a future that would
                # never complete.
                self._pending_sessions.pop(session_id, None)
                if not future.done():
                    future.set_exception(e)
                    # Reference the exception so that an un-awaited future does
                    # not get reported as "exception was never retrieved".
                    future.exception()
                raise

            del self._pending_sessions[session_id]
            self._sessions[session_id] = session
            session_context._set_session(session)
            self._session_contexts[session_id] = session_context

            # notify anyone waiting on the pending session
            future.set_result(session)

        if session_id in self._pending_sessions:
            # another create_session_if_needed is working on
            # creating this session
            session = await self._pending_sessions[session_id]
        else:
            session = self._sessions[session_id]

        return session

    def get_session(self, session_id: ID) -> ServerSession:
        ''' Return the existing session for ``session_id``.

        Raises:
            ProtocolError: if there is no such session

        '''
        if session_id in self._sessions:
            return self._sessions[session_id]
        raise ProtocolError("No such session " + session_id)

    async def _discard_session(self, session: ServerSession, should_discard: Callable[[ServerSession], bool]) -> None:
        ''' Destroy ``session`` under its document lock, if it is still discardable.

        Args:
            session: the session to discard
            should_discard: predicate that is re-evaluated once the document
                lock is held

        Raises:
            RuntimeError: if the session still has open connections

        '''
        if session.connection_count > 0:
            raise RuntimeError("Should not be discarding a session with open connections")
        log.debug("Discarding session %r last in use %r milliseconds ago", session.id, session.milliseconds_since_last_unsubscribe)

        session_context = self._session_contexts[session.id]

        # session.destroy() wants the document lock so it can shut down the document
        # callbacks.
        def do_discard() -> None:
            # while we were waiting for the document lock, the discard-worthiness
            # of the session may have changed.
            # However, since we have the document lock, our own lock will cause the
            # block count to be 1. If there's any other block count besides our own,
            # we want to skip session destruction though.
            if should_discard(session) and session.expiration_blocked_count == 1:
                session.destroy()
                del self._sessions[session.id]
                del self._session_contexts[session.id]
                log.trace(f"Session {session.id!r} was successfully discarded")
            else:
                log.warning(f"Session {session.id!r} was scheduled to discard but came back to life")

        await session.with_document_locked(do_discard)

        # session lifecycle hooks are supposed to be called outside the document lock,
        # we only run these if we actually ended up destroying the session.
        if session_context.destroyed:
            try:
                await self._application.on_session_destroyed(session_context)
            except Exception as e:
                log.error("Failed to run session destroy hooks %r", e, exc_info=True)

    async def _cleanup_sessions(self, unused_session_linger_milliseconds: int) -> None:
        ''' Discard sessions that have no connections and are either expired or
        explicitly requested to expire.

        Args:
            unused_session_linger_milliseconds: how long a session without
                connections may linger before it is eligible for discarding

        '''
        def should_discard_ignoring_block(session: ServerSession) -> bool:
            return session.connection_count == 0 and (
                session.milliseconds_since_last_unsubscribe > unused_session_linger_milliseconds
                or session.expiration_requested
            )

        # build a temp list to avoid trouble from self._sessions changes
        to_discard: list[ServerSession] = [
            session for session in self._sessions.values()
            if should_discard_ignoring_block(session) and not session.expiration_blocked
        ]

        if len(to_discard) > 0:
            log.debug(f"Scheduling {len(to_discard)} sessions to discard")

        # asynchronously reconsider each session
        for session in to_discard:
            if should_discard_ignoring_block(session) and not session.expiration_blocked:
                await self._discard_session(session, should_discard_ignoring_block)
#-----------------------------------------------------------------------------
# Private API
#-----------------------------------------------------------------------------
class _RequestProxy:
    ''' Wrapper around an HTTP request that exposes its arguments, cookies
    and headers as plain dictionaries.

    Explicitly supplied ``arguments``, ``cookies`` or ``headers`` take
    precedence over the values found on the request. The
    ``'bokeh-session-id'`` argument is never exposed. Any other public
    attribute is looked up on the wrapped request.

    '''

    _arguments: dict[str, list[bytes]]
    _cookies: dict[str, str]
    _headers: dict[str, str | list[str]]

    def __init__(
        self,
        request: HTTPServerRequest,
        arguments: dict[str, bytes | list[bytes]] | None = None,
        cookies: dict[str, str] | None = None,
        headers: dict[str, str | list[str]] | None = None,
    ) -> None:
        self._request = request

        if arguments is not None:
            # copy, so that dropping the session id below does not modify
            # the dictionary owned by the caller
            self._arguments = dict(arguments)
        elif hasattr(request, 'arguments'):
            self._arguments = dict(request.arguments)
        else:
            self._arguments = {}
        self._arguments.pop('bokeh-session-id', None)

        if cookies is not None:
            self._cookies = cookies
        elif hasattr(request, 'cookies'):
            # Django cookies are plain strings, tornado cookies are objects with a value
            self._cookies = {k: v if isinstance(v, str) else v.value for k, v in request.cookies.items()}
        else:
            self._cookies = {}

        if headers is not None:
            self._headers = headers
        elif hasattr(request, 'headers'):
            self._headers = dict(request.headers)
        else:
            self._headers = {}

    @property
    def arguments(self) -> dict[str, list[bytes]]:
        ''' The request arguments, without ``'bokeh-session-id'``. '''
        return self._arguments

    @property
    def cookies(self) -> dict[str, str]:
        ''' The request cookies, mapping cookie names to string values. '''
        return self._cookies

    @property
    def headers(self) -> dict[str, str | list[str]]:
        ''' The request headers. '''
        return self._headers

    def __getattr__(self, name: str) -> Any:
        ''' Look up public attributes on the wrapped request.

        Only called when normal attribute lookup fails. Names starting with
        an underscore are never forwarded, and an attribute whose value on
        the request is ``None`` is treated as missing.

        Raises:
            AttributeError: if the attribute cannot be provided

        '''
        if not name.startswith("_"):
            val = getattr(self._request, name, None)
            if val is not None:
                return val
        # object defines no __getattr__ to delegate to, so report the
        # attribute that was actually requested
        raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}")
#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------