Xopt Parallel Examples¶
Xopt provides methods to parallelize optimizations with Processes, Threads, MPI, and Dask through the concurrent.futures interface, as defined in PEP 3148: https://www.python.org/dev/peps/pep-3148/
# Helpers for this notebook
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from dask.distributed import Client
import matplotlib.pyplot as plt
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import os
from xopt import AsynchronousXopt as Xopt
from xopt.vocs import get_feasibility_data
# Any non-empty SMOKE_TEST value in the environment switches the notebook to a
# reduced evaluation budget (see MAX_EVALUATIONS).
SMOKE_TEST = os.getenv("SMOKE_TEST")
# Notebook printing output
# from xopt import output_notebook
# output_notebook()
# Number of CPUs reported by the system; used as the worker count for each executor.
N_CPUS = multiprocessing.cpu_count()
N_CPUS
# Scratch directory for generator output (the YAML config sets output_path: temp).
os.makedirs("temp", exist_ok=True)
The Xopt object can be instantiated from a JSON or YAML file, or a dict, with the proper structure.
Here we will make one from a YAML string.
# Make a proper input file.
# Smoke tests (SMOKE_TEST set in the environment) use a much smaller budget.
MAX_EVALUATIONS = 32 if SMOKE_TEST else 1000
# The budget is substituted into the YAML text itself so that every later
# `Xopt(YAML)` in this notebook honors it, not only the instance created here.
# Literal YAML braces are doubled because this is an f-string.
YAML = f"""
stopping_condition:
    name: MaxEvaluationsCondition
    max_evaluations: {MAX_EVALUATIONS}
generator:
    name: cnsga
    output_path: temp
    population_size: 64
vocs:
    variables:
        x1: [0, 3.14159]
        x2: [0, 3.14159]
    objectives: {{y1: MINIMIZE, y2: MINIMIZE}}
    constraints:
        c1: [GREATER_THAN, 0]
        c2: [LESS_THAN, 0.5]
    constants: {{a: dummy_constant}}
evaluator:
    function: xopt.resources.test_functions.tnk.evaluate_TNK
    function_kwargs:
        sleep: 0
        random_sleep: 0.1
"""
# Build the optimizer from the YAML text and display its resolved configuration.
X = Xopt(YAML)
X
/home/runner/work/Xopt/Xopt/.venv/lib/python3.12/site-packages/pydantic/main.py:464: UserWarning: Pydantic serializer warnings: PydanticSerializationUnexpectedValue(Unexpected field `_loaded_population`: Expected `CNSGAGenerator`) PydanticSerializationUnexpectedValue(Unexpected field `_loaded_population`: Expected `CNSGAGenerator`) return self.__pydantic_serializer__.to_python(
Xopt
________________________________
Version: 0.1.dev1+gb834d2348
Data size: 0
Config as YAML:
dump_file: null
evaluator:
function: xopt.resources.test_functions.tnk.evaluate_TNK
function_kwargs:
raise_probability: 0
random_sleep: 0.1
sleep: 0
max_workers: 1
vectorized: false
generator:
crossover_probability: 0.9
mutation_probability: 1.0
name: cnsga
output_path: temp
population: null
population_file: null
population_size: 64
returns_id: false
supports_constraints: true
supports_multi_objective: true
supports_single_objective: true
vocs:
constants:
a:
dtype: null
type: Constant
value: dummy_constant
constraints:
c1:
dtype: null
type: GreaterThanConstraint
value: 0.0
c2:
dtype: null
type: LessThanConstraint
value: 0.5
objectives:
y1:
dtype: null
type: MinimizeObjective
y2:
dtype: null
type: MinimizeObjective
observables: {}
variables:
x1:
default_value: null
domain:
- 0.0
- 3.14159
dtype: null
type: ContinuousVariable
x2:
default_value: null
domain:
- 0.0
- 3.14159
dtype: null
type: ContinuousVariable
is_done: false
serialize_inline: false
serialize_torch: false
stopping_condition:
count_valid_only: false
max_evaluations: 1000
name: MaxEvaluationsCondition
use_dataframe_index: false
strict: true
%%timeit
# Sanity check: call the evaluator's function directly on a single point and
# confirm that the mean time per call is close to the random_sleep value passed
# (0.1, i.e. roughly 100 ms judging by the recorded timing).
X.evaluator.function({"x1": 0.5, "x2": 0.5}, random_sleep=0.1)
The slowest run took 4.79 times longer than the fastest. This could mean that an intermediate result is being cached. 110 ms ± 48.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%time
# Baseline: run with the evaluator exactly as configured from the YAML
# (the config dump shows max_workers: 1), timed for comparison with the
# executor-backed runs that follow.
X.run()
CPU times: user 5.47 s, sys: 32.2 ms, total: 5.5 s Wall time: 1min 49s
Processes¶
%%time
# Start from a fresh Xopt object so this run is timed on its own.
X = Xopt(YAML)
# The pool is shut down when the `with` block exits, so the whole run must
# happen inside it.
with ProcessPoolExecutor(max_workers=N_CPUS) as executor:
    X.evaluator.executor = executor
    X.evaluator.max_workers = N_CPUS
    X.run()
# Number of rows of evaluation data collected by the run.
len(X.data)
CPU times: user 4.55 s, sys: 179 ms, total: 4.72 s Wall time: 26.7 s
1000
Threads¶
Run the same problem again from a fresh Xopt object, this time with threads.
%%time
# Start from a fresh Xopt object so this run is timed on its own.
X = Xopt(YAML)
# Same setup as the process-based run, but evaluations are submitted to a
# thread pool; the pool is shut down when the `with` block exits.
with ThreadPoolExecutor(max_workers=N_CPUS) as executor:
    X.evaluator.executor = executor
    X.evaluator.max_workers = N_CPUS
    X.run()
# Number of rows of evaluation data collected by the run.
len(X.data)
CPU times: user 4.47 s, sys: 73.6 ms, total: 4.54 s Wall time: 26.3 s
1000
MPI¶
The test.yaml file completely defines the problem. We will also direct the logging to an xopt.log file. The following invocation launches 8 MPI processes (`mpirun -n 8`) to solve this problem.
We can also continue by calling .save with a JSON filename. This will write all of the previous results into the file.
# Build a fresh Xopt object from the YAML text; no evaluations have been run on it.
X = Xopt(YAML)
X.dump("test.yaml") # Write this input to file
# Print the file that the MPI run will read as its input.
!cat test.yaml
data: null
dump_file: null
evaluator:
function: xopt.resources.test_functions.tnk.evaluate_TNK
function_kwargs:
raise_probability: 0
random_sleep: 0.1
sleep: 0
max_workers: 1
vectorized: false
generator:
crossover_probability: 0.9
mutation_probability: 1.0
name: cnsga
output_path: temp
population: null
population_file: null
population_size: 64
returns_id: false
supports_constraints: true
supports_multi_objective: true
supports_single_objective: true
vocs:
constants:
a:
dtype: null
type: Constant
value: dummy_constant
constraints:
c1:
dtype: null
type: GreaterThanConstraint
value: 0.0
c2:
dtype: null
type: LessThanConstraint
value: 0.5
objectives:
y1:
dtype: null
type: MinimizeObjective
y2:
dtype: null
type: MinimizeObjective
observables: {}
variables:
x1:
default_value: null
domain:
- 0.0
- 3.14159
dtype: null
type: ContinuousVariable
x2:
default_value: null
domain:
- 0.0
- 3.14159
dtype: null
type: ContinuousVariable
is_done: false
serialize_inline: false
serialize_torch: false
stopping_condition:
count_valid_only: false
max_evaluations: 1000
name: MaxEvaluationsCondition
use_dataframe_index: false
strict: true
/home/runner/work/Xopt/Xopt/.venv/lib/python3.12/site-packages/pydantic/main.py:464: UserWarning: Pydantic serializer warnings: PydanticSerializationUnexpectedValue(Unexpected field `_loaded_population`: Expected `CNSGAGenerator`) PydanticSerializationUnexpectedValue(Unexpected field `_loaded_population`: Expected `CNSGAGenerator`) return self.__pydantic_serializer__.to_python(
%%time
!mpirun -n 8 python -m mpi4py.futures -m xopt.mpi.run -vv --logfile xopt.log test.yaml
--------------------------------------------------------------------------
There are not enough slots available in the system to satisfy the 8
slots that were requested by the application:
python
Either request fewer slots for your application, or make more slots
available for use.
A "slot" is the Open MPI term for an allocatable unit where we can
launch a process. The number of slots available are defined by the
environment in which Open MPI processes are run:
1. Hostfile, via "slots=N" clauses (N defaults to number of
processor cores if not provided)
2. The --host command line parameter, via a ":N" suffix on the
hostname (N defaults to 1 if not provided)
3. Resource manager (e.g., SLURM, PBS/Torque, LSF, etc.)
4. If none of a hostfile, the --host command line parameter, or an
RM is present, Open MPI defaults to the number of processor cores
In all the above cases, if you want Open MPI to default to the number
of hardware threads instead of the number of processor cores, use the
--use-hwthread-cpus option.
Alternatively, you can use the --oversubscribe option to ignore the
number of available slots when deciding the number of processes to
launch.
--------------------------------------------------------------------------
CPU times: user 1.55 ms, sys: 15 ms, total: 16.6 ms Wall time: 322 ms
# Show the last lines of the log file requested via --logfile in the MPI run.
!tail xopt.log
tail: cannot open 'xopt.log' for reading: No such file or directory
Dask¶
# Client() with no arguments; the summary displayed by this cell reports a
# distributed.LocalCluster backing it.
client = Client()
# Executor object that is assigned to X.evaluator.executor for the Dask run.
executor = client.get_executor()
# Display the client summary (dashboard address, workers, memory).
client
Client
Client-3a93543d-1909-11f1-8d55-df341780e284
| Connection method: Cluster object | Cluster type: distributed.LocalCluster |
| Dashboard: http://127.0.0.1:8787/status |
Cluster Info
LocalCluster
6c14f833
| Dashboard: http://127.0.0.1:8787/status | Workers: 4 |
| Total threads: 4 | Total memory: 15.62 GiB |
| Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-41a2450b-4ea1-40d8-9139-9cfb0fe25667
| Comm: tcp://127.0.0.1:45965 | Workers: 0 |
| Dashboard: http://127.0.0.1:8787/status | Total threads: 0 |
| Started: Just now | Total memory: 0 B |
Workers
Worker: 0
| Comm: tcp://127.0.0.1:41169 | Total threads: 1 |
| Dashboard: http://127.0.0.1:40513/status | Memory: 3.90 GiB |
| Nanny: tcp://127.0.0.1:35947 | |
| Local directory: /tmp/dask-scratch-space/worker-o1tbexnt | |
Worker: 1
| Comm: tcp://127.0.0.1:35839 | Total threads: 1 |
| Dashboard: http://127.0.0.1:43037/status | Memory: 3.90 GiB |
| Nanny: tcp://127.0.0.1:34047 | |
| Local directory: /tmp/dask-scratch-space/worker-vcp1gx6m | |
Worker: 2
| Comm: tcp://127.0.0.1:37363 | Total threads: 1 |
| Dashboard: http://127.0.0.1:35323/status | Memory: 3.90 GiB |
| Nanny: tcp://127.0.0.1:45917 | |
| Local directory: /tmp/dask-scratch-space/worker-sygfzk6k | |
Worker: 3
| Comm: tcp://127.0.0.1:45881 | Total threads: 1 |
| Dashboard: http://127.0.0.1:44943/status | Memory: 3.90 GiB |
| Nanny: tcp://127.0.0.1:38437 | |
| Local directory: /tmp/dask-scratch-space/worker-9l9kgxkb | |
%%time
# Start from a fresh Xopt object so this run is timed on its own.
X = Xopt(YAML)
# Route evaluations through the executor obtained from the Dask client.
X.evaluator.executor = executor
X.evaluator.max_workers = N_CPUS
X.run()
# NOTE(review): the recorded output of this cell is 1001, one more than
# max_evaluations (1000) -- presumably an extra in-flight evaluation finished
# before the run stopped; confirm whether this is expected.
len(X.data)
CPU times: user 9.62 s, sys: 892 ms, total: 10.5 s Wall time: 31 s
1001
Load output into Pandas¶
This algorithm writes two types of files: gen_{i}.json with all of the new individuals evaluated in a generation, and pop_{i}.json with the latest best population. Xopt provides some functions to load these easily into a Pandas dataframe for further analysis.
# Display the evaluation data held by the Xopt object after the last run
# (inputs, constants, objectives, constraints, runtime and error flag per row).
X.data
| x1 | x2 | a | y1 | y2 | c1 | c2 | xopt_runtime | xopt_error | |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.509595 | 2.896185 | dummy_constant | 0.509595 | 2.896185 | 7.741343 | 5.741794 | 0.117971 | False |
| 1 | 0.976737 | 0.314728 | dummy_constant | 0.976737 | 0.314728 | 0.025903 | 0.261604 | 0.083734 | False |
| 2 | 1.577174 | 3.094166 | dummy_constant | 1.577174 | 3.094166 | 11.030679 | 7.890000 | 0.069954 | False |
| 3 | 0.486401 | 2.098335 | dummy_constant | 0.486401 | 2.098335 | 3.727217 | 2.554861 | 0.036151 | False |
| 4 | 2.604678 | 0.880518 | dummy_constant | 2.604678 | 0.880518 | 6.511412 | 4.574464 | 0.164030 | False |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 996 | 0.932160 | 0.526322 | dummy_constant | 0.932160 | 0.526322 | 0.182106 | 0.187455 | 0.093067 | False |
| 997 | 1.182599 | 0.450129 | dummy_constant | 1.182599 | 0.450129 | 0.511733 | 0.468429 | 0.105321 | False |
| 998 | 0.977775 | 0.103277 | dummy_constant | 0.977775 | 0.103277 | -0.022019 | 0.385659 | 0.170930 | False |
| 999 | 1.004372 | 0.129049 | dummy_constant | 1.004372 | 0.129049 | 0.071043 | 0.391996 | 0.098690 | False |
| 1000 | 1.004372 | 0.129049 | dummy_constant | 1.004372 | 0.129049 | 0.071043 | 0.391996 | 0.098690 | False |
1001 rows × 9 columns
# Feasibility flags for every row, computed from the VOCS constraints.
feasibility = get_feasibility_data(X.vocs, X.data)
# Place the flags next to the evaluation data as extra columns.
df = pd.concat([X.data, feasibility], axis=1)
# Show only the rows whose overall "feasible" flag is True.
df.loc[df["feasible"]]
| x1 | x2 | a | y1 | y2 | c1 | c2 | xopt_runtime | xopt_error | feasible_c1 | feasible_c2 | feasible | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | 0.976737 | 0.314728 | dummy_constant | 0.976737 | 0.314728 | 0.025903 | 0.261604 | 0.083734 | False | True | True | True |
| 5 | 1.118871 | 0.443591 | dummy_constant | 1.118871 | 0.443591 | 0.351606 | 0.386183 | 0.187899 | False | True | True | True |
| 17 | 0.633864 | 1.013815 | dummy_constant | 0.633864 | 1.013815 | 0.518090 | 0.281925 | 0.049239 | False | True | True | True |
| 38 | 0.667170 | 1.124701 | dummy_constant | 0.667170 | 1.124701 | 0.775438 | 0.418197 | 0.029615 | False | True | True | True |
| 56 | 0.747635 | 0.782790 | dummy_constant | 0.747635 | 0.782790 | 0.078394 | 0.141293 | 0.063187 | False | True | True | True |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 995 | 0.779250 | 0.581340 | dummy_constant | 0.779250 | 0.581340 | 0.012641 | 0.084597 | 0.032774 | False | True | True | True |
| 996 | 0.932160 | 0.526322 | dummy_constant | 0.932160 | 0.526322 | 0.182106 | 0.187455 | 0.093067 | False | True | True | True |
| 997 | 1.182599 | 0.450129 | dummy_constant | 1.182599 | 0.450129 | 0.511733 | 0.468429 | 0.105321 | False | True | True | True |
| 999 | 1.004372 | 0.129049 | dummy_constant | 1.004372 | 0.129049 | 0.071043 | 0.391996 | 0.098690 | False | True | True | True |
| 1000 | 1.004372 | 0.129049 | dummy_constant | 1.004372 | 0.129049 | 0.071043 | 0.391996 | 0.098690 | False | True | True | True |
471 rows × 12 columns
# Feasible evaluations, shown in objective space (y1 vs. y2)
feasible_df = df.loc[df["feasible"]]
feasible_df.plot.scatter(x="y1", y="y2").set_aspect("equal")
# Infeasible evaluations, shown in objective space
infeasible_df = df.loc[~df["feasible"]]
infeasible_df.plot.scatter(x="y1", y="y2").set_aspect("equal")
# Final population as held by the generator
df1 = X.generator.population
df1.plot.scatter(x="y1", y="y2").set_aspect("equal")
matplotlib plotting¶
You can always use matplotlib for customizable plotting.
# Column names of the two objectives to plot against each other
k1, k2 = "y1", "y2"
fig, ax = plt.subplots(figsize=(6, 6))
# One scatter layer per data set, drawn in this order so that later layers
# sit on top: infeasible points, feasible points, final population.
layers = [
    (infeasible_df, {"color": "blue", "alpha": 0.5, "label": "infeasible"}),
    (feasible_df, {"color": "orange", "label": "feasible"}),
    (df1, {"color": "red", "label": "final population"}),
]
for frame, style in layers:
    ax.scatter(frame[k1], frame[k2], marker=".", **style)
ax.set(xlabel=k1, ylabel=k2, title="Xopt's CNSGA algorithm")
ax.set_aspect("auto")
plt.legend()
<matplotlib.legend.Legend at 0x7fbbc771ec60>
# Cleanup
#!rm -r dask-worker-space
!rm -r temp
!rm xopt.log*
!rm test.yaml
rm: cannot remove 'xopt.log*': No such file or directory