#-----------------------------------------------------------------------------# Copyright (c) Anaconda, Inc., and Bokeh Contributors.# All rights reserved.## The full license is in the file LICENSE.txt, distributed with this software.#-----------------------------------------------------------------------------''' Provide some utility functions useful for implementing differentcomponents in ``bokeh.server``.'''#-----------------------------------------------------------------------------# Boilerplate#-----------------------------------------------------------------------------from__future__importannotationsimportlogging# isort:skiplog=logging.getLogger(__name__)#-----------------------------------------------------------------------------# Imports#-----------------------------------------------------------------------------# Standard library importsfromtypingimportTYPE_CHECKING,SequenceifTYPE_CHECKING:fromsocketimportsocket# External importsfromtornadoimportnetutil#-----------------------------------------------------------------------------# Globals and constants#-----------------------------------------------------------------------------__all__=('bind_sockets','check_allowlist','create_hosts_allowlist','match_host',)#-----------------------------------------------------------------------------# General API#-----------------------------------------------------------------------------
def bind_sockets(address: str | None, port: int) -> tuple[list[socket], int]:
    ''' Bind listening sockets for an address and port.

    Args:
        address (str or None) :
            An address to bind a port on, e.g. ``"localhost"``. The value is
            handed to ``tornado.netutil.bind_sockets`` unchanged.

        port (int) :
            A port number to bind. Pass 0 to have the OS automatically choose
            a free port.

    Returns:
        (list[socket], int) : the sockets that were bound, and the single
        port number they all share. The port is mostly useful when 0 was
        requested and the caller needs to learn which port was chosen.

    Raises:
        AssertionError, if no socket was bound, if the bound sockets do not
        all report the same port, or if an explicitly requested port differs
        from the port actually bound.

    '''
    # A falsy port is normalised to 0, i.e. "let the OS pick".
    requested_port = port or 0
    listeners = netutil.bind_sockets(port=requested_port, address=address)
    assert len(listeners)

    # Every socket reports (host, port, ...) from getsockname(); collect the
    # distinct port numbers so we can check there is exactly one.
    bound_ports = set()
    for listener in listeners:
        bound_ports.add(listener.getsockname()[1])
    assert len(bound_ports) == 1, "Multiple ports assigned??"

    actual_port = bound_ports.pop()
    if port:
        # An explicit request must be honoured exactly.
        assert actual_port == port
    return listeners, actual_port
def check_allowlist(host: str, allowlist: Sequence[str]) -> bool:
    ''' Check a given request host against an allowlist.

    Args:
        host (str) :
            A host string to compare against the allowlist.

            If the host does not contain a ``":"``, then ``":80"`` is
            appended before any comparison is made.

        allowlist (seq[str]) :
            A list of host patterns to match against

    Returns:
        ``True``, if ``host`` is literally present in ``allowlist`` or
        matches any of its patterns according to ``match_host``, otherwise
        ``False``

    '''
    candidate = host if ':' in host else host + ':80'

    # Cheap exact hit first; pattern matching is only needed on a miss.
    if candidate in allowlist:
        return True

    for pattern in allowlist:
        if match_host(candidate, pattern):
            return True
    return False
