diff --git a/.github/workflows/spack_build.yml b/.github/workflows/spack_build.yml index 5948c08a1..c2554ffe8 100644 --- a/.github/workflows/spack_build.yml +++ b/.github/workflows/spack_build.yml @@ -180,7 +180,7 @@ jobs: - name: Install HiOp run: | ls -al - spack -d -e . install --keep-stage --verbose --show-log-on-error --only package --no-cache + spack -d -e . install --keep-stage --verbose --show-log-on-error --only package - name: Test Build run: | diff --git a/src/Drivers/hiopbbpy/BODriverEX_mpi.py b/src/Drivers/hiopbbpy/BODriverEX_mpi.py new file mode 100644 index 000000000..4c3b943c6 --- /dev/null +++ b/src/Drivers/hiopbbpy/BODriverEX_mpi.py @@ -0,0 +1,145 @@ +""" + Code description: + for a 2D example LpNormProblem (MPI version for scaling tests) + 1) randomly sample training points + 2) define a Kriging-based Gaussian-process (smt backend) + trained on said data + 3) determine the minimizer via BOAlgorithm + + This version uses MPIPoolExecutor for multi-node parallel execution. + + Usage: + mpiexec -n 16 python -m mpi4py.futures BODriverEX_mpi.py + +Authors: Tucker Hartland + Nai-Yuan Chiang +""" + +import sys +import os +import numpy as np +import warnings +warnings.filterwarnings("ignore") +from hiopbbpy.surrogate_modeling import smtKRG +from hiopbbpy.opt import BOAlgorithm +from hiopbbpy.problems import BraninProblem, LpNormProblem +from hiopbbpy.utils import MPIEvaluator +from mpi4py.futures import MPIPoolExecutor + +### parameters +n_samples = 64 # number of the initial samples to train GP +theta = 1.e-2 # hyperparameter for GP kernel +nx = 2 # dimension of the problem +xlimits = np.array([[-5, 5], [-5, 5]]) # bounds on optimization variable + +prob_type_l = ["LpNorm"] # ["LpNorm", "Branin"] +acq_type_l = ["LCB"] # ["LCB", "EI"] + +def con_eq(x): + return x[0] + x[1] - 4 + +def con_jac_eq(x): + return np.array([1.0, 1.0]) + +def con_ineq(x): + return x[0] - x[1] + +def con_jac_ineq(x): + return np.array([1.0, -1.0]) + +# 'SLSQP' requires constraints defined in a list of dict. +# IPOPT can support this format, too +user_constraint_list = [{'type': 'eq', 'fun': con_eq, 'jac': con_jac_eq}, + {'type': 'ineq', 'fun': con_ineq, 'jac': con_jac_ineq}] + +def cons_vec(x): + x1, x2 = x + return np.array([ + (x1 - 2)**2 + (x2 - 2.5)**2 - 2, + x1 + x2 - 5, + -x1 + ]) + +# Jacobian of constraints +def cons_jac_vec(x): + x1, x2 = x + return np.array([ + [2 * (x1 - 2), 2 * (x2 - 2.5)], + [1, 1], + [-1, 0] + ]) + +cl = -np.inf * np.ones(3) +cu = np.zeros(3) + +# 'trust-constr' and IPOPT support vector-valued constraints +user_constraint_dict = {'cons': cons_vec, 'jac': cons_jac_vec, 'cl': cl, 'cu': cu} + + +if __name__ == "__main__": + do_profiling = True + + for prob_type in prob_type_l: + print() + print(f"========================================") + print(f"Testing BO with {prob_type} problem") + print(f"========================================") + + # ----- Create MPI evaluators + # MPIPoolExecutor() will use all available MPI processes when + # launched with: mpiexec -n N python -m mpi4py.futures script.py + obj_evaluator = MPIEvaluator( + executor=MPIPoolExecutor(), + profiling=do_profiling, + task_name="MPI_OBJ_EVAL" + ) + opt_evaluator = MPIEvaluator( + function_mode=False, + executor=MPIPoolExecutor(), + profiling=do_profiling, + task_name="MPI_OPT_EVAL" + ) + + if prob_type == "LpNorm": + problem = LpNormProblem(nx, xlimits) + else: + problem = BraninProblem() + problem.set_constraints(user_constraint_list) # for solver 'trust-constr' and IPOPT, use user_constraint_dict; for solver 'SLSQP' and IPOPT, user_constraint_list + + for acq_type in acq_type_l: + print(f"\nAcquisition type: {acq_type}") + + ### initial training set + x_train = problem.sample(n_samples) + y_train = obj_evaluator.run(problem.evaluate, x_train) + + ### Define the GP surrogate model + gp_model = smtKRG(theta, xlimits, nx) + gp_model.train(x_train, y_train) + + opt_solver = 'SLSQP' #"SLSQP" "IPOPT" "trust-constr" + if opt_solver == "SLSQP" or opt_solver == "trust-constr": + solver_options = {"maxiter": 100} #for scipy solvers + elif opt_solver == "IPOPT": + solver_options = {"max_iter": 100, "print_level": 1} + + options = { + 'acquisition_type': acq_type, + 'log_level': 'info', + 'bo_maxiter': 20, + 'opt_solver': opt_solver, + 'batch_size': 2, + 'n_start': 64, + 'solver_options': solver_options, + 'obj_evaluator': obj_evaluator, + 'opt_evaluator': opt_evaluator + } + + # Instantiate and run Bayesian Optimization + print(f"Starting Bayesian Optimization...") + bo = BOAlgorithm(problem, gp_model, x_train, y_train, options = options) #EI or LCB + bo.optimize() + + print(f"\nOptimization complete!") + print(f"Optimal solution: {bo.x_opt}") + print(f"Optimal value: {bo.y_opt}") diff --git a/src/Drivers/hiopbbpy/EvaluationManagerCI.py b/src/Drivers/hiopbbpy/EvaluationManagerCI.py index 2659a1728..11c8850b3 100644 --- a/src/Drivers/hiopbbpy/EvaluationManagerCI.py +++ b/src/Drivers/hiopbbpy/EvaluationManagerCI.py @@ -1,6 +1,14 @@ """ -This is a class to manage function evaluations using multiple parallel executors. -It supports both intra-node and inter-node parallelism. +Test script demonstrating EvaluationManager with multiple executor types. +Supports both intra-node and inter-node parallelism. + +Usage examples: + Single-node with threads: python EvaluationManagerCI.py -e thread -w 4 + Single-node with processes: python EvaluationManagerCI.py -e process -w 4 + Multi-node with MPI: mpiexec -n 8 python EvaluationManagerCI.py -e mpi + +For MPI, use N = number of total processes across all nodes. +The script will use rank 0 as master and ranks 1-(N-1) as workers. Authors: Tucker Hartland Weslley S Pereira @@ -12,20 +20,28 @@ import os import socket import threading -from hiopbbpy.utils import EvaluationManager, is_running_with_mpi -from concurrent.futures import ThreadPoolExecutor +from hiopbbpy.utils import EvaluationManager +from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor def _fn_for_test(x, sleep_time=0.1, slow_first=False, driver_rank=0): hostname = socket.gethostname() pid = os.getpid() + # Try to get the actual MPI rank of the worker executing this function + try: + from mpi4py import MPI + comm = MPI.COMM_WORLD + actual_rank = comm.Get_rank() + except: + actual_rank = driver_rank # Fallback to passed rank if MPI not available + if slow_first and x == 0: actual_sleep = 3 * sleep_time else: actual_sleep = sleep_time print( - f"rank={driver_rank} pid={pid} host={hostname}: processing x={x}", + f"rank={actual_rank} pid={pid} host={hostname}: processing x={x}", flush=True, ) @@ -36,7 +52,16 @@ def _fn_for_test(x, sleep_time=0.1, slow_first=False, driver_rank=0): # Arguments for command line parser = argparse.ArgumentParser( description="Execute n function calls with t duration.", - epilog="To properly run the example with mpi4py, use: env MPI4PY_FUTURES_MAX_WORKERS= mpiexec -n 1 python evaluation_manager.py", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + Single-node with threads: python EvaluationManagerCI.py -e thread -w 4 + Single-node with processes: python EvaluationManagerCI.py -e process -w 4 + Multi-node with MPI: mpiexec -n python EvaluationManagerCI.py -e mpi + +For MPI, use N = number of nodes * processes_per_node. +The script uses rank 0 as master and ranks 1-(N-1) as workers. + """, ) parser.add_argument("-n", type=int, default=20, help="Number of tasks to execute") parser.add_argument( @@ -54,26 +79,68 @@ def _fn_for_test(x, sleep_time=0.1, slow_first=False, driver_rank=0): action="store_true", help="Make the first task slower (3x sleep time)", ) + parser.add_argument( + "-e", + "--executor", + type=str, + default="thread", + choices=["thread", "process", "mpi"], + help="Executor type: thread (ThreadPoolExecutor), process (ProcessPoolExecutor), or mpi (MPIPoolExecutor)", + ) + parser.add_argument( + "-w", + "--max_workers", + type=int, + default=None, + help="Maximum number of workers (for thread/process executors)", + ) args = parser.parse_args() - # Choose executor type - if is_running_with_mpi(): - from mpi4py import MPI - driver_rank = MPI.COMM_WORLD.Get_rank() - executor_type = "mpi" - else: - driver_rank = 0 - executor_type = "cpu" - # Set up logging logging.basicConfig(level=logging.INFO) - # Create manager - cpu_executor = ThreadPoolExecutor() + # Create executor based on user choice + if args.executor == "thread": + cpu_executor = ThreadPoolExecutor(max_workers=args.max_workers) + executor_name = "ThreadPool" + elif args.executor == "process": + cpu_executor = ProcessPoolExecutor(max_workers=args.max_workers) + executor_name = "ProcessPool" + elif args.executor == "mpi": + try: + from mpi4py import MPI + from mpi4py.futures import MPIPoolExecutor + + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + size = comm.Get_size() + + if size < 2: + print("ERROR: MPI executor requires at least 2 processes (1 master + 1 worker)") + print("Run with: mpiexec -n python EvaluationManagerCI.py -e mpi") + sys.exit(1) + + # Use context manager - this prevents spawning and uses existing processes + # Only rank 0 will run the main logic, others will be workers + if rank != 0: + # Worker ranks just need to participate in the MPIPoolExecutor + with MPIPoolExecutor() as executor: + pass # Workers block here until master is done + sys.exit(0) + + cpu_executor = MPIPoolExecutor() + executor_name = f"MPIPool (rank={rank}/{size}, {size-1} workers)" + print(f"MPI executor initialized: {size} total processes, {size-1} workers across nodes") + + except ImportError: + print("ERROR: mpi4py not installed. Install with: pip install mpi4py") + sys.exit(1) + + # Create manager (only rank 0 reaches here for MPI) manager = EvaluationManager( - cpu_executor=cpu_executor, + executor=cpu_executor, profiling=args.profile, - task_name="CI_TASK" + task_name=f"CI_TASK_{executor_name}" ) # Submit tasks @@ -81,10 +148,8 @@ def _fn_for_test(x, sleep_time=0.1, slow_first=False, driver_rank=0): manager.submit_tasks( _fn_for_test, [i for i in range(args.n)], - execute_at=executor_type, sleep_time=args.sleep_time, slow_first=args.slow_first, - driver_rank=driver_rank, ) # Do some other work while tasks are running diff --git a/src/Drivers/hiopbbpy/README.md b/src/Drivers/hiopbbpy/README.md new file mode 100644 index 000000000..91f43ff8a --- /dev/null +++ b/src/Drivers/hiopbbpy/README.md @@ -0,0 +1,174 @@ +# HiOp Bayesian Optimization with EvaluationManager + +This directory contains Bayesian Optimization (BO) drivers that use the EvaluationManager for parallel function evaluations. + +## Quick Start + +### Scaling Tests (Simple 2D Problem) +Run automated scaling tests with a simple 2D LpNorm problem: +```bash +bash submit_bo_scaling.sh +``` +This tests **3, 5, 9, 17 processes** on 1 and 2 nodes. + +## Files Overview + +### BO Drivers + +- `BODriverEX.py` - Simple 2D LpNorm problem (single executor) +- `BODriverEX_mpi.py` - Simple 2D LpNorm problem (MPI version for scaling tests) +- `BODriverCI.py` - Branin/LpNorm test problems + +### Submission Scripts + +- `submit_bo_scaling.sh` - Automated scaling test launcher (uses BODriverEX_mpi.py) +- `submit_bo_template.sbatch` - Generic template for any BO configuration + +**EvaluationManager Test Scripts:** +- `submit_scaling_tests.sh` - Scaling tests for thread/process/mpi executors +- `test_thread.sh` - ThreadPoolExecutor test +- `test_process.sh` - ProcessPoolExecutor test +- `test_multinode_mpi.sh` - MPIPoolExecutor multi-node test +- `EvaluationManagerCI.py` - Simple test function for EvaluationManager + +### Documentation + +- `README.md` - This file (general overview) +- `TESTING_GUIDE.md` - Guide for running scaling tests +- `STRUCTURE.md` - Directory structure and organization + +## EvaluationManager Integration + +Your BO code **already uses EvaluationManager**! The `MPIEvaluator` class (in `hiopbbpy.utils`) is a wrapper around `EvaluationManager`, so no code changes are needed. + +### Key Features: +- Automatic profiling output when `profiling=True` +- Support for thread/process/MPI executors +- Per-evaluation run directories +- Timing statistics and performance metrics + +### Profiling Output: +When profiling is enabled, you'll see: +``` +=== Parallel Performance === +MPI_OBJ_EVAL Workers: 15 +MPI_OBJ_EVAL Total work in seconds: 1.234e+02 +MPI_OBJ_EVAL Ideal walltime in seconds (perfect balance): 8.227e+00 +MPI_OBJ_EVAL Actual walltime in seconds (observed): 9.345e+00 +``` + +## Usage Examples + +### Run Scaling Tests +```bash +# Test with 3, 5, 9, 17 processes on 1 and 2 nodes +bash submit_bo_scaling.sh + +# Monitor progress +squeue -u $USER +tail -f logs/bo_*.out + +# View results +grep -H "Parallel Performance\|Elapsed time" logs/bo_*.out | sort +``` + + +### Run EvaluationManager Tests +```bash +# Test all executor types: thread, process, mpi +bash submit_scaling_tests.sh + +# Test specific executor +sbatch -n 16 test_process.sh +sbatch -N 2 -n 16 test_multinode_mpi.sh +``` + +## Architecture + +``` +BODriver (Python) + ↓ +MPIEvaluator (hiopbbpy.utils.util) + ↓ +EvaluationManager (hiopbbpy.utils.evaluation_manager) + ↓ +Executor (ThreadPool/ProcessPool/MPIPool) + ↓ +Worker Processes (evaluate objective function) +``` + +## Bug Fixes + +### MPI Hanging Issue (Fixed) +**Problem:** MPI tests were hanging with mvapich2 due to PMI2 spawn errors. + +**Solution:** +1. Switch to OpenMPI (better mpi4py.futures support) +2. Use `mpiexec -n N python -m mpi4py.futures script.py` +3. This avoids dynamic process spawning issues + +### Missing Profiling Output (Fixed) +**Problem:** Profiling was enabled but timing statistics weren't showing. + +**Solution:** Store timing data as instance variables so they persist across `sync()` and `retrieve_results()` calls. + +## Directory Structure + +``` +hiopbbpy/ +├── BO Drivers +│ ├── BODriverEX.py # Simple test problem +│ ├── BODriverEX_mpi.py # Simple test (MPI) +│ └── BODriverCI.py # CI test problems +│ +├── Submission Scripts +│ ├── submit_bo_scaling.sh +│ ├── submit_bo_template.sbatch +│ └── submit_scaling_tests.sh +│ +├── Test Scripts +│ ├── test_thread.sh +│ ├── test_process.sh +│ ├── test_multinode_mpi.sh +│ └── EvaluationManagerCI.py +│ +├── Documentation +│ ├── README.md +│ ├── TESTING_GUIDE.md +│ └── README_EvaluationManager.md +│ +├── Logs & Temp +│ ├── logs/ # Job output files +│ └── hiop_temp/ # Temporary run directories +│ +``` + +## Requirements + +- Python 3.10+ +- mpi4py (for MPI executors) +- OpenMPI (recommended) or mvapich2 +- hiopbbpy package + +## Troubleshooting + +**No profiling output?** +- Verify `do_profiling = True` in the driver +- Check for "Parallel Performance" in output logs +- Recent fix should make this work automatically + +**MPI hangs?** +- Load OpenMPI: `module load openmpi/4.1.2` +- Always use: `python -m mpi4py.futures script.py` +- Check processes are distributed: `squeue -u $USER` + +**Jobs fail immediately?** +- Check `.err` files in logs/ directory +- Verify imports: `python -c "from hiopbbpy.problems import LpNormProblem"` +- Check module environment: `module list` + +## Authors + +- Tucker Hartland +- Nai-Yuan Chiang +- Weslley S Pereira diff --git a/src/Drivers/hiopbbpy/TESTING_GUIDE.md b/src/Drivers/hiopbbpy/TESTING_GUIDE.md new file mode 100644 index 000000000..19f0884df --- /dev/null +++ b/src/Drivers/hiopbbpy/TESTING_GUIDE.md @@ -0,0 +1,106 @@ +# Scaling Test Guide + +## Quick Start + +### BO Scaling Tests +```bash +bash submit_bo_scaling.sh +``` +Tests **3, 5, 9, 17 processes** on 1 and 2 nodes using simple 2D LpNorm problem. + +### EvaluationManager Tests +```bash +bash submit_scaling_tests.sh +``` +Tests **1, 2, 4, 8, 16, 32, 64 workers** with thread/process/MPI executors. + +## Test Problems + +**BODriverEX_mpi.py**: Simple 2D LpNorm optimization +- 64 initial samples, 20 BO iterations +- Fast evaluations +- Shows profiling output + +**EvaluationManagerCI.py**: Trivial test function +- Fixed 128 tasks, configurable sleep +- Tests strong scaling + +## Monitoring + +### Check Queue +```bash +squeue -u $USER +``` + +### Watch Output +```bash +tail -f logs/bo_*.out # All BO jobs +tail -f logs/bo_1n_p16_*.out # Specific job +``` + +### Check Results +```bash +grep -H "Parallel Performance" logs/bo_*.out +grep -H "Elapsed time" logs/bo_*.out | sort +``` + +## Expected Output + +### Profiling Summary +``` +=== Parallel Performance === +MPI_OBJ_EVAL Workers: 15 +MPI_OBJ_EVAL Total work in seconds: 7.890e-01 +MPI_OBJ_EVAL Ideal walltime in seconds (perfect balance): 5.260e-02 +MPI_OBJ_EVAL Actual walltime in seconds (observed): 6.123e-02 +``` + +### Job Summary +``` +========================================== +Results Summary +========================================== +Nodes: 2 +Total processes: 16 +Workers: 15 +Status: ✓ COMPLETED +Elapsed time: 00:01:23 (83 seconds) +========================================== +``` + + +## Configuration + +### Change Process Counts +Edit `submit_bo_scaling.sh`: +```bash +PROC_COUNTS=(2 4 8 16 32) +``` + +### Change Problem Size +Edit `BODriverEX_mpi.py`: +```python +n_samples = 128 +options['bo_maxiter'] = 50 +``` + +## Troubleshooting + +**No profiling output?** +- Verify `do_profiling = True` in driver +- Recent fix (May 2026) should resolve this + +**MPI hangs?** +```bash +module list | grep openmpi # Should show openmpi, not mvapich2 +``` + +**Jobs fail?** +```bash +python -c "from hiopbbpy.problems import LpNormProblem" # Test imports +ls logs/*.err # Check error logs +``` + +## See Also + +- `README.md` - Main documentation diff --git a/src/Drivers/hiopbbpy/submit_bo_scaling.sh b/src/Drivers/hiopbbpy/submit_bo_scaling.sh new file mode 100755 index 000000000..d5bbe5c4c --- /dev/null +++ b/src/Drivers/hiopbbpy/submit_bo_scaling.sh @@ -0,0 +1,73 @@ +#!/bin/bash +# submit_bo_scaling.sh +# Submit BO scaling tests with different processor counts on 1 and 2 nodes + +echo "==========================================" +echo "Submitting BO Scaling Tests" +echo "==========================================" +echo "" + +# Array of processor counts to test +PROC_COUNTS=(3 5 9 17) + +# Test on 1 node +echo "=== Single-Node Tests ===" +for nprocs in "${PROC_COUNTS[@]}"; do + echo "Submitting: 1 node, $nprocs processes" + + sbatch -N 1 -n $nprocs \ + --job-name="bo_1n_p${nprocs}" \ + --output="logs/bo_1n_p${nprocs}_%j.out" \ + --error="logs/bo_1n_p${nprocs}_%j.err" \ + submit_bo_template.sbatch + + if [ $? -eq 0 ]; then + echo " ✓ Job submitted successfully" + else + echo " ✗ Job submission failed" + fi + + sleep 1 +done + +echo "" +echo "=== Two-Node Tests ===" +for nprocs in "${PROC_COUNTS[@]}"; do + # For 2 nodes, we want nprocs per node + tasks_per_node=$nprocs + total_procs=$((2 * tasks_per_node)) + + echo "Submitting: 2 nodes, $tasks_per_node tasks/node ($total_procs total)" + + sbatch -N 2 --ntasks-per-node=$tasks_per_node \ + --job-name="bo_2n_p${total_procs}" \ + --output="logs/bo_2n_p${total_procs}_%j.out" \ + --error="logs/bo_2n_p${total_procs}_%j.err" \ + submit_bo_template.sbatch + + if [ $? -eq 0 ]; then + echo " ✓ Job submitted successfully" + else + echo " ✗ Job submission failed" + fi + + sleep 1 +done + +echo "" +echo "==========================================" +echo "All BO scaling jobs submitted!" +echo "==========================================" +echo "" +echo "Summary:" +echo " Single-node: 2, 4, 8, 16 processes" +echo " Two-node: 4, 8, 16, 32 total processes (2, 4, 8, 16 per node)" +echo "" +echo "Check job status:" +echo " squeue -u $USER" +echo "" +echo "Monitor outputs:" +echo " tail -f logs/bo_*.out" +echo "" +echo "View results when complete:" +echo " grep -H 'Elapsed time' logs/bo_*.out | sort" diff --git a/src/Drivers/hiopbbpy/submit_bo_template.sbatch b/src/Drivers/hiopbbpy/submit_bo_template.sbatch new file mode 100755 index 000000000..a59a3daa8 --- /dev/null +++ b/src/Drivers/hiopbbpy/submit_bo_template.sbatch @@ -0,0 +1,86 @@ +#!/bin/bash -l +#SBATCH -p pbatch +#SBATCH -A hiop +#SBATCH -t 02:00:00 +#SBATCH --mem=240G + +# Template script for BO scaling tests +# Node and process counts are specified via sbatch command line + +set -euo pipefail + +mkdir -p logs +JOBID="${SLURM_JOB_ID:-nojobid}" + +# Calculate total processes and workers +TOTAL_PROCS=${SLURM_NTASKS} +N_WORKERS=$((TOTAL_PROCS - 1)) +TASKS_PER_NODE=${SLURM_NTASKS_PER_NODE:-N/A} + +echo "==========================================" +echo "BO Scaling Test" +echo "==========================================" +echo "Job ID: $JOBID" +echo "Job name: ${SLURM_JOB_NAME}" +echo "Nodes: ${SLURM_NNODES}" +echo "Tasks per node: ${TASKS_PER_NODE}" +echo "Total processes: $TOTAL_PROCS" +echo "Workers: $N_WORKERS" +echo "Node list: ${SLURM_NODELIST}" +echo "Start time: $(date)" +echo "==========================================" +echo "" + +start_time=$(date +%s) + +####################################### +# Environment setup +####################################### +# Load OpenMPI +module unload mvapich2 2>/dev/null || true +module unload mpich 2>/dev/null || true +module load openmpi/4.1.2 + +echo "Loaded modules:" +module list +echo "" + +echo "Python version:" +python --version +echo "" + +echo "MPI information:" +which mpiexec +echo "" + +####################################### +# Launch BO with MPI +####################################### +echo "Command: mpiexec -n $TOTAL_PROCS python -m mpi4py.futures BODriverEX_mpi.py" +echo "" + +mpiexec -n $TOTAL_PROCS python -m mpi4py.futures BODriverEX_mpi.py + +EXIT_CODE=$? + +end_time=$(date +%s) +elapsed=$((end_time - start_time)) + +echo "" +echo "==========================================" +echo "Results Summary" +echo "==========================================" +echo "Nodes: ${SLURM_NNODES}" +echo "Total processes: $TOTAL_PROCS" +echo "Workers: $N_WORKERS" +if [ $EXIT_CODE -eq 0 ]; then + echo "Status: ✓ COMPLETED" +else + echo "Status: ✗ FAILED (exit code $EXIT_CODE)" +fi +printf "Elapsed time: %02d:%02d:%02d (%d seconds)\n" \ + $((elapsed/3600)) $((elapsed%3600/60)) $((elapsed%60)) "$elapsed" +echo "Completion time: $(date)" +echo "==========================================" + +exit $EXIT_CODE diff --git a/src/Drivers/hiopbbpy/submit_scaling_tests.sh b/src/Drivers/hiopbbpy/submit_scaling_tests.sh new file mode 100644 index 000000000..39cab1b7a --- /dev/null +++ b/src/Drivers/hiopbbpy/submit_scaling_tests.sh @@ -0,0 +1,63 @@ +#!/bin/bash +# submit_scaling_tests.sh +# Submit EvaluationManager scaling tests with fixed number of tasks +# and varying number of processes + +echo "Submitting scaling tests for EvaluationManager" +echo "==============================================================" +echo "" + +# Array of process counts to test +# Note: For MPI, need at least 2 processes (1 master + 1 worker) +PROCESS_COUNTS=(2 4 8 16 32 64) + +# Choose executor type: "thread" for ThreadPoolExecutor, "process" for ProcessPoolExecutor, or "mpi" for MPIPoolExecutor +EXECUTOR_TYPE="mpi" + +echo "Configuration:" +echo " Executor type: $EXECUTOR_TYPE" +echo " Process counts: ${PROCESS_COUNTS[@]}" +echo "" + +for nprocs in "${PROCESS_COUNTS[@]}"; do + echo "Submitting: $nprocs workers" + + if [ "$EXECUTOR_TYPE" == "thread" ]; then + # Use ThreadPoolExecutor (single-node) + echo " Command: sbatch test_thread.sh $nprocs" + sbatch --job-name="eval_mgr_thread${nprocs}" test_thread.sh $nprocs + elif [ "$EXECUTOR_TYPE" == "process" ]; then + # Use ProcessPoolExecutor (single-node) + echo " Command: sbatch test_process.sh $nprocs" + sbatch --job-name="eval_mgr_process${nprocs}" test_process.sh $nprocs + elif [ "$EXECUTOR_TYPE" == "mpi" ]; then + # Use MPIPoolExecutor (can be multi-node) + # Calculate nodes needed (assuming 4 procs per node, round up) + nodes=$(( (nprocs + 3) / 4 )) + echo " Using $nodes node(s) for $nprocs MPI processes" + echo " Command: sbatch -N $nodes -n $nprocs test_multinode_mpi.sh" + sbatch -N $nodes -n $nprocs --job-name="eval_mgr_mpi${nprocs}" test_multinode_mpi.sh + fi + + if [ $? -eq 0 ]; then + echo " ✓ Job submitted successfully" + else + echo " ✗ Job submission failed" + fi + + # Brief pause between submissions + sleep 1 +done + +echo "" +echo "==============================================================" +echo "All jobs submitted!" +echo "" +echo "Check job status with:" +echo " squeue -u $USER" +echo "" +echo "Monitor output files:" +echo " tail -f logs/eval_mgr_*.out" +echo "" +echo "View results when complete:" +echo " grep -H 'Total time\|Workers:' logs/eval_mgr_*.out | sort" diff --git a/src/Drivers/hiopbbpy/test_multinode_mpi.sh b/src/Drivers/hiopbbpy/test_multinode_mpi.sh new file mode 100644 index 000000000..77862adef --- /dev/null +++ b/src/Drivers/hiopbbpy/test_multinode_mpi.sh @@ -0,0 +1,88 @@ +#!/bin/bash -l +#SBATCH --job-name=eval_mgr_mpi +#SBATCH --output=logs/eval_mgr_mpi_%j.out +#SBATCH --error=logs/eval_mgr_mpi_%j.err +#SBATCH -N 1 +#SBATCH -n 4 +#SBATCH -p pbatch +#SBATCH -A hiop +#SBATCH -t 00:05:00 +#SBATCH --mem=240G + +# Multi-node MPI test script for EvaluationManager +# This script will be submitted multiple times with different node counts +# +# Usage: +# sbatch -N 1 -n 4 test_multinode_mpi.sh +# sbatch -N 2 -n 8 test_multinode_mpi.sh +# sbatch -N 4 -n 16 test_multinode_mpi.sh +# sbatch -N 8 -n 32 test_multinode_mpi.sh + +# Get number of nodes and processes from SLURM +NNODES=${SLURM_NNODES:-1} +NPROCS_PER_NODE=${SLURM_NTASKS_PER_NODE:-4} +NTASKS=${SLURM_NTASKS:-4} +# Fixed number of tasks for strong scaling tests +NTASKS_TO_RUN=128 + +echo "==========================================" +echo "Testing EvaluationManager Multi-Node MPI" +echo "==========================================" +echo "Job ID: $SLURM_JOB_ID" +echo "Job name: $SLURM_JOB_NAME" +echo "Nodes: $NNODES" +echo "Processes per node: $NPROCS_PER_NODE" +echo "Total processes: $NTASKS" +echo "Tasks to compute: $NTASKS_TO_RUN" +echo "Node list: $SLURM_NODELIST" +echo "Start time: $(date)" +echo "==========================================" +echo "" + +# Load required modules +module unload mvapich2 +module unload mpich +module load openmpi/4.1.2 + +# Show loaded modules +echo "Loaded modules:" +module list +echo "" + +# Show Python and MPI information +echo "Python version:" +python --version +echo "" + +echo "MPI information:" +which mpiexec +echo "" + +# Run the test +# Use python -m mpi4py.futures to properly handle MPIPoolExecutor without spawning +echo "Starting EvaluationManager test..." +echo "Command: mpiexec -n $NTASKS python -m mpi4py.futures EvaluationManagerCI.py -e mpi -n $NTASKS_TO_RUN -p -t 0.5" +echo "" + +mpiexec -n $NTASKS python -m mpi4py.futures EvaluationManagerCI.py \ + -e mpi \ + -n $NTASKS_TO_RUN \ + -p \ + -t 0.5 + +EXIT_CODE=$? + +echo "" +echo "==========================================" +echo "Test completed at $(date)" +echo "Exit code: $EXIT_CODE" +echo "==========================================" + +# Print summary of output files +if [ $EXIT_CODE -eq 0 ]; then + echo "✓ Test PASSED" +else + echo "✗ Test FAILED with exit code $EXIT_CODE" +fi + +exit $EXIT_CODE diff --git a/src/Drivers/hiopbbpy/test_process.sh b/src/Drivers/hiopbbpy/test_process.sh new file mode 100755 index 000000000..83a7a1ab6 --- /dev/null +++ b/src/Drivers/hiopbbpy/test_process.sh @@ -0,0 +1,82 @@ +#!/bin/bash -l +#SBATCH --job-name=eval_mgr_process +#SBATCH --output=logs/eval_mgr_process_%j.out +#SBATCH --error=logs/eval_mgr_process_%j.err +#SBATCH -N 1 +#SBATCH -n 4 +#SBATCH -p pbatch +#SBATCH -A hiop +#SBATCH -t 00:05:00 +#SBATCH --mem=240G + +# Single-node multi-process test script for EvaluationManager +# This script uses ProcessPoolExecutor (not MPI) for parallel execution +# +# Usage: +# sbatch -n 4 test_process.sh +# sbatch -n 8 test_process.sh +# sbatch -n 16 test_process.sh + +# Get number of workers from command line arg or SLURM (use -n parameter) +# Default to 4 if neither is provided +if [ -n "$1" ]; then + NWORKERS=$1 +else + NWORKERS=${SLURM_NTASKS:-4} +fi +NTASKS_TO_RUN=128 # Fixed number of tasks for strong scaling tests + +echo "==========================================" +echo "Testing EvaluationManager Single-Node Process" +echo "==========================================" +echo "Job ID: $SLURM_JOB_ID" +echo "Job name: $SLURM_JOB_NAME" +echo "Node: $SLURM_NODELIST" +echo "Workers: $NWORKERS" +echo "Tasks to compute: $NTASKS_TO_RUN" +echo "Start time: $(date)" +echo "==========================================" +echo "" + +# Load required modules (adjust as needed for Dane) +# Uncomment and modify these lines based on your environment +# module load python/3.9 + +# Show loaded modules +echo "Loaded modules:" +module list +echo "" + +# Show Python information +echo "Python version:" +python --version +echo "" + +# Run the test with ProcessPoolExecutor +echo "Starting EvaluationManager test with ProcessPoolExecutor..." +echo "Command: python EvaluationManagerCI.py -e process -w $NWORKERS -n $NTASKS_TO_RUN -p -t 0.5" +echo "" + +python EvaluationManagerCI.py \ + -e process \ + -w $NWORKERS \ + -n $NTASKS_TO_RUN \ + -p \ + -t 0.5 + +EXIT_CODE=$? + +echo "" +echo "==========================================" +echo "Test completed at $(date)" +echo "Exit code: $EXIT_CODE" +echo "==========================================" + +# Print summary of output files +if [ $EXIT_CODE -eq 0 ]; then + echo "✓ Test PASSED" +else + echo "✗ Test FAILED with exit code $EXIT_CODE" +fi + +exit $EXIT_CODE diff --git a/src/Drivers/hiopbbpy/test_thread.sh b/src/Drivers/hiopbbpy/test_thread.sh new file mode 100755 index 000000000..fc92d64c9 --- /dev/null +++ b/src/Drivers/hiopbbpy/test_thread.sh @@ -0,0 +1,82 @@ +#!/bin/bash -l +#SBATCH --job-name=eval_mgr_thread +#SBATCH --output=logs/eval_mgr_thread_%j.out +#SBATCH --error=logs/eval_mgr_thread_%j.err +#SBATCH -N 1 +#SBATCH -n 4 +#SBATCH -p pbatch +#SBATCH -A hiop +#SBATCH -t 00:05:00 +#SBATCH --mem=240G + +# Single-node multi-thread test script for EvaluationManager +# This script uses ThreadPoolExecutor for parallel execution +# +# Usage: +# sbatch -n 4 test_thread.sh +# sbatch -n 8 test_thread.sh +# sbatch -n 16 test_thread.sh + +# Get number of workers from command line arg or SLURM (use -n parameter) +# Default to 4 if neither is provided +if [ -n "$1" ]; then + NWORKERS=$1 +else + NWORKERS=${SLURM_NTASKS:-4} +fi +NTASKS_TO_RUN=128 # Fixed number of tasks for strong scaling tests + +echo "==========================================" +echo "Testing EvaluationManager Single-Node Thread" +echo "==========================================" +echo "Job ID: $SLURM_JOB_ID" +echo "Job name: $SLURM_JOB_NAME" +echo "Node: $SLURM_NODELIST" +echo "Workers: $NWORKERS" +echo "Tasks to compute: $NTASKS_TO_RUN" +echo "Start time: $(date)" +echo "==========================================" +echo "" + +# Load required modules (adjust as needed for Dane) +# Uncomment and modify these lines based on your environment +# module load python/3.9 + +# Show loaded modules +echo "Loaded modules:" +module list +echo "" + +# Show Python information +echo "Python version:" +python --version +echo "" + +# Run the test with ThreadPoolExecutor +echo "Starting EvaluationManager test with ThreadPoolExecutor..." +echo "Command: python EvaluationManagerCI.py -e thread -w $NWORKERS -n $NTASKS_TO_RUN -p -t 0.5" +echo "" + +python EvaluationManagerCI.py \ + -e thread \ + -w $NWORKERS \ + -n $NTASKS_TO_RUN \ + -p \ + -t 0.5 + +EXIT_CODE=$? + +echo "" +echo "==========================================" +echo "Test completed at $(date)" +echo "Exit code: $EXIT_CODE" +echo "==========================================" + +# Print summary of output files +if [ $EXIT_CODE -eq 0 ]; then + echo "✓ Test PASSED" +else + echo "✗ Test FAILED with exit code $EXIT_CODE" +fi + +exit $EXIT_CODE diff --git a/src/hiopbbpy/opt/boalgorithm.py b/src/hiopbbpy/opt/boalgorithm.py index 35541d609..36bd0fb55 100644 --- a/src/hiopbbpy/opt/boalgorithm.py +++ b/src/hiopbbpy/opt/boalgorithm.py @@ -14,6 +14,7 @@ from ..problems.problem import Problem from .optproblem import IpoptProb from ..utils.util import Evaluator, Logger +import os # A base class defining a general framework for Bayesian Optimization class BOAlgorithmBase: @@ -338,6 +339,7 @@ def __init__(self, fun, method, bounds, constraints, solver_options): # Find the minimum of the input objective `fun`, using the minimize function from SciPy. def minimizer_callback(self, x0s): output = [] + print(f"Worker pid={os.getpid()}: doing minimizer_callback ...", flush=True) msg = "" for x0 in x0s: if self.method == "SLSQP": diff --git a/src/hiopbbpy/problems/problem.py b/src/hiopbbpy/problems/problem.py index d1a76ed0b..0558614bf 100644 --- a/src/hiopbbpy/problems/problem.py +++ b/src/hiopbbpy/problems/problem.py @@ -28,7 +28,7 @@ def __init__(self, ndim, xlimits, name=" ", constraints=[]): # each dict = one scalar constraint self.n_con = len(constraints) - def _evaluate(self, x: np.ndarray) -> np.ndarray: + def _evaluate(self, x: np.ndarray, **kwargs) -> np.ndarray: """ problem evaluation y = f(x) of a scalar valued function f @@ -44,7 +44,7 @@ def _evaluate(self, x: np.ndarray) -> np.ndarray: """ raise NotImplementedError("Child class of hiopProblem should implement method _evaluate") - def evaluate(self, x: np.ndarray) -> np.ndarray: + def evaluate(self, x: np.ndarray, **kwargs) -> np.ndarray: """ problem callback y = f(x) of the scalar valued function f @@ -59,7 +59,7 @@ def evaluate(self, x: np.ndarray) -> np.ndarray: Function values (cast to reals) """ y = np.ndarray((x.shape[0], 1)) - y[:,:] = self._evaluate(x) + y[:,:] = self._evaluate(x, **kwargs) return y def con_evaluate(self, x: np.ndarray) -> np.ndarray: diff --git a/src/hiopbbpy/utils/__init__.py b/src/hiopbbpy/utils/__init__.py index 5b514f0b9..29d8dfc47 100644 --- a/src/hiopbbpy/utils/__init__.py +++ b/src/hiopbbpy/utils/__init__.py @@ -1,4 +1,4 @@ -from .evaluation_manager import (EvaluationManager, is_running_with_mpi) +from .evaluation_manager import (EvaluationManager) from .util import Evaluator, MPIEvaluator __all__ = [ diff --git a/src/hiopbbpy/utils/evaluation_manager.py b/src/hiopbbpy/utils/evaluation_manager.py index edcd73981..e5cb784e0 100644 --- a/src/hiopbbpy/utils/evaluation_manager.py +++ b/src/hiopbbpy/utils/evaluation_manager.py @@ -2,44 +2,54 @@ This is a class to manage function evaluations using multiple parallel executors. It supports both intra-node and inter-node parallelism. +Supported executors: + - concurrent.futures.ThreadPoolExecutor (single-node, multi-threaded) + - concurrent.futures.ProcessPoolExecutor (single-node, multi-process) + - mpi4py.futures.MPIPoolExecutor (multi-node, MPI-based) + +For multi-node execution with MPI: + 1. Install mpi4py: pip install mpi4py + 2. Run with: mpiexec -n python your_script.py + where N >= 2 (1 master + N-1 workers distributed across nodes) + 3. Only rank 0 should create the EvaluationManager + 4. Worker ranks should call MPIPoolExecutor() to enter worker loop + +Example multi-node usage: + from mpi4py import MPI + from mpi4py.futures import MPIPoolExecutor + + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + + if rank == 0: + # Master: create manager and submit tasks + manager = EvaluationManager({"mpi": MPIPoolExecutor()}, profiling=True) + manager.submit_tasks(my_func, data_list, execute_at="mpi") + manager.sync() + X, F = manager.retrieve_results() + else: + # Workers: enter worker loop + MPIPoolExecutor() + +See EvaluationManagerCI.py for a complete working example with multi-node MPI. + Authors: Tucker Hartland Weslley S Pereira """ import threading import logging -import copy +from concurrent.futures import CancelledError, wait +from collections import deque import os import time import math -from concurrent.futures import ProcessPoolExecutor, CancelledError -from collections import deque - - -def is_running_with_mpi(): - """Returns True if the code is running in an MPI environment.""" - _MPI_RANK_ENV_VARS = [ - "OMPI_COMM_WORLD_RANK", # Open MPI - "PMI_RANK", # MPICH, Intel MPI, Cray MPI - "MPI_RANK", # Intel MPI (sometimes) - "MV2_COMM_WORLD_RANK", # MVAPICH - ] - return any(var in os.environ for var in _MPI_RANK_ENV_VARS) - - -# Loads MPIPoolExecutor if MPI is available -if is_running_with_mpi(): - from mpi4py.futures import MPIPoolExecutor, wait - _EVALUATION_MANAGER_USES_MPI4PY = True -else: - _EVALUATION_MANAGER_USES_MPI4PY = False - from concurrent.futures import wait - +import copy -def _timed_call(fn, x, kwargs): - """Run fn(x, **kwargs) and record worker-side timing.""" +def _timed_call(fn, args, kwargs): + """Run fn(*args, **kwargs) and record worker-side timing.""" start_time = time.perf_counter() - fx = fn(x, **kwargs) + fx = fn(*args, **kwargs) done_time = time.perf_counter() return { "result": fx, @@ -69,50 +79,86 @@ def _summary_stats(values): "max": max(values), } - class EvaluationManager: - """Class that manages the evaluation of functions using multiple executors.""" - - def __init__( - self, - cpu_executor=None, - mpi_executor=None, - profiling=False, - task_name="TASK") -> None: + """Manage asynchronous function evaluations over one or more executors. + + The manager is executor-agnostic: each configured executor only needs a + ``submit(fn, *args, **kwargs)`` method that returns a Future-like object + exposing ``done()`` and ``result()``. + + Tasks are submitted asynchronously via :meth:`submit_tasks`. Completed + results are collected lazily by :meth:`retrieve_results` and eagerly by + :meth:`sync` (which blocks until the running queue is empty). + + Parameters + ---------- + executor: + Either a single executor instance or a ``dict[str, executor]`` mapping. + When a single executor is provided, it is stored under key ``"0"``. + profiling: + If True, wrap calls with worker-side timing. + task_name: + Label used in logging and profiling output. + """ + + def __init__(self, executor, profiling=False, task_name="TASK") -> None: self._queue = deque([]) + self._completed_X = deque([]) + self._completed_F = deque([]) self._queue_lock = threading.Lock() + + if isinstance(executor, dict): + self.executors = executor + else: + self.executors = {"0": executor} + self.logger = logging.getLogger(self.__class__.__name__) + self.task_name = task_name self.profiling = profiling self._first_submit_time = None - self.task_name = task_name - self.executors = { - "cpu": ProcessPoolExecutor() if cpu_executor is None else cpu_executor - } - if _EVALUATION_MANAGER_USES_MPI4PY: - self.executors["mpi"] = ( - MPIPoolExecutor() if mpi_executor is None else mpi_executor - ) - elif mpi_executor is not None: - self.executors["mpi"] = mpi_executor + # Store timing data if profiling is enabled + self._execution_times = [] if profiling else None + self._wait_times = [] if profiling else None + self._turnaround_times = [] if profiling else None - self.logger.info("EvaluationManager initialized with executors:") + self.logger.info(f"{self.task_name} EvaluationManager initialized with executors:") for key, executor in self.executors.items(): self.logger.info(f" - {key}: {executor}") def __del__(self) -> None: + """Shutdown managed executors during object destruction.""" for executor in self.executors.values(): - executor.shutdown(wait=False) - self.logger.info(f"{self.task_name} EvaluationManager destroyed and executors shut down.") + try: + executor.shutdown(wait=False) + self.logger.info(f"{self.task_name} EvaluationManager destroyed and executors shut down.") + except Exception as e: + self.logger.warning(f"{self.task_name} Error shutting down executor: {e}") def _get_num_workers(self): """Return number of workers.""" - if "mpi" in self.executors and is_running_with_mpi(): - return int(os.environ.get("MPI4PY_FUTURES_MAX_WORKERS", 1)) - try: - return self.executors["cpu"]._max_workers - except AttributeError: - return 1 + # For standard executors like ThreadPoolExecutor / ProcessPoolExecutor + for ex in self.executors.values(): + try: + return ex._max_workers + except AttributeError: + pass + + # For MPI executors, try to get MPI communicator size + if "mpi" in self.executors: + try: + from mpi4py import MPI + comm = MPI.COMM_WORLD + # Return size - 1 because rank 0 is the master + return comm.Get_size() - 1 + except Exception: + # Fallback to environment variable (legacy single-node mode) + try: + return int(os.environ.get("MPI4PY_FUTURES_MAX_WORKERS", 1)) + except Exception: + pass + + return 1 def set_task_name(self, task_name): self.task_name = task_name @@ -132,87 +178,106 @@ def _print_timing_stats(self, label, values): ) def sync(self) -> None: - """Wait for all submitted tasks to complete.""" - future_objs = [queue_obj["future"] for queue_obj in self._queue] - wait(future_objs) + """Block until all queued tasks finish. + + This method repeatedly waits on the currently queued futures and then + harvests completed items into the internal completion buffers. Harvested + results can be consumed using :meth:`retrieve_results`. + """ + while True: + with self._queue_lock: + futures = [queue_obj[1] for queue_obj in self._queue] + if len(futures) == 0: + break + + wait(futures) + + with self._queue_lock: + self._harvest_completed_locked( + execution_times=self._execution_times, + wait_times=self._wait_times, + turnaround_times=self._turnaround_times + ) + + def submit_tasks(self, fn, X, execute_at=None, **kwargs) -> None: + """Submit tasks to the specified executor. + + Parameters + ---------- + fn: + The function to be executed. + X: + Sequence of input data for the function. If an element is a tuple, + it is expanded as positional arguments (``fn(*x, **kwargs)``); + otherwise it is passed as a single argument (``fn(x, **kwargs)``). + execute_at: + Executor key to use for task submission. If ``None``, the first key + in ``executors`` is used. The key lookup is case-insensitive. + kwargs: + Additional keyword arguments passed to the function. + """ + + if execute_at is None: + execute_at = next(iter(self.executors)) - def submit_tasks(self, fn, X, execute_at="cpu", **kwargs) -> None: - """Submits tasks to the specified executor.""" key = execute_at.lower() + if key not in self.executors: + raise KeyError(f"Executor '{execute_at}' not found. Available: {list(self.executors.keys())}") + with self._queue_lock: for x in X: submit_time = time.perf_counter() if self._first_submit_time is None: self._first_submit_time = submit_time + args = x if isinstance(x, tuple) else (x,) + if self.profiling: - future_obj = self.executors[key].submit(_timed_call, fn, x, kwargs) + future_obj = self.executors[key].submit(_timed_call, fn, args, kwargs) else: - future_obj = self.executors[key].submit(fn, x, **kwargs) + future_obj = self.executors[key].submit(fn, *args, **kwargs) - self._queue.append({ - "x": copy.deepcopy(x), - "future": future_obj, - "submit_time": submit_time, - }) + self._queue.append([x, future_obj, key, submit_time]) self.logger.info(f"{self.task_name} Submitted f({x})") def retrieve_results(self) -> tuple[list, list]: - """Retrieves the results of completed tasks.""" - X = deque([]) - F = deque([]) - - execution_times = [] - wait_times = [] - turnaround_times = [] - + """Retrieves the results of completed tasks. + Returns + ------- + tuple[list, list] + Inputs and corresponding results for completed tasks. If a task + failed or was cancelled, its result entry is ``None``. + """ # Master-side wall clock for the whole batch batch_done_time = time.perf_counter() with self._queue_lock: - new_queue = deque([]) - - for item in self._queue: - x = item["x"] - future = item["future"] - submit_time = item["submit_time"] - - if future.done(): - try: - fx = future.result() - except CancelledError: - self.logger.warning(f"{self.task_name} The execution of x={x} was cancelled.") - continue - - if self.profiling: - # These are fine for local inspection, but note: - # worker_start_time / worker_done_time are on worker clocks. - worker_start_time = fx["start_time"] - worker_done_time = fx["done_time"] - - execution_time = fx["execution_time"] - - # These are not robust across different node clocks, but kept here - # because you already had them. - wait_time = worker_start_time - submit_time - turnaround_time = worker_done_time - submit_time - - execution_times.append(execution_time) - wait_times.append(wait_time) - turnaround_times.append(turnaround_time) + # Harvest any remaining completed tasks + self._harvest_completed_locked( + execution_times=self._execution_times, + wait_times=self._wait_times, + turnaround_times=self._turnaround_times, + ) - fx = fx["result"] + X = list(self._completed_X) + F = list(self._completed_F) + self._completed_X.clear() + self._completed_F.clear() - X.append(x) - F.append(fx) - self.logger.info(f"{self.task_name} Completed: f({x}) = {fx}") - else: - new_queue.append(item) + # Use the stored timing data collected during all harvests + execution_times = self._execution_times or [] + wait_times = self._wait_times or [] + turnaround_times = self._turnaround_times or [] - self._queue = new_queue + if self.profiling: + print(f"\nDEBUG: Profiling enabled, collected {len(execution_times)} execution times", flush=True) + if execution_times: + self._print_timing_stats(f"{self.task_name} Execution times", execution_times) + else: + print(f"WARNING: Profiling enabled but no execution times collected!", flush=True) if self.profiling and execution_times: - self._print_timing_stats(f"{self.task_name} Execution times", execution_times) + pass # Timing stats already printed above # Optional: only print these if you are comfortable with cross-clock values # self._print_timing_stats("Wait times", wait_times) @@ -230,11 +295,91 @@ def retrieve_results(self) -> tuple[list, list]: if self._first_submit_time is not None else 0.0 ) - print("\n=== Parallel Performance ===") - print(f"{self.task_name} Workers: {num_workers}") - print(f"{self.task_name} Total work in seconds: {total_work:.6e}") - print(f"{self.task_name} Ideal walltime in seconds (perfect balance): {ideal_walltime:.6e}") - print(f"{self.task_name} Actual walltime in seconds (observed): {actual_walltime:.6e}") + print("\n=== Parallel Performance ===", flush=True) + print(f"{self.task_name} Workers: {num_workers}", flush=True) + print(f"{self.task_name} Total work in seconds: {total_work:.6e}", flush=True) + print(f"{self.task_name} Ideal walltime in seconds (perfect balance): {ideal_walltime:.6e}", flush=True) + print(f"{self.task_name} Actual walltime in seconds (observed): {actual_walltime:.6e}", flush=True) + + # Clear timing data for next batch + if self.profiling: + self._execution_times.clear() + self._wait_times.clear() + self._turnaround_times.clear() + + self._first_submit_time = None + return X, F + + def _harvest_completed_locked( + self, + execution_times=None, + wait_times=None, + turnaround_times=None, + ) -> None: + """Move completed task results from running queue into completion buffers. + + This method assumes the caller holds ``_queue_lock``. + """ + new_queue = deque([]) + + for item in self._queue: + x = item[0] + future = item[1] + submit_time = item[3] + if future.done(): + self._completed_X.append(x) + self._completed_F.append(None) + + try: + fx = future.result() + + if self.profiling: + # Check if fx is a timing dict (from _timed_call) + if isinstance(fx, dict) and "start_time" in fx: + worker_start_time = fx["start_time"] + worker_done_time = fx["done_time"] + execution_time = fx["execution_time"] + + # These are OK for local runs, but can be unreliable across nodes + wait_time = worker_start_time - submit_time + turnaround_time = worker_done_time - submit_time + + if execution_times is not None: + execution_times.append(execution_time) + if wait_times is not None: + wait_times.append(wait_time) + if turnaround_times is not None: + turnaround_times.append(turnaround_time) + + fx = fx["result"] + else: + # Profiling enabled but result is not a timing dict + # This happens when function is wrapped (e.g., by MPIEvaluator) + print(f"DEBUG: Profiling enabled but result type is {type(fx)}, not a timing dict", flush=True) + + self._completed_F[-1] = fx + self.logger.info(f"{self.task_name} Completed: f({x}) = {fx}") + + except CancelledError: + self.logger.warning(f"{self.task_name} The execution of x={x} was cancelled.") + except Exception as e: + self.logger.warning(f"{self.task_name} Task f({x}) raised an exception: {e}") + + else: + new_queue.append(item) + + self._queue = new_queue + + def print_status(self) -> None: + """Print the current status of the task queue and completion buffers.""" + with self._queue_lock: + futures = [queue_obj[1] for queue_obj in self._queue] + n_running_futures = sum(1 for f in futures if not f.done()) + n_done_futures = len(futures) - n_running_futures + self.logger.info( + f"Status: {len(self._completed_X)} harvested results, " + f"{n_running_futures} running tasks, {n_done_futures} completed tasks still in queue." + ) + - return list(X), list(F) \ No newline at end of file diff --git a/src/hiopbbpy/utils/util.py b/src/hiopbbpy/utils/util.py index ee8d199d0..64938529b 100644 --- a/src/hiopbbpy/utils/util.py +++ b/src/hiopbbpy/utils/util.py @@ -25,9 +25,13 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ import numpy as np -from .evaluation_manager import EvaluationManager, is_running_with_mpi +from .evaluation_manager import EvaluationManager import logging +import os +import time +import uuid +from pathlib import Path def check_required_keys(user_dict, required_keys): for key in required_keys: @@ -63,40 +67,125 @@ def run(self, fun, x): class MPIEvaluator(Evaluator): """ - A wrapper of the evaluation_manager code. - Note that application codes application.py that use this Evaluator should be run as - env MPI4PY_FUTURES_MAX_WORKERS=8 mpiexec -n 1 python application.py - Also, the application code should have a "main" section wrapped in - if __name__ == "__main__": - Expecting the function evaluations to return an array. - Fout has then the structure of - [[eval0], [[eval1]], [eval2],...]] - We reformat to - [eval0, eval1, eval2,...] + A wrapper of the evaluation_manager code that supports multiple execution modes. + + Execution modes: + + 1. Single-node with ProcessPoolExecutor or ThreadPoolExecutor: + python application.py + + 2. Single-node with MPI (legacy mode): + env MPI4PY_FUTURES_MAX_WORKERS=8 mpiexec -n 1 python application.py + + 3. Multi-node with MPI (recommended for HPC clusters): + mpiexec -n python application.py + where N >= 2 (1 master rank 0 + N-1 workers distributed across nodes) + + For multi-node, your application.py must have: + if __name__ == "__main__": + from mpi4py import MPI + from mpi4py.futures import MPIPoolExecutor + import sys + + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + + if rank != 0: + MPIPoolExecutor() # Workers enter executor loop + sys.exit(0) + + # Master (rank 0) continues here + executor = MPIPoolExecutor() + evaluator = MPIEvaluator(executor=executor, ...) + # ... rest of your code + + Output format: + Function evaluations return arrays with structure [[eval0], [eval1], [eval2], ...] + which are reformatted to [eval0, eval1, eval2, ...] """ - def __init__(self, function_mode=True,cpu_executor=None, mpi_executor=None, profiling=False, task_name="MPITASK"): - self.manager = EvaluationManager(cpu_executor,mpi_executor,profiling=profiling, task_name=task_name) + def __init__(self, function_mode=True, executor=None, profiling=False, + task_name="MPITASK", run_root="./hiop_temp", use_run_dir=False): + # If no executor provided, create a default ThreadPoolExecutor + if executor is None: + from concurrent.futures import ThreadPoolExecutor + import multiprocessing + max_workers = multiprocessing.cpu_count() + executor = ThreadPoolExecutor(max_workers=max_workers) + print(f"No executor provided for {task_name}, using ThreadPoolExecutor with {max_workers} workers") + + self.manager = EvaluationManager(executor, profiling=profiling, task_name=task_name) self.function_mode = function_mode - if is_running_with_mpi(): - self.executor_type = "mpi" - else: - self.executor_type = "cpu" + self.run_root = Path(run_root) + self.run_root.mkdir(parents=True, exist_ok=True) + self.use_run_dir = use_run_dir + print(f"Create Evaluator for task: {task_name}") + def __del__(self): del self.manager + def set_task_name(self, task_name): self.manager.set_task_name(task_name) - def run(self, fun, Xin): + + def run(self, fun, Xin): nevals = Xin.shape[0] - self.manager.submit_tasks(fun, [np.atleast_2d(Xin[i]) for i in range(nevals)], execute_at=self.executor_type) + print("in Evaluator::run") + + # unique batch directory so repeated calls do not reuse temp_dir_0, temp_dir_1, ... + batch_id = f"{self.manager.task_name}_{os.getpid()}_{time.time_ns()}_{uuid.uuid4().hex[:8]}" + batch_dir = self.run_root / batch_id + batch_dir.mkdir(parents=True, exist_ok=False) + + for i in range(nevals): + xi = np.atleast_2d(Xin[i]) + + kwargs = {} + if self.use_run_dir: + run_dir = batch_dir / f"eval_{i:04d}" + run_dir.mkdir(parents=True, exist_ok=False) + kwargs["run_dir"] = str(run_dir) + + # submit (index, x) so we can restore original order later + self.manager.submit_tasks( + _run_indexed_fun, + [(fun, i, xi)], + **kwargs, + ) + print(f"Submitted task {i + 1}", flush=True) + self.manager.sync() + print(f"\n{'='*50}") + print(f"Retrieving results for {self.manager.task_name}...") + print(f"Profiling enabled: {self.manager.profiling}") + print(f"{'='*50}\n", flush=True) Xout, Fout = self.manager.retrieve_results() + + # restore original order using returned indices + ordered = [None] * nevals + for out in Fout: + if out is None: + continue + # Handle profiling case where _run_indexed_fun returns (idx, result) + # but result might be a timing dict if profiling is enabled + idx, val = out + ordered[idx] = val + + missing = [i for i, v in enumerate(ordered) if v is None] + if missing: + raise RuntimeError(f"Missing evaluation results for indices {missing}") + if self.function_mode: - Y = np.ndarray((nevals, 1)) - Y[:,0] = np.array(Fout)[:,0,0] + Y = np.empty((nevals, 1), dtype=float) + for i, val in enumerate(ordered): + arr = np.asarray(val, dtype=float) + Y[i, 0] = float(arr.reshape(-1)[0]) else: - Y = [Fi[0] for Fi in Fout] + Y = [val[0] for val in ordered] + return Y +def _run_indexed_fun(fun, idx, x, **kwargs): + return idx, fun(x, **kwargs) + class Logger: """ A simple wrapper for Python's logging module that sets up a reusable logger.