Extremum Seeking Optimization¶
In this example we demonstrate extremum seeking (ES) optimization. The optimum of the test evaluation function drifts around a center point over time, and we track this drift by applying the extremum seeking technique.
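At its core, ES dithers each parameter sinusoidally and lets the measured cost modulate the phase of each oscillation, so the time-averaged motion drifts toward the optimum. Below is a minimal sketch of one such update step; the time step dt and the frequency spacing are illustrative assumptions, and this is not Xopt's internal implementation.

import numpy as np

def es_step(p, step, cost, k=2.0, oscillation_size=0.1, dt=0.1):
    # One dithering update for a normalized parameter vector p.
    # Each parameter gets its own frequency so the oscillations stay independent.
    w = np.linspace(1.0, 1.75, len(p))
    # The measured cost shifts the phase of each oscillation; with k > 0 the
    # averaged motion descends the cost surface (k < 0 would ascend, i.e. maximize).
    return p + dt * np.cos(w * step * dt + k * cost) * np.sqrt(oscillation_size * w)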
In [1]:
import numpy as np
from xopt.generators.es.extremumseeking import ExtremumSeekingGenerator
from xopt.vocs import VOCS
from xopt.evaluator import Evaluator
from xopt import Xopt
from tqdm.auto import tqdm
import os
import pandas as pd
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings("ignore")
In [2]:
# set values if testing
SMOKE_TEST = os.environ.get("SMOKE_TEST")
NUM_STEPS = 10 if SMOKE_TEST else 1000
Extremum seeking test problem¶
This test problem is a 10-D quadratic function whose optimum drifts around its initial position over time. We also add some noise to the objective to make the problem more realistic.
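Concretely, at step $i$ the evaluate function defined below returns the noisy cost

$$
f(\mathbf{p}, i) = \sum_{n=1}^{10} \bigl(p_n - p^*_n(i)\bigr)^2 + \epsilon_i,
\qquad
p^*_n(i) = p^*_n \Bigl(1 + \sin\bigl(2\pi w_n i / 2000\bigr)\Bigr),
$$

where the centers $p^*_n$ and drift frequencies $w_n$ are fixed random values and $\epsilon_i \sim \mathcal{N}(0, 0.1^2)$.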
In [3]:
np.random.seed(42)  # set deterministic run

nES = 10

# This global dict is used as a counter to emulate drifting
states = {"count": 0}

noise = 0.1 * np.random.randn(NUM_STEPS)

# This is the unknown optimal point
p_opt = 1.5 * (2 * np.random.rand(nES) - 1)
# Various frequencies for unknown points
w_opt = 0.25 + 2 * np.random.rand(nES)


def f_ES_minimize(input_dict):
    p = []
    for i in range(nES):
        p.append(input_dict[f"p{i}"])
    p = np.array(p)

    # Vary the optimal point with time
    p_opt_i = np.zeros(nES)
    i = states["count"]
    for n in np.arange(nES):
        p_opt_i[n] = p_opt[n] * (1 + np.sin(2 * np.pi * w_opt[n] * i / 2000))

    # This simple cost will be distance from the optimal point
    f_val = np.sum((p - p_opt_i) ** 2) + noise[i]
    states["count"] += 1

    outcome_dict = {"f": f_val, "p_opt": pd.Series(p_opt_i)}
    return outcome_dict
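As a quick sanity check, we can call the evaluate function once by hand and inspect the outcome dictionary (a hypothetical snippet, not part of the optimization run; we reset the drift counter afterwards so the runs below stay deterministic):

# Hypothetical one-off evaluation at the origin
sample_input = {f"p{i}": 0.0 for i in range(nES)}
out = f_ES_minimize(sample_input)
print(out["f"])      # scalar noisy cost
print(out["p_opt"])  # pd.Series holding the current (drifting) optimum

states["count"] = 0  # undo the counter increment from this test call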
Run ES on the test problem (YAML method)¶
In [4]:
YAML = """
max_evaluations: 5000
generator:
name: extremum_seeking
k: 2.0
oscillation_size: 0.1
decay_rate: 1.0
evaluator:
function: __main__.f_ES_minimize
vocs:
variables:
p0: [-2, 2]
p1: [-2, 2]
p2: [-2, 2]
p3: [-2, 2]
p4: [-2, 2]
p5: [-2, 2]
p6: [-2, 2]
p7: [-2, 2]
p8: [-2, 2]
p9: [-2, 2]
objectives:
f: MINIMIZE
"""
X = Xopt.from_yaml(YAML)
X.max_evaluations = NUM_STEPS
X
YAML = """
max_evaluations: 5000
generator:
name: extremum_seeking
k: 2.0
oscillation_size: 0.1
decay_rate: 1.0
evaluator:
function: __main__.f_ES_minimize
vocs:
variables:
p0: [-2, 2]
p1: [-2, 2]
p2: [-2, 2]
p3: [-2, 2]
p4: [-2, 2]
p5: [-2, 2]
p6: [-2, 2]
p7: [-2, 2]
p8: [-2, 2]
p9: [-2, 2]
objectives:
f: MINIMIZE
"""
X = Xopt.from_yaml(YAML)
X.max_evaluations = NUM_STEPS
X
Out[4]:
Xopt
________________________________
Version: 2.4.6.dev5+ga295b108.d20250107
Data size: 0
Config as YAML:
dump_file: null
evaluator:
  function: __main__.f_ES_minimize
  function_kwargs: {}
  max_workers: 1
  vectorized: false
generator:
  decay_rate: 1.0
  k: 2.0
  name: extremum_seeking
  oscillation_size: 0.1
max_evaluations: 1000
serialize_inline: false
serialize_torch: false
strict: true
vocs:
  constants: {}
  constraints: {}
  objectives:
    f: MINIMIZE
  observables: []
  variables:
    p0:
    - -2.0
    - 2.0
    p1:
    - -2.0
    - 2.0
    p2:
    - -2.0
    - 2.0
    p3:
    - -2.0
    - 2.0
    p4:
    - -2.0
    - 2.0
    p5:
    - -2.0
    - 2.0
    p6:
    - -2.0
    - 2.0
    p7:
    - -2.0
    - 2.0
    p8:
    - -2.0
    - 2.0
    p9:
    - -2.0
    - 2.0
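The same settings can be read back off the constructed object; for example (a hypothetical inspection snippet, assuming the attribute names shown in the dump above):

# Inspect the parsed generator settings on the live object
print(X.generator.k, X.generator.oscillation_size, X.generator.decay_rate)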
In [5]:
# Reset global counter to guarantee deterministic optimization
states["count"] = 0

X.random_evaluate(1)
X.run()
Now you can go directly to the Visualization section and check out the results.
Run ES on the test problem (API method)¶
VOCS¶
We'll set the bounds for all the variables p0 through p9 to [-2, 2].
In [6]:
variables = {}
for i in range(nES):
    variables[f"p{i}"] = [-2, 2]

vocs = VOCS(
    variables=variables,
    objectives={"f": "MINIMIZE"},
)
In [7]:
vocs
Out[7]:
VOCS(variables={'p0': [-2.0, 2.0], 'p1': [-2.0, 2.0], 'p2': [-2.0, 2.0], 'p3': [-2.0, 2.0], 'p4': [-2.0, 2.0], 'p5': [-2.0, 2.0], 'p6': [-2.0, 2.0], 'p7': [-2.0, 2.0], 'p8': [-2.0, 2.0], 'p9': [-2.0, 2.0]}, constraints={}, objectives={'f': 'MINIMIZE'}, constants={}, observables=[])
Evaluator¶
In [8]:
evaluator = Evaluator(function=f_ES_minimize)
Generator¶
In [9]:
generator = ExtremumSeekingGenerator(vocs=vocs)
In [10]:
generator.dict()
Out[10]:
{'k': 2.0, 'oscillation_size': 0.1, 'decay_rate': 1.0}
Note that ES has 3 hyperparameters: k, oscillation_size, and decay_rate.

- k: ES feedback gain (set k < 0 for maximization instead of minimization)
- oscillation_size: ES dithering size
- decay_rate: this value is optional; it causes the oscillation sizes to naturally decay. If you want the parameters to persistently oscillate without decay, set decay_rate = 1.0 (a configuration sketch follows this list)
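For instance, a generator whose dithering amplitude slowly shrinks over the run could be configured as follows (a sketch with illustrative, untuned values; generator_decay is just a hypothetical name):

# Illustrative hyperparameter choices; the values are assumptions, not tuned
generator_decay = ExtremumSeekingGenerator(
    vocs=vocs,
    k=2.0,                  # positive gain: minimize (negative would maximize)
    oscillation_size=0.05,  # smaller dithering amplitude
    decay_rate=0.999,       # < 1.0 shrinks oscillations each step; 1.0 disables decay
)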
Run the optimization¶
In [11]:
X = Xopt(vocs=vocs, evaluator=evaluator, generator=generator)
In [12]:
X.max_evaluations = NUM_STEPS
In [13]:
# Reset global counter to guarantee deterministic optimization
states["count"] = 0

for i in tqdm(range(NUM_STEPS)):
    X.step()
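All evaluations are accumulated in X.data, a pandas DataFrame with one row per step (the p0 ... p9 inputs plus the f and p_opt outputs); the plots below are drawn from it. A quick peek:

# First few rows of the accumulated optimization history
X.data.head()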
Visualization¶
In [14]:
# Plot all results
plt.figure(1, figsize=(8, 10))

plt.subplot(2, 1, 1)
plt.plot(X.data["f"])
plt.ylabel("ES cost")
plt.xticks([])

plt.subplot(2, 1, 2)
plt.plot(X.data[[f"p{i}" for i in range(nES)]], alpha=0.25)
_p_opt = np.vstack(X.data["p_opt"].values).astype(float)  # do not use p_opt as var name!
plt.plot(_p_opt, "k--")
plt.plot(2 + np.zeros(NUM_STEPS), "r")  # variable bounds
plt.plot(-2 + np.zeros(NUM_STEPS), "r")
plt.ylabel("ES parameter")
plt.xlabel("ES step")
plt.tight_layout()
In [15]:
# Plot individual parameter trajectories
plt.figure(2, figsize=(15, 8))
for n in np.arange(nES):
    plt.subplot(2, 5, n + 1)
    plt.plot(X.data[f"p{n}"], label=f"$p^{{ES}}_{n+1}$")
    plt.plot(_p_opt[:, n], "k--", label=f"$p^*_{n+1}$")
    plt.plot(2 + np.zeros(NUM_STEPS), "r--")
    plt.plot(-2 + np.zeros(NUM_STEPS), "r--")
    plt.ylim([-3, 5])
    plt.legend(frameon=False, loc=1)
    if n in (0, 5):
        plt.ylabel("parameters")
    else:
        plt.yticks([])
    if n > 4:
        plt.xlabel("ES step")
    else:
        plt.xticks([])
plt.tight_layout()