Source code for bokeh.util.sampledata

#-----------------------------------------------------------------------------
# Copyright (c) 2012 - 2024, Anaconda, Inc., and Bokeh Contributors.
# All rights reserved.
#
# The full license is in the file LICENSE.txt, distributed with this software.
#-----------------------------------------------------------------------------
''' Helper functions for downloading and accessing sample data.

'''

#-----------------------------------------------------------------------------
# Boilerplate
#-----------------------------------------------------------------------------
from __future__ import annotations # isort:skip

# NOTE: skip logging imports so that this module may be run as a script

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

# Standard library imports
import hashlib
import json
from os.path import splitext
from pathlib import Path
from sys import stdout
from typing import TYPE_CHECKING, Any, TextIO
from urllib.parse import urljoin
from urllib.request import urlopen

# NOTE: since downloading sampledata is not a common occurrence, non-stdlib
# imports are generally deferrered in this module

if TYPE_CHECKING:
    import pandas as pd

#-----------------------------------------------------------------------------
# Globals and constants
#-----------------------------------------------------------------------------

__all__ = (
    'download',
)

#-----------------------------------------------------------------------------
# General API
#-----------------------------------------------------------------------------

[docs] def download(progress: bool = True) -> None: ''' Download larger data sets for various Bokeh examples. ''' data_dir = external_data_dir(create=True) print(f"Using data directory: {data_dir}") # HTTP requests are cheaper for us, and there is nothing private to protect s3 = 'http://sampledata.bokeh.org' for file_name, md5 in metadata().items(): real_path = data_dir / real_name(file_name) if real_path.exists(): with open(real_path, "rb") as file: data = file.read() local_md5 = hashlib.md5(data).hexdigest() if local_md5 == md5: print(f"Skipping {file_name!r} (checksum match)") continue print(f"Fetching {file_name!r}") _download_file(s3, file_name, data_dir, progress=progress)
#----------------------------------------------------------------------------- # Dev API #----------------------------------------------------------------------------- def real_name(name: str) -> str: real_name, ext = splitext(name) if ext == ".zip": if not splitext(real_name)[1]: return f"{real_name}.csv" else: return real_name else: return name def metadata() -> dict[str, str]: with (Path(__file__).parent / "sampledata.json").open("rb") as f: return dict(json.load(f)) def external_csv(module: str, name: str, **kw: Any) -> pd.DataFrame: import pandas as pd return pd.read_csv(external_path(name), **kw) def external_data_dir(*, create: bool = False) -> Path: try: import yaml except ImportError: raise RuntimeError("'yaml' and 'pyyaml' are required to use bokeh.sampledata functions") bokeh_dir = _bokeh_dir(create=create) data_dir = bokeh_dir / "data" try: config = yaml.safe_load(open(bokeh_dir / 'config')) data_dir = Path.expanduser(config['sampledata_dir']) except (OSError, TypeError): pass if not data_dir.exists(): if not create: raise RuntimeError('bokeh sample data directory does not exist, please execute bokeh.sampledata.download()') print(f"Creating {data_dir} directory") try: data_dir.mkdir() except OSError: raise RuntimeError(f"could not create bokeh data directory at {data_dir}") else: if not data_dir.is_dir(): raise RuntimeError(f"{data_dir} exists but is not a directory") return data_dir def external_path(file_name: str) -> Path: data_dir = external_data_dir() file_path = data_dir / file_name if not file_path.exists() or not file_path.is_file(): raise RuntimeError(f"Could not locate external data file {file_path}. Please execute bokeh.sampledata.download()") with open(file_path, "rb") as file: meta = metadata() known_md5 = meta.get(file_name) or \ meta.get(f"{file_name}.zip") or \ meta.get(f"{splitext(file_name)[0]}.zip") if known_md5 is None: raise RuntimeError(f"Unknown external data file {file_name}") local_md5 = hashlib.md5(file.read()).hexdigest() if known_md5 != local_md5: raise RuntimeError(f"External data file {file_path} is outdated. Please execute bokeh.sampledata.download()") return file_path def package_csv(module: str, name: str, **kw: Any) -> pd.DataFrame: import pandas as pd return pd.read_csv(package_path(name), **kw) def package_dir() -> Path: return Path(__file__).parents[1].joinpath("sampledata", "_data").resolve() def package_path(filename: str | Path) -> Path: return package_dir() / filename def load_json(filename: str | Path) -> Any: with open(filename, "rb") as f: return json.load(f) def open_csv(filename: str | Path) -> TextIO: return open(filename, newline='', encoding='utf8') #----------------------------------------------------------------------------- # Private API #----------------------------------------------------------------------------- def _bokeh_dir(create: bool = False) -> Path: bokeh_dir = Path("~").expanduser() / ".bokeh" if not bokeh_dir.exists(): if not create: return bokeh_dir print(f"Creating {bokeh_dir} directory") try: bokeh_dir.mkdir() except OSError: raise RuntimeError(f"could not create bokeh config directory at {bokeh_dir}") else: if not bokeh_dir.is_dir(): raise RuntimeError(f"{bokeh_dir} exists but is not a directory") return bokeh_dir def _download_file(base_url: str, filename: str, data_dir: Path, progress: bool = True) -> None: # These are actually somewhat expensive imports that added ~5% to overall # typical bokeh import times. Since downloading sampledata is not a common # action, we defer them to inside this function. from zipfile import ZipFile file_url = urljoin(base_url, filename) file_path = data_dir / filename url = urlopen(file_url) with open(file_path, 'wb') as file: file_size = int(url.headers["Content-Length"]) print(f"Downloading: {filename} ({file_size} bytes)") fetch_size = 0 block_size = 16384 while True: data = url.read(block_size) if not data: break fetch_size += len(data) file.write(data) if progress: status = f"\r{fetch_size:< 10d} [{fetch_size*100.0/file_size:6.2f}%%]" stdout.write(status) stdout.flush() if progress: print() real_name, ext = splitext(filename) if ext == '.zip': if not splitext(real_name)[1]: real_name += ".csv" print(f"Unpacking: {real_name}") with ZipFile(file_path, 'r') as zip_file: zip_file.extract(real_name, data_dir) file_path.unlink() #----------------------------------------------------------------------------- # Code #----------------------------------------------------------------------------- # This is necessary so that we can run the sampledata download code in the # release build, before an actual package exists. if __name__ == "__main__": download(progress=False)