Source code for aicsimageio.readers.sldy_reader.sldy_image

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

import numpy as np
import yaml
from fsspec.spec import AbstractFileSystem

###############################################################################

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)

###############################################################################


class SldyImage:
    """
    Representation of a single acquisition in a 3i slidebook (SLDY) image.

    On construction the required ``ChannelRecord.yaml`` and
    ``ImageRecord.yaml`` files are read from the image directory, the physical
    pixel sizes are derived from them, and every ``.npy`` data file matching
    the data file prefix is indexed by the timepoint and channel numbers found
    in its file name.

    Parameters
    ----------
    fs: AbstractFileSystem
        The file system to use for reading.
    image_directory: Path
        Path to the image directory this is meant to represent.
    data_file_prefix: str
        Prefix of the data files within this image directory to extract.
    channel_file_prefix: str, default = "_Ch"
        Text that directly precedes the channel number in a data file name.
    timepoint_file_prefix: str, default = "_TP"
        Text that directly precedes the timepoint number in a data file name.

    Raises
    ------
    FileNotFoundError
        Propagated from the file system if either required record is missing.
    ValueError
        If either required record was loaded as ``None``.
    """

    # Lazily populated cache backing the `metadata` property.
    _metadata: Optional[Dict[str, Optional[dict]]] = None

    @staticmethod
    def _yaml_mapping(loader: yaml.Loader, node: yaml.Node, deep: bool = False) -> dict:
        """
        Static method intended to map key-value pairs found in image metadata
        yaml files to Python dictionaries. Necessary due to duplicate keys
        found in yaml files.

        Parameters
        ----------
        loader: yaml.Loader
            Loader to attach the mapping to and extract data using.
        node: yaml.Node
            Representation of the node at which this is at in the nested
            metadata tree.
        deep: bool default False
            Whether or not metadata will be deeply extracted.

        Returns
        -------
        mapping: dict
            Dictionary representation of the metadata in the node. A key that
            occurs more than once maps to a list of all of its values.
        """
        mapping: dict = {}
        for key_node, value_node in node.value:
            key = loader.construct_object(key_node, deep=deep)
            value = loader.construct_object(value_node, deep=deep)

            # It seems slidebook classes are naively converted to yaml
            # files resulting in both duplicate keys mapped underneath
            # "StartClass" as well as duplicate classes
            if key == "StartClass":
                key = value["ClassName"]

            # Combine duplicate classes into a list
            if key in mapping:
                if not isinstance(mapping[key], list):
                    mapping[key] = [mapping[key]]
                mapping[key].append(value)
            else:
                mapping[key] = value

        return mapping

    @staticmethod
    def _get_yaml_contents(
        fs: AbstractFileSystem, yaml_path: Path, is_required: bool = True
    ) -> Optional[dict]:
        """
        Given a path to a yaml file will return a dictionary representation
        of the data found in the file.

        If the file does not exist will return `None` unless `is_required`
        is `True` in which case `FileNotFoundError` will be allowed to
        bubble up out of this method.

        Parameters
        ----------
        fs: AbstractFileSystem
            The file system to use for reading.
        yaml_path: Path
            The path to the file to read.
        is_required: bool default True
            If True, will not ignore `FileNotFoundError`s that occur
            while attempting to read in the yaml file.

        Returns
        -------
        yaml_contents: Optional[dict]
            Optional dictionary representation of the contents of the yaml
            file.

        Raises
        ------
        FileNotFoundError
            If the file cannot be found and `is_required` is True.
        """
        try:
            with fs.open(yaml_path) as f:
                # NOTE(review): yaml.Loader is the full (unsafe) loader and can
                # construct arbitrary Python objects. It is kept because the
                # duplicate-key mapping constructor is registered on
                # yaml.Loader; only point this at trusted files.
                return yaml.load(f, Loader=yaml.Loader)
        except FileNotFoundError:
            if is_required:
                raise
            log.debug("Unable to load metadata file %s, ignoring", yaml_path)
            return None

    @staticmethod
    def _get_dim_to_data_path_map(
        data_paths: Set[Path], dim_prefix: str
    ) -> Dict[int, List[Path]]:
        """
        Returns a dictionary mapping from an arbitrary dimension index
        to the list of data paths matching that dimension.

        The index is the run of digits that directly follows the first
        occurrence of `dim_prefix` (followed by at least one digit) in the
        file stem. Paths without such an occurrence are left out of the map.

        Parameters
        ----------
        data_paths: Set[Path]
            Set of data paths to compare against the dim_prefix.
        dim_prefix: str
            Literal prefix in the data paths, used to discern which
            dimension to read in.

        Returns
        -------
        dim_to_data_path_map: Dict[int, List[Path]]
            Dictionary mapping from an arbitrary dimension index
            to the list of data paths matching that dimension.
        """
        # The prefix is literal text, so escape it; requiring at least one
        # digit (\d+) keeps a bare prefix from reaching int("").
        dim_pattern = re.compile(rf"{re.escape(dim_prefix)}(\d+)")

        dim_to_data_paths: Dict[int, List[Path]] = {}
        for data_path in data_paths:
            search_result = dim_pattern.search(data_path.stem)
            if search_result is None:
                continue
            dim = int(search_result.group(1))
            dim_to_data_paths.setdefault(dim, []).append(data_path)

        return dim_to_data_paths

    @staticmethod
    def _cast_list(item: Any) -> List[Any]:
        """
        Returns `item` unchanged if it is already a list, otherwise wraps it
        in a single-element list.
        """
        if isinstance(item, list):
            return item
        return [item]

    def __init__(
        self,
        fs: AbstractFileSystem,
        image_directory: Path,
        data_file_prefix: str,
        channel_file_prefix: str = "_Ch",
        timepoint_file_prefix: str = "_TP",
    ):
        # Adjust mapping of yaml files to Python dictionaries to account
        # for duplicate keys found in slidebook yaml files.
        # NOTE: this registers the constructor on yaml.Loader itself, so it
        # affects every later use of yaml.Loader in the process.
        yaml.add_constructor(
            yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
            SldyImage._yaml_mapping,
            yaml.Loader,
        )

        self._fs = fs
        self.image_directory = image_directory
        self.id = self.image_directory.stem

        self._channel_record = SldyImage._get_yaml_contents(
            fs, image_directory / "ChannelRecord.yaml"
        )
        self._image_record = SldyImage._get_yaml_contents(
            fs, image_directory / "ImageRecord.yaml"
        )

        # Ensure both are read in successfully
        if self._channel_record is None or self._image_record is None:
            raise ValueError(
                "Something unexpected went wrong reading in channel and image records"
            )

        # Duplicate yaml keys are collapsed into lists by `_yaml_mapping`;
        # in that case only the first entry of each is used here.
        lens_def = SldyImage._cast_list(self._image_record["CLensDef70"])[0]
        optovar_def = SldyImage._cast_list(self._image_record["COptovarDef70"])[0]
        exposure_record = SldyImage._cast_list(
            self._channel_record["CExposureRecord70"]
        )[0]

        micron_per_pixel = float(lens_def["mMicronPerPixel"])
        optovar_mag = float(optovar_def["mMagnification"])
        x_factor = float(exposure_record["mXFactor"])
        y_factor = float(exposure_record["mYFactor"])
        interplane_spacing = self._channel_record.get("mInterplaneSpacing")

        self.physical_pixel_size_x = micron_per_pixel / optovar_mag * x_factor
        self.physical_pixel_size_y = micron_per_pixel / optovar_mag * y_factor
        # Z size is only known when the channel record provides the spacing.
        self.physical_pixel_size_z = (
            float(interplane_spacing) if interplane_spacing is not None else None
        )

        data_path_matcher = fs.glob(self.image_directory / f"{data_file_prefix}*.npy")
        self._data_paths = {Path(data_path) for data_path in data_path_matcher}

        # Create mapping of timepoint / channel to their respective data paths
        self._timepoint_to_data_paths = SldyImage._get_dim_to_data_path_map(
            self._data_paths, timepoint_file_prefix
        )
        self._channel_to_data_paths = SldyImage._get_dim_to_data_path_map(
            self._data_paths, channel_file_prefix
        )

        # Create simple sorted list of each timepoint and channel
        self.timepoints = sorted(self._timepoint_to_data_paths)
        self.channels = sorted(self._channel_to_data_paths)

    @property
    def metadata(self) -> Dict[str, Optional[dict]]:
        """
        Returns a dictionary representing the metadata of this acquisition.

        The optional metadata files are read on first access and the result
        is cached on the instance. An optional file that is not found maps
        to `None`.

        Returns
        -------
        metadata: Dict[str, Optional[dict]]
            Simple mapping of metadata file names to the metadata extracted
            from them. Possibly different than the actual yaml due to mapping
            the yaml to Python dictionaries, specifically with duplicate keys.
        """
        if self._metadata is None:

            def read_optional(file_name: str) -> Optional[dict]:
                # Optional record: a missing file yields None, not an error.
                return SldyImage._get_yaml_contents(
                    self._fs, self.image_directory / file_name, False
                )

            self._metadata = {
                "annotation_record": read_optional("AnnotationRecord.yaml"),
                "aux_data": read_optional("AuxData.yaml"),
                "channel_record": self._channel_record,
                "elapsed_times": read_optional("ElapsedTimes.yaml"),
                "image_record": self._image_record,
                "mask_record": read_optional("MaskRecord.yaml"),
                "sa_position_data": read_optional("SAPositionData.yaml"),
                "stage_position_data": read_optional("StagePositionData.yaml"),
            }

        return self._metadata

    def get_data(
        self, timepoint: Optional[int], channel: Optional[int], delayed: bool
    ) -> np.ndarray:
        """
        Returns the image data for the given timepoint and channel if
        specified. If delayed, the data will be lazily read in.

        Parameters
        ----------
        timepoint: Optional[int]
            Optional timepoint to get data about.
        channel: Optional[int]
            Optional channel to get data about.
        delayed: bool
            If True, the data will be lazily read in (memory-mapped,
            read-only).

        Returns
        -------
        data: np.ndarray
            Numpy representation of the image data found. 2D data gains a
            leading axis of length 1.

        Raises
        ------
        KeyError
            If the given timepoint or channel was not found among the data
            files.
        ValueError
            If the selection does not narrow down to exactly one data file.
        """
        data_paths = self._data_paths
        if timepoint is not None:
            data_paths = data_paths.intersection(
                self._timepoint_to_data_paths[timepoint]
            )
        if channel is not None:
            data_paths = data_paths.intersection(self._channel_to_data_paths[channel])

        if len(data_paths) != 1:
            raise ValueError(
                f"Expected to find 1 data path for timepoint {timepoint} "
                f"and channel {channel}, but instead found {len(data_paths)}."
            )

        # NOTE(review): the path is handed straight to numpy, not opened
        # through self._fs -- confirm only local file systems are expected.
        data = np.load(next(iter(data_paths)), mmap_mode="r" if delayed else None)

        # Add empty Z dimension if not present already. A view is used (not
        # np.array([data])) so a memory-mapped array is not read into memory.
        if data.ndim == 2:
            return data[np.newaxis, ...]

        return data