Source code for aicsimageio.writers.ome_zarr_writer

import math
from typing import Dict, List, Optional, Tuple

import zarr
from ome_zarr.io import parse_url
from ome_zarr.scale import Scaler
from ome_zarr.writer import write_image
from zarr.storage import default_compressor

from .. import exceptions, types
from ..dimensions import DEFAULT_DIMENSION_ORDER, DimensionNames
from ..metadata import utils
from ..utils import io_utils


[docs]class OmeZarrWriter: def __init__(self, uri: types.PathLike): """ Constructor. Parameters ---------- uri: types.PathLike The URI or local path for where to save the data. """ # Resolve final destination fs, path = io_utils.pathlike_to_fs(uri) # Save image to zarr store! self.store = parse_url(uri, mode="w").store self.root_group = zarr.group(store=self.store)
[docs] @staticmethod def build_ome( size_z: int, image_name: str, channel_names: List[str], channel_colors: List[int], channel_minmax: List[Tuple[float, float]], ) -> Dict: """ Create the omero metadata for an OME zarr image Parameters ---------- size_z: Number of z planes image_name: The name of the image channel_names: The names for each channel channel_colors: List of all channel colors channel_minmax: List of all (min, max) pairs of channel intensities Returns ------- Dict An "omero" metadata object suitable for writing to ome-zarr """ ch = [] for i in range(len(channel_names)): ch.append( { "active": True, "coefficient": 1, "color": f"{channel_colors[i]:06x}", "family": "linear", "inverted": False, "label": channel_names[i], "window": { "end": float(channel_minmax[i][1]), "max": float(channel_minmax[i][1]), "min": float(channel_minmax[i][0]), "start": float(channel_minmax[i][0]), }, } ) omero = { "id": 1, # ID in OMERO "name": image_name, # Name as shown in the UI "version": "0.4", # Current version "channels": ch, "rdefs": { "defaultT": 0, # First timepoint to show the user "defaultZ": size_z // 2, # First Z section to show the user "model": "color", # "color" or "greyscale" }, # TODO: can we add more metadata here? # # from here down this is all extra and not part of the ome-zarr spec # "meta": { # "projectDescription": "20+ lines of gene edited cells etc", # "datasetName": "aics_hipsc_v2020.1", # "projectId": 2, # "imageDescription": "foo bar", # "imageTimestamp": 1277977808.0, # "imageId": 12, # "imageAuthor": "danielt", # "imageName": "AICS-12_143.ome.tif", # "datasetDescription": "variance dataset after QC", # "projectName": "aics cell variance project", # "datasetId": 3 # }, } return omero
@staticmethod def _build_chunk_dims( chunk_dim_map: Dict[str, int], dimension_order: str = DEFAULT_DIMENSION_ORDER, ) -> Tuple[int, ...]: return tuple(chunk_dim_map[d] for d in dimension_order)
[docs] def write_image( self, # TODO how to pass in precomputed multiscales? image_data: types.ArrayLike, # must be 3D, 4D or 5D image_name: str, physical_pixel_sizes: Optional[types.PhysicalPixelSizes], channel_names: Optional[List[str]], channel_colors: Optional[List[int]], chunk_dims: Optional[Tuple] = None, scale_num_levels: int = 1, scale_factor: float = 2.0, dimension_order: Optional[str] = None, ) -> None: """ Write a data array to a file. NOTE that this API is not yet finalized and will change in the future. Parameters ---------- image_data: types.ArrayLike The array of data to store. Data arrays must have 2 to 6 dimensions. If a list is provided, then it is understood to be multiple images written to the ome-tiff file. All following metadata parameters will be expanded to the length of this list. image_name: str string representing the name of the image physical_pixel_sizes: Optional[types.PhysicalPixelSizes] PhysicalPixelSizes object representing the physical pixel sizes in Z, Y, X in microns. Default: None channel_names: Optional[List[str]] Lists of strings representing the names of the data channels Default: None If None is given, the list will be generated as a 0-indexed list of strings of the form "Channel:image_index:channel_index" channel_colors: Optional[List[int]] List of rgb color values per channel or a list of lists for each image. These must be values compatible with the OME spec. Default: None scale_num_levels: Optional[int] Number of pyramid levels to use for the image. Default: 1 (represents no downsampled levels) scale_factor: Optional[float] The scale factor to use for the image. Only active if scale_num_levels > 1. Default: 2.0 dimension_order: Optional[str] The dimension order of the data. If None is given, the dimension order will be guessed from the number of dimensions in the data according to TCZYX order. Examples -------- Write a TCZYX data set to OME-Zarr >>> image = numpy.ndarray([1, 10, 3, 1024, 2048]) ... writer = OmeZarrWriter("/path/to/file.ome.zarr") ... writer.write_image(image) Write multi-scene data to OME-Zarr, specifying channel names >>> image0 = numpy.ndarray([3, 10, 1024, 2048]) ... image1 = numpy.ndarray([3, 10, 512, 512]) ... writer = OmeZarrWriter("/path/to/file.ome.zarr") ... writer.write_image(image0, "Image:0", ["C00","C01","C02"]) ... writer.write_image(image1, "Image:1", ["C10","C11","C12"]) """ ndims = len(image_data.shape) if ndims < 2 or ndims > 5: raise exceptions.InvalidDimensionOrderingError( f"Image data must have 2, 3, 4, or 5 dimensions. " f"Received image data with shape: {image_data.shape}" ) if dimension_order is None: dimension_order = DEFAULT_DIMENSION_ORDER[-ndims:] if len(dimension_order) != ndims: raise exceptions.InvalidDimensionOrderingError( f"Dimension order {dimension_order} does not match data " f"shape: {image_data.shape}" ) if (len(set(dimension_order) - set(DEFAULT_DIMENSION_ORDER)) > 0) or len( dimension_order ) != len(set(dimension_order)): raise exceptions.InvalidDimensionOrderingError( f"Dimension order {dimension_order} is invalid or contains" f"unexpected dimensions. Only {DEFAULT_DIMENSION_ORDER}" f"currently supported." ) xdimindex = dimension_order.find(DimensionNames.SpatialX) ydimindex = dimension_order.find(DimensionNames.SpatialY) zdimindex = dimension_order.find(DimensionNames.SpatialZ) cdimindex = dimension_order.find(DimensionNames.Channel) if cdimindex > min(i for i in [xdimindex, ydimindex, zdimindex] if i > -1): raise exceptions.InvalidDimensionOrderingError( f"Dimension order {dimension_order} is invalid. Channel dimension " f"must be before X, Y, and Z." ) if chunk_dims is not None and len(chunk_dims) != ndims: raise exceptions.UnexpectedShapeError( f"Chunk dimensions:{chunk_dims} do not match data. " f"Expected chunk dimension length:{ndims}" ) if physical_pixel_sizes is None: pixelsizes = (1.0, 1.0, 1.0) else: pixelsizes = ( physical_pixel_sizes.Z if physical_pixel_sizes.Z is not None else 1.0, physical_pixel_sizes.Y if physical_pixel_sizes.Y is not None else 1.0, physical_pixel_sizes.X if physical_pixel_sizes.X is not None else 1.0, ) if channel_names is None: # TODO this isn't generating a very pretty looking name but it will be # unique channel_names = ( [ utils.generate_ome_channel_id(image_id=image_name, channel_id=i) for i in range(image_data.shape[cdimindex]) ] if cdimindex > -1 else [utils.generate_ome_channel_id(image_id=image_name, channel_id=0)] ) if channel_colors is None: # TODO generate proper colors or confirm that the underlying lib can handle # None channel_colors = ( [i for i in range(image_data.shape[cdimindex])] if cdimindex > -1 else [0] ) # Chunk spatial dimensions scale_dim_map = { DimensionNames.Time: 1.0, DimensionNames.Channel: 1.0, DimensionNames.SpatialZ: pixelsizes[0], DimensionNames.SpatialY: pixelsizes[1], DimensionNames.SpatialX: pixelsizes[2], } transforms = [ [ # the voxel size for the first scale level { "type": "scale", "scale": [scale_dim_map[d] for d in dimension_order], } ] ] # TODO precompute sizes for downsampled also. plane_size = ( image_data.shape[xdimindex] * image_data.shape[ydimindex] * image_data.itemsize ) target_chunk_size = 16 * (1024 * 1024) # 16 MB # this is making an assumption of chunking whole XY planes. if chunk_dims is None: nplanes_per_chunk = int(math.ceil(target_chunk_size / plane_size)) nplanes_per_chunk = ( min(nplanes_per_chunk, image_data.shape[zdimindex]) if zdimindex > -1 else 1 ) chunk_dim_map = { DimensionNames.Time: 1, DimensionNames.Channel: 1, DimensionNames.SpatialZ: nplanes_per_chunk, DimensionNames.SpatialY: image_data.shape[ydimindex], DimensionNames.SpatialX: image_data.shape[xdimindex], } chunks = [ dict( chunks=OmeZarrWriter._build_chunk_dims( chunk_dim_map=chunk_dim_map, dimension_order=dimension_order ), compressor=default_compressor, ) ] else: chunks = [ dict( chunks=chunk_dims, compressor=default_compressor, ) ] lasty = image_data.shape[ydimindex] lastx = image_data.shape[xdimindex] # TODO scaler might want to use different method for segmentations than raw # TODO allow custom scaler or pre-computed multiresolution levels if scale_num_levels > 1: # TODO As of this writing, this Scaler is not the most general # implementation (it does things by xy plane) but it's code already # written that also works with dask, so it's a good starting point. scaler = Scaler() scaler.method = "nearest" scaler.max_layer = scale_num_levels - 1 scaler.downscale = scale_factor if scale_factor is not None else 2 for _ in range(scale_num_levels - 1): scale_dim_map[DimensionNames.SpatialY] *= scaler.downscale scale_dim_map[DimensionNames.SpatialX] *= scaler.downscale transforms.append( [ { "type": "scale", "scale": [scale_dim_map[d] for d in dimension_order], } ] ) if chunk_dims is None: lasty = int(math.ceil(lasty / scaler.downscale)) lastx = int(math.ceil(lastx / scaler.downscale)) chunk_dim_map = { DimensionNames.Time: 1, DimensionNames.Channel: 1, } plane_size = lasty * lastx * image_data.itemsize nplanes_per_chunk = int(math.ceil(target_chunk_size / plane_size)) nplanes_per_chunk = ( min(nplanes_per_chunk, image_data.shape[zdimindex]) if zdimindex > -1 else 1 ) chunk_dim_map[DimensionNames.SpatialZ] = nplanes_per_chunk chunk_dim_map[DimensionNames.SpatialY] = lasty chunk_dim_map[DimensionNames.SpatialX] = lastx chunks.append( dict( chunks=OmeZarrWriter._build_chunk_dims( chunk_dim_map=chunk_dim_map, dimension_order=dimension_order, ), compressor=default_compressor, ) ) else: rescaley = int(math.ceil(chunk_dims[ydimindex] / scaler.downscale)) rescalex = int(math.ceil(chunk_dims[xdimindex] / scaler.downscale)) chunk_dims = tuple(list(chunk_dims[:-2]) + [rescaley, rescalex]) chunks.append( dict( chunks=chunk_dims, compressor=default_compressor, ) ) else: scaler = None # try to construct per-image metadata ome_json = OmeZarrWriter.build_ome( image_data.shape[zdimindex] if zdimindex > -1 else 1, image_name, channel_names=channel_names, # type: ignore channel_colors=channel_colors, # type: ignore # This can be slow if computed here. # TODO: Rely on user to supply the per-channel min/max. channel_minmax=[ (0.0, 1.0) for i in range(image_data.shape[cdimindex] if cdimindex > -1 else 1) ], ) # TODO user supplies units? dim_to_axis = { DimensionNames.Time: {"name": "t", "type": "time", "unit": "millisecond"}, DimensionNames.Channel: {"name": "c", "type": "channel"}, DimensionNames.SpatialZ: { "name": "z", "type": "space", "unit": "micrometer", }, DimensionNames.SpatialY: { "name": "y", "type": "space", "unit": "micrometer", }, DimensionNames.SpatialX: { "name": "x", "type": "space", "unit": "micrometer", }, } axes = [dim_to_axis[d] for d in dimension_order] # TODO image name must be unique within this root group group = self.root_group # .create_group(image_name, overwrite=True) group.attrs["omero"] = ome_json write_image( image=image_data, group=group, scaler=scaler, axes=axes, # For each resolution, we have a List of transformation Dicts (not # validated). Each list of dicts are added to each datasets in order. coordinate_transformations=transforms, # Options to be passed on to the storage backend. A list would need to # match the number of datasets in a multiresolution pyramid. One can # provide different chunk size for each level of a pyramid using this # option. storage_options=chunks, )