Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions src/Drivers/hiopbbpy/BODriverEX_mpi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""
Code description:
for a 2D example LpNormProblem (MPI version for scaling tests)
1) randomly sample training points
2) define a Kriging-based Gaussian-process (smt backend)
trained on said data
3) determine the minimizer via BOAlgorithm

This version uses MPIPoolExecutor for multi-node parallel execution.

Usage:
mpiexec -n 16 python -m mpi4py.futures BODriverEX_mpi.py

Authors: Tucker Hartland <hartland1@llnl.gov>
Nai-Yuan Chiang <chiang7@llnl.gov>
"""

import sys
import os
import numpy as np
import warnings
warnings.filterwarnings("ignore")
from hiopbbpy.surrogate_modeling import smtKRG
from hiopbbpy.opt import BOAlgorithm
from hiopbbpy.problems import BraninProblem, LpNormProblem
from hiopbbpy.utils import MPIEvaluator
from mpi4py.futures import MPIPoolExecutor

### Driver configuration
nx = 2                                # dimension of the problem
n_samples = 64                        # number of initial samples used to train the GP
theta = 0.01                          # hyperparameter for the GP kernel
xlimits = np.array([[-5, 5]] * nx)    # [lower, upper] bounds, one row per optimization variable

# Problem / acquisition combinations exercised by the driver loops.
prob_type_l = ["LpNorm"]  # ["LpNorm", "Branin"]
acq_type_l = ["LCB"]      # ["LCB", "EI"]

def con_eq(x):
    """Return the equality-constraint residual ``x[0] + x[1] - 4``.

    Registered with ``'type': 'eq'`` in ``user_constraint_list``; the
    residual is zero exactly when ``x[0] + x[1] == 4``.
    """
    return x[0] + x[1] - 4

def con_jac_eq(x):
    """Return the gradient of ``con_eq``: the constant vector ``[1, 1]``.

    ``x`` is accepted only to match the callback signature; the
    constraint is linear, so its gradient does not depend on it.
    """
    return np.array([1.0, 1.0])

def con_ineq(x):
    """Return the inequality-constraint value ``x[0] - x[1]``.

    Registered with ``'type': 'ineq'`` in ``user_constraint_list``.
    NOTE(review): presumably interpreted as ``con_ineq(x) >= 0`` (the SciPy
    convention for 'ineq' constraints) -- confirm against the solver wrapper.
    """
    return x[0] - x[1]

def con_jac_ineq(x):
    """Return the gradient of ``con_ineq``: the constant vector ``[1, -1]``.

    ``x`` is accepted only to match the callback signature; the
    constraint is linear, so its gradient does not depend on it.
    """
    return np.array([1.0, -1.0])

# Constraints in list-of-dict form: one dict per constraint carrying its
# type, value callback and Jacobian callback. 'SLSQP' requires this format;
# IPOPT can consume it as well.
user_constraint_list = [
    dict(type='eq', fun=con_eq, jac=con_jac_eq),
    dict(type='ineq', fun=con_ineq, jac=con_jac_ineq),
]

def cons_vec(x):
    """Evaluate the three vector-valued constraints at the 2D point ``x``.

    Returns a length-3 array with components
      0: ``(x1 - 2)**2 + (x2 - 2.5)**2 - 2``
      1: ``x1 + x2 - 5``
      2: ``-x1``
    The values are paired with the bounds ``cl``/``cu`` in
    ``user_constraint_dict``.
    """
    x1, x2 = x
    return np.array([
        (x1 - 2)**2 + (x2 - 2.5)**2 - 2,
        x1 + x2 - 5,
        -x1,
    ])

# Jacobian of constraints
def cons_jac_vec(x):
    """Return the 3x2 Jacobian of ``cons_vec`` at the 2D point ``x``.

    Row ``i`` holds the partial derivatives of constraint ``i`` with
    respect to ``(x1, x2)``; only the first row depends on ``x`` because
    the remaining two constraints are linear.
    """
    x1, x2 = x
    return np.array([
        [2 * (x1 - 2), 2 * (x2 - 2.5)],
        [1, 1],
        [-1, 0],
    ])

# Bounds on the three components of cons_vec: no lower bound, upper bound 0.
cl = np.full(3, -np.inf)
cu = np.zeros(3)

# Constraints in vector-valued form (callback, Jacobian, lower/upper bounds).
# 'trust-constr' and IPOPT support this format.
user_constraint_dict = {
    'cons': cons_vec,
    'jac': cons_jac_vec,
    'cl': cl,
    'cu': cu,
}


if __name__ == "__main__":
    # Driver: for every problem / acquisition combination, sample an initial
    # training set, fit the GP surrogate and run BOAlgorithm using MPI-backed
    # evaluators for objective and optimizer evaluations.
    do_profiling = True

    # The loop is kept even while prob_type_l holds a single entry so that
    # further problem types can be enabled by editing the list alone.
    for prob_type in prob_type_l:
        print()
        print("========================================")
        print(f"Testing BO with {prob_type} problem")
        print("========================================")

        # ----- Create MPI evaluators
        # MPIPoolExecutor() will use all available MPI processes when
        # launched with: mpiexec -n N python -m mpi4py.futures script.py
        obj_evaluator = MPIEvaluator(
            executor=MPIPoolExecutor(),
            profiling=do_profiling,
            task_name="MPI_OBJ_EVAL"
        )
        opt_evaluator = MPIEvaluator(
            function_mode=False,
            executor=MPIPoolExecutor(),
            profiling=do_profiling,
            task_name="MPI_OPT_EVAL"
        )

        if prob_type == "LpNorm":
            problem = LpNormProblem(nx, xlimits)
        else:
            problem = BraninProblem()
        # for solver 'trust-constr' and IPOPT, use user_constraint_dict;
        # for solver 'SLSQP' and IPOPT, user_constraint_list
        problem.set_constraints(user_constraint_list)

        for acq_type in acq_type_l:
            print(f"\nAcquisition type: {acq_type}")

            ### initial training set
            x_train = problem.sample(n_samples)
            y_train = obj_evaluator.run(problem.evaluate, x_train)

            ### Define the GP surrogate model
            gp_model = smtKRG(theta, xlimits, nx)
            gp_model.train(x_train, y_train)

            opt_solver = 'SLSQP'  # "SLSQP" "IPOPT" "trust-constr"
            if opt_solver in ("SLSQP", "trust-constr"):
                solver_options = {"maxiter": 100}  # for scipy solvers
            elif opt_solver == "IPOPT":
                solver_options = {"max_iter": 100, "print_level": 1}
            else:
                # Fail here with a clear message instead of hitting a
                # NameError on the unbound solver_options further down.
                raise ValueError(f"Unsupported opt_solver: {opt_solver!r}")

            options = {
                'acquisition_type': acq_type,
                'log_level': 'info',
                'bo_maxiter': 20,
                'opt_solver': opt_solver,
                'batch_size': 2,
                'n_start': 64,
                'solver_options': solver_options,
                'obj_evaluator': obj_evaluator,
                'opt_evaluator': opt_evaluator
            }

            # Instantiate and run Bayesian Optimization
            print("Starting Bayesian Optimization...")
            bo = BOAlgorithm(problem, gp_model, x_train, y_train, options=options)  # EI or LCB
            bo.optimize()

            print("\nOptimization complete!")
            print(f"Optimal solution: {bo.x_opt}")
            print(f"Optimal value: {bo.y_opt}")
107 changes: 86 additions & 21 deletions src/Drivers/hiopbbpy/EvaluationManagerCI.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
"""
This is a class to manage function evaluations using multiple parallel executors.
It supports both intra-node and inter-node parallelism.
Test script demonstrating EvaluationManager with multiple executor types.
Supports both intra-node and inter-node parallelism.

