# Source code for pisces.particles.utils

"""Particle dataset utilities."""

from pathlib import Path
from typing import TYPE_CHECKING, Union

if TYPE_CHECKING:
    from .base import ParticleDataset


[docs] def concatenate_particles( output_path: str | Path, *datasets: "ParticleDataset", overwrite: bool = False, groups: list[str] = None, **kwargs, ) -> "ParticleDataset": """Concatenate multiple :class:`ParticleDataset` objects into a new dataset file. This method creates a new dataset by copying the first input dataset to `output_path` and then concatenates the remaining datasets into it using the `concatenate_inplace` method. Parameters ---------- output_path : str or Path Path where the new HDF5 file will be created. *datasets : ParticleDataset A sequence of particle datasets to concatenate. Must include at least one dataset. overwrite : bool, optional Whether to overwrite the file if it already exists. Defaults to False. groups : list of str, optional Specific groups to concatenate. If None, all groups in each dataset will be used. **kwargs Additional keyword arguments passed to the final dataset constructor. Returns ------- ParticleDataset A new dataset containing the concatenated data. Raises ------ ValueError If no input datasets are provided. FileExistsError If the output path exists and `overwrite` is False. """ if len(datasets) == 0: raise ValueError("At least one dataset must be provided for concatenation.") # Copy the first dataset to the new file base = datasets[0].copy(output_path, overwrite=overwrite, **kwargs) # Concatenate the rest into it base.concatenate_inplace(*datasets[1:], groups=groups) return base
def inspect_particle_count(path: Union[str, Path]) -> dict[str, int]:
    """
    Inspect the number of particles in each species group of a particle
    dataset without fully loading the file.

    The file is opened at the top level only. Every top-level HDF5 group is
    treated as a particle species, except groups whose ``NOT_PARTICLE_GROUP``
    attribute is truthy, which are skipped. Each remaining group must carry a
    ``NUMBER_OF_PARTICLES`` attribute, whose value is returned as an int.

    Parameters
    ----------
    path : str or ~pathlib.Path
        Path to the HDF5 particle file.

    Returns
    -------
    dict
        Mapping of particle group names to integer particle counts.

    Raises
    ------
    FileNotFoundError
        If the file does not exist or is not a regular file.
    ValueError
        If a valid particle group is missing the required
        ``NUMBER_OF_PARTICLES`` attribute.
    """
    import h5py

    file_path = Path(path)
    if not file_path.is_file():
        raise FileNotFoundError(f"No particle file found at: {file_path}")

    particle_counts: dict[str, int] = {}
    with h5py.File(file_path, "r") as handle:
        for name, node in handle.items():
            # Only HDF5 groups can be species containers; skip datasets etc.
            if not isinstance(node, h5py.Group):
                continue
            # Groups explicitly flagged as non-particle are ignored.
            if bool(node.attrs.get("NOT_PARTICLE_GROUP", False)):
                continue
            if "NUMBER_OF_PARTICLES" not in node.attrs:
                raise ValueError(
                    f"Particle group '{name}' in '{file_path}' is missing the 'NUMBER_OF_PARTICLES' attribute."
                )
            particle_counts[name] = int(node.attrs["NUMBER_OF_PARTICLES"])
    return particle_counts
def inspect_species(path: Union[str, Path]) -> list[str]:
    """
    List the particle species (top-level groups) present in a particle dataset.

    The HDF5 file is opened and only its immediate top-level groups are
    examined. Groups whose ``NOT_PARTICLE_GROUP`` attribute is truthy are
    skipped; every other top-level group is assumed to represent a particle
    species (e.g., ``"PartType0"``, ``"PartType1"``, etc.).

    Parameters
    ----------
    path : str or ~pathlib.Path
        Path to the HDF5 particle file.

    Returns
    -------
    list of str
        Names of particle species present in the dataset.

    Raises
    ------
    FileNotFoundError
        If the file does not exist or is not a regular file.
    """
    import h5py

    file_path = Path(path)
    if not file_path.is_file():
        raise FileNotFoundError(f"No particle file found at: {file_path}")

    with h5py.File(file_path, "r") as handle:
        # Keep the groups that are not flagged as non-particle containers.
        return [
            name
            for name, node in handle.items()
            if isinstance(node, h5py.Group)
            and not bool(node.attrs.get("NOT_PARTICLE_GROUP", False))
        ]
def inspect_fields(path: Union[str, Path]) -> dict[str, list[tuple[str, tuple[int, ...]]]]:
    """
    Inspect the available fields for each particle species in a dataset
    without fully loading it.

    The HDF5 file's top-level groups are iterated (each representing a
    particle species); any group whose ``NOT_PARTICLE_GROUP`` attribute is
    truthy is skipped. For each remaining group, the names and *per-particle
    shapes* of all contained datasets are collected. The "per-particle shape"
    is ``dataset.shape[1:]`` — the shape of one particle's entry, excluding
    the leading dimension that counts particles.

    Parameters
    ----------
    path : str or ~pathlib.Path
        Path to the HDF5 particle file.

    Returns
    -------
    dict[str, list[tuple[str, tuple[int, ...]]]]
        Mapping of particle species names to a list of
        ``(field_name, element_shape)`` tuples, where ``element_shape`` is
        the shape of one particle's data.

    Raises
    ------
    FileNotFoundError
        If the file does not exist or is not a regular file.
    """
    import h5py

    file_path = Path(path)
    if not file_path.is_file():
        raise FileNotFoundError(f"No particle file found at: {file_path}")

    result: dict[str, list[tuple[str, tuple[int, ...]]]] = {}
    with h5py.File(file_path, "r") as handle:
        for species, node in handle.items():
            if not isinstance(node, h5py.Group):
                continue
            if bool(node.attrs.get("NOT_PARTICLE_GROUP", False)):
                continue
            # shape[1:] drops the leading particle-count axis, leaving the
            # per-particle element shape.
            result[species] = [
                (field, dset.shape[1:])
                for field, dset in node.items()
                if isinstance(dset, h5py.Dataset)
            ]
    return result