Source code for aicsimageio.writers.ome_tiff_writer

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from typing import Any, Dict, List, Optional, Tuple, Union

import dask.array as da
import numpy as np
import tifffile
from fsspec.implementations.local import LocalFileSystem
from ome_types import from_xml, to_xml
from ome_types.model import OME, Channel, Image, Pixels, TiffData
from ome_types.model.simple_types import ChannelID, Color, PositiveFloat, PositiveInt
from tifffile import TIFF

from .. import exceptions, get_module_version, types
from ..dimensions import (
    DEFAULT_DIMENSION_ORDER,
    DEFAULT_DIMENSION_ORDER_LIST_WITH_SAMPLES,
    DEFAULT_DIMENSION_ORDER_WITH_SAMPLES,
    DimensionNames,
)
from ..metadata import utils
from ..utils import io_utils
from .writer import Writer

# This is the threshold above which BigTiff is used. If it were the 4GB TIFF boundary
# it would be 2**32 bytes, but the libtiff writer was unable to handle even a 2GB
# numpy array, so a much smaller value is used.
# It would be great if we better understood exactly what this threshold is and how to
# calculate it, but for now this is a stopgap working value.
BIGTIFF_BYTE_LIMIT = 2**21


class OmeTiffWriter(Writer):
    @staticmethod
    def save(
        data: Union[List[types.ArrayLike], types.ArrayLike],
        uri: types.PathLike,
        dim_order: Optional[Union[str, List[Union[str, None]]]] = None,
        ome_xml: Optional[Union[str, OME]] = None,
        channel_names: Optional[Union[List[str], List[Optional[List[str]]]]] = None,
        image_name: Optional[Union[str, List[Union[str, None]]]] = None,
        physical_pixel_sizes: Optional[
            Union[types.PhysicalPixelSizes, List[types.PhysicalPixelSizes]]
        ] = None,
        channel_colors: Optional[
            Union[List[List[int]], List[Optional[List[List[int]]]]]
        ] = None,
        fs_kwargs: Dict[str, Any] = {},
        **kwargs: Any,
    ) -> None:
        """
        Write a data array to a file.

        Parameters
        ----------
        data: Union[List[types.ArrayLike], types.ArrayLike]
            The array of data to store. Data arrays must have 2 to 6 dimensions. If a
            list is provided, then it is understood to be multiple images written to
            the ome-tiff file. All following metadata parameters will be expanded to
            the length of this list.
        uri: types.PathLike
            The URI or local path for where to save the data.
            Note: OmeTiffWriter can only write to local file systems.
        dim_order: Optional[Union[str, List[Union[str, None]]]]
            The dimension order of the provided data.
            Dimensions must be a list of T, C, Z, Y, X, and S (S=samples for rgb
            data). Dimension strings must be the same length as the number of
            dimensions in the data. If S is present it must be last and its data
            count must be 3 or 4.
            Default: None.
            If None is provided for any data array, we will guess dimensions based on
            a TCZYX ordering.
            In the None case, data will be assumed to be scalar, not RGB.
        ome_xml: Optional[Union[str, OME]]
            Provided OME metadata. The metadata can be an xml string or an OME object
            from ome-types. A provided ome_xml will override any other provided
            metadata arguments.
            Default: None
            The passed-in metadata will be validated against the current OME-XML
            schema and an exception raised if invalid.
            The ome_xml will also be compared against the dimensions of the input
            data. If None is given, then OME-XML metadata will be generated from the
            data array and any of the following metadata arguments.
        channel_names: Optional[Union[List[str], List[Optional[List[str]]]]]
            Lists of strings representing the names of the data channels.
            Default: None
            If None is given, the list will be generated as a 0-indexed list of
            strings of the form "Channel:image_index:channel_index"
        image_name: Optional[Union[str, List[Union[str, None]]]]
            List of strings representing the names of the images.
            Default: None
            If None is given, the list will be generated as a 0-indexed list of
            strings of the form "Image:image_index"
        physical_pixel_sizes: Optional[Union[types.PhysicalPixelSizes,
                List[types.PhysicalPixelSizes]]]
            List of numbers representing the physical pixel sizes in Z, Y, X in
            microns.
            Default: None
        channel_colors: Optional[Union[List[List[int]],
                List[Optional[List[List[int]]]]]]
            List of rgb color values per channel or a list of lists for each image.
            These must be values compatible with the OME spec.
            Default: None
        fs_kwargs: Dict[str, Any]
            Any specific keyword arguments to pass down to the fsspec created
            filesystem.
            Default: {}

        Raises
        ------
        ValueError:
            Non-local file system URI provided.

        Examples
        --------
        Write a TCZYX data set to OME-Tiff

        >>> image = numpy.ndarray([1, 10, 3, 1024, 2048])
        ... OmeTiffWriter.save(image, "file.ome.tif")

        Write data with a dimension order into OME-Tiff

        >>> image = numpy.ndarray([10, 3, 1024, 2048])
        ... OmeTiffWriter.save(image, "file.ome.tif", dim_order="ZCYX")

        Write multi-scene data to OME-Tiff, specifying channel names

        >>> image0 = numpy.ndarray([3, 10, 1024, 2048])
        ... image1 = numpy.ndarray([3, 10, 512, 512])
        ... OmeTiffWriter.save(
        ...     [image0, image1],
        ...     "file.ome.tif",
        ...     dim_order="CZYX",  # this single value will be repeated to each image
        ...     channel_names=[["C00","C01","C02"],["C10","C11","C12"]]
        ... )
        """
        # Resolve final destination
        fs, path = io_utils.pathlike_to_fs(uri, fs_kwargs=fs_kwargs)

        # Catch non-local file system
        if not isinstance(fs, LocalFileSystem):
            raise ValueError(
                f"Cannot write to non-local file system. "
                f"Received URI: {uri}, which points to {type(fs)}."
            )

        # If metadata is attached as lists, enforce matching shape
        if isinstance(data, list):
            num_images = len(data)
            if isinstance(dim_order, list):
                if len(dim_order) != num_images:
                    raise exceptions.ConflictingArgumentsError(
                        f"OmeTiffWriter received a list of arrays to use as scenes "
                        f"but the provided list of dimension_order is of different "
                        f"length. "
                        f"Number of provided scenes: {num_images}, "
                        f"Number of provided dimension strings: {len(dim_order)}"
                    )
            if isinstance(image_name, list):
                if len(image_name) != num_images:
                    raise exceptions.ConflictingArgumentsError(
                        f"OmeTiffWriter received a list of arrays to use as scenes "
                        f"but the provided list of image_name is of different "
                        f"length. "
                        f"Number of provided scenes: {num_images}, "
                        f"Number of provided image names: {len(image_name)}"
                    )
            if isinstance(physical_pixel_sizes, list):
                if len(physical_pixel_sizes) != num_images:
                    raise exceptions.ConflictingArgumentsError(
                        f"OmeTiffWriter received a list of arrays to use as scenes "
                        f"but the provided list of physical_pixel_sizes is of "
                        f"different length. "
                        f"Number of provided scenes: {num_images}, "
                        f"Number of provided pixel sizes: "
                        f"{len(physical_pixel_sizes)}"
                    )
            if channel_names is not None:
                if isinstance(channel_names[0], list):
                    if len(channel_names) != num_images:
                        raise exceptions.ConflictingArgumentsError(
                            f"OmeTiffWriter received a list of arrays to use as "
                            f"scenes but the provided list of channel_names is of "
                            f"different length. "
                            f"Number of provided scenes: {num_images}, "
                            f"Number of provided channel name lists: "
                            f"{len(channel_names)}"
                        )
            if channel_colors is not None:
                if isinstance(channel_colors[0], list):
                    if not isinstance(channel_colors[0][0], int):
                        if len(channel_colors) != num_images:
                            raise exceptions.ConflictingArgumentsError(
                                f"OmeTiffWriter received a list of arrays to use as "
                                f"scenes but the provided list of channel_colors is "
                                f"of different length. "
                                f"Number of provided scenes: {num_images}, "
                                f"Number of provided channel color lists: "
                                f"{len(channel_colors)}"
                            )

        # make sure data is a list
        if not isinstance(data, list):
            data = [data]
        num_images = len(data)

        # If metadata is attached as singles, expand to lists to match data
        if dim_order is None or isinstance(dim_order, str):
            dim_order = [dim_order] * num_images
        if image_name is None or isinstance(image_name, str):
            image_name = [image_name] * num_images
        if isinstance(physical_pixel_sizes, tuple):
            physical_pixel_sizes = [physical_pixel_sizes] * num_images
        elif physical_pixel_sizes is None:
            physical_pixel_sizes = [
                types.PhysicalPixelSizes(None, None, None)
            ] * num_images
        if channel_names is None or isinstance(channel_names[0], str):
            channel_names = [channel_names] * num_images  # type: ignore

        if channel_colors is not None:
            if all(
                [
                    (
                        channel_colors[img_idx] is None
                        or isinstance(channel_colors[img_idx], list)
                    )
                    for img_idx in range(num_images)
                ]
            ):
                single_image_channel_colors_provided = False
            else:
                single_image_channel_colors_provided = True

            if (
                channel_colors[0] is not None
                and isinstance(channel_colors[0], list)
                and isinstance(channel_colors[0][0], int)
            ):
                single_image_channel_colors_provided = True

        if channel_colors is None or single_image_channel_colors_provided:
            channel_colors = [channel_colors] * num_images  # type: ignore

        xml = b""
        # try to construct OME from params
        if ome_xml is None:
            ome_xml = OmeTiffWriter.build_ome(
                [i.shape for i in data],
                [i.dtype for i in data],
                channel_names=channel_names,  # type: ignore
                image_name=image_name,
                physical_pixel_sizes=physical_pixel_sizes,
                channel_colors=channel_colors,  # type: ignore
                dimension_order=dim_order,
            )
        # else if string, then construct OME from string
        elif isinstance(ome_xml, str):
            ome_xml = from_xml(ome_xml, parser="lxml")

        # if we do not have an OME object now, something is wrong
        if not isinstance(ome_xml, OME):
            raise TypeError(
                "Unknown OME-XML metadata passed in. Use OME object, or xml string "
                "or None"
            )

        # validate ome
        for scene_index in range(num_images):
            OmeTiffWriter._check_ome_dims(
                ome_xml, scene_index, data[scene_index].shape, data[scene_index].dtype
            )

        # convert to string for writing
        xml = to_xml(ome_xml).encode()

        # Save image to tiff!
        with fs.open(path, "wb") as open_resource:
            tif = tifffile.TiffWriter(
                open_resource,
                bigtiff=OmeTiffWriter._size_of_ndarray(data=data) > BIGTIFF_BYTE_LIMIT,
            )

            # now the heavy lifting. assemble the raw data and write it
            for scene_index in range(num_images):
                image_data = data[scene_index]
                # Assumption: if provided a dask array to save, it can fit into memory
                if isinstance(image_data, da.core.Array):
                    image_data = data[scene_index].compute()

                description = xml if scene_index == 0 else None
                # assume if first channel is rgb then all of it is
                spp = ome_xml.images[scene_index].pixels.channels[0].samples_per_pixel
                is_rgb = spp is not None and spp > 1
                photometric = (
                    TIFF.PHOTOMETRIC.RGB if is_rgb else TIFF.PHOTOMETRIC.MINISBLACK
                )
                planarconfig = TIFF.PLANARCONFIG.CONTIG if is_rgb else None
                tif.write(
                    image_data,
                    description=description,
                    photometric=photometric,
                    metadata=None,
                    planarconfig=planarconfig,
                    compression=TIFF.COMPRESSION.ADOBE_DEFLATE,
                )

            tif.close()
    @staticmethod
    def _resolve_OME_dimension_order(
        shape: Tuple[int, ...], dimension_order: Union[str, None]
    ) -> Tuple[str, bool]:
        """
        Do some dimension validation and return an ome-compatible 5D dimension order
        and whether the data is rgb multisample

        Parameters
        ----------
        shape: Tuple[int, ...]
            A data array shape
        dimension_order: Union[str, None]
            A dimension order string, composed of some subset of TCZYXS

        Returns
        -------
        Tuple[str, bool]
            An OME-compatible 5D dimension_order string and a boolean for whether the
            data shape had rgb samples
        """
        ndims = len(shape)

        if ndims > 5 and (shape[-1] != 3 and shape[-1] != 4):
            raise ValueError(
                f"Passed in greater than 5D data but last dimension is not 3 or 4: "
                f"{shape[-1]}"
            )

        if dimension_order is not None and len(dimension_order) != ndims:
            raise exceptions.InvalidDimensionOrderingError(
                f"Dimension order string has {len(dimension_order)} dims but data "
                f"shape has {ndims} dims"
            )

        # data is rgb if last dimension is S and its size is 3 or 4
        is_rgb = False
        if dimension_order is None:
            # we will only guess rgb here if ndims > 5
            # I could make a better guess if I look at any ome-xml passed in
            is_rgb = ndims > 5 and (shape[-1] == 3 or shape[-1] == 4)
            dimension_order = (
                DEFAULT_DIMENSION_ORDER_WITH_SAMPLES
                if is_rgb
                else DEFAULT_DIMENSION_ORDER
            )
        else:
            is_rgb = dimension_order[-1] == DimensionNames.Samples and (
                shape[-1] == 3 or shape[-1] == 4
            )

        if (ndims > 5 and not is_rgb) or ndims > 6 or ndims < 2:
            raise ValueError(
                f"Data array has unexpected number of dimensions: is_rgb = {is_rgb} "
                f"and shape is {shape}"
            )

        # assert valid characters in dimension_order
        if not (
            all(
                d in DEFAULT_DIMENSION_ORDER_LIST_WITH_SAMPLES
                for d in dimension_order
            )
        ):
            raise exceptions.InvalidDimensionOrderingError(
                f"Invalid dimension_order {dimension_order}"
            )
        if dimension_order.find(DimensionNames.Samples) > -1 and not is_rgb:
            raise exceptions.InvalidDimensionOrderingError(
                "Samples must be last dimension if present, and only S=3 or 4 is "
                "supported."
            )
        if dimension_order[-2:] != "YX" and dimension_order[-3:] != "YXS":
            raise exceptions.InvalidDimensionOrderingError(
                f"Last characters of dimension_order {dimension_order} expected to "
                f"be YX or YXS. Please transpose your data."
            )

        # remember whether S was a dim or not, and remove it for now
        if is_rgb:
            ndims = ndims - 1
            dimension_order = dimension_order[:-1]

        # expand to 5D and add appropriate dimensions
        if len(dimension_order) == 2:
            dimension_order = "TCZ" + dimension_order

        # expand to 5D and add appropriate dimensions
        elif len(dimension_order) == 3:
            # prepend either TC, TZ or CZ
            if dimension_order[0] == DimensionNames.Time:
                dimension_order = "CZ" + dimension_order
            elif dimension_order[0] == DimensionNames.Channel:
                dimension_order = "TZ" + dimension_order
            elif dimension_order[0] == DimensionNames.SpatialZ:
                dimension_order = "TC" + dimension_order

        # expand to 5D and add appropriate dimensions
        elif len(dimension_order) == 4:
            # prepend either T, C, or Z
            first2 = dimension_order[:2]
            if first2 == "TC" or first2 == "CT":
                dimension_order = DimensionNames.SpatialZ + dimension_order
            elif first2 == "TZ" or first2 == "ZT":
                dimension_order = DimensionNames.Channel + dimension_order
            elif first2 == "CZ" or first2 == "ZC":
                dimension_order = DimensionNames.Time + dimension_order

        return dimension_order, is_rgb

    @staticmethod
    def _size_of_ndarray(data: List[types.ArrayLike]) -> int:
        """
        Calculate the size of data to determine if we require bigtiff

        Parameters
        ----------
        data: list of data arrays, one per image to be saved to tiff

        Returns
        -------
        the total size of data in bytes
        """
        size = 0
        for i in range(len(data)):
            size += data[i].size * data[i].itemsize
        return size

    @staticmethod
    def _extend_data_shape(shape: Tuple[int, ...], num_dims: int) -> Tuple[int, ...]:
        # extend data shape to be same len as dimension_order
        if len(shape) < num_dims:
            shape = tuple([1] * (num_dims - len(shape))) + shape
        return shape

    @staticmethod
    def _build_ome_image(
        image_index: int = 0,
        tiff_plane_offset: int = 0,
        data_shape: Tuple[int, ...] = (1, 1, 1, 1, 1),
        data_dtype: np.dtype = np.dtype(np.uint8),
        is_rgb: bool = False,
        dimension_order: str = DEFAULT_DIMENSION_ORDER,
        image_name: Optional[str] = "I0",
        physical_pixel_sizes: types.PhysicalPixelSizes = types.PhysicalPixelSizes(
            None, None, None
        ),
        channel_names: Optional[List[str]] = None,
        channel_colors: Optional[List[List[int]]] = None,
    ) -> Image:
        if len(data_shape) < 2 or len(data_shape) > 6:
            raise ValueError(f"Bad OME image shape length: {data_shape}")

        # extend data shape to be same len as dimension_order, accounting for rgb
        if is_rgb:
            data_shape = OmeTiffWriter._extend_data_shape(
                data_shape, len(dimension_order) + 1
            )
        else:
            data_shape = OmeTiffWriter._extend_data_shape(
                data_shape, len(dimension_order)
            )

        def dim_or_1(dim: str) -> int:
            idx = dimension_order.find(dim)
            return 1 if idx == -1 else data_shape[idx]

        channel_count = dim_or_1(DimensionNames.Channel)

        if len(dimension_order) != 5:
            raise ValueError(
                f"Unrecognized OME TIFF dimension order {dimension_order}"
            )
        for c in dimension_order:
            if c not in DEFAULT_DIMENSION_ORDER:
                raise ValueError(f"Unrecognized OME TIFF dimension {c}")
        if isinstance(channel_names, list) and len(channel_names) != channel_count:
            raise ValueError(f"Wrong number of channel names {len(channel_names)}")
        if isinstance(channel_colors, list) and len(channel_colors) != channel_count:
            raise ValueError(
                f"Wrong number of channel colors. "
                f"Received: {len(channel_colors)} ({channel_colors}) "
                f"Expected: {channel_count}."
            )

        samples_per_pixel = 1
        if is_rgb:
            samples_per_pixel = data_shape[-1]

        # dimension_order must be set to the *reverse* of what dimensionality
        # the ome tif file is saved as
        pixels = Pixels(
            id=f"Pixels:{image_index}:0",
            dimension_order=dimension_order[::-1],
            type=utils.dtype_to_ome_type(data_dtype),
            size_t=dim_or_1(DimensionNames.Time),
            size_c=channel_count * samples_per_pixel,
            size_z=dim_or_1(DimensionNames.SpatialZ),
            size_y=dim_or_1(DimensionNames.SpatialY),
            size_x=dim_or_1(DimensionNames.SpatialX),
            interleaved=True if samples_per_pixel > 1 else None,
        )

        if physical_pixel_sizes.Z is None or physical_pixel_sizes.Z == 0:
            pixels.physical_size_z = None
        else:
            pixels.physical_size_z = PositiveFloat(physical_pixel_sizes.Z)
        if physical_pixel_sizes.Y is None or physical_pixel_sizes.Y == 0:
            pixels.physical_size_y = None
        else:
            pixels.physical_size_y = PositiveFloat(physical_pixel_sizes.Y)
        if physical_pixel_sizes.X is None or physical_pixel_sizes.X == 0:
            pixels.physical_size_x = None
        else:
            pixels.physical_size_x = PositiveFloat(physical_pixel_sizes.X)

        # one single tiffdata indicating sequential tiff IFDs based on dimension_order
        pixels.tiff_data_blocks = [
            TiffData(
                plane_count=pixels.size_t * channel_count * pixels.size_z,
                ifd=tiff_plane_offset,
            )
        ]

        pixels.channels = [
            Channel(samples_per_pixel=samples_per_pixel)
            for i in range(channel_count)
        ]
        if channel_names is None:
            for i in range(channel_count):
                pixels.channels[i].id = ChannelID(
                    utils.generate_ome_channel_id(str(image_index), i)
                )
                pixels.channels[i].name = "C:" + str(i)
        else:
            for i in range(channel_count):
                name = channel_names[i]
                pixels.channels[i].id = ChannelID(
                    utils.generate_ome_channel_id(str(image_index), i)
                )
                pixels.channels[i].name = name

        if channel_colors is not None:
            assert len(channel_colors) >= pixels.size_c
            for i in range(channel_count):
                this_channel_color_def = channel_colors[i]
                if len(this_channel_color_def) != 3:
                    raise ValueError(
                        f"Expected RGB (3) color definition for channel color. "
                        f"Received {len(this_channel_color_def)} values "
                        f"({this_channel_color_def}) for image {image_index} "
                        f"channel {i}."
                    )
                else:
                    # Handle List[int] -> Tuple[int, int, int] for color def
                    # Naive cast of tuple(List[int]) generates type: Tuple[int, ...]
                    this_channel_color = (
                        this_channel_color_def[0],
                        this_channel_color_def[1],
                        this_channel_color_def[2],
                    )
                    pixels.channels[i].color = Color(this_channel_color)

        img = Image(
            name=image_name,
            id=utils.generate_ome_image_id(str(image_index)),
            pixels=pixels,
        )
        return img
    @staticmethod
    def build_ome(
        data_shapes: List[Tuple[int, ...]],
        data_types: List[np.dtype],
        dimension_order: Optional[List[Optional[str]]] = None,
        channel_names: Optional[List[Optional[List[str]]]] = None,
        image_name: Optional[List[Optional[str]]] = None,
        physical_pixel_sizes: Optional[List[types.PhysicalPixelSizes]] = None,
        channel_colors: Optional[List[Optional[List[List[int]]]]] = None,
    ) -> OME:
        """
        Create the necessary metadata for an OME tiff image

        Parameters
        ----------
        data_shapes:
            A list of 5- or 6-d tuples
        data_types:
            A list of data types
        dimension_order:
            The order of dimensions in the data array, using T, C, Z, Y, X and
            optionally S
        channel_names:
            The names for each channel to be put into the OME metadata
        image_name:
            The name of the image to be put into the OME metadata
        physical_pixel_sizes:
            Z, Y, and X physical dimensions of each pixel, defaulting to microns
        channel_colors:
            List of all images channel colors to be put into the OME metadata

        Returns
        -------
        OME
            An OME object that can be converted to a valid OME-XML string
        """
        num_images = len(data_shapes)
        # resolve defaults that are None
        if dimension_order is None:
            dimension_order = [None] * num_images
        if channel_names is None:
            channel_names = [None] * num_images
        if image_name is None:
            image_name = [None] * num_images
        if physical_pixel_sizes is None:
            physical_pixel_sizes = [
                types.PhysicalPixelSizes(None, None, None)
            ] * num_images
        if channel_colors is None:
            channel_colors = [None] * num_images
        # assert all lists are same length
        if (
            num_images != len(data_types)
            or num_images != len(dimension_order)
            or num_images != len(channel_names)
            or num_images != len(image_name)
            or num_images != len(physical_pixel_sizes)
            or num_images != len(channel_colors)
        ):
            raise ValueError("Mismatched array counts in parameters to build_ome")

        images = []
        tiff_plane_offset = 0
        for image_index in range(len(data_shapes)):
            # correct the dimension_order for ome
            ome_dimension_order, is_rgb = OmeTiffWriter._resolve_OME_dimension_order(
                data_shapes[image_index], dimension_order[image_index]
            )
            img = OmeTiffWriter._build_ome_image(
                image_index,
                tiff_plane_offset,
                data_shapes[image_index],
                data_types[image_index],
                is_rgb,
                ome_dimension_order,
                image_name[image_index],
                physical_pixel_sizes[image_index],
                channel_names[image_index],
                channel_colors[image_index],
            )
            # increment tiff_plane_offset for next image
            tiff_plane_offset += (
                img.pixels.size_z * img.pixels.size_t * len(img.pixels.channels)
            )
            images.append(img)

        ome_object = OME(creator=f"aicsimageio {get_module_version()}", images=images)

        # validate! (TODO: Is there a better api in ome-types for this?)
        test = to_xml(ome_object)
        from_xml(test)

        return ome_object
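    # Illustrative sketch (not part of the original class): build_ome can be used to
    # construct and tweak OME metadata up front and then hand it to save() via the
    # ome_xml argument instead of letting save() generate it. The shapes, names, and
    # pixel sizes below are hypothetical example values.
    #
    #   ome = OmeTiffWriter.build_ome(
    #       data_shapes=[(1, 2, 5, 256, 256)],
    #       data_types=[np.dtype(np.uint16)],
    #       channel_names=[["DNA", "Membrane"]],
    #       physical_pixel_sizes=[types.PhysicalPixelSizes(0.29, 0.1083, 0.1083)],
    #   )
    #   ome.images[0].description = "example description"  # edit metadata as needed
    #   data = np.zeros((1, 2, 5, 256, 256), dtype=np.uint16)  # must match the shape
    #   OmeTiffWriter.save(data, "with_custom_meta.ome.tif", ome_xml=ome)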
    @staticmethod
    def _check_ome_dims(
        ome_xml: OME, image_index: int, data_shape: Tuple, data_dtype: np.dtype
    ) -> None:
        if len(ome_xml.images) < 1:
            raise ValueError("OME has no images")

        # look at number of samples from first channel only (possible bad assumption)
        samples = ome_xml.images[image_index].pixels.channels[0].samples_per_pixel

        # reverse the OME dimension order to compare against numpy shape
        dimension_order = ome_xml.images[image_index].pixels.dimension_order.value[
            ::-1
        ]
        dims = {
            DimensionNames.Time: ome_xml.images[image_index].pixels.size_t,
            DimensionNames.Channel: ome_xml.images[image_index].pixels.size_c,
            DimensionNames.SpatialZ: ome_xml.images[image_index].pixels.size_z,
            DimensionNames.SpatialY: ome_xml.images[image_index].pixels.size_y,
            DimensionNames.SpatialX: ome_xml.images[image_index].pixels.size_x,
        }
        if samples is not None and samples > 1:
            dims[DimensionNames.Channel] = PositiveInt(
                len(ome_xml.images[image_index].pixels.channels)
            )
            dims[DimensionNames.Samples] = samples
            dimension_order += DimensionNames.Samples

        expected_shape = tuple(dims[i] for i in dimension_order)
        data_shape = OmeTiffWriter._extend_data_shape(data_shape, len(dimension_order))
        if expected_shape != data_shape:
            raise ValueError(
                f"OME shape {expected_shape} is not the same as data array shape: "
                f"{data_shape}"
            )

        expected_type = utils.ome_to_numpy_dtype(
            ome_xml.images[image_index].pixels.type
        )
        if expected_type != data_dtype:
            raise ValueError(
                f"OME pixel type {expected_type.name} is not the same as data array "
                f"type: {data_dtype.name}"
            )
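
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only; not part of the original module).
# Assumes the output paths below are writable on the local file system; the
# file names and array shapes are made up for demonstration.
if __name__ == "__main__":
    # Single scene: 5D TCZYX data, OME metadata generated automatically
    single = np.zeros((1, 2, 5, 64, 64), dtype=np.uint16)
    OmeTiffWriter.save(single, "example_single.ome.tif")

    # Two scenes with per-scene channel names; the single dim_order string is
    # repeated for each scene
    scene0 = np.zeros((2, 5, 64, 64), dtype=np.uint16)
    scene1 = np.zeros((2, 5, 32, 32), dtype=np.uint16)
    OmeTiffWriter.save(
        [scene0, scene1],
        "example_multi_scene.ome.tif",
        dim_order="CZYX",
        channel_names=[["C00", "C01"], ["C10", "C11"]],
    )

    # Dask arrays are accepted; they are computed into memory before writing,
    # so they must fit in RAM
    lazy = da.zeros((1, 1, 5, 64, 64), dtype=np.uint16, chunks=(1, 1, 1, 64, 64))
    OmeTiffWriter.save(lazy, "example_from_dask.ome.tif")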