I've attached my own code to the attachment.
from ax.plot.pareto_utils import compute_posterior_pareto_frontier
from ax.utils.notebook.plotting import render, init_notebook_plotting
import pandas as pd
from ax import *
import numpy as np
from ax.metrics.noisy_function import NoisyFunctionMetric
from ax.service.utils.report_utils import exp_to_df
from ax.runners.synthetic import SyntheticRunner
from ax.modelbridge.factory import get_MOO_EHVI, get_MOO_PAREGO
from ax.modelbridge.modelbridge_utils import observed_hypervolume
import os
import pybdsim
from ax.storage.json_store.save import save_experiment
def run_bdsim(parameters, magnets_name_list, gantry_components_file_name, particles_number):
Q1G2 = parameters[0]
Q2G2 = parameters[1]
Q3G2 = parameters[2]
Q4G2 = parameters[3]
Q5G2 = parameters[4]
Q6G2 = parameters[5]
k_list = [Q1G2, Q2G2, Q3G2, Q4G2, Q5G2, Q6G2]
with open(gantry_components_file_name, 'r', encoding='utf-8', errors='ignore') as fd:
newText_gmad = fd.read()
# replace degrader Thickness
for i in range(6):
start_name = newText_gmad.find(magnets_name_list[i])
start = newText_gmad.find('k1', start_name)
end = newText_gmad.find(',', start)
comment_old = newText_gmad[start: end + 1]
comment_new = "k1=" + str(k_list[i]) + ","
newText_gmad = newText_gmad.replace(comment_old, comment_new)
with open(gantry_components_file_name, 'w', encoding='utf-8', errors='ignore') as fd:
fd.write(newText_gmad)
pybdsim.Run.Bdsim(gmadpath="bdsim_model_1/gantry.gmad", outfile="output_1", ngenerate=particles_number, batch=True, silent=True)
os.system("rebdsimOptics output_1.root output-optics_1.root --emittanceOnTheFly")
def get_beam_statistical(root_file_name, inintial_particles_number):
load_data = pybdsim.Data.Load(root_file_name)
transmission_efficiency_list = load_data.optics.GetColumn("Npart") / inintial_particles_number
x_envelop_list = abs(load_data.optics.GetColumn("Sigma_x") * 1000)
xp_envelop_list = abs(load_data.optics.GetColumn("Sigma_xp") * 1000)
y_envelop_list = abs(load_data.optics.GetColumn("Sigma_y") * 1000)
yp_envelop_list = abs(load_data.optics.GetColumn("Sigma_yp") * 1000)
dispersion_x_list = abs(load_data.optics.GetColumn("Disp_x"))
dispersion_xp_list = abs(load_data.optics.GetColumn("Disp_xp"))
x_envelope_end = x_envelop_list[-1]
y_envelope_end = y_envelop_list[-1]
xp_envelope_end = xp_envelop_list[-1]
yp_envelope_end = yp_envelop_list[-1]
dispersion_x_end = dispersion_x_list[-2]
dispersion_xp_end = dispersion_xp_list[-2]
max_x_envelope = max(x_envelop_list)
max_y_envelope = max(y_envelop_list)
transmission_efficiency = transmission_efficiency_list[-1]
return (transmission_efficiency, x_envelope_end, y_envelope_end, xp_envelope_end, yp_envelope_end, max_x_envelope,
max_y_envelope, dispersion_x_end, dispersion_xp_end)
def inintial_magnet_parameters():
element_list = ["GantryDR1", "GantryDR2", "GantryDR3", "Q1G2", "GantryDR4", "B1G21", "B1G22", "GantryDR5", "Q2G2",
"GantryDR6", "GantryDR7", "Q3G2", "GantryDR8", "GantryDR9", "Q4G2", "GantryDR10", "GantryDR11",
"Q5G2", "GantryDR12", "GantryDR13", "Q6G2", "GantryDR14", "B2G21", "B2G22",
"GantryDR15", "GantryDR16", "B3G21", "B3G22", "B3G23", "GantryDR17"]
inintial_K_dict = {"Q1G2":-3.0182, "Q2G2":3.5537, "Q3G2":-5.4026, "Q4G2":6.5711, "Q5G2":-5.3012, "Q6G2":4.8104}
magnets_name_list = ["Q1G2", "Q2G2", "Q3G2", "Q4G2", "Q5G2", "Q6G2"]
return (element_list, inintial_K_dict, magnets_name_list)
def inintial_BO_parameters(inintial_K_dict, magnets_name_list):
class Metric_transmission_efficiency(NoisyFunctionMetric):
def f(self, x: np.ndarray) -> float:
run_bdsim(x, magnets_name_list, "bdsim_model_1/gantry_components.gmad", 200)
(transmission_efficiency, x_envelope_end, y_envelope_end, xp_envelope_end, yp_envelope_end, max_x_envelope, max_y_envelope, dispersion_x_end, dispersion_xp_end) \
= get_beam_statistical("output-optics_1.root", 2000)
return float(transmission_efficiency)
class Metric_x_envelope_end(NoisyFunctionMetric):
def f(self, x: np.ndarray) -> float:
(transmission_efficiency, x_envelope_end, y_envelope_end, xp_envelope_end, yp_envelope_end, max_x_envelope,
max_y_envelope, dispersion_x_end, dispersion_xp_end) \
= get_beam_statistical("output-optics_1.root", 2000)
return float(x_envelope_end)
class Metric_y_envelope_end(NoisyFunctionMetric):
def f(self, x: np.ndarray) -> float:
(transmission_efficiency, x_envelope_end, y_envelope_end, xp_envelope_end, yp_envelope_end, max_x_envelope,
max_y_envelope, dispersion_x_end, dispersion_xp_end) \
= get_beam_statistical("output-optics_1.root", 2000)
return float(y_envelope_end)
class Metric_xp_envelope_end(NoisyFunctionMetric):
def f(self, x: np.ndarray) -> float:
(transmission_efficiency, x_envelope_end, y_envelope_end, xp_envelope_end, yp_envelope_end, max_x_envelope,
max_y_envelope, dispersion_x_end, dispersion_xp_end) \
= get_beam_statistical("output-optics_1.root", 2000)
return float(xp_envelope_end)
class Metric_yp_envelope_end(NoisyFunctionMetric):
def f(self, x: np.ndarray) -> float:
(transmission_efficiency, x_envelope_end, y_envelope_end, xp_envelope_end, yp_envelope_end, max_x_envelope,
max_y_envelope, dispersion_x_end, dispersion_xp_end) \
= get_beam_statistical("output-optics_1.root", 2000)
return float(yp_envelope_end)
class Metric_dispersion_x_end(NoisyFunctionMetric):
def f(self, x: np.ndarray) -> float:
(transmission_efficiency, x_envelope_end, y_envelope_end, xp_envelope_end, yp_envelope_end, max_x_envelope,
max_y_envelope, dispersion_x_end, dispersion_xp_end) \
= get_beam_statistical("output-optics_1.root", 2000)
return float(dispersion_x_end)
class Metric_dispersion_xp_end(NoisyFunctionMetric):
def f(self, x: np.ndarray) -> float:
(transmission_efficiency, x_envelope_end, y_envelope_end, xp_envelope_end, yp_envelope_end, max_x_envelope,
max_y_envelope, dispersion_x_end, dispersion_xp_end) \
= get_beam_statistical("output-optics_1.root", 2000)
return float(dispersion_xp_end)
# define search space
parameter_list = []
constant_rate = 0.1
for i in range(len(magnets_name_list)):
parameter_list.append(RangeParameter(name=magnets_name_list[i], lower=inintial_K_dict[magnets_name_list[i]]-abs(inintial_K_dict[magnets_name_list[i]])*constant_rate, upper= inintial_K_dict[magnets_name_list[i]]+abs(inintial_K_dict[magnets_name_list[i]])*constant_rate, parameter_type=ParameterType.FLOAT))
search_space = SearchSpace(parameters=parameter_list)
# instance metrics
metric_transmission_efficiency = Metric_transmission_efficiency("transmission_efficiency", magnets_name_list, noise_sd=0.0, lower_is_better=False)
metric_x_envelope_end = Metric_x_envelope_end("x_envelope_end", magnets_name_list, noise_sd=0.0, lower_is_better=True)
metric_y_envelope_end = Metric_y_envelope_end("y_envelope_end", magnets_name_list, noise_sd=0.0, lower_is_better=True)
metric_xp_envelope_end = Metric_xp_envelope_end("xp_envelope_end", magnets_name_list, noise_sd=0.0, lower_is_better=True)
metric_yp_envelope_end = Metric_yp_envelope_end("yp_envelope_end", magnets_name_list, noise_sd=0.0, lower_is_better=True)
metric_dispersion_x_end = Metric_dispersion_x_end("dispersion_x_end", magnets_name_list, noise_sd=0.0, lower_is_better=True)
metric_dispersion_xp_end = Metric_dispersion_xp_end("dispersion_xp_end", magnets_name_list, noise_sd=0.0, lower_is_better=True)
mo = MultiObjective(objectives=[Objective(metric=metric_transmission_efficiency, minimize=False),
Objective(metric=metric_x_envelope_end, minimize=True),
Objective(metric=metric_y_envelope_end, minimize=True),
Objective(metric=metric_xp_envelope_end, minimize=True),
Objective(metric=metric_yp_envelope_end, minimize=True),
Objective(metric=metric_dispersion_x_end, minimize=True),
Objective(metric=metric_dispersion_xp_end, minimize=True)])
# bound is should to use prior experiences
objective_thresholds = [ObjectiveThreshold(metric=metric_transmission_efficiency, bound=0.95, relative=False),
ObjectiveThreshold(metric=metric_x_envelope_end, bound=3.0, relative=False),
ObjectiveThreshold(metric=metric_y_envelope_end, bound=3.0, relative=False),
ObjectiveThreshold(metric=metric_xp_envelope_end, bound=2.5, relative=False),
ObjectiveThreshold(metric=metric_yp_envelope_end, bound=2.5, relative=False),
ObjectiveThreshold(metric=metric_dispersion_x_end, bound=0.1, relative=False),
ObjectiveThreshold(metric=metric_dispersion_xp_end, bound=0.1, relative=False)]
optimization_config = MultiObjectiveOptimizationConfig(objective=mo, objective_thresholds=objective_thresholds)
experiment = Experiment(name="gantry_experiment", search_space=search_space, optimization_config=optimization_config,
runner=SyntheticRunner(),
)
return experiment
def run_BO_Optimization(experiment, N_INIT, N_BATCH):
def initialize_experiment(experiment, N_INIT):
sobol = Models.SOBOL(search_space=experiment.search_space, seed=1234)
for i in range(N_INIT):
print(f"-------------SOBOL:{i}/{N_INIT}------------------")
experiment.new_trial(sobol.gen(1)).run()
return experiment.fetch_data()
ehvi_experiment = experiment
ehvi_data = initialize_experiment(ehvi_experiment, N_INIT)
ehvi_hv_list = []
for i in range(N_BATCH):
print(f"--------------EHVI:{i}/{N_BATCH}------------------")
ehvi_model = get_MOO_EHVI(experiment=ehvi_experiment, data=ehvi_data,)
generator_run = ehvi_model.gen(1)
trial = ehvi_experiment.new_trial(generator_run=generator_run)
trial.run()
ehvi_data = Data.from_multiple_data([ehvi_data, trial.fetch_data()])
exp_df = exp_to_df(ehvi_experiment)
try:
hv = observed_hypervolume(modelbridge=ehvi_model)
except:
hv = 0
print("Failed to compute hv")
ehvi_hv_list.append(hv)
print(f"Iteration: {i}, HV: {hv}")
exp_df.to_excel("./data_save_1.xlsx")
if __name__ == '__main__':
(element_list, inintial_K_dict, magnets_name_list) = inintial_magnet_parameters()
experiment= inintial_BO_parameters(inintial_K_dict, magnets_name_list)
run_BO_Optimization(experiment, 2, 2)
So I run the function run_bdsim in a single metric class, which seems to work. However, the Sobol algorithm in the develop API seems to sample random points (multiple points) and then calculate the associated values at the same time. Is it possible to sample a point and calculate it once (only run once run_bdsim and get all the metrics), as the function (run_bdsim) I wrote can't calculate it in parallel?
For instance:
gs = GenerationStrategy(steps=[GenerationStep(model=Models.SOBOL, num_trials=NUM_SOBOL_STEPS),
GenerationStep(model=Models.MOO, num_trials=-1)])
ax_client = AxClient(generation_strategy=gs, random_seed=1234, verbose_logging=True)
The above code is same qNEHVI as the website:https://ax.dev/tutorials/multiobjective_optimization.html
I've attached my own code to the attachment.