Usage examples:
Single-node with threads: python EvaluationManagerCI.py -e thread -w 4
Single-node with processes: python EvaluationManagerCI.py -e process -w 4
Multi-node with MPI: mpiexec -n 8 python EvaluationManagerCI.py -e mpi

For MPI, use N = number of total processes across all nodes.
The script will use rank 0 as master and ranks 1-(N-1) as workers.

Authors: Tucker Hartland <hartland1@llnl.gov>
Weslley S Pereira <wdasilv@nrel.gov>
Expand All @@ -12,20 +20,28 @@
import os
import socket
import threading
from hiopbbpy.utils import EvaluationManager, is_running_with_mpi
from concurrent.futures import ThreadPoolExecutor
from hiopbbpy.utils import EvaluationManager
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def _fn_for_test(x, sleep_time=0.1, slow_first=False, driver_rank=0):
hostname = socket.gethostname()
pid = os.getpid()

# Try to get the actual MPI rank of the worker executing this function
try:
from mpi4py import MPI
comm = MPI.COMM_WORLD
actual_rank = comm.Get_rank()
except:
actual_rank = driver_rank # Fallback to passed rank if MPI not available

if slow_first and x == 0:
actual_sleep = 3 * sleep_time
else:
actual_sleep = sleep_time

print(
f"rank={driver_rank} pid={pid} host={hostname}: processing x={x}",
f"rank={actual_rank} pid={pid} host={hostname}: processing x={x}",
flush=True,
)

Expand All @@ -36,7 +52,16 @@ def _fn_for_test(x, sleep_time=0.1, slow_first=False, driver_rank=0):
# Arguments for command line
parser = argparse.ArgumentParser(
description="Execute n function calls with t duration.",
epilog="To properly run the example with mpi4py, use: env MPI4PY_FUTURES_MAX_WORKERS=<N> mpiexec -n 1 python evaluation_manager.py",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
Single-node with threads: python EvaluationManagerCI.py -e thread -w 4
Single-node with processes: python EvaluationManagerCI.py -e process -w 4
Multi-node with MPI: mpiexec -n <N> python EvaluationManagerCI.py -e mpi

For MPI, use N = number of nodes * processes_per_node.
The script uses rank 0 as master and ranks 1-(N-1) as workers.
""",
)
parser.add_argument("-n", type=int, default=20, help="Number of tasks to execute")
parser.add_argument(
Expand All @@ -54,37 +79,77 @@ def _fn_for_test(x, sleep_time=0.1, slow_first=False, driver_rank=0):
action="store_true",
help="Make the first task slower (3x sleep time)",
)
parser.add_argument(
"-e",
"--executor",
type=str,
default="thread",
choices=["thread", "process", "mpi"],
help="Executor type: thread (ThreadPoolExecutor), process (ProcessPoolExecutor), or mpi (MPIPoolExecutor)",
)
parser.add_argument(
"-w",
"--max_workers",
type=int,
default=None,
help="Maximum number of workers (for thread/process executors)",
)
args = parser.parse_args()

# Choose executor type
if is_running_with_mpi():
from mpi4py import MPI
driver_rank = MPI.COMM_WORLD.Get_rank()
executor_type = "mpi"
else:
driver_rank = 0
executor_type = "cpu"

# Set up logging
logging.basicConfig(level=logging.INFO)

# Create manager
cpu_executor = ThreadPoolExecutor()
# Create executor based on user choice
if args.executor == "thread":
cpu_executor = ThreadPoolExecutor(max_workers=args.max_workers)
executor_name = "ThreadPool"
elif args.executor == "process":
cpu_executor = ProcessPoolExecutor(max_workers=args.max_workers)
executor_name = "ProcessPool"
elif args.executor == "mpi":
try:
from mpi4py import MPI
from mpi4py.futures import MPIPoolExecutor

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if size < 2:
print("ERROR: MPI executor requires at least 2 processes (1 master + 1 worker)")
print("Run with: mpiexec -n <N> python EvaluationManagerCI.py -e mpi")
sys.exit(1)

# Use context manager - this prevents spawning and uses existing processes
# Only rank 0 will run the main logic, others will be workers
if rank != 0:
# Worker ranks just need to participate in the MPIPoolExecutor
with MPIPoolExecutor() as executor:
pass # Workers block here until master is done
sys.exit(0)

cpu_executor = MPIPoolExecutor()
executor_name = f"MPIPool (rank={rank}/{size}, {size-1} workers)"
print(f"MPI executor initialized: {size} total processes, {size-1} workers across nodes")

except ImportError:
print("ERROR: mpi4py not installed. Install with: pip install mpi4py")
sys.exit(1)

# Create manager (only rank 0 reaches here for MPI)
manager = EvaluationManager(
cpu_executor=cpu_executor,
executor=cpu_executor,
profiling=args.profile,
task_name="CI_TASK"
task_name=f"CI_TASK_{executor_name}"
)

# Submit tasks
t0 = time.perf_counter()
manager.submit_tasks(
_fn_for_test,
[i for i in range(args.n)],
execute_at=executor_type,
sleep_time=args.sleep_time,
slow_first=args.slow_first,
driver_rank=driver_rank,
)

# Do some other work while tasks are running
Expand Down
Loading
Loading