Source code for aicsimageio.readers.czi_reader

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import xml.etree.ElementTree as ET
from copy import copy
from pathlib import Path
from typing import Any, Dict, Hashable, List, Optional, Tuple, Union

import dask.array as da
import numpy as np
import xarray as xr
from dask import delayed
from fsspec.implementations.local import LocalFileSystem
from fsspec.spec import AbstractFileSystem
from ome_types.model.ome import OME

from .. import constants, exceptions, types
from ..dimensions import DEFAULT_CHUNK_DIMS, REQUIRED_CHUNK_DIMS, DimensionNames
from ..metadata import utils as metadata_utils
from ..utils import io_utils
from .reader import Reader

try:
    from _aicspylibczi import BBox, TileInfo
    from aicspylibczi import CziFile

except ImportError:
    raise ImportError(
        "aicspylibczi is required for this reader. "
        "Install with `pip install 'aicspylibczi>=3.1.1' 'fsspec>=2022.7.1'`"
    )

###############################################################################

log = logging.getLogger(__name__)

###############################################################################

CZI_SAMPLES_DIM_CHAR = "A"
CZI_BLOCK_DIM_CHAR = "B"
CZI_SCENE_DIM_CHAR = "S"


###############################################################################

PIXEL_DICT = {
    "gray8": np.uint8,
    "gray16": np.uint16,
    "gray32": np.uint32,
    "gray32float": np.float32,
    "bgr24": np.uint8,
    "bgr48": np.uint16,
    "invalid": np.uint8,
}


###############################################################################


class CziReader(Reader):
    """
    Wraps the aicspylibczi API to provide the same aicsimageio Reader API but for
    volumetric Zeiss CZI images.

    Parameters
    ----------
    image: types.PathLike
        Path to image file to construct Reader for.
    chunk_dims: Union[str, List[str]]
        Which dimensions to create chunks for.
        Default: DEFAULT_CHUNK_DIMS
        Note: DimensionNames.SpatialY, DimensionNames.SpatialX, and
        DimensionNames.Samples will always be added to the list if not present during
        dask array construction.
    include_subblock_metadata: bool
        Whether to append metadata from the subblocks to the rest of the embedded
        metadata.
    fs_kwargs: Dict[str, Any]
        Any specific keyword arguments to pass down to the fsspec created filesystem.
        Default: {}

    Notes
    -----
    To use this reader, install with: `pip install aicspylibczi>=3.1.1`.
    """

    @staticmethod
    def _is_supported_image(fs: AbstractFileSystem, path: str, **kwargs: Any) -> bool:
        try:
            if not isinstance(fs, LocalFileSystem):
                raise ValueError(
                    f"Cannot read CZIs from non-local file system. "
                    f"Received URI: {path}, which points to {type(fs)}."
                )

            with fs.open(path) as open_resource:
                CziFile(open_resource.f)
                return True

        except RuntimeError:
            return False

    def __init__(
        self,
        image: types.PathLike,
        chunk_dims: Union[str, List[str]] = DEFAULT_CHUNK_DIMS,
        include_subblock_metadata: bool = False,
        fs_kwargs: Dict[str, Any] = {},
    ):
        # Expand details of provided image
        self._fs, self._path = io_utils.pathlike_to_fs(
            image,
            enforce_exists=True,
            fs_kwargs=fs_kwargs,
        )

        # Catch non-local file system
        if not isinstance(self._fs, LocalFileSystem):
            raise ValueError(
                f"Cannot read CZIs from non-local file system. "
                f"Received URI: {self._path}, which points to {type(self._fs)}."
            )

        # Store params
        if isinstance(chunk_dims, str):
            chunk_dims = list(chunk_dims)

        self.chunk_dims = chunk_dims
        self._include_subblock_metadata = include_subblock_metadata

        # Delayed storage
        self._px_sizes: Optional[types.PhysicalPixelSizes] = None
        self._mapped_dims: Optional[str] = None

        # Enforce valid image
        if not self._is_supported_image(self._fs, self._path):
            raise exceptions.UnsupportedFileFormatError(
                self.__class__.__name__, self._path
            )

    @property
    def mapped_dims(self) -> str:
        if self._mapped_dims is None:
            with self._fs.open(self._path) as open_resource:
                czi = CziFile(open_resource.f)
                self._mapped_dims = CziReader._fix_czi_dims(czi.dims)

        return self._mapped_dims

    @staticmethod
    def _fix_czi_dims(dims: str) -> str:
        return (
            dims.replace(CZI_BLOCK_DIM_CHAR, "")
            .replace(CZI_SCENE_DIM_CHAR, "")
            .replace(CZI_SAMPLES_DIM_CHAR, DimensionNames.Samples)
        )

    @property
    def scenes(self) -> Tuple[str, ...]:
        """Note: scenes with no name (`None`) will be renamed to
        "filename-<scene index>" to prevent ambiguity. Similarly, scenes that share a
        name have an occurrence number appended to distinguish between them.

        Returns:
            Tuple[str, ...]: Scene names/ids
        """
        if self._scenes is None:
            with self._fs.open(self._path) as open_resource:
                czi = CziFile(open_resource.f)
                xpath_str = "./Metadata/Information/Image/Dimensions/S/Scenes/Scene"
                meta_scenes = czi.meta.findall(xpath_str)
                scene_names: List[str] = []

                # Mapping of scene name to occurrences, indicating duplication
                scene_name_frequency = {}
                for scene_idx, meta_scene in enumerate(meta_scenes):
                    shape = meta_scene.find("Shape")
                    if shape is not None:
                        shape_name = shape.get("Name")
                        scene_name = meta_scene.get("Name")
                        combined_scene_name = f"{scene_name}-{shape_name}"
                    else:
                        combined_scene_name = meta_scene.get("Name")

                    # Some scene names can be unpopulated, for those we should fill
                    # with filename-idx
                    if combined_scene_name is None:
                        fname_prefix = Path(self._path).stem
                        combined_scene_name = f"{fname_prefix}-{scene_idx}"

                    # Check for duplicated names.
                    # On first encounter with a duplicate, modify the original scene
                    # name to reflect its new duplicate status
                    if combined_scene_name not in scene_name_frequency:
                        scene_name_frequency[combined_scene_name] = [scene_idx, 1]
                    else:
                        if scene_name_frequency[combined_scene_name][1] == 1:
                            scene_names[
                                scene_name_frequency[combined_scene_name][0]
                            ] += "-1"
                        scene_name_frequency[combined_scene_name][1] += 1
                        combined_scene_name += (
                            f"-{scene_name_frequency[combined_scene_name][1]}"
                        )

                    scene_names.append(combined_scene_name)

                # If the scene is implicit just assign it name Scene:0
                if len(scene_names) < 1:
                    scene_names = [metadata_utils.generate_ome_image_id(0)]
                else:
                    # Reconcile scene list against the dims shape
                    dims_shape = czi.get_dims_shape()
                    if len(scene_names) != len(dims_shape) and czi.shape_is_consistent:
                        dims_shape_dict = dims_shape[0]
                        scene_range = dims_shape_dict.get(CZI_SCENE_DIM_CHAR)
                        if scene_range is not None:
                            scene_names = scene_names[scene_range[0] : scene_range[1]]
                        else:
                            # If this is the root node of a split multi-scene CZI,
                            # then the scene_range could be None because the dims_shape
                            # will be effectively empty.
                            # We do not currently support loading multi-file split
                            # scene CZI files
                            log.warning(
                                "CZI file appears to contain multiple scenes but "
                                "dimension data is not available in this file. "
                                "Root node of split multi-scene CZI files are not "
                                "supported by CziReader."
                            )

                self._scenes = tuple(scene_names)

        return self._scenes

    @staticmethod
    def _dims_shape_to_scene_dims_shape(
        dims_shape: List[Dict], scene_index: int, consistent: bool
    ) -> Dict[str, Tuple[int, int]]:
        """
        This function takes the output of `get_dims_shape()` and returns a dictionary
        of dimensions for the selected scene

        Parameters
        ----------
        dims_shape: List[Dict]
            A list of dictionaries generated by `get_dims_shape()`
        scene_index: int
            The index of the scene being used
        consistent: bool
            True if the dictionaries are consistent and could be represented
            compactly (dims_shape with length 1)

        Returns
        -------
        A dictionary of dimensions, e.g.
        {"T": (0, 1), "C": (0, 3), "Y": (0, 256), "X": (0, 256)}.
""" dims_shape_index = 0 if consistent else scene_index dims_shape_dict = dims_shape[dims_shape_index] dims_shape_dict.pop(CZI_SCENE_DIM_CHAR, None) return dims_shape_dict @staticmethod def _adjust_scene_index( dims_shape: List[Dict], scene_index: int, consistent: bool ) -> int: """ This function modifies a scene index to be an offset into the true scene indices reported by the czi file Parameters ---------- dims_shape: List[Dict] a list of dictionaries, generated by `get_dims_shape()` scene_index: int the index of the scene being used consistent: bool true if the dictionaries are consistent could be represented compactly (dims_shape with length 1) Returns ------- An int representing the scene index to use in a true libCZI dimension """ dims_shape_index = 0 if consistent else scene_index dims_shape_dict = dims_shape[dims_shape_index] scene_range = dims_shape_dict.get(CZI_SCENE_DIM_CHAR) if scene_range is None: return scene_index if not consistent: # we have selected a dims_shape_dict already based on scene index # let's make sure the scene index is in the S range if scene_index < scene_range[0] or scene_index >= scene_range[1]: raise ValueError( f"Scene index {scene_index} is not in the range " f"{scene_range[0]} to {scene_range[1]}" ) return scene_index return scene_range[0] + scene_index @staticmethod def _read_chunk_from_image( fs: AbstractFileSystem, path: str, scene: int, read_dims: Optional[Dict[str, int]] = None, ) -> np.ndarray: return CziReader._get_image_data( fs=fs, path=path, scene=scene, read_dims=read_dims )[0] @staticmethod def _get_image_data( fs: AbstractFileSystem, path: str, scene: int, read_dims: Optional[Dict[str, int]] = None, ) -> Tuple[np.ndarray, List[Tuple[str, int]]]: """ Read and return the squeezed image data requested along with the dimension info that was read. Parameters ---------- fs: AbstractFileSystem The file system to use for reading. path: str The path to the file to read. scene: int The scene index to pull the chunk from. read_dims: Optional[Dict[str, int]] The dimensions to read from the file as a dictionary of string to integer. Default: None (Read all data from the image) Returns ------- chunk: np.ndarray The image chunk read as a numpy array. read_dimensions: List[Tuple[str, int]]] The dimension sizes that were returned from the read. """ # Catch optional read dim if read_dims is None: read_dims = {} # Init czi with fs.open(path) as open_resource: czi = CziFile(open_resource.f) # Get current scene read dims adjusted_scene_index = CziReader._adjust_scene_index( czi.get_dims_shape(), scene, czi.shape_is_consistent ) read_dims[CZI_SCENE_DIM_CHAR] = adjusted_scene_index # Read image data, dims = czi.read_image(**read_dims) # Drop dims that shouldn't be provided back ops: List[Union[int, slice]] = [] real_dims = [] for dim_info in dims: # Expand dimension info dim, _ = dim_info # If the dim was provided in the read dims # we know a single plane for that dimension was requested so remove it if dim in read_dims or dim == CZI_BLOCK_DIM_CHAR: ops.append(0) # Otherwise just read the full slice else: ops.append(slice(None, None, None)) real_dims.append(dim_info) # Convert ops and run getitem return data[tuple(ops)], real_dims def _create_dask_array(self, czi: CziFile) -> xr.DataArray: """ Creates a delayed dask array for the file. Parameters ---------- czi: CziFile An open CziFile for processing. Returns ------- image_data: da.Array The fully constructed and fully delayed image as a Dask Array object. 
""" # Always add the plane dimensions if not present already for dim in REQUIRED_CHUNK_DIMS: if dim not in self.chunk_dims: self.chunk_dims.append(dim) # Safety measure / "feature" self.chunk_dims = [d.upper() for d in self.chunk_dims] # Construct the delayed dask array dims_shape = CziReader._dims_shape_to_scene_dims_shape( czi.get_dims_shape(), scene_index=self.current_scene_index, consistent=czi.shape_is_consistent, ) # Remove block dim as not useful dims_shape.pop(CZI_BLOCK_DIM_CHAR, None) dims_str = czi.dims for remove_dim_char in [CZI_BLOCK_DIM_CHAR, CZI_SCENE_DIM_CHAR]: dims_str = dims_str.replace(remove_dim_char, "") # Get the shape for the chunk and operating shape for the dask array # We also collect the chunk and non chunk dimension ordering so that we can # swap the dimensions after we # block the dask array together. sample_chunk_shape = [] operating_shape = [] non_chunk_dimension_ordering = [] chunk_dimension_ordering = [] for i, dim in enumerate(dims_str): # Unpack dim info _, dim_size = dims_shape[dim] # If the dim is part of the specified chunk dims then append it to the # sample, and, append the dimension # to the chunk dimension ordering if dim in self.chunk_dims: sample_chunk_shape.append(dim_size) chunk_dimension_ordering.append(dim) # Otherwise, append the dimension to the non chunk dimension ordering, and, # append the true size of the # image at that dimension else: non_chunk_dimension_ordering.append(dim) operating_shape.append(dim_size) # Convert shapes to tuples and combine the non and chunked dimension orders as # that is the order the data will # actually come out of the read data as sample_chunk_shape_tuple = tuple(sample_chunk_shape) blocked_dimension_order = ( non_chunk_dimension_ordering + chunk_dimension_ordering ) # Fill out the rest of the operating shape with dimension sizes of 1 to match # the length of the sample chunk # When dask.block happens it fills the dimensions from inner-most to outer-most # with the chunks as long as the dimension is size 1 # Basically, we are adding empty dimensions to the operating shape that will be # filled by the chunks from dask operating_shape_tuple = tuple(operating_shape) + (1,) * len( sample_chunk_shape_tuple ) # Create empty numpy array with the operating shape so that we can iter through # and use the multi_index to create the readers. lazy_arrays: np.ndarray = np.ndarray(operating_shape_tuple, dtype=object) # We can enumerate over the multi-indexed array and construct read_dims # dictionaries by simply zipping together the ordered dims list and the current # multi-index plus the begin index for that plane. We then set the value of the # array at the same multi-index to the delayed reader using the constructed # read_dims dictionary. 
        dims = [
            d for d in czi.dims if d not in [CZI_BLOCK_DIM_CHAR, CZI_SCENE_DIM_CHAR]
        ]
        begin_indices = tuple(dims_shape[d][0] for d in dims)
        for np_index, _ in np.ndenumerate(lazy_arrays):
            # Add the czi file begin index for each dimension to the array dimension
            # index
            this_chunk_read_indices = (
                current_dim_begin_index + curr_dim_index
                for current_dim_begin_index, curr_dim_index in zip(
                    begin_indices, np_index
                )
            )

            # Zip the dims with the read indices
            this_chunk_read_dims = dict(
                zip(blocked_dimension_order, this_chunk_read_indices)
            )

            # Remove the dimensions that we want to chunk by from the read dims
            for d in self.chunk_dims:
                this_chunk_read_dims.pop(d, None)

            # Get pixel type and catch unsupported
            pixel_type = PIXEL_DICT.get(czi.pixel_type)
            if pixel_type is None:
                raise TypeError(f"Pixel type: {czi.pixel_type} is not supported.")

            # Add delayed array to lazy arrays at index
            lazy_arrays[np_index] = da.from_delayed(
                delayed(CziReader._read_chunk_from_image)(
                    fs=self._fs,
                    path=self._path,
                    scene=self.current_scene_index,
                    read_dims=this_chunk_read_dims,
                ),
                shape=sample_chunk_shape,
                dtype=pixel_type,
            )

        # Convert the numpy array of lazy readers into a dask array and fill the
        # inner-most empty dimensions with chunks
        merged = da.block(lazy_arrays.tolist())

        # Because we have set certain dimensions to be chunked and others not,
        # we will need to transpose back to the original dimension ordering.
        # For example, if the original dimension ordering was "SZYX" and we chunked
        # by "S", "Y", and "X", we created an array with dimension ordering "ZSYX"
        transpose_indices = []
        transpose_required = False
        for i, d in enumerate(dims_str):
            new_index = blocked_dimension_order.index(d)
            if new_index != i:
                transpose_required = True
                transpose_indices.append(new_index)
            else:
                transpose_indices.append(i)

        # Only run if the transpose is actually required.
        # The default case is "Z", "Y", "X", which _usually_ doesn't need to be
        # transposed because that is _usually_ the normal dimension order of the CZI
        # file anyway
        if transpose_required:
            merged = da.transpose(merged, tuple(transpose_indices))

        # Because dimensions outside of Y and X can be in any order and present or not
        # we also return the dimension order string.
        return merged

    @staticmethod
    def _get_coords_and_physical_px_sizes(
        xml: ET.Element, scene_index: int, dims_shape: Dict[str, Any]
    ) -> Tuple[Dict[str, Any], types.PhysicalPixelSizes]:
        # Create coord dict
        coords: Dict[str, Any] = {}

        # Get all images
        img_sets = xml.findall(".//Image/Dimensions/Channels")

        if len(img_sets) != 0:
            # Select the current scene
            img = img_sets[0]
            if scene_index < len(img_sets):
                img = img_sets[scene_index]

            # Construct channel name list
            scene_channel_list = []
            channels = img.findall("./Channel")
            for i, channel in enumerate(channels):
                # Id is required, Name is not.
                # But we prefer to use Name if it is present
                channel_name = channel.attrib.get("Name")
                channel_id = channel.attrib.get("Id")
                if channel_name is None:
                    # TODO: the next best guess is to see if there's a Name in
                    # DisplaySetting/Channels/Channel
                    # xpath_str = "./Metadata/DisplaySetting/Channels"
                    # displaysetting_channels = xml.findall(xpath_str)
                    # ds_channels = displaysetting_channels[0].findall("./Channel")
                    # to find matching channel must match on Id attribute or if Id not
                    # present, just on collection index i

                    # If we didn't find a match this way, just use the Id as the name
                    channel_name = channel_id
                if channel_name is None:
                    # This is actually an error because Id was required by the spec
                    channel_name = metadata_utils.generate_ome_channel_id(
                        str(scene_index), str(i)
                    )
                scene_channel_list.append(channel_name)

            # Attach channel names to coords
            coords[DimensionNames.Channel] = scene_channel_list

        # Unpack short info scales
        list_xs = xml.findall(".//Distance[@Id='X']")
        list_ys = xml.findall(".//Distance[@Id='Y']")
        list_zs = xml.findall(".//Distance[@Id='Z']")
        scale_xe = list_xs[0].find("./Value")
        scale_ye = list_ys[0].find("./Value")
        scale_ze = None if len(list_zs) == 0 else list_zs[0].find("./Value")

        # Set default scales
        scale_x = None
        scale_y = None
        scale_z = None

        # Unpack the string value to a float.
        # The values are always stored in units of meters in .czi, so
        # divide by 1E-6 to convert to microns
        if scale_xe is not None and scale_xe.text is not None:
            scale_x = float(scale_xe.text) / (1e-6)
        if scale_ye is not None and scale_ye.text is not None:
            scale_y = float(scale_ye.text) / (1e-6)
        if scale_ze is not None and scale_ze.text is not None:
            scale_z = float(scale_ze.text) / (1e-6)

        # Handle Spatial Dimensions
        for scale, dim_name in [
            (scale_z, DimensionNames.SpatialZ),
            (scale_y, DimensionNames.SpatialY),
            (scale_x, DimensionNames.SpatialX),
        ]:
            if scale is not None and dim_name in dims_shape:
                dim_size = dims_shape[dim_name][1] - dims_shape[dim_name][0]
                coords[dim_name] = Reader._generate_coord_array(0, dim_size, scale)

        # Time
        # TODO: unpack "TimeSpan" elements
        # I can find a single "TimeSpan" in our data but unsure how multi-scene handles

        # Create physical pixel sizes
        px_sizes = types.PhysicalPixelSizes(scale_z, scale_y, scale_x)

        return coords, px_sizes

    def _read_delayed(self) -> xr.DataArray:
        """
        Construct the delayed xarray DataArray object for the image.

        Returns
        -------
        image: xr.DataArray
            The fully constructed and fully delayed image as a DataArray object.
            Metadata is attached in some cases as coords, dims, and attrs.

        Raises
        ------
        exceptions.UnsupportedFileFormatError
            The file could not be read or is not supported.
""" with self._fs.open(self._path) as open_resource: czi = CziFile(open_resource.f) dims_shape = CziReader._dims_shape_to_scene_dims_shape( dims_shape=czi.get_dims_shape(), scene_index=self.current_scene_index, consistent=czi.shape_is_consistent, ) # Get dims as list for xarray img_dims_list = list(self.mapped_dims) # Get image data image_data = self._create_dask_array(czi) # Create coordinate planes meta = czi.meta coords, px_sizes = self._get_coords_and_physical_px_sizes( xml=meta, scene_index=self.current_scene_index, dims_shape=dims_shape, ) # Append subblock metadata to the other metadata if param is True if self._include_subblock_metadata: subblocks = czi.read_subblock_metadata(unified_xml=True) meta.append(subblocks) # Store pixel sizes self._px_sizes = px_sizes # handle edge case where image has 0,0 YX dims: if image_data.shape[-2:] == (0, 0): return xr.DataArray( dims=coords.keys(), coords=coords, attrs={constants.METADATA_UNPROCESSED: meta}, ) else: return xr.DataArray( image_data, dims=img_dims_list, coords=coords, attrs={constants.METADATA_UNPROCESSED: meta}, ) def _read_immediate(self) -> xr.DataArray: """ Construct the in-memory xarray DataArray object for the image. Returns ------- image: xr.DataArray The fully constructed and fully read into memory image as a DataArray object. Metadata is attached in some cases as coords, dims, and attrs. Raises ------ exceptions.UnsupportedFileFormatError The file could not be read or is not supported. """ with self._fs.open(self._path) as open_resource: czi = CziFile(open_resource.f) dims_shape = CziReader._dims_shape_to_scene_dims_shape( dims_shape=czi.get_dims_shape(), scene_index=self.current_scene_index, consistent=czi.shape_is_consistent, ) # Get image data image_data, _ = self._get_image_data( fs=self._fs, path=self._path, scene=self.current_scene_index, ) # Get metadata meta = czi.meta # Create coordinate planes coords, px_sizes = self._get_coords_and_physical_px_sizes( xml=meta, scene_index=self.current_scene_index, dims_shape=dims_shape, ) # Store pixel sizes self._px_sizes = px_sizes return xr.DataArray( image_data, dims=[d for d in self.mapped_dims], coords=coords, attrs={constants.METADATA_UNPROCESSED: meta}, ) @staticmethod def _stitch_tiles( data: types.ArrayLike, data_dims: str, data_dims_shape: Dict[str, Tuple[int, int]], tile_bboxes: Dict[TileInfo, BBox], final_bbox: BBox, ) -> types.ArrayLike: # Assumptions: 1) docs for ZEISSRAW(CZI) say: # Scene – for clustering items in X/Y direction (data belonging to # contiguous regions of interests in a mosaic image). 
        # Store the mosaic array shape
        arr_shape_list = []

        ordered_dims_present = [
            dim
            for dim in data_dims
            if dim not in [CZI_BLOCK_DIM_CHAR, DimensionNames.MosaicTile]
        ]
        for dim in ordered_dims_present:
            if dim not in REQUIRED_CHUNK_DIMS:
                arr_shape_list.append(data_dims_shape[dim][1])
            if dim is DimensionNames.SpatialY:
                arr_shape_list.append(final_bbox.h)
            if dim is DimensionNames.SpatialX:
                arr_shape_list.append(final_bbox.w)
            if dim is DimensionNames.Samples:
                arr_shape_list.append(data_dims_shape[CZI_SAMPLES_DIM_CHAR][1])

        ans = None
        if isinstance(data, da.Array):
            ans = da.zeros(
                shape=tuple(arr_shape_list),
                dtype=data.dtype,
            )
        else:
            ans = np.zeros(arr_shape_list, dtype=data.dtype)

        for (tile_info, box) in tile_bboxes.items():
            # Construct data indexes to use
            tile_dims = tile_info.dimension_coordinates
            tile_dims.pop(CZI_SCENE_DIM_CHAR, None)
            tile_dims.pop(CZI_BLOCK_DIM_CHAR, None)
            data_indexes = [
                tile_dims[t_dim]
                for t_dim in data_dims
                if t_dim not in REQUIRED_CHUNK_DIMS
            ]

            # Add Y and X
            data_indexes.append(slice(None))  # Y ":"
            data_indexes.append(slice(None))  # X ":"
            if CZI_SAMPLES_DIM_CHAR in tile_dims.keys():
                data_indexes.append(slice(None))

            # Construct data indexes for ans
            ans_indexes = []
            for dim in ordered_dims_present:
                if dim not in [
                    DimensionNames.MosaicTile,
                    DimensionNames.Samples,
                    DimensionNames.SpatialY,
                    DimensionNames.SpatialX,
                ]:
                    if dim in tile_dims.keys():
                        ans_indexes.append(tile_dims[dim])

                if dim is DimensionNames.SpatialY:
                    start = box.y - final_bbox.y
                    ans_indexes.append(slice(start, start + box.h, 1))
                if dim is DimensionNames.SpatialX:
                    start = box.x - final_bbox.x
                    ans_indexes.append(slice(start, start + box.w, 1))
                if dim is DimensionNames.Samples:
                    ans_indexes.append(slice(None))

            # Assign the tiles into ans
            ans[tuple(ans_indexes)] = data[tuple(data_indexes)]

        return ans

    def _construct_mosaic_xarray(self, data: types.ArrayLike) -> xr.DataArray:
        # Get max of mosaic positions from czi
        with self._fs.open(self._path) as open_resource:
            czi = CziFile(open_resource.f)
            dims_shape = CziReader._dims_shape_to_scene_dims_shape(
                dims_shape=czi.get_dims_shape(),
                scene_index=self.current_scene_index,
                consistent=czi.shape_is_consistent,
            )

            bboxes = czi.get_all_mosaic_tile_bounding_boxes(S=self.current_scene_index)
            mosaic_scene_bbox = czi.get_mosaic_scene_bounding_box(
                index=self.current_scene_index
            )

            # Stitch
            stitched = self._stitch_tiles(
                data=data,
                data_dims=self.mapped_dims,
                data_dims_shape=dims_shape,
                tile_bboxes=bboxes,
                final_bbox=mosaic_scene_bbox,
            )

            # Copy metadata
            dims = [
                d
                for d in self.xarray_dask_data.dims
                if d is not DimensionNames.MosaicTile
            ]
            coords: Dict[Hashable, Any] = {
                d: v
                for d, v in self.xarray_dask_data.coords.items()
                if d not in [
                    DimensionNames.MosaicTile,
                    DimensionNames.SpatialY,
                    DimensionNames.SpatialX,
                ]
            }
            # Add expanded Y and X coords
            if self.physical_pixel_sizes.Y is not None:
                dim_y_index = dims.index(DimensionNames.SpatialY)
                coords[DimensionNames.SpatialY] = Reader._generate_coord_array(
                    0, stitched.shape[dim_y_index], self.physical_pixel_sizes.Y
                )
            if self.physical_pixel_sizes.X is not None:
                dim_x_index = dims.index(DimensionNames.SpatialX)
                coords[DimensionNames.SpatialX] = Reader._generate_coord_array(
                    0, stitched.shape[dim_x_index], self.physical_pixel_sizes.X
                )

            attrs = copy(self.xarray_dask_data.attrs)

            return xr.DataArray(
                data=stitched,
                dims=dims,
                coords=coords,
                attrs=attrs,
            )

    def _get_stitched_dask_mosaic(self) -> xr.DataArray:
        return self._construct_mosaic_xarray(self.dask_data)

    def _get_stitched_mosaic(self) -> xr.DataArray:
        return self._construct_mosaic_xarray(self.data)

    @property
    def ome_metadata(self) -> OME:
        return metadata_utils.transform_metadata_with_xslt(
            self.metadata,
            Path(__file__).parent.parent
            / "metadata/czi-to-ome-xslt/xslt/czi-to-ome.xsl",
        )

    @property
    def physical_pixel_sizes(self) -> types.PhysicalPixelSizes:
        """
        Returns
        -------
        sizes: PhysicalPixelSizes
            Using available metadata, the floats representing physical pixel sizes for
            dimensions Z, Y, and X.

        Notes
        -----
        We currently do not handle unit attachment to these values. Please see the file
        metadata for unit information.
        """
        if self._px_sizes is None:
            # We get pixel sizes as a part of array construction,
            # so simply run array construction
            self.dask_data

        if self._px_sizes is None:
            raise ValueError(
                "Pixel sizes weren't created as a part of image reading"
            )

        return self._px_sizes
    def get_mosaic_tile_position(
        self,
        mosaic_tile_index: int,
        **kwargs: int,
    ) -> Tuple[int, int]:
        """
        Get the absolute position of the top left point for a single mosaic tile.

        Parameters
        ----------
        mosaic_tile_index: int
            The index for the mosaic tile to retrieve position information for.
        kwargs: int
            The keywords below allow you to specify the dimensions that you wish
            to match. If you under-specify the constraints you can easily
            end up with a massive image stack.
                Z = 1   # The Z-dimension.
                C = 2   # The C-dimension ("channel").
                T = 3   # The T-dimension ("time").

        Returns
        -------
        top: int
            The Y coordinate for the tile position.
        left: int
            The X coordinate for the tile position.

        Raises
        ------
        UnexpectedShapeError
            The image has no mosaic dimension available.

        Notes
        -----
        Defaults T and C dimensions to 0 if present as dimensions in image to avoid
        reading in massive image stack for large files.
        """
        if DimensionNames.MosaicTile not in self.dims.order:
            raise exceptions.UnexpectedShapeError("No mosaic dimension in image.")

        with self._fs.open(self._path) as open_resource:
            czi = CziFile(open_resource.f)

            # Default Channel and Time dimensions to 0 to improve
            # worst case read time for large files **only**
            # when those dimensions are present on the image.
            for dimension_name in [DimensionNames.Channel, DimensionNames.Time]:
                if dimension_name not in kwargs and dimension_name in self.dims.order:
                    kwargs[dimension_name] = 0

            bbox = czi.get_mosaic_tile_bounding_box(
                M=mosaic_tile_index, S=self.current_scene_index, **kwargs
            )

            return bbox.y, bbox.x
    def get_mosaic_tile_positions(self, **kwargs: int) -> List[Tuple[int, int]]:
        """
        Get the absolute positions of the top left points for each mosaic tile
        matching the specified dimensions and current scene.

        Parameters
        ----------
        kwargs: int
            The keywords below allow you to specify the dimensions that you wish
            to match. If you under-specify the constraints you can easily
            end up with a massive image stack.
                Z = 1   # The Z-dimension.
                C = 2   # The C-dimension ("channel").
                T = 3   # The T-dimension ("time").

        Returns
        -------
        mosaic_tile_positions: List[Tuple[int, int]]
            List of the Y and X coordinate for the tile positions.

        Raises
        ------
        UnexpectedShapeError
            The image has no mosaic dimension available.
        """
        if DimensionNames.MosaicTile not in self.dims.order:
            raise exceptions.UnexpectedShapeError("No mosaic dimension in image.")

        with self._fs.open(self._path) as open_resource:
            czi = CziFile(open_resource.f)

            tile_info_to_bboxes = czi.get_all_mosaic_tile_bounding_boxes(
                S=self.current_scene_index, **kwargs
            )

            # Convert the dictionary of tile info mappings to
            # a list of bounding boxes sorted according to their
            # respective M indexes
            m_indexes_to_mosaic_positions = {
                tile_info.m_index: (bbox.y, bbox.x)
                for tile_info, bbox in tile_info_to_bboxes.items()
            }
            return [
                m_indexes_to_mosaic_positions[m_index]
                for m_index in sorted(m_indexes_to_mosaic_positions.keys())
            ]
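

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module): shows how the
# reader defined above is typically driven. The file name "example.czi" and the
# chunk_dims choice are hypothetical; `dask_data`, `dims`, `current_scene_index`,
# and `physical_pixel_sizes` come from the base `Reader` API used throughout
# this file.
if __name__ == "__main__":
    # Construct a reader; Y, X (and Samples) are always added to the chunk dims
    reader = CziReader("example.czi", chunk_dims=["Z", "Y", "X"])

    # Scene names are resolved (and de-duplicated) by the `scenes` property
    print(reader.scenes)

    # Lazy, chunked access; pixel sizes are populated during array construction
    lazy = reader.dask_data
    print(lazy.shape, reader.physical_pixel_sizes)

    # Mosaic helpers only apply when an M (mosaic tile) dimension is present
    if DimensionNames.MosaicTile in reader.dims.order:
        print(reader.get_mosaic_tile_positions())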