Source code for aicsimageio.readers.sldy_reader

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
from pathlib import Path
from typing import Any, Dict, List, Tuple, cast

import dask.array as da
import numpy as np
import xarray as xr
from fsspec.spec import AbstractFileSystem

from ... import constants, exceptions, transforms, types
from ...dimensions import DEFAULT_DIMENSION_ORDER_LIST, DimensionNames
from ...types import PhysicalPixelSizes
from ...utils import io_utils
from ..reader import Reader
from .sldy_image import SldyImage

###############################################################################

log = logging.getLogger(__name__)

###############################################################################

DEFAULT_DATA_FILE_PREFIX = "ImageData"


class SldyReader(Reader):
    """
    Read 3i SlideBook (SLDY) images.

    Parameters
    ----------
    image: Path or str
        Path to file.
    fs_kwargs: Dict[str, Any]
        Any specific keyword arguments to pass down to the fsspec created
        filesystem. Default: {}
    data_file_prefix: str, default = "ImageData"
        Prefix of the data files within the image directories to extract.
        By default this will be set to "ImageData". However, specifying
        "HistogramData" would allow you to interact with the image's
        histogram data instead.

    Raises
    ------
    exceptions.UnsupportedFileFormatError
        If the file is not supported by this reader.
    """

    @staticmethod
    def _is_supported_image(fs: AbstractFileSystem, path: str, **kwargs: Any) -> bool:
        try:
            SldyReader._get_images_from_data_directory(
                fs, path, kwargs.get("data_file_prefix", DEFAULT_DATA_FILE_PREFIX)
            )
            return True
        except Exception:
            return False

    @staticmethod
    def _get_images_from_data_directory(
        fs: AbstractFileSystem, path: types.PathLike, data_file_prefix: str
    ) -> List[SldyImage]:
        """
        Retrieves the list of image acquisitions found at the given path.

        Parameters
        ----------
        fs: AbstractFileSystem
            The file system to use for reading.
        path: str
            The path to the file to read.
        data_file_prefix: str
            Prefix of the data files within the image directories to extract.

        Returns
        -------
        images: List[SldyImage]
            A list of the acquisitions found at the given path, ordered by
            their names.
        """
        data_directory = Path(path).with_suffix(".dir")
        images = [
            SldyImage(fs, image_dir, data_file_prefix=data_file_prefix)
            for image_dir in data_directory.glob("*.imgdir")
        ]
        if not images:
            raise ValueError("Unable to find any images within the image directory")

        # Prevent inconsistent scene (image) ordering
        images.sort(key=lambda img: img.id)
        return images

    def __init__(
        self,
        image: types.PathLike,
        fs_kwargs: Dict[str, Any] = {},
        data_file_prefix: str = DEFAULT_DATA_FILE_PREFIX,
        **kwargs: Any,
    ):
        # Expand details of provided image
        self._fs, self._path = io_utils.pathlike_to_fs(
            image,
            enforce_exists=True,
            fs_kwargs=fs_kwargs,
        )

        try:
            self._images = SldyReader._get_images_from_data_directory(
                self._fs, self._path, data_file_prefix=data_file_prefix
            )
        except Exception:
            # Enforce valid image
            raise exceptions.UnsupportedFileFormatError(
                self.__class__.__name__, self._path
            )

    @property
    def scenes(self) -> Tuple[str, ...]:
        if self._scenes is None:
            self._scenes = tuple(image.id for image in self._images)

        return self._scenes

    @property
    def physical_pixel_sizes(self) -> PhysicalPixelSizes:
        image = self._images[self.current_scene_index]
        return PhysicalPixelSizes(
            image.physical_pixel_size_z,
            image.physical_pixel_size_y,
            image.physical_pixel_size_x,
        )

    def _read_delayed(self) -> xr.DataArray:
        image = self._images[self.current_scene_index]

        # If no timepoints or channels are available, create a single
        # element list of `None` to represent the empty dimension
        timepoints = cast(list, image.timepoints) or [None]
        channels = cast(list, image.channels) or [None]

        # Iterate over each timepoint and channel, retrieving data from the
        # image data file (lazily, as a delayed read).
        # If no timepoints or channels are available this will fill
        # the otherwise empty Time/Channel dimension
        data_as_list: List[da.Array] = []
        for timepoint in timepoints:
            data_for_timepoint: List[da.Array] = []
            for channel in channels:
                data = image.get_data(
                    timepoint=timepoint, channel=channel, delayed=True
                )
                data_for_timepoint.append(data)

            data_as_list.append(da.stack(data_for_timepoint))

        image_data = da.stack(data_as_list)
        return self._create_data_array(image_data)

    def _read_immediate(self) -> xr.DataArray:
        image = self._images[self.current_scene_index]

        # If no timepoints or channels are available, create a single
        # element list of `None` to represent the empty dimension
        timepoints = cast(list, image.timepoints) or [None]
        channels = cast(list, image.channels) or [None]

        # Iterate over each timepoint and channel, retrieving data from the
        # image data file.
        # If no timepoints or channels are available this will fill
        # the otherwise empty Time/Channel dimension
        data_as_list: List[np.ndarray] = []
        for timepoint in timepoints:
            data_for_timepoint: List[np.ndarray] = []
            for channel in channels:
                data = image.get_data(
                    timepoint=timepoint, channel=channel, delayed=False
                )
                data_for_timepoint.append(data)

            data_as_list.append(np.array(data_for_timepoint))

        image_data = np.array(data_as_list)
        return self._create_data_array(image_data)

    def _create_data_array(self, image_data: types.ArrayLike) -> xr.DataArray:
        """
        Given data representing an image, this will create an xr.DataArray
        representation of the image data. This makes some assumptions about
        the dimensions of the image data that are based on how the data
        arrays were constructed in _read_immediate and _read_delayed.

        Parameters
        ----------
        image_data: types.ArrayLike
            Array-like representation of the image data intended to be wrapped.

        Returns
        -------
        data_array: xr.DataArray
            xarray representation of the given image data
        """
        original_dims = [
            DimensionNames.Time,
            DimensionNames.Channel,
            DimensionNames.SpatialZ,
            DimensionNames.SpatialY,
            DimensionNames.SpatialX,
        ]
        intended_dims = DEFAULT_DIMENSION_ORDER_LIST

        # If the original dimensions of the data do not equal the dimensions
        # this needs to output, then reshape the data
        if original_dims != intended_dims:
            image_data = transforms.reshape_data(
                data=image_data,
                given_dims="".join(original_dims),
                return_dims="".join(intended_dims),
            )

        return xr.DataArray(
            data=image_data,
            dims=intended_dims,
            coords=self._get_coords(image_data, intended_dims),
            attrs={
                constants.METADATA_UNPROCESSED: self._images[
                    self.current_scene_index
                ].metadata,
            },
        )

    def _get_coords(
        self, image_data: types.ArrayLike, dims: List[str]
    ) -> Dict[str, Any]:
        """
        Given data representing an image and the dimension order, this will
        return a dictionary mapping representing the coordinates of the
        dimensions inside the image data.

        Parameters
        ----------
        image_data: types.ArrayLike
            Array-like representation of the image data intended to be mapped.
        dims: List[str]
            Order of dimensions present in the image data.

        Returns
        -------
        coords: Dict[str, Any]
            Dictionary mapping dimensions to their coordinates within the
            given image data array
        """
        coords = {}
        image = self._images[self.current_scene_index]
        if image.channels:
            coords[DimensionNames.Channel] = [
                str(channel) for channel in image.channels
            ]

        if image.timepoints:
            timepoint_scale = 1
            coords[DimensionNames.Time] = Reader._generate_coord_array(
                0, len(image.timepoints), timepoint_scale
            )

        if self.physical_pixel_sizes.Z is not None:
            coords[DimensionNames.SpatialZ] = Reader._generate_coord_array(
                0,
                image_data.shape[dims.index(DimensionNames.SpatialZ)],
                self.physical_pixel_sizes.Z,
            )

        # Should never happen due to how SldyImage reads these, but here to typeguard
        if self.physical_pixel_sizes.Y is None or self.physical_pixel_sizes.X is None:
            raise ValueError(
                "Unable to determine physical pixel size of Y and/or X dimension"
            )

        coords[DimensionNames.SpatialY] = Reader._generate_coord_array(
            0,
            image_data.shape[dims.index(DimensionNames.SpatialY)],
            self.physical_pixel_sizes.Y,
        )
        coords[DimensionNames.SpatialX] = Reader._generate_coord_array(
            0,
            image_data.shape[dims.index(DimensionNames.SpatialX)],
            self.physical_pixel_sizes.X,
        )

        return coords
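
###############################################################################

# The block below is not part of the original module; it is a minimal usage
# sketch assuming a hypothetical SlideBook dataset at "example.sldy" whose
# sibling "example.dir" directory contains one or more "*.imgdir" acquisitions.
if __name__ == "__main__":
    reader = SldyReader("example.sldy")

    # Each acquisition (".imgdir") is exposed as a scene, ordered by its id
    print(reader.scenes)
    print(reader.physical_pixel_sizes)

    # Lazy, dask-backed read (built from _read_delayed) versus an in-memory
    # numpy read (built from _read_immediate); both properties come from the
    # base Reader API.
    lazy_data = reader.dask_data
    in_memory_data = reader.data

    # Passing data_file_prefix="HistogramData" reads the histogram data files
    # instead of the image data files, as noted in the class docstring.
    hist_reader = SldyReader("example.sldy", data_file_prefix="HistogramData")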