Skip to content

Helper utilities

This section provides supporting utilities for storage access, caching, and chunk-size selection. These helpers are mainly intended to support efficient IO and scalable data layout decisions.

S3 Output handling

Once a pyramid has been created, it can be written to object storage for cloud-native access and downstream analysis.

save_pyramid(pyramid, path, s3_options=None, *, mode='a', compute=True, region='auto', zarr_format=2, encoding=None)

Write a HEALPix pyramid to Zarr stores on S3 or local disk.

Each level is stored below "<path>/level_<level>.zarr".

Parameters:

Name Type Description Default
pyramid dict[int, Dataset]

Mapping of HEALPix level to dataset.

required
path str

Target prefix. An "s3://bucket/pyramid" URL writes to S3; any other value is treated as a local directory path.

required
s3_options dict[str, Any] | None

Options forwarded to :class:s3fs.S3FileSystem. Only used for S3 targets; ignored (and optional) for local paths.

None
mode Literal['a', 'w', 'r+']

Zarr write mode.

'a'
compute bool

Trigger Dask execution immediately when True.

True
region Literal['auto'] | dict[str, slice]

Region writes for partial updates.

'auto'
zarr_format Literal[2, 3]

Zarr format version.

2
encoding dict[int, dict[str, dict[str, Any]]] | None

Per-level encoding dictionaries.

None
Source code in .tox/docs/lib/python3.13/site-packages/grid_doctor/helpers.py
def save_pyramid(
    pyramid: dict[int, xr.Dataset],
    path: str,
    s3_options: dict[str, Any] | None = None,
    *,
    mode: Literal["a", "w", "r+"] = "a",
    compute: bool = True,
    region: Literal["auto"] | dict[str, slice] = "auto",
    zarr_format: Literal[2, 3] = 2,
    encoding: dict[int, dict[str, dict[str, Any]]] | None = None,
) -> None:
    """Write a HEALPix pyramid to Zarr stores on S3 or local disk.

    Each level is stored below ``"<path>/level_<level>.zarr"``.

    Parameters
    ----------
    pyramid:
        Mapping of HEALPix level to dataset.
    path:
        Target prefix.  An ``"s3://bucket/pyramid"`` URL writes to S3; any
        other value is treated as a local directory path.
    s3_options:
        Options forwarded to :class:`s3fs.S3FileSystem`.  Only used for S3
        targets; ignored (and optional) for local paths.
    mode:
        Zarr write mode.
    compute:
        Trigger Dask execution immediately when ``True``.
    region:
        Region writes for partial updates.
    zarr_format:
        Zarr format version.
    encoding:
        Per-level encoding dictionaries.
    """
    is_s3 = path.startswith("s3://")
    fs = s3fs.S3FileSystem(**(s3_options or {})) if is_s3 else None
    for level, dataset in pyramid.items():
        level_path = f"{path}/level_{level}.zarr"
        logger.info("Writing HEALPix level %s to %s", level, level_path)
        store: Any
        if is_s3:
            store = s3fs.S3Map(root=level_path, s3=fs)
        else:
            Path(level_path).parent.mkdir(parents=True, exist_ok=True)
            store = level_path
        zarr_options = ZarrOptions(compute=compute, mode=mode, zarr_format=zarr_format)
        if zarr_format == 2:
            zarr_options["consolidated"] = True
        if encoding is not None:
            zarr_options["encoding"] = encoding[level]

        if region == "auto":
            dataset.to_zarr(store, **zarr_options)  # type: ignore[call-overload]
        else:
            region_keys = set(region)
            to_drop = (
                {
                    name
                    for name, var in dataset.data_vars.items()
                    if region_keys.isdisjoint(map(str, var.dims))
                }
                | {str(dim) for dim in dataset.dims}
                | {str(coord) for coord in dataset.coords}
            )
            dataset.drop_vars(to_drop, errors="ignore").isel(region).to_zarr(
                store,
                region=region,
                **zarr_options,
            )  # type: ignore[call-overload]

        if mode == "w" and not compute:
            coord_options = dict(zarr_options)
            coord_options["mode"] = "w"
            dataset[list(dataset.coords)].to_zarr(store, **coord_options)  # type: ignore[call-overload]

Storage configuration

These utilities simplify access to S3-compatible object storage by collecting and normalizing the options needed by downstream code.

get_s3_options(endpoint_url, secrets_file, **kwargs)

Build an S3 options dictionary from an endpoint and credentials file.

Parameters:

Name Type Description Default
endpoint_url str

S3-compatible endpoint URL.

required
secrets_file str | Path

JSON file containing accessKey and secretKey.

required
**kwargs str

Additional options merged into the returned dictionary.

{}

Returns:

Type Description
dict[str, str]

Options for s3fs.S3FileSystem.

