From f7e98f717b2476ae5990e7fbef125fcd9d6a87c9 Mon Sep 17 00:00:00 2001 From: nychiang Date: Fri, 10 Apr 2026 13:26:39 -0700 Subject: [PATCH 1/5] create branch --- src/Drivers/hiopbbpy/EvaluationManagerCI.py | 13 +- src/hiopbbpy/utils/evaluation_manager.py | 295 ++++++++++++-------- src/hiopbbpy/utils/util.py | 12 +- 3 files changed, 191 insertions(+), 129 deletions(-) diff --git a/src/Drivers/hiopbbpy/EvaluationManagerCI.py b/src/Drivers/hiopbbpy/EvaluationManagerCI.py index 2659a1728..23ff0f8b8 100644 --- a/src/Drivers/hiopbbpy/EvaluationManagerCI.py +++ b/src/Drivers/hiopbbpy/EvaluationManagerCI.py @@ -56,22 +56,13 @@ def _fn_for_test(x, sleep_time=0.1, slow_first=False, driver_rank=0): ) args = parser.parse_args() - # Choose executor type - if is_running_with_mpi(): - from mpi4py import MPI - driver_rank = MPI.COMM_WORLD.Get_rank() - executor_type = "mpi" - else: - driver_rank = 0 - executor_type = "cpu" - # Set up logging logging.basicConfig(level=logging.INFO) # Create manager cpu_executor = ThreadPoolExecutor() manager = EvaluationManager( - cpu_executor=cpu_executor, + executor=cpu_executor, profiling=args.profile, task_name="CI_TASK" ) @@ -81,10 +72,8 @@ def _fn_for_test(x, sleep_time=0.1, slow_first=False, driver_rank=0): manager.submit_tasks( _fn_for_test, [i for i in range(args.n)], - execute_at=executor_type, sleep_time=args.sleep_time, slow_first=args.slow_first, - driver_rank=driver_rank, ) # Do some other work while tasks are running diff --git a/src/hiopbbpy/utils/evaluation_manager.py b/src/hiopbbpy/utils/evaluation_manager.py index edcd73981..7ca4e4ee9 100644 --- a/src/hiopbbpy/utils/evaluation_manager.py +++ b/src/hiopbbpy/utils/evaluation_manager.py @@ -8,33 +8,12 @@ import threading import logging -import copy +from concurrent.futures import CancelledError, wait +from collections import deque import os import time import math -from concurrent.futures import ProcessPoolExecutor, CancelledError -from collections import deque - - -def is_running_with_mpi(): - """Returns True if the code is running in an MPI environment.""" - _MPI_RANK_ENV_VARS = [ - "OMPI_COMM_WORLD_RANK", # Open MPI - "PMI_RANK", # MPICH, Intel MPI, Cray MPI - "MPI_RANK", # Intel MPI (sometimes) - "MV2_COMM_WORLD_RANK", # MVAPICH - ] - return any(var in os.environ for var in _MPI_RANK_ENV_VARS) - - -# Loads MPIPoolExecutor if MPI is available -if is_running_with_mpi(): - from mpi4py.futures import MPIPoolExecutor, wait - _EVALUATION_MANAGER_USES_MPI4PY = True -else: - _EVALUATION_MANAGER_USES_MPI4PY = False - from concurrent.futures import wait - +import copy def _timed_call(fn, x, kwargs): """Run fn(x, **kwargs) and record worker-side timing.""" @@ -48,7 +27,6 @@ def _timed_call(fn, x, kwargs): "execution_time": done_time - start_time, } - def _summary_stats(values): """Return mean, std_dev, min, max for a sequence.""" if not values: @@ -69,50 +47,73 @@ def _summary_stats(values): "max": max(values), } - class EvaluationManager: - """Class that manages the evaluation of functions using multiple executors.""" - - def __init__( - self, - cpu_executor=None, - mpi_executor=None, - profiling=False, - task_name="TASK") -> None: + """Manage asynchronous function evaluations over one or more executors. + + The manager is executor-agnostic: each configured executor only needs a + ``submit(fn, *args, **kwargs)`` method that returns a Future-like object + exposing ``done()`` and ``result()``. + + Tasks are submitted asynchronously via :meth:`submit_tasks`. Completed + results are collected lazily by :meth:`retrieve_results` and eagerly by + :meth:`sync` (which blocks until the running queue is empty). + + Parameters + ---------- + executor: + Either a single executor instance or a ``dict[str, executor]`` mapping. + When a single executor is provided, it is stored under key ``"0"``. + profiling: + If True, wrap calls with worker-side timing. + task_name: + Label used in logging and profiling output. + """ + + def __init__(self, executor, profiling=False, task_name="TASK") -> None: self._queue = deque([]) + self._completed_X = deque([]) + self._completed_F = deque([]) self._queue_lock = threading.Lock() + + if isinstance(executor, dict): + self.executors = executor + else: + self.executors = {"0": executor} + self.logger = logging.getLogger(self.__class__.__name__) + self.task_name = task_name self.profiling = profiling self._first_submit_time = None - self.task_name = task_name - self.executors = { - "cpu": ProcessPoolExecutor() if cpu_executor is None else cpu_executor - } - if _EVALUATION_MANAGER_USES_MPI4PY: - self.executors["mpi"] = ( - MPIPoolExecutor() if mpi_executor is None else mpi_executor - ) - elif mpi_executor is not None: - self.executors["mpi"] = mpi_executor - - self.logger.info("EvaluationManager initialized with executors:") + self.logger.info(f"{self.task_name} EvaluationManager initialized with executors:") for key, executor in self.executors.items(): self.logger.info(f" - {key}: {executor}") def __del__(self) -> None: + """Shutdown managed executors during object destruction.""" for executor in self.executors.values(): - executor.shutdown(wait=False) - self.logger.info(f"{self.task_name} EvaluationManager destroyed and executors shut down.") + try: + executor.shutdown(wait=False) + self.logger.info(f"{self.task_name} EvaluationManager destroyed and executors shut down.") + except Exception as e: + self.logger.warning(f"{self.task_name} Error shutting down executor: {e}") def _get_num_workers(self): """Return number of workers.""" - if "mpi" in self.executors and is_running_with_mpi(): - return int(os.environ.get("MPI4PY_FUTURES_MAX_WORKERS", 1)) - try: - return self.executors["cpu"]._max_workers - except AttributeError: - return 1 + if "mpi" in self.executors: + try: + return int(os.environ.get("MPI4PY_FUTURES_MAX_WORKERS", 1)) + except Exception: + pass + + # For standard executors like ThreadPoolExecutor / ProcessPoolExecutor + for ex in self.executors.values(): + try: + return ex._max_workers + except AttributeError: + continue + + return 1 def set_task_name(self, task_name): self.task_name = task_name @@ -132,13 +133,50 @@ def _print_timing_stats(self, label, values): ) def sync(self) -> None: - """Wait for all submitted tasks to complete.""" - future_objs = [queue_obj["future"] for queue_obj in self._queue] - wait(future_objs) + """Block until all queued tasks finish. + + This method repeatedly waits on the currently queued futures and then + harvests completed items into the internal completion buffers. Harvested + results can be consumed using :meth:`retrieve_results`. + """ + while True: + with self._queue_lock: + futures = [queue_obj[1] for queue_obj in self._queue] + if len(futures) == 0: + break + + wait(futures) + + with self._queue_lock: + self._harvest_completed_locked() + + def submit_tasks(self, fn, X, execute_at=None, **kwargs) -> None: + """Submit tasks to the specified executor. + + Parameters + ---------- + fn: + The function to be executed. + X: + Sequence of input data for the function. If an element is a tuple, + it is expanded as positional arguments (``fn(*x, **kwargs)``); + otherwise it is passed as a single argument (``fn(x, **kwargs)``). + execute_at: + Executor key to use for task submission. If ``None``, the first key + in ``executors`` is used. The key lookup is case-insensitive. + kwargs: + Additional keyword arguments passed to the function. + """ + + if execute_at is None: + execute_at = next(iter(self.executors)) - def submit_tasks(self, fn, X, execute_at="cpu", **kwargs) -> None: - """Submits tasks to the specified executor.""" key = execute_at.lower() + if key not in self.executors: + raise KeyError( + f"Executor '{execute_at}' not found. Available: {list(self.executors.keys())}" + ) + with self._queue_lock: for x in X: submit_time = time.perf_counter() @@ -148,20 +186,22 @@ def submit_tasks(self, fn, X, execute_at="cpu", **kwargs) -> None: if self.profiling: future_obj = self.executors[key].submit(_timed_call, fn, x, kwargs) else: - future_obj = self.executors[key].submit(fn, x, **kwargs) + if isinstance(x, tuple): + future_obj = self.executors[key].submit(fn, *x, **kwargs) + else: + future_obj = self.executors[key].submit(fn, x, **kwargs) - self._queue.append({ - "x": copy.deepcopy(x), - "future": future_obj, - "submit_time": submit_time, - }) + self._queue.append([x, future_obj, key, submit_time]) self.logger.info(f"{self.task_name} Submitted f({x})") def retrieve_results(self) -> tuple[list, list]: - """Retrieves the results of completed tasks.""" - X = deque([]) - F = deque([]) - + """Retrieves the results of completed tasks. + Returns + ------- + tuple[list, list] + Inputs and corresponding results for completed tasks. If a task + failed or was cancelled, its result entry is ``None``. + """ execution_times = [] wait_times = [] turnaround_times = [] @@ -170,46 +210,16 @@ def retrieve_results(self) -> tuple[list, list]: batch_done_time = time.perf_counter() with self._queue_lock: - new_queue = deque([]) - - for item in self._queue: - x = item["x"] - future = item["future"] - submit_time = item["submit_time"] - - if future.done(): - try: - fx = future.result() - except CancelledError: - self.logger.warning(f"{self.task_name} The execution of x={x} was cancelled.") - continue - - if self.profiling: - # These are fine for local inspection, but note: - # worker_start_time / worker_done_time are on worker clocks. - worker_start_time = fx["start_time"] - worker_done_time = fx["done_time"] - - execution_time = fx["execution_time"] - - # These are not robust across different node clocks, but kept here - # because you already had them. - wait_time = worker_start_time - submit_time - turnaround_time = worker_done_time - submit_time - - execution_times.append(execution_time) - wait_times.append(wait_time) - turnaround_times.append(turnaround_time) - - fx = fx["result"] - - X.append(x) - F.append(fx) - self.logger.info(f"{self.task_name} Completed: f({x}) = {fx}") - else: - new_queue.append(item) + self._harvest_completed_locked( + execution_times=execution_times, + wait_times=wait_times, + turnaround_times=turnaround_times, + ) - self._queue = new_queue + X = list(self._completed_X) + F = list(self._completed_F) + self._completed_X.clear() + self._completed_F.clear() if self.profiling and execution_times: self._print_timing_stats(f"{self.task_name} Execution times", execution_times) @@ -236,5 +246,72 @@ def retrieve_results(self) -> tuple[list, list]: print(f"{self.task_name} Ideal walltime in seconds (perfect balance): {ideal_walltime:.6e}") print(f"{self.task_name} Actual walltime in seconds (observed): {actual_walltime:.6e}") - return list(X), list(F) + return X, F + + def _harvest_completed_locked( + self, + execution_times=None, + wait_times=None, + turnaround_times=None, + ) -> None: + """Move completed task results from running queue into completion buffers. + + This method assumes the caller holds ``_queue_lock``. + """ + new_queue = deque([]) + + for item in self._queue: + x = item[0] + future = item[1] + submit_time = item[3] + if future.done(): + self._completed_X.append(x) + self._completed_F.append(None) + + try: + fx = future.result() + + if self.profiling: + worker_start_time = fx["start_time"] + worker_done_time = fx["done_time"] + execution_time = fx["execution_time"] + + # These are OK for local runs, but can be unreliable across nodes + wait_time = worker_start_time - submit_time + turnaround_time = worker_done_time - submit_time + + if execution_times is not None: + execution_times.append(execution_time) + if wait_times is not None: + wait_times.append(wait_time) + if turnaround_times is not None: + turnaround_times.append(turnaround_time) + + fx = fx["result"] + + self._completed_F[-1] = fx + self.logger.info(f"{self.task_name} Completed: f({x}) = {fx}") + + except CancelledError: + self.logger.warning(f"{self.task_name} The execution of x={x} was cancelled.") + except Exception as e: + self.logger.warning(f"{self.task_name} Task f({x}) raised an exception: {e}") + + else: + new_queue.append(item) + + self._queue = new_queue + + def print_status(self) -> None: + """Print the current status of the task queue and completion buffers.""" + with self._queue_lock: + futures = [queue_obj[1] for queue_obj in self._queue] + n_running_futures = sum(1 for f in futures if not f.done()) + n_done_futures = len(futures) - n_running_futures + self.logger.info( + f"Status: {len(self._completed_X)} harvested results, " + f"{n_running_futures} running tasks, {n_done_futures} completed tasks still in queue." + ) + + \ No newline at end of file diff --git a/src/hiopbbpy/utils/util.py b/src/hiopbbpy/utils/util.py index ee8d199d0..ff4bd670f 100644 --- a/src/hiopbbpy/utils/util.py +++ b/src/hiopbbpy/utils/util.py @@ -25,7 +25,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ import numpy as np -from .evaluation_manager import EvaluationManager, is_running_with_mpi +from .evaluation_manager import EvaluationManager import logging @@ -74,20 +74,16 @@ class MPIEvaluator(Evaluator): We reformat to [eval0, eval1, eval2,...] """ - def __init__(self, function_mode=True,cpu_executor=None, mpi_executor=None, profiling=False, task_name="MPITASK"): - self.manager = EvaluationManager(cpu_executor,mpi_executor,profiling=profiling, task_name=task_name) + def __init__(self, function_mode=True,executor=None, profiling=False, task_name="MPITASK"): + self.manager = EvaluationManager(executor,profiling=profiling, task_name=task_name) self.function_mode = function_mode - if is_running_with_mpi(): - self.executor_type = "mpi" - else: - self.executor_type = "cpu" def __del__(self): del self.manager def set_task_name(self, task_name): self.manager.set_task_name(task_name) def run(self, fun, Xin): nevals = Xin.shape[0] - self.manager.submit_tasks(fun, [np.atleast_2d(Xin[i]) for i in range(nevals)], execute_at=self.executor_type) + self.manager.submit_tasks(fun, [np.atleast_2d(Xin[i]) for i in range(nevals)], execute_at="mpi") self.manager.sync() Xout, Fout = self.manager.retrieve_results() if self.function_mode: From e0eec2037553dd0891368323c614deb52fb9fc6b Mon Sep 17 00:00:00 2001 From: nychiang Date: Fri, 10 Apr 2026 13:50:46 -0700 Subject: [PATCH 2/5] update --- src/Drivers/hiopbbpy/EvaluationManagerCI.py | 2 +- src/hiopbbpy/utils/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Drivers/hiopbbpy/EvaluationManagerCI.py b/src/Drivers/hiopbbpy/EvaluationManagerCI.py index 23ff0f8b8..e23673a72 100644 --- a/src/Drivers/hiopbbpy/EvaluationManagerCI.py +++ b/src/Drivers/hiopbbpy/EvaluationManagerCI.py @@ -12,7 +12,7 @@ import os import socket import threading -from hiopbbpy.utils import EvaluationManager, is_running_with_mpi +from hiopbbpy.utils import EvaluationManager from concurrent.futures import ThreadPoolExecutor def _fn_for_test(x, sleep_time=0.1, slow_first=False, driver_rank=0): diff --git a/src/hiopbbpy/utils/__init__.py b/src/hiopbbpy/utils/__init__.py index 5b514f0b9..29d8dfc47 100644 --- a/src/hiopbbpy/utils/__init__.py +++ b/src/hiopbbpy/utils/__init__.py @@ -1,4 +1,4 @@ -from .evaluation_manager import (EvaluationManager, is_running_with_mpi) +from .evaluation_manager import (EvaluationManager) from .util import Evaluator, MPIEvaluator __all__ = [ From ccd693076bfd9877b94d764de3f65090689e00ba Mon Sep 17 00:00:00 2001 From: nychiang Date: Tue, 14 Apr 2026 23:52:54 -0700 Subject: [PATCH 3/5] add example --- src/Drivers/hiopbbpy/BODriverXfoil.py | 153 ++++++++++++++ src/Drivers/hiopbbpy/xfoilProblem.py | 281 ++++++++++++++++++++++++++ src/hiopbbpy/problems/problem.py | 6 +- src/hiopbbpy/utils/util.py | 6 +- 4 files changed, 442 insertions(+), 4 deletions(-) create mode 100644 src/Drivers/hiopbbpy/BODriverXfoil.py create mode 100644 src/Drivers/hiopbbpy/xfoilProblem.py diff --git a/src/Drivers/hiopbbpy/BODriverXfoil.py b/src/Drivers/hiopbbpy/BODriverXfoil.py new file mode 100644 index 000000000..adfb767d8 --- /dev/null +++ b/src/Drivers/hiopbbpy/BODriverXfoil.py @@ -0,0 +1,153 @@ +""" + Allocate 12 wind turbines via FLORIS using Bayesian Optimization (BO). +""" + +import logging +import os +import tempfile +from pathlib import Path +import numpy as np +import sys + +from scipy.optimize import NonlinearConstraint + +from ds4mems.airfoil import XFoilAirfoilPerformance +from xfoilProblem import xfoilProblem + +from hiopbbpy.surrogate_modeling import smtKRG +from hiopbbpy.opt import BOAlgorithm +from hiopbbpy.utils import MPIEvaluator +from concurrent.futures import ProcessPoolExecutor +from concurrent.futures import ThreadPoolExecutor + +### Define problem and optimization parameters +nx = 8 # Number of design variables + +use_ref = False +do_parallel = True + +xlimits_small = np.array( + [ + [-0.25, 0.25], + [-0.25, 0.25], + [-0.25, 0.25], + [-0.25, 0.25], + [-0.25, 0.25], + [-0.25, 0.25], + [0.0, 0.5], + [-0.25, 0.25], + ] + ) + +xlimits = np.array( + [ + [-10.79733932, 39.28774442], + [-22.81287711, 5.94493027], + [-23.24153557, 12.15769847], + [-5.47472985, 7.92991688], + [-8.20327119, 12.44090178], + [-1.50046976, 4.41520152], + [-0.37926066, 4.19234019], + [-1.61741561, 0.2987408], + ] + ) + + +x_nan = np.array([[-1.2269792 , -1.40856 , 0.56076634, -1.96206043, 1.44158607, + 0.22498941, -0.32848405, 0.04818669], + [-1.42737423, -1.40397264, 0.36791669, -2.11149784, 1.54822502, + 0.07623255, -0.39513648, -0.05350951]], dtype=float) + +x_penalty = np.array([[-1.49019102, -1.37671764, 0.65196615, -2.38465265, 1.39195187, + 0.16935736, -0.40938738, -0.05497496], + [-1.10599338, -1.10345512, 0.49594816, -1.93446173, 1.38383001, + 0.55640812, -0.41227115, 0.00824948]], dtype=float) + +x_opt = np.array([ + [-1.24028704, -1.31949567, 0.53528491, -2.16370978, 1.37588402, 0.32447433, -0.16926065, -0.20102103] + ], dtype=float) + +x_opt_2 = np.array([-1.46972556, -1.07491497, 0.32973962, -2.20769501, 1.26854236, + 0.3562351 , -0.397001 , -0.32209042]) + +x_ref = x_opt[0] + +if use_ref: + # --- dimension checks --- + if x_ref is None: + raise ValueError("x_ref must not be None when use_ref=True") + + if x_ref.shape != (nx,): + raise ValueError(f"x_ref must have shape ({nx},), got {x_ref.shape}") + + # --- build new bounds --- + delta = 0.1 * np.abs(x_ref) + var_lb = x_ref - delta + var_ub = x_ref + delta + + # replace xlimits + xlimits = np.column_stack((var_lb, var_ub)) + xlimits_small = None + print("built a box from the given reference point") + +### Mac +problem = xfoilProblem(nx, xlimits, tighter_bounds=xlimits_small, ref_x=x_ref, use_ref=use_ref, xfoil_path="/Users/chiang7/project/2025/scidac/other_lab/ornl/xfoil/bin/xfoil") + +### LC +#problem = xfoilProblem(nx, xlimits, tighter_bounds=xlimits_small, ref_x=x_ref, use_ref=use_ref, xfoil_path="/p/lustre1/chiang7/hiopbbpy/xfoil/bin/xfoil") + +print("Problem name: ", problem.name) + + +### BO parameters +n_samples = 4 # Number of initial design points +theta = 1.e-2 # Hyperparameter for the Kriging (GP) model +acq_type = "EI" # Acquisition function: "EI" or "LCB" + +print("Acquisition type: ", acq_type) + +opt_solver = 'IPOPT' #"SLSQP" "IPOPT" "trust-constr" +if opt_solver == "SLSQP" or opt_solver == "trust-constr": + solver_options = {"maxiter": 100} #for scipy solvers +elif opt_solver == "IPOPT": + solver_options = {"max_iter": 200, "print_level": 1, "tol": 1e-4} + +options = { + 'acquisition_type': acq_type, + 'log_level': 'info', + 'bo_maxiter': 2, + 'opt_solver': opt_solver, + 'batch_size': 1, + 'n_start': 3, + 'solver_options': solver_options, + } + +### initial training set +x_train = problem.sample(n_samples) + +do_profiling = True +max_worker = 2 +if do_parallel: + # ----- evaluator + obj_evaluator = MPIEvaluator(executor=ProcessPoolExecutor(max_workers=max_worker),profiling=do_profiling, task_name="PARALLEL_OBJ_EVAL") + opt_evaluator = MPIEvaluator(function_mode=False, executor=ProcessPoolExecutor(max_workers=max_worker), profiling=do_profiling, task_name="PARALLEL_IPOPT_START") + print("\n000000\n") + + y_train = obj_evaluator.run(problem.evaluate, x_train) + options['obj_evaluator'] = obj_evaluator + options['opt_evaluator'] = opt_evaluator +else: + y_train = problem.evaluate(x_train) + + +### Define the GP surrogate model +gp_model = smtKRG(theta, xlimits, nx) +gp_model.train(x_train, y_train) + + +# Instantiate and run Bayesian Optimization +bo = BOAlgorithm(problem, gp_model, x_train, y_train, options = options) #EI or LCB +bo.optimize() + +problem.obj_func(bo.x_opt) + diff --git a/src/Drivers/hiopbbpy/xfoilProblem.py b/src/Drivers/hiopbbpy/xfoilProblem.py new file mode 100644 index 000000000..6c71db1fc --- /dev/null +++ b/src/Drivers/hiopbbpy/xfoilProblem.py @@ -0,0 +1,281 @@ +import numpy as np +import sys +from pathlib import Path + +import random + +# Import the base optimization problem class. +from hiopbbpy.problems import Problem + +# Import the ipopt problem class. +from cyipopt import Problem as cyProblem + +from scipy.optimize import NonlinearConstraint + +from ds4mems.airfoil import XFoilAirfoilPerformance + +class XFoilSampler: + def __init__(self, n, var_bounds, tighter_bounds=None, ref_x=None, rng=None, use_ref=False): + self.n = n + + var_bounds = np.asarray(var_bounds, dtype=float) + + if var_bounds.shape != (self.n, 2): + raise ValueError(f"var_bounds must have shape ({self.n}, 2), got {var_bounds.shape}") + + self.var_lb = var_bounds[:, 0] + self.var_ub = var_bounds[:, 1] + self.tighter_lb = None + self.tighter_ub = None + + self.ref_x = None if ref_x is None else np.asarray(ref_x, dtype=float).ravel() + if self.ref_x is not None and self.ref_x.shape != (self.n,): + raise ValueError(f"ref_x must have shape ({self.n},), got {self.ref_x.shape}") + + if tighter_bounds is not None: + #print(f"use small box") + tighter_bounds = np.asarray(tighter_bounds, dtype=float) + if tighter_bounds.shape != (self.n, 2): + raise ValueError(f"tighter_bounds must have shape ({self.n}, 2), got {tighter_bounds.shape}") + self.tighter_lb = tighter_bounds[:, 0] + self.tighter_ub = tighter_bounds[:, 1] + + self.rng = np.random.default_rng() if rng is None else rng + + def rejection_sampling(self, n_samples, is_valid=None, max_tries=10000): + n_samples = int(n_samples) + print(f"Do rejection sampling! Find {n_samples} samples.") + + samples = [] + tries = 0 + + while len(samples) < n_samples: + if tries > max_tries: + raise RuntimeError("Could not generate enough valid samples.") + tries += 1 + + # --- Generate one candidate --- + if self.tighter_lb is None: + x = self.rng.uniform(self.var_lb, self.var_ub) + else: + if self.rng.random() < 0.5: + print("sample from small box: ") + x = self.rng.uniform(self.tighter_lb, self.tighter_ub) + else: + print("sample from big box: ") + x = self.rng.uniform(self.var_lb, self.var_ub) + + # --- Check validity --- + if is_valid is None: + samples.append(x) + else: + try: + if is_valid(x,run_dir=f"temp_sample_{len(samples)}"): + samples.append(x) + print("") + else: + print("---reject!") + except Exception: + pass # reject if evaluation crashes + + return np.array(samples) + + + def random(self, n_samples): + n_samples = int(n_samples) + + # No small box --> sample everything from big box + if self.tighter_lb is None: + return self.rng.uniform( + self.var_lb, self.var_ub, size=(n_samples, self.n) + ) + + # Split samples + n_small = n_samples // 2 + n_full = n_samples - n_small + + samples = np.empty((n_samples, self.n)) + + # Sample from smaller box + print(f"generate {n_small} samples from small box") + samples[:n_small] = self.rng.uniform( + self.tighter_lb, self.tighter_ub, size=(n_small, self.n) + ) + + # Sample from full box + print(f"generate {n_full} samples from big box") + samples[n_small:] = self.rng.uniform( + self.var_lb, self.var_ub, size=(n_full, self.n) + ) + + # Optional: shuffle so small-box samples aren’t grouped? + #self.rng.shuffle(samples, axis=0) + + return samples + +# ------------------------------------------------------------------- +# Air Foil Optimization Problem Definition +# ------------------------------------------------------------------- + +class xfoilProblem(Problem): + def __init__(self, ndim, xlimits, constraints=[], + tighter_bounds=None, ref_x=None, use_ref=False, + xfoil_path=None, + n_points=201, mach=0.2, reynolds_list=(1e6, 5e6), + penalty_weight=1e5, penalty_power=2, constr_eps=None): + """ + Initializes the wind farm layout optimization problem using FLORIS. + + Parameters: + ndim (int): Total number of decision variables + xlimits: Limits for the decision variables. + constraints (list): List of constraint definitions (optional). + """ + name = 'xFoil' + super().__init__(ndim, xlimits, name=name, constraints=constraints) + + # Derive the number of variables. + self.n = int(ndim) + self._eval_cache = {} # key -> (f, c) or (f, None) + self._cache_ndigits = 12 + # include config tag so cache is invalidated if settings change + self._cache_tag = ("mach", mach, "Re", tuple(reynolds_list), "npts", n_points) + self.initial_samples = True + if xfoil_path is None: + raise RuntimeError("Set a valid path to xfoil binary.") + + self.airfoil_perf = XFoilAirfoilPerformance(xfoil_path) + + # Define the objective function + self.base_obj_func = self.airfoil_perf_obj + self.constr_func = self.airfoil_perf.constr + + self.penalty_weight = float(penalty_weight) + self.penalty_power = int(penalty_power) + self.constr_eps = np.finfo(np.float64).eps if constr_eps is None else float(constr_eps) + + self.obj_func = self._penalized_obj # for unconstrained prob, set this to base_obj_func + + self.constraints = {} + self.tighter_bounds = tighter_bounds + self.sampler = XFoilSampler(n=self.n, var_bounds=xlimits, tighter_bounds=self.tighter_bounds, ref_x=ref_x, use_ref=use_ref) + + def airfoil_perf_obj(self, X, run_dir="./temp_output0"): + aoa_for_xfoil = (0.0, 20.0, 1.0) # (start, stop, step) + re_values = np.linspace(1e6, 5e6, num=5) + mach = 0.2 + + #print(".", end="", flush=True) # Progress indicator + + F = [] + for re in re_values: + F.append(self.airfoil_perf(X, aoa=aoa_for_xfoil, re=re, mach=mach,run_dir=run_dir)) + + return -np.mean(np.concat(F)) # Return negative mean performance for minimization + + + def _cache_key(self, x: np.ndarray): + x = np.asarray(x, dtype=float).ravel() + return (self._cache_tag, tuple(np.round(x, self._cache_ndigits))) + + def eval_cached(self, x: np.ndarray, run_dir="./temp_output2"): + print("in eval_cached") + k = self._cache_key(x) + hit = self._eval_cache.get(k, None) + if hit is not None: + return hit # (f, c) + + c = np.asarray(self.constr_func(x), dtype=float).ravel() + if c > 0.0: + f = float(self.base_obj_func(x, run_dir=run_dir)) + else: + f = np.inf + print(f"evaluation: f={f}; c={c}") + self._eval_cache[k] = (f, c) + return f, c + + def _penalized_obj(self, x: np.ndarray, run_dir="./temp_output3") -> float: + #print("use penalty func!") + print("in penalized_obj") + + f, con_arr = self.eval_cached(x,run_dir=run_dir) + + if not np.isfinite(f): + print(f"Warning: base_obj_func returned {f} at x = {x}, replacing with 1e3") + f = 1e3 + + con_arr = np.atleast_1d(np.asarray(con_arr, dtype=float)) + if con_arr.shape != (1,): + raise ValueError( + f"xfoilProblem expects exactly 1 constraint, got shape {con_arr.shape}" + ) + + con_f = float(con_arr[0]) + # print(f"con_f = {con_f}") + con_violation = max(0.0, self.constr_eps - con_f) # scalar float + + # # adapting penalty multiplier (scalar) + # v = con_violation + # if v < 1e-4: + # penalty_mult = 0.0 + # elif v < 1e-2: + # penalty_mult = 10.0 + # elif v < 1e-1: + # penalty_mult = 100.0 + # elif v < 1.0: + # penalty_mult = 500.0 + # else: + # penalty_mult = 1000.0 + + # penalty = penalty_mult * (con_violation ** self.penalty_power) # scalar + # print(f". obj = base obj + penalty: {f} + {penalty}", flush=True) + # retval = f + penalty + + ### fixme: + #when con is large, skip evaluating f, just retrun big penalty term + v = con_violation + retval = 0.0 + if v < 1e-8: + retval = f + elif v < 1e-4: + retval = -25 + elif v < 1: + retval = -50 + + print(f". penalty obj = {retval}; con_violation = {v}", flush=True) + + + return retval + + + def _evaluate(self, x: np.ndarray, **kwargs) -> np.ndarray: + """ + Evaluates the objective function + + Parameters: + x (ndarray): An array + + Returns: + ndarray: The objective values + """ + print("in _evaluate") + y = [float(self.obj_func(xi,**kwargs)) for xi in x] # shape (k,) + return np.asarray(y, dtype=float).reshape(-1, 1) + + def sample(self, nsample: int) -> np.ndarray: + def is_valid(x, run_dir=None): + print("in is_valid") + f, con_arr = self.eval_cached(x,run_dir=run_dir) + + con_arr = np.atleast_1d(np.asarray(con_arr, dtype=float)) + if con_arr.shape != (1,): + raise ValueError( + f"xfoilProblem expects exactly 1 constraint, got shape {con_arr.shape}" + ) + + con_f = float(con_arr[0]) + + #isvalid = np.isfinite(f) # an alt way + isvalid = (con_f > 0.0) and np.isfinite(f) + return isvalid + return self.sampler.rejection_sampling(nsample, is_valid=is_valid) \ No newline at end of file diff --git a/src/hiopbbpy/problems/problem.py b/src/hiopbbpy/problems/problem.py index d1a76ed0b..0558614bf 100644 --- a/src/hiopbbpy/problems/problem.py +++ b/src/hiopbbpy/problems/problem.py @@ -28,7 +28,7 @@ def __init__(self, ndim, xlimits, name=" ", constraints=[]): # each dict = one scalar constraint self.n_con = len(constraints) - def _evaluate(self, x: np.ndarray) -> np.ndarray: + def _evaluate(self, x: np.ndarray, **kwargs) -> np.ndarray: """ problem evaluation y = f(x) of a scalar valued function f @@ -44,7 +44,7 @@ def _evaluate(self, x: np.ndarray) -> np.ndarray: """ raise NotImplementedError("Child class of hiopProblem should implement method _evaluate") - def evaluate(self, x: np.ndarray) -> np.ndarray: + def evaluate(self, x: np.ndarray, **kwargs) -> np.ndarray: """ problem callback y = f(x) of the scalar valued function f @@ -59,7 +59,7 @@ def evaluate(self, x: np.ndarray) -> np.ndarray: Function values (cast to reals) """ y = np.ndarray((x.shape[0], 1)) - y[:,:] = self._evaluate(x) + y[:,:] = self._evaluate(x, **kwargs) return y def con_evaluate(self, x: np.ndarray) -> np.ndarray: diff --git a/src/hiopbbpy/utils/util.py b/src/hiopbbpy/utils/util.py index ff4bd670f..d39bdad12 100644 --- a/src/hiopbbpy/utils/util.py +++ b/src/hiopbbpy/utils/util.py @@ -77,13 +77,17 @@ class MPIEvaluator(Evaluator): def __init__(self, function_mode=True,executor=None, profiling=False, task_name="MPITASK"): self.manager = EvaluationManager(executor,profiling=profiling, task_name=task_name) self.function_mode = function_mode + print(f"Create Evaluator for task: {task_name}") def __del__(self): del self.manager def set_task_name(self, task_name): self.manager.set_task_name(task_name) def run(self, fun, Xin): nevals = Xin.shape[0] - self.manager.submit_tasks(fun, [np.atleast_2d(Xin[i]) for i in range(nevals)], execute_at="mpi") + print("in Evaluator::run") + for i in range(nevals): + self.manager.submit_tasks(fun, [np.atleast_2d(Xin[i])], run_dir=f"./hiop_temp/temp_dir_{i}") + print(f"Submitted task {i + 1}", flush=True) self.manager.sync() Xout, Fout = self.manager.retrieve_results() if self.function_mode: From 0d2f30e5009753c1c0c93e3cd798cf70e27ce77c Mon Sep 17 00:00:00 2001 From: nychiang Date: Wed, 15 Apr 2026 01:28:01 -0700 Subject: [PATCH 4/5] fix --- src/Drivers/hiopbbpy/BODriverXfoil.py | 260 ++++++++++++----------- src/Drivers/hiopbbpy/xfoilProblem.py | 16 +- src/hiopbbpy/utils/evaluation_manager.py | 23 +- src/hiopbbpy/utils/util.py | 64 +++++- 4 files changed, 214 insertions(+), 149 deletions(-) diff --git a/src/Drivers/hiopbbpy/BODriverXfoil.py b/src/Drivers/hiopbbpy/BODriverXfoil.py index adfb767d8..430eed560 100644 --- a/src/Drivers/hiopbbpy/BODriverXfoil.py +++ b/src/Drivers/hiopbbpy/BODriverXfoil.py @@ -22,132 +22,146 @@ ### Define problem and optimization parameters nx = 8 # Number of design variables - use_ref = False do_parallel = True - -xlimits_small = np.array( - [ - [-0.25, 0.25], - [-0.25, 0.25], - [-0.25, 0.25], - [-0.25, 0.25], - [-0.25, 0.25], - [-0.25, 0.25], - [0.0, 0.5], - [-0.25, 0.25], - ] - ) - -xlimits = np.array( - [ - [-10.79733932, 39.28774442], - [-22.81287711, 5.94493027], - [-23.24153557, 12.15769847], - [-5.47472985, 7.92991688], - [-8.20327119, 12.44090178], - [-1.50046976, 4.41520152], - [-0.37926066, 4.19234019], - [-1.61741561, 0.2987408], - ] - ) - - -x_nan = np.array([[-1.2269792 , -1.40856 , 0.56076634, -1.96206043, 1.44158607, - 0.22498941, -0.32848405, 0.04818669], - [-1.42737423, -1.40397264, 0.36791669, -2.11149784, 1.54822502, - 0.07623255, -0.39513648, -0.05350951]], dtype=float) - -x_penalty = np.array([[-1.49019102, -1.37671764, 0.65196615, -2.38465265, 1.39195187, - 0.16935736, -0.40938738, -0.05497496], - [-1.10599338, -1.10345512, 0.49594816, -1.93446173, 1.38383001, - 0.55640812, -0.41227115, 0.00824948]], dtype=float) - -x_opt = np.array([ - [-1.24028704, -1.31949567, 0.53528491, -2.16370978, 1.37588402, 0.32447433, -0.16926065, -0.20102103] - ], dtype=float) - -x_opt_2 = np.array([-1.46972556, -1.07491497, 0.32973962, -2.20769501, 1.26854236, - 0.3562351 , -0.397001 , -0.32209042]) - -x_ref = x_opt[0] - -if use_ref: - # --- dimension checks --- - if x_ref is None: - raise ValueError("x_ref must not be None when use_ref=True") - - if x_ref.shape != (nx,): - raise ValueError(f"x_ref must have shape ({nx},), got {x_ref.shape}") - - # --- build new bounds --- - delta = 0.1 * np.abs(x_ref) - var_lb = x_ref - delta - var_ub = x_ref + delta - - # replace xlimits - xlimits = np.column_stack((var_lb, var_ub)) - xlimits_small = None - print("built a box from the given reference point") - -### Mac -problem = xfoilProblem(nx, xlimits, tighter_bounds=xlimits_small, ref_x=x_ref, use_ref=use_ref, xfoil_path="/Users/chiang7/project/2025/scidac/other_lab/ornl/xfoil/bin/xfoil") - -### LC -#problem = xfoilProblem(nx, xlimits, tighter_bounds=xlimits_small, ref_x=x_ref, use_ref=use_ref, xfoil_path="/p/lustre1/chiang7/hiopbbpy/xfoil/bin/xfoil") - -print("Problem name: ", problem.name) - - -### BO parameters -n_samples = 4 # Number of initial design points -theta = 1.e-2 # Hyperparameter for the Kriging (GP) model -acq_type = "EI" # Acquisition function: "EI" or "LCB" - -print("Acquisition type: ", acq_type) - -opt_solver = 'IPOPT' #"SLSQP" "IPOPT" "trust-constr" -if opt_solver == "SLSQP" or opt_solver == "trust-constr": - solver_options = {"maxiter": 100} #for scipy solvers -elif opt_solver == "IPOPT": - solver_options = {"max_iter": 200, "print_level": 1, "tol": 1e-4} - -options = { - 'acquisition_type': acq_type, - 'log_level': 'info', - 'bo_maxiter': 2, - 'opt_solver': opt_solver, - 'batch_size': 1, - 'n_start': 3, - 'solver_options': solver_options, - } - -### initial training set -x_train = problem.sample(n_samples) - do_profiling = True max_worker = 2 -if do_parallel: - # ----- evaluator - obj_evaluator = MPIEvaluator(executor=ProcessPoolExecutor(max_workers=max_worker),profiling=do_profiling, task_name="PARALLEL_OBJ_EVAL") - opt_evaluator = MPIEvaluator(function_mode=False, executor=ProcessPoolExecutor(max_workers=max_worker), profiling=do_profiling, task_name="PARALLEL_IPOPT_START") - print("\n000000\n") - - y_train = obj_evaluator.run(problem.evaluate, x_train) - options['obj_evaluator'] = obj_evaluator - options['opt_evaluator'] = opt_evaluator -else: - y_train = problem.evaluate(x_train) -### Define the GP surrogate model -gp_model = smtKRG(theta, xlimits, nx) -gp_model.train(x_train, y_train) - - -# Instantiate and run Bayesian Optimization -bo = BOAlgorithm(problem, gp_model, x_train, y_train, options = options) #EI or LCB -bo.optimize() - -problem.obj_func(bo.x_opt) - +def main(): + xlimits_small = np.array( + [ + [-0.25, 0.25], + [-0.25, 0.25], + [-0.25, 0.25], + [-0.25, 0.25], + [-0.25, 0.25], + [-0.25, 0.25], + [0.0, 0.5], + [-0.25, 0.25], + ] + ) + + xlimits = np.array( + [ + [-10.79733932, 39.28774442], + [-22.81287711, 5.94493027], + [-23.24153557, 12.15769847], + [-5.47472985, 7.92991688], + [-8.20327119, 12.44090178], + [-1.50046976, 4.41520152], + [-0.37926066, 4.19234019], + [-1.61741561, 0.2987408], + ] + ) + # FIXME_NY --- use big box + xlimits = xlimits_small + + x_nan = np.array([[-1.2269792 , -1.40856 , 0.56076634, -1.96206043, 1.44158607, + 0.22498941, -0.32848405, 0.04818669], + [-1.42737423, -1.40397264, 0.36791669, -2.11149784, 1.54822502, + 0.07623255, -0.39513648, -0.05350951]], dtype=float) + + x_penalty = np.array([[-1.49019102, -1.37671764, 0.65196615, -2.38465265, 1.39195187, + 0.16935736, -0.40938738, -0.05497496], + [-1.10599338, -1.10345512, 0.49594816, -1.93446173, 1.38383001, + 0.55640812, -0.41227115, 0.00824948]], dtype=float) + + x_opt = np.array([ + [-1.24028704, -1.31949567, 0.53528491, -2.16370978, 1.37588402, 0.32447433, -0.16926065, -0.20102103] + ], dtype=float) + + x_opt_2 = np.array([-1.46972556, -1.07491497, 0.32973962, -2.20769501, 1.26854236, + 0.3562351 , -0.397001 , -0.32209042]) + + x_ref = x_opt[0] + + if use_ref: + # --- dimension checks --- + if x_ref is None: + raise ValueError("x_ref must not be None when use_ref=True") + + if x_ref.shape != (nx,): + raise ValueError(f"x_ref must have shape ({nx},), got {x_ref.shape}") + + # --- build new bounds --- + delta = 0.1 * np.abs(x_ref) + var_lb = x_ref - delta + var_ub = x_ref + delta + + # replace xlimits + xlimits = np.column_stack((var_lb, var_ub)) + xlimits_small = None + print("built a box from the given reference point") + + + ### Mac + problem = xfoilProblem(nx, xlimits, tighter_bounds=xlimits_small, ref_x=x_ref, use_ref=use_ref, xfoil_path="/Users/chiang7/project/2025/scidac/other_lab/ornl/xfoil/bin/xfoil") + + ### LC + #problem = xfoilProblem(nx, xlimits, tighter_bounds=xlimits_small, ref_x=x_ref, use_ref=use_ref, xfoil_path="/p/lustre1/chiang7/hiopbbpy/xfoil/bin/xfoil") + + print("Problem name: ", problem.name) + + + ### BO parameters + n_samples = 4 # Number of initial design points + theta = 1.e-2 # Hyperparameter for the Kriging (GP) model + acq_type = "EI" # Acquisition function: "EI" or "LCB" + + print("Acquisition type: ", acq_type) + + opt_solver = 'IPOPT' #"SLSQP" "IPOPT" "trust-constr" + if opt_solver == "SLSQP" or opt_solver == "trust-constr": + solver_options = {"maxiter": 100} #for scipy solvers + elif opt_solver == "IPOPT": + solver_options = {"max_iter": 200, "print_level": 1, "tol": 1e-4} + + options = { + 'acquisition_type': acq_type, + 'log_level': 'info', + 'bo_maxiter': 2, + 'opt_solver': opt_solver, + 'batch_size': 1, + 'n_start': 3, + 'solver_options': solver_options, + } + + ### initial training set + x_train = problem.sample(n_samples) + + + if do_parallel: + # ----- evaluator + obj_evaluator = MPIEvaluator( + executor=ProcessPoolExecutor(max_workers=max_worker), + profiling=do_profiling, + task_name="PARALLEL_OBJ_EVAL", + use_run_dir=True, + ) + opt_evaluator = MPIEvaluator( + function_mode=False, + executor=ProcessPoolExecutor(max_workers=max_worker), + profiling=do_profiling, + task_name="PARALLEL_IPOPT_START", + use_run_dir=False, + ) + y_train = obj_evaluator.run(problem.evaluate, x_train) + options['obj_evaluator'] = obj_evaluator + options['opt_evaluator'] = opt_evaluator + else: + y_train = problem.evaluate(x_train) + + ### Define the GP surrogate model + gp_model = smtKRG(theta, xlimits, nx) + gp_model.train(x_train, y_train) + + # Instantiate and run Bayesian Optimization + bo = BOAlgorithm(problem, gp_model, x_train, y_train, options = options) #EI or LCB + bo.optimize() + + problem.obj_func(bo.x_opt, run_dir="final_opt") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/Drivers/hiopbbpy/xfoilProblem.py b/src/Drivers/hiopbbpy/xfoilProblem.py index 6c71db1fc..96697c900 100644 --- a/src/Drivers/hiopbbpy/xfoilProblem.py +++ b/src/Drivers/hiopbbpy/xfoilProblem.py @@ -160,7 +160,9 @@ def __init__(self, ndim, xlimits, constraints=[], self.tighter_bounds = tighter_bounds self.sampler = XFoilSampler(n=self.n, var_bounds=xlimits, tighter_bounds=self.tighter_bounds, ref_x=ref_x, use_ref=use_ref) - def airfoil_perf_obj(self, X, run_dir="./temp_output0"): + def airfoil_perf_obj(self, X, run_dir=None): + if run_dir is None: + raise ValueError("run_dir must be provided for XFoil evaluations") aoa_for_xfoil = (0.0, 20.0, 1.0) # (start, stop, step) re_values = np.linspace(1e6, 5e6, num=5) mach = 0.2 @@ -178,8 +180,9 @@ def _cache_key(self, x: np.ndarray): x = np.asarray(x, dtype=float).ravel() return (self._cache_tag, tuple(np.round(x, self._cache_ndigits))) - def eval_cached(self, x: np.ndarray, run_dir="./temp_output2"): - print("in eval_cached") + def eval_cached(self, x: np.ndarray, run_dir=None): + if run_dir is None: + raise ValueError("run_dir must be provided for XFoil evaluations") k = self._cache_key(x) hit = self._eval_cache.get(k, None) if hit is not None: @@ -194,9 +197,10 @@ def eval_cached(self, x: np.ndarray, run_dir="./temp_output2"): self._eval_cache[k] = (f, c) return f, c - def _penalized_obj(self, x: np.ndarray, run_dir="./temp_output3") -> float: + def _penalized_obj(self, x: np.ndarray, run_dir=None) -> float: + if run_dir is None: + raise ValueError("run_dir must be provided for XFoil evaluations") #print("use penalty func!") - print("in penalized_obj") f, con_arr = self.eval_cached(x,run_dir=run_dir) @@ -258,13 +262,11 @@ def _evaluate(self, x: np.ndarray, **kwargs) -> np.ndarray: Returns: ndarray: The objective values """ - print("in _evaluate") y = [float(self.obj_func(xi,**kwargs)) for xi in x] # shape (k,) return np.asarray(y, dtype=float).reshape(-1, 1) def sample(self, nsample: int) -> np.ndarray: def is_valid(x, run_dir=None): - print("in is_valid") f, con_arr = self.eval_cached(x,run_dir=run_dir) con_arr = np.atleast_1d(np.asarray(con_arr, dtype=float)) diff --git a/src/hiopbbpy/utils/evaluation_manager.py b/src/hiopbbpy/utils/evaluation_manager.py index 7ca4e4ee9..5080c08ba 100644 --- a/src/hiopbbpy/utils/evaluation_manager.py +++ b/src/hiopbbpy/utils/evaluation_manager.py @@ -15,10 +15,10 @@ import math import copy -def _timed_call(fn, x, kwargs): - """Run fn(x, **kwargs) and record worker-side timing.""" +def _timed_call(fn, args, kwargs): + """Run fn(*args, **kwargs) and record worker-side timing.""" start_time = time.perf_counter() - fx = fn(x, **kwargs) + fx = fn(*args, **kwargs) done_time = time.perf_counter() return { "result": fx, @@ -27,6 +27,7 @@ def _timed_call(fn, x, kwargs): "execution_time": done_time - start_time, } + def _summary_stats(values): """Return mean, std_dev, min, max for a sequence.""" if not values: @@ -173,23 +174,20 @@ def submit_tasks(self, fn, X, execute_at=None, **kwargs) -> None: key = execute_at.lower() if key not in self.executors: - raise KeyError( - f"Executor '{execute_at}' not found. Available: {list(self.executors.keys())}" - ) - + raise KeyError(f"Executor '{execute_at}' not found. Available: {list(self.executors.keys())}") + with self._queue_lock: for x in X: submit_time = time.perf_counter() if self._first_submit_time is None: self._first_submit_time = submit_time + args = x if isinstance(x, tuple) else (x,) + if self.profiling: - future_obj = self.executors[key].submit(_timed_call, fn, x, kwargs) + future_obj = self.executors[key].submit(_timed_call, fn, args, kwargs) else: - if isinstance(x, tuple): - future_obj = self.executors[key].submit(fn, *x, **kwargs) - else: - future_obj = self.executors[key].submit(fn, x, **kwargs) + future_obj = self.executors[key].submit(fn, *args, **kwargs) self._queue.append([x, future_obj, key, submit_time]) self.logger.info(f"{self.task_name} Submitted f({x})") @@ -246,6 +244,7 @@ def retrieve_results(self) -> tuple[list, list]: print(f"{self.task_name} Ideal walltime in seconds (perfect balance): {ideal_walltime:.6e}") print(f"{self.task_name} Actual walltime in seconds (observed): {actual_walltime:.6e}") + self._first_submit_time = None return X, F def _harvest_completed_locked( diff --git a/src/hiopbbpy/utils/util.py b/src/hiopbbpy/utils/util.py index d39bdad12..39b731e17 100644 --- a/src/hiopbbpy/utils/util.py +++ b/src/hiopbbpy/utils/util.py @@ -28,6 +28,10 @@ from .evaluation_manager import EvaluationManager import logging +import os +import time +import uuid +from pathlib import Path def check_required_keys(user_dict, required_keys): for key in required_keys: @@ -74,29 +78,75 @@ class MPIEvaluator(Evaluator): We reformat to [eval0, eval1, eval2,...] """ - def __init__(self, function_mode=True,executor=None, profiling=False, task_name="MPITASK"): - self.manager = EvaluationManager(executor,profiling=profiling, task_name=task_name) + def __init__(self, function_mode=True, executor=None, profiling=False, + task_name="MPITASK", run_root="./hiop_temp", use_run_dir=False): + self.manager = EvaluationManager(executor, profiling=profiling, task_name=task_name) self.function_mode = function_mode + self.run_root = Path(run_root) + self.run_root.mkdir(parents=True, exist_ok=True) + self.use_run_dir = use_run_dir print(f"Create Evaluator for task: {task_name}") + def __del__(self): del self.manager + def set_task_name(self, task_name): self.manager.set_task_name(task_name) - def run(self, fun, Xin): + + def run(self, fun, Xin): nevals = Xin.shape[0] print("in Evaluator::run") + + # unique batch directory so repeated calls do not reuse temp_dir_0, temp_dir_1, ... + batch_id = f"{self.manager.task_name}_{os.getpid()}_{time.time_ns()}_{uuid.uuid4().hex[:8]}" + batch_dir = self.run_root / batch_id + batch_dir.mkdir(parents=True, exist_ok=False) + for i in range(nevals): - self.manager.submit_tasks(fun, [np.atleast_2d(Xin[i])], run_dir=f"./hiop_temp/temp_dir_{i}") + xi = np.atleast_2d(Xin[i]) + + kwargs = {} + if self.use_run_dir: + run_dir = batch_dir / f"eval_{i:04d}" + run_dir.mkdir(parents=True, exist_ok=False) + kwargs["run_dir"] = str(run_dir) + + # submit (index, x) so we can restore original order later + self.manager.submit_tasks( + _run_indexed_fun, + [(fun, i, xi)], + **kwargs, + ) print(f"Submitted task {i + 1}", flush=True) + self.manager.sync() Xout, Fout = self.manager.retrieve_results() + + # restore original order using returned indices + ordered = [None] * nevals + for out in Fout: + if out is None: + continue + idx, val = out + ordered[idx] = val + + missing = [i for i, v in enumerate(ordered) if v is None] + if missing: + raise RuntimeError(f"Missing evaluation results for indices {missing}") + if self.function_mode: - Y = np.ndarray((nevals, 1)) - Y[:,0] = np.array(Fout)[:,0,0] + Y = np.empty((nevals, 1), dtype=float) + for i, val in enumerate(ordered): + arr = np.asarray(val, dtype=float) + Y[i, 0] = float(arr.reshape(-1)[0]) else: - Y = [Fi[0] for Fi in Fout] + Y = [val[0] for val in ordered] + return Y +def _run_indexed_fun(fun, idx, x, **kwargs): + return idx, fun(x, **kwargs) + class Logger: """ A simple wrapper for Python's logging module that sets up a reusable logger. From 12d2f3b7ce10c6bbc2a4bf54ffa1a00ccb8788e0 Mon Sep 17 00:00:00 2001 From: nychiang Date: Wed, 22 Apr 2026 11:20:57 -0700 Subject: [PATCH 5/5] bug fixed --- src/Drivers/hiopbbpy/BODriverXfoil.py | 46 +++++++++++++------ src/Drivers/hiopbbpy/xfoilProblem.py | 56 ++++++++++++++++++------ src/hiopbbpy/opt/boalgorithm.py | 2 + src/hiopbbpy/utils/evaluation_manager.py | 28 +++++++++++- src/hiopbbpy/utils/util.py | 5 ++- 5 files changed, 105 insertions(+), 32 deletions(-) diff --git a/src/Drivers/hiopbbpy/BODriverXfoil.py b/src/Drivers/hiopbbpy/BODriverXfoil.py index 430eed560..83ff1b968 100644 --- a/src/Drivers/hiopbbpy/BODriverXfoil.py +++ b/src/Drivers/hiopbbpy/BODriverXfoil.py @@ -1,7 +1,7 @@ """ - Allocate 12 wind turbines via FLORIS using Bayesian Optimization (BO). + 8D Xfoil problem using Bayesian Optimization (BO). """ - +import argparse import logging import os import tempfile @@ -9,9 +9,6 @@ import numpy as np import sys -from scipy.optimize import NonlinearConstraint - -from ds4mems.airfoil import XFoilAirfoilPerformance from xfoilProblem import xfoilProblem from hiopbbpy.surrogate_modeling import smtKRG @@ -22,13 +19,32 @@ ### Define problem and optimization parameters nx = 8 # Number of design variables -use_ref = False +use_ref = True do_parallel = True do_profiling = True -max_worker = 2 - +DEFAULT_MAX_WORKER = 2 + +def parse_args(): + parser = argparse.ArgumentParser( + description="Run Bayesian optimization for the XFoil problem." + ) + parser.add_argument( + "-w", "--max-workers", + type=int, + default=DEFAULT_MAX_WORKER, + help=f"Maximum number of parallel workers (default: {DEFAULT_MAX_WORKER})", + ) + return parser.parse_args() def main(): + args = parse_args() + max_worker = args.max_workers + job_root = os.environ.get("HIOP_XFOIL_JOB_ROOT") + if job_root is None: + slurm_job_id = os.environ.get("SLURM_JOB_ID", "nojobid") + job_root = f"./hiop_temp/job_{slurm_job_id}" + print("Job temp root:", job_root) + xlimits_small = np.array( [ [-0.25, 0.25], @@ -55,7 +71,7 @@ def main(): ] ) # FIXME_NY --- use big box - xlimits = xlimits_small +# xlimits = xlimits_small x_nan = np.array([[-1.2269792 , -1.40856 , 0.56076634, -1.96206043, 1.44158607, 0.22498941, -0.32848405, 0.04818669], @@ -96,7 +112,7 @@ def main(): ### Mac - problem = xfoilProblem(nx, xlimits, tighter_bounds=xlimits_small, ref_x=x_ref, use_ref=use_ref, xfoil_path="/Users/chiang7/project/2025/scidac/other_lab/ornl/xfoil/bin/xfoil") + problem = xfoilProblem(nx, xlimits, tighter_bounds=xlimits_small, ref_x=x_ref, use_ref=use_ref, xfoil_path="/p/lustre1/chiang7/xfoil/xfoil/bin/xfoil") ### LC #problem = xfoilProblem(nx, xlimits, tighter_bounds=xlimits_small, ref_x=x_ref, use_ref=use_ref, xfoil_path="/p/lustre1/chiang7/hiopbbpy/xfoil/bin/xfoil") @@ -105,7 +121,7 @@ def main(): ### BO parameters - n_samples = 4 # Number of initial design points + n_samples = 32 # Number of initial design points theta = 1.e-2 # Hyperparameter for the Kriging (GP) model acq_type = "EI" # Acquisition function: "EI" or "LCB" @@ -120,10 +136,10 @@ def main(): options = { 'acquisition_type': acq_type, 'log_level': 'info', - 'bo_maxiter': 2, + 'bo_maxiter': 10, 'opt_solver': opt_solver, 'batch_size': 1, - 'n_start': 3, + 'n_start': 32, 'solver_options': solver_options, } @@ -138,6 +154,7 @@ def main(): profiling=do_profiling, task_name="PARALLEL_OBJ_EVAL", use_run_dir=True, + run_root=job_root, ) opt_evaluator = MPIEvaluator( function_mode=False, @@ -145,6 +162,7 @@ def main(): profiling=do_profiling, task_name="PARALLEL_IPOPT_START", use_run_dir=False, + run_root=job_root, ) y_train = obj_evaluator.run(problem.evaluate, x_train) options['obj_evaluator'] = obj_evaluator @@ -160,7 +178,7 @@ def main(): bo = BOAlgorithm(problem, gp_model, x_train, y_train, options = options) #EI or LCB bo.optimize() - problem.obj_func(bo.x_opt, run_dir="final_opt") + problem.obj_func(bo.x_opt, run_dir=os.path.join(job_root, "final_opt")) if __name__ == "__main__": diff --git a/src/Drivers/hiopbbpy/xfoilProblem.py b/src/Drivers/hiopbbpy/xfoilProblem.py index 96697c900..cbcc8efad 100644 --- a/src/Drivers/hiopbbpy/xfoilProblem.py +++ b/src/Drivers/hiopbbpy/xfoilProblem.py @@ -1,8 +1,10 @@ import numpy as np import sys +import os from pathlib import Path import random +import traceback # Import the base optimization problem class. from hiopbbpy.problems import Problem @@ -44,7 +46,7 @@ def __init__(self, n, var_bounds, tighter_bounds=None, ref_x=None, rng=None, use def rejection_sampling(self, n_samples, is_valid=None, max_tries=10000): n_samples = int(n_samples) - print(f"Do rejection sampling! Find {n_samples} samples.") + print(f"Worker pid={os.getpid()}: Do rejection sampling! Find {n_samples} samples.") samples = [] tries = 0 @@ -59,10 +61,10 @@ def rejection_sampling(self, n_samples, is_valid=None, max_tries=10000): x = self.rng.uniform(self.var_lb, self.var_ub) else: if self.rng.random() < 0.5: - print("sample from small box: ") + print("Main pid={os.getpid()}: sample from small box: ", flush=True) x = self.rng.uniform(self.tighter_lb, self.tighter_ub) else: - print("sample from big box: ") + print("Main pid={os.getpid()}: sample from big box: ", flush=True) x = self.rng.uniform(self.var_lb, self.var_ub) # --- Check validity --- @@ -70,11 +72,13 @@ def rejection_sampling(self, n_samples, is_valid=None, max_tries=10000): samples.append(x) else: try: - if is_valid(x,run_dir=f"temp_sample_{len(samples)}"): + job_root = os.environ.get("HIOP_XFOIL_JOB_ROOT", f"./hiop_temp/job_{os.environ.get('SLURM_JOB_ID', 'nojobid')}") + run_dir=os.path.join(job_root, "serial", f"temp_sample_{len(samples)}") + if is_valid(x,run_dir=run_dir): samples.append(x) - print("") + print(f"Worker pid={os.getpid()}: --- Sample Accept!") else: - print("---reject!") + print(f"Worker pid={os.getpid()}: --- Sample Reject!") except Exception: pass # reject if evaluation crashes @@ -97,13 +101,13 @@ def random(self, n_samples): samples = np.empty((n_samples, self.n)) # Sample from smaller box - print(f"generate {n_small} samples from small box") + print(f"Worker pid={os.getpid()}: generate {n_small} samples from small box") samples[:n_small] = self.rng.uniform( self.tighter_lb, self.tighter_ub, size=(n_small, self.n) ) # Sample from full box - print(f"generate {n_full} samples from big box") + print(f"Worker pid={os.getpid()}: generate {n_full} samples from big box") samples[n_small:] = self.rng.uniform( self.var_lb, self.var_ub, size=(n_full, self.n) ) @@ -173,8 +177,24 @@ def airfoil_perf_obj(self, X, run_dir=None): for re in re_values: F.append(self.airfoil_perf(X, aoa=aoa_for_xfoil, re=re, mach=mach,run_dir=run_dir)) - return -np.mean(np.concat(F)) # Return negative mean performance for minimization +# return -np.mean(np.concat(F)) # Return negative mean performance for minimization + vals = [] + for fi in F: + if fi is None: + continue + try: + arr = np.atleast_1d(np.asarray(fi, dtype=float)) + except Exception: + continue + if arr.size > 0 and np.all(np.isfinite(arr)): + vals.extend(arr.ravel()) + + if len(vals) == 0: + print(f"Worker pid={os.getpid()}: invalid XFoil output in {run_dir}; returning inf", flush=True) + return np.inf + + return -float(np.mean(vals)) def _cache_key(self, x: np.ndarray): x = np.asarray(x, dtype=float).ravel() @@ -183,6 +203,7 @@ def _cache_key(self, x: np.ndarray): def eval_cached(self, x: np.ndarray, run_dir=None): if run_dir is None: raise ValueError("run_dir must be provided for XFoil evaluations") + print(f"Worker pid={os.getpid()}: using run_dir={run_dir}", flush=True) k = self._cache_key(x) hit = self._eval_cache.get(k, None) if hit is not None: @@ -193,7 +214,7 @@ def eval_cached(self, x: np.ndarray, run_dir=None): f = float(self.base_obj_func(x, run_dir=run_dir)) else: f = np.inf - print(f"evaluation: f={f}; c={c}") + print(f"Worker pid={os.getpid()}: evaluation: f={f}; c={c}") self._eval_cache[k] = (f, c) return f, c @@ -205,7 +226,7 @@ def _penalized_obj(self, x: np.ndarray, run_dir=None) -> float: f, con_arr = self.eval_cached(x,run_dir=run_dir) if not np.isfinite(f): - print(f"Warning: base_obj_func returned {f} at x = {x}, replacing with 1e3") + print(f"Worker pid={os.getpid()}: Warning: base_obj_func returned {f} at x = {x}, replacing with 1e3") f = 1e3 con_arr = np.atleast_1d(np.asarray(con_arr, dtype=float)) @@ -246,7 +267,7 @@ def _penalized_obj(self, x: np.ndarray, run_dir=None) -> float: elif v < 1: retval = -50 - print(f". penalty obj = {retval}; con_violation = {v}", flush=True) + print(f"Worker pid={os.getpid()}: penalty obj = {retval}; con_violation = {v}", flush=True) return retval @@ -262,7 +283,12 @@ def _evaluate(self, x: np.ndarray, **kwargs) -> np.ndarray: Returns: ndarray: The objective values """ - y = [float(self.obj_func(xi,**kwargs)) for xi in x] # shape (k,) + try: + y = [float(self.obj_func(xi,**kwargs)) for xi in x] # shape (k,) + except Exception as e: + print(f"Exception in evaluate: {e}", flush=True) + print(traceback.format_exc(), flush=True) + raise return np.asarray(y, dtype=float).reshape(-1, 1) def sample(self, nsample: int) -> np.ndarray: @@ -280,4 +306,6 @@ def is_valid(x, run_dir=None): #isvalid = np.isfinite(f) # an alt way isvalid = (con_f > 0.0) and np.isfinite(f) return isvalid - return self.sampler.rejection_sampling(nsample, is_valid=is_valid) \ No newline at end of file + +# return self.sampler.rejection_sampling(nsample, is_valid=is_valid) + return self.sampler.random(nsample) diff --git a/src/hiopbbpy/opt/boalgorithm.py b/src/hiopbbpy/opt/boalgorithm.py index 35541d609..36bd0fb55 100644 --- a/src/hiopbbpy/opt/boalgorithm.py +++ b/src/hiopbbpy/opt/boalgorithm.py @@ -14,6 +14,7 @@ from ..problems.problem import Problem from .optproblem import IpoptProb from ..utils.util import Evaluator, Logger +import os # A base class defining a general framework for Bayesian Optimization class BOAlgorithmBase: @@ -338,6 +339,7 @@ def __init__(self, fun, method, bounds, constraints, solver_options): # Find the minimum of the input objective `fun`, using the minimize function from SciPy. def minimizer_callback(self, x0s): output = [] + print(f"Worker pid={os.getpid()}: doing minimizer_callback ...", flush=True) msg = "" for x0 in x0s: if self.method == "SLSQP": diff --git a/src/hiopbbpy/utils/evaluation_manager.py b/src/hiopbbpy/utils/evaluation_manager.py index 5080c08ba..663a019d6 100644 --- a/src/hiopbbpy/utils/evaluation_manager.py +++ b/src/hiopbbpy/utils/evaluation_manager.py @@ -75,6 +75,9 @@ def __init__(self, executor, profiling=False, task_name="TASK") -> None: self._completed_X = deque([]) self._completed_F = deque([]) self._queue_lock = threading.Lock() + self._last_execution_times = [] + self._last_wait_times = [] + self._last_turnaround_times = [] if isinstance(executor, dict): self.executors = executor @@ -140,6 +143,10 @@ def sync(self) -> None: harvests completed items into the internal completion buffers. Harvested results can be consumed using :meth:`retrieve_results`. """ + execution_times = [] + wait_times = [] + turnaround_times = [] + while True: with self._queue_lock: futures = [queue_obj[1] for queue_obj in self._queue] @@ -149,7 +156,15 @@ def sync(self) -> None: wait(futures) with self._queue_lock: - self._harvest_completed_locked() + self._harvest_completed_locked( + execution_times=execution_times, + wait_times=wait_times, + turnaround_times=turnaround_times, + ) + + self._last_execution_times = execution_times + self._last_wait_times = wait_times + self._last_turnaround_times = turnaround_times def submit_tasks(self, fn, X, execute_at=None, **kwargs) -> None: """Submit tasks to the specified executor. @@ -219,6 +234,11 @@ def retrieve_results(self) -> tuple[list, list]: self._completed_X.clear() self._completed_F.clear() + if not execution_times and self._last_execution_times: + execution_times = self._last_execution_times + wait_times = self._last_wait_times + turnaround_times = self._last_turnaround_times + if self.profiling and execution_times: self._print_timing_stats(f"{self.task_name} Execution times", execution_times) @@ -244,6 +264,9 @@ def retrieve_results(self) -> tuple[list, list]: print(f"{self.task_name} Ideal walltime in seconds (perfect balance): {ideal_walltime:.6e}") print(f"{self.task_name} Actual walltime in seconds (observed): {actual_walltime:.6e}") + self._last_execution_times = [] + self._last_wait_times = [] + self._last_turnaround_times = [] self._first_submit_time = None return X, F @@ -294,7 +317,8 @@ def _harvest_completed_locked( except CancelledError: self.logger.warning(f"{self.task_name} The execution of x={x} was cancelled.") except Exception as e: - self.logger.warning(f"{self.task_name} Task f({x}) raised an exception: {e}") + self.logger.exception(f"{self.task_name} Task f({x}) raised an exception") +# self.logger.warning(f"{self.task_name} Task f({x}) raised an exception: {e}") else: new_queue.append(item) diff --git a/src/hiopbbpy/utils/util.py b/src/hiopbbpy/utils/util.py index 39b731e17..6a7566171 100644 --- a/src/hiopbbpy/utils/util.py +++ b/src/hiopbbpy/utils/util.py @@ -29,6 +29,7 @@ import logging import os +import sys import time import uuid from pathlib import Path @@ -98,7 +99,7 @@ def run(self, fun, Xin): print("in Evaluator::run") # unique batch directory so repeated calls do not reuse temp_dir_0, temp_dir_1, ... - batch_id = f"{self.manager.task_name}_{os.getpid()}_{time.time_ns()}_{uuid.uuid4().hex[:8]}" + batch_id = f"{self.manager.task_name}_{os.getpid()}_{uuid.uuid4().hex[:8]}" batch_dir = self.run_root / batch_id batch_dir.mkdir(parents=True, exist_ok=False) @@ -176,7 +177,7 @@ def __init__(self, name='hiopbbpy'): # Create a console output handler if not self._logger.handlers: - ch = logging.StreamHandler() + ch = logging.StreamHandler(sys.stdout) # Define the output format: logger name, and message formatter = logging.Formatter('%(name)s %(message)s')