def create_hosts_allowlist(host_list: Sequence[str] | None, port: int | None) -> list[str]:
    ''' Build a normalized list of allowed ``<name>:<port>`` host strings.

    This allowlist can be used to restrict websocket or other connections to
    only those explicitly originating from approved hosts.

    Args:
        host_list (seq[str]) :
            A list of string `<name>` or `<name>:<port>` values to add to the
            allowlist.

            If no port is specified in a host string, then ``":80"`` is
            implicitly assumed. The bare wildcard ``"*"`` is kept as-is, with
            no port suffix.

        port (int) :
            If ``host_list`` is empty or ``None``, then the allowlist will be
            the single item list `` [ 'localhost:<port>' ]``

            If ``host_list`` is not empty, this parameter has no effect.

    Returns:
        list[str]

    Raises:
        ValueError, if a host value is empty, has a port that is not an
        integer, or contains more than one ``":"``. When the port cannot be
        parsed, the original parsing error is attached as ``__cause__``.

    Note:
        If any host in ``host_list`` contains a wildcard ``*`` a warning will
        be logged regarding permissive websocket connections.

    '''
    if not host_list:
        return ['localhost:' + str(port)]

    hosts: list[str] = []
    for host in host_list:
        if '*' in host:
            log.warning(
                "Host wildcard %r will allow connections originating "
                "from multiple (or possibly all) hostnames or IPs. Use non-wildcard "
                "values to restrict access explicitly", host)
        if host == '*':
            # do not append the :80 port suffix in that case: any port is
            # accepted
            hosts.append(host)
            continue

        parts = host.split(':')
        if len(parts) == 1:
            if parts[0] == "":
                raise ValueError("Empty host value")
            hosts.append(host + ":80")
        elif len(parts) == 2:
            # The port is only validated here; the host string is stored
            # unchanged.
            try:
                int(parts[1])
            except ValueError as err:
                # Chain explicitly so the traceback shows the parse failure
                # as the direct cause rather than an incidental error.
                raise ValueError(f"Invalid port in host value: {host}") from err
            if parts[0] == "":
                raise ValueError("Empty host value")
            hosts.append(host)
        else:
            raise ValueError(f"Invalid host value: {host}")
    return hosts
def match_host(host: str, pattern: str) -> bool:
    ''' Match a host string against a pattern

    Args:
        host (str)
            A hostname to compare to the given pattern

        pattern (str)
            A string representing a hostname pattern, possibly including
            wildcards for ip address octets or ports.

    Both arguments are split on their last ``":"`` into a name and an
    optional port. If the pattern carries a port other than ``"*"``, the host
    must carry exactly the same port. The names are then split on ``"."`` and
    compared component by component, where a ``"*"`` pattern component
    matches anything. Only as many leading components as the pattern has are
    compared; a pattern with more components than the host never matches.

    Returns:
        bool

    Examples:

        >>> match_host('192.168.0.1:80', '192.168.0.1:80')
        True
        >>> match_host('192.168.0.1:80', '192.168.0.1')
        True
        >>> match_host('192.168.0.1:80', '192.168.0.1:8080')
        False
        >>> match_host('192.168.0.1', '192.168.0.2')
        False
        >>> match_host('192.168.0.1', '192.168.*.*')
        True
        >>> match_host('alice', 'alice')
        True
        >>> match_host('alice:80', 'alice')
        True
        >>> match_host('alice', 'bob')
        False
        >>> match_host('foo.example.com', 'foo.example.com.net')
        False
        >>> match_host('alice', '*')
        True
        >>> match_host('alice', '*:*')
        True
        >>> match_host('alice:80', '*')
        True
        >>> match_host('alice:80', '*:80')
        True
        >>> match_host('alice:8080', '*:80')
        False

    '''
    requested_port = None
    if ':' in host:
        host, _, requested_port = host.rpartition(':')

    if ':' in pattern:
        pattern, _, allowed_port = pattern.rpartition(':')
        # A "*" port accepts anything, so only a concrete port is compared.
        if allowed_port != '*' and allowed_port != requested_port:
            return False

    host_labels = host.split('.')
    pattern_labels = pattern.split('.')

    if len(host_labels) < len(pattern_labels):
        return False

    # zip() stops at the end of the (shorter or equal) pattern, so trailing
    # host components beyond the pattern are not inspected.
    return all(
        wanted == '*' or wanted == actual
        for actual, wanted in zip(host_labels, pattern_labels)
    )
#-----------------------------------------------------------------------------# Dev API#-----------------------------------------------------------------------------#-----------------------------------------------------------------------------# Private API#-----------------------------------------------------------------------------#-----------------------------------------------------------------------------# Code#-----------------------------------------------------------------------------