Source code in .tox/docs/lib/python3.13/site-packages/grid_doctor/utils.py
def get_s3_options(
    endpoint_url: str,
    secrets_file: str | Path,
    **kwargs: str,
) -> dict[str, str]:
    """Build an S3 options dictionary from an endpoint and credentials file.

    Parameters
    ----------
    endpoint_url:
        S3-compatible endpoint URL.
    secrets_file:
        JSON file containing `accessKey` and `secretKey`.
    **kwargs:
        Additional options merged into the returned dictionary.

    Returns
    -------
    dict[str, str]
        Options for `s3fs.S3FileSystem`.
    """
    secrets: dict[str, str] = json.loads(Path(secrets_file).read_text())
    return {
        "endpoint_url": endpoint_url,
        "secret": secrets["secretKey"],
        "key": secrets["accessKey"],
        **kwargs,
    }

Chunking

Choosing appropriate chunk sizes is important for balancing storage efficiency and access performance. The function below helps determine a chunk layout for a desired target store size.

chunk_for_target_store_size(*, level, dtype='float32', target_stored_mib=16.0, compression_ratio=2.0, access='map', ntime=None, max_time_chunk=None, max_cell_chunk=None)

Compute (time, cell) chunks for a HEALPix dataset.

Parameters:

Name Type Description Default
level int

HEALPix order / level.

required
dtype str | dtype

Variable dtype.

'float32'
target_stored_mib float

Desired approximate compressed chunk size on disk.

16.0
compression_ratio float

Estimated ratio: uncompressed_bytes / compressed_bytes

2.0
access Literal['time_series', 'map']

"map" or "time_series".

'map'
ntime int | None

Total time size. Needed for time_series mode unless max_time_chunk is given.

None
max_time_chunk int | None

Optional cap for time chunk.

None
max_cell_chunk int | None

Optional cap for cell chunk.

None

Returns:

Type Description
dict[str, int]

Chunk sizes, e.g. {"time": 5, "cell": 786432}.

