Source code for aicsimageio.transforms

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations

from collections import Counter
from typing import Any, List, Literal, Optional, Tuple, Union

import dask.array as da
import numpy as np
import xarray as xr

from . import types
from .exceptions import ConflictingArgumentsError, UnexpectedShapeError
from .image_container import ImageContainer

###############################################################################


def reduce_to_slice(L: Union[List, Tuple]) -> Union[int, List, slice, Tuple]:
    # if the list only has one element, then just use it
    if len(L) == 1:
        return slice(L[0], L[0] + 1)

    # if the list has at least 2 elements we can check for sliceability;
    # it is convertible to a slice if the step size between each
    # consecutive pair of elements is equal and positive
    # 1. get all the deltas in a list:
    steps = [(L[i + 1] - L[i]) for i in range(len(L) - 1)]

    # 2. check if all the deltas are equal and positive
    if steps[0] > 0 and steps.count(steps[0]) == len(steps):
        return slice(min(L), max(L) + 1, steps[0])

    # if we can't convert to a slice, then just return the list unmodified
    return L
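An illustrative sketch (not part of the library source) of the values reduce_to_slice produces for a few representative index lists; evenly spaced lists collapse to a slice, anything else is returned unchanged:

    >>> reduce_to_slice([3])
    slice(3, 4, None)
    >>> reduce_to_slice([0, 2, 4, 6])
    slice(0, 7, 2)
    >>> reduce_to_slice([0, 1, 5])  # uneven steps, returned as-is
    [0, 1, 5]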
def transpose_to_dims(
    data: types.ArrayLike,
    given_dims: str,
    return_dims: str,
) -> types.ArrayLike:
    """
    This shuffles the data dimensions from given_dims to return_dims.

    Each dimension present in given_dims must be used in return_dims.

    Parameters
    ----------
    data: types.ArrayLike
        Either a dask array or numpy.ndarray of arbitrary shape but with
        the dimensions specified in given_dims
    given_dims: str
        The dimension ordering of data, "CZYX", "VBTCXZY", etc.
    return_dims: str
        The dimension ordering of the return data

    Returns
    -------
    data: types.ArrayLike
        The data with the specified dimension ordering.

    Raises
    ------
    ConflictingArgumentsError
        given_dims and return_dims are incompatible.
    """
    # Use a counter to track that the contents are composed of the same letters
    # and that no letter is repeated
    if (
        Counter(given_dims) != Counter(return_dims)
        or max(Counter(given_dims).values()) > 1
    ):
        raise ConflictingArgumentsError(
            f"given_dims={given_dims} and return_dims={return_dims} are incompatible."
        )

    # Resort the data into return_dims order
    match_map = {dim: given_dims.find(dim) for dim in given_dims}
    transposer = []
    for dim in return_dims:
        transposer.append(match_map[dim])
    data = data.transpose(transposer)

    return data
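A minimal usage sketch, assuming the function is imported from aicsimageio.transforms; the transpose simply reorders axes so the array shape follows return_dims:

    >>> arr = np.zeros((3, 4, 5))  # dims "CZY"
    >>> transpose_to_dims(arr, given_dims="CZY", return_dims="YZC").shape
    (5, 4, 3)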
def reshape_data(
    data: types.ArrayLike, given_dims: str, return_dims: str, **kwargs: Any
) -> types.ArrayLike:
    """
    Reshape the data into return_dims, pad missing dimensions, and prune
    extra dimensions. Warns the user to use the base reader if the depth
    of the dimension being removed is not 1.

    Parameters
    ----------
    data: types.ArrayLike
        Either a dask array or numpy.ndarray of arbitrary shape but with
        the dimensions specified in given_dims
    given_dims: str
        The dimension ordering of data, "CZYX", "VBTCXZY", etc.
    return_dims: str
        The dimension ordering of the return data
    kwargs: Any
        * C=1 => desired specific channel, if C in the input data has
          depth 3 then C=1 returns the 2nd slice (0 indexed)
        * Z=10 => desired specific z-slice, if Z in the input data has
          depth 20 then Z=10 returns the 11th slice
        * T=[0, 1] => desired specific timepoints, if T in the input data
          has depth 100 then T=[0, 1] returns the 1st and 2nd slice
          (0 indexed)
        * T=(0, 1) => desired specific timepoints, if T in the input data
          has depth 100 then T=(0, 1) returns the 1st and 2nd slice
          (0 indexed)
        * T=(0, -1) => desired specific timepoints, if T in the input data
          has depth 100 then T=(0, -1) returns the first and last slice
        * T=range(10) => desired specific timepoints, if T in the input
          data has depth 100 then T=range(10) returns the first ten slices
        * T=slice(0, -1, 5) => desired specific timepoints,
          T=slice(0, -1, 5) returns every fifth timepoint

    Returns
    -------
    data: types.ArrayLike
        The data with the specified dimension ordering.

    Raises
    ------
    ConflictingArgumentsError
        Missing dimension in return_dims when using range, slice, or
        multi-index dimension selection for the requested dimension.
    IndexError
        Requested dimension index not present in data.

    Examples
    --------
    Specific index selection

    >>> data = np.random.rand(10, 100, 100)
    ... z1 = reshape_data(data, "ZYX", "YX", Z=1)

    List of index selection

    >>> data = np.random.rand(10, 100, 100)
    ... first_and_second = reshape_data(data, "ZYX", "YX", Z=[0, 1])

    Tuple of index selection

    >>> data = np.random.rand(10, 100, 100)
    ... first_and_last = reshape_data(data, "ZYX", "YX", Z=(0, -1))

    Range of index selection

    >>> data = np.random.rand(10, 100, 100)
    ... first_three = reshape_data(data, "ZYX", "YX", Z=range(3))

    Slice selection

    >>> data = np.random.rand(10, 100, 100)
    ... every_other = reshape_data(data, "ZYX", "YX", Z=slice(0, -1, 2))

    Empty dimension expansion

    >>> data = np.random.rand(10, 100, 100)
    ... with_time = reshape_data(data, "ZYX", "TZYX")

    Dimension order shuffle

    >>> data = np.random.rand(10, 100, 100)
    ... as_zx_base = reshape_data(data, "ZYX", "YZX")

    Selections, empty dimension expansions, and dimension order shuffle

    >>> data = np.random.rand(10, 100, 100)
    ... example = reshape_data(data, "CYX", "BSTCZYX", C=slice(0, -1, 3))
    """
    # Check for parameter conflicts
    for dim in given_dims:
        # return_dims='CZYX' and iterable dimensions 'T=range(10)'
        # Dimension is in kwargs
        # Dimension is an iterable
        # Dimension is not in return dimensions
        if (
            isinstance(kwargs.get(dim), (list, tuple, range, slice))
            and dim not in return_dims
        ):
            raise ConflictingArgumentsError(
                f"When selecting multiple dimension indices, the specified "
                f"dimension must be provided in return_dims. "
                f"return_dims={return_dims}, dimension {dim} = {kwargs.get(dim)}"
            )

    # Process each dimension available
    new_dims = given_dims
    dim_specs = []
    for dim in given_dims:
        # Store index of the dim as it is in given data
        dim_index = given_dims.index(dim)

        # Handle dim in return_dims which means that it is
        # an iterable or None selection
        if dim in return_dims:
            # Specific iterable requested
            if dim in kwargs:
                # The specification provided for this dimension in the kwargs
                dim_spec = kwargs.get(dim)
                display_dim_spec = dim_spec

                # An integer selection on a kept dimension becomes a
                # length-one slice so the dimension is preserved
                if isinstance(dim_spec, int):
                    dim_spec = slice(dim_spec, dim_spec + 1)

                # Convert operator to standard list or slice
                # dask.Array and numpy.ndarray both natively support
                # List[int] and slices being passed to getitem so no need
                # to cast them to anything different
                if isinstance(dim_spec, (tuple, range)):
                    dim_spec = list(dim_spec)

                # Get the largest absolute value index in the list
                # using min and max
                if isinstance(dim_spec, list):
                    check_selection_max = max([abs(min(dim_spec)), max(dim_spec)])

                    # try to convert to slice if possible
                    dim_spec = reduce_to_slice(dim_spec)

                # Get the largest absolute value index from start and stop
                # of the slice
                if isinstance(dim_spec, slice):
                    check_selection_max = max(
                        [abs(dim_spec.stop), abs(dim_spec.start)]
                    )
            else:
                # Nothing was requested from this dimension
                dim_spec = slice(None, None, None)
                display_dim_spec = dim_spec

                # No op means that it doesn't matter how much data
                # is in this dimension
                check_selection_max = 0

        # Not in return_dims means that it is a fixed integer selection
        else:
            if dim in kwargs:
                # Integer requested
                dim_spec = kwargs.get(dim)
                display_dim_spec = dim_spec

                # Check the integer request against the dimension shape below
                check_selection_max = dim_spec
            else:
                dim_spec = 0
                display_dim_spec = dim_spec
                check_selection_max = 0

            # Remove dim from new dims as it is fixed size
            new_dims = new_dims.replace(dim, "")

        # Check that the requested index isn't outside the data's extent
        if check_selection_max > data.shape[dim_index]:
            raise IndexError(
                f"Dimension specified with {dim}={display_dim_spec} "
                f"but Dimension shape is {data.shape[dim_index]}."
            )

        # All checks and operations passed, append dim operation to getitem ops
        dim_specs.append(dim_spec)

    # Run getitems
    data = data[tuple(dim_specs)]

    # Add empty dims where dimensions were requested but data doesn't exist
    # Add dimensions to new dims where empty dims are added
    for i, dim in enumerate(return_dims):
        # This dimension wasn't processed
        if dim not in given_dims:
            new_dims = f"{new_dims[:i]}{dim}{new_dims[i:]}"
            data = data.reshape(*data.shape[:i], 1, *data.shape[i:])

    # Any extra dimensions have been removed, only a problem if the depth is > 1
    return transpose_to_dims(
        data, given_dims=new_dims, return_dims=return_dims
    )  # don't pass kwargs or 2 copies
def generate_stack(
    image_container: ImageContainer,
    mode: Literal["data", "dask_data", "xarray_data", "xarray_dask_data"],
    drop_non_matching_scenes: bool = False,
    select_scenes: Optional[
        Union[list[Union[str, int]], tuple[Union[str, int], ...]]
    ] = None,
    scene_character: str = "I",
    scene_coord_values: str = "index",
) -> types.MetaArrayLike:
    """
    Stack each scene contained in the reader into a single array.

    This method handles the logic of determining which stack function to
    use (dask or numpy) and whether or not to return a labelled array
    (xr.DataArray). Users should prefer to use one of get_stack,
    get_dask_stack, get_xarray_stack, or get_xarray_dask_stack.

    Parameters
    ----------
    image_container: ImageContainer
        The image container (reader) whose scenes will be stacked.
    mode: Literal["data", "dask_data", "xarray_data", "xarray_dask_data"]
        String describing the style of data to return. Should be one of:
        "data", "dask_data", "xarray_data", "xarray_dask_data".
    drop_non_matching_scenes: bool
        During the scene iteration process, if the next scene to be added
        to the stack has a different shape or dtype, should it be dropped
        or should an error be raised.
        Default: False (raise an error)
    select_scenes: Optional[
            Union[List[Union[str, int]], Tuple[Union[str, int], ...]]]
        Which scenes to stack into a single array. Scenes can be provided
        as a list or tuple of scene indices or names. It is recommended to
        use the scene integer index instead of the scene name to avoid
        duplicate scene name lookup issues.
        Default: None (stack all scenes)
    scene_character: str
        Character to use as the name of the scene dimension on the output
        array.
        Default: "I"
    scene_coord_values: str
        How to assign coordinates to the scene dimension of the final
        array. If scene_coord_values="names", use the scene name from the
        reader object. If scene_coord_values="index", don't attach any
        coordinates and fall back to integer values.
        Default: "index"

    Returns
    -------
    stack: types.MetaArrayLike
        The fully stacked array. This can be 6+ dimensions with Scene
        being the first dimension.
    """
    mode_check = ["data", "dask_data", "xarray_data", "xarray_dask_data"]
    if mode not in mode_check:
        raise ValueError(
            f"Invalid mode kwarg. Found {mode} but should be one of: "
            f"{', '.join(mode_check)}."
        )

    scene_stacks = []
    scene_names = []
    if select_scenes is None:
        select_scenes = list(range(len(image_container.scenes)))

    for i, s in enumerate(select_scenes):
        image_container.set_scene(s)
        data = getattr(image_container, mode)

        # Store the shape and dtype of the first scene's data to check
        # against later scenes. If returning a DataArray, also store the
        # coords and dims to use in the final output
        if i == 0:
            shape = data.shape
            dtype = data.dtype
            if "xarray" in mode:
                coords = dict(data.coords)
                dims = data.dims
                if scene_character in dims:
                    raise ValueError(
                        f"Provided scene dimension character "
                        f"'{scene_character}' was found in the existing "
                        f"dimensions of the data {dims}"
                    )

        # Check other scenes against the first scene
        else:
            if data.shape != shape:
                if not drop_non_matching_scenes:
                    raise UnexpectedShapeError(
                        f"All scenes must have the same shape. Found shape "
                        f"{data.shape} in scene {s} but expected {shape} "
                        f"based on scene {select_scenes[0]}"
                    )
                else:
                    continue
            if data.dtype != dtype:
                if not drop_non_matching_scenes:
                    raise TypeError(
                        f"All scenes must have the same dtype. Found data "
                        f"with dtype {data.dtype} in scene {s} but expected "
                        f"dtype {dtype} based on scene {select_scenes[0]}"
                    )
                else:
                    continue

        scene_stacks.append(data)
        scene_names.append(image_container.current_scene)

    stack = da.stack if "dask" in mode else np.stack
    if "xarray" in mode:
        all_data = stack([x.data for x in scene_stacks])
        if scene_coord_values == "names":
            coords = {scene_character: scene_names, **coords}

        return xr.DataArray(
            all_data,
            dims=(scene_character, *dims),
            coords=coords,
        )
    else:
        return stack(scene_stacks)