Source code for aicsimageio.readers.default_reader

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from typing import Any, Dict, List, Optional, Tuple, Union

import dask.array as da
import numpy as np
import xarray as xr
from dask import delayed
from fsspec.spec import AbstractFileSystem

from .. import constants, exceptions, types
from ..dimensions import DimensionNames
from ..metadata import utils as metadata_utils
from ..utils import io_utils
from .reader import Reader

# imageio is an optional extra of the package; fail early with install
# instructions when it is missing.
try:
    import imageio

except ImportError as import_error:
    # Chain explicitly so the original import failure is reported as the direct
    # cause rather than as an unrelated error raised "during handling".
    raise ImportError(
        "Base imageio is required for this reader. "
        "Install with `pip install aicsimageio[base-imageio]`"
    ) from import_error

###############################################################################

# Template for the error raised when a file cannot be read through a remote
# file handle; `{path}` is substituted with the offending file path.
REMOTE_READ_FAIL_MESSAGE = " ".join(
    (
        "Cannot read the provided file ({path}) remotely.",
        "Please download the file locally before continuing your work.",
    )
)

###############################################################################


class DefaultReader(Reader):
    """
    A catch all for image file reading that defaults to using imageio
    implementations.

    Parameters
    ----------
    image: types.PathLike
        Path to image file to construct Reader for.
    dim_order: Optional[str]
        Optional string of dimension short names for the image to use instead of
        guess. Must provide the same number of dimensions as read.
        Default: None (guess)
    channel_names: Optional[List[str]]
        Optional list of channel names.
        Must provide the same number of channels as the read channel dimension.
        Default: None (generate standard names)
    fs_kwargs: Optional[Dict[str, Any]]
        Any specific keyword arguments to pass down to the fsspec created
        filesystem.
        Default: None (treated as an empty dict)

    Notes
    -----
    To use this reader, install with: `pip install aicsimageio[base-imageio]`.
    """

    # Extensions that are read in imageio's many-image mode and get the frame
    # count correction in `_get_image_length`.
    FFMPEG_FORMATS = ["mov", "avi", "mpg", "mpeg", "mp4", "mkv", "wmv", "ogg"]

    @staticmethod
    def _get_extension_and_mode(path: str) -> Tuple[str, str]:
        """
        Provided a path to a file, provided back the extension (format) of the
        file and the imageio read mode.

        Parameters
        ----------
        path: str
            The file to provide extension and mode info for.

        Returns
        -------
        extension: str
            The extension (a naive guess at the format) of the file.
        mode: str
            The imageio read mode to use for image reading.
        """
        # Select extension to handle special formats
        extension = path.split(".")[-1]

        # Set mode to many-image reading if FFMPEG format was provided
        # https://imageio.readthedocs.io/en/stable/userapi.html#imageio.get_reader
        if extension in DefaultReader.FFMPEG_FORMATS:
            mode = "I"
        # Otherwise, have imageio infer the mode
        else:
            mode = "?"

        return extension, mode

    @staticmethod
    def _is_supported_image(fs: AbstractFileSystem, path: str, **kwargs: Any) -> bool:
        """
        Check whether imageio can construct a reader for the file.

        Parameters
        ----------
        fs: AbstractFileSystem
            The file system to use for reading.
        path: str
            The path to the file to check.

        Returns
        -------
        supported: bool
            True if imageio produced a reader, False if imageio raised a
            ValueError or IndexError while trying.

        Raises
        ------
        IOError
            An OSError was raised while opening / probing the file.
        """
        # Get extension and mode for reading the file
        extension, mode = DefaultReader._get_extension_and_mode(path)

        # Use imageio to check if they have a reader for this file
        try:
            with fs.open(path) as open_resource:
                with imageio.get_reader(open_resource, format=extension, mode=mode):
                    return True

        # Exceptions that are raised by imageio for unsupported file types
        except (ValueError, IndexError):
            return False

        # Some FFMPEG formats cannot pull metadata from a remote resource and
        # surface that as an OSError carrying the raw ffmpeg stderr dump, e.g.:
        #
        #   OSError: Could not load meta information
        #   === stderr ===
        #   ffmpeg version 4.2.2-static https://johnvansickle.com/ffmpeg/
        #   ...
        #   /tmp/imageio_cbof2u37: Invalid data found when processing input
        #
        # Replace it with an actionable message, keeping the original as the cause.
        except OSError as e:
            raise IOError(REMOTE_READ_FAIL_MESSAGE.format(path=path)) from e

    def __init__(
        self,
        image: types.PathLike,
        dim_order: Optional[str] = None,
        channel_names: Optional[List[str]] = None,
        fs_kwargs: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ):
        # Use a fresh dict per call instead of a shared mutable default
        if fs_kwargs is None:
            fs_kwargs = {}

        # Expand details of provided image
        self._fs, self._path = io_utils.pathlike_to_fs(
            image,
            enforce_exists=True,
            fs_kwargs=fs_kwargs,
        )
        self.extension, self.imageio_read_mode = self._get_extension_and_mode(
            self._path
        )

        # Store extras
        self._dim_order = dim_order
        self._channel_names = channel_names

        # Enforce valid image
        if not self._is_supported_image(self._fs, self._path):
            raise exceptions.UnsupportedFileFormatError(
                self.__class__.__name__, self._path
            )

    @staticmethod
    def _guess_dim_order(shape: Tuple[int, ...]) -> str:
        """
        Guess the dimension order string for an array of the provided shape.

        Parameters
        ----------
        shape: Tuple[int, ...]
            The shape of the read image data.

        Returns
        -------
        dim_order: str
            The guessed dimension names, one character per dimension.
        """
        if len(shape) == 2:
            return f"{DimensionNames.SpatialY}{DimensionNames.SpatialX}"

        elif len(shape) == 3:
            # Handle greyscale timeseries
            # If the last dimension is greater than 4 it is unlikely to be
            # representing a samples dimension
            if shape[-1] > 4:
                return (
                    f"{DimensionNames.Time}"
                    f"{DimensionNames.SpatialY}{DimensionNames.SpatialX}"
                )

            # Else, return normal RGB / RGBA dims
            return (
                f"{DimensionNames.SpatialY}{DimensionNames.SpatialX}"
                f"{DimensionNames.Samples}"
            )

        # If the last dimension is greater than 4 it is unlikely to be
        # representing a samples dimension
        elif len(shape) == 4 and shape[-1] <= 4:
            return (
                f"{DimensionNames.Time}{DimensionNames.SpatialY}"
                f"{DimensionNames.SpatialX}{DimensionNames.Samples}"
            )

        # Anything else is left to the base Reader's guess
        return Reader._guess_dim_order(shape)

    @property
    def scenes(self) -> Tuple[str]:
        # There is currently an assumption that DefaultReader will not encounter
        # files with multiple scenes. But, if we do encounter a file that
        # DefaultReader hits and a user wants scene management from that file type,
        # we can update this property then.
        return (metadata_utils.generate_ome_image_id(0),)

    @staticmethod
    def _get_image_data(
        fs: AbstractFileSystem, path: str, extension: str, mode: str, index: int
    ) -> np.ndarray:
        """
        Open a file for reading, seek to plane index and read as numpy.

        Parameters
        ----------
        fs: AbstractFileSystem
            The file system to use for reading.
        path: str
            The path to file to read.
        extension: str
            The file extension naively indicating format to use to read the file.
            For our use case this is primarily the file extension.
        mode: str
            The read mode to use for opening and reading.
            See mode parameter on imageio.get_reader
            https://imageio.readthedocs.io/en/stable/userapi.html#imageio.get_reader
        index: int
            The image plane index to seek to and read from the file.

        Returns
        -------
        plane: np.ndarray
            The image plane as a numpy array.
        """
        with fs.open(path) as open_resource:
            with imageio.get_reader(
                open_resource, format=extension, mode=mode
            ) as reader:
                return np.asarray(reader.get_data(index))

    @staticmethod
    def _get_image_length(
        fs: AbstractFileSystem,
        path: str,
        extension: str,
        mode: str,
    ) -> int:
        """
        Open a file for reading, using the format, determine the image length
        (the number of planes).

        Parameters
        ----------
        fs: AbstractFileSystem
            The file system to use for reading.
        path: str
            The path to file to read.
        extension: str
            The format to use to read the file.
            For our use case this is primarily the file extension.
        mode: str
            The read mode to use for opening and reading.
            See mode parameter on imageio.get_reader
            https://imageio.readthedocs.io/en/stable/userapi.html#imageio.get_reader

        Returns
        -------
        length: int
            The length of the image (number of YX planes).

        Notes
        -----
        In the case this file is an FFMPEG format, this function will attempt to
        seek and retrieve the last frame of data as reported by traditional imageio
        methods to verify that the frame count is correct.
        This is to check for FFMPEG off-by-one errors in frame indexing.
        See here for more details: https://github.com/imageio/imageio/issues/168
        """
        with fs.open(path) as open_resource:
            with imageio.get_reader(
                open_resource, format=extension, mode=mode
            ) as reader:
                # Handle FFMPEG formats
                if extension in DefaultReader.FFMPEG_FORMATS:
                    # A reminder, this is the _total_ frame count, not the last index
                    reported_frames = reader.count_frames()

                    # As a safety measure against FFMPEG off-by-one
                    # try and get the last frame (by index)
                    try:
                        reader.get_data(reported_frames - 1)
                        return reported_frames

                    # Couldn't get the last frame by index, FFMPEG must have
                    # off-by-oned. So return the _total_ frame count minus the one
                    # frame correction
                    except IndexError:
                        return reported_frames - 1

                # Default get_length call for all others
                return reader.get_length()

    @staticmethod
    def _unpack_dims_and_coords(
        image_data: types.ArrayLike,
        metadata: Dict,
        scene_id: str,
        dim_order: Optional[str],
        channel_names: Optional[List[str]],
    ) -> Tuple[List[str], Dict[str, Union[List[str], types.ArrayLike]]]:
        """
        Unpack image data into assumed dims and coords.

        Parameters
        ----------
        image_data: types.ArrayLike
            The image data to unpack dims and coords for.
        metadata: Dict
            The EXIF, XMP, etc metadata dictionary.
        scene_id: str
            The scene id for this image.
            For this reader this is always the same but we need this to create
            channel names.
        dim_order: Optional[str]
            Optional string of dimension order to use instead of guess.
            Unlike other readers, this reader doesn't have any idea as to
            many-scene so we can just have a single string instead of a List[str].
        channel_names: Optional[List[str]]
            Optional list of channel names to use instead of None.
            Unlike other readers, this reader doesn't pull metadata so it would
            normally generate OME channel names.

        Returns
        -------
        dims: List[str]
            The dimension names for each dimension in the image data.
        coords: Dict[str, Union[List[str], types.ArrayLike]]
            If possible, the coordinates for dimensions in the image data.

        Raises
        ------
        exceptions.ConflictingArgumentsError
            The provided dim_order or channel_names do not agree with the shape of
            the image data.
        """
        # Guess dims or use provided dims
        if dim_order is not None:
            if len(dim_order) != len(image_data.shape):
                raise exceptions.ConflictingArgumentsError(
                    f"Provided dimension string does not have the same amount of "
                    f"dimensions as the read image. "
                    f"Read image shape: {image_data.shape}, "
                    f"Provided dimension string: {dim_order}"
                )
            dims = list(dim_order)
        else:
            dims = list(DefaultReader._guess_dim_order(image_data.shape))

        # Use dims for coord determination
        coords: Dict[str, Union[List[str], np.ndarray]] = {}

        # Create or use channel names
        if channel_names:
            # Provided channel names but no channel dim
            if DimensionNames.Channel not in dims:
                raise exceptions.ConflictingArgumentsError(
                    f"Received channel names for array without channel dimension. "
                    f"Read image shape: {image_data.shape}, "
                    f"Provided (or guessed) dimensions: {dims}, "
                    f"Provided channel names: {channel_names}"
                )

            # Provided channel names whose count differs from the channel dim size
            channel_dim_size = image_data.shape[dims.index(DimensionNames.Channel)]
            if len(channel_names) != channel_dim_size:
                raise exceptions.ConflictingArgumentsError(
                    f"Provided channel names list does not match the size of "
                    f"channel dimension for the provided array. "
                    f"Read image shape: {image_data.shape}, "
                    f"Dims: {dims}, "
                    f"Provided channel names: {channel_names}"
                )

            # Passed all checks, use the channel names
            coords[DimensionNames.Channel] = channel_names

        # Otherwise simply generate OME default
        elif DimensionNames.Channel in dims:
            channel_dim_size = image_data.shape[dims.index(DimensionNames.Channel)]
            coords[DimensionNames.Channel] = [
                metadata_utils.generate_ome_channel_id(
                    image_id=scene_id, channel_id=i
                )
                for i in range(channel_dim_size)
            ]

        # Handle typical RGB and RGBA from Samples
        if DimensionNames.Samples in dims:
            samples_dim_size = image_data.shape[dims.index(DimensionNames.Samples)]
            if samples_dim_size == 3:
                coords[DimensionNames.Samples] = ["R", "G", "B"]
            elif samples_dim_size == 4:
                coords[DimensionNames.Samples] = ["R", "G", "B", "A"]

        # Handle time when duration is present in metadata
        if DimensionNames.Time in dims and "duration" in metadata:
            coords[DimensionNames.Time] = np.linspace(
                0,
                metadata["duration"],
                image_data.shape[dims.index(DimensionNames.Time)],
            )

        return dims, coords

    def _read_delayed(self) -> xr.DataArray:
        """
        Construct the delayed xarray DataArray object for the image.

        Returns
        -------
        image: xr.DataArray
            The fully constructed and fully delayed image as a DataArray object.
            Metadata is attached in some cases as coords, dims, and attrs.

        Raises
        ------
        exceptions.UnsupportedFileFormatError
            The file could not be read or is not supported.
        """
        with self._fs.open(self._path) as open_resource:
            with imageio.get_reader(
                open_resource, format=self.extension, mode=self.imageio_read_mode
            ) as reader:
                # Store image length
                image_length = self._get_image_length(
                    fs=self._fs,
                    path=self._path,
                    extension=self.extension,
                    mode=self.imageio_read_mode,
                )

                # Handle single image formats like png, jpeg, etc
                if image_length == 1:
                    image_data = da.from_array(
                        self._get_image_data(
                            fs=self._fs,
                            path=self._path,
                            extension=self.extension,
                            mode=self.imageio_read_mode,
                            index=0,
                        )
                    )

                # Handle many image formats like gif, mp4, etc
                elif image_length > 1:
                    # Get a sample image
                    sample = self._get_image_data(
                        fs=self._fs,
                        path=self._path,
                        extension=self.extension,
                        mode=self.imageio_read_mode,
                        index=0,
                    )

                    # Create operating shape for the final dask array by prepending
                    # image length to a tuple of ones that is the same length as
                    # the sample shape
                    operating_shape = (image_length,) + ((1,) * len(sample.shape))

                    # Create numpy array of empty arrays for delayed get data
                    # functions
                    lazy_arrays: np.ndarray = np.ndarray(operating_shape, dtype=object)
                    for indices, _ in np.ndenumerate(lazy_arrays):
                        lazy_arrays[indices] = da.from_delayed(
                            delayed(self._get_image_data)(
                                fs=self._fs,
                                path=self._path,
                                extension=self.extension,
                                mode=self.imageio_read_mode,
                                index=indices[0],
                            ),
                            shape=sample.shape,
                            dtype=sample.dtype,
                        )

                    # Block them into a single dask array
                    image_data = da.block(lazy_arrays.tolist())

                # Catch all other image types as unsupported
                # https://imageio.readthedocs.io/en/stable/userapi.html#imageio.core.format.Reader.get_length
                else:
                    raise exceptions.UnsupportedFileFormatError(
                        self.__class__.__name__, self.extension
                    )

                # Get basic metadata
                metadata = reader.get_meta_data()

                # Create extra metadata from assumptions based off image data
                dims, coords = self._unpack_dims_and_coords(
                    image_data=image_data,
                    metadata=metadata,
                    scene_id=self.current_scene,
                    dim_order=self._dim_order,
                    channel_names=self._channel_names,
                )

                return xr.DataArray(
                    image_data,
                    dims=dims,
                    coords=coords,
                    attrs={constants.METADATA_UNPROCESSED: metadata},
                )

    def _read_immediate(self) -> xr.DataArray:
        """
        Construct the in-memory xarray DataArray object for the image.

        Returns
        -------
        image: xr.DataArray
            The fully constructed and fully read into memory image as a DataArray
            object. Metadata is attached in some cases as coords, dims, and attrs.

        Raises
        ------
        exceptions.UnsupportedFileFormatError
            The file could not be read or is not supported.
        """
        # Read image
        with self._fs.open(self._path) as open_resource:
            # Open the reader as a context manager so it is closed on every exit
            # path, the same way the other read methods of this class handle it
            with imageio.get_reader(
                open_resource, format=self.extension, mode=self.imageio_read_mode
            ) as reader:
                # Store image length
                image_length = self._get_image_length(
                    fs=self._fs,
                    path=self._path,
                    extension=self.extension,
                    mode=self.imageio_read_mode,
                )

                # Handle single-image formats like png, jpeg, etc
                if image_length == 1:
                    image_data = reader.get_data(0)

                # Handle many image formats like gif, mp4, etc
                elif image_length > 1:
                    # Read and stack all frames
                    frames = []
                    for frame in reader:
                        frames.append(frame)

                    image_data = np.stack(frames)

                # Catch all other image types as unsupported, mirroring
                # `_read_delayed`, instead of failing later on an unbound
                # `image_data`
                else:
                    raise exceptions.UnsupportedFileFormatError(
                        self.__class__.__name__, self.extension
                    )

                # Get basic metadata
                metadata = reader.get_meta_data()

                # Create extra metadata from assumptions based off image data
                dims, coords = self._unpack_dims_and_coords(
                    image_data=image_data,
                    metadata=metadata,
                    scene_id=self.current_scene,
                    dim_order=self._dim_order,
                    channel_names=self._channel_names,
                )

                return xr.DataArray(
                    image_data,
                    dims=dims,
                    coords=coords,
                    attrs={constants.METADATA_UNPROCESSED: metadata},
                )