Source code for aicsimageio.readers.reader

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import io
import logging
import sys
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import dask.array as da
import numpy as np

from .. import exceptions, transforms, types
from ..constants import Dimensions

###############################################################################

log = logging.getLogger(__name__)

###############################################################################

# Global variable to inform reader to use _read_delayed or _read_immediate
# Useful in sitations where you may be on a cluster with very few workers
# but still using Dask for other operations.
# I.E. GPU clusters where you don't care about the IO of the file so much as the
# training and application of a model
USE_DASK = True


[docs]def use_dask(setting: bool): """ Enable or disable Dask for image reading. When True, image reads are first attempted to be handled by a distributed cluster. When False, image reads are never routed to a distributed cluster and are instead read immediately in the running process. """ global USE_DASK # Check parameter if not isinstance(setting, bool): raise TypeError("The setting parameter provided to use_dask must be a boolean.") # Assign to global state USE_DASK = setting
###############################################################################
[docs]class Reader(ABC): _dask_data = None _data = None _dims = None _metadata = None @staticmethod def _resolve_image_path(img: Union[str, Path]) -> Path: # Convert pathlike to Path if isinstance(img, (str, Path)): # Strictly do not fully resolve the path because Mac is bad with mounted # drives img = Path(img).expanduser() # Check the file exists if not img.exists(): raise FileNotFoundError(img) # Check path if img.is_dir(): raise IsADirectoryError( f"Please provide a single file to the `img` parameter. " f"Received directory: {img}" ) # Check that no other type was provided if not isinstance(img, Path): raise TypeError( f"Please provide a path to a file as a string, or an pathlib.Path, to " f"the `img` parameter. " f"Received type: {type(img)}" ) return img def __init__( self, file: types.ImageLike, dask_kwargs: Dict[str, Any] = {}, **kwargs ): # This will both fully expand and enforce that the filepath exists file = self._resolve_image_path(file) # Check type if not self.is_this_type(file): raise exceptions.UnsupportedFileFormatError( f"Reader does not support file or object: {file}" ) # Store this filepath self._file = file # Store dask client and cluster setup self._dask_kwargs = dask_kwargs self._client = None self._cluster = None
[docs] @staticmethod def guess_dim_order(shape: Tuple[int]) -> str: return Dimensions.DefaultOrder[len(Dimensions.DefaultOrder) - len(shape) :]
[docs] @classmethod def is_this_type(cls, data: types.ImageLike) -> bool: # Check path if isinstance(data, (str, Path)): # Resolve image path f = cls._resolve_image_path(data) # Return and close the open pointer with open(f, "rb") as read_bytes: return cls._is_this_type(read_bytes) # Convert bytes to BytesIO if isinstance(data, bytes): data = io.BytesIO(data) # Check type if isinstance(data, io.BytesIO): return cls._is_this_type(data) # Special cases if isinstance(data, (np.ndarray, da.core.Array)): return cls._is_this_type(data) # Raise because none of the above returned raise TypeError( f"Reader only accepts types: " f"[str, pathlib.Path, bytes, io.BytesIO, numpy or dask array]. " f"Received: {type(data)}" )
@staticmethod @abstractmethod def _is_this_type(buffer: io.BytesIO) -> bool: pass @abstractmethod def _read_delayed(self) -> da.core.Array: pass @abstractmethod def _read_immediate(self) -> np.ndarray: pass @property def dask_data(self) -> da.core.Array: if self._dask_data is None: self._dask_data = self._read_delayed() return self._dask_data @property def data(self) -> np.ndarray: if self._data is None: try: # Fast re-route to _read_immediate if not USE_DASK: raise ValueError("USE_DASK marked False. Rerouting.") # Otherwise, assume the user want's to use their Dask cluster # if the following `get_client` call succeeds # These lines both check if distributed has been imported # and if a client connection has been created # If distributed hasn't been imported it will KeyError # If no client has been created it will ValueError # We can't import distributed due to it requiring network utilities: # https://github.com/AllenCellModeling/aicsimageio/issues/82 sys.modules["distributed"].get_client() # No error means there is a cluster and client # available on this worker process # Use delayed dask reader self._dask_data = self._read_delayed() self._data = self._dask_data.compute() except (KeyError, ValueError): self._data = self._read_immediate() self._dask_data = da.from_array(self._data) return self._data
[docs] def get_image_dask_data( self, out_orientation: Optional[str] = None, **kwargs ) -> da.core.Array: """ Get specific dimension image data out of an image as a dask array. Parameters ---------- out_orientation: Optional[str] A string containing the dimension ordering desired for the returned ndarray. Default: The current image dimensions. i.e. `self.dims` kwargs: * C=1: specifies Channel 1 * T=3: specifies the fourth index in T * D=n: D is Dimension letter and n is the index desired. D should not be present in the out_orientation. * D=[a, b, c]: D is Dimension letter and a, b, c is the list of indicies desired. D should be present in the out_orientation. * D=(a, b, c): D is Dimension letter and a, b, c is the tuple of indicies desired. D should be present in the out_orientation. * D=range(...): D is Dimension letter and range is the standard Python range function. D should be present in the out_orientation. * D=slice(...): D is Dimension letter and slice is the standard Python slice function. D should be present in the out_orientation. Returns ------- data: dask array The read data with the dimension ordering that was specified with out_orientation. Examples -------- Specific index selection >>> img = Reader("s_1_t_1_c_10_z_20.ome.tiff") ... c1 = img.get_image_dask_data("ZYX", C=1) List of index selection >>> img = Reader("s_1_t_1_c_10_z_20.ome.tiff") ... first_and_second = img.get_image_dask_data("CZYX", C=[0, 1]) Tuple of index selection >>> img = Reader("s_1_t_1_c_10_z_20.ome.tiff") ... first_and_last = img.get_image_dask_data("CZYX", C=(0, -1)) Range of index selection >>> img = Reader("s_1_t_1_c_10_z_20.ome.tiff") ... first_three = img.get_image_dask_data("CZYX", C=range(3)) Slice selection >>> img = Reader("s_1_t_1_c_10_z_20.ome.tiff") ... every_other = img.get_image_dask_data("CZYX", C=slice(0, -1, 2)) Notes ----- * If a requested dimension is not present in the data the dimension is added with a depth of 1. See `aicsimageio.transforms.reshape_data` for more details. """ # If no out orientation, simply return current data as dask array if out_orientation is None: return self.dask_data # Transform and return return transforms.reshape_data( data=self.dask_data, given_dims=self.dims, return_dims=out_orientation, **kwargs, )
[docs] def get_image_data( self, out_orientation: Optional[str] = None, **kwargs ) -> np.ndarray: """ Read the image as a numpy array then return specific dimension image data. Parameters ---------- out_orientation: Optional[str] A string containing the dimension ordering desired for the returned ndarray. Default: The current image dimensions. i.e. `self.dims` kwargs: * C=1: specifies Channel 1 * T=3: specifies the fourth index in T * D=n: D is Dimension letter and n is the index desired. D should not be present in the out_orientation. * D=[a, b, c]: D is Dimension letter and a, b, c is the list of indicies desired. D should be present in the out_orientation. * D=(a, b, c): D is Dimension letter and a, b, c is the tuple of indicies desired. D should be present in the out_orientation. * D=range(...): D is Dimension letter and range is the standard Python range function. D should be present in the out_orientation. * D=slice(...): D is Dimension letter and slice is the standard Python slice function. D should be present in the out_orientation. Examples -------- Specific index selection >>> img = Reader("s_1_t_1_c_10_z_20.ome.tiff") ... c1 = img.get_image_data("ZYX", C=1) List of index selection >>> img = Reader("s_1_t_1_c_10_z_20.ome.tiff") ... first_and_second = img.get_image_data("CZYX", C=[0, 1]) Tuple of index selection >>> img = Reader("s_1_t_1_c_10_z_20.ome.tiff") ... first_and_last = img.get_image_data("CZYX", C=(0, -1)) Range of index selection >>> img = Reader("s_1_t_1_c_10_z_20.ome.tiff") ... first_three = img.get_image_data("CZYX", C=range(3)) Slice selection >>> img = Reader("s_1_t_1_c_10_z_20.ome.tiff") ... every_other = img.get_image_data("CZYX", C=slice(0, -1, 2)) Returns ------- data: np.ndarray The read data with the dimension ordering that was specified with out_orientation. Notes ----- * If a requested dimension is not present in the data the dimension is added with a depth of 1. * This will preload the entire image before returning the requested data. See `aicsimageio.transforms.reshape_data` for more details. """ # If no out orientation, simply return current data as dask array if out_orientation is None: return self.data # Transform and return return transforms.reshape_data( data=self.data, given_dims=self.dims, return_dims=out_orientation, **kwargs, )
@property @abstractmethod def dims(self) -> str: pass
[docs] def size(self, dims: str = Dimensions.DefaultOrder) -> Tuple[int]: """ Parameters ---------- dims: str A string containing a list of dimensions being requested. The default is to return the six standard dims. Returns ------- size: Tuple[int] A tuple with the requested dimensions filled in. """ # Ensure dims is an uppercase string dims = dims.upper() # Check that the dims requested are in the image dims if not (all(d in self.dims for d in dims)): raise exceptions.InvalidDimensionOrderingError( f"Invalid dimensions requested: {dims}" ) # Return the shape of the data for the dimensions requested return tuple([self.dask_data.shape[self.dims.index(dim)] for dim in dims])
@property def shape(self) -> Tuple[int]: """ Returns ------- shape: Tuple[int] A tuple with the size of all dimensions. """ return self.dask_data.shape @property @abstractmethod def metadata(self) -> Any: pass
[docs] def get_physical_pixel_size(self, scene: int = 0) -> Tuple[float]: """ Attempts to retrieve physical pixel size for the specified scene. If none available, returns `1.0` for each spatial dimension. Parameters ---------- scene: int The index of the scene for which to return physical pixel sizes. Returns ------- sizes: Tuple[float] Tuple of floats representing the pixel sizes for X, Y, Z, in that order. """ return (1.0, 1.0, 1.0)
[docs] def get_channel_names(self, scene: int = 0) -> Optional[List[str]]: """ Attempts to use the image's metadata to get the image's channel names. Parameters ---------- scene: int The index of the scene for which to return channel names. Returns ------- channels_names: Optional[List[str]] List of strings representing the channel names. If channel dimension not present in file, return None. """ # Check for channels dimension if Dimensions.Channel not in self.dims: return None # Channel dimension in reader data, get default channel names channel_index = self.dims.index(Dimensions.Channel) channel_dim_size = self.dask_data.shape[channel_index] return [str(i) for i in range(channel_dim_size)]
@property def cluster(self) -> Optional["distributed.LocalCluster"]: # noqa: F821 return self._cluster @property def client(self) -> Optional["distributed.Client"]: # noqa: F821 return self._client
[docs] def close(self): """ Always close the Dask Client connection. If connected to *strictly* a LocalCluster, close it down as well. """ from .. import dask_utils self._cluster, self._client = dask_utils.shutdown_cluster_and_client( self.cluster, self.client )
def __enter__(self): """ If provided an address, create a Dask Client connection. If not provided an address, create a LocalCluster and Client connection. If not provided an address, other Dask kwargs are accepted and passed down to the LocalCluster object. """ from .. import dask_utils self._cluster, self._client = dask_utils.spawn_cluster_and_client( **self._dask_kwargs ) return self def __exit__(self, exc_type, exc_val, exc_tb): """ Always close the Dask Client connection. If connected to *strictly* a LocalCluster, close it down as well. """ self.close()