Source code for moosez.moosez

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# ----------------------------------------------------------------------------------------------------------------------
# Author: Lalith Kumar Shiyam Sundar
# Institution: Medical University of Vienna
# Research Group: Quantitative Imaging and Medical Physics (QIMP) Team
# Date: 09.02.2023
# Version: 2.0.0
#
# Description:
# The main module of the mooseZ. It contains the main function that is executed when the mooseZ is run.
#
# Usage:
# The main function in this module is executed when the mooseZ is run.
#
# ----------------------------------------------------------------------------------------------------------------------
import os

os.environ["nnUNet_raw"] = ""
os.environ["nnUNet_preprocessed"] = ""
os.environ["nnUNet_results"] = ""

import argparse
import time
import SimpleITK
import colorama
import emoji
import numpy
import pandas as pd
import multiprocessing as mp
import concurrent.futures
from moosez import constants
from moosez import download
from moosez import file_utilities
from moosez import image_conversion
from moosez import image_processing
from moosez import input_validation
from moosez import predict
from moosez import system
from moosez import models
from moosez.nnUNet_custom_trainer.utility import add_custom_trainers_to_local_nnunetv2
from moosez.benchmarking.benchmark import PerformanceObserver


[docs] def main(): colorama.init() # Argument parser parser = argparse.ArgumentParser( description=constants.USAGE_MESSAGE, formatter_class=argparse.RawTextHelpFormatter, add_help=False ) # Main directory containing subject folders parser.add_argument( "-d", "--main_directory", type=str, metavar="<MAIN_DIRECTORY>", help="Specify the main directory containing subject folders." ) # Name of the models to use for segmentation parser.add_argument( "-m", "--model_names", nargs='+', type=str, choices=models.AVAILABLE_MODELS, metavar="<MODEL_NAMES>", help="Choose the models for segmentation from the following:\n" + "\n".join(models.AVAILABLE_MODELS) ) parser.add_argument( "-b", "--benchmark", action="store_true", default=False, help="Activate benchmarking." ) parser.add_argument( "-v-off", "--verbose_off", action="store_false", help="Deactivate verbose console." ) parser.add_argument( "-log-off", "--logging_off", action="store_false", help="Deactivate logging." ) parser.add_argument( '-herd', '--moose_herd', nargs='?', const=2, type=int, help='Specify the concurrent jobs (default: 2)' ) parser.add_argument( '-dtd', '--download_training_data', action="store_true", default=False, help='Download the enhance 1.6k ENHANCE dataset' ) parser.add_argument( '-dd', '--download_directory', default=None, type=str, help='Path to save the enhance 1.6k ENHANCE dataset' ) # Custom help option parser.add_argument( "-h", "--help", action="help", default=argparse.SUPPRESS, help="Show this help message and exit." ) args = parser.parse_args() # ---------------------------------- # OUTPUT SETTINGS # ---------------------------------- verbose_console = args.verbose_off verbose_log = args.logging_off if args.download_training_data: verbose_console = True verbose_log = False output_manager = system.OutputManager(verbose_console, verbose_log) output_manager.display_logo() output_manager.display_authors() output_manager.display_doi() # ---------------------------------- # DOWNLOADING THE ENHANCE DATA # ---------------------------------- if args.download_training_data: output_manager.console_update(f'') output_manager.console_update(f'{constants.ANSI_VIOLET} {emoji.emojize(":globe_with_meridians:")} ENHANCE 1.6k DATA DOWNLOAD:{constants.ANSI_RESET}') output_manager.console_update(f'') download.download_enhance_data(args.download_directory, output_manager) return # ---------------------------------- # START MOOSE # ---------------------------------- parent_folder = os.path.abspath(args.main_directory) model_names = args.model_names benchmark = args.benchmark moose_instances = args.moose_herd output_manager.configure_logging(parent_folder) output_manager.log_update('----------------------------------------------------------------------------------------------------') output_manager.log_update(' STARTING MOOSE-Z V.3.0.0 ') output_manager.log_update('----------------------------------------------------------------------------------------------------') # ---------------------------------- # DOWNLOADING THE MODEL # ---------------------------------- output_manager.console_update(f'') output_manager.console_update(f'{constants.ANSI_VIOLET} {emoji.emojize(":globe_with_meridians:")} MODEL DOWNLOAD:{constants.ANSI_RESET}') output_manager.console_update(f'') model_path = system.MODELS_DIRECTORY_PATH file_utilities.create_directory(model_path) model_routine = models.construct_model_routine(model_names, output_manager) # ---------------------------------- # INPUT VALIDATION AND PREPARATION # ---------------------------------- output_manager.log_update(f' ') output_manager.log_update(f'- Main directory: {parent_folder}') output_manager.log_update(f' ') output_manager.console_update(f' ') output_manager.console_update(f'{constants.ANSI_VIOLET} {emoji.emojize(":memo:")} NOTE:{constants.ANSI_RESET}') output_manager.console_update(f' ') custom_trainer_status = add_custom_trainers_to_local_nnunetv2() modalities = input_validation.determine_model_expectations(model_routine, output_manager) output_manager.log_update(f'- Custom trainer: {custom_trainer_status}') accelerator, device_count = system.check_device(output_manager) if moose_instances is not None: output_manager.console_update(f" Number of moose instances run in parallel: {moose_instances}") # ---------------------------------- # INPUT STANDARDIZATION # ---------------------------------- output_manager.console_update(f'') output_manager.console_update(f'{constants.ANSI_VIOLET} {emoji.emojize(":magnifying_glass_tilted_left:")} STANDARDIZING INPUT DATA TO NIFTI:{constants.ANSI_RESET}') output_manager.console_update(f'') output_manager.log_update(f' ') output_manager.log_update(f' STANDARDIZING INPUT DATA TO NIFTI:') output_manager.log_update(f' ') image_conversion.standardize_to_nifti(parent_folder, output_manager) output_manager.console_update(f"{constants.ANSI_GREEN} Standardization complete.{constants.ANSI_RESET}") output_manager.log_update(f" Standardization complete.") # -------------------------------------- # CHECKING FOR MOOSE COMPLIANT SUBJECTS # -------------------------------------- subjects = [os.path.join(parent_folder, d) for d in os.listdir(parent_folder) if os.path.isdir(os.path.join(parent_folder, d))] moose_compliant_subjects = input_validation.select_moose_compliant_subjects(subjects, modalities, output_manager) num_subjects = len(moose_compliant_subjects) if num_subjects < 1: output_manager.console_update(f'{constants.ANSI_RED} {emoji.emojize(":cross_mark:")} No moose compliant subject found to continue!{constants.ANSI_RESET} {emoji.emojize(":light_bulb:")} See: https://github.com/ENHANCE-PET/MOOSE#directory-structure-and-naming-conventions-for-moose-%EF%B8%8F') return # ------------------------------------------------- # RUN PREDICTION ONLY FOR MOOSE COMPLIANT SUBJECTS # ------------------------------------------------- output_manager.console_update(f'') output_manager.console_update(f'{constants.ANSI_VIOLET} {emoji.emojize(":crystal_ball:")} PREDICT:{constants.ANSI_RESET}') output_manager.console_update(f'') output_manager.log_update(' ') output_manager.log_update(' PERFORMING PREDICTION:') output_manager.spinner_start(' Initiating') start_total_time = time.time() subject_performance_parameters = [] if moose_instances is not None: output_manager.log_update(f"- Branching out with {moose_instances} concurrent jobs.") performance_observer = PerformanceObserver(f'All {num_subjects} subjects | {moose_instances} jobs', ', '.join(model_names)) if benchmark: performance_observer.on() mp_context = mp.get_context('spawn') processed_subjects = 0 output_manager.spinner_update(f'[{processed_subjects}/{num_subjects}] subjects processed.') if device_count is not None and device_count > 1: accelerator_assignments = [f"{accelerator}:{i % device_count}" for i in range(len(subjects))] else: accelerator_assignments = [accelerator] * len(subjects) with concurrent.futures.ProcessPoolExecutor(max_workers=moose_instances, mp_context=mp_context) as executor: futures = [] for i, (subject, accelerator) in enumerate(zip(moose_compliant_subjects, accelerator_assignments)): futures.append(executor.submit(moose_subject, subject, i, num_subjects, model_routine, accelerator, None, benchmark)) for future in concurrent.futures.as_completed(futures): if benchmark: subject_performance_parameters.append(future.result()) processed_subjects += 1 output_manager.spinner_update(f'[{processed_subjects}/{num_subjects}] subjects processed.') performance_observer.record_phase("Total Processing Done") if benchmark: performance_observer.off() subject_performance_parameters.append(performance_observer.get_peak_resources()) else: for i, subject in enumerate(moose_compliant_subjects): subject_performance = moose_subject(subject, i, num_subjects, model_routine, accelerator, output_manager, benchmark) if benchmark: subject_performance_parameters.append(subject_performance) end_total_time = time.time() total_elapsed_time = (end_total_time - start_total_time) / 60 time_per_dataset = total_elapsed_time / len(moose_compliant_subjects) time_per_model = time_per_dataset / len(model_names) output_manager.spinner_succeed(f'{constants.ANSI_GREEN} All predictions done! | Total elapsed time for ' f'{len(moose_compliant_subjects)} datasets, {len(model_names)} models: {round(total_elapsed_time, 1)} min' f' | Time per dataset: {round(time_per_dataset, 2)} min' f' | Time per model: {round(time_per_model, 2)} min {constants.ANSI_RESET}') output_manager.log_update(f' ') output_manager.log_update(f' ALL SUBJECTS PROCESSED') output_manager.log_update(f' - Number of Subjects: {len(moose_compliant_subjects)}') output_manager.log_update(f' - Number of Models: {len(model_names)}') output_manager.log_update(f' - Time (total): {round(total_elapsed_time, 1)}min') output_manager.log_update(f' - Time (per subject): {round(time_per_dataset, 2)}min') output_manager.log_update(f' - Time (per model): {round(time_per_model, 2)}min') if benchmark: df = pd.DataFrame(subject_performance_parameters, columns=['Image', 'Model', 'Image Size', 'Runtime [s]', 'Peak Memory [GB]']) csv_file_path = os.path.join(parent_folder, 'moosez-v3.0.0_peak_performance_parameters.csv') df.to_csv(csv_file_path, index=False) output_manager.log_update(f' - Resource utilization written to {csv_file_path}') output_manager.log_update('----------------------------------------------------------------------------------------------------') output_manager.log_update(' FINISHED MOOSE-Z V.3.0.0 ') output_manager.log_update('----------------------------------------------------------------------------------------------------')
[docs] def moose(input_data: str | tuple[numpy.ndarray, tuple[float, float, float]] | SimpleITK.Image, model_names: str | list[str], output_dir: str = None, accelerator: str = None) -> str | SimpleITK.Image | numpy.ndarray: """ Execute the MOOSE 3.0 image segmentation process. :param input_data: The input data to process, which can be one of the following: - str: A file path to a NIfTI file. - tuple[numpy.ndarray, tuple[float, float, float]]: A tuple containing a numpy array and spacing. - SimpleITK.Image: An image object to process. :param model_names: The name(s) of the model(s) to be used for segmentation. :type model_names: str or list[str] :param output_dir: Path to the directory where the output will be saved if the input is a file path. :type output_dir: Optional[str] :param accelerator: Specifies the accelerator type, e.g., "cpu" or "cuda". :type accelerator: Optional[str] :return: The output type aligns with the input type: - str (file path): If `input_data` is a file path. - SimpleITK.Image: If `input_data` is a SimpleITK.Image. - numpy.ndarray: If `input_data` is a numpy array. :rtype: str or SimpleITK.Image or numpy.ndarray :Example: >>> moose('/path/to/input/file', '[list, of, models]', '/path/to/output', 'cuda') >>> moose((numpy_array, (1.5, 1.5, 1.5)), 'model_name', '/path/to/output', 'cuda') >>> moose(simple_itk_image, 'model_name', '/path/to/output', 'cuda') """ # Load the image and set a default filename based on input type if isinstance(input_data, str): image = SimpleITK.ReadImage(input_data) file_name = file_utilities.get_nifti_file_stem(input_data) elif isinstance(input_data, SimpleITK.Image): image = input_data file_name = 'image_from_simpleitk' elif isinstance(input_data, tuple) and isinstance(input_data[0], numpy.ndarray) and isinstance(input_data[1], tuple): numpy_array, spacing = input_data image = SimpleITK.GetImageFromArray(numpy_array) image.SetSpacing(spacing) file_name = 'image_from_array' else: raise ValueError( "Invalid input format. `input_data` must be either a file path (str), " "a SimpleITK.Image, or a tuple (numpy array, spacing)." ) # Ensure model_names is a list for consistency if isinstance(model_names, str): model_names = [model_names] # Output manager and model routine setup output_manager = system.OutputManager(False, False) model_path = system.MODELS_DIRECTORY_PATH file_utilities.create_directory(model_path) model_routine = models.construct_model_routine(model_names, output_manager) # Perform segmentation for desired_spacing, model_workflows in model_routine.items(): resampled_array = image_processing.ImageResampler.resample_image_SimpleITK_DASK_array(image, 'bspline', desired_spacing) for model_workflow in model_workflows: segmentation_array = predict.predict_from_array_by_iterator(resampled_array, model_workflow[0], accelerator, os.devnull) if len(model_workflow) == 2: inference_fov_intensities = model_workflow[1].limit_fov["inference_fov_intensities"] if isinstance(inference_fov_intensities, int): inference_fov_intensities = [inference_fov_intensities] existing_intensities = numpy.unique(segmentation_array) if not all(intensity in existing_intensities for intensity in inference_fov_intensities): continue segmentation_array, desired_spacing = predict.cropped_fov_prediction_pipeline( image, segmentation_array, model_workflow, accelerator, os.devnull ) segmentation = SimpleITK.GetImageFromArray(segmentation_array) segmentation.SetSpacing(desired_spacing) segmentation.SetOrigin(image.GetOrigin()) segmentation.SetDirection(image.GetDirection()) resampled_segmentation = image_processing.ImageResampler.resample_segmentation(image, segmentation) # Return based on input type if isinstance(input_data, str): # Return file path if input was a file path if output_dir is None: output_dir = os.path.dirname(input_data) segmentation_image_path = os.path.join( output_dir, f"{model_workflow.target_model.multilabel_prefix}segmentation_{file_name}.nii.gz" ) SimpleITK.WriteImage(resampled_segmentation, segmentation_image_path) return segmentation_image_path elif isinstance(input_data, SimpleITK.Image): # Return SimpleITK.Image if input was SimpleITK.Image return resampled_segmentation elif isinstance(input_data, tuple): # Return numpy array if input was numpy array return SimpleITK.GetArrayFromImage(resampled_segmentation)
[docs] def moose_subject(subject: str, subject_index: int, number_of_subjects: int, model_routine: dict, accelerator: str, output_manager: system.OutputManager | None, benchmark: bool = False): # SETTING UP DIRECTORY STRUCTURE subject_name = os.path.basename(subject) if output_manager is None: output_manager = system.OutputManager(False, False) output_manager.log_update(' ') output_manager.log_update(f' SUBJECT: {subject_name}') model_names = [] for workflows in model_routine.values(): for workflow in workflows: model_names.append(workflow.target_model.model_identifier) performance_observer = PerformanceObserver(subject_name, ', '.join(model_names)) subject_peak_performance = None if benchmark: performance_observer.on() output_manager.spinner_update(f'[{subject_index + 1}/{number_of_subjects}] Setting up directory structure for {subject_name}...') output_manager.log_update(' ') output_manager.log_update(f' SETTING UP MOOSE-Z DIRECTORY:') output_manager.log_update(' ') moose_dir, segmentations_dir, stats_dir = file_utilities.moose_folder_structure(subject) output_manager.log_update(f" MOOSE directory for subject {subject_name} at: {moose_dir}") # RUN PREDICTION start_time = time.time() output_manager.log_update(' ') output_manager.log_update(' RUNNING PREDICTION:') output_manager.log_update(' ') performance_observer.record_phase("Loading Image") file_path = file_utilities.get_files(subject, 'CT_', ('.nii', '.nii.gz'))[0] image = image_processing.standardize_image(file_path, output_manager, moose_dir) file_name = file_utilities.get_nifti_file_stem(file_path) pet_file = file_utilities.find_pet_file(subject) performance_observer.metadata_image_size = image.GetSize() performance_observer.time_phase() for desired_spacing, model_workflows in model_routine.items(): performance_observer.record_phase(f"Resampling Image: {'x'.join(map(str,desired_spacing))}") resampling_time_start = time.time() resampled_array = image_processing.ImageResampler.resample_image_SimpleITK_DASK_array(image, 'bspline', desired_spacing) output_manager.log_update(f' - Resampling at {"x".join(map(str,desired_spacing))} took: {round((time.time() - resampling_time_start), 2)}s') performance_observer.time_phase() for model_workflow in model_workflows: performance_observer.record_phase(f"Predicting: {model_workflow.target_model}") # ---------------------------------- # RUN MODEL WORKFLOW # ---------------------------------- model_time_start = time.time() output_manager.spinner_update(f'[{subject_index + 1}/{number_of_subjects}] Running prediction for {subject_name} using {model_workflow[0]}...') output_manager.log_update(f' - Model {model_workflow.target_model}') segmentation_array = predict.predict_from_array_by_iterator(resampled_array, model_workflow[0], accelerator, output_manager.nnunet_log_filename) if len(model_workflow) == 2: inference_fov_intensities = model_workflow[1].limit_fov["inference_fov_intensities"] if isinstance(inference_fov_intensities, int): inference_fov_intensities = [inference_fov_intensities] existing_intensities = numpy.unique(segmentation_array) if not all([intensity in existing_intensities for intensity in inference_fov_intensities]): output_manager.spinner_update(f'[{subject_index + 1}/{number_of_subjects}] Organ to crop from not in initial FOV...') output_manager.log_update(" - Organ to crop from not in initial FOV.") performance_observer.time_phase() continue segmentation_array, desired_spacing = predict.cropped_fov_prediction_pipeline(image, segmentation_array, model_workflow, accelerator, output_manager.nnunet_log_filename) segmentation = SimpleITK.GetImageFromArray(segmentation_array) segmentation.SetSpacing(desired_spacing) segmentation.SetOrigin(image.GetOrigin()) segmentation.SetDirection(image.GetDirection()) resampled_segmentation = image_processing.ImageResampler.resample_segmentation(image, segmentation) segmentation_image_path = os.path.join(segmentations_dir, f"{model_workflow.target_model.multilabel_prefix}segmentation_{file_name}.nii.gz") output_manager.log_update(f' - Writing segmentation for {model_workflow.target_model}') SimpleITK.WriteImage(resampled_segmentation, segmentation_image_path) output_manager.log_update(f" - Prediction complete for {model_workflow.target_model} within {round((time.time() - model_time_start)/ 60, 1)} min.") # ---------------------------------- # EXTRACT VOLUME STATISTICS # ---------------------------------- output_manager.spinner_update(f'[{subject_index + 1}/{number_of_subjects}] Extracting CT volume statistics for {subject_name} ({model_workflow.target_model})...') output_manager.log_update(f' - Extracting volume statistics for {model_workflow.target_model}') out_csv = os.path.join(stats_dir, model_workflow.target_model.multilabel_prefix + subject_name + '_ct_volume.csv') image_processing.get_shape_statistics(resampled_segmentation, model_workflow.target_model, out_csv) output_manager.spinner_update(f'{constants.ANSI_GREEN} [{subject_index + 1}/{number_of_subjects}] CT volume extracted for {subject_name}! {constants.ANSI_RESET}') time.sleep(1) # ---------------------------------- # EXTRACT PET ACTIVITY # ---------------------------------- if pet_file is not None: pet_image = SimpleITK.ReadImage(pet_file) output_manager.spinner_update(f'[{subject_index + 1}/{number_of_subjects}] Extracting PET activity for {subject_name} ({model_workflow.target_model})...') output_manager.log_update(f' - Extracting PET statistics for {model_workflow.target_model}') resampled_multilabel_image = image_processing.ImageResampler.reslice_identity(pet_image, resampled_segmentation, is_label_image=True) out_csv = os.path.join(stats_dir, model_workflow.target_model.multilabel_prefix + subject_name + '_pet_activity.csv') image_processing.get_intensity_statistics(pet_image, resampled_multilabel_image, model_workflow.target_model, out_csv) output_manager.spinner_update(f'{constants.ANSI_GREEN} [{subject_index + 1}/{number_of_subjects}] PET activity extracted for {subject_name}! {constants.ANSI_RESET}') time.sleep(1) performance_observer.time_phase() end_time = time.time() elapsed_time = end_time - start_time output_manager.spinner_update(f' {constants.ANSI_GREEN}[{subject_index + 1}/{number_of_subjects}] Prediction done for {subject_name} using {len(model_names)} models: ' f' | Elapsed time: {round(elapsed_time / 60, 1)} min{constants.ANSI_RESET}') time.sleep(1) output_manager.log_update(f' Prediction done for {subject_name} using {len(model_names)} models: {", ".join(model_names)}!' f' | Elapsed time: {round(elapsed_time / 60, 1)} min') performance_observer.record_phase("Total Processing Done") if benchmark: performance_observer.off() performance_observer.plot_performance(stats_dir) subject_peak_performance = performance_observer.get_peak_resources() return subject_peak_performance
if __name__ == '__main__': main()