biapy.utils.misc

This module provides a collection of utility functions and classes primarily designed to support distributed training, logging, and model management within a PyTorch deep learning workflow.

It includes functionalities for:

  • Initializing and managing distributed training environments (DDP).

  • Controlling print statements for master processes in distributed setups.

  • Setting random seeds for reproducibility.

  • Gradient norm calculation.

  • Saving and loading model checkpoints.

  • Converting data formats between PyTorch tensors and NumPy arrays.

  • Logging metrics to TensorBoard.

  • Tracking and smoothing metric values during training.

  • Iterating with progress logging.

  • Updating nested dictionaries.

  • Cleaning directory walks by excluding specific files/directories.

The module aims to streamline common deep learning operations, especially in distributed and large-scale training scenarios.

biapy.utils.misc.setup_for_distributed(is_master)[source]

Disable printing for non-master processes in a distributed training setup.

This function replaces the built-in print function with a custom one that only prints output if the current process is the master process (rank 0), or if force=True is passed to the print call. This prevents cluttered output when running on multiple GPUs/nodes.

Parameters:

is_master (bool) – True if the current process is the master process (rank 0), False otherwise.
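The print replacement described above can be sketched as follows. This is a minimal illustration, not BiaPy's implementation: the names setup_for_distributed_sketch and restore_print are hypothetical, and only the documented behavior (print on the master process, or when force=True is passed) is reproduced.

```python
import builtins

# Keep a handle on the original print so it can still be called (and restored).
_builtin_print = builtins.print


def setup_for_distributed_sketch(is_master: bool) -> None:
    """Replace builtins.print with a version that is silent on non-master ranks."""

    def print_master_only(*args, **kwargs):
        # "force" is our own keyword; remove it before delegating to print.
        force = kwargs.pop("force", False)
        if is_master or force:
            _builtin_print(*args, **kwargs)

    builtins.print = print_master_only


def restore_print() -> None:
    """Undo the patch (hypothetical helper, mainly useful in tests)."""
    builtins.print = _builtin_print
```

After calling setup_for_distributed_sketch(False), a plain print("x") produces no output, while print("x", force=True) still prints.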

biapy.utils.misc.is_dist_avail_and_initialized()[source]

Check if PyTorch distributed backend is available and initialized.

Returns:

True if distributed training is available and initialized, False otherwise.

Return type:

bool

biapy.utils.misc.get_world_size()[source]

Return the total number of participating processes in the distributed group.

Returns 1 (a single process) if distributed mode is not initialized.

Returns:

The world size.

Return type:

int

biapy.utils.misc.get_rank()[source]

Return the rank of the current process in the distributed group.

Returns 0 if distributed mode is not initialized. The master process typically has rank 0.

Returns:

The rank of the current process.

Return type:

int

biapy.utils.misc.is_main_process()[source]

Check if the current process is the main (master) process (rank 0).

Returns:

True if the current process is the main process, False otherwise.

Return type:

bool
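The availability, rank, and main-process checks above build on one another. A minimal sketch, assuming they wrap torch.distributed as their descriptions suggest: the function names here are hypothetical, and the import guard exists only so the sketch also runs where PyTorch is not installed.

```python
try:
    import torch.distributed as dist
except ImportError:  # keeps the sketch importable without PyTorch
    dist = None


def is_dist_ready() -> bool:
    """True only when torch.distributed is usable and a process group exists."""
    return dist is not None and dist.is_available() and dist.is_initialized()


def current_rank() -> int:
    """Rank of this process, falling back to 0 outside distributed mode."""
    return dist.get_rank() if is_dist_ready() else 0


def is_main() -> bool:
    """The main (master) process is the one with rank 0."""
    return current_rank() == 0
```

In a plain single-process run no process group has been initialized, so current_rank() falls back to 0 and is_main() is True.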

biapy.utils.misc.init_devices(args, cfg)[source]

Initialize the PyTorch distributed environment and set up the device for the current process.

This function handles different distributed setup scenarios (e.g., ITP, environment variables, SLURM). It sets the appropriate GPU device, initializes the process group, and configures the custom print function for distributed logging.

Parameters:
  • args (Any) – An object containing command-line arguments or configuration, expected to have attributes like dist_on_itp, gpu, dist_backend, dist_url.

  • cfg (YACS CN object) – The configuration object, used to determine the default device if CUDA is not available.

Returns:

The PyTorch device assigned to the current process.

Return type:

torch.device

Raises:

AssertionError – If distributed training is attempted without GPUs when environment variables are set.

biapy.utils.misc.set_seed(seed=42)[source]

Set the random seed for reproducibility across multiple Python modules and PyTorch.

The seed is adjusted by the distributed rank to ensure different random states for each process in a distributed setup, which can be beneficial for certain operations (e.g., data loading).

Parameters:

seed (int, optional) – The base seed value. Defaults to 42.
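A rank-dependent seed can be sketched as below. The documentation only says the seed is "adjusted by the distributed rank"; the specific offset seed + rank is an assumption, and the sketch seeds only Python's random module, whereas the real function also covers other modules and PyTorch. The name seed_for_rank is hypothetical.

```python
import random


def seed_for_rank(seed: int = 42, rank: int = 0) -> int:
    """Seed Python's RNG with a per-rank value and return the value used.

    Assumption: the per-rank adjustment is a simple offset (seed + rank).
    """
    effective_seed = seed + rank
    random.seed(effective_seed)
    return effective_seed
```

With this scheme every rank is reproducible across runs, but two ranks started with the same base seed draw different random sequences.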

biapy.utils.misc.get_grad_norm_(parameters, norm_type: float = 2.0) → Tensor[source]

Compute the total norm of gradients for a collection of parameters.

This function is typically used for gradient clipping.

Parameters:
  • parameters (Iterable[torch.Tensor] or torch.Tensor) – An iterable of model parameters or a single parameter tensor.

  • norm_type (float, optional) – The type of the norm (e.g., 2.0 for L2 norm, inf for max norm). Defaults to 2.0.

Returns:

The total norm of the gradients. Returns a tensor with value 0.0 if no parameters have gradients.

Return type:

torch.Tensor
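The "total norm" is the norm of the per-parameter gradient norms, the same construction that torch.nn.utils.clip_grad_norm_ uses. The sketch below shows the arithmetic in pure Python, with each gradient given as a flat list of floats (None standing for a parameter without a gradient). The name total_grad_norm is hypothetical, and it returns a float rather than a tensor.

```python
import math
from typing import List, Optional, Sequence


def total_grad_norm(grads: Sequence[Optional[List[float]]], norm_type: float = 2.0) -> float:
    """Combine per-parameter gradient norms into a single total norm."""
    present = [g for g in grads if g is not None]
    if not present:
        return 0.0  # mirrors the documented "0.0 if no parameters have gradients"
    if math.isinf(norm_type):
        # Max norm: the largest absolute gradient entry over all parameters.
        return max(abs(v) for g in present for v in g)
    # p-norm of each gradient, then the p-norm of those norms.
    per_param = [sum(abs(v) ** norm_type for v in g) ** (1.0 / norm_type) for g in present]
    return sum(n ** norm_type for n in per_param) ** (1.0 / norm_type)
```

For example, two gradients [3.0] and [4.0] have an L2 total norm of 5.0 and a max norm of 4.0.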

biapy.utils.misc.save_model(cfg, biapy_version, jobname, epoch, model_without_ddp, optimizer, model_build_kwargs=None, extension='pth')[source]

Save the model checkpoint to the specified path.

This function saves the model’s state dictionary, optimizer state, current epoch, configuration, and BiaPy version. It ensures that saving is performed only by the main process in a distributed setup.

Parameters:
  • cfg (YACS CN object) – The configuration object.

  • biapy_version (str) – The current version of BiaPy.

  • jobname (str) – The name of the current job/experiment.

  • epoch (int) – The current epoch number.

  • model_without_ddp (nn.Module) – The model instance, typically the unwrapped model if using DistributedDataParallel.

  • optimizer (torch.optim.Optimizer) – The optimizer whose state is saved.

  • model_build_kwargs (Optional[Dict], optional) – Keyword arguments used to build the model, useful for re-instantiating the model from the checkpoint. Defaults to None.

  • extension (str, optional) – The file extension for the checkpoint file. Options are ‘pth’ (native PyTorch format) or ‘safetensors’ (https://github.com/huggingface/safetensors). Defaults to “pth”.

Returns:

The path to the saved checkpoint file.

Return type:

Path

biapy.utils.misc.save_on_master(model_dict, checkpoint_path)[source]

Save a PyTorch object only if the current process is the main (master) process.

This is a wrapper around torch.save to ensure that checkpoints are not redundantly saved by all processes in a distributed training setup.

Parameters:
  • model_dict (Any) – The object to save, typically a checkpoint dictionary. Passed to torch.save.

  • checkpoint_path (str or Path) – The destination file path. Passed to torch.save.

biapy.utils.misc.get_checkpoint_path(cfg, jobname)[source]

Determine the path to the checkpoint file to load.

It selects the checkpoint based on cfg.PATHS.CHECKPOINT_FILE, cfg.MODEL.LOAD_CHECKPOINT_EPOCH (“last_on_train” or “best_on_val”), and the jobname.

Parameters:
  • cfg (YACS CN object) – The configuration object. Key parameters:

    • cfg.PATHS.CHECKPOINT: Base directory for checkpoints.

    • cfg.PATHS.CHECKPOINT_FILE: Explicit path to a checkpoint file (if set).

    • cfg.MODEL.LOAD_CHECKPOINT_EPOCH: Strategy for selecting checkpoint (“last_on_train” or “best_on_val”).

  • jobname (str) – The name of the current job/experiment.

Returns:

The absolute path to the checkpoint file, without its extension (.pth or .safetensors).

Return type:

str

Raises:

NotImplementedError – If cfg.MODEL.LOAD_CHECKPOINT_EPOCH is an unrecognized value.

biapy.utils.misc.load_model_checkpoint(cfg, jobname, model_without_ddp, device, optimizer=None, just_extract_checkpoint_info=False, skip_unmatched_layers=False) → Tuple[int | CfgNode | None, str | None][source]

Load a model checkpoint from disk.

This function handles loading the model’s state dictionary, optimizer state, and epoch number from a checkpoint file. It can also be configured to only extract configuration information or to skip layers with mismatched shapes.

Parameters:
  • cfg (YACS CN object) – The configuration object. Key parameters:

    • cfg.PATHS.CHECKPOINT_FILE: Explicit path to checkpoint.

    • cfg.MODEL.LOAD_CHECKPOINT_EPOCH: Strategy for checkpoint selection.

    • cfg.MODEL.ITEMS_TO_LOAD_FROM_CHECKPOINT: List of items to load from the checkpoint (if available). Options are:

      • “weights”: Load model weights.

      • “model_arch”: Load model architecture.

      • “optimizer”: Load optimizer state.

      • “epoch”: Load epoch number.

  • jobname (str) – The name of the current job/experiment.

  • model_without_ddp (nn.Module) – The model instance (unwrapped if DDP is used) to load weights into.

  • device (torch.device) – The device to map the loaded checkpoint to.

  • optimizer (Optional[torch.optim.Optimizer], optional) – The optimizer instance to load state into. If None, optimizer state is not loaded. Defaults to None.

  • just_extract_checkpoint_info (bool, optional) – If True, only the configuration (cfg) and BiaPy version from the checkpoint are returned, without loading model or optimizer states. Defaults to False.

  • skip_unmatched_layers (bool, optional) – If True, layers in the checkpoint that have different shapes than the current model’s layers will be skipped during loading. Defaults to False.

Returns:

If just_extract_checkpoint_info is True: returns (checkpoint_cfg, biapy_version). Otherwise: returns (start_epoch, resume_path). checkpoint_cfg and biapy_version can be None if not found in the checkpoint.

Return type:

Tuple[int | CN | None, str | None]

Raises:

FileNotFoundError – If the specified checkpoint file does not exist.
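The effect of skip_unmatched_layers can be pictured as a shape filter applied to the checkpoint's state dictionary before it is loaded. The sketch below is an illustration under assumptions, not BiaPy's code: filter_matching_entries is a hypothetical helper, it also drops entries whose names are missing from the model (the documentation only mentions shape mismatches), and SimpleNamespace objects stand in for tensors so that the example needs no PyTorch. Any object with a .shape attribute works.

```python
from types import SimpleNamespace


def filter_matching_entries(checkpoint_state, model_state):
    """Split checkpoint entries into those safe to load and those to skip."""
    kept, skipped = {}, []
    for name, value in checkpoint_state.items():
        target = model_state.get(name)
        # Keep an entry only if the model has it with an identical shape.
        if target is not None and tuple(value.shape) == tuple(target.shape):
            kept[name] = value
        else:
            skipped.append(name)
    return kept, skipped


# Stand-ins for tensors: only the shape matters for this check.
ckpt = {
    "conv.weight": SimpleNamespace(shape=(16, 3, 3, 3)),
    "head.weight": SimpleNamespace(shape=(10, 64)),
}
model = {
    "conv.weight": SimpleNamespace(shape=(16, 3, 3, 3)),
    "head.weight": SimpleNamespace(shape=(2, 64)),  # different number of classes
}
kept, skipped = filter_matching_entries(ckpt, model)
```

Here the convolution weights are kept while the mismatched head is skipped, which is the typical situation when fine-tuning on a task with a different number of output classes.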

biapy.utils.misc.all_reduce_mean(x)[source]

Perform an all-reduce on a scalar or single-element tensor and compute the mean across all processes in the distributed group.

If not in a distributed environment, returns the input value directly.

Parameters:

x (float or torch.Tensor) – The scalar value or single-element tensor to be reduced.

Returns:

The mean of x across all processes.

Return type:

float

biapy.utils.misc.to_pytorch_format(x: Tensor | ndarray[tuple[int, ...], dtype[_ScalarType_co]], axes_order: Tuple, device: device, dtype=torch.float32) → Tensor[source]

Convert a NumPy array or PyTorch tensor to a PyTorch tensor with the specified axis order and move it to the target device.

Parameters:
  • x (torch.Tensor or numpy.ndarray) – The input data.

  • axes_order (Tuple[int, …]) – A tuple specifying the desired permutation of axes. For example, (0, 3, 1, 2) for (N, H, W, C) to (N, C, H, W).

  • device (torch.device) – The target PyTorch device (e.g., “cuda”, “cpu”).

  • dtype (torch.dtype, optional) – The desired data type for the output tensor. Defaults to torch.float32.

Returns:

The converted PyTorch tensor.

Return type:

torch.Tensor

biapy.utils.misc.to_numpy_format(x, axes_order_back)[source]

Convert a PyTorch tensor back to a NumPy array with a specified axis order.

Parameters:
  • x (torch.Tensor) – The input PyTorch tensor.

  • axes_order_back (Tuple[int, …]) – A tuple specifying the desired permutation of axes to revert to the original NumPy-like order.

Returns:

The converted NumPy array.

Return type:

numpy.ndarray
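The axes_order passed to to_pytorch_format and the axes_order_back passed to to_numpy_format are inverse permutations of each other. The sketch below shows how one can be derived from the other, working on shape tuples only. Both helper names are hypothetical, and the permutation convention (output axis i takes input axis axes_order[i]) is the one used by numpy.transpose and torch.Tensor.permute.

```python
from typing import Sequence, Tuple


def invert_axes_order(axes_order: Sequence[int]) -> Tuple[int, ...]:
    """Return the permutation that undoes axes_order."""
    inverse = [0] * len(axes_order)
    for new_pos, old_pos in enumerate(axes_order):
        inverse[old_pos] = new_pos
    return tuple(inverse)


def permute_shape(shape: Sequence[int], axes_order: Sequence[int]) -> Tuple[int, ...]:
    """Shape that results from permuting an array of the given shape."""
    return tuple(shape[axis] for axis in axes_order)


forward = (0, 3, 1, 2)                 # (N, H, W, C) -> (N, C, H, W)
backward = invert_axes_order(forward)  # (N, C, H, W) -> (N, H, W, C)
```

Applying forward and then backward returns every axis to its original position, so a round trip through both conversions preserves the original layout.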

biapy.utils.misc.time_text(t)[source]

Format a time duration (in seconds) into a human-readable string.

Formats as ‘Xh’, ‘Xm’, or ‘Xs’ depending on the duration.

Parameters:

t (float) – Time duration in seconds.

Returns:

Formatted time string.

Return type:

str
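A formatter of this kind might look like the sketch below. The thresholds (3600 s and 60 s) and the one-decimal formatting are assumptions; the documentation only states that the result is expressed in hours, minutes, or seconds depending on the duration. The name time_text_sketch is hypothetical.

```python
def time_text_sketch(t: float) -> str:
    """Format a duration in seconds using the largest fitting unit."""
    if t >= 3600:
        return f"{t / 3600:.1f}h"
    if t >= 60:
        return f"{t / 60:.1f}m"
    return f"{t:.1f}s"
```

Under these assumptions, 7200 seconds is rendered in hours, 90 seconds in minutes, and 5 seconds in seconds.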

class biapy.utils.misc.TensorboardLogger(log_dir)[source]

Bases: object

A simple wrapper for tensorboardX.SummaryWriter to log scalar metrics.

set_step(step=None)[source]

Set the current global step for logging.

If step is None, increments the internal step counter.

Parameters:

step (Optional[int], optional) – The specific step number to set. If None, increments the current step. Defaults to None.

update(head='scalar', step=None, **kwargs)[source]

Log scalar values to TensorBoard.

Parameters:
  • head (str, optional) – The main category for the scalar (e.g., “train_loss”, “val_metrics”). Defaults to “scalar”.

  • step (Optional[int], optional) – The specific global step to log this update at. If None, uses the internal self.step. Defaults to None.

  • **kwargs (float | int | torch.Tensor) – Keyword arguments where keys are metric names (e.g., “loss”, “accuracy”) and values are the corresponding scalar values (can be PyTorch tensors or Python floats/ints).

flush()[source]

Ensure all pending events have been written to disk.

class biapy.utils.misc.SmoothedValue(window_size=20, fmt=None)[source]

Bases: object

Track a series of values and provide access to smoothed values (median, average) over a sliding window or the global series average.

update(value, n=1)[source]

Update the tracker with a new value.

Parameters:
  • value (float) – The new value to add.

  • n (int, optional) – The number of samples represented by this value (e.g., batch size). Defaults to 1.

synchronize_between_processes()[source]

Synchronize the count and total attributes across all processes in a distributed environment using dist.all_reduce.

Warning: This method does not synchronize the deque (sliding window).

property median

Return the median of the values in the current sliding window.

property avg

Return the average of the values in the current sliding window.

property global_avg: float

Return the global average of all values recorded since initialization.

property max

Return the maximum value in the current sliding window.

property value

Return the most recently updated value.
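The difference between the windowed statistics and the global average is easiest to see in code. The sketch below is a simplified stand-in with no distributed synchronization and no fmt handling. The class name is hypothetical, and accumulating total as value * n is an assumption based on how n is described (the number of samples a value represents).

```python
from collections import deque
from statistics import median


class SmoothedValueSketch:
    """Windowed statistics plus a running global average."""

    def __init__(self, window_size: int = 20):
        self.window = deque(maxlen=window_size)  # old values fall off automatically
        self.total = 0.0  # weighted sum of every value ever seen
        self.count = 0    # total number of samples ever seen

    def update(self, value: float, n: int = 1) -> None:
        self.window.append(value)
        self.count += n
        self.total += value * n

    @property
    def median(self) -> float:
        return median(self.window)

    @property
    def avg(self) -> float:
        return sum(self.window) / len(self.window)

    @property
    def global_avg(self) -> float:
        return self.total / self.count

    @property
    def max(self) -> float:
        return max(self.window)

    @property
    def value(self) -> float:
        return self.window[-1]
```

Because only count and total are synchronized between processes, global_avg is the one statistic that can reflect all ranks. The median, avg, and max properties are computed from the local window.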

class biapy.utils.misc.MetricLogger(delimiter='\t', verbose=False)[source]

Bases: object

Aggregate and log various metrics using SmoothedValue objects.

update(**kwargs)[source]

Update the values of tracked metrics.

Parameters:

**kwargs (float | int | torch.Tensor) – Keyword arguments where keys are metric names and values are their current scalar values.

synchronize_between_processes()[source]

Synchronize all tracked SmoothedValue meters across distributed processes.

add_meter(name, meter)[source]

Add a custom SmoothedValue meter to the logger.

Parameters:
  • name (str) – The name of the meter.

  • meter (SmoothedValue) – The SmoothedValue instance to add.

log_every(iterable, print_freq, header=None)[source]

Log progress for an iterable, printing metrics at a specified frequency.

Parameters:
  • iterable (Iterable[Any]) – The iterable (e.g., DataLoader) to iterate over.

  • print_freq (int) – The frequency (in iterations) at which to print log messages.

  • header (Optional[str], optional) – An optional header string to prepend to log messages. Defaults to None.

Yields:

Any – Items from the input iterable.
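log_every is a generator that wraps the iterable, so the training loop keeps its usual for-loop shape. The sketch below shows that pattern in isolation. It is a simplified assumption of the behavior: the function name and the message layout are hypothetical, and it prints only elapsed time rather than the tracked meters.

```python
import time
from typing import Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def log_every_sketch(iterable: Iterable[T], print_freq: int, header: Optional[str] = None) -> Iterator[T]:
    """Yield items unchanged, printing a progress line every print_freq iterations."""
    header = header or ""
    start = time.time()
    for i, item in enumerate(iterable):
        yield item  # the caller's loop body runs here
        if i % print_freq == 0:
            elapsed = time.time() - start
            print(f"{header} [{i}] elapsed: {elapsed:.2f}s")
```

A loop such as `for batch in log_every_sketch(loader, 10, "Epoch 1"):` then logs at iterations 0, 10, 20, and so on, without any logging code inside the loop body.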

biapy.utils.misc.update_dict_with_existing_keys(d, u, not_recognized_keys=[], not_recognized_key_vals=[])[source]

Recursively update a dictionary d with values from dictionary u, only for keys that already exist in d.

This function is useful for updating configuration dictionaries while ensuring that no new keys are introduced from the update dictionary. It also tracks keys from u that were not found in d.

Parameters:
  • d (Dict) – The dictionary to be updated (destination).

  • u (Dict) – The dictionary containing update values (source).

  • not_recognized_keys (Optional[List], optional) – A list to which keys from u that were not found in d are appended. If None, a new list is created. Defaults to [].

  • not_recognized_key_vals (Optional[List], optional) – A list to which the values corresponding to not_recognized_keys are appended. If None, a new list is created. Defaults to [].

Returns:

  • d: The updated dictionary.

  • not_recognized_keys: List of keys from u not found in d.

  • not_recognized_key_vals: List of values from u corresponding to not_recognized_keys.

Return type:

Tuple[Dict, List, List]
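The recursive update can be sketched as follows. This is an illustration of the described behavior rather than BiaPy's code: the name update_existing_keys is hypothetical, and the sketch uses None sentinels for the two accumulator lists, a common way to avoid sharing one mutable default list between calls.

```python
def update_existing_keys(d, u, not_recognized_keys=None, not_recognized_key_vals=None):
    """Copy values from u into d, but only for keys d already has."""
    if not_recognized_keys is None:
        not_recognized_keys = []
    if not_recognized_key_vals is None:
        not_recognized_key_vals = []
    for key, value in u.items():
        if key not in d:
            # Unknown key: record it instead of adding it to d.
            not_recognized_keys.append(key)
            not_recognized_key_vals.append(value)
        elif isinstance(value, dict) and isinstance(d[key], dict):
            # Both sides are dictionaries: descend one level.
            update_existing_keys(d[key], value, not_recognized_keys, not_recognized_key_vals)
        else:
            d[key] = value
    return d, not_recognized_keys, not_recognized_key_vals
```

For example, updating {"MODEL": {"DEPTH": 3}} with {"MODEL": {"DEPTH": 4, "TYPO": 1}} changes DEPTH to 4 and reports "TYPO" as unrecognized instead of inserting it.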

biapy.utils.misc.os_walk_clean(path: str, exclude_files: Tuple = ('Thumbs.db', 'desktop.ini', '.DS_Store'), exclude_dirs: Tuple = ('.git', '__pycache__')) → Iterator[Tuple[str, List[str], List[str]]][source]

Walk a directory tree like os.walk, skipping excluded files and directories and returning entries in natural (numeric-aware) sorted order.

Parameters:
  • path (str) – The root directory to walk.

  • exclude_files (tuple, optional) – Filenames to exclude from the results. Defaults to common system files.

  • exclude_dirs (tuple, optional) – Directory names to exclude from the results. Defaults to common system directories.

Yields:

Tuple[str, List[str], List[str]] – Tuples of (root, dirs, files) with excluded items removed and directories/files sorted in natural order.
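A filtered, naturally sorted walk can be built on os.walk as sketched below. This is a minimal version under assumptions, not the library's implementation: the function names are hypothetical, and the natural sort key simply splits names into text and digit runs.

```python
import os
import re
from typing import Iterator, List, Tuple


def natural_key(name: str) -> list:
    """Sort key that compares digit runs as numbers, so 'img2' < 'img10'."""
    parts = re.split(r"(\d+)", name)
    # With a capturing group, odd positions hold the digit runs.
    return [int(p) if i % 2 else p.lower() for i, p in enumerate(parts)]


def os_walk_clean_sketch(
    path: str,
    exclude_files: Tuple[str, ...] = ("Thumbs.db", "desktop.ini", ".DS_Store"),
    exclude_dirs: Tuple[str, ...] = (".git", "__pycache__"),
) -> Iterator[Tuple[str, List[str], List[str]]]:
    for root, dirs, files in os.walk(path):
        # Assigning to dirs[:] edits the list in place, which also stops
        # os.walk from descending into the excluded directories.
        dirs[:] = sorted((d for d in dirs if d not in exclude_dirs), key=natural_key)
        files = sorted((f for f in files if f not in exclude_files), key=natural_key)
        yield root, dirs, files
```

The in-place slice assignment matters: rebinding dirs to a new list would filter the yielded value but would not prevent os.walk from entering directories such as .git.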

biapy.utils.misc.resolve_cpu_budget(user_num_cpus: int) → int[source]

Resolve the total CPU core budget for the entire job.

biapy.utils.misc.compute_threads_and_workers(user_num_cpus: int, world_size: int, training_samples: int | None = None, max_workers_cap: int = 8) → Tuple[int, int, int, int][source]

Compute CPU budget, CPU per rank, main threads, and DataLoader workers per rank.

Parameters:
  • user_num_cpus (int) – User-specified number of CPUs (-1 to use all available).

  • world_size (int) – Number of distributed ranks/processes.

  • training_samples (int, optional) – Number of training samples (to limit workers for small datasets).

  • max_workers_cap (int, optional) – Maximum cap on DataLoader workers per rank. Defaults to 8.

Returns:

  • cpu_budget: Total CPU cores budget for the job.

  • cpu_per_rank: CPU cores allocated per rank.

  • main_threads: Number of main threads for training process.

  • num_workers: Number of DataLoader workers per rank.

Return type:

Tuple[int, int, int, int]