import multiprocessing
import warnings
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List, Optional, Tuple, Union
import cv2
import dask.array as da
import numpy as np
import SimpleITK as sitk
import zarr
from czifile import CziFile
from tifffile import (
OmeXml,
TiffFile,
TiffWriter,
create_output,
imread,
xml2dict,
)
from wsireg.parameter_maps.preprocessing import BoundingBox, ImagePreproParams
from wsireg.utils.tform_utils import sitk_transform_image
TIFFFILE_EXTS = [".scn", ".tif", ".tiff", ".ndpi", ".svs"]
ARRAYLIKE_CLASSES = (np.ndarray, da.core.Array, zarr.Array)
SITK_TO_NP_DTYPE = {
0: np.int8,
1: np.uint8,
2: np.int16,
3: np.uint16,
4: np.int32,
5: np.uint32,
6: np.int64,
7: np.uint64,
8: np.float32,
9: np.float64,
10: np.complex64,
11: np.complex64,
12: np.int8,
13: np.uint8,
14: np.int16,
15: np.int16,
16: np.int32,
17: np.int32,
18: np.int64,
19: np.int64,
20: np.float32,
21: np.float64,
22: np.uint8,
23: np.uint16,
24: np.uint32,
25: np.uint64,
}
COLNAME_TO_HEX = {
"red": "FF0000",
"green": "00FF00",
"blue": "0000FF",
"magenta": "FF00FF",
"yellow": "FFFF00",
"cyan": "00FFFFF",
"white": "FFFFFF",
}
[docs]
def zarr_get_base_pyr_layer(zarr_store):
"""
Find the base pyramid layer of a zarr store
Parameters
----------
zarr_store
zarr store
Returns
-------
zarr_im: zarr.core.Array
zarr array of base layer
"""
if isinstance(zarr_store, zarr.hierarchy.Group):
zarr_im = zarr_store[str(0)]
elif isinstance(zarr_store, zarr.core.Array):
zarr_im = zarr_store
return zarr_im
[docs]
def ensure_dask_array(image):
if isinstance(image, da.core.Array):
return image
if isinstance(image, zarr.Array):
return da.from_zarr(image)
# handles np.ndarray _and_ other array like objects.
return da.from_array(image)
[docs]
def preprocess_dask_array(
array: da.Array, preprocessing: Optional[ImagePreproParams] = None
):
is_rgb = guess_rgb(array.shape)
if is_rgb:
if preprocessing:
image_out = np.asarray(grayscale(array, is_interleaved=is_rgb))
image_out = sitk.GetImageFromArray(image_out)
else:
image_out = np.asarray(array)
image_out = sitk.GetImageFromArray(image_out, isVector=True)
elif len(array.shape) == 2:
image_out = sitk.GetImageFromArray(np.asarray(array))
else:
if preprocessing:
if preprocessing.ch_indices and len(array.shape) > 2:
chs = list(preprocessing.ch_indices)
array = array[chs, :, :]
image_out = sitk.GetImageFromArray(np.squeeze(np.asarray(array)))
return image_out
[docs]
def read_preprocess_array(array, preprocessing, force_rgb=None):
"""Read np.array, zarr.Array, or dask.array image into memory
with preprocessing for registration."""
is_interleaved = guess_rgb(array.shape)
is_rgb = is_interleaved if not force_rgb else force_rgb
if is_rgb:
if preprocessing:
image_out = np.asarray(
grayscale(array, is_interleaved=is_interleaved)
)
image_out = sitk.GetImageFromArray(image_out)
else:
image_out = np.asarray(array)
if not is_interleaved:
image_out = np.rollaxis(image_out, 0, 3)
image_out = sitk.GetImageFromArray(image_out, isVector=True)
elif len(array.shape) == 2:
image_out = sitk.GetImageFromArray(np.asarray(array))
else:
if preprocessing:
if preprocessing.ch_indices and len(array.shape) > 2:
chs = list(preprocessing.ch_indices)
array = array[chs, :, :]
image_out = sitk.GetImageFromArray(np.squeeze(np.asarray(array)))
return image_out
[docs]
def tifffile_zarr_backend(
image_filepath, largest_series, preprocessing, force_rgb=None
):
"""
Read image with tifffile and use zarr to read data into memory
Parameters
----------
image_filepath: str
path to the image file
largest_series: int
index of the largest series in the image
preprocessing:
whether to do some read-time pre-processing
- greyscale conversion (at the tile level)
- read individual or range of channels (at the tile level)
Returns
-------
image: sitk.Image
image ready for other registration pre-processing
"""
print("using zarr backend")
zarr_series = imread(image_filepath, aszarr=True, series=largest_series)
zarr_store = zarr.open(zarr_series)
zarr_im = zarr_get_base_pyr_layer(zarr_store)
return read_preprocess_array(
zarr_im, preprocessing=preprocessing, force_rgb=force_rgb
)
[docs]
def tifffile_dask_backend(
image_filepath, largest_series, preprocessing, force_rgb=None
):
"""
Read image with tifffile and use dask to read data into memory
Parameters
----------
image_filepath: str
path to the image file
largest_series: int
index of the largest series in the image
preprocessing:
whether to do some read-time pre-processing
- greyscale conversion (at the tile level)
- read individual or range of channels (at the tile level)
Returns
-------
image: sitk.Image
image ready for other registration pre-processing
"""
print("using dask backend")
zarr_series = imread(image_filepath, aszarr=True, series=largest_series)
zarr_store = zarr.open(zarr_series)
dask_im = da.squeeze(da.from_zarr(zarr_get_base_pyr_layer(zarr_store)))
return read_preprocess_array(
dask_im, preprocessing=preprocessing, force_rgb=force_rgb
)
[docs]
def sitk_backend(image_filepath, preprocessing):
"""
Read image with SimpleITK..this will always read the full image into memory
Parameters
----------
image_filepath: str
path to the image file
preprocessing:
whether to do some read-time pre-processing
- greyscale conversion (at the tile level)
- read individual or range of channels (at the tile level)
Returns
-------
image: sitk.Image
image ready for other registration pre-processing
"""
print("using sitk backend")
image = sitk.ReadImage(image_filepath)
if image.GetNumberOfComponentsPerPixel() >= 3:
if preprocessing is not None:
image = sitk_vect_to_gs(image)
elif image.GetDepth() == 0:
return image
else:
if preprocessing is not None:
if (
preprocessing.get("ch_indices") is not None
and image.GetDepth() > 0
):
print("here")
chs = np.asarray(preprocessing.get('ch_indices'))
image = image[:, :, chs]
return image
[docs]
def guess_rgb(shape):
"""
Guess if the passed shape comes from rgb data.
If last dim is 3 or 4 assume the data is rgb, including rgba.
Parameters
----------
shape : list of int
Shape of the data that should be checked.
Returns
-------
bool
If data is rgb or not.
"""
ndim = len(shape)
last_dim = shape[-1]
if ndim > 2 and last_dim < 5:
rgb = True
else:
rgb = False
return rgb
[docs]
def grayscale(rgb_image, is_interleaved=False):
"""
convert RGB image data to greyscale
Parameters
----------
rgb_image: np.ndarray
image data
Returns
-------
image:np.ndarray
returns 8-bit greyscale image for 24-bit RGB image
"""
if is_interleaved is True:
result = (
(rgb_image[..., 0] * 0.2125).astype(np.uint8)
+ (rgb_image[..., 1] * 0.7154).astype(np.uint8)
+ (rgb_image[..., 2] * 0.0721).astype(np.uint8)
)
else:
result = (
(rgb_image[0, ...] * 0.2125).astype(np.uint8)
+ (rgb_image[1, ...] * 0.7154).astype(np.uint8)
+ (rgb_image[2, ...] * 0.0721).astype(np.uint8)
)
return result
[docs]
def czi_tile_grayscale(rgb_image):
"""
convert RGB image data to greyscale
Parameters
----------
rgb_image: np.ndarray
image data
Returns
-------
image:np.ndarray
returns 8-bit greyscale image for 24-bit RGB image
"""
result = (
(rgb_image[..., 0] * 0.2125).astype(np.uint8)
+ (rgb_image[..., 1] * 0.7154).astype(np.uint8)
+ (rgb_image[..., 2] * 0.0721).astype(np.uint8)
)
return np.expand_dims(result, axis=-1)
[docs]
class CziRegImageReader(CziFile):
"""
Sub-class of CziFile with added functionality to only read certain channels
"""
[docs]
def sub_asarray(
self,
resize=True,
order=0,
out=None,
max_workers=None,
channel_idx=None,
as_uint8=False,
):
"""Return image data from file(s) as numpy array.
Parameters
----------
resize : bool
If True (default), resize sub/supersampled subblock data.
order : int
The order of spline interpolation used to resize sub/supersampled
subblock data. Default is 0 (nearest neighbor).
out : numpy.ndarray, str, or file-like object; optional
Buffer where image data will be saved.
If numpy.ndarray, a writable array of compatible dtype and shape.
If str or open file, the file name or file object used to
create a memory-map to an array stored in a binary file on disk.
max_workers : int
Maximum number of threads to read and decode subblock data.
By default up to half the CPU cores are used.
channel_idx : int or list of int
The indices of the channels to extract
as_uint8 : bool
byte-scale image data to np.uint8 data type
Parameters
----------
out:np.ndarray
image read with selected parameters as np.ndarray
"""
out_shape = list(self.shape)
start = list(self.start)
ch_dim_idx = self.axes.index('C')
if channel_idx is not None:
if isinstance(channel_idx, int):
channel_idx = [channel_idx]
if out_shape[ch_dim_idx] == 1:
channel_idx = None
else:
out_shape[ch_dim_idx] = len(channel_idx)
min_ch_seq = {}
for idx, i in enumerate(channel_idx):
min_ch_seq.update({i: idx})
if as_uint8 is True:
out_dtype = np.uint8
else:
out_dtype = self.dtype
if out is None:
out = create_output(None, tuple(out_shape), out_dtype)
if max_workers is None:
max_workers = multiprocessing.cpu_count() - 1
def func(
directory_entry, resize=resize, order=order, start=start, out=out
):
"""Read, decode, and copy subblock data."""
subblock = directory_entry.data_segment()
dvstart = list(directory_entry.start)
czi_c_idx = [
de.dimension for de in subblock.dimension_entries
].index('C')
subblock_ch_idx = subblock.dimension_entries[czi_c_idx].start
if channel_idx is not None:
if subblock_ch_idx in channel_idx:
subblock.dimension_entries[czi_c_idx].start
tile = subblock.data(resize=resize, order=order)
dvstart[ch_dim_idx] = min_ch_seq.get(subblock_ch_idx)
else:
return
else:
tile = subblock.data(resize=resize, order=order)
if as_uint8 is True:
tile = (tile / 256).astype("uint8")
index = tuple(
slice(i - j, i - j + k)
for i, j, k in zip(tuple(dvstart), tuple(start), tile.shape)
)
try:
out[index] = tile
except ValueError as e:
warnings.warn(str(e))
if max_workers > 1:
self._fh.lock = True
with ThreadPoolExecutor(max_workers) as executor:
executor.map(func, self.filtered_subblock_directory)
self._fh.lock = None
else:
for directory_entry in self.filtered_subblock_directory:
func(directory_entry)
if hasattr(out, "flush"):
out.flush()
return out
[docs]
def sub_asarray_rgb(
self,
resize=True,
order=0,
out=None,
max_workers=None,
channel_idx=None,
as_uint8=False,
greyscale=False,
):
"""Return image data from file(s) as numpy array.
Parameters
----------
resize : bool
If True (default), resize sub/supersampled subblock data.
order : int
The order of spline interpolation used to resize sub/supersampled
subblock data. Default is 0 (nearest neighbor).
out : numpy.ndarray, str, or file-like object; optional
Buffer where image data will be saved.
If numpy.ndarray, a writable array of compatible dtype and shape.
If str or open file, the file name or file object used to
create a memory-map to an array stored in a binary file on disk.
max_workers : int
Maximum number of threads to read and decode subblock data.
By default up to half the CPU cores are used.
channel_idx : int or list of int
The indices of the channels to extract
as_uint8 : bool
byte-scale image data to np.uint8 data type
Parameters
----------
out:np.ndarray
image read with selected parameters as np.ndarray
"""
out_shape = list(self.shape)
start = list(self.start)
ch_dim_idx = self.axes.index('0')
if channel_idx is not None:
if isinstance(channel_idx, int):
channel_idx = [channel_idx]
out_shape[ch_dim_idx] = len(channel_idx)
if greyscale is True:
out_shape[ch_dim_idx] = 1
if as_uint8 is True:
out_dtype = np.uint8
else:
out_dtype = self.dtype
if out is None:
out = create_output(None, tuple(out_shape), out_dtype)
if max_workers is None:
max_workers = multiprocessing.cpu_count() - 1
def func(
directory_entry, resize=resize, order=order, start=start, out=out
):
"""Read, decode, and copy subblock data."""
subblock = directory_entry.data_segment()
dvstart = list(directory_entry.start)
tile = subblock.data(resize=resize, order=order)
if greyscale is True:
tile = czi_tile_grayscale(tile)
if channel_idx is not None:
tile = tile[:, :, :, :, :, channel_idx]
index = tuple(
slice(i - j, i - j + k)
for i, j, k in zip(tuple(dvstart), tuple(start), tile.shape)
)
try:
out[index] = tile
except ValueError as e:
warnings.warn(str(e))
if max_workers > 1:
self._fh.lock = True
with ThreadPoolExecutor(max_workers) as executor:
executor.map(func, self.filtered_subblock_directory)
self._fh.lock = None
else:
for directory_entry in self.filtered_subblock_directory:
func(directory_entry)
if hasattr(out, "flush"):
out.flush()
return out
[docs]
def tf_get_largest_series(image_filepath: Union[str, Path]) -> int:
"""
Determine largest series for .scn files by examining metadata
For other multi-series files, find the one with the most pixels
Parameters
----------
image_filepath: str
path to the image file
Returns
-------
largest_series:int
index of the largest series in the image data
"""
fp_ext = Path(image_filepath).suffix.lower()
tf_im = TiffFile(image_filepath)
if fp_ext == ".scn":
scn_meta = xml2dict(tf_im.scn_metadata)
image_meta = scn_meta.get("scn").get("collection").get("image")
largest_series = np.argmax(
[
im.get("scanSettings")
.get("objectiveSettings")
.get("objective")
for im in image_meta
]
)
else:
largest_series = np.argmax(
[
np.prod(np.asarray(series.shape), dtype=np.int64)
for series in tf_im.series
]
)
return int(largest_series)
[docs]
def get_sitk_image_info(image_filepath):
"""
Get image info for files only ready by SimpleITK
Parameters
----------
image_filepath:str
filepath to image
Returns
-------
im_dims: np.ndarray
image dimensions in np.ndarray
im_dtype: np.dtype
data type of the image
"""
reader = sitk.ImageFileReader()
reader.SetFileName(image_filepath)
reader.LoadPrivateTagsOn()
reader.ReadImageInformation()
im_dims = np.asarray(reader.GetSize())
# swap to YX
im_dims[[0, 1]] = im_dims[[1, 0]]
im_dtype = np.dtype(SITK_TO_NP_DTYPE.get(reader.GetPixelID()))
is_vector = sitk.GetPixelIDValueAsString(reader.GetPixelID())
if "vector" in is_vector:
im_dims = np.append(im_dims, 3)
elif len(im_dims) == 3:
im_dims = im_dims[[2, 0, 1]]
else:
im_dims = np.concatenate([[1], im_dims], axis=0)
return im_dims, im_dtype
[docs]
def tifffile_to_dask(
im_fp: Union[str, Path], largest_series: int, level=int
) -> Union[da.Array, List[da.Array]]:
imdata = zarr.open(
imread(im_fp, aszarr=True, series=largest_series, level=level)
)
if isinstance(imdata, zarr.hierarchy.Group):
imdata = [da.from_zarr(imdata[z]) for z in imdata.array_keys()]
else:
imdata = da.from_zarr(imdata)
return imdata
[docs]
def get_tifffile_info(
image_filepath: Union[str, Path]
) -> Tuple[Tuple[int, int, int], np.dtype, int]:
largest_series = tf_get_largest_series(image_filepath)
zarr_im = zarr.open(
imread(image_filepath, aszarr=True, series=largest_series)
)
zarr_im = zarr_get_base_pyr_layer(zarr_im)
im_dims = np.squeeze(zarr_im.shape)
if len(im_dims) == 2:
im_dims = np.concatenate([[1], im_dims])
im_dtype = zarr_im.dtype
return im_dims, im_dtype, largest_series
[docs]
def tf_zarr_read_single_ch(
image_filepath, channel_idx, is_rgb, is_rgb_interleaved=True
):
"""
Reads a single channel using zarr or dask in combination with tifffile
Parameters
----------
image_filepath:str
file path to image
channel_idx:int
index of the channel to be read
is_rgb:bool
whether image is rgb interleaved
Returns
-------
im:np.ndarray
image as a np.ndarray
"""
largest_series = tf_get_largest_series(image_filepath)
zarr_im = zarr.open(
imread(image_filepath, aszarr=True, series=largest_series)
)
zarr_im = zarr_get_base_pyr_layer(zarr_im)
try:
im = da.squeeze(da.from_zarr(zarr_im))
if is_rgb and is_rgb_interleaved is True:
im = im[:, :, channel_idx].compute()
elif len(im.shape) > 2:
im = im[channel_idx, :, :].compute()
else:
im = im.compute()
except ValueError:
im = zarr_im
if is_rgb is True and is_rgb_interleaved is True:
im = im[:, :, channel_idx]
elif len(im.shape) > 2:
im = im[channel_idx, :, :].compute()
else:
im = im.compute()
return im
[docs]
def czi_read_single_ch(image_filepath, channel_idx):
czi = CziRegImageReader(image_filepath)
im = czi.sub_asarray(
channel_idx=channel_idx,
)
return im
[docs]
def calc_pyramid_levels(xy_final_shape, tile_size):
"""
Calculate number of pyramids for a given image dimension and tile size
Stops when further downsampling would be smaller than tile_size.
Parameters
----------
xy_final_shape:np.ndarray
final shape in xy order
tile_size: int
size of the tiles in the pyramidal layers
Returns
-------
res_shapes:list
list of tuples of the shapes of the downsampled images
"""
res_shape = xy_final_shape[::-1]
res_shapes = [tuple(res_shape)]
while all(res_shape > tile_size):
res_shape = res_shape // 2
res_shapes.append(tuple(res_shape))
return res_shapes[:-1]
[docs]
def add_ome_axes_single_plane(image_np):
"""
Reshapes np.ndarray image to match OME-zarr standard
Parameters
----------
image_np:np.ndarray
image to which additional axes are added to meet OME-zarr standard
Returns
-------
image_np:np.ndarray
reshaped image array
"""
return image_np.reshape((1,) * (3) + image_np.shape)
[docs]
def generate_channels(channel_names, channel_colors, im_dtype):
"""
Generate OME-zarr channels metadata
Parameters
----------
channel_names:list
channel_colors:list
im_dtype:np.dtype
Returns
-------
channel_info:list
list of dicts containing OME-zarr channel info
"""
channel_info = []
for channel_name, channel_color in zip(channel_names, channel_colors):
channel_info.append(
{
"label": channel_name,
"color": channel_color,
"active": True,
"window": {"start": 0, "end": int(np.iinfo(im_dtype).max)},
}
)
return channel_info
[docs]
def get_pyramid_info(y_size, x_size, n_ch, tile_size):
"""
Get pyramidal info for OME-zarr output
Parameters
----------
y_size: int
y dimension of base layer
x_size:int
x dimension of base layer
n_ch:int
number of channels in the image
tile_size:int
tile size of the image
Returns
-------
pyr_levels
pyramidal levels
pyr_shapes:
OME-zarr pyramid shapes for all levels
"""
yx_size = np.asarray([y_size, x_size], dtype=np.int32)
pyr_levels = calc_pyramid_levels(yx_size, tile_size)
pyr_shapes = [(1, n_ch, 1, int(pl[0]), int(pl[1])) for pl in pyr_levels]
return pyr_levels, pyr_shapes
[docs]
def prepare_ome_zarr_group(
zarr_store_dir,
y_size,
x_size,
n_ch,
im_dtype,
tile_size=512,
channel_names=None,
channel_colors=None,
):
"""
Prepare OME-zarr store with all meta data and channel info and initialize store
Parameters
----------
zarr_store_dir:str
filepath to zarr store
y_size: int
y dimension of base layer
x_size:int
x dimension of base layer
n_ch:int
number of channels in the image
im_dtype:np.dtype
data type of the image
tile_size:int
tile size of the image
channel_names:list
list of str channel names
channel_colors:
list of hex or str channel colors
Returns
-------
grp: zarr.store
initialized store
n_pyr_levels: int
number of sub-resolutions
pyr_levels: list
shapes of sub-resolutions
"""
store = zarr.DirectoryStore(zarr_store_dir)
grp = zarr.group(store, overwrite=True)
zarr_dtype = "{}{}".format(im_dtype.kind, im_dtype.itemsize)
pyr_levels, pyr_shapes = get_pyramid_info(x_size, y_size, n_ch, tile_size)
paths = []
for path, pyr_shape in enumerate(pyr_levels):
grp.create_dataset(
str(path),
shape=pyr_shapes[path],
dtype=zarr_dtype,
chunks=(1, 1, 1, tile_size, tile_size),
)
paths.append({"path": str(path)})
multiscales = [
{
"version": "0.1",
"datasets": paths,
}
]
grp.attrs["multiscales"] = multiscales
n_pyr_levels = len(paths)
channel_names = format_channel_names(channel_names, n_ch)
n_colors = n_ch // len(COLNAME_TO_HEX) + 1
color_palette = [*COLNAME_TO_HEX] * n_colors
if channel_colors is None:
channel_colors = [color_palette[idx] for idx in range(n_ch)]
elif n_ch != len(channel_colors) and n_ch != 1:
channel_colors = [color_palette[idx] for idx in range(n_ch)]
elif n_ch != len(channel_colors) and n_ch == 1:
channel_colors = ["FFFFFF"]
else:
channel_colors = [COLNAME_TO_HEX[ch] for ch in channel_colors]
channel_info = generate_channels(channel_names, channel_colors, im_dtype)
image_data = {
'id': 1,
'channels': channel_info,
'rdefs': {
'model': 'color',
},
}
grp.attrs["omero"] = image_data
return grp, n_pyr_levels, pyr_levels
[docs]
def image_to_zarr_store(zgrp, image, channel_idx, n_pyr_levels, pyr_levels):
"""
Write image into zarr store with sub resolutions
Parameters
----------
grp: zarr.store
initialized store
image:sitk.Image
image
channel_idx:int
which channel the image represents
n_pyr_levels: int
pyr_levels: list
"""
for pyr_idx in range(n_pyr_levels):
if pyr_idx == 0:
image = sitk.GetArrayFromImage(image)
else:
resize_shape = (
pyr_levels[pyr_idx][1],
pyr_levels[pyr_idx][0],
)
image = cv2.resize(image, resize_shape, cv2.INTER_LINEAR)
zgrp[str(pyr_idx)][
:, channel_idx : channel_idx + 1, :, :, :
] = add_ome_axes_single_plane(image)
[docs]
def prepare_ome_xml_str(
y_size, x_size, n_ch, im_dtype, is_rgb, **ome_metadata
):
omexml = OmeXml()
if is_rgb:
stored_shape = (1, 1, 1, y_size, x_size, n_ch)
im_shape = (y_size, x_size, n_ch)
else:
stored_shape = (n_ch, 1, 1, y_size, x_size, 1)
im_shape = (n_ch, y_size, x_size)
omexml.addimage(
dtype=im_dtype,
shape=im_shape,
# specify how the image is stored in the TIFF file
storedshape=stored_shape,
**ome_metadata,
)
return omexml.tostring().encode("utf8")
[docs]
def compute_mask_to_bbox(mask, mask_padding=100):
mask.SetSpacing((1, 1))
mask_size = mask.GetSize()
mask = sitk.Threshold(mask, 1, 255)
mask = sitk.ConnectedComponent(mask)
labstats = sitk.LabelShapeStatisticsImageFilter()
labstats.SetBackgroundValue(0)
labstats.ComputePerimeterOff()
labstats.ComputeFeretDiameterOff()
labstats.ComputeOrientedBoundingBoxOff()
labstats.Execute(mask)
bb_points = []
for label in labstats.GetLabels():
x1, y1, xw, yh = labstats.GetBoundingBox(label)
x2, y2 = x1 + xw, y1 + yh
lab_points = np.asarray([[x1, y1], [x2, y2]])
bb_points.append(lab_points)
bb_points = np.concatenate(bb_points)
x_min = np.min(bb_points[:, 0])
y_min = np.min(bb_points[:, 1])
x_max = np.max(bb_points[:, 0])
y_max = np.max(bb_points[:, 1])
if (x_min - mask_padding) < 0:
x_min = 0
else:
x_min -= mask_padding
if (y_min - mask_padding) < 0:
y_min = 0
else:
y_min -= mask_padding
if (x_max + mask_padding) > mask_size[0]:
x_max = mask_size[0]
else:
x_max += mask_padding
if (y_max + mask_padding) > mask_size[1]:
y_max = mask_size[1]
else:
y_max += mask_padding
x_width = x_max - x_min
y_height = y_max - y_min
return BoundingBox(x_min, y_min, x_width, y_height)
[docs]
def sitk_vect_to_gs(image):
"""
converts simpleITK RGB image to greyscale using cv2
Parameters
----------
image
SimpleITK image
Returns
-------
Greyscale SimpleITK image
"""
image = sitk.GetArrayFromImage(image)
if image.shape[2] == 3:
image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
elif image.shape[2] == 4:
image = cv2.cvtColor(image, cv2.COLOR_BGRA2GRAY)
return sitk.GetImageFromArray(image, isVector=False)
[docs]
def sitk_max_int_proj(image: sitk.Image) -> sitk.Image:
"""
Finds maximum intensity projection of multi-channel SimpleITK image
Parameters
----------
image
multichannel impleITK image
Returns
-------
SimpleITK image
"""
# check if there are 3 dimensions (XYC)
if len(image.GetSize()) == 3:
return sitk.MaximumProjection(image, 2)[:, :, 0]
else:
print(
'cannot perform maximum intensity project on single channel image'
)
return image
[docs]
def sitk_inv_int(image):
"""
inverts intensity of images for registration, useful for alignment of brightfield
and fluorescence images
Parameters
----------
image
SimpleITK image s
Returns
-------
Intensity inverted SimpleITK image
"""
return sitk.InvertIntensity(image)
[docs]
def contrast_enhance(image):
spacing = image.GetSpacing()
image = sitk.GetArrayFromImage(image)
image = cv2.convertScaleAbs(image, alpha=7, beta=1)
image = sitk.GetImageFromArray(image)
image.SetSpacing(spacing)
return image
[docs]
def std_prepro():
"""
Catch all dictionary of preprocessing functions that will result in a single 2D image for
registration
Returns
-------
dictionary of processing parameters
"""
STD_PREPRO = {
'image_type': 'FL',
'ch_indices': None,
'as_uint8': True,
'downsample': 1,
'max_int_proj': sitk_max_int_proj,
'inv_int': sitk_inv_int,
}
return STD_PREPRO
[docs]
def tifffile_to_arraylike(image_filepath):
largest_series_idx = tf_get_largest_series(image_filepath)
image = zarr.open(
imread(image_filepath, aszarr=True, series=largest_series_idx)
)
if isinstance(image, zarr.Group):
image = image[0]
return image, image_filepath
[docs]
def ome_tifffile_to_arraylike(image_filepath):
ome_metadata = xml2dict(TiffFile(image_filepath).ome_metadata)
im_dims, im_dtype = get_tifffile_info(image_filepath)
largest_series_idx = tf_get_largest_series(image_filepath)
series_metadata = ome_metadata.get("OME").get("Image")
if isinstance(series_metadata, list):
series_metadata = series_metadata[largest_series_idx]
if isinstance(series_metadata.get("Pixels").get("Channel"), list):
samples_per_pixel = (
series_metadata.get("Pixels")
.get("Channel")[0]
.get("SamplesPerPixel")
)
else:
samples_per_pixel = (
series_metadata.get("Pixels").get("Channel").get("SamplesPerPixel")
)
is_rgb = guess_rgb(im_dims)
image = zarr.open(
imread(image_filepath, aszarr=True, series=largest_series_idx)
)
if isinstance(image, zarr.Group):
image = image[0]
image = da.from_zarr(image)
if samples_per_pixel:
if is_rgb is False and samples_per_pixel >= 3:
image = image.transpose(1, 2, 0)
return image, image_filepath