#!/usr/bin/env python
# -*- coding: utf-8 -*-
import io
import logging
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
import dask.array as da
import numpy as np
from aicspylibczi import CziFile
from dask import delayed
from lxml.etree import _Element
from .. import exceptions, types
from ..buffer_reader import BufferReader
from ..constants import Dimensions
from .reader import Reader
###############################################################################
log = logging.getLogger(__name__)
###############################################################################
class CziReader(Reader):
    """
    CziReader wraps aicspylibczi to provide the same reading capabilities but abstracts
    the specifics of using the backend library to create a unified interface. This
    enables higher level functions to duck type the File Readers.

    Parameters
    ----------
    data: types.FileLike
        A string or path to the CZI file to be read.
    chunk_by_dims: Optional[List[str]]
        The dimensions to use as the for mapping the chunks / blocks.
        Default: None, meaning
        [Dimensions.SpatialZ, Dimensions.SpatialY, Dimensions.SpatialX]
        Note: SpatialY and SpatialX will always be added to the list if not present.
    S: int
        If the image has different dimensions on any scene from another, the dask array
        construction will fail. In that case, use this parameter to specify a specific
        scene to construct a dask array for.
        Default: 0 (select the first scene)
    """

    ZEISS_2BYTE = b"ZI"  # First two characters of a czi file according to Zeiss docs
    ZEISS_10BYTE = b"ZISRAWFILE"  # First 10 characters of a well formatted czi file.

    def __init__(
        self,
        data: types.FileLike,
        chunk_by_dims: Optional[List[str]] = None,
        S: int = 0,
        **kwargs,
    ):
        # Run super init to check filepath provided
        super().__init__(data, **kwargs)

        # Build the default inside the body rather than the signature to avoid the
        # shared mutable default argument anti-pattern.
        if chunk_by_dims is None:
            chunk_by_dims = [
                Dimensions.SpatialZ,
                Dimensions.SpatialY,
                Dimensions.SpatialX,
            ]

        # Store parameters needed for _daread
        self.chunk_by_dims = chunk_by_dims
        self.specific_s_index = S
@staticmethod
def _is_this_type(buffer: io.BytesIO) -> bool:
with BufferReader(buffer) as buffer_reader:
if buffer_reader.endianness != CziReader.ZEISS_2BYTE:
return False
header = buffer_reader.endianness + buffer_reader.read_bytes(8)
return header == CziReader.ZEISS_10BYTE
@staticmethod
def _read_image(
img: Path, read_dims: Optional[Dict[str, int]] = None
) -> Tuple[np.ndarray, List[Tuple[str, int]]]:
"""
Read and return the squeezed image data requested along with the dimension info
that was read.
Parameters
----------
img: Path
Path to the CZI file to read.
read_dims: Optional[Dict[str, int]]
The dimensions to read from the file as a dictionary of string to integer.
Default: None (Read all data from the image)
Returns
-------
data: np.ndarray
The data read for the dimensions provided.
read_dimensions: List[Tuple[str, int]]]
The dimension sizes that were returned from the read.
"""
# Catch optional read dim
if read_dims is None:
read_dims = {}
# Init czi
czi = CziFile(img)
# Read image
log.debug(f"Reading dimensions: {read_dims}")
data, dims = czi.read_image(**read_dims)
# Drop dims that shouldn't be provided back
ops = []
real_dims = []
for i, dim_info in enumerate(dims):
# Expand dimension info
dim, size = dim_info
# If the dim was provided in the read dims we know a single plane for that
# dimension was requested so remove it
if dim in read_dims:
ops.append(0)
# Otherwise just read the full slice
else:
ops.append(slice(None, None, None))
real_dims.append(dim_info)
# Convert ops and run getitem
return data[tuple(ops)], real_dims
    @staticmethod
    def _imread(img: Path, read_dims: Optional[Dict[str, int]] = None) -> np.ndarray:
        # Thin wrapper around _read_image that discards the returned dimension
        # info and keeps only the pixel data (used as the delayed read target).
        data, dims = CziReader._read_image(img=img, read_dims=read_dims)
        return data
@staticmethod
def _daread(
img: Path,
czi: CziFile,
chunk_by_dims: List[str] = [
Dimensions.SpatialZ,
Dimensions.SpatialY,
Dimensions.SpatialX,
],
S: int = 0,
) -> Tuple[da.core.Array, str]:
"""
Read a CZI image file as a delayed dask array where certain dimensions act as
the chunk size.
Parameters
----------
img: Path
The filepath to read.
czi: CziFile
The loaded CziFile object created from reading the filepath.
chunk_by_dims: List[str]
The dimensions to use as the for mapping the chunks / blocks.
Default: [Dimensions.SpatialZ, Dimensions.SpatialY, Dimensions.SpatialX]
Note: SpatialY and SpatialX will always be added to the list if not present.
S: int
If the image has different dimensions on any scene from another, the dask
array construction will fail. In that case, use this parameter to specify a
specific scene to construct a dask array for.
Default: 0 (select the first scene)
Returns
-------
img: dask.array.core.Array
The constructed dask array where certain dimensions are chunked.
dims: str
The dimension order as a string.
"""
# Get image dims indicies
image_dim_indices = czi.dims_shape()
# Catch inconsistent scene dimension sizes
if len(image_dim_indices) > 1:
# Choose the provided scene
try:
image_dim_indices = image_dim_indices[S]
log.info(
f"File contains variable dimensions per scene, "
f"selected scene: {S} for data retrieval."
)
except IndexError:
raise exceptions.InconsistentShapeError(
f"The CZI image provided has variable dimensions per scene. "
f"Please provide a valid index to the 'S' parameter to create a "
f"dask array for the index provided. "
f"Provided scene index: {S}. "
f"Scene index range: 0-{len(image_dim_indices)}."
)
else:
# If the list is length one that means that all the scenes in the image
# have the same dimensions
# Just select the first dictionary in the list
image_dim_indices = image_dim_indices[0]
# Uppercase dimensions provided to chunk by dims
chunk_by_dims = [d.upper() for d in chunk_by_dims]
# Always add Y and X dims to chunk by dims because that is how CZI files work
if Dimensions.SpatialY not in chunk_by_dims:
log.info(
"Adding the Spatial Y dimension to chunk by dimensions as it was not "
"found."
)
chunk_by_dims.append(Dimensions.SpatialY)
if Dimensions.SpatialX not in chunk_by_dims:
log.info(
"Adding the Spatial X dimension to chunk by dimensions as it was not "
"found."
)
chunk_by_dims.append(Dimensions.SpatialX)
# Setup read dimensions for an example chunk
first_chunk_read_dims = {}
for dim, (dim_begin_index, dim_end_index) in image_dim_indices.items():
# Only add the dimension if the dimension isn't a part of the chunk
if dim not in chunk_by_dims:
# Add to read dims
first_chunk_read_dims[dim] = dim_begin_index
# Read first chunk for information used by dask.array.from_delayed
sample, sample_dims = czi.read_image(**first_chunk_read_dims)
# Get the shape for the chunk and operating shape for the dask array
# We also collect the chunk and non chunk dimension ordering so that we can
# swap the dimensions after we
# block the dask array together.
sample_chunk_shape = []
operating_shape = []
non_chunk_dimension_ordering = []
chunk_dimension_ordering = []
for i, dim_info in enumerate(sample_dims):
# Unpack dim info
dim, size = dim_info
# If the dim is part of the specified chunk dims then append it to the
# sample, and, append the dimension
# to the chunk dimension ordering
if dim in chunk_by_dims:
sample_chunk_shape.append(size)
chunk_dimension_ordering.append(dim)
# Otherwise, append the dimension to the non chunk dimension ordering, and,
# append the true size of the
# image at that dimension
else:
non_chunk_dimension_ordering.append(dim)
operating_shape.append(
image_dim_indices[dim][1] - image_dim_indices[dim][0]
)
# Convert shapes to tuples and combine the non and chunked dimension orders as
# that is the order the data will
# actually come out of the read data as
sample_chunk_shape = tuple(sample_chunk_shape)
blocked_dimension_order = (
non_chunk_dimension_ordering + chunk_dimension_ordering
)
# Fill out the rest of the operating shape with dimension sizes of 1 to match
# the length of the sample chunk
# When dask.block happens it fills the dimensions from inner-most to outer-most
# with the chunks as long as the dimension is size 1
# Basically, we are adding empty dimensions to the operating shape that will be
# filled by the chunks from dask
operating_shape = tuple(operating_shape) + (1,) * len(sample_chunk_shape)
# Create empty numpy array with the operating shape so that we can iter through
# and use the multi_index to create the readers.
lazy_arrays = np.ndarray(operating_shape, dtype=object)
# We can enumerate over the multi-indexed array and construct read_dims
# dictionaries by simply zipping together the ordered dims list and the current
# multi-index plus the begin index for that plane. We then set the value of the
# array at the same multi-index to the delayed reader using the constructed
# read_dims dictionary.
dims = [d for d in czi.dims]
begin_indicies = tuple(image_dim_indices[d][0] for d in dims)
for i, _ in np.ndenumerate(lazy_arrays):
# Add the czi file begin index for each dimension to the array dimension
# index
this_chunk_read_indicies = (
current_dim_begin_index + curr_dim_index
for current_dim_begin_index, curr_dim_index in zip(begin_indicies, i)
)
# Zip the dims with the read indices
this_chunk_read_dims = dict(
zip(blocked_dimension_order, this_chunk_read_indicies)
)
# Remove the dimensions that we want to chunk by from the read dims
for d in chunk_by_dims:
if d in this_chunk_read_dims:
this_chunk_read_dims.pop(d)
# Add delayed array to lazy arrays at index
lazy_arrays[i] = da.from_delayed(
delayed(CziReader._imread)(img, this_chunk_read_dims),
shape=sample_chunk_shape,
dtype=sample.dtype,
)
# Convert the numpy array of lazy readers into a dask array and fill the inner
# most empty dimensions with chunks
merged = da.block(lazy_arrays.tolist())
# Because we have set certain dimensions to be chunked and others not
# we will need to transpose back to original dimension ordering
# Example being, if the original dimension ordering was "SZYX" and we want to
# chunk by "S", "Y", and "X" we created an array with dimensions ordering "ZSYX"
transpose_indices = []
transpose_required = False
for i, d in enumerate(czi.dims):
new_index = blocked_dimension_order.index(d)
if new_index != i:
transpose_required = True
transpose_indices.append(new_index)
else:
transpose_indices.append(i)
# Only run if the transpose is actually required
# The default case is "Z", "Y", "X", which _usually_ doesn't need to be
# transposed because that is _usually_ the normal dimension order of the CZI
# file anyway
if transpose_required:
merged = da.transpose(merged, tuple(transpose_indices))
# Because dimensions outside of Y and X can be in any order and present or not
# we also return the dimension order string.
return merged, "".join(dims)
@staticmethod
def _daread_safe(
img: Union[str, Path],
chunk_by_dims: List[str] = [
Dimensions.SpatialZ,
Dimensions.SpatialY,
Dimensions.SpatialX,
],
S: int = 0,
) -> Tuple[da.core.Array, str]:
"""
Safely read a CZI image file as a delayed dask array where certain dimensions
act as the chunk size.
Parameters
----------
img: Union[str, Path]
The filepath to read.
chunk_by_dims: List[str]
The dimensions to use as the for mapping the chunks / blocks.
Default: [Dimensions.SpatialZ, Dimensions.SpatialY, Dimensions.SpatialX]
Note: SpatialY and SpatialX will always be added to the list if not present.
S: int
If the image has different dimensions on any scene from another, the dask
array construction will fail.
In that case, use this parameter to specify a specific scene to construct a
dask array for.
Default: 0 (select the first scene)
Returns
-------
img: dask.array.core.Array
The constructed dask array where certain dimensions are chunked.
dims: str
The dimension order as a string.
"""
# Resolve image path
img = CziReader._resolve_image_path(img)
# Init temp czi
czi = CziFile(img)
# Safely construct the dask array or catch any exception
try:
return CziReader._daread(img=img, czi=czi, chunk_by_dims=chunk_by_dims, S=S)
except Exception as e:
# A really bad way to close any connection to the CZI object
czi._bytes = None
czi.reader = None
raise e
def _read_delayed(self) -> da.core.Array:
"""
Returns
-------
Constructed dask array where each chunk is a delayed read from the CZI file.
Places dimensions in the native order (i.e. "TZCYX")
"""
dask_array, _ = CziReader._daread_safe(
self._file, chunk_by_dims=self.chunk_by_dims, S=self.specific_s_index
)
return dask_array
def _read_immediate(self) -> da.core.Array:
# Init temp czi
czi = CziFile(self._file)
# Safely construct the numpy array or catch any exception
try:
# Get image dims indicies
image_dim_indices = czi.dims_shape()
# Catch inconsistent scene dimension sizes
if len(image_dim_indices) > 1:
# Choose the provided scene
log.info(
f"File contains variable dimensions per scene, "
f"selected scene: {self.specific_s_index} for data retrieval."
)
# Get the specific scene
if self.specific_s_index < len(image_dim_indices):
data, _ = czi.read_image(
**{Dimensions.Scene: self.specific_s_index}
)
else:
raise exceptions.InconsistentShapeError(
f"The CZI image provided has variable dimensions per scene. "
f"Please provide a valid index to the 'S' parameter to create "
f"a dask array for the index provided. "
f"Provided scene index: {self.specific_s_index}. "
f"Scene index range: 0-{len(image_dim_indices)}."
)
else:
# If the list is length one that means that all the scenes in the image
# have the same dimensions
# Read all data in the image
data, _ = czi.read_image()
# A really bad way to close any connection to the CZI object
czi._bytes = None
czi.reader = None
except Exception as e:
# A really bad way to close any connection to the CZI object
czi._bytes = None
czi.reader = None
raise e
return data
@property
def dims(self) -> str:
if self._dims is None:
self._dims = CziFile(self._file).dims
return self._dims
[docs] def dtype(self) -> np.dtype:
return self.dask_data.dtype
@property
def metadata(self) -> _Element:
"""
Load and return the metadata from the CZI file
Returns
-------
The lxml Element Tree of the metadata
"""
# We can't serialize lxml element trees so don't save the tree to the object
# state
return CziFile(self._file).meta
def _size_of_dimension(self, dim: str) -> int:
if dim in self.dims:
return self.dask_data.shape[self.dims.index(dim)]
return 1
[docs] def size_s(self) -> int:
return self._size_of_dimension(Dimensions.Scene)
[docs] def size_t(self) -> int:
return self._size_of_dimension(Dimensions.Time)
[docs] def size_c(self) -> int:
return self._size_of_dimension(Dimensions.Channel)
[docs] def size_z(self) -> int:
return self._size_of_dimension(Dimensions.SpatialZ)
[docs] def size_y(self) -> int:
return self._size_of_dimension(Dimensions.SpatialY)
[docs] def size_x(self) -> int:
return self._size_of_dimension(Dimensions.SpatialX)
[docs] def get_channel_names(self, scene: int = 0):
chelem = self.metadata.findall(
"./Metadata/Information/Image/Dimensions/Channels/Channel"
)
return [ch.get("Name") for ch in chelem]
# TODO refactor this utility function into a metadata wrapper class
def _getmetadataxmltext(self, findpath, default=None):
ref = self.metadata.find(findpath)
if ref is None:
return default
return ref.text
[docs] def get_physical_pixel_size(self, scene: int = 0) -> Tuple[float]:
px = float(
self._getmetadataxmltext(
"./Metadata/Scaling/Items/Distance[@Id='X']/Value", "1.0"
)
)
py = float(
self._getmetadataxmltext(
"./Metadata/Scaling/Items/Distance[@Id='Y']/Value", "1.0"
)
)
pz = float(
self._getmetadataxmltext(
"./Metadata/Scaling/Items/Distance[@Id='Z']/Value", "1.0"
)
)
return (px, py, pz)