Source code in .tox/docs/lib/python3.13/site-packages/grid_doctor/utils.py
def chunk_for_target_store_size(
    *,
    level: int,
    dtype: str | np.dtype = "float32",
    target_stored_mib: float = 16.0,
    compression_ratio: float = 2.0,
    access: Literal["time_series", "map"] = "map",
    ntime: int | None = None,
    max_time_chunk: int | None = None,
    max_cell_chunk: int | None = None,
) -> dict[str, int]:
    """
    Compute (time, cell) chunks for a HEALPix dataset.

    Parameters
    ----------
    level
        HEALPix order / level.
    dtype
        Variable dtype.
    target_stored_mib
        Desired approximate compressed chunk size on disk.
    compression_ratio
        Estimated ratio:
            uncompressed_bytes / compressed_bytes
    access
        "map" or "time_series".
    ntime
        Total time size. Needed for time_series mode unless max_time_chunk is given.
    max_time_chunk
        Optional cap for time chunk.
    max_cell_chunk
        Optional cap for cell chunk.

    Returns
    -------
    dict[str, int]
        Chunk sizes, e.g. {"time": 5, "cell": 786432}.
    """
    nside = 2**level
    ncell = 12 * nside * nside
    itemsize = np.dtype(dtype).itemsize

    target_stored_bytes = int(target_stored_mib * 1024 * 1024)
    target_uncompressed_bytes = int(target_stored_bytes * compression_ratio)

    if access == "map":
        cell_chunk = ncell if max_cell_chunk is None else min(ncell, max_cell_chunk)
        time_chunk = max(1, target_uncompressed_bytes // (itemsize * cell_chunk))
        return {"time": int(time_chunk), "cell": int(cell_chunk)}

    if access == "time_series":
        if max_time_chunk is not None:
            time_chunk = max_time_chunk
        elif ntime is not None:
            time_chunk = ntime
        else:
            raise ValueError(
                "For access='time_series', provide either ntime or max_time_chunk."
            )

        if ntime is not None:
            time_chunk = min(time_chunk, ntime)

        cell_chunk = max(1, target_uncompressed_bytes // (itemsize * time_chunk))
        cell_chunk = min(cell_chunk, ncell)

        if max_cell_chunk is not None:
            cell_chunk = min(cell_chunk, max_cell_chunk)

        return {"time": int(time_chunk), "cell": int(cell_chunk)}

    raise ValueError(f"Unsupported access mode: {access!r}")

Caching

These helpers provide lightweight caching for repeatedly used datasets and weight files to avoid unnecessary reopen or recomputation overhead.

cached_open_dataset(files, **kwargs)

Open multiple files and cache the merged dataset as a pickle.

Parameters:

Name Type Description Default
files Collection[str]

Input file paths or glob-expanded file names.

required
**kwargs Any

Extra keyword arguments for xarray.open_mfdataset.

{}

Returns:

Type Description
Dataset

The opened dataset.

Source code in .tox/docs/lib/python3.13/site-packages/grid_doctor/utils.py
def cached_open_dataset(files: Collection[str], **kwargs: Any) -> xr.Dataset:
    """Open multiple files and cache the merged dataset as a pickle.

    Parameters
    ----------
    files:
        Input file paths or glob-expanded file names.
    **kwargs:
        Extra keyword arguments for `xarray.open_mfdataset`.

    Returns
    -------
    xarray.Dataset
        The opened dataset.
    """
    digest = hashlib.sha256()
    normalised = sorted({str(path) for path in files})
    digest.update("\0".join(normalised).encode())
    pickle_file = cache_dir() / f"{digest.hexdigest()}.pickle"

    if pickle_file.exists():
        try:
            with pickle_file.open("rb") as handle:
                return cast(xr.Dataset, pickle.load(handle))  # nosec B301  # noqa: S301
        except Exception as exc:  # pragma: no cover - defensive cache cleanup
            logger.warning("Could not read cached dataset %s: %s", pickle_file, exc)
            pickle_file.unlink(missing_ok=True)

    from dask.diagnostics.progress import ProgressBar

    merged_kwargs: dict[str, Any] = {"parallel": True, "chunks": "auto"} | kwargs
    with ProgressBar():
        dataset = xr.open_mfdataset(normalised, **merged_kwargs)

    with pickle_file.open("wb") as handle:
        pickle.dump(dataset, handle)
    return dataset

cached_weights(ds, level=None, *, method='conservative', nest=True, source_units='auto', cache_path=None, **kwargs)

Compute or load a cached HEALPix weight file.

Parameters:

Name Type Description Default
ds Dataset

Source dataset whose grid geometry defines the weights.

required
level int | None

HEALPix level.

None
method RemapMethod

Weight-generation method. Supported values are "nearest" and "conservative".

'conservative'
nest bool

Use nested HEALPix ordering when True.

True
source_units SourceUnits

Unit convention of the source coordinates.

'auto'
cache_path str | Path | None

Cache directory or explicit file name. When omitted, the default package cache directory is used.

None
**kwargs Any

Any additional keyword arguments for compute_healpix_weights

{}

Returns:

Type Description
Path

Path to the cached NetCDF weight file.

Examples:

weight_file = cached_weights(ds, level=8, method="conservative")
Source code in .tox/docs/lib/python3.13/site-packages/grid_doctor/utils.py
def cached_weights(
    ds: xr.Dataset,
    level: int | None = None,
    *,
    method: RemapMethod = "conservative",
    nest: bool = True,
    source_units: SourceUnits = "auto",
    cache_path: str | Path | None = None,
    **kwargs: Any,
) -> Path:
    """Compute or load a cached HEALPix weight file.

    Parameters
    ----------
    ds:
        Source dataset whose grid geometry defines the weights.
    level:
        HEALPix level.
    method:
        Weight-generation method. Supported values are `"nearest"` and
        `"conservative"`.
    nest:
        Use nested HEALPix ordering when `True`.
    source_units:
        Unit convention of the source coordinates.
    cache_path:
        Cache directory or explicit file name. When omitted,
        the default package cache directory is used.
    **kwargs:
        Any additional keyword arguments for
        [`compute_healpix_weights`][grid_doctor.remap.compute_healpix_weights]

    Returns
    -------
    pathlib.Path
        Path to the cached NetCDF weight file.

    Examples
    --------
    ```python
    weight_file = cached_weights(ds, level=8, method="conservative")
    ```
    """
    from .helpers import get_latlon_resolution, resolution_to_healpix_level
    from .remap import compute_healpix_weights

    digest = hashlib.sha256()
    for candidate in (
        "clon_vertices",
        "clat_vertices",
        "lon_vertices",
        "lat_vertices",
        "clon",
        "clat",
        "lon",
        "lat",
        "longitude",
        "latitude",
        "rlon",
        "rlat",
        "X",
        "Y",
    ):
        if candidate in ds:
            digest.update(
                np.ascontiguousarray(np.asarray(ds[candidate].values)).tobytes()
            )
    digest.update(
        f"level={level};method={method};nest={nest};units={source_units}".encode()
    )
    key = digest.hexdigest()[:16]

    if cache_path is None:
        weight_file = cache_dir() / f"weights_{key}.nc"
    else:
        path = Path(cache_path)
        weight_file = path / f"weights_{key}.nc" if path.is_dir() else path
    if weight_file.exists():
        logger.info("Using cached weight file %s", weight_file)
        return weight_file

    logger.info("Generating HEALPix weight file %s", weight_file)
    if level is None:
        level = resolution_to_healpix_level(get_latlon_resolution(ds))

    return compute_healpix_weights(
        ds,
        level,
        method=method,
        nest=nest,
        source_units=source_units,
        weights_path=weight_file,
        **kwargs,
    )