Source code for wsireg.writers.ome_tiff_writer

from pathlib import Path
from typing import List, Optional, Tuple, Union

import cv2
import numpy as np
import SimpleITK as sitk
from tifffile import TiffWriter

from wsireg.reg_images.reg_image import RegImage
from wsireg.reg_transforms.reg_transform_seq import RegTransformSeq
from wsireg.utils.im_utils import (
    format_channel_names,
    get_pyramid_info,
    prepare_ome_xml_str,
)


[docs] class OmeTiffWriter: x_size: Optional[int] = None y_size: Optional[int] = None y_spacing: Optional[Union[int, float]] = None x_spacing: Optional[Union[int, float]] = None tile_size: int = 512 pyr_levels: Optional[List[Tuple[int, int]]] = None n_pyr_levels: Optional[int] = None PhysicalSizeY: Optional[Union[int, float]] = None PhysicalSizeX: Optional[Union[int, float]] = None subifds: Optional[int] = None compression: str = "deflate" def __init__( self, reg_image: RegImage, reg_transform_seq: Optional[RegTransformSeq] = None, ): """ Class for managing writing images to OME-TIFF. Parameters ---------- reg_image: RegImage RegImage to be transformed reg_transform_seq: RegTransformSeq or None Registration transformation sequence from wsireg to transform image Attibutes --------- x_size: int Size of the output image after transformation in x y_size: int Size of the output image after transformation in y y_spacing: float Pixel spacing in microns after transformation in y x_spacing: float Pixel spacing in microns after transformation in x tile_size: int Size of tiles to be written pyr_levels: list of tuples of int: Size of downsampled images in pyramid n_pyr_levels: int Number of downsamples in pyramid PhysicalSizeY: float physical size of image in micron for OME-TIFF in Y PhysicalSizeX: float physical size of image in micron for OME-TIFF in X subifds: int Number of sub-resolutions for pyramidal OME-TIFF compression: str tifffile string to pass to compression argument, defaults to "deflate" for minisblack and "jpeg" for RGB type images """ self.reg_image = reg_image self.reg_transform_seq = reg_transform_seq def _prepare_image_info( self, image_name: str, reg_transform_seq: Optional[RegTransformSeq] = None, write_pyramid: bool = True, tile_size: int = 512, compression: Optional[str] = "default", ) -> None: """Get image info and OME-XML""" if reg_transform_seq: self.x_size, self.y_size = reg_transform_seq.output_size self.x_spacing, self.y_spacing = reg_transform_seq.output_spacing else: self.y_size, self.x_size = ( (self.reg_image.shape[0], self.reg_image.shape[1]) if self.reg_image.is_rgb else (self.reg_image.shape[1], self.reg_image.shape[2]) ) self.y_spacing, self.x_spacing = None, None self.tile_size = tile_size # protect against too large tile size while ( self.y_size / self.tile_size <= 1 or self.x_size / self.tile_size <= 1 ): self.tile_size = self.tile_size // 2 self.pyr_levels, _ = get_pyramid_info( self.y_size, self.x_size, self.reg_image.n_ch, self.tile_size ) self.n_pyr_levels = len(self.pyr_levels) if reg_transform_seq: self.PhysicalSizeY = self.y_spacing self.PhysicalSizeX = self.x_spacing else: self.PhysicalSizeY = self.reg_image.image_res self.PhysicalSizeX = self.reg_image.image_res channel_names = format_channel_names( self.reg_image.channel_names, self.reg_image.n_ch ) self.omexml = prepare_ome_xml_str( self.y_size, self.x_size, self.reg_image.n_ch, self.reg_image.im_dtype, self.reg_image.is_rgb, PhysicalSizeX=self.PhysicalSizeX, PhysicalSizeY=self.PhysicalSizeY, PhysicalSizeXUnit="µm", PhysicalSizeYUnit="µm", Name=image_name, Channel=None if self.reg_image.is_rgb else {"Name": channel_names}, ) self.subifds = self.n_pyr_levels - 1 if write_pyramid is True else None if compression == "default": print("using default compression") self.compression = "jpeg" if self.reg_image.is_rgb else "deflate" else: self.compression = compression
[docs] def write_image_by_plane( self, image_name: str, output_dir: Union[Path, str] = "", write_pyramid: bool = True, tile_size: int = 512, compression: Optional[str] = "default", interpolation: int = cv2.INTER_LINEAR, blur_size: int = 0, subsample: bool = False, ) -> str: """ Write OME-TIFF image plane-by-plane to disk. WsiReg compatible RegImages all have methods to read an image channel-by-channel, thus each channel is read, transformed, and written to reduce memory during write. RGB images may run large memory footprints as they are interleaved before write, for RGB images, using the `OmeTiledTiffWriter` is recommended. Parameters ---------- image_name: str Name to be written WITHOUT extension for example if image_name = "cool_image" the file would be "cool_image.ome.tiff" output_dir: Path or str Directory where the image will be saved write_pyramid: bool Whether to write the OME-TIFF with sub-resolutions or not tile_size: int What size to write OME-TIFF tiles to disk compression: str tifffile string to pass to compression argument, defaults to "deflate" for minisblack and "jpeg" for RGB type images Returns ------- output_file_name: str File path to the written OME-TIFF """ output_file_name = str(Path(output_dir) / f"{image_name}.ome.tiff") self._prepare_image_info( image_name, reg_transform_seq=self.reg_transform_seq, write_pyramid=write_pyramid, tile_size=tile_size, compression=compression, ) rgb_im_data = [] print(f"saving to {output_file_name}") with TiffWriter(output_file_name, bigtiff=True) as tif: if self.reg_image.reader == "sitk": self.reg_image._read_full_image() for channel_idx in range(self.reg_image.n_ch): print(f"transforming : {channel_idx}") image = self.reg_image.read_single_channel(channel_idx) image = np.squeeze(image) image = sitk.GetImageFromArray(image) image.SetSpacing( (self.reg_image.image_res, self.reg_image.image_res) ) if self.reg_transform_seq: image = self.reg_transform_seq.resampler.Execute(image) # image = transform_plane( # image, final_transform, composite_transform # ) print(f"transformed : {channel_idx}") if self.reg_image.is_rgb: rgb_im_data.append(image) else: print("saving") if isinstance(image, sitk.Image): image = sitk.GetArrayFromImage(image) options = dict( tile=(self.tile_size, self.tile_size), compression=self.compression, photometric="rgb" if self.reg_image.is_rgb else "minisblack", metadata=None, ) # write OME-XML to the ImageDescription tag of the first page description = self.omexml if channel_idx == 0 else None # write channel data print( f" writing channel {channel_idx} - shape: {image.shape}" ) tif.write( image, subifds=self.subifds, description=description, **options, ) if write_pyramid: for pyr_idx in range(1, self.n_pyr_levels): resize_shape = ( self.pyr_levels[pyr_idx][0], self.pyr_levels[pyr_idx][1], ) if subsample: image = image[::2, ::2] else: if blur_size > 0: image = cv2.blur( image, (blur_size, blur_size) ) # image = cv2.blur(image, (3, 3)) image = cv2.resize( image, resize_shape, interpolation, ) print( f"pyramid index {pyr_idx} : channel {channel_idx} shape: {image.shape}" ) tif.write(image, **options, subfiletype=1) if self.reg_image.is_rgb: rgb_im_data = sitk.Compose(rgb_im_data) rgb_im_data = sitk.GetArrayFromImage(rgb_im_data) options = dict( tile=(self.tile_size, self.tile_size), compression=self.compression, photometric="rgb", metadata=None, ) # write OME-XML to the ImageDescription tag of the first page description = self.omexml # write channel data tif.write( rgb_im_data, subifds=self.subifds, description=description, **options, ) print(f"RGB shape: {rgb_im_data.shape}") if write_pyramid: for pyr_idx in range(1, self.n_pyr_levels): resize_shape = ( self.pyr_levels[pyr_idx][0], self.pyr_levels[pyr_idx][1], ) rgb_im_data = cv2.resize( rgb_im_data, resize_shape, cv2.INTER_LINEAR, ) tif.write(rgb_im_data, **options, subfiletype=1) return output_file_name