diff --git a/src/Drivers/hiopbbpy/BODriverXfoil.py b/src/Drivers/hiopbbpy/BODriverXfoil.py new file mode 100644 index 000000000..8e20dcd95 --- /dev/null +++ b/src/Drivers/hiopbbpy/BODriverXfoil.py @@ -0,0 +1,163 @@ +import logging +import os +import tempfile +from pathlib import Path +import numpy as np +import sys + +from scipy.optimize import NonlinearConstraint + +from ds4mems.airfoil import XFoilAirfoilPerformance +from xfoilProblem import xfoilProblem + +from hiopbbpy.surrogate_modeling import smtKRG +from hiopbbpy.opt import BOAlgorithm +from hiopbbpy.utils import MPIEvaluator +from concurrent.futures import ProcessPoolExecutor +from concurrent.futures import ThreadPoolExecutor + +### Define problem and optimization parameters +nx = 8 # Number of design variables +use_ref = False +do_parallel = True +do_profiling = True +max_worker = 2 + + +def main(): + xlimits_small = np.array( + [ + [-0.25, 0.25], + [-0.25, 0.25], + [-0.25, 0.25], + [-0.25, 0.25], + [-0.25, 0.25], + [-0.25, 0.25], + [0.0, 0.5], + [-0.25, 0.25], + ] + ) + + xlimits = np.array( + [ + [-10.79733932, 39.28774442], + [-22.81287711, 5.94493027], + [-23.24153557, 12.15769847], + [-5.47472985, 7.92991688], + [-8.20327119, 12.44090178], + [-1.50046976, 4.41520152], + [-0.37926066, 4.19234019], + [-1.61741561, 0.2987408], + ] + ) + # FIXME_NY --- use big box + xlimits = xlimits_small + + x_nan = np.array([[-1.2269792 , -1.40856 , 0.56076634, -1.96206043, 1.44158607, + 0.22498941, -0.32848405, 0.04818669], + [-1.42737423, -1.40397264, 0.36791669, -2.11149784, 1.54822502, + 0.07623255, -0.39513648, -0.05350951]], dtype=float) + + x_penalty = np.array([[-1.49019102, -1.37671764, 0.65196615, -2.38465265, 1.39195187, + 0.16935736, -0.40938738, -0.05497496], + [-1.10599338, -1.10345512, 0.49594816, -1.93446173, 1.38383001, + 0.55640812, -0.41227115, 0.00824948]], dtype=float) + + x_opt = np.array([ + [-1.24028704, -1.31949567, 0.53528491, -2.16370978, 1.37588402, 0.32447433, -0.16926065, -0.20102103] + ], dtype=float) + + x_opt_2 = np.array([-1.46972556, -1.07491497, 0.32973962, -2.20769501, 1.26854236, + 0.3562351 , -0.397001 , -0.32209042]) + + x_ref = x_opt[0] + + if use_ref: + # --- dimension checks --- + if x_ref is None: + raise ValueError("x_ref must not be None when use_ref=True") + + if x_ref.shape != (nx,): + raise ValueError(f"x_ref must have shape ({nx},), got {x_ref.shape}") + + # --- build new bounds --- + delta = 0.1 * np.abs(x_ref) + var_lb = x_ref - delta + var_ub = x_ref + delta + + # replace xlimits + xlimits = np.column_stack((var_lb, var_ub)) + xlimits_small = None + print("built a box from the given reference point") + + + ### Mac + problem = xfoilProblem(nx, xlimits, tighter_bounds=xlimits_small, ref_x=x_ref, use_ref=use_ref, xfoil_path="/Users/chiang7/project/2025/scidac/other_lab/ornl/xfoil/bin/xfoil") + + ### LC + #problem = xfoilProblem(nx, xlimits, tighter_bounds=xlimits_small, ref_x=x_ref, use_ref=use_ref, xfoil_path="/p/lustre1/chiang7/hiopbbpy/xfoil/bin/xfoil") + + print("Problem name: ", problem.name) + + + ### BO parameters + n_samples = 4 # Number of initial design points + theta = 1.e-2 # Hyperparameter for the Kriging (GP) model + acq_type = "EI" # Acquisition function: "EI" or "LCB" + + print("Acquisition type: ", acq_type) + + opt_solver = 'IPOPT' #"SLSQP" "IPOPT" "trust-constr" + if opt_solver == "SLSQP" or opt_solver == "trust-constr": + solver_options = {"maxiter": 100} #for scipy solvers + elif opt_solver == "IPOPT": + solver_options = {"max_iter": 200, "print_level": 1, "tol": 1e-4} + + options = { + 'acquisition_type': acq_type, + 'log_level': 'info', + 'bo_maxiter': 2, + 'opt_solver': opt_solver, + 'batch_size': 1, + 'n_start': 3, + 'solver_options': solver_options, + } + + ### initial training set + x_train = problem.sample(n_samples) + + + if do_parallel: + # ----- evaluator + obj_evaluator = MPIEvaluator( + executor=ProcessPoolExecutor(max_workers=max_worker), + profiling=do_profiling, + task_name="PARALLEL_OBJ_EVAL", + use_run_dir=True, + ) + opt_evaluator = MPIEvaluator( + function_mode=False, + executor=ProcessPoolExecutor(max_workers=max_worker), + profiling=do_profiling, + task_name="PARALLEL_IPOPT_START", + use_run_dir=False, + ) + y_train = obj_evaluator.run(problem.evaluate, x_train) + options['obj_evaluator'] = obj_evaluator + options['opt_evaluator'] = opt_evaluator + else: + y_train = problem.evaluate(x_train) + + ### Define the GP surrogate model + gp_model = smtKRG(theta, xlimits, nx) + gp_model.train(x_train, y_train) + + # Instantiate and run Bayesian Optimization + bo = BOAlgorithm(problem, gp_model, x_train, y_train, options = options) #EI or LCB + bo.optimize() + + problem.obj_func(bo.x_opt, run_dir="final_opt") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/Drivers/hiopbbpy/EvaluationManagerCI.py b/src/Drivers/hiopbbpy/EvaluationManagerCI.py index 2659a1728..e23673a72 100644 --- a/src/Drivers/hiopbbpy/EvaluationManagerCI.py +++ b/src/Drivers/hiopbbpy/EvaluationManagerCI.py @@ -12,7 +12,7 @@ import os import socket import threading -from hiopbbpy.utils import EvaluationManager, is_running_with_mpi +from hiopbbpy.utils import EvaluationManager from concurrent.futures import ThreadPoolExecutor def _fn_for_test(x, sleep_time=0.1, slow_first=False, driver_rank=0): @@ -56,22 +56,13 @@ def _fn_for_test(x, sleep_time=0.1, slow_first=False, driver_rank=0): ) args = parser.parse_args() - # Choose executor type - if is_running_with_mpi(): - from mpi4py import MPI - driver_rank = MPI.COMM_WORLD.Get_rank() - executor_type = "mpi" - else: - driver_rank = 0 - executor_type = "cpu" - # Set up logging logging.basicConfig(level=logging.INFO) # Create manager cpu_executor = ThreadPoolExecutor() manager = EvaluationManager( - cpu_executor=cpu_executor, + executor=cpu_executor, profiling=args.profile, task_name="CI_TASK" ) @@ -81,10 +72,8 @@ def _fn_for_test(x, sleep_time=0.1, slow_first=False, driver_rank=0): manager.submit_tasks( _fn_for_test, [i for i in range(args.n)], - execute_at=executor_type, sleep_time=args.sleep_time, slow_first=args.slow_first, - driver_rank=driver_rank, ) # Do some other work while tasks are running diff --git a/src/Drivers/hiopbbpy/xfoilProblem.py b/src/Drivers/hiopbbpy/xfoilProblem.py new file mode 100644 index 000000000..96697c900 --- /dev/null +++ b/src/Drivers/hiopbbpy/xfoilProblem.py @@ -0,0 +1,283 @@ +import numpy as np +import sys +from pathlib import Path + +import random + +# Import the base optimization problem class. +from hiopbbpy.problems import Problem + +# Import the ipopt problem class. +from cyipopt import Problem as cyProblem + +from scipy.optimize import NonlinearConstraint + +from ds4mems.airfoil import XFoilAirfoilPerformance + +class XFoilSampler: + def __init__(self, n, var_bounds, tighter_bounds=None, ref_x=None, rng=None, use_ref=False): + self.n = n + + var_bounds = np.asarray(var_bounds, dtype=float) + + if var_bounds.shape != (self.n, 2): + raise ValueError(f"var_bounds must have shape ({self.n}, 2), got {var_bounds.shape}") + + self.var_lb = var_bounds[:, 0] + self.var_ub = var_bounds[:, 1] + self.tighter_lb = None + self.tighter_ub = None + + self.ref_x = None if ref_x is None else np.asarray(ref_x, dtype=float).ravel() + if self.ref_x is not None and self.ref_x.shape != (self.n,): + raise ValueError(f"ref_x must have shape ({self.n},), got {self.ref_x.shape}") + + if tighter_bounds is not None: + #print(f"use small box") + tighter_bounds = np.asarray(tighter_bounds, dtype=float) + if tighter_bounds.shape != (self.n, 2): + raise ValueError(f"tighter_bounds must have shape ({self.n}, 2), got {tighter_bounds.shape}") + self.tighter_lb = tighter_bounds[:, 0] + self.tighter_ub = tighter_bounds[:, 1] + + self.rng = np.random.default_rng() if rng is None else rng + + def rejection_sampling(self, n_samples, is_valid=None, max_tries=10000): + n_samples = int(n_samples) + print(f"Do rejection sampling! Find {n_samples} samples.") + + samples = [] + tries = 0 + + while len(samples) < n_samples: + if tries > max_tries: + raise RuntimeError("Could not generate enough valid samples.") + tries += 1 + + # --- Generate one candidate --- + if self.tighter_lb is None: + x = self.rng.uniform(self.var_lb, self.var_ub) + else: + if self.rng.random() < 0.5: + print("sample from small box: ") + x = self.rng.uniform(self.tighter_lb, self.tighter_ub) + else: + print("sample from big box: ") + x = self.rng.uniform(self.var_lb, self.var_ub) + + # --- Check validity --- + if is_valid is None: + samples.append(x) + else: + try: + if is_valid(x,run_dir=f"temp_sample_{len(samples)}"): + samples.append(x) + print("") + else: + print("---reject!") + except Exception: + pass # reject if evaluation crashes + + return np.array(samples) + + + def random(self, n_samples): + n_samples = int(n_samples) + + # No small box --> sample everything from big box + if self.tighter_lb is None: + return self.rng.uniform( + self.var_lb, self.var_ub, size=(n_samples, self.n) + ) + + # Split samples + n_small = n_samples // 2 + n_full = n_samples - n_small + + samples = np.empty((n_samples, self.n)) + + # Sample from smaller box + print(f"generate {n_small} samples from small box") + samples[:n_small] = self.rng.uniform( + self.tighter_lb, self.tighter_ub, size=(n_small, self.n) + ) + + # Sample from full box + print(f"generate {n_full} samples from big box") + samples[n_small:] = self.rng.uniform( + self.var_lb, self.var_ub, size=(n_full, self.n) + ) + + # Optional: shuffle so small-box samples aren’t grouped? + #self.rng.shuffle(samples, axis=0) + + return samples + +# ------------------------------------------------------------------- +# Air Foil Optimization Problem Definition +# ------------------------------------------------------------------- + +class xfoilProblem(Problem): + def __init__(self, ndim, xlimits, constraints=[], + tighter_bounds=None, ref_x=None, use_ref=False, + xfoil_path=None, + n_points=201, mach=0.2, reynolds_list=(1e6, 5e6), + penalty_weight=1e5, penalty_power=2, constr_eps=None): + """ + Initializes the wind farm layout optimization problem using FLORIS. + + Parameters: + ndim (int): Total number of decision variables + xlimits: Limits for the decision variables. + constraints (list): List of constraint definitions (optional). + """ + name = 'xFoil' + super().__init__(ndim, xlimits, name=name, constraints=constraints) + + # Derive the number of variables. + self.n = int(ndim) + self._eval_cache = {} # key -> (f, c) or (f, None) + self._cache_ndigits = 12 + # include config tag so cache is invalidated if settings change + self._cache_tag = ("mach", mach, "Re", tuple(reynolds_list), "npts", n_points) + self.initial_samples = True + if xfoil_path is None: + raise RuntimeError("Set a valid path to xfoil binary.") + + self.airfoil_perf = XFoilAirfoilPerformance(xfoil_path) + + # Define the objective function + self.base_obj_func = self.airfoil_perf_obj + self.constr_func = self.airfoil_perf.constr + + self.penalty_weight = float(penalty_weight) + self.penalty_power = int(penalty_power) + self.constr_eps = np.finfo(np.float64).eps if constr_eps is None else float(constr_eps) + + self.obj_func = self._penalized_obj # for unconstrained prob, set this to base_obj_func + + self.constraints = {} + self.tighter_bounds = tighter_bounds + self.sampler = XFoilSampler(n=self.n, var_bounds=xlimits, tighter_bounds=self.tighter_bounds, ref_x=ref_x, use_ref=use_ref) + + def airfoil_perf_obj(self, X, run_dir=None): + if run_dir is None: + raise ValueError("run_dir must be provided for XFoil evaluations") + aoa_for_xfoil = (0.0, 20.0, 1.0) # (start, stop, step) + re_values = np.linspace(1e6, 5e6, num=5) + mach = 0.2 + + #print(".", end="", flush=True) # Progress indicator + + F = [] + for re in re_values: + F.append(self.airfoil_perf(X, aoa=aoa_for_xfoil, re=re, mach=mach,run_dir=run_dir)) + + return -np.mean(np.concat(F)) # Return negative mean performance for minimization + + + def _cache_key(self, x: np.ndarray): + x = np.asarray(x, dtype=float).ravel() + return (self._cache_tag, tuple(np.round(x, self._cache_ndigits))) + + def eval_cached(self, x: np.ndarray, run_dir=None): + if run_dir is None: + raise ValueError("run_dir must be provided for XFoil evaluations") + k = self._cache_key(x) + hit = self._eval_cache.get(k, None) + if hit is not None: + return hit # (f, c) + + c = np.asarray(self.constr_func(x), dtype=float).ravel() + if c > 0.0: + f = float(self.base_obj_func(x, run_dir=run_dir)) + else: + f = np.inf + print(f"evaluation: f={f}; c={c}") + self._eval_cache[k] = (f, c) + return f, c + + def _penalized_obj(self, x: np.ndarray, run_dir=None) -> float: + if run_dir is None: + raise ValueError("run_dir must be provided for XFoil evaluations") + #print("use penalty func!") + + f, con_arr = self.eval_cached(x,run_dir=run_dir) + + if not np.isfinite(f): + print(f"Warning: base_obj_func returned {f} at x = {x}, replacing with 1e3") + f = 1e3 + + con_arr = np.atleast_1d(np.asarray(con_arr, dtype=float)) + if con_arr.shape != (1,): + raise ValueError( + f"xfoilProblem expects exactly 1 constraint, got shape {con_arr.shape}" + ) + + con_f = float(con_arr[0]) + # print(f"con_f = {con_f}") + con_violation = max(0.0, self.constr_eps - con_f) # scalar float + + # # adapting penalty multiplier (scalar) + # v = con_violation + # if v < 1e-4: + # penalty_mult = 0.0 + # elif v < 1e-2: + # penalty_mult = 10.0 + # elif v < 1e-1: + # penalty_mult = 100.0 + # elif v < 1.0: + # penalty_mult = 500.0 + # else: + # penalty_mult = 1000.0 + + # penalty = penalty_mult * (con_violation ** self.penalty_power) # scalar + # print(f". obj = base obj + penalty: {f} + {penalty}", flush=True) + # retval = f + penalty + + ### fixme: + #when con is large, skip evaluating f, just retrun big penalty term + v = con_violation + retval = 0.0 + if v < 1e-8: + retval = f + elif v < 1e-4: + retval = -25 + elif v < 1: + retval = -50 + + print(f". penalty obj = {retval}; con_violation = {v}", flush=True) + + + return retval + + + def _evaluate(self, x: np.ndarray, **kwargs) -> np.ndarray: + """ + Evaluates the objective function + + Parameters: + x (ndarray): An array + + Returns: + ndarray: The objective values + """ + y = [float(self.obj_func(xi,**kwargs)) for xi in x] # shape (k,) + return np.asarray(y, dtype=float).reshape(-1, 1) + + def sample(self, nsample: int) -> np.ndarray: + def is_valid(x, run_dir=None): + f, con_arr = self.eval_cached(x,run_dir=run_dir) + + con_arr = np.atleast_1d(np.asarray(con_arr, dtype=float)) + if con_arr.shape != (1,): + raise ValueError( + f"xfoilProblem expects exactly 1 constraint, got shape {con_arr.shape}" + ) + + con_f = float(con_arr[0]) + + #isvalid = np.isfinite(f) # an alt way + isvalid = (con_f > 0.0) and np.isfinite(f) + return isvalid + return self.sampler.rejection_sampling(nsample, is_valid=is_valid) \ No newline at end of file diff --git a/src/hiopbbpy/problems/problem.py b/src/hiopbbpy/problems/problem.py index d1a76ed0b..0558614bf 100644 --- a/src/hiopbbpy/problems/problem.py +++ b/src/hiopbbpy/problems/problem.py @@ -28,7 +28,7 @@ def __init__(self, ndim, xlimits, name=" ", constraints=[]): # each dict = one scalar constraint self.n_con = len(constraints) - def _evaluate(self, x: np.ndarray) -> np.ndarray: + def _evaluate(self, x: np.ndarray, **kwargs) -> np.ndarray: """ problem evaluation y = f(x) of a scalar valued function f @@ -44,7 +44,7 @@ def _evaluate(self, x: np.ndarray) -> np.ndarray: """ raise NotImplementedError("Child class of hiopProblem should implement method _evaluate") - def evaluate(self, x: np.ndarray) -> np.ndarray: + def evaluate(self, x: np.ndarray, **kwargs) -> np.ndarray: """ problem callback y = f(x) of the scalar valued function f @@ -59,7 +59,7 @@ def evaluate(self, x: np.ndarray) -> np.ndarray: Function values (cast to reals) """ y = np.ndarray((x.shape[0], 1)) - y[:,:] = self._evaluate(x) + y[:,:] = self._evaluate(x, **kwargs) return y def con_evaluate(self, x: np.ndarray) -> np.ndarray: diff --git a/src/hiopbbpy/utils/__init__.py b/src/hiopbbpy/utils/__init__.py index 5b514f0b9..29d8dfc47 100644 --- a/src/hiopbbpy/utils/__init__.py +++ b/src/hiopbbpy/utils/__init__.py @@ -1,4 +1,4 @@ -from .evaluation_manager import (EvaluationManager, is_running_with_mpi) +from .evaluation_manager import (EvaluationManager) from .util import Evaluator, MPIEvaluator __all__ = [ diff --git a/src/hiopbbpy/utils/evaluation_manager.py b/src/hiopbbpy/utils/evaluation_manager.py index edcd73981..5080c08ba 100644 --- a/src/hiopbbpy/utils/evaluation_manager.py +++ b/src/hiopbbpy/utils/evaluation_manager.py @@ -8,38 +8,17 @@ import threading import logging -import copy +from concurrent.futures import CancelledError, wait +from collections import deque import os import time import math -from concurrent.futures import ProcessPoolExecutor, CancelledError -from collections import deque - - -def is_running_with_mpi(): - """Returns True if the code is running in an MPI environment.""" - _MPI_RANK_ENV_VARS = [ - "OMPI_COMM_WORLD_RANK", # Open MPI - "PMI_RANK", # MPICH, Intel MPI, Cray MPI - "MPI_RANK", # Intel MPI (sometimes) - "MV2_COMM_WORLD_RANK", # MVAPICH - ] - return any(var in os.environ for var in _MPI_RANK_ENV_VARS) - - -# Loads MPIPoolExecutor if MPI is available -if is_running_with_mpi(): - from mpi4py.futures import MPIPoolExecutor, wait - _EVALUATION_MANAGER_USES_MPI4PY = True -else: - _EVALUATION_MANAGER_USES_MPI4PY = False - from concurrent.futures import wait - +import copy -def _timed_call(fn, x, kwargs): - """Run fn(x, **kwargs) and record worker-side timing.""" +def _timed_call(fn, args, kwargs): + """Run fn(*args, **kwargs) and record worker-side timing.""" start_time = time.perf_counter() - fx = fn(x, **kwargs) + fx = fn(*args, **kwargs) done_time = time.perf_counter() return { "result": fx, @@ -69,50 +48,73 @@ def _summary_stats(values): "max": max(values), } - class EvaluationManager: - """Class that manages the evaluation of functions using multiple executors.""" - - def __init__( - self, - cpu_executor=None, - mpi_executor=None, - profiling=False, - task_name="TASK") -> None: + """Manage asynchronous function evaluations over one or more executors. + + The manager is executor-agnostic: each configured executor only needs a + ``submit(fn, *args, **kwargs)`` method that returns a Future-like object + exposing ``done()`` and ``result()``. + + Tasks are submitted asynchronously via :meth:`submit_tasks`. Completed + results are collected lazily by :meth:`retrieve_results` and eagerly by + :meth:`sync` (which blocks until the running queue is empty). + + Parameters + ---------- + executor: + Either a single executor instance or a ``dict[str, executor]`` mapping. + When a single executor is provided, it is stored under key ``"0"``. + profiling: + If True, wrap calls with worker-side timing. + task_name: + Label used in logging and profiling output. + """ + + def __init__(self, executor, profiling=False, task_name="TASK") -> None: self._queue = deque([]) + self._completed_X = deque([]) + self._completed_F = deque([]) self._queue_lock = threading.Lock() + + if isinstance(executor, dict): + self.executors = executor + else: + self.executors = {"0": executor} + self.logger = logging.getLogger(self.__class__.__name__) + self.task_name = task_name self.profiling = profiling self._first_submit_time = None - self.task_name = task_name - - self.executors = { - "cpu": ProcessPoolExecutor() if cpu_executor is None else cpu_executor - } - if _EVALUATION_MANAGER_USES_MPI4PY: - self.executors["mpi"] = ( - MPIPoolExecutor() if mpi_executor is None else mpi_executor - ) - elif mpi_executor is not None: - self.executors["mpi"] = mpi_executor - self.logger.info("EvaluationManager initialized with executors:") + self.logger.info(f"{self.task_name} EvaluationManager initialized with executors:") for key, executor in self.executors.items(): self.logger.info(f" - {key}: {executor}") def __del__(self) -> None: + """Shutdown managed executors during object destruction.""" for executor in self.executors.values(): - executor.shutdown(wait=False) - self.logger.info(f"{self.task_name} EvaluationManager destroyed and executors shut down.") + try: + executor.shutdown(wait=False) + self.logger.info(f"{self.task_name} EvaluationManager destroyed and executors shut down.") + except Exception as e: + self.logger.warning(f"{self.task_name} Error shutting down executor: {e}") def _get_num_workers(self): """Return number of workers.""" - if "mpi" in self.executors and is_running_with_mpi(): - return int(os.environ.get("MPI4PY_FUTURES_MAX_WORKERS", 1)) - try: - return self.executors["cpu"]._max_workers - except AttributeError: - return 1 + if "mpi" in self.executors: + try: + return int(os.environ.get("MPI4PY_FUTURES_MAX_WORKERS", 1)) + except Exception: + pass + + # For standard executors like ThreadPoolExecutor / ProcessPoolExecutor + for ex in self.executors.values(): + try: + return ex._max_workers + except AttributeError: + continue + + return 1 def set_task_name(self, task_name): self.task_name = task_name @@ -132,36 +134,72 @@ def _print_timing_stats(self, label, values): ) def sync(self) -> None: - """Wait for all submitted tasks to complete.""" - future_objs = [queue_obj["future"] for queue_obj in self._queue] - wait(future_objs) + """Block until all queued tasks finish. + + This method repeatedly waits on the currently queued futures and then + harvests completed items into the internal completion buffers. Harvested + results can be consumed using :meth:`retrieve_results`. + """ + while True: + with self._queue_lock: + futures = [queue_obj[1] for queue_obj in self._queue] + if len(futures) == 0: + break + + wait(futures) + + with self._queue_lock: + self._harvest_completed_locked() + + def submit_tasks(self, fn, X, execute_at=None, **kwargs) -> None: + """Submit tasks to the specified executor. + + Parameters + ---------- + fn: + The function to be executed. + X: + Sequence of input data for the function. If an element is a tuple, + it is expanded as positional arguments (``fn(*x, **kwargs)``); + otherwise it is passed as a single argument (``fn(x, **kwargs)``). + execute_at: + Executor key to use for task submission. If ``None``, the first key + in ``executors`` is used. The key lookup is case-insensitive. + kwargs: + Additional keyword arguments passed to the function. + """ + + if execute_at is None: + execute_at = next(iter(self.executors)) - def submit_tasks(self, fn, X, execute_at="cpu", **kwargs) -> None: - """Submits tasks to the specified executor.""" key = execute_at.lower() + if key not in self.executors: + raise KeyError(f"Executor '{execute_at}' not found. Available: {list(self.executors.keys())}") + with self._queue_lock: for x in X: submit_time = time.perf_counter() if self._first_submit_time is None: self._first_submit_time = submit_time + args = x if isinstance(x, tuple) else (x,) + if self.profiling: - future_obj = self.executors[key].submit(_timed_call, fn, x, kwargs) + future_obj = self.executors[key].submit(_timed_call, fn, args, kwargs) else: - future_obj = self.executors[key].submit(fn, x, **kwargs) + future_obj = self.executors[key].submit(fn, *args, **kwargs) - self._queue.append({ - "x": copy.deepcopy(x), - "future": future_obj, - "submit_time": submit_time, - }) + self._queue.append([x, future_obj, key, submit_time]) self.logger.info(f"{self.task_name} Submitted f({x})") def retrieve_results(self) -> tuple[list, list]: - """Retrieves the results of completed tasks.""" - X = deque([]) - F = deque([]) - + """Retrieves the results of completed tasks. + Returns + ------- + tuple[list, list] + Inputs and corresponding results for completed tasks. If a task + failed or was cancelled, its result entry is ``None``. + """ execution_times = [] wait_times = [] turnaround_times = [] @@ -170,46 +208,16 @@ def retrieve_results(self) -> tuple[list, list]: batch_done_time = time.perf_counter() with self._queue_lock: - new_queue = deque([]) - - for item in self._queue: - x = item["x"] - future = item["future"] - submit_time = item["submit_time"] - - if future.done(): - try: - fx = future.result() - except CancelledError: - self.logger.warning(f"{self.task_name} The execution of x={x} was cancelled.") - continue - - if self.profiling: - # These are fine for local inspection, but note: - # worker_start_time / worker_done_time are on worker clocks. - worker_start_time = fx["start_time"] - worker_done_time = fx["done_time"] - - execution_time = fx["execution_time"] - - # These are not robust across different node clocks, but kept here - # because you already had them. - wait_time = worker_start_time - submit_time - turnaround_time = worker_done_time - submit_time - - execution_times.append(execution_time) - wait_times.append(wait_time) - turnaround_times.append(turnaround_time) - - fx = fx["result"] - - X.append(x) - F.append(fx) - self.logger.info(f"{self.task_name} Completed: f({x}) = {fx}") - else: - new_queue.append(item) + self._harvest_completed_locked( + execution_times=execution_times, + wait_times=wait_times, + turnaround_times=turnaround_times, + ) - self._queue = new_queue + X = list(self._completed_X) + F = list(self._completed_F) + self._completed_X.clear() + self._completed_F.clear() if self.profiling and execution_times: self._print_timing_stats(f"{self.task_name} Execution times", execution_times) @@ -236,5 +244,73 @@ def retrieve_results(self) -> tuple[list, list]: print(f"{self.task_name} Ideal walltime in seconds (perfect balance): {ideal_walltime:.6e}") print(f"{self.task_name} Actual walltime in seconds (observed): {actual_walltime:.6e}") - return list(X), list(F) + self._first_submit_time = None + return X, F + + def _harvest_completed_locked( + self, + execution_times=None, + wait_times=None, + turnaround_times=None, + ) -> None: + """Move completed task results from running queue into completion buffers. + + This method assumes the caller holds ``_queue_lock``. + """ + new_queue = deque([]) + + for item in self._queue: + x = item[0] + future = item[1] + submit_time = item[3] + if future.done(): + self._completed_X.append(x) + self._completed_F.append(None) + + try: + fx = future.result() + + if self.profiling: + worker_start_time = fx["start_time"] + worker_done_time = fx["done_time"] + execution_time = fx["execution_time"] + + # These are OK for local runs, but can be unreliable across nodes + wait_time = worker_start_time - submit_time + turnaround_time = worker_done_time - submit_time + + if execution_times is not None: + execution_times.append(execution_time) + if wait_times is not None: + wait_times.append(wait_time) + if turnaround_times is not None: + turnaround_times.append(turnaround_time) + + fx = fx["result"] + + self._completed_F[-1] = fx + self.logger.info(f"{self.task_name} Completed: f({x}) = {fx}") + + except CancelledError: + self.logger.warning(f"{self.task_name} The execution of x={x} was cancelled.") + except Exception as e: + self.logger.warning(f"{self.task_name} Task f({x}) raised an exception: {e}") + + else: + new_queue.append(item) + + self._queue = new_queue + + def print_status(self) -> None: + """Print the current status of the task queue and completion buffers.""" + with self._queue_lock: + futures = [queue_obj[1] for queue_obj in self._queue] + n_running_futures = sum(1 for f in futures if not f.done()) + n_done_futures = len(futures) - n_running_futures + self.logger.info( + f"Status: {len(self._completed_X)} harvested results, " + f"{n_running_futures} running tasks, {n_done_futures} completed tasks still in queue." + ) + + \ No newline at end of file diff --git a/src/hiopbbpy/utils/util.py b/src/hiopbbpy/utils/util.py index ee8d199d0..39b731e17 100644 --- a/src/hiopbbpy/utils/util.py +++ b/src/hiopbbpy/utils/util.py @@ -25,9 +25,13 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ import numpy as np -from .evaluation_manager import EvaluationManager, is_running_with_mpi +from .evaluation_manager import EvaluationManager import logging +import os +import time +import uuid +from pathlib import Path def check_required_keys(user_dict, required_keys): for key in required_keys: @@ -74,29 +78,75 @@ class MPIEvaluator(Evaluator): We reformat to [eval0, eval1, eval2,...] """ - def __init__(self, function_mode=True,cpu_executor=None, mpi_executor=None, profiling=False, task_name="MPITASK"): - self.manager = EvaluationManager(cpu_executor,mpi_executor,profiling=profiling, task_name=task_name) + def __init__(self, function_mode=True, executor=None, profiling=False, + task_name="MPITASK", run_root="./hiop_temp", use_run_dir=False): + self.manager = EvaluationManager(executor, profiling=profiling, task_name=task_name) self.function_mode = function_mode - if is_running_with_mpi(): - self.executor_type = "mpi" - else: - self.executor_type = "cpu" + self.run_root = Path(run_root) + self.run_root.mkdir(parents=True, exist_ok=True) + self.use_run_dir = use_run_dir + print(f"Create Evaluator for task: {task_name}") + def __del__(self): del self.manager + def set_task_name(self, task_name): self.manager.set_task_name(task_name) - def run(self, fun, Xin): + + def run(self, fun, Xin): nevals = Xin.shape[0] - self.manager.submit_tasks(fun, [np.atleast_2d(Xin[i]) for i in range(nevals)], execute_at=self.executor_type) + print("in Evaluator::run") + + # unique batch directory so repeated calls do not reuse temp_dir_0, temp_dir_1, ... + batch_id = f"{self.manager.task_name}_{os.getpid()}_{time.time_ns()}_{uuid.uuid4().hex[:8]}" + batch_dir = self.run_root / batch_id + batch_dir.mkdir(parents=True, exist_ok=False) + + for i in range(nevals): + xi = np.atleast_2d(Xin[i]) + + kwargs = {} + if self.use_run_dir: + run_dir = batch_dir / f"eval_{i:04d}" + run_dir.mkdir(parents=True, exist_ok=False) + kwargs["run_dir"] = str(run_dir) + + # submit (index, x) so we can restore original order later + self.manager.submit_tasks( + _run_indexed_fun, + [(fun, i, xi)], + **kwargs, + ) + print(f"Submitted task {i + 1}", flush=True) + self.manager.sync() Xout, Fout = self.manager.retrieve_results() + + # restore original order using returned indices + ordered = [None] * nevals + for out in Fout: + if out is None: + continue + idx, val = out + ordered[idx] = val + + missing = [i for i, v in enumerate(ordered) if v is None] + if missing: + raise RuntimeError(f"Missing evaluation results for indices {missing}") + if self.function_mode: - Y = np.ndarray((nevals, 1)) - Y[:,0] = np.array(Fout)[:,0,0] + Y = np.empty((nevals, 1), dtype=float) + for i, val in enumerate(ordered): + arr = np.asarray(val, dtype=float) + Y[i, 0] = float(arr.reshape(-1)[0]) else: - Y = [Fi[0] for Fi in Fout] + Y = [val[0] for val in ordered] + return Y +def _run_indexed_fun(fun, idx, x, **kwargs): + return idx, fun(x, **kwargs) + class Logger: """ A simple wrapper for Python's logging module that sets up a reusable logger.