Source code for precon

from __future__ import annotations

from pathlib import Path
from typing import BinaryIO

from precon.parameter.models.enums import Enums, InPlaneTransform
from precon.parameter.models.label import Label, LabelStructC
from precon.parameter.models.parameter import Parameter
from precon.parameter.models.parameter2read import Parameter2Read
from precon.parameter.models.parfile import Parfile
from precon.parameter.models.recfile import Recfile
from precon.parameter.models.sensitivity import Sensitivity
from precon.parameter.sens import reconstruct_refscan as _reconstruct_refscan
from precon.parameter.sens import reformat_refscan as _reformat_refscan
from precon.recon.epi import epi_corr as _epi_corr
from precon.recon.epi import get_epi_corr_data as _get_epi_corr_data
from precon.recon.format import in_plane_transform as _in_plane_transform
from precon.recon.geo_corr import geo_corr as _geo_corr
from precon.recon.read import read as _read
from precon.recon.sense import sense_unfold as _sense_unfold
from precon.recon.sort import sort as _sort
from precon.recon.array_compression import compress_data as _compress_data
from precon.recon.array_compression import compress_sensitivity as _compress_sensitivity
from precon.recon.array_compression import (
    get_array_compression_matrix as _get_array_compression_matrix,
)
from precon.recon.cardiac import retro_binning as _retro_binning
from precon.recon.cardiac import retro_fill_holes as _retro_fill_holes
from precon.recon.crop import crop as _crop
from precon.recon.fft import i2k as _i2k
from precon.recon.fft import k2i as _k2i
from precon.recon.filter import hamming_filter as _hamming_filter
from precon.recon.flow import (
    concomitant_field_correction as _concomitant_field_correction,
)
from precon.recon.flow import divide_segments as _divide_segments
from precon.recon.flow import fit_flow_phase as _fit_flow_phase
from precon.recon.flow import format_flow as _format_flow
from precon.recon.homodyne import homodyne as _homodyne
from precon.recon.homodyne import is_partial_fourier as _is_partial_fourier
from precon.recon.kshift import kshift as _kshift
from precon.recon.sos import sos as _sos
from precon.recon.spectro import spectro_combine_coils as _spectro_combine_coils
from precon.recon.spectro import spectro_downsample as _spectro_downsample
from precon.recon.zeropad import zeropad as _zeropad
from precon.settings import Settings
from precon.utils import *
from precon.utils.helper import (
    as_fortran,
    convert_to_single,
    get_data_size,
    to_12d_single,
)
from precon.utils.math import round_up
from precon.recon import noncart


[docs]def calculate_sensitivities( refscan: Path, target_scan: Path, stack: int = 0, mix: int = 0, match_target_size: bool = False, qbc_stack: int = None, fov: List[float] = None, output_size: List[int] = None, ) -> Sensitivity: """Calculates the sensitivity maps from the SENSE reference scan in the geometry of the target scan. Args: refscan (Path): The path to the rawfile of the SENSE reference scan target_scan (Path): The path to the rawfile of the target scan stack (int, optional): The stack index of the target scan which is used to determine the geometry (only relevant for multistack data). Defaults to 0. mix (int, optional): The mix index of the target scan which is used to determine the geometry (only relevant for mixed data). Defaults to 0. match_target_size (bool, optional): If True the size of the sensitivities is equal to the matrix size of the target scan. Defaults to False. fov (List[float], optional): The field of view of the reformatted data. Defaults to None. Only change if a different FOV than the target scan is needed (e.g. in simulations) output_size (List[int], optional): The matrix size of the reformatted data. Defaults to None. If not given then it is determined automatically. If match_target_size is True then this argumant is ignored Returns: Sensitivity: The sensitivity object containing the reformatted SENSE reference scan. Raises: RuntimeError: If the "fov" does not contain exactly 3 elements. RuntimeError: If the "output_size" does not contain exactly 3 elements. Examples: .. highlight:: python .. code-block:: python sens = pr.calculate_sensitivities(Path(refscan), Path(target_scan), match_target_size=True) .. highlight:: python .. code-block:: python # calculate the sensitivities with a user defined FOV and matrix size sens = pr.calculate_sensitivities(Path(refscan), Path(target_scan), fov=[320, 230, 50], output_size=[512, 320, 20]) """ if Settings.PERFORMANCE_LOGGING: print("\ncalculating sensitivities") print("-------------------------") target_pars = Parameter(target_scan) ref_pars = Parameter(refscan) is_coca = ref_pars.get_value(ref_pars.IS_COCA_SCAN, [0])[0] if not is_coca: raise RuntimeError("this is not a sense reference scan") s = Sensitivity() s.refscan = str(refscan) s.target_scan = str(target_scan) qbc, coil = reconstruct_refscan(ref_pars, qbc_stack) s = reformat_refscan( qbc, coil, ref_pars, target_pars, stack=stack, mix=mix, match_target_size=match_target_size, fov=fov, output_size=output_size, ) if Settings.PERFORMANCE_LOGGING: print("sensitivity calculation finished") print("--------------------------------\n") return s
[docs]def compress_data(data: npt.ArrayLike, A: npt.ArrayLike) -> npt.ArrayLike: """Compresses the channels of the given data with the provided compression matrix A. The matrix can be calculated with the "get_array_compression_matrix" function. A more detailed description of the compression algorithm can be found in "Buehrer M, Pruessmann KP, Boesiger P, Kozerke S. Array compression for MRI with large coil arrays. Magn Reson Med 2007;57:1131–1139" Args: data (npt.ArrayLike): The data to be compressed. The compression will be applied along the channel dimension (Enums.CHANNEL_DIM) A (npt.ArrayLike): The matrix used for compression. It must have the size: NR_VIRTUAL_CHANNELS x NR_CHANNELS Returns: npt.ArrayLike: The compressed data. It has NR_VIRTUAL_CHANNELS in the channel dimension """ return _compress_data(data, A)
[docs]def compress_sensitivity(sensitivity: Sensitivity, A: npt.ArrayLike) -> Sensitivity: """Compresses the given sensitivity object with the provided compression matrix A. It will compress the member arrays: "sensitivity.sensitivity" and "sensitivity.surfacecoil". This function should be called when using array compression together with SENSE A more detailed description of the compression algorithm can be found in "Buehrer M, Pruessmann KP, Boesiger P, Kozerke S. Array compression for MRI with large coil arrays. Magn Reson Med 2007;57:1131–1139" Args: sensitivity (Sensitivity): The sensitivity object to compress. A (npt.ArrayLike): The matrix used for compression. It must have the size: NR_VIRTUAL_CHANNELS x NR_CHANNELS Returns: Sensitivity: The compressed sensitivity object. Examples: .. highlight:: python .. code-block:: python # calculate the sensitivities sens = pr.calculate_sensitivities(Path(refscan), Path(target_scan)) # get the array compression matrix A = pr.get_array_compression_matrix(sens.surfacecoil) # crop the compression matrix to the number of virtual coils virtual_coils = 8 A = A[0:args.virtual_coils, :] # compress the sensitivities (for the SENSE recon) sens = pr.compress_sensitivity(sens, A) """ return _compress_sensitivity(sensitivity, A)
[docs]def concomitant_field_correction( data: npt.ArrayLike, mps_to_xyz_transformation: Union[list, npt.ArrayLike], concom_factors: List[float], voxel_sizes: tuple, segments: List[float] = None, ) -> npt.ArrayLike: """Applies the concomitant field correction to flow data. Args: data (npt.ArrayLike): The input data to be corrected. mps_to_xyz_transformation (list): The transformation matrix from the MPS to the XYZ system. It can be obtained with the "get_transformation_matrix" function which is a member of the Parameter class. concom_factors (List[float]): The concomitant field correction factors for each flow segment. It can be gotten with the "get_concom_factors" function which is a member of the Parameter class. voxel_sizes (tuple): The voxel sizes for each dimension. segments (List[float], optional): The flow segments of the current data. Defaults to None. Returns: npt.ArrayLike: The corrected data. Raises: RuntimeError: If the number of flow segments in the data does not match the length of the segments list. Examples: .. highlight:: python .. code-block:: python # flow segments are stored in the extr1 label segments = parameter2read.extr1 # get the transformation matrices (MPS to XYZ) for every location. locations = pr.utils.get_unique(labels, 'loca') MPS_to_XYZ = pars.get_transformation_matrix(loca=locations, mix=mix, target=pr.Enums.XYZ) voxel_sizes = pars.get_voxel_sizes(mix=mix) # concommitant field correction (process every location separately) concom_factors = pars.get_concom_factors() data = pr.concomitant_field_correction(data, MPS_to_XYZ, concom_factors, voxel_sizes, segments) """ if not isinstance(mps_to_xyz_transformation, list): mps_to_xyz_transformation = [mps_to_xyz_transformation] if get_data_size(data, Enums.LOCA_DIM) != len(mps_to_xyz_transformation): raise RuntimeError( "the number of transformation matrices must match the number of locations in the data" ) return _concomitant_field_correction( data, mps_to_xyz_transformation, concom_factors, voxel_sizes, segments )
[docs]def crop( data: npt.ArrayLike, axis: Union[int, tuple] = 0, factor: Union[int, tuple] = 1, n: Union[int, tuple] = None, where: Union[str, int] = 0, ) -> npt.ArrayLike: """Crop the input data along the specified axes. Args: data (npt.ArrayLike): The input data to be cropped. axis (Union[int, tuple], optional): The axis or axes along which to crop the data. Defaults to 0. factor (Union[int, tuple], optional): The factor or factors by which to crop the data. Defaults to 1. If both "factor" and "n" are passed as arguments then "n" will be used. n (Union[int, tuple], optional): The number or numbers of elements to keep after cropping. Defaults to None. If both "factor" and "n" are passed as arguments then "n" will be used. where (Union[str, int], optional): Specifies where to crop. Options are: 'symmetric': the data will be cropped symmetrically on both sides; 'left': the data will be cropped on the left; 'right': the data will be cropped on the right Returns: npt.ArrayLike: The cropped data. Raises: RuntimeError: If the 'where' option is unknown. RuntimeError: If 'axis' and 'factor' do not have the same number of elements. RuntimeError: If 'axis' and 'n' do not have the same number of elements. Examples: .. highlight:: python .. code-block:: python data = pr.crop(data, axis=(1, 2), factor=(1.5, 2), where='symmetric') data = pr.crop(data, axis=0, n=256, where='left') """ return _crop(data, axis=axis, factor=factor, n=n, where=where)
[docs]def divide_flow_segments(data: npt.ArrayLike, hadamard=False) -> npt.ArrayLike: """Divides the flow segments of the input data (in dimension Enums.FLOW_SEGMENT_DIM) Args: data (npt.ArrayLike): The input data. hadamard (bool, optional): Whether to use the Hadamard encoding scheme. Defaults to False. Returns: npt.ArrayLike: The resulting flow data. Examples: .. highlight:: python .. code-block:: python data = pr.divide_flow_segments(data, pars.is_hadamard_encoding()) """ return _divide_segments(data, hadamard=hadamard)
[docs]def epi_corr( data: npt.ArrayLike, labels: npt.ArrayLike, slopes: List[float], offsets: List[float], ) -> npt.ArrayLike: """Performs a linear EPI correction on the input data using the provided slopes and offsets for each channel. The slopes and offsets can be calculated with the "get_epi_corr_data" function. The input data must be fourier transformed in readout direction but not in phase-encoding direction Args: data (npt.ArrayLike): The input data. labels (npt.ArrayLike): The current labels associated with the input data. slopes (List[float]): The slopes for each channel. offsets (List[float]): The offsets for each channel. Returns: npt.ArrayLike: The corrected data. Raises: RuntimeError: If the lengths of offsets and slopes are not the same. RuntimeError: If the length of offsets and slopes is not the same as the number of channels. Examples: .. highlight:: python .. code-block:: python # read epi correction data with open(args.rawfile, 'rb') as raw: parameter2read.typ = Label.TYPE_ECHO_PHASE epi_corr_data, epi_corr_labels = pr.read(raw, parameter2read, pars.labels, pars.coil_info, oversampling_removal=False) # sort the epi correction data (since ky is always 0 set the grad label as ky) cur_recon_resolution = pars.get_recon_resolution(mix=mix, xovs=True, yovs=True, zovs=True) epi_corr_data, epi_corr_labels = pr.sort(epi_corr_data, epi_corr_labels, output_size=[cur_recon_resolution[0]], zeropad=(True, False, False), immediate_averaging=False, ky='grad') # epi correction slopes, offsets = pr.get_epi_corr_data(epi_corr_data, epi_corr_labels) data = pr.epi_corr(data, labels, slopes, offsets) """ return _epi_corr(data, labels, slopes, offsets)
[docs]def fit_flow_phase( data: npt.ArrayLike, order: int = 3, weights: npt.ArrayLike = None ) -> npt.ArrayLike: """Removes a static phase offset in the flow data (due to eddy-currents) by fitting the background phase with a polynom of a given order (default = 3) Args: data (npt.ArrayLike): The input data (complex). order (int, optional): The order of the polynomial fit. Defaults to 3. weights (npt.ArrayLike, optional): The weights for the fit. Defaults to None. Must be a 2-dimensional array with the same matrix size as the data Returns: npt.ArrayLike: The data with the offset phase removed. Examples: .. highlight:: python .. code-block:: python data = pr.fit_flow_phase(data, order=3) """ return _fit_flow_phase(data, order, weights)
[docs]def format(data: npt.ArrayLike, in_plane_transformation: list) -> npt.ArrayLike: """Transforms the images into the radiological convention. Applies a sequence of named InPlaneTransform operations in order. Use pars.get_in_plane_transformation() to get the standard list for a dataset. For radial data, prepend InPlaneTransform.SWAP_AXES to the list. Args: data (npt.ArrayLike): The images to be transformed. in_plane_transformation (list): List of InPlaneTransform operations to apply. Returns: npt.ArrayLike: The images in radiological convention. Examples: .. highlight:: python .. code-block:: python transforms = pars.get_in_plane_transformation(mix=0, stack=0) data = pr.format(data, transforms) # For radial reconstruction, prepend SWAP_AXES: transforms.insert(0, pr.InPlaneTransform.SWAP_AXES) data = pr.format(data, transforms) """ return _in_plane_transform(data, transforms=in_plane_transformation)
[docs]def format_flow( data: npt.ArrayLike, coordinate_system: List[int], venc: List[float] = None, hadamard: bool = False, ) -> npt.ArrayLike: """Makes sure the flow encoding is along the RF-AP-FH axis. Args: data (npt.ArrayLike): The flow data to be formatted. coordinate_system (List[int]): The MPS coordinate system of the current scan expressed in the RL-AP-FH system. Can be obtained with the "get_coordinate_system" function which is a member of the Parameter class. venc (List[float], optional): The velocity encoding values for each flow segment. Defaults to None. hadamard (bool, optional): Flag indicating whether Hadamard encoding is used. Defaults to False. Returns: npt.ArrayLike: The formatted flow data. Raises: RuntimeError: If the number of flow segments in the data is greater than 3. RuntimeError: If the number of flow segments in the data is less than 3 and venc is not provided. RuntimeError: If the length of venc is not equal to 3. Examples: .. highlight:: python .. code-block:: python if get_data_size(data, pr.Enums.FLOW_SEGMENT_DIM) <= 3: data = pr.format_flow(data, pars.get_coordinate_system(), pars.get_venc(), pars.is_hadamard_encoding()) """ return _format_flow(data, coordinate_system, venc=venc, hadamard=hadamard)
[docs]def get_array_compression_matrix( ref_data: npt.ArrayLike, max_nr_voxels=None ) -> npt.ArrayLike: """Calculate the array compression matrix for a fully sampled reference data. Args: ref_data (npt.ArrayLike): The fully sampled reference data (e.g. the sense reference scan or sensitivities). max_nr_voxels (int, optional): The maximum number of voxels to include in the calculation. Defaults to None. Reduce it if you see a performance impact in the calculations Returns: np.ndarray: The array compression matrix. Raises: RuntimeError: If the reference data has less than 2 coils. Examples: .. highlight:: python .. code-block:: python # reconstruct refscan ref_pars = pr.Parameter(Path(args.refscan)) qbc, coil = pr.reconstruct_refscan(ref_pars) # calculate the sensitivities sens = pr.reformat_refscan(qbc, coil, ref_pars, pars, stack=stack, mix=mix, match_target_size=True) # calculate the array compression matrix A = pr.get_array_compression_matrix(sens.surfacecoil) """ return _get_array_compression_matrix(ref_data, max_nr_voxels=max_nr_voxels)
[docs]def get_epi_corr_data( data: npt.ArrayLike, labels: npt.ArrayLike ) -> (List[float], List[float]): """Calculate the slopes and offsets for the EPI correction. The resulting offsets and slopes can be passed to "epi_corr" Args: data (npt.ArrayLike): The EPI data. labels (npt.ArrayLike): The current labels for the data. Returns: tuple: A tuple containing the slopes and offsets which can be passed to the "epi_corr" function. Example: .. highlight:: python .. code-block:: python # read epi correction data with open(args.rawfile, 'rb') as raw: parameter2read.typ = Label.TYPE_ECHO_PHASE epi_corr_data, epi_corr_labels = pr.read(raw, parameter2read, pars.labels, pars.coil_info, oversampling_removal=False) # sort the epi correction data (since ky is always 0 set the grad label as ky) cur_recon_resolution = pars.get_recon_resolution(mix=mix, xovs=True, yovs=True, zovs=True) epi_corr_data, epi_corr_labels = pr.sort(epi_corr_data, epi_corr_labels, output_size=[cur_recon_resolution[0]], zeropad=(True, False, False), immediate_averaging=False, ky='grad') # epi correction slopes, offsets = pr.get_epi_corr_data(epi_corr_data, epi_corr_labels) """ return _get_epi_corr_data(data, labels)
[docs]def geo_corr( data: npt.ArrayLike, mps_to_xyz_transformation: Union[list, npt.ArrayLike], r: float, gys: npt.ArrayLike, gxc: npt.ArrayLike, gz: npt.ArrayLike, voxel_sizes: tuple, ) -> npt.ArrayLike: """Corrects image distortion due gradient non-linearities for large FOV's Args: data (npt.ArrayLike): The input dataset. mps_to_xyz_transformation (Union[list, npt.ArrayLike]): The transformation matrix from the MPS to the XYZ system. Can be obtained with the "get_transformation_matrix" function which is a member of the Parameter class. r (float): The threshold radius the correction sphere. Can be obtained with the "get_concom_factors" function which is a member of the Parameter class. gys (npt.ArrayLike): The x-correction parameters. Can be obtained with the "get_concom_factors" function which is a member of the Parameter class. gxc (npt.ArrayLike): The y-correction parameters. Can be obtained with the "get_concom_factors" function which is a member of the Parameter class. gz (npt.ArrayLike): The z-correction parameters. Can be obtained with the "get_concom_factors" function which is a member of the Parameter class. voxel_sizes (tuple): The voxel sizes in the XYZ coordinates. Returns: npt.ArrayLike: The geometry corrected images. Raises: RuntimeError: If the number of transformation matrices does not match the number of locations in the data. Example: .. highlight:: python .. code-block:: python # get geometry correction coefficients r, gys, gxc, gz = pars.get_geo_corr_pars() # get the transformation from MPS to XYZ locations = pr.utils.get_unique(labels, 'loca') MPS_to_XYZ = pars.get_transformation_matrix(loca=locations, mix=mix, target=pr.Enums.XYZ) # get the voxel sizes voxel_sizes = pars.get_voxel_sizes(mix=mix) # perform geometry correction data = pr.geo_corr(data, MPS_to_XYZ, r, gys, gxc, gz, voxel_sizes=voxel_sizes) """ if not isinstance(mps_to_xyz_transformation, list): mps_to_xyz_transformation = [mps_to_xyz_transformation] if get_data_size(data, Enums.LOCA_DIM) != len(mps_to_xyz_transformation): raise RuntimeError( "the number of transformation matrices must match the number of locations in the data" ) return _geo_corr(data, mps_to_xyz_transformation, r, gys, gxc, gz, voxel_sizes)
[docs]def hamming_filter( data: npt.ArrayLike, strength: Union[int, tuple], axis: Union[int, tuple] = 0, sampled_size: Union[int, tuple] = None, ) -> npt.ArrayLike: """Applies a Hamming filter to the input data along the specified axis. Args: data (npt.ArrayLike): The input data to be filtered. strength (Union[int, tuple]): The strength of the filter between (0,1). Can be a single value or a tuple of values, one for each axis. axis (Union[int, tuple], optional): The axis or axes along which to apply the filter. Defaults to 0. sampled_size (Union[int, tuple], optional): The size of the sampled data. Can be a single value or a tuple of values, one for each axis. Defaults to None. Raises: RuntimeError: If the number of elements in "axis" and "strength" are not the same. RuntimeError: If the number of elements in "axis" and "sampled_size" are not the same. Returns: The filtered data. Examples: .. highlight:: python .. code-block:: python sampled_size = (pars.get_sampled_size(enc=0, stack=0, ovs=False), pars.get_sampled_size(enc=1, stack=0), pars.get_sampled_size(enc=2, stack=0)) data = pr.hamming_filter(data, (0.25, 0.25, 0.25), axis=(0, 1, 2), sampled_size=sampled_size) """ return _hamming_filter(data, strength, axis=axis, sampled_size=sampled_size)
[docs]def homodyne( data: npt.ArrayLike, kx_range, ky_range, kz_range, is_kspace=False, use_torch=True, ) -> npt.ArrayLike: """Performs a homodyne reconstruction on data with partially sampled k-space data. In partial fourier imaging only a fraction of k-space is acquired for improved scan- or echo-time. If the k-space is partly sampled in readout direction we speak of partial echo and halfscan for a partly sampled space in phase encoding direction. In the first case the echo time is reduced while in the second one the scantime is shortened. Performing a standard imaging reconstruction on the partly sampled k-space however leads to image bluring and reduced SNR. Therefore a partial fourier reconstruction is performed which utilizes the k-space symmetry to reconstruct images of better quality. In MRecon a so called homodyne reconstruction is implemented: Noll, D.C.; Nishimura, D.G.; Macovski, A.; “Homodyne detection in magnetic resonance imaging” Medical Imaging, IEEE Transactions on, vol.10, no.2, pp.154-163, Jun 1991. Args: data (npt.ArrayLike): The input data. kx_range: The range of the k-space in readout direction. ky_range: The range of the k-space in phase encoding direction. kz_range: The range of the k-space in slice direction. is_kspace (bool, optional): Whether the data is in k-space. Defaults to False. use_torch (bool, optional): Whether to use the pytorch implementation. Defaults to True. py_impl (bool, optional): Whether to use the Python implementation. Defaults to True. Returns: npt.ArrayLike: The corrected data. Example: .. highlight:: python .. code-block:: python # get the k-space sampling ranges kx_range = pars.get_range(enc=0, mix=mix, stack=stack, ovs=False) ky_range = pars.get_range(enc=1, mix=mix, stack=stack) kz_range = pars.get_range(enc=2, mix=mix, stack=stack) # perform homodyne reconstruction if k-space is partially sampled if pr.is_partial_fourier(kx_range) or pr.is_partial_fourier(ky_range) or pr.is_partial_fourier(kz_range): data = pr.homodyne(data, kx_range, ky_range, kz_range) """ return _homodyne( data, kx_range, ky_range, kz_range, is_kspace=is_kspace, use_torch=use_torch, )
[docs]def is_partial_fourier(krange) -> bool: """Check if the given k-space range is partially sampled. Args: krange (list): A k-space range. Returns: bool: True if the krange is a partially sampled, False otherwise. Example: .. highlight:: python .. code-block:: python # get the k-space sampling range in readout direction kx_range = pars.get_range(enc=0, mix=mix, stack=stack, ovs=False) # check if partially sampled is_partial_echo = pr.is_partial_fourier(kx_range) """ return _is_partial_fourier(krange)
[docs]def i2k( data: npt.ArrayLike, axis: Union[int, tuple] = 0, use_torch=True ) -> npt.ArrayLike: """Calculates the fourier transform from image-space to k-space of the given data. Args: data (npt.ArrayLike): The image-space data. axis (Union[int, tuple], optional): The axis or axes along which to calculate the fft. Defaults to 0. use_torch (bool, optional): Whether to use pytorch implementation of the fft. Defaults to True. If False then numpy is used Returns: The k-space data Raises: TypeError: If the data is not array-like. Examples: .. highlight:: python .. code-block:: python k_data = pr.i2k(im_data, axis=(0, 1, 2)) """ return _i2k(data, axis=axis, use_torch=use_torch)
[docs]def k2i( data: npt.ArrayLike, axis: Union[int, tuple] = 0, use_torch=True ) -> npt.ArrayLike: """Calculates the fourier transform from k-space to image-space of the given data. Args: data (npt.ArrayLike): The k-space data. axis (Union[int, tuple], optional): The axis or axes along which to calculate the fft. Defaults to 0. use_torch (bool, optional): Whether to use pytorch implementation of the fft. Defaults to True. If False then numpy is used Returns: The image-space data Raises: TypeError: If the data is not array-like. Examples: .. highlight:: python .. code-block:: python im_data = pr.k2i(k_data, axis=(0, 1, 2)) """ return _k2i(data, axis=axis, use_torch=use_torch)
[docs]def kshift( data: npt.ArrayLike, shift: Union[int, tuple], axis: Union[int, tuple] = None ) -> npt.ArrayLike: """Applies a shift to the images by adding a linear phase in k-space (fourier-shift-theorem). Args: data (npt.ArrayLike): The input array (k-space data). shift (Union[int, tuple]): The shift in pixels. If a tuple is provided, each element corresponds to the shift along the corresponding axis. axis (Union[int, tuple], optional): The axis or axes along which to shift the elements. If None, the default axis is 0. If a tuple is provided, each element corresponds to the axis along which to shift the elements. Raises: RuntimeError: If the number of elements in "axis" and "shift" is not the same. Returns: The k-space with an added linear phase. Raises: - RuntimeError: If the number of elements in "axis" and "shift" is not the same. Examples: .. highlight:: python .. code-block:: python datak = pr.kshift(datak, (10, 20), (0, 1)) """ return _kshift(data, shift=shift, axis=axis)
[docs]def read( raw: BinaryIO, parameter2read: Parameter2Read, labels: List[LabelStructC], coil_info: dict, normalization=True, nonlin_correction=True, pda_correction=True, random_phase_correction=True, meas_phase_correction=True, dc_offset_correction=True, sign_reversal=True, oversampling_removal=True, array_compression: npt.ArrayLike = None, max_chunk_size_mb: int = 500, ) -> npt.ArrayLike: """Reads data from a Philips .raw file. The read function returns the data together with the corresponding scanner labels. The data contains the acquired profiles in temporal order and is always a 3-dimensional array in the form: samples x coils x profiles By default, the function already performs all basic-corrections and removes the oversampling along the readout direction. However the default behavior can be modified with optional input arguments (see below) Args: raw (BinaryIO): The binary file to read from as File-like object. parameter2read (Parameter2Read): A Parameter2Read class specifying what to read. labels (List[LabelStructC]): The scanner labels. coil_info (dict): The information about the connected receive coils. normalization (bool, optional): Whether to apply normalization. Defaults to True. The scanner provides a normalization factor for each acquired profile which is usually applied after reading. nonlin_correction (bool, optional): Whether to apply non-linear correction. Defaults to True. pda_correction (bool, optional): Whether to apply PDA correction. Defaults to True. In 3D-scans each profile might have different receiver gain values to account for the largely different signal levels between the center and outer k-space lines. This is corrected in the PDA-correction. random_phase_correction (bool, optional): Whether to apply random phase correction. Defaults to True. The scanner adds a random phase to each acquired profile to correct for systematic errors. This phase is subtracted in the random phase correction. meas_phase_correction (bool, optional): Whether to apply measurement phase correction. Defaults to True. Different echoes might have a phase offset which is subtracted in this correction. dc_offset_correction (bool, optional): Whether to apply DC offset correction. Defaults to True. The acquired data might have constant offset, which is calculated and subtracted in this correction. sign_reversal (bool, optional): Whether to apply sign reversal. Defaults to True. In EPI sequences k-space lines are acquired in different directions. This correction reverses all the odd lines. oversampling_removal (bool, optional): Whether to remove oversampling. Defaults to True. Usually the oversampling is removed to save memory (normally a factor of 2. array_compression (npt.ArrayLike, optional): The array compression matrix. Defaults to None. If given then the data is compressed to the given number of virtual channels. The number of virtual channels after compression is given by the number of rows of the compression matrix. See: Buehrer, M., Pruessmann, K. P., Boesiger, P. and Kozerke, S. (2007), Array compression for MRI with large coil arrays. Magnetic Resonance in Medicine, 57: 1131–1139. doi: 10.1002/mrm.21237 max_chunk_size_mb (int, optional): The maximum chunk size in MB to read at once. Defaults to 500. If the file is larger than this value then it is read in chunks of this size. This is useful for large files to avoid memory issues. Returns: npt.ArrayLike: The read data in the form samples x coils x profiles. Note: The read function only reads data of the same size and with the same geometry. Therefore, you should always loop over the number of mixes and stacks. If PARAMETER2READ specifies data of different stacks, mixes or sizes then an error will be raised. Examples: .. highlight:: python .. code-block:: python # read parameter pars = pr.Parameter(Path(rawfile)) # define what to read parameter2read = pr.Parameter2Read(pars.labels) parameter2read.stack = 0 parameter2read.mix = 0 # read data with open(rawfile, 'rb') as raw: data, labels = pr.read(raw, parameter2read, pars.labels, pars.coil_info) """ return _read( raw, parameter2read, labels, coil_info, normalization=normalization, nonlin_correction=nonlin_correction, pda_correction=pda_correction, random_phase_correction=random_phase_correction, meas_phase_correction=meas_phase_correction, dc_offset_correction=dc_offset_correction, sign_reversal=sign_reversal, oversampling_removal=oversampling_removal, array_compression=array_compression, max_chunk_size_mb=max_chunk_size_mb, )
[docs]def reconstruct_refscan( pars: Parameter, qbc_stack: int = None ) -> (npt.ArrayLike, npt.ArrayLike): """Reconstructs the SENSE reference scan. Args: pars (Parameter): The parameter object of the refscan. qbc_stack (int, optional): The stack of the bodycoil. If not given it is determined automatically. Only change if the automatic determination fails. Returns: tuple: A tuple containing the reconstructed bodycoil and surfacecoil data. Example: .. highlight:: python .. code-block:: python ref_pars = pr.Parameter(Path(refscan)) qbc, coil = reconstruct_refscan(ref_pars) """ if Settings.PERFORMANCE_LOGGING: print("\nreconstructing refscan") print("------------------------") qbc, coil = _reconstruct_refscan(pars, qbc_stack=qbc_stack) if Settings.PERFORMANCE_LOGGING: print("refscan reconstruction finished") print("-------------------------------\n") return qbc, coil
[docs]def reformat_refscan( qbc: npt.ArrayLike, coil: npt.ArrayLike, ref_pars: Parameter, target_pars: Parameter, stack: int = 0, mix: int = 0, match_target_size: bool = False, fov: List[float] = None, output_size: List[int] = None, ) -> Sensitivity: """Reformats the SENSE reference scan to the geometry of the target scan. Args: qbc (npt.ArrayLike): The bodycoil data of the reference scan. coil (npt.ArrayLike): The surfacecoil data of the reference scan. ref_pars (Parameter): The parameters of the sense reference scan. target_pars (Parameter): The parameters of the target scan. stack (int, optional): The stack index of the target scan which is used to determine the geometry (only relevant for multistack data). Defaults to 0. mix (int, optional): The mix index of the target scan which is used to determine the geometry (only relevant for mixed data). Defaults to 0. match_target_size (bool, optional): If True the size of the sensitivities is equal to the matrix size of the target scan. Defaults to False. fov (List[float], optional): The field of view of the reformatted data. Defaults to None. Only change if a different FOV than the target scan is needed (e.g. in simulations) output_size (List[int], optional): The matrix size of the reformatted data. Defaults to None. If not given then it is determined automatically. If match_target_size is True then this argumant is ignored Returns: Sensitivity: The sensitivity object containing the reformatted SENSE reference scan. Raises: RuntimeError: If the "fov" does not contain exactly 3 elements. RuntimeError: If the "output_size" does not contain exactly 3 elements. Examples: .. highlight:: python .. code-block:: python # reconstruct refscan ref_pars = pr.Parameter(Path(refscan)) qbc, coil = pr.reconstruct_refscan(ref_pars) # calculate the sensitivities sens = pr.reformat_refscan(qbc, coil, ref_pars, pars, match_target_size=True) # calculate the sensitivities with a user defined FOV and matrix size sens = pr.reformat_refscan(qbc, coil, ref_pars, pars, fov=[320, 230, 50], output_size=[512, 320, 20]) """ if fov is not None and len(fov) != 3: raise RuntimeError('the "fov" must must contain exactly 3 elements') if output_size is not None and len(output_size) != 3: raise RuntimeError('the "output_size" must must contain exactly 3 elements') return _reformat_refscan( qbc, coil, ref_pars, target_pars, stack=stack, mix=mix, match_target_size=match_target_size, fov=fov, output_size=output_size, )
[docs]def retro_binning(labels: List[LabelStructC], nr_phases: int): """Calculates the cardiac phase label from the rtop and rr label in a retrospectively triggered cardiac scan. The cardiac phase is calculated as: label.card = int(label.rtop / label.rr * nr_phases) Parameters: - labels (List[LabelStructC]): The scanner labels - nr_phases (int): The number of cardiac phases after binning. If not provided, the default value is 20. Returns: - List[LabelStructC]: The new labels with the cardiac phase set. Raises: - RuntimeError: If the labels parameter is not a list. - RuntimeError: If all rtop labels are 0. - RuntimeError: If all rr labels are 0. Note: Make sure to call this function before sorting the data. Examples: .. highlight:: python .. code-block:: python with open(rawfile, 'rb') as raw: data, labels = pr.read(raw, parameter2read, pars.labels, pars.coil_info) # retrospective cardiac binning labels = pr.retro_binning(labels, 30) """ return _retro_binning(labels, nr_phases)
[docs]def retro_fill_holes(data: npt.ArrayLike, max_dist: int = 3) -> npt.ArrayLike: """Fill empty k-space profiles in retrospectively triggered cardiac data. After retrospective binning, some cardiac phases may contain no acquired profiles. This function fills those gaps by copying the nearest occupied profile along the cardiac-phase dimension (up to ``max_dist`` phases away). Args: data (npt.ArrayLike): Sorted k-space data with cardiac-phase dimension at ``Enums.HP_DIM``. max_dist (int, optional): Maximum number of cardiac phases to search for a neighbor when filling a hole. Defaults to 3. Returns: npt.ArrayLike: Data with empty cardiac-phase profiles filled in. Examples: .. highlight:: python .. code-block:: python labels = pr.retro_binning(labels, nr_phases=30) data, labels = pr.sort(data, labels, output_size=cur_recon_resolution) data = pr.retro_fill_holes(data) """ return _retro_fill_holes(data)
[docs]def sense_unfold( data: npt.ArrayLike, sensitivity: Sensitivity, output_size: List[int], regularization_factor=2, slice_shifts: List[int] = None, psi_ignore_off_diagonals: bool = True, ) -> npt.ArrayLike: """Performs a SENSE unfolding on the given data. Args: data (npt.ArrayLike): The sensitivity-encoded k-space data. sensitivity (Sensitivity): The sensitivity object containing the sensitivity map, noise covariance matrix and calibration data. output_size (List[int]): The desired output size of the unfolded image. The ratio between output_size and the size of the input data determines the SENSE factor regularization_factor (int, optional): The regularization factor. Defaults to 2. slice_shifts (List[int], optional): The slice shifts. Defaults to None. Slice shifts should only be given as input for a multiband reconstruction. psi_ignore_off_diagonals (bool, optional): Whether to ignore off-diagonal elements in the PSI matrix. Defaults to True. use_torch (bool, optional): Whether to use the SENSE implementation in pytorch. Defaults to True. Returns: npt.ArrayLike: The unfolded image. Example: .. highlight:: python .. code-block:: python # calculate the sensitivities sens = pr.calculate_sensitivities(Path(refscan), Path(target_scan)) # get the regularization factor from the scanner parameters regularization_factor = pars.get_value(pars.SENSE_REGULARIZATION_FACTOR, at=0, default=2) # get the unfolded size output_size = pars.get_recon_resolution(mix=mix, xovs=False, yovs=True, zovs=True, folded=False) # perform sense reconstruction data = pr.sense_unfold(data, sensitivity, output_size, regularization_factor=regularization_factor) """ return _sense_unfold( data, sensitivity, output_size, regularization_factor=regularization_factor, slice_shifts=slice_shifts, psi_ignore_off_diagonals=psi_ignore_off_diagonals, )
[docs]def sort( data: npt.ArrayLike, labels: List[LabelStructC], output_size: List[int] = None, zeropad=True, immediate_averaging=True, **kwargs, ) -> npt.ArrayLike: """Sorts the input data according to its scanner labels and creates a proper k-space. After reading the data, the acquired profiles are stored in temporal order. The sort function sorts the profiles by their image attributes (labels) and creates properly organized k-space data which can be Fourier transformed to obtain the images. Args: data (npt.ArrayLike): The input data to be sorted. labels (List[LabelStructC]): The labels corresponding to the input data (output of the "read" function). output_size (List[int]): The desired size of the output data. If the output_size is larger than the sampled range, the resulting k-space is zero-filled. If it is smaller, the resulting k-space will be cropped, effectively lowering the resolution. If the output_size is not given, the resulting k-space will have the same size as the sampled range. zeropad (tuple): A tuple specifying whether to zero-pad the output data along the x, y, or z dimension. If zeropad is False, then the output_size along this direction is ignored. immediate_averaging (bool, optional): Whether to perform immediate averaging. Defaults to True. Returns: tuple: A tuple containing the sorted data and its corresponding labels. The output data has a fixed size of 12 dimensions, where each dimension corresponds to a predefined image attribute: (kx, ky, kz, channels, dynamics, cardiac phases, echoes, locations, mixes, extra1, extra2, averages) The corresponding labels have the same size as the sorted data, except for the following dimensions: - The first dimension (kx or readout) is always 1 in the labels because the label information is the same for all samples along the readout direction. - The coil dimension (channels) is also always 1 in the labels, as the label information is the same for all coils. Note that the labels for the zero-filled entries in the k-space will be None. Examples: .. highlight:: python .. code-block:: python # read data with open(args.rawfile, 'rb') as raw: data, labels = pr.read(raw, parameter2read, pars.labels, pars.coil_info) # sort and zero-fill data (create k-space) cur_recon_resolution = pars.get_recon_resolution(mix=mix, xovs=False, yovs=True, zovs=True) data, labels = pr.sort(data, labels, output_size=cur_recon_resolution) # Access the label for the central k-space line ky, kz = 64, 0 # Example indices for ky and kz (assuming a 128x128 matrix) dynamic, cardiac_phase, echo, location, mix, extra1, extra2, average = 0, 0, 0, 0, 0, 0, 0, 0 label = labels[0, ky, kz, 0, dynamic, cardiac_phase, echo, location, mix, extra1, extra2, average] """ return _sort( data, labels, output_size=output_size, zeropad=zeropad, immediate_averaging=immediate_averaging, **kwargs, )
[docs]def sos(data: npt.ArrayLike, axis: int = 3) -> npt.ArrayLike: """Calculate the sum of squares along a specified axis. Args: data (npt.ArrayLike): The input data array. axis (int, optional): The axis along which to calculate the sum of squares. Defaults to 3. Returns: npt.ArrayLike: The square root of the sum of squares along the specified axis. Examples: .. highlight:: python .. code-block:: python data = pr.sos(data, axis=3) """ return _sos(data, axis=axis)
[docs]def spectro_combine_coils(data: npt.ArrayLike, svd: bool = True) -> npt.ArrayLike: """Combine multi-coil spectroscopy data into a single coil. Determines optimal coil weights using either SVD (default) or a spectral peak amplitude method, then applies the weights and sums over the coil dimension. The weights are averaged over all dynamics and averages. Args: data (npt.ArrayLike): The multi-coil spectroscopy data. svd (bool, optional): If True, use SVD to determine coil weights. If False, use spectral peak amplitudes. Defaults to True. Returns: npt.ArrayLike: The combined single-coil spectroscopy data. Examples: .. highlight:: python .. code-block:: python data = pr.spectro_combine_coils(data, svd=True) """ return _spectro_combine_coils(data, svd)
[docs]def spectro_downsample(data: npt.ArrayLike, oversampling_factor: float = 1.0) -> npt.ArrayLike: """Downsample spectroscopy FID data by the given oversampling factor. Uses a linear-prediction-based algorithm (Ref.: Proceedings ISMRM 2012, Abstract 5680) to remove the oversampling without introducing ringing. A Nuttall-windowed FIR low-pass filter is applied after prepending and appending linearly predicted data points. Args: data (npt.ArrayLike): The spectroscopy FID data. The temporal (readout) dimension must be ``Enums.X_DIM``. oversampling_factor (float, optional): The oversampling factor to remove. If <= 1.0, the data is returned unchanged. Defaults to 1.0. Returns: npt.ArrayLike: The downsampled FID data. Examples: .. highlight:: python .. code-block:: python ovs = pars.get_oversampling(enc=0, mix=0) data = pr.spectro_downsample(data, oversampling_factor=ovs) """ return _spectro_downsample(data, oversampling_factor)
[docs]def zeropad( data: npt.ArrayLike, size: Union[int, tuple], axis: Union[int, tuple] = 0 ) -> npt.ArrayLike: """Zero-pad an array along the specified axis or axes. The zeros will be padded symmetrically to both sides of the axis Args: data (npt.ArrayLike): The input array to be zero-padded. size (Union[int, tuple]): The desired size of the output array along the specified axis or axes. If a single integer is provided, the same size will be used for all axes. If a tuple is provided, each element corresponds to the size along the corresponding axis. axis (Union[int, tuple], optional): The axis or axes along which the array should be zero-padded. If a single integer is provided, the same axis will be used for all sizes. If a tuple is provided, each element corresponds to the axis along which the corresponding size should be applied. Defaults to 0. Raises: RuntimeError: If the number of elements in "axis" and "size" is not the same. Returns: np.ndarray: The zero-padded array. Note: - If the size of the input array along an axis is smaller than the desired size, zeros will be added to both ends of the axis. - If the size of the input array along an axis is larger than the desired size, the array will be cropped along that axis. Example: .. highlight:: python .. code-block:: python data_padded = pr.zeropad(data, (512, 320), axis=(0, 1)) """ return _zeropad(data, size, axis=axis)