Source code for iparallelization_solver

"""
The `iparallelization_solver` module defines an interface for parallel execution of an input processing function.

This module defines the `IParallelSolver` class, which orchestrates the parallel execution of a
general processing function on a data set consisting of multiple independent data subsets, referred to here as set IDs.
It also declares overridable hook methods for logging (intended to start a separate logging thread)
and for aggregating and saving results.

Author:
-------
Momcilo Tosic
Astroinformatics student
Faculty of Mathematics, University of Belgrade
"""

from multiprocessing import Process
from multiprocessing import Queue
from queue import Empty

# Default number of processes to spawn
DEFAULT_NUM_WORKERS = 4

class IParallelSolver:
    """
    A class to manage parallel execution of data processing functions.

    Attributes:
        num_workers (int): Number of worker processes to spawn.
    """

    def __init__(self, num_workers=DEFAULT_NUM_WORKERS):
        """Initialize the parallel solver with the specified configuration."""
        self.num_workers = num_workers

    def process_wrapper(self):
        """
        Wrapper for the process function to integrate logging and result handling.
        """
        # Event used for stopping a background log thread
        stopper_event = None

        # Go through unprocessed sets
        while not self.set_ids_.empty():
            # Pop from the queue without blocking: another worker may have
            # drained it between the empty() check and this get
            try:
                set_id = self.set_ids_.get_nowait()
            except Empty:
                break

            # Holds the formatted result; stays empty if processing raises
            res_string = ""
            try:
                # Maybe start logging
                self.maybe_begin_logging(set_id)

                # Call the main processing function
                result = self.get_process_function_result(set_id)

                # Format the results into a string
                res_string = self.aggregate_process_function_result(result)

                # Put results in the unified results queue if the flag is set
                if self.save_all_results_:
                    self.results_.put(res_string)
            except Exception as e:
                print('Error processing/saving data: ' + str(e) + '\n')
            finally:
                try:
                    # Maybe stop logging
                    self.maybe_stop_logging()

                    # Maybe save local results
                    self.maybe_save_local_results(set_id, res_string)
                except Exception as e:
                    print('Error stopping logs: ' + str(e))

    def process_ids(self, set_ids, results_file=None):
        """
        Processes a list of set IDs using the configured process function in parallel.

        Parameters:
            set_ids (list of str): List of set IDs to process.
            results_file (str, optional): Path to save aggregated results.
        """
        # Unified output queue and input queue
        self.results_ = Queue()
        self.set_ids_ = Queue()

        # Save all results from the unified queue only when a file is given
        self.save_all_results_ = results_file is not None

        # Fill the input queue
        for set_id in set_ids:
            self.set_ids_.put(set_id)

        # Spawn and start the worker processes, then wait for them to finish
        processes = [Process(target=self.process_wrapper)
                     for _ in range(self.num_workers)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()

        # Save results to the unified results file
        self.maybe_save_results(results_file)

    def aggregate_process_function_result(self, result):
        """Format the raw processing result into a string; meant to be overridden."""
        pass

    def get_process_function_result(self, set_id):
        """Run the main processing function on one set ID; meant to be overridden."""
        pass

    def maybe_begin_logging(self, set_id):
        """Optionally start logging for a set ID, e.g. in a separate thread."""
        pass

    def maybe_stop_logging(self):
        """Optionally stop the logging started by maybe_begin_logging."""
        pass

    def maybe_save_local_results(self, set_id, res_string):
        """Optionally save the result string for a single set ID."""
        pass

    def maybe_save_results(self, results_file):
        """Optionally save all aggregated results to results_file."""
        pass
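
A minimal usage sketch follows. The SquareSolver subclass, its squaring "processing function", and the sample set IDs are illustrative assumptions, not part of the original module; they only show how the hooks plug into process_ids.

# Hypothetical example subclass; names and behavior are assumptions.
class SquareSolver(IParallelSolver):
    def get_process_function_result(self, set_id):
        # Stand-in processing function: square the numeric set ID
        return int(set_id) ** 2

    def aggregate_process_function_result(self, result):
        # Format the raw result as one output line
        return str(result) + '\n'

    def maybe_save_results(self, results_file):
        # Drain the unified results queue into the results file
        if results_file is None:
            return
        with open(results_file, 'w') as f:
            while not self.results_.empty():
                f.write(self.results_.get())

if __name__ == '__main__':
    solver = SquareSolver(num_workers=2)
    solver.process_ids(['1', '2', '3', '4'], results_file='results.txt')

Each worker squares the IDs it pops from the shared input queue, and because results_file is given, every formatted result also lands in the unified results queue that maybe_save_results writes out after the workers join.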