Source code for aicsimageio.readers.ome_zarr_reader

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import warnings
from typing import Any, Dict, List, Optional, Tuple

import xarray as xr
from fsspec.spec import AbstractFileSystem

from .. import constants, exceptions, types
from ..dimensions import DimensionNames
from ..metadata import utils as metadata_utils
from ..types import PhysicalPixelSizes
from ..utils import io_utils
from .reader import Reader

try:
    from ome_zarr.io import parse_url
    from ome_zarr.reader import Reader as ZarrReader

except ImportError:
    raise ImportError(
        "ome_zarr is required for this reader. " "Install with `pip install 'ome_zarr'`"
    )

###############################################################################


[docs]class OmeZarrReader(Reader): """ Wraps the ome-zarr-py API to provide the same aicsimageio Reader API but for OmeZarr images. Parameters ---------- image: types.PathLike Path to image file to construct Reader for. fs_kwargs: Dict[str, Any] Any specific keyword arguments to pass down to the fsspec created filesystem. Default: {} """ @staticmethod def _is_supported_image(fs: AbstractFileSystem, path: str, **kwargs: Any) -> bool: try: ZarrReader(parse_url(path, mode="r")) return True except AttributeError: return False def __init__( self, image: types.PathLike, fs_kwargs: Dict[str, Any] = {}, ): # Expand details of provided image self._fs, self._path = io_utils.pathlike_to_fs( image, enforce_exists=False, fs_kwargs=fs_kwargs, ) # Enforce valid image if not self._is_supported_image(self._fs, self._path): raise exceptions.UnsupportedFileFormatError( self.__class__.__name__, self._path ) self._zarr = ZarrReader(parse_url(self._path, mode="r")).zarr self._physical_pixel_sizes: Optional[PhysicalPixelSizes] = None self._multiresolution_level = 0 self._channel_names: Optional[List[str]] = None @property def scenes(self) -> Tuple[str, ...]: if self._scenes is None: scenes = self._zarr.root_attrs["multiscales"] # if (each scene has a name) and (that name is unique) use name. # otherwise generate scene names. if all("name" in scene for scene in scenes) and ( len({scene["name"] for scene in scenes}) == len(scenes) ): self._scenes = tuple(str(scene["name"]) for scene in scenes) else: self._scenes = tuple( metadata_utils.generate_ome_image_id(i) for i in range(len(self._zarr.root_attrs["multiscales"])) ) return self._scenes @property def physical_pixel_sizes(self) -> PhysicalPixelSizes: """Return the physical pixel sizes of the image.""" if self._physical_pixel_sizes is None: try: z_size, y_size, x_size = OmeZarrReader._get_pixel_size( self._zarr, list(self.dims.order), self._current_scene_index, self._multiresolution_level, ) except Exception as e: warnings.warn(f"Could not parse zarr pixel size: {e}") z_size, y_size, x_size = None, None, None self._physical_pixel_sizes = PhysicalPixelSizes(z_size, y_size, x_size) return self._physical_pixel_sizes @property def channel_names(self) -> Optional[List[str]]: if self._channel_names is None: try: self._channel_names = [ str(channel["label"]) for channel in self._zarr.root_attrs["omero"]["channels"] ] except KeyError: self._channel_names = super().channel_names return self._channel_names def _read_delayed(self) -> xr.DataArray: return self._xarr_format(delayed=True) def _read_immediate(self) -> xr.DataArray: return self._xarr_format(delayed=False) def _xarr_format(self, delayed: bool) -> xr.DataArray: image_data = self._zarr.load(str(self.current_scene_index)) axes = self._zarr.root_attrs["multiscales"][self.current_scene_index].get( "axes" ) if axes: dims = [sub["name"].upper() for sub in axes] else: dims = list(OmeZarrReader._guess_dim_order(image_data.shape)) if not delayed: image_data = image_data.compute() coords = self._get_coords( dims, image_data.shape, scene=self.current_scene, channel_names=self.channel_names, ) return xr.DataArray( image_data, dims=dims, coords=coords, attrs={constants.METADATA_UNPROCESSED: self._zarr.root_attrs}, ) @staticmethod def _get_coords( dims: List[str], shape: Tuple[int, ...], scene: str, channel_names: Optional[List[str]], ) -> Dict[str, Any]: coords: Dict[str, Any] = {} # Use dims for coord determination if DimensionNames.Channel in dims: # Generate channel names if no existing channel names if channel_names is None: coords[DimensionNames.Channel] = [ metadata_utils.generate_ome_channel_id(image_id=scene, channel_id=i) for i in range(shape[dims.index(DimensionNames.Channel)]) ] else: coords[DimensionNames.Channel] = channel_names return coords @staticmethod def _get_pixel_size( reader: ZarrReader, dims: List[str], series_index: int, resolution_index: int ) -> Tuple[Optional[float], Optional[float], Optional[float]]: # OmeZarr file may contain an additional set of "coordinateTransformations" # these coefficents are applied to all resolution levels. if ( "coordinateTransformations" in reader.root_attrs["multiscales"][series_index] ): universal_res_consts = reader.root_attrs["multiscales"][series_index][ "coordinateTransformations" ][0]["scale"] else: universal_res_consts = [1.0 for _ in range(len(dims))] coord_transform = reader.root_attrs["multiscales"][series_index]["datasets"][ resolution_index ]["coordinateTransformations"] spatial_coeffs = {} for dim in [ DimensionNames.SpatialX, DimensionNames.SpatialY, DimensionNames.SpatialZ, ]: if dim in dims: dim_index = dims.index(dim) spatial_coeffs[dim] = ( coord_transform[0]["scale"][dim_index] * universal_res_consts[dim_index] ) else: spatial_coeffs[dim] = None return ( spatial_coeffs[DimensionNames.SpatialZ], spatial_coeffs[DimensionNames.SpatialY], spatial_coeffs[DimensionNames.SpatialX], )