Module OPTIMA.core.evaluation
A module that provides functionality to evaluate the trained models.
# -*- coding: utf-8 -*-
"""A module that provides functionality to evaluate the trained models."""
from types import ModuleType
from typing import Callable, Union, Literal, Any, Optional
import copy
import os
import pickle
import numpy as np
import pandas as pd
from scipy.optimize import curve_fit
from sklearn.model_selection import KFold
import tabulate
from matplotlib import pyplot as plt
# This way of providing the model specific functionality allows supporting different machine learning libraries besides
# Keras
import importlib.util
if importlib.util.find_spec("lightning") is not None:
import torch
import ray
from ray import tune
import OPTIMA.core.training
import OPTIMA.core.model
import OPTIMA.builtin.evaluation
import OPTIMA.builtin.inputs
import OPTIMA.builtin.search_space
from OPTIMA.core.search_space import run_config_search_space_entry_type
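# A minimal, hypothetical usage sketch (all names other than evaluate_experiment are placeholders
# and not part of OPTIMA): once the Tuner has finished and the tune.ExperimentAnalysis has been
# extracted from the returned ResultGrid, the evaluation could look roughly like
#
#     best_trials, best_trials_fit, model_configs_df = evaluate_experiment(
#         analysis,                              # tune.ExperimentAnalysis of the finished run
#         training_func=my_training_func,        # placeholder training function
#         run_config=my_run_config,              # imported run-config module
#         optimize_name="val_loss",
#         optimize_op="min",
#         search_space=my_search_space,
#         results_dir="results/evaluation",
#         inputs_split=inputs_split,
#         targets_split=targets_split,
#         weights_split=weights_split,
#         normalized_weights_split=normalized_weights_split,
#         input_handler=my_input_handler,
#     )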
def evaluate_experiment(
analysis: tune.ExperimentAnalysis,
training_func: Callable,
run_config: ModuleType,
optimize_name: str,
optimize_op: Union[Literal["max"], Literal["min"]],
search_space: dict[str, run_config_search_space_entry_type],
results_dir: str,
inputs_split: list[ray.ObjectRef],
targets_split: list[ray.ObjectRef],
weights_split: list[ray.ObjectRef],
normalized_weights_split: list[ray.ObjectRef],
input_handler: OPTIMA.builtin.inputs.InputHandler,
custom_metrics: Optional[list[tuple[str, Callable]]] = None,
composite_metrics: Optional[list[tuple[str, tuple[str, str], Callable]]] = None,
native_metrics: Optional[list] = None,
weighted_native_metrics: Optional[list] = None,
cpus_per_model: int = 1,
gpus_per_model: int = 0,
overtraining_conditions: Optional[list] = None,
write_results: bool = True,
return_results_str: bool = False,
return_unfilled: bool = False,
return_crossval_models: bool = False,
PBT: bool = False,
PBT_mutation_handler: Optional[OPTIMA.core.training.PBTMutationHandler] = None,
seed: Optional[int] = None,
) -> Union[
tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame],
tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str],
tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str, list],
tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, dict, dict],
tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str, dict, dict],
tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str, list, dict, dict],
]:
"""Performs the evaluation of an experiment to find the best trial, run the crossvalidation and evaluate the models.
After removing any reports containing ``inf`` or ``NaN`` values, the dictionary of dataframes containing the reports
of all trials is saved to ``'dfs.pickle'`` in the provided ``results_dir``. If this file already exists, it is not
overwritten.
Two sets of plots are produced to give an overview of the entire experiment:
- A set of overview plots (one per metric) containing the values of the target metric, all `custom metrics` on the
training and validation datasets and all `composite metrics` for each report of every trial, i.e. the metric values
after every epoch throughout the entire experiment, sorted by the time of the report. Reports corresponding to
overfitted epochs (i.e. epochs for which any of the `overfitting conditions` defined in the run-config is not satisfied) are
shown semi-transparent. The plots are saved in the subdirectory ``'overview_plots'`` in the provided ``results_dir``.
- A set of progress plots (one per metric) containing the value of the target metric, all `custom metrics` on the
training and validation datasets and all `composite metrics` corresponding to the best epoch up to a certain point
during the optimization. E.g., at epoch 100, the value of the best epoch within the first 100 epochs is drawn. Thus,
the shown curves can be interpreted as the convergence of the optimization.
From the reported metric values, the best trial is selected using two independent methods (except if ``PBT`` is
``True``, in which case only the `best value`-method is used):
- `best value`: the best trial is given by the best reported value of the target metric while passing all
`overfitting conditions`.
- `best fit`: the evolution of all metrics of each trial is fitted (see ``evaluation.get_best_trials_from_fit``
for details). The best trial is given by the best target metric fit function value that passed all overfitting
conditions. The overfitting conditions are evaluated using the fit function values of all necessary metrics at
each epoch (if ``run_config.check_overtraining_with_fit`` is ``True``) or using the reported values (if
``run_config.check_overtraining_with_fit`` is ``False``).
For both methods, the hyperparameters corresponding to the best trial and the number of epochs to reach the best
target metric value are extracted. If ``PBT`` is ``False``, the hyperparameters correspond to the config provided by
Tune via ``analysis.get_all_configs()``. If ``PBT`` is ``True``, the ``get_policy()``-function of the provided
``PBT_mutation_handler`` is called to check if a hyperparameter schedule is available for the best trial. If this is
the case, this policy is used. If not, the config provided by Tune is used. Both the hyperparameters and the number
of epochs are given to the ``perform_crossvalidation``-function to perform the k-fold crossvalidation, resulting in
`k` trained models per method.
The ``evaluate``-function is called for the 2*k models trained during the crossvalidation. After the evaluation, the
mean and standard deviation of the `k` values for each metric provided by the evaluation is calculated for both sets
of hyperparameters. The results are printed to console.
If ``write_results`` is ``True``, the results and a summary of the experiment (the used input variables as given by the
provided ``input_handler``-instance, the shape of the training, validation and (if used) testing dataset as well as
the search space) are saved to `results.txt` in the ``results_dir``.
The full results of the experiment evaluation are saved to `evaluation.pickle` in the ``results_dir``, which allows the
evaluation results to be reloaded. This is useful when e.g. resuming a partially finished optimization run because the
evaluation of finished steps does not need to be repeated. Thus, the evaluation is automatically skipped when an
`evaluation.pickle`-file is present in ``results_dir`` and the results are instead reloaded from that file.
Parameters
----------
analysis : tune.ExperimentAnalysis
The ``tune.ExperimentAnalysis``-object extracted from the ``tune.ResultsGrid`` returned by the ``Tuner``.
training_func : Callable
Reference to the function performing the training. This is given to ``perform_crossvalidation``.
run_config : ModuleType
Reference to the imported run-config file.
optimize_name : str
Name of the target metric.
optimize_op : Union[Literal["max"], Literal["min"]]
Specifies if the target metric is to be maximized or minimized. Can be either ``'max'`` or ``'min'``.
search_space : dict[str, run_config_search_space_entry_type]
The search space as defined in the run-config.
results_dir : str
Path to the directory where the results are to be saved.
inputs_split : list[ray.ObjectRef]
List containing the object reference to the numpy array of input features for the training, validation and (if
used) testing sets.
targets_split : list[ray.ObjectRef]
List containing the object reference to the numpy array of target labels for the training, validation and (if
used) testing sets.
weights_split : list[ray.ObjectRef]
List containing the object reference to the numpy array of event weights for the training, validation and (if
used) testing sets.
normalized_weights_split : list[ray.ObjectRef]
List containing the object reference to the numpy array of normalized event weights for the training, validation
and (if used) testing sets.
input_handler : OPTIMA.builtin.inputs.InputHandler
Instance of the ``OPTIMA.builtin.inputs.InputHandler``-class
custom_metrics : Optional[list[tuple[str, Callable]]]
A list of `custom metrics` as defined in the run-config. (Default value = None)
composite_metrics : Optional[list[tuple[str, tuple[str, str], Callable]]]
A list of `composite metrics` as defined in the run-config. (Default value = None)
native_metrics : Optional[list]
A list of native metrics as defined in the run-config. (Default value = None)
weighted_native_metrics : Optional[list]
A list of weighted native metrics as defined in the run-config. (Default value = None)
cpus_per_model : int
The number of CPUs to use to train each model. This is given to ``perform_crossvalidation``. (Default value = 1)
gpus_per_model : int
The number of GPUs to use to train each model. This is given to ``perform_crossvalidation``. (Default value = 0)
overtraining_conditions : Optional[list]
A list of `overtraining conditions` as defined in the run-config. (Default value = None)
write_results : bool
If ``True``, the results are written to `results.txt` in ``results_dir``. (Default value = True)
return_results_str : bool
If ``True``, the results string that is printed to console is also returned. (Default value = False)
return_unfilled : bool
If ``True``, the evaluation part of the results string (containing the metric values returned by the
``evaluate``-function) will be provided "unfilled", i.e. with ``{}`` instead of the metric values.
Additionally, the list of raw metric values is provided. This can be useful for testing. (Default value = False)
return_crossval_models : bool
If ``True``, a dictionary containing file names of the saved crossvalidation models, the corresponding indices
denoting which of the `k` crossvalidation-splittings was used for the training, the model config and a dictionary
containing the training, validation and (if used) testing datasets for each splitting are returned. (Default value = False)
PBT : bool
Signifies if this is the evaluation of the Population Based Training step. This is given to ``perform_crossvalidation``
and used to skip the fit evaluation. (Default value = False)
PBT_mutation_handler : Optional[OPTIMA.core.training.PBTMutationHandler]
An instance of the ``OPTIMA.core.training.PBTMutationHandler``-class that helps to handle the PBT mutation
policies. Only used if ``PBT`` is ``True``. (Default value = None)
seed : Optional[int]
Seed given to ``perform_crossvalidation``. (Default value = None)
Returns
-------
Union[
tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame],
tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str],
tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str, list],
tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, dict, dict],
tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str, dict, dict],
tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str, list, dict, dict],
]
Two pandas dataframes containing, for each of the two best checkpoints, the values of all metrics, the trial-id,
the training iteration and the path to the checkpoint directory, as well as a dataframe containing the corresponding
hyperparameters (that were used to train the crossvalidation models), are returned. If ``return_results_str`` is
``True``, the results string that was printed to console is also returned. Finally, if ``return_crossval_models`` is
``True``, dictionaries containing the file names of the saved crossvalidation models, the corresponding indices
denoting which of the `k` crossvalidation-splittings was used for the training, the model config and the
corresponding training, validation and (if used) testing data are returned as well.
"""
if overtraining_conditions is None:
overtraining_conditions = []
if native_metrics is None:
native_metrics = []
if weighted_native_metrics is None:
weighted_native_metrics = []
if custom_metrics is None:
custom_metrics = []
if composite_metrics is None:
composite_metrics = []
# if we are doing the evaluation for the PBT step, we want to skip the fit evaluation
skip_fit_evaluation = PBT
# check if evaluation was already done previously
if not os.path.isfile(os.path.join(results_dir, "evaluation.pickle")):
# create the results directory if not present
if not os.path.exists(results_dir):
os.makedirs(results_dir, exist_ok=True)
# build a list containing the names of all metrics, grouped together like [[train_loss, val_loss], [train_accuracy, val_accuracy], ...]
metric_names = []
optimize_name_included = False
for metric, _ in native_metrics + weighted_native_metrics + custom_metrics:
group = ("train_" + metric, "val_" + metric)
metric_names.append(group)
if optimize_name in group:
optimize_name_included = True
for metric, _, _ in composite_metrics:
metric_names.append(metric)
if metric == optimize_name:
optimize_name_included = True
if not optimize_name_included:
metric_names = [optimize_name] + metric_names
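# Hypothetical example: with native/custom metrics "loss" and "accuracy" and a composite metric
# "accuracy_ratio", metric_names would end up as
#     [("train_loss", "val_loss"), ("train_accuracy", "val_accuracy"), "accuracy_ratio"]
# with the target metric prepended as an ungrouped entry if it is not already contained.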
# get the results dataframes and remove all NaN and inf values in columns corresponding to metrics
if not os.path.isfile(os.path.join(results_dir, "dfs.pickle")):
dfs_dirty = analysis.trial_dataframes
dfs = clean_analysis_results(dfs_dirty, metric_names)
# now also save the dataframes (if not already present)
with open(os.path.join(results_dir, "dfs.pickle"), "wb") as file:
pickle.dump(dfs, file)
else:
print(f"{os.path.join(results_dir, 'dfs.pickle')} already exists, reloading...")
with open(os.path.join(results_dir, "dfs.pickle"), "rb") as file:
dfs = pickle.load(file)
# go through dataframes and explicitly check if overtraining conditions are fulfilled, and add results (True/False)
# as new column "overtrained"
dfs_overtraining_checked = check_overtraining(dfs, overtraining_conditions)
# produce two sets of plots showing the overall progress of the experiment, one set containing all trials as datapoints,
# and one showing the evolution of the "best" trial; both as a function of epoch (total epochs across all trials)
draw_total_progress(
dfs_overtraining_checked,
optimize_name,
optimize_op,
metric_names,
figs_dir=results_dir,
reject_overtrained=True,
)
# find best trials by going through the dataframes and finding the extreme value of the metrics we are interested in
optimization_results_string = "" # will be printed and saved to file later
best_trials = get_best_trials(
dfs_overtraining_checked,
optimize_name,
optimize_op,
metric_names,
figs_dir=results_dir,
reject_overtrained=True,
)
optimization_results_string += "Best trials according to best achieved value for the metrics to monitor:\n"
optimization_results_string += tabulate.tabulate(
best_trials, headers=[best_trials.index.name] + list(best_trials.columns), tablefmt="fancy_grid"
)
if not skip_fit_evaluation:
# get best trials from fit: apply fit to target metrics for each trial, get the minimum of the fit, find the nearest
# checkpoint that is not overtrained, and write down the fit value at that point; then find the lowest fit value
# over all trials; when conf.check_overtraining_with_fit is set, give the overtraining conditions to the function
# which are then used to evaluate the overtraining using the fit function value for all relevant metrics,
# otherwise the "overtrained" column in the dfs is used
if run_config.check_overtraining_with_fit:
best_trials_fit = get_best_trials_from_fit(
run_config,
dfs_overtraining_checked,
optimize_name,
optimize_op,
metric_names,
figs_dir=results_dir,
overtraining_conditions=overtraining_conditions,
min_R_squared=run_config.fit_min_R_squared,
)
else:
best_trials_fit = get_best_trials_from_fit(
run_config,
dfs_overtraining_checked,
optimize_name,
optimize_op,
metric_names,
figs_dir=results_dir,
reject_overtrained=True,
min_R_squared=run_config.fit_min_R_squared,
)
if best_trials_fit is None:
print(
"WARNING: The fit evaluation failed. This indicates that no trial resulted in a sensible fit, which "
"might indicate a problem with the optimization. Skipping the fit evaluation..."
)
skip_fit_evaluation = True
else:
optimization_results_string += (
"\n\nBest trials according to best fit of the metrics to monitor: (all metric values from fit)\n"
)
optimization_results_string += tabulate.tabulate(
best_trials_fit,
headers=[best_trials_fit.index.name] + list(best_trials_fit.columns),
tablefmt="fancy_grid",
)
# go through results by iterating over the best trials for each target metric, print the corresponding configs,
# and save output paths for later evaluation
# first prepare the dataframe that will contain the best configs, meaning the hyperparameters of the best trials
# for each target metric, once determined using the best value and once using the fit. Since the first
# checkpointing epoch and the checkpointing frequency are not relevant hyperparameters, skip those entries.
optimize_name_list = optimize_name if isinstance(optimize_name, list) else [optimize_name]
hps_names = list(search_space.keys())
hps_names.remove("first_checkpoint_epoch")
hps_names.remove("checkpoint_frequency")
if PBT:
hps_names.remove("seed")
model_configs_df = pd.DataFrame(
index=hps_names + ["epochs", "seed"],
columns=list(*zip(optimize_name_list, [f"{metric} fit" for metric in optimize_name_list]))
if not skip_fit_evaluation
else optimize_name_list,
)
model_configs_df.index.name = "Hyperparameter"
optimization_results_string += "\n\nBest configs:\n"
# start by fetching dictionary containing the configs of all trials
if not os.path.isfile(os.path.join(results_dir, "configs.pickle")):
all_model_configs = analysis.get_all_configs()
assert all_model_configs != {}, "Dictionary of configs could not be loaded, was the optimization deleted?"
trial_ids = [trial.trial_id for trial in analysis.trials]
with open(os.path.join(results_dir, "configs.pickle"), "wb") as file:
pickle.dump((all_model_configs, trial_ids), file)
else:
print(f"{os.path.join(results_dir, 'configs.pickle')} already exists, reloading...")
with open(os.path.join(results_dir, "configs.pickle"), "rb") as file:
all_model_configs, trial_ids = pickle.load(file)
# start with results from the best metric values
dirs_to_evaluate = (
[]
) # will contain the paths to the directories containing the models to evaluate (best model from optimization + crossvalidation models)
model_configs_to_evaluate = [] # will contain the configs of the best models
for metric, best_trial, best_epoch in zip(best_trials.index, best_trials["trial"], best_trials["best epoch"]):
# get the config of this trial
model_config_to_evaluate = {}
config = all_model_configs[best_trial].copy()
# get the trial id
for trial_id in trial_ids:
if trial_id in best_trial:
model_config_to_evaluate[
"trial_id"
] = trial_id  # best_trial is the full path to the optimization folder while the entries of trial_ids are only the trial names
break
# when evaluating the PBT step, check if there is a policy available for this trial. If yes, use a different
# structure for the model_config_to_evaluate
if PBT and PBT_mutation_handler.get_policy(model_config_to_evaluate["trial_id"]) is not None:
# get the policy
policy = PBT_mutation_handler.get_policy(model_config_to_evaluate["trial_id"])
# since first_checkpoint_epoch, checkpoint_frequency, max_epochs and seed are constants, they will be
# the same for every config in the policy. Thus, move them outside of the hyperparameter schedule
constants = {
"first_checkpoint_epoch": policy[0][1]["first_checkpoint_epoch"],
"checkpoint_frequency": policy[0][1]["checkpoint_frequency"],
"max_epochs": policy[0][1]["max_epochs"],
"seed": policy[0][1]["seed"],
}
# since the constant entries as well as "first_checkpoint_epoch" and "checkpoint_frequency" are not
# needed in the schedule, remove the corresponding entries from each config in the policy
for _, model_config in policy:
del model_config["max_epochs"]
del model_config["seed"]
del model_config["first_checkpoint_epoch"]
del model_config["checkpoint_frequency"]
# add the policy and the constants to the model config to evaluate
model_config_to_evaluate["hp_schedule"] = policy
model_config_to_evaluate.update(constants)
else:
if PBT:
print(
f"Did not find a hyperparameter mutation policy for trial "
f"{model_config_to_evaluate['trial_id']}. Using fixed hyperparameters: {config}"
)
model_config_to_evaluate = config
# add the best epoch and save it to the model_configs_to_evaluate list; this is later given to the
# crossvalidation function to train multiple models for each config
model_config_to_evaluate["epochs"] = int(best_epoch)
model_configs_to_evaluate.append(model_config_to_evaluate)
# Add the hyperparameters to the dataframe containing the configs of the best trials. For PBT with an
# available policy, indicate the mutations. For hierarchical search spaces, not all hyperparameters must
# have a value, so insert "-" for those. Since the checkpointing frequency and the first checkpoint epoch
# are not relevant hyperparameters, skip those entries.
if "hp_schedule" in model_config_to_evaluate.keys():
# grab the hyperparameter schedule
policy = model_config_to_evaluate["hp_schedule"]
for hp in model_configs_df.index:
if hp in ["first_checkpoint_epoch", "checkpoint_frequency"]:
continue
elif hp == "epochs":
model_configs_df.loc[hp, metric] = model_config_to_evaluate["epochs"]
elif hp == "seed":
model_configs_df.loc[hp, metric] = model_config_to_evaluate["seed"]
else:
# build the string of mutations by iterating through this hp's schedule
hp_string = ""
previous_hp_value = None
for start_epoch, model_config in policy:
# get the hp value for this schedule step
hp_value = model_config[hp] if hp in model_config.keys() else "-"
# check if the value has changed, otherwise we can skip this step
if hp_value == previous_hp_value:
continue
# add this step's hp value to the string
if hp_string == "":
hp_string = str(hp_value)
else:
hp_string += f"\nepoch {start_epoch} --> {hp_value}"
# update the previous value
previous_hp_value = hp_value
# write the config entry
model_configs_df.loc[hp, metric] = hp_string
else:
for hp in model_configs_df.index:
if hp in ["first_checkpoint_epoch", "checkpoint_frequency"]:
continue
hp_value = model_config_to_evaluate.get(hp)
model_configs_df.loc[hp, metric] = hp_value if hp_value is not None else "-"
# create the target folder for the following crossvalidation
target_folder = os.path.join(results_dir, metric if len(best_trials.index) > 1 else "", "best_value")
dirs_to_evaluate.append(target_folder) # mark target_folder to be evaluated later
if not os.path.exists(target_folder):
os.makedirs(target_folder, exist_ok=True)
# then the results from the fit
if not skip_fit_evaluation:
for metric, best_trial, best_epoch in zip(
best_trials_fit.index,
best_trials_fit["trial"],
best_trials_fit["best epoch"],
):
model_config_to_evaluate = all_model_configs[best_trial].copy()
model_config_to_evaluate["epochs"] = int(best_epoch)
for trial_id in trial_ids:
if trial_id in best_trial:
model_config_to_evaluate[
"trial_id"
] = trial_id  # best_trial is the full path to the optimization folder while the entries of trial_ids are only the trial names
break
model_configs_to_evaluate.append(model_config_to_evaluate)
model_config = model_configs_to_evaluate[-1]
for hp in model_configs_df.index:
if hp in ["first_checkpoint_epoch", "checkpoint_frequency"]:
continue
hp_value = model_config.get(hp)
model_configs_df.loc[hp, f"{metric} fit"] = hp_value if hp_value is not None else "-"
target_folder = os.path.join(results_dir, metric if len(best_trials_fit.index) > 1 else "", "best_fit")
dirs_to_evaluate.append(target_folder) # mark target_folder to be evaluated later
if not os.path.exists(target_folder):
os.makedirs(target_folder, exist_ok=True)
optimization_results_string += tabulate.tabulate(
model_configs_df,
headers=[model_configs_df.index.name] + list(model_configs_df.columns),
tablefmt="fancy_grid",
)
print("\n" + optimization_results_string)
# give the list of best configs to perform_crossvalidation which will perform crossvalidation for each of the
# configs, applying the same EarlyStopping criteria as during the optimization.
print("Starting k-fold cross-validation for the best model-configs...")
crossval_model_info, crossval_input_data = OPTIMA.core.training.perform_crossvalidation(
model_configs_to_evaluate,
dirs_to_evaluate,
training_func,
run_config,
input_handler,
cpus_per_model,
gpus_per_model,
custom_metrics,
composite_metrics,
native_metrics,
weighted_native_metrics,
seed=seed,
)
print("Cross-validation finished!")
# reload best models from the optimization and the corresponding crossvalidation models and do the evaluation
evaluation_string = "Evaluation:"
print("\nReloading the cross-validation models for evaluation...")
# get the evaluation function
if hasattr(run_config, "evaluate"):
evaluate_func = run_config.evaluate
else:
evaluate_func = OPTIMA.builtin.evaluation.evaluate
# wrap the evaluation function in a ray task
evaluate_remote = ray.remote(num_cpus=cpus_per_model, num_gpus=gpus_per_model)(evaluate_func).remote
# go through all the dirs containing models to evaluate
raw_values_list = (
[]
) # will contain all raw values so that they can be returned as numbers; can be useful for testing
futures = [] # will contain the futures for the remote execution of the evaluation
for model_dir in dirs_to_evaluate:
# loop over all models from the crossvalidation and perform an evaluation for each. The evaluation is
# executed as a Ray task, so we save the returned futures for later.
print(f"Evaluating {model_dir}")
for model_info in crossval_model_info[model_dir]:
# get the inputs
inputs_split_k, targets_split_k, weights_split_k, normalized_weights_split_k = crossval_input_data[
model_info["split"]
]
# do the evaluation
futures.append(
evaluate_remote(
run_config,
os.path.join(model_dir, model_info["name"]),
inputs_split_k,
targets_split_k,
weights_split_k,
normalized_weights_split_k,
os.path.join(model_dir, "plots", "evaluation", "crossval_{}".format(model_info["split"])),
cpus=cpus_per_model,
print_results=False,
native_metrics=native_metrics,
weighted_native_metrics=weighted_native_metrics,
custom_FoMs=custom_metrics,
class_labels=run_config.evaluation_class_labels,
return_unfilled=True,
)
)
# save the values of the metrics returned by the evaluation to a list to then calculate mean and std, which
# are then inserted into the unfilled results string
for model_dir in dirs_to_evaluate:
results_str_unfilled = ""
metrics = []
for model_info in crossval_model_info[model_dir]:
# get and save the evaluation results
results_str_unfilled_k, metrics_k = ray.get(futures.pop(0))
if results_str_unfilled == "":
results_str_unfilled = results_str_unfilled_k
metrics.append(metrics_k)
# if defined, execute the finalize function for this model
if hasattr(run_config, "finalize_model"):
# get the inputs for potential use in the finalize function
inputs_split_k, targets_split_k, weights_split_k, normalized_weights_split_k = crossval_input_data[
model_info["split"]
]
# execute the finalize function as a Ray task in case it does predictions or something else with the
# models. However, we can't know if the finalize-function is written in a thread-safe way, so execute the
# tasks sequentially
f = ray.remote(num_cpus=cpus_per_model, num_gpus=gpus_per_model)(run_config.finalize_model).remote(
run_config,
inputs_split_k,
targets_split_k,
weights_split_k,
normalized_weights_split_k,
results_dir=results_dir,
model_dir=model_dir,
model_info=model_info,
input_handler=input_handler,
)
ray.get(f)
del f
# calculate the mean and std for the returned metrics and fill them into the results string
metrics_array = np.array(metrics)
metrics_mean = np.mean(metrics_array, axis=0)
metrics_std = np.std(metrics_array, axis=0)
metrics_with_errors_strs = []
for mean, std, raw_values in zip(metrics_mean, metrics_std, metrics_array.transpose()):
if std != 0:
err_significant_digit = max(-int(np.floor(np.log10(abs(std)))), 0)
metric_with_error_str = "{{:.{}f}} +- {{:.{}f}}".format(
err_significant_digit + 1, err_significant_digit + 1
)
else:
metric_with_error_str = "{} +- {}"
err_significant_digit = 4
metric_with_error_str += (
" (" + ", ".join(["{{:.{}f}}".format(err_significant_digit + 1) for _ in raw_values]) + ")"
)
raw_values_list += [mean, std] + list(raw_values)
metrics_with_errors_strs.append(metric_with_error_str)
evaluation_string += f"\n{model_dir}:\n"
evaluation_string += results_str_unfilled.format(*metrics_with_errors_strs)
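# Illustration (hypothetical numbers): with two crossvalidation folds yielding 0.1290 and 0.1178 for a
# metric, the corresponding entry in the evaluation string reads "0.1234 +- 0.0056 (0.1290, 0.1178)".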
# get summary of the optimization for the results file
if run_config.use_testing_dataset:
targets_train, targets_val, targets_test = ray.get(targets_split)
else:
targets_train, targets_val = ray.get(targets_split)
if targets_train.shape[1] == 1:
optimization_str = (
f"training events: {targets_train[targets_train[:, 0] == 1].shape[0]} signal, "
f"{targets_train[targets_train[:, 0] == 0].shape[0]} background\n"
)
optimization_str += (
f"validation events: {targets_val[targets_val[:, 0] == 1].shape[0]} signal, "
f"{targets_val[targets_val[:, 0] == 0].shape[0]} background\n"
)
if run_config.use_testing_dataset:
optimization_str += (
f"test events: {targets_test[targets_test[:, 0] == 1].shape[0]} signal, "
f"{targets_test[targets_test[:, 0] == 0].shape[0]} background\n"
)
else:
optimization_str = f"training events: {[targets_train[targets_train[:, j] == 1].shape[0] for j in range(targets_train.shape[1])]}\n"
optimization_str += f"validation events: {[targets_val[targets_val[:, j] == 1].shape[0] for j in range(targets_val.shape[1])]}\n"
if run_config.use_testing_dataset:
optimization_str += f"test events: {[targets_test[targets_test[:, j] == 1].shape[0] for j in range(targets_test.shape[1])]}\n"
optimization_str += "input variables: {}\n\n".format(", ".join(input_handler.get_vars()))
optimization_str += "search space:\n"
for hp in search_space.keys():
if hp in ["first_checkpoint_epoch", "checkpoint_frequency"]:
continue
optimization_str += f"\t{hp}: {search_space[hp]}\n"
# write results to file
if write_results:
with open(os.path.join(results_dir, "results.txt"), "w") as results_file:
results_file.write(
optimization_str
+ "\n"
+ optimization_results_string
+ "\n\n"
+ evaluation_string.format(*raw_values_list)
)
# saving results of the evaluation to file to reload the evaluation should the experiment be stopped and resumed in
# the future
with open(os.path.join(results_dir, "evaluation.pickle"), "wb") as evaluation_file:
pickle.dump(
(
best_trials,
best_trials_fit if not skip_fit_evaluation else None,
model_configs_df,
optimization_str,
optimization_results_string,
crossval_model_info,
{
k: [ray.get(e) for e in v] for k, v in crossval_input_data.items()
}, # we want to save the actual data here!
model_configs_to_evaluate,
dirs_to_evaluate,
evaluation_string,
raw_values_list,
),
evaluation_file,
)
else:
print(
f"Found previous evaluation at {os.path.join(results_dir, 'evaluation.pickle')}, reloading previous results "
f"and skipping the evaluation. If this is not intentional, delete the evaluation.pickle file or "
f"the {results_dir} directory."
)
with open(os.path.join(results_dir, "evaluation.pickle"), "rb") as evaluation_file:
(
best_trials,
best_trials_fit,
model_configs_df,
optimization_str,
optimization_results_string,
crossval_model_info,
crossval_input_data,
crossval_model_configs_to_evaluate,
crossval_dirs_to_evaluate,
evaluation_string,
raw_values_list,
) = pickle.load(evaluation_file)
# check if the crossvalidation should be done again; could be useful when crossvalidation should be repeated but
# the optimization folder was already deleted, but should be avoided when possible because the evaluation cannot
# be redone in that case (because the configs of each trial are saved inside the optimization folder and are
# thus not available anymore)
print(
"Checking if crossvalidation needs to be repeated. Note that the evaluation will not be repeated even when"
" the crossvalidation is redone! If you want to repeat the evaluation, it's necessary to delete the "
"'evaluation.pickle' file."
)
crossval_model_info, crossval_input_data = OPTIMA.core.training.perform_crossvalidation(
crossval_model_configs_to_evaluate,
crossval_dirs_to_evaluate,
training_func,
run_config,
input_handler,
cpus_per_model,
gpus_per_model,
custom_metrics,
composite_metrics,
native_metrics,
weighted_native_metrics,
seed=seed,
)
print(
"\nResults:\n"
+ optimization_str
+ "\n"
+ optimization_results_string
+ "\n\n"
+ evaluation_string.format(*raw_values_list)
)
if not return_results_str:
if return_crossval_models:
return (
best_trials,
best_trials_fit if not skip_fit_evaluation else None,
model_configs_df,
crossval_model_info,
crossval_input_data,
)
else:
return best_trials, best_trials_fit if not skip_fit_evaluation else None, model_configs_df
else:
if return_crossval_models:
if not return_unfilled:
return (
best_trials,
best_trials_fit if not skip_fit_evaluation else None,
model_configs_df,
optimization_str
+ "\n"
+ optimization_results_string
+ "\n\n"
+ evaluation_string.format(*raw_values_list),
crossval_model_info,
crossval_input_data,
)
else:
return (
best_trials,
best_trials_fit if not skip_fit_evaluation else None,
model_configs_df,
optimization_str + "\n" + optimization_results_string + "\n\n" + evaluation_string,
raw_values_list,
crossval_model_info,
crossval_input_data,
)
else:
if not return_unfilled:
return (
best_trials,
best_trials_fit if not skip_fit_evaluation else None,
model_configs_df,
optimization_str
+ "\n"
+ optimization_results_string
+ "\n\n"
+ evaluation_string.format(*raw_values_list),
)
else:
return (
best_trials,
best_trials_fit if not skip_fit_evaluation else None,
model_configs_df,
optimization_str + "\n" + optimization_results_string + "\n\n" + evaluation_string,
raw_values_list,
)
def scientific_rounding(value, err, notation="separate"):
"""Helper function to perform scientific rounding based on the provided uncertainty.
Notation can be 'bracket', meaning 0.0123 +- 0.0234 will become 0.012(23), or 'separate' where the rounded value and
error will be returned separately
Parameters
----------
value : _type_
_description_
err : _type_
_description_
notation : _type_
_description_ (Default value = 'separate')
"""
if abs(err) > 0:
significant_digit = max(-int(np.floor(np.log10(abs(err)))), 0)
error_digits = round(err * 10**significant_digit)
if error_digits < 4:
significant_digit += 1
error_digits = round(err * 10**significant_digit)
if notation == "bracket":
value_rounded = "{{:.{}f}}({{}})".format(significant_digit).format(value, error_digits)
return value_rounded
elif notation == "separate":
value_rounded = "{{:.{}f}}".format(significant_digit).format(value)
err_rounded = "{{:.{}f}}".format(significant_digit).format(err)
return value_rounded, err_rounded
else:
if notation == "bracket":
return f"{value}(0)"
elif notation == "separate":
return value, err
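# Example for scientific_rounding (taken from the docstring above):
#     scientific_rounding(0.0123, 0.0234, notation="bracket")   -> "0.012(23)"
#     scientific_rounding(0.0123, 0.0234, notation="separate")  -> ("0.012", "0.023")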
def clean_analysis_results(dfs, metric_names):
"""_summary_.
Parameters
----------
dfs : _type_
_description_
metric_names : _type_
_description_
Returns
-------
_type_
_description_
"""
# ungroup the metric names
metric_names_ungrouped = []
for group in metric_names:
if not isinstance(group, list) and not isinstance(group, tuple):
metric_names_ungrouped.append(group)
else:
for metric in group:
metric_names_ungrouped.append(metric)
dfs_cleaned = {}
for trial, df in dfs.items():
df_cleaned = df.replace([np.inf, -np.inf], np.nan, inplace=False) # first replace infs with NaNs
df_cleaned.dropna(
subset=metric_names_ungrouped, how="any", inplace=True
) # then drop all the rows containing NaNs
df_cleaned.reset_index(drop=True, inplace=True) # reindex to get rid of the gaps
dfs_cleaned[trial] = df_cleaned
return dfs_cleaned
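# Note: dfs is expected to be the dict returned by analysis.trial_dataframes, i.e. a mapping of
# trial (directory) -> dataframe with one row per report; clean_analysis_results drops every row
# whose metric columns contain inf or NaN and reindexes the remaining rows.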
def check_overtraining(dfs, overtraining_conditions):
"""_summary_.
Parameters
----------
dfs : _type_
_description_
overtraining_conditions : _type_
_description_
Returns
-------
_type_
_description_
"""
def _check_overtraining_for_df(df):
"""_summary_.
Parameters
----------
df : _type_
_description_
Returns
-------
_type_
_description_
"""
# skip if df is empty
if len(df) == 0:
return df
df_overtraining_checked = copy.deepcopy(df)
overtrained = np.zeros_like(df_overtraining_checked.index, dtype=bool)
for _, input_metric_names, condition in overtraining_conditions:
input_values = [df[input_name] for input_name in input_metric_names]
overtrained += condition(*input_values) # boolean addition is supported by numpy
df_overtraining_checked["overtrained"] = overtrained
return df_overtraining_checked
if isinstance(dfs, dict):
dfs_overtraining_checked = {}
for trial, df in dfs.items():
df_overtraining_checked = _check_overtraining_for_df(df)
dfs_overtraining_checked[trial] = df_overtraining_checked
elif isinstance(dfs, pd.DataFrame):
dfs_overtraining_checked = _check_overtraining_for_df(dfs)
else:
raise TypeError
return dfs_overtraining_checked
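# Hypothetical example of an overtraining condition in the format expected above (a name, the metric
# names handed to the condition and a callable that returns True for overtrained epochs):
#
#     overtraining_conditions = [
#         ("loss_ratio", ("train_loss", "val_loss"), lambda train, val: val > 1.2 * train),
#     ]
#     dfs_checked = check_overtraining(dfs, overtraining_conditions)  # adds the "overtrained" column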
def draw_total_progress(dfs, optimize_name, optimize_op, metric_names, figs_dir=None, reject_overtrained=False):
"""_summary_.
Parameters
----------
dfs : _type_
_description_
optimize_name : _type_
_description_
optimize_op : _type_
_description_
metric_names : _type_
can be grouped by giving list of lists or list of tuples -> groups will be plotted together in one diagram
figs_dir : _type_
_description_ (Default value = None)
reject_overtrained : _type_
_description_ (Default value = False)
"""
if not isinstance(optimize_name, list):
optimize_name = [optimize_name]
if not isinstance(optimize_op, list):
optimize_op = [optimize_op]
# ungroup the metric names (to give e.g. both train_loss and val_loss their own columns)
metric_names_ungrouped = []
metric_names_grouped = []
for group in metric_names:
if not isinstance(group, list) and not isinstance(group, tuple):
metric_names_ungrouped.append(group)
metric_names_grouped.append([group])
else:
for metric in group:
metric_names_ungrouped.append(metric)
metric_names_grouped.append(group)
# create a single large dataframe and sort it by "timestamp"
large_df = pd.concat(dfs.values(), ignore_index=True, sort=False)
large_df = large_df.sort_values(by=["timestamp"], ignore_index=True)
# get alpha values from the overtraining flag
alpha_array = copy.deepcopy(
large_df["overtrained"].to_numpy(dtype=np.float64)
) # need the copy here, otherwise we modify df
alpha_array[alpha_array == True] = 0.2 # noqa: E712
alpha_array[alpha_array == False] = 1.0 # noqa: E712
for metric_group in metric_names_grouped:
for metric in metric_group:
plt.gcf().set_figheight(6)
plt.gcf().set_figwidth(8)
# plt.scatter(large_df["timestamp"]-start_time, large_df[metric], s=3., label=metric, alpha=alpha_array)
plt.scatter(large_df.index, large_df[metric], s=3.0, label=metric, alpha=alpha_array)
plt.title("Optimization overview: {}".format(", ".join(metric_group)))
# plt.xlabel("runtime [s]")
plt.xlabel("iterations")
plt.legend()
plt.gcf().set_figheight(6)
plt.gcf().set_figwidth(8)
plt.tight_layout()
if figs_dir is not None:
if not os.path.exists(os.path.join(figs_dir, "overview_plots")):
os.makedirs(os.path.join(figs_dir, "overview_plots"), exist_ok=True)
plt.savefig(os.path.join(figs_dir, "overview_plots", "{}.png".format("+".join(metric_group))), dpi=600)
else:
plt.show()
plt.clf()
# extract only those epochs that improved the optimization metrics
for target_metric, target_op in zip(optimize_name, optimize_op):
# calculate cumulative min/max for each target metric and remove duplicates (which only leaves entries which improved
# the target metric)
cumextr = large_df[target_metric].cummin() if target_op == "min" else large_df[target_metric].cummax()
cumextr_index = cumextr.drop_duplicates().index
# get the indices of large_df and replace all values that are not in cumextr_index (i.e. those that did not result
# in an improvement) with NaN, then forward-fill the NaNs to get repeating indices
repeating_indices = large_df.index.to_series().where(large_df.index.isin(cumextr_index))
repeating_indices.fillna(method="ffill", inplace=True)
# apply repeating indices to large_df to only have the entries that were improvements
pruned_large_df = large_df.iloc[repeating_indices][metric_names_ungrouped]
# we want to keep the original "timestamp" and indices, so reinsert them
pruned_large_df.index = large_df.index
pruned_large_df["timestamp"] = large_df["timestamp"]
# plot progress for all metrics
for metric_group in metric_names_grouped:
for metric in metric_group:
plt.gcf().set_figheight(6)
plt.gcf().set_figwidth(8)
# plt.plot(pruned_large_df["timestamp"]-start_time, pruned_large_df[metric], label=metric)
plt.plot(pruned_large_df.index, pruned_large_df[metric], label=metric)
plt.title("Optimization progress: {}, target: {}".format(", ".join(metric_group), target_metric))
# plt.xlabel("runtime [s]")
plt.xlabel("iterations")
plt.legend()
plt.gcf().set_figheight(6)
plt.gcf().set_figwidth(8)
plt.tight_layout()
if figs_dir is not None:
if not os.path.exists(os.path.join(figs_dir, "progress_plots")):
os.makedirs(os.path.join(figs_dir, "progress_plots"), exist_ok=True)
plt.savefig(
os.path.join(
figs_dir,
"progress_plots",
target_metric if len(optimize_name) > 1 else "",
"{}.png".format("+".join(metric_group)),
),
dpi=600,
)
else:
plt.show()
plt.clf()
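# Both draw_total_progress above and get_best_trials below accept metric_names that mix plain names
# and groups, e.g. (hypothetical) ["accuracy_ratio", ("train_loss", "val_loss")]; grouped entries are
# ungrouped into individual columns but plotted together in a single figure.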
def get_best_trials(dfs, optimize_name, optimize_op, metric_names, figs_dir=None, reject_overtrained=False):
"""_summary_.
Parameters
----------
dfs : _type_
_description_
optimize_name : _type_
_description_
optimize_op : _type_
_description_
metric_names : _type_
can be grouped by giving list of lists or list of tuples -> groups will be plotted together in one diagram
figs_dir : _type_
_description_ (Default value = None)
reject_overtrained : _type_
_description_ (Default value = False)
"""
if not isinstance(optimize_name, list):
optimize_name = [optimize_name]
if not isinstance(optimize_op, list):
optimize_op = [optimize_op]
# ungroup the metric names (to give e.g. both train_loss and val_loss their own columns)
metric_names_ungrouped = []
metric_names_grouped = []
for group in metric_names:
if not isinstance(group, list) and not isinstance(group, tuple):
metric_names_ungrouped.append(group)
metric_names_grouped.append([group])
else:
for metric in group:
metric_names_ungrouped.append(metric)
metric_names_grouped.append(group)
best_trials = pd.DataFrame(
index=optimize_name,
columns=metric_names_ungrouped + ["trial", "best epoch", "best index", "best checkpoint"],
dtype=np.float64,
)
best_trials.index.name = "target"
for metric, op in zip(optimize_name, optimize_op):
for trial, df in dfs.items():
# skip if df is empty
if len(df) == 0:
continue
# need to find the index of the best epoch because for PBT, epochs don't always increase monotonically
if reject_overtrained:
df_nonovertrained = df.where(df["overtrained"] == False) # reject overtrained epochs # noqa: E712
best_index = df_nonovertrained[metric].idxmax() if op == "max" else df_nonovertrained[metric].idxmin()
else:
best_index = df[metric].idxmax() if op == "max" else df[metric].idxmin()
# if every epoch is overtrained, best index will be NaN
if not np.isnan(best_index):
metric_best = df.iloc[best_index][metric]
best_epoch = df.iloc[best_index]["training_iteration"]
best_checkpoint = (
df.iloc[best_index]["iterations_since_restore"] - 1
) # when restoring during PBT, checkpoint numbering will be restarted from 0, overwriting previous checkpoints
if (
metric_best > best_trials.loc[metric, metric]
if op == "max"
else metric_best < best_trials.loc[metric, metric]
) or np.isnan(best_trials.loc[metric, metric]):
best_trials.loc[[metric], metric_names_ungrouped] = df.loc[
best_index, metric_names_ungrouped
].to_numpy()
best_trials.loc[[metric], ["trial", "best epoch", "best index", "best checkpoint"]] = [
trial,
best_epoch,
best_index,
best_checkpoint,
]
# plotting the evolution of the metrics for best trial
for target_metric_name, best_trial, best_epoch, best_index in zip(
optimize_name, best_trials["trial"], best_trials["best epoch"], best_trials["best index"]
):
df = dfs[best_trial]
alpha_array = copy.deepcopy(
df["overtrained"].to_numpy(dtype=np.float64)
) # need the copy here, otherwise we modify df
alpha_array[alpha_array == True] = 0.2 # noqa: E712
alpha_array[alpha_array == False] = 1.0 # noqa: E712
for metric_group in metric_names_grouped:
for metric in metric_group:
plt.gcf().set_figheight(6)
plt.gcf().set_figwidth(8)
plt.scatter(df["training_iteration"], df[metric], s=11.0, label=metric, alpha=alpha_array)
plt.scatter(
best_epoch, df.iloc[int(best_index)][metric], s=40.0, marker="x", c="r"
) # need to use the index because for PBT, epochs don't always increase monotonically
plt.title(f"target: {target_metric_name}")
plt.xlabel("training iteration")
plt.legend()
plt.gcf().set_figheight(6)
plt.gcf().set_figwidth(8)
plt.tight_layout()
if figs_dir is not None:
if not os.path.exists(
os.path.join(
figs_dir,
target_metric_name if len(optimize_name) > 1 else "",
"best_value",
"plots",
"optimization",
)
):
os.makedirs(
os.path.join(
figs_dir,
target_metric_name if len(optimize_name) > 1 else "",
"best_value",
"plots",
"optimization",
),
exist_ok=True,
)
plt.savefig(
os.path.join(
figs_dir,
target_metric_name if len(optimize_name) > 1 else "",
"best_value",
"plots",
"optimization",
"+".join(metric_group) + ".png",
),
dpi=600,
)
else:
plt.show()
plt.clf()
return best_trials
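# The returned best_trials dataframe is indexed by the target metric(s) and contains one column per
# (ungrouped) metric plus the columns "trial", "best epoch", "best index" and "best checkpoint".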
def get_best_trials_from_fit(
run_config: ModuleType,
dfs,
optimize_name,
optimize_op,
metric_names,
fit_function="custom_2",
fit_xtol=1e-5,
fit_maxfev=10000,
figs_dir=None,
overtraining_conditions=None,
reject_overtrained=False,
min_R_squared=0.9,
):
"""_summary_.
Parameters
----------
run_config : ModuleType
Reference to the imported run-config file.
dfs : _type_
dict of pandas dataframes
optimize_name : _type_
string or list of strings
optimize_op : _type_
string ('min'/'max') or list of strings of same length as metric_name
metric_names : _type_
_description_
fit_function : _type_
defining which function should be used for fitting. Can either be single value or list of same length (Default value = 'custom_2')
fit_xtol : _type_
_description_ (Default value = 1e-5)
fit_maxfev : _type_
_description_ (Default value = 10000)
figs_dir : _type_
_description_ (Default value = None)
overtraining_conditions : _type_
_description_ (Default value = None)
reject_overtrained : _type_
_description_ (Default value = False)
min_R_squared : _type_
_description_ (Default value = 0.9)
"""
def _custom_fit_function(x, A, a, c, B, b, C, D, const):
"""_summary_.
Parameters
----------
x : _type_
_description_
A : _type_
_description_
a : _type_
_description_
c : _type_
_description_
B : _type_
_description_
b : _type_
_description_
C : _type_
_description_
D : _type_
_description_
const : _type_
_description_
Returns
-------
_type_
_description_
"""
return A * np.exp(-a / 1e-4 * (x - c) ** 2) + B * np.exp(-b / 100 * x) + C * x + D * x * x + const
def _custom_fit_function_2(x, A, B, C, D, const):
"""_summary_.
Parameters
----------
x : _type_
_description_
A : _type_
_description_
B : _type_
_description_
C : _type_
_description_
D : _type_
_description_
const : _type_
_description_
Returns
-------
_type_
_description_
"""
return A / x**2 + B / x + C * x**2 + D * x + const
def _custom_fit_function_3(x, A, a, c, B, b, C, D, E, F, const):
"""_summary_.
Parameters
----------
x : _type_
_description_
A : _type_
_description_
a : _type_
_description_
c : _type_
_description_
B : _type_
_description_
b : _type_
_description_
C : _type_
_description_
D : _type_
_description_
E : _type_
_description_
F : _type_
_description_
const : _type_
_description_
Returns
-------
_type_
_description_
"""
return (
A * np.exp(-a / 1e-4 * (x - c) ** 2)
+ B * np.exp(-b / 100 * x)
+ C * x
+ D * x * x
+ E / x**2
+ F / x
+ const
)
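# Summary of the built-in fit models defined above (x = training iteration):
#     "custom":   A*exp(-a/1e-4*(x-c)**2) + B*exp(-b/100*x) + C*x + D*x**2 + const
#     "custom_2": A/x**2 + B/x + C*x**2 + D*x + const
#     "custom_3": the "custom" terms plus additional E/x**2 + F/x terms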
def _crossval_fit(fit_function, x, y, bounds=(-np.inf, np.inf), p0=None, **kwargs):
"""Function to perform a crossvalidation-like fit to assess the goodness of fit.
It applies scipy.optimize's curve_fit multiple times to a subset of the data (using kfold splitting),
calculates the average deviation between the fit function and the fitting and testing data, gets the ratio between
the two errors for each splitting, and then looks at the median value. if that is > 3, the fit is rejected,
otherwise the fit is repeated on the full dataset and the fit parameters are returned
Parameters
----------
fit_function : _type_
_description_
x : _type_
_description_
y : _type_
_description_
bounds : _type_
_description_ (Default value = (-np.inf, np.inf))
p0 : _type_
_description_ (Default value = None)
**kwargs : _type_
_description_
"""
fit_errors = []
test_errors = []
error_ratios = []
fit_R_squared = []
kfold = KFold(n_splits=5, shuffle=True, random_state=1234)
for fit_indices, test_indices in kfold.split(x):
x_fit = x[fit_indices]
x_test = x[test_indices]
y_fit = y[fit_indices]
y_test = y[test_indices]
parameters, covariance = curve_fit(fit_function, x_fit, y_fit, bounds=bounds, p0=p0, **kwargs)
fitted_func = lambda x, p=parameters: fit_function(x, *p)
y_fit_pred = fitted_func(x_fit)
y_test_pred = fitted_func(x_test)
mean_abs_fit_error = np.mean(np.abs(y_fit_pred - y_fit))
mean_abs_test_error = np.mean(np.abs(y_test_pred - y_test))
error_fraction = mean_abs_test_error / mean_abs_fit_error
fit_errors.append(mean_abs_fit_error)
test_errors.append(mean_abs_test_error)
error_ratios.append(error_fraction)
R_squared_fit = np.sum((y_fit - y_fit_pred) ** 2) / np.sum((y_fit - np.mean(y_fit)) ** 2)
fit_R_squared.append(R_squared_fit)
parameters, covariance = curve_fit(fit_function, x, y, bounds=bounds, p0=p0, **kwargs)
fitted_func = lambda x: fit_function(x, *parameters)
R_squared = 1 - np.sum((y - fitted_func(x)) ** 2) / np.sum((y - np.mean(y)) ** 2)
if R_squared < min_R_squared: # np.median(error_ratios) > 3.:
return None, None
else:
# plt.scatter(x, y, s=11.)
# xs_plot = np.linspace(x.iloc[0], x.iloc[-1], 1000)
# ys_plot = fitted_func(xs_plot)
# plt.plot(xs_plot, ys_plot, color='blue')
# plt.title("MedER: {:.4f}, MedFE: {:.5f}, MedTE: {:.5f}, \n"
# "MedRS: {:.4f}, MedRSF: {:.4f}".format(np.median(error_ratios),
# np.median(fit_errors),
# np.median(test_errors),
# np.median(R_squared),
# np.median(fit_R_squared)))
# plt.show()
return parameters, covariance
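# Goodness-of-fit criterion used in _crossval_fit above: R^2 = 1 - sum((y - f(x))^2) / sum((y - mean(y))^2)
# on the full dataset; the fit is rejected (None, None) if R^2 < min_R_squared.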
def _fit_and_plot(
df,
metric_name,
fit_function,
xtol,
maxfev,
metric_op=None,
return_at_x=None,
color="C0",
plot=True,
figs_dir=None,
reject_overtrained=False,
overtraining_conditions=None,
crossval=False,
return_plotting_df=False,
):
"""Either metric_op or return_at_x must not be None!.
Parameters
----------
df : _type_
_description_
metric_name : _type_
_description_
fit_function : _type_
_description_
xtol : _type_
_description_
maxfev : _type_
_description_
metric_op : _type_
_description_ (Default value = None)
return_at_x : _type_
if given, will return the value of the fit function at this point instead of the extreme value (Default value = None)
color : _type_
_description_ (Default value = "C0")
plot : _type_
_description_ (Default value = True)
figs_dir : _type_
_description_ (Default value = None)
reject_overtrained : _type_
_description_ (Default value = False)
overtraining_conditions : _type_
_description_ (Default value = None)
crossval : _type_
_description_ (Default value = False)
return_plotting_df : _type_
_description_ (Default value = False)
"""
def _fit(metric_to_fit, metric_to_fit_op=None):
"""Helper function that fits the fitting function to a metric in df.
Parameters
----------
metric_to_fit : str
Name of the metric column in ``df`` to fit.
metric_to_fit_op : str
``'min'`` or ``'max'`` if the metric is to be minimized or maximized, or ``None``. (Default value = None)
"""
if hasattr(run_config, "fit_function"):
fitted_func = run_config.fit_function(df, metric_to_fit, metric_to_fit_op, overtraining_conditions)
elif fit_function == "custom":
# fewer data points than twice the number of function parameters
if len(df) < 16:
return
# fit curve to metric; choose initial values to be exponential convergence to the observed extreme value
if metric_to_fit_op is not None:
best_metric_guess = (
df[metric_to_fit].max() if metric_to_fit_op == "max" else df[metric_to_fit].min()
)
best_epoch_guess = (
df.loc[df[metric_to_fit].idxmax()]["training_iteration"]
if metric_to_fit_op == "max"
else df.loc[df[metric_to_fit].idxmin()]["training_iteration"]
)
else:
# assume the progress is in the correct direction
progress = df[metric_to_fit].iloc[-1] - df[metric_to_fit].iloc[0]
best_metric_guess = df[metric_to_fit].max() if progress > 0 else df[metric_to_fit].min()
best_epoch_guess = df["training_iteration"].iloc[-1]
metric_first = df[metric_to_fit][0]
B_init = -(best_metric_guess - metric_first)
b_init = 5 * 100 / (best_epoch_guess)
const_init = best_metric_guess
init = [0, 0, 0, B_init, b_init, 0, 0, const_init]
# f = A * exp(-a * (x-c)^2) + B * exp(-bx) + Cx + Dx^2 + const
# --> make sure the exponentials are always getting smaller with larger x!
bounds = (
[-np.inf, 0, -np.inf, -np.inf, 0, -np.inf, -np.inf, -np.inf],
[np.inf, np.inf, 0, np.inf, np.inf, np.inf, np.inf, np.inf],
)
if crossval:
parameters, covariance = _crossval_fit(
_custom_fit_function,
df["training_iteration"],
df[metric_to_fit],
bounds=bounds,
p0=init,
xtol=xtol,
maxfev=maxfev,
)
if parameters is None:
return
else:
parameters, covariance = curve_fit(
_custom_fit_function,
df["training_iteration"],
df[metric_to_fit],
bounds=bounds,
p0=init,
xtol=xtol,
maxfev=maxfev,
)
fitted_func = lambda x: _custom_fit_function(x, *parameters)
elif fit_function == "custom_2":
# fewer data points than twice the number of function parameters
if len(df) < 10:
return
if crossval:
parameters, covariance = _crossval_fit(
_custom_fit_function_2, df["training_iteration"], df[metric_to_fit], xtol=xtol, maxfev=maxfev
)
if parameters is None:
return
else:
parameters, covariance = curve_fit(
_custom_fit_function_2, df["training_iteration"], df[metric_to_fit], xtol=xtol, maxfev=maxfev
)
fitted_func = lambda x: _custom_fit_function_2(x, *parameters)
elif fit_function == "custom_3":
# fewer data points than twice the number of function parameters
if len(df) < 20:
return
# fit curve to metric; choose initial values to be exponential convergence to the observed extreme value
if metric_to_fit_op is not None:
best_metric_guess = (
df[metric_to_fit].max() if metric_to_fit_op == "max" else df[metric_to_fit].min()
)
best_epoch_guess = (
df.loc[df[metric_to_fit].idxmax()]["training_iteration"]
if metric_to_fit_op == "max"
else df.loc[df[metric_to_fit].idxmin()]["training_iteration"]
)
else:
best_metric_guess = df[metric_to_fit].median()
best_epoch_guess = df["training_iteration"].iloc[-1]
metric_first = df[metric_to_fit][0]
B_init = -(best_metric_guess - metric_first)
b_init = 5 * 100 / (best_epoch_guess)
const_init = best_metric_guess
init = [0, 0, 0, B_init, b_init, 0, 0, 0, 0, const_init]
# f = A * exp(-a * (x-c)^2) + B * exp(-bx) + Cx + Dx^2 + E/x^2 + F/x + const
# --> make sure the exponentials are always getting smaller with larger x!
bounds = (
[-np.inf, 0, -np.inf, -np.inf, 0, -np.inf, -np.inf, -np.inf, -np.inf, -np.inf],
[np.inf, np.inf, 0, np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, np.inf],
)
if crossval:
parameters, covariance = _crossval_fit(
_custom_fit_function_3,
df["training_iteration"],
df[metric_to_fit],
bounds=bounds,
p0=init,
xtol=xtol,
maxfev=maxfev,
)
if parameters is None:
return
else:
parameters, covariance = curve_fit(
_custom_fit_function_3,
df["training_iteration"],
df[metric_to_fit],
bounds=bounds,
p0=init,
xtol=xtol,
maxfev=maxfev,
)
fitted_func = lambda x: _custom_fit_function_3(x, *parameters)
else:
if crossval:
parameters, covariance = _crossval_fit(
fit_function, df["training_iteration"], df[metric_to_fit], xtol=xtol, maxfev=maxfev
)
if parameters is None:
return
else:
parameters, covariance = curve_fit(
fit_function, df["training_iteration"], df[metric_to_fit], xtol=xtol, maxfev=maxfev
)
fitted_func = lambda x: fit_function(x, *parameters)
return fitted_func
if len(df[~df["overtrained"]]) == 0:
return None, None, None
fitted_func = _fit(metric_name, metric_op)
if fitted_func is None:
return None, None, None
if plot:
# create a df that contains all values for plotting the fit
df_for_plotting = pd.DataFrame()
df_for_plotting["training_iteration"] = np.linspace(
df["training_iteration"].iloc[0], df["training_iteration"].iloc[-1], 1000
)
df_for_plotting[metric_name] = fitted_func(df_for_plotting["training_iteration"])
# when overtraining conditions are given, also fit all metrics necessary to evaluate the overtraining conditions
# using the fit values
if overtraining_conditions is not None:
df_for_plotting.loc[:, "overtrained"] = 0.0
# iterate over all overtraining conditions to get a full set of metrics that need to be fitted
metrics_for_overtraining_check = []
for _, input_metric_names, _ in overtraining_conditions:
metrics_for_overtraining_check += list(input_metric_names)
metrics_for_overtraining_check = list(set(metrics_for_overtraining_check)) # remove duplicate entries
if metric_name in metrics_for_overtraining_check:
metrics_for_overtraining_check.remove(
metric_name
) # remove metric_name as we already have done the fit for that
# iterate over all metrics for the overtraining conditions and apply a fit to each, then fill the dfs
for metric_to_fit in metrics_for_overtraining_check:
fitted_func_overtraining = _fit(metric_to_fit)
# when one of the metrics could not be fitted well, we'll skip this trial (one could also fall back
# to simply checking the overtraining flag of the raw values, but that would not be consistent with
# the other trials)
if fitted_func_overtraining is None:
return None, None, None
df_for_plotting[metric_to_fit] = fitted_func_overtraining(df_for_plotting["training_iteration"])
# update the overtrained column using the fit values
df_for_plotting = check_overtraining(df_for_plotting, overtraining_conditions)
# plot the raw metric values
alpha_array = copy.deepcopy(df["overtrained"].to_numpy(dtype=np.float64))
alpha_array[alpha_array == True] = 0.2 # noqa: E712
alpha_array[alpha_array == False] = 1.0 # noqa: E712
plt.gcf().set_figheight(6)
plt.gcf().set_figwidth(8)
plt.scatter(
df["training_iteration"], df[metric_name], s=11.0, color=color, alpha=alpha_array, label=metric_name
)
plt.xlabel("training iteration")
# find the best epoch according to the fit (while checking for overtraining, if requested)
if return_at_x is None:
fit_values_epochs = fitted_func(df["training_iteration"])
# when overtraining conditions are given, also fit all metrics necessary to evaluate the overtraining conditions
# using the fit values
if overtraining_conditions is not None:
# build a new df and fill all necessary columns with fit values; also create a df with intermediate values
# when plotting is requested
df_from_fit = df.copy()
df_from_fit[metric_name] = fit_values_epochs
df_from_fit.loc[:, "overtrained"] = 0.0
# iterate over all overtraining conditions to get a full set of metrics that need to be fitted
metrics_for_overtraining_check = []
for _, input_metric_names, _ in overtraining_conditions:
metrics_for_overtraining_check += list(input_metric_names)
metrics_for_overtraining_check = list(set(metrics_for_overtraining_check)) # remove duplicate entries
if metric_name in metrics_for_overtraining_check:
metrics_for_overtraining_check.remove(
metric_name
) # remove metric_name as we already have done the fit for that
# iterate over all metrics for the overtraining conditions and apply a fit to each, then fill the dfs
for metric_to_fit in metrics_for_overtraining_check:
fitted_func_overtraining = _fit(metric_to_fit)
# when one of the metrics could not be fitted well, we'll skip this trial (one could also fall back
# to simply checking the overtraining flag of the raw values, but that would not be consistent with
# the other trials)
if fitted_func_overtraining is None:
return None, None, None
df_from_fit[metric_to_fit] = fitted_func_overtraining(df_from_fit["training_iteration"])
# update the overtrained column using the fit values
df_from_fit = check_overtraining(df_from_fit, overtraining_conditions)
# replace values for overtrained epochs with NaN --> will never be extremum
fit_values_epochs = fit_values_epochs.where(df_from_fit["overtrained"] == False) # noqa: E712
# when no overtraining conditions are given, but reject_overtrained is True, then check the overtraining
# conditions using the raw metric values (by looking at the "overtrained" column)
elif reject_overtrained:
# replace values for overtrained epochs with NaN --> will never be extremum
fit_values_epochs = fit_values_epochs.where(df["overtrained"] == False) # noqa: E712
# find the extremum; need to use the indices because for PBT, epochs don't always increase monotonically
best_index_fit = fit_values_epochs.idxmax() if metric_op == "max" else fit_values_epochs.idxmin()
if np.isnan(best_index_fit):
return None, None, None # if every epoch is overtrained, best index will be NaN
best_epoch_fit = df.iloc[best_index_fit]["training_iteration"]
best_checkpoint_fit = df.iloc[best_index_fit]["iterations_since_restore"] - 1
best_metric_fit = fit_values_epochs.iloc[best_index_fit]
if plot:
# plot the data
plt.scatter(best_epoch_fit, best_metric_fit, s=40.0, marker="x", c="r")
# plot the fit; when the overtraining conditions are given, use the created plotting df that includes
# "continous" overtrained flag so that the plotted line can be colored accordingly
if overtraining_conditions:
# get indices where overtraining label changes value and the values it changes to
diffs = df_for_plotting["overtrained"][df_for_plotting["overtrained"].diff() != 0]
diff_indices = diffs.index.tolist()
diff_values = diffs.values.tolist()
for i, (diff_index, diff_value) in enumerate(zip(diff_indices, diff_values)):
if i + 1 == len(diff_indices):
iterations = df_for_plotting["training_iteration"][diff_index:]
metrics = df_for_plotting[metric_name][diff_index:]
else:
next_diff_index = diff_indices[i + 1]
iterations = df_for_plotting["training_iteration"][diff_index:next_diff_index]
metrics = df_for_plotting[metric_name][diff_index:next_diff_index]
if diff_value is False:
alpha = 1.0
else:
alpha = 0.2
label = f"{metric_name} fit" if i == 0 else None
plt.plot(iterations, metrics, color=color, alpha=alpha, label=label)
else:
plt.plot(df_for_plotting["training_iteration"], df_for_plotting[metric_name], color=color)
if figs_dir is not None:
if not os.path.exists(figs_dir):
os.makedirs(figs_dir, exist_ok=True)
plt.gcf().set_figheight(6)
plt.gcf().set_figwidth(8)
plt.tight_layout()
plt.savefig(figs_dir, dpi=600)
plt.clf()
return best_epoch_fit, best_checkpoint_fit, best_metric_fit
else:
if plot:
if overtraining_conditions:
# get indices where overtraining label changes value and the values it changes to
diffs = df_for_plotting["overtrained"][df_for_plotting["overtrained"].diff() != 0]
diff_indices = diffs.index.tolist()
diff_values = diffs.values.tolist()
for i, (diff_index, diff_value) in enumerate(zip(diff_indices, diff_values)):
if i + 1 == len(diff_indices):
iterations = df_for_plotting["training_iteration"][diff_index:]
metrics = df_for_plotting[metric_name][diff_index:]
else:
next_diff_index = diff_indices[i + 1]
iterations = df_for_plotting["training_iteration"][diff_index:next_diff_index]
metrics = df_for_plotting[metric_name][diff_index:next_diff_index]
if diff_value is False:
alpha = 1.0
else:
alpha = 0.2
label = f"{metric_name} fit" if i == 0 else None
plt.plot(iterations, metrics, color=color, alpha=alpha, label=label)
else:
plt.plot(
df_for_plotting["training_iteration"],
df_for_plotting[metric_name],
color=color,
label=f"{metric_name} fit",
)
if figs_dir is not None:
if not os.path.exists(figs_dir):
os.makedirs(figs_dir, exist_ok=True)
plt.gcf().set_figheight(6)
plt.gcf().set_figwidth(8)
plt.tight_layout()
plt.savefig(figs_dir, dpi=600)
plt.clf()
return fitted_func(return_at_x)
if not isinstance(optimize_name, list):
optimize_name = [optimize_name]
if not isinstance(optimize_op, list):
optimize_op = [optimize_op]
# ungroup the metric names (to give e.g. both train_loss and val_loss their own columns)
metric_names_ungrouped = []
metric_names_grouped = []
for group in metric_names:
if not isinstance(group, list) and not isinstance(group, tuple):
metric_names_ungrouped.append(group)
metric_names_grouped.append([group])
else:
for metric in group:
metric_names_ungrouped.append(metric)
metric_names_grouped.append(group)
fit_best = pd.DataFrame(
index=optimize_name, columns=["trial", "best epoch", "best checkpoint", "best metric fit"], dtype=np.float64
)
for metric, op in zip(optimize_name, optimize_op):
for trial, df in dfs.items():
# skip if df is empty
if len(df) == 0:
continue
best_epoch_fit, best_checkpoint_fit, best_metric_fit = _fit_and_plot(
df,
metric,
fit_function,
fit_xtol,
fit_maxfev,
metric_op=op,
plot=False,
reject_overtrained=reject_overtrained,
overtraining_conditions=overtraining_conditions,
crossval=True,
)
if best_epoch_fit is None:
continue # can be None if every epoch is overtrained or NaN
if (
best_metric_fit > fit_best.loc[metric, "best metric fit"]
if op == "max"
else best_metric_fit < fit_best.loc[metric, "best metric fit"]
) or np.isnan(fit_best.loc[metric, "best metric fit"]):
fit_best.loc[metric] = [trial, best_epoch_fit, best_checkpoint_fit, best_metric_fit]
# for all the best trials, now also fit the remaining metrics
metrics_fit_best = pd.DataFrame(
index=optimize_name,
columns=metric_names_ungrouped + ["trial", "best epoch", "best checkpoint"],
dtype=np.float64,
)
metrics_fit_best.index.name = "target"
for target_metric_name in optimize_name:
trial, epoch, checkpoint_num, _ = fit_best.loc[target_metric_name]
if not isinstance(trial, str):
return # no fit succeeded
for group in metric_names_grouped:
for metric, plot_color in zip(group, [f"C{i}" for i in range(10)]):
df = dfs[trial]
fit_value = _fit_and_plot(
df,
metric,
fit_function,
fit_xtol,
fit_maxfev,
return_at_x=epoch,
color=plot_color,
overtraining_conditions=overtraining_conditions,
)
plt.gcf().set_figheight(6)
plt.gcf().set_figwidth(8)
plt.plot(epoch, fit_value, "rx")
metrics_fit_best.loc[target_metric_name, metric] = fit_value
plt.title(f"target: {target_metric_name}")
plt.legend()
plt.gcf().set_figheight(6)
plt.gcf().set_figwidth(8)
plt.tight_layout()
if figs_dir is not None:
if not os.path.exists(
os.path.join(
figs_dir,
target_metric_name if len(optimize_name) > 1 else "",
"best_fit",
"plots",
"optimization",
)
):
os.makedirs(
os.path.join(
figs_dir,
target_metric_name if len(optimize_name) > 1 else "",
"best_fit",
"plots",
"optimization",
),
exist_ok=True,
)
plt.savefig(
os.path.join(
figs_dir,
target_metric_name if len(optimize_name) > 1 else "",
"best_fit",
"plots",
"optimization",
"+".join(group) + ".png",
),
dpi=600,
)
else:
plt.show()
plt.clf()
metrics_fit_best.loc[target_metric_name, ["trial", "best epoch", "best checkpoint"]] = [
trial,
epoch,
checkpoint_num,
]
return metrics_fit_best
def calc_native_metric(
run_config: ModuleType,
native_metric: Any,
y_true: np.ndarray,
y_pred: np.ndarray,
sample_weight: Optional[np.ndarray] = None,
) -> Union[int, float]:
"""Calculates the value of a native (stateful) metric for the provided targets and predictions.
The type of metric provided is assumed to correspond to the backend set in ``model_type`` in the `run-config`. This
is not verified.
Before and after the calculation, the state of the metric is reset (if necessary).
Parameters
----------
run_config : ModuleType
Reference to the imported run-config file.
native_metric : Any
The native metric.
y_true : np.ndarray
Numpy array of target labels.
y_pred : np.ndarray
Numpy array of model predictions
sample_weight : Optional[np.ndarray]
Numpy array of sample weights. (Default value = None)
Returns
-------
Union[int, float]
The value of the native metric.
"""
if run_config.model_type == "Keras":
native_metric.reset_state()
native_metric.update_state(y_true, y_pred, sample_weight=sample_weight)
result = native_metric.result().numpy()
native_metric.reset_state()
return result
elif run_config.model_type == "Lightning":
y_true = y_true.copy()
y_true_tensor = torch.from_numpy(y_true).type(torch.float)
y_pred_tensor = torch.from_numpy(y_pred).type(torch.float)
metric = native_metric(y_pred_tensor, y_true_tensor).cpu().detach().numpy()
return metric
Functions
def calc_native_metric(run_config: ModuleType, native_metric: Any, y_true: np.ndarray, y_pred: np.ndarray, sample_weight: Optional[np.ndarray] = None) -> Union[int, float]
Calculates the value of a native (stateful) metric for the provided targets and predictions.
The type of metric provided is assumed to correspond to the backend set in model_type in the run-config. This is not verified. Before and after the calculation, the state of the metric is reset (if necessary).
Parameters
run_config : ModuleType - Reference to the imported run-config file.
native_metric : Any - The native metric.
y_true : np.ndarray - Numpy array of target labels.
y_pred : np.ndarray - Numpy array of model predictions.
sample_weight : Optional[np.ndarray] - Numpy array of sample weights. (Default value = None)
Returns
Union[int, float] - The value of the native metric.
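For illustration, a minimal usage sketch (not part of the OPTIMA sources) assuming a Keras backend; the run-config stand-in and the chosen AUC metric are hypothetical:

# Minimal usage sketch (hypothetical inputs): evaluate a stateful Keras metric with
# calc_native_metric. The run-config is replaced by a simple stand-in object that only
# provides the model_type attribute the function reads.
from types import SimpleNamespace

import numpy as np
import tensorflow as tf

from OPTIMA.core.evaluation import calc_native_metric

run_config = SimpleNamespace(model_type="Keras")  # stand-in for the imported run-config module
auc = tf.keras.metrics.AUC()

y_true = np.array([[1.0], [0.0], [1.0], [0.0]])
y_pred = np.array([[0.9], [0.2], [0.6], [0.4]])
weights = np.array([1.0, 1.0, 2.0, 0.5])

value = calc_native_metric(run_config, auc, y_true, y_pred, sample_weight=weights)
print(value)  # single scalar; the metric state is reset before and after the calculation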
def check_overtraining(dfs, overtraining_conditions)
Checks the overtraining conditions for all report dataframes and adds the result as a boolean 'overtrained' column.
Parameters
dfs : Union[dict, pd.DataFrame] - Dictionary mapping trial names to report dataframes, or a single report dataframe.
overtraining_conditions : list - List of (name, input_metric_names, condition) tuples; the columns named in input_metric_names are passed to condition, and an epoch is flagged as overtrained if any condition returns True.
Returns
Union[dict, pd.DataFrame] - The input with an additional boolean 'overtrained' column, in the same structure as dfs.
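A hedged sketch with made-up numbers, assuming the (name, input metric names, condition) tuple format used for the overtraining conditions in the run-config:

# Hedged sketch (made-up numbers): flag an epoch as overtrained when val_loss exceeds
# train_loss by more than 20 %. Each condition is a (name, input metric names, callable) tuple;
# the listed columns are passed to the callable, which returns True for overtrained epochs.
import pandas as pd

from OPTIMA.core.evaluation import check_overtraining

overtraining_conditions = [
    ("val/train loss ratio", ("train_loss", "val_loss"), lambda train, val: val > 1.2 * train),
]

dfs = {
    "trial_0": pd.DataFrame(
        {
            "training_iteration": [1, 2, 3],
            "train_loss": [0.9, 0.6, 0.4],
            "val_loss": [0.95, 0.8, 0.7],
        }
    ),
}

dfs_checked = check_overtraining(dfs, overtraining_conditions)
print(dfs_checked["trial_0"]["overtrained"])  # boolean flag per report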
def clean_analysis_results(dfs, metric_names)
Removes all reports containing inf or NaN values in any of the metric columns.
Parameters
dfs : dict - Dictionary mapping trial names to report dataframes.
metric_names : list - List of metric names; entries may be grouped as lists or tuples. Rows with inf or NaN in any of these columns are dropped.
Returns
dict - Dictionary of cleaned report dataframes with reset indices.
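A hedged sketch with made-up numbers illustrating the cleaning behaviour:

# Hedged sketch (made-up numbers): rows whose metric columns contain inf or NaN are removed
# and the index is reset, mirroring the cleaning applied before the experiment evaluation.
import numpy as np
import pandas as pd

from OPTIMA.core.evaluation import clean_analysis_results

metric_names = [("train_loss", "val_loss")]  # grouped names are ungrouped internally
dfs = {
    "trial_0": pd.DataFrame(
        {
            "training_iteration": [1, 2, 3],
            "train_loss": [0.9, np.inf, 0.4],
            "val_loss": [0.95, 0.8, np.nan],
        }
    ),
}

dfs_cleaned = clean_analysis_results(dfs, metric_names)
print(dfs_cleaned["trial_0"])  # only the first report survives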
def draw_total_progress(dfs, optimize_name, optimize_op, metric_names, figs_dir=None, reject_overtrained=False)
Draws overview and progress plots for all metrics across the entire experiment.
Parameters
dfs : dict - Dictionary mapping trial names to report dataframes (including the 'overtrained' column).
optimize_name : Union[str, list] - Name(s) of the target metric(s).
optimize_op : Union[str, list] - 'max' or 'min' for each target metric.
metric_names : list - Can be grouped by giving a list of lists or list of tuples -> groups will be plotted together in one diagram.
figs_dir : Optional[str] - Directory the plots are saved to; if None, the plots are shown instead. (Default value = None)
reject_overtrained : bool - (Default value = False)
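A hedged call sketch with a made-up report dataframe (real dataframes come from the Tune analysis); figs_dir is a hypothetical output directory:

# Hedged sketch: metrics grouped in a tuple share one diagram; plots are saved below figs_dir
# (or shown interactively if figs_dir is None). The dataframe mimics the columns used by the
# function: "timestamp" for ordering and "overtrained" for the transparency of the markers.
import pandas as pd

from OPTIMA.core.evaluation import draw_total_progress

df = pd.DataFrame(
    {
        "training_iteration": [1, 2, 3],
        "timestamp": [100.0, 110.0, 120.0],
        "train_loss": [0.9, 0.6, 0.4],
        "val_loss": [0.95, 0.8, 0.7],
        "overtrained": [False, False, True],
    }
)

draw_total_progress(
    {"trial_0": df},
    "val_loss",                    # target metric
    "min",                         # the target metric is minimized
    [("train_loss", "val_loss")],  # tuple -> both curves in one diagram
    figs_dir="results",            # hypothetical output directory
)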
def evaluate_experiment(analysis: tune.ExperimentAnalysis, training_func: Callable, run_config: ModuleType, optimize_name: str, optimize_op: Union[Literal['max'], Literal['min']], search_space: dict[str, run_config_search_space_entry_type], results_dir: str, inputs_split: list[ray.ObjectRef], targets_split: list[ray.ObjectRef], weights_split: list[ray.ObjectRef], normalized_weights_split: list[ray.ObjectRef], input_handler: InputHandler, custom_metrics: Optional[list[tuple[str, Callable]]] = None, composite_metrics: Optional[list[tuple[str, tuple[str, str], Callable]]] = None, native_metrics: Optional[list] = None, weighted_native_metrics: Optional[list] = None, cpus_per_model: int = 1, gpus_per_model: int = 0, overtraining_conditions: Optional[list] = None, write_results: bool = True, return_results_str: bool = False, return_unfilled: bool = False, return_crossval_models: bool = False, PBT: bool = False, PBT_mutation_handler: Optional[PBTMutationHandler] = None, seed: Optional[int] = None) -> Union[tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame], tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str], tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str, list], tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, dict, dict], tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str, dict, dict], tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str, list, dict, dict]]
Performs the evaluation of an experiment to find the best trial, run the crossvalidation and evaluate the models.
After removing any reports containing inf or NaN values, the dictionary of dataframes containing the reports of all trials is saved to 'dfs.pickle' in the provided results_dir. If this file already exists, it is not overwritten.
Two sets of plots are produced to give an overview of the entire experiment:
- A set of overview plots (one per metric) containing the values of the target metric, all custom metrics on the training and validation datasets and all composite metrics for each report of every trial, i.e. the metric values after every epoch throughout the entire experiment, sorted by the time of the report. Reports corresponding to overfitted epochs (i.e. if any of the overfitting conditions defined in the run-config is not satisfied) are shown semi-transparent. The plots are saved in the subdirectory 'overview_plots' in the provided results_dir.
- A set of progress plots (one per metric) containing the value of the target metric, all custom metrics on the training and validation datasets and all composite metrics corresponding to the best epoch up to a certain point during the optimization. E.g., at epoch 100, the values of the best epoch in the first 100 epochs are drawn. Thus, the shown curves can be interpreted as the convergence of the optimization.
From the reported metric values, the best trial is selected using two independent methods (except if PBT is True, in which case only the best value method is used):
- best value: the best trial is given by the best reported value of the target metric while passing all overfitting conditions.
- best fit: the evolution of all metrics of each trial is fitted (see evaluation.get_best_trials_from_fit for details). The best trial is given by the best target metric fit function value that passed all overfitting conditions. The overfitting conditions are evaluated using the fit function values of all necessary metrics at each epoch (if run_config.check_overtraining_with_fit is True) or using the reported values (if run_config.check_overtraining_with_fit is False).
For both methods, the hyperparameters corresponding to the best trial and the number of epochs to reach the best target metric value are extracted. If PBT is False, the hyperparameters correspond to the config provided by Tune via analysis.get_all_configs(). If PBT is True, the get_policy() function of the provided PBT_mutation_handler is called to check if a hyperparameter schedule is available for the best trial. If this is the case, this policy is used. If not, the config provided by Tune is used. Both the hyperparameters and the number of epochs are given to the perform_crossvalidation function to perform the k-fold crossvalidation, resulting in k trained models per method.
The evaluate function is called for the 2*k models trained during the crossvalidation. After the evaluation, the mean and standard deviation of the k values for each metric provided by the evaluation are calculated for both sets of hyperparameters. The results are printed to console.
If write_results is True, the results and a summary of the experiment (the used input variables as given by the provided input_handler instance, the shape of the training, validation and (if used) testing datasets as well as the search space) are saved to results.txt in the results_dir.
The full results of the experiment evaluation are saved to evaluation.pickle in the results_dir, which allows the evaluation results to be reloaded. This is useful when e.g. resuming a partially finished optimization run because the evaluation of finished steps does not need to be repeated. Thus, the evaluation is automatically skipped when an evaluation.pickle file is present in results_dir and the results are instead reloaded from that file.
Parameters
analysis : tune.ExperimentAnalysis - The tune.ExperimentAnalysis object extracted from the tune.ResultsGrid returned by the Tuner.
training_func : Callable - Reference to the function performing the training. This is given to perform_crossvalidation.
run_config : ModuleType - Reference to the imported run-config file.
optimize_name : str - Name of the target metric.
optimize_op : Union[Literal["max"], Literal["min"]] - Specifies if the target metric is to be maximized or minimized. Can be either 'max' or 'min'.
search_space : dict[str, run_config_search_space_entry_type] - The search space as defined in the run-config.
results_dir : str - Path to the directory where the results are to be saved.
inputs_split : list[ray.ObjectRef] - List containing the object reference to the numpy array of input features for the training, validation and (if used) testing sets.
targets_split : list[ray.ObjectRef] - List containing the object reference to the numpy array of target labels for the training, validation and (if used) testing sets.
weights_split : list[ray.ObjectRef] - List containing the object reference to the numpy array of event weights for the training, validation and (if used) testing sets.
normalized_weights_split : list[ray.ObjectRef] - List containing the object reference to the numpy array of normalized event weights for the training, validation and (if used) testing sets.
input_handler : InputHandler - Instance of the InputHandler class.
custom_metrics : Optional[list[tuple[str, Callable]]] - A list of custom metrics as defined in the run-config. (Default value = None)
composite_metrics : Optional[list[tuple[str, tuple[str, str], Callable]]] - A list of composite metrics as defined in the run-config. (Default value = None)
native_metrics : Optional[list] - A list of native metrics as defined in the run-config. (Default value = None)
weighted_native_metrics : Optional[list] - A list of weighted native metrics as defined in the run-config. (Default value = None)
cpus_per_model : int - The number of CPUs to use to train each model. This is given to perform_crossvalidation. (Default value = 1)
gpus_per_model : int - The number of GPUs to use to train each model. This is given to perform_crossvalidation. (Default value = 0)
overtraining_conditions : Optional[list] - A list of overtraining conditions as defined in the run-config. (Default value = None)
write_results : bool - If True, the results are written to results.txt in results_dir. (Default value = True)
return_results_str : bool - If True, the results string that is printed to console is also returned. (Default value = False)
return_unfilled : bool - If True, the evaluation part of the results string (containing the metric values returned by the evaluate function) will be provided "unfilled", i.e. with {} instead of the metric values. Additionally, the list of raw metric values is provided. This can be useful for testing. (Default value = False)
return_crossval_models : bool - If True, a dictionary containing the file names of the saved crossvalidation models, the corresponding indices denoting which of the k crossvalidation splittings was used for the training, the model config and a dictionary containing the training, validation and (if used) testing datasets for each splitting are returned. (Default value = False)
PBT : bool - Signifies if this is the evaluation of the Population Based Training step. This is given to perform_crossvalidation and used to skip the fit evaluation. (Default value = False)
PBT_mutation_handler : Optional[PBTMutationHandler] - An instance of the PBTMutationHandler class that helps to handle the PBT mutation policies. Only used if PBT is True. (Default value = None)
seed : Optional[int] - Seed given to perform_crossvalidation. (Default value = None)
Returns
Union[tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame], tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str], tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str, list], tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, dict, dict], tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str, dict, dict], tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str, list, dict, dict]] - Two pandas dataframes containing the values of all metrics, the trial-id, the training iteration and the path to the checkpoint directory of the two best checkpoints, and a dataframe containing the corresponding hyperparameters (that were used to train the crossvalidation models) are returned. If return_results_str is True, the results string that was printed to console is also returned. Finally, if return_crossval_models is True, dictionaries containing the file names of the saved crossvalidation models, the corresponding indices denoting which of the k crossvalidation splittings was used for the training, the model config and the corresponding training, validation and (if used) testing data are returned as well.
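Since evaluate_experiment operates on the artifacts of a finished Tune run, a fully self-contained example is impractical; the following hedged sketch only shows how the default three-value return might be unpacked, with all arguments assumed to be produced by the preceding optimization step and the metric name and results directory chosen hypothetically:

# Hedged sketch: wrapper around evaluate_experiment showing the default 3-tuple return
# (write_results, return_results_str and return_crossval_models left at their defaults).
# All arguments are assumed to come from the preceding optimization step; "val_loss" and
# the results directory are hypothetical choices.
from types import ModuleType
from typing import Callable

from ray import tune

import OPTIMA.builtin.inputs
from OPTIMA.core.evaluation import evaluate_experiment


def run_default_evaluation(
    analysis: tune.ExperimentAnalysis,
    training_func: Callable,
    run_config: ModuleType,
    search_space: dict,
    inputs_split: list,
    targets_split: list,
    weights_split: list,
    normalized_weights_split: list,
    input_handler: OPTIMA.builtin.inputs.InputHandler,
):
    best_trials, best_trials_fit, model_configs_df = evaluate_experiment(
        analysis,
        training_func,
        run_config,
        "val_loss",            # optimize_name (hypothetical target metric)
        "min",                 # optimize_op
        search_space,
        "results/evaluation",  # results_dir (hypothetical)
        inputs_split,
        targets_split,
        weights_split,
        normalized_weights_split,
        input_handler,
    )
    # best_trials_fit is None if the fit evaluation was skipped (PBT) or no fit succeeded
    return best_trials, best_trials_fit, model_configs_df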
def get_best_trials(dfs, optimize_name, optimize_op, metric_names, figs_dir=None, reject_overtrained=False)
Determines, for each target metric, the trial and epoch with the best reported value across all trials and plots the metric evolution of that best trial.
Parameters
dfs : dict - dictionary mapping trial names to pandas DataFrames containing the reports of each trial
optimize_name : str or list of str - name(s) of the target metric(s) to optimize
optimize_op : str or list of str - 'min' or 'max' for each target metric, of the same length as optimize_name
metric_names : list - metric names to include; they can be grouped by giving a list of lists or a list of tuples -> grouped metrics will be plotted together in one diagram (see the usage sketch below)
figs_dir : str - directory to save the plots to; if None, the plots are shown instead (Default value = None)
reject_overtrained : bool - if True, epochs marked as overtrained are excluded when determining the best epoch (Default value = False)
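As an illustration of the grouping convention, a hypothetical usage sketch is shown below. The metric names, trial reports and figure directory are made up for the example; the DataFrames in dfs are assumed to contain the metric columns plus 'training_iteration', 'iterations_since_restore' and 'overtrained' as produced during training.

# Hypothetical usage sketch of get_best_trials; all names below are illustrative.
# `dfs` maps trial names to DataFrames of per-epoch reports.
best_trials = get_best_trials(
    dfs,
    optimize_name="val_loss",
    optimize_op="min",
    metric_names=[("train_loss", "val_loss"), ("train_accuracy", "val_accuracy")],
    figs_dir="results/figures",  # plots are shown interactively instead if this is None
    reject_overtrained=True,
)
print(best_trials[["trial", "best epoch", "best checkpoint"]])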
Expand source code
def get_best_trials(dfs, optimize_name, optimize_op, metric_names, figs_dir=None, reject_overtrained=False):
    """_summary_.

    Parameters
    ----------
    dfs : _type_
        _description_
    optimize_name : _type_
        _description_
    optimize_op : _type_
        _description_
    metric_names : _type_
        can be grouped by giving list of lists or list of tuples -> groups will be plotted together in one diagram
    figs_dir : _type_
        _description_ (Default value = None)
    reject_overtrained : _type_
        _description_ (Default value = False)
    """
    if not isinstance(optimize_name, list):
        optimize_name = [optimize_name]
    if not isinstance(optimize_op, list):
        optimize_op = [optimize_op]

    # ungroup the metric names (to give e.g. both train_loss and val_loss their own columns)
    metric_names_ungrouped = []
    metric_names_grouped = []
    for group in metric_names:
        if not isinstance(group, list) and not isinstance(group, tuple):
            metric_names_ungrouped.append(group)
            metric_names_grouped.append([group])
        else:
            for metric in group:
                metric_names_ungrouped.append(metric)
            metric_names_grouped.append(group)

    best_trials = pd.DataFrame(
        index=optimize_name,
        columns=metric_names_ungrouped + ["trial", "best epoch", "best index", "best checkpoint"],
        dtype=np.float64,
    )
    best_trials.index.name = "target"
    for metric, op in zip(optimize_name, optimize_op):
        for trial, df in dfs.items():
            # skip if df is empty
            if len(df) == 0:
                continue

            # need to find the index of the best epoch because for PBT, epochs don't always increase monotonically
            if reject_overtrained:
                df_nonovertrained = df.where(df["overtrained"] == False)  # reject overtrained epochs  # noqa: E712
                best_index = df_nonovertrained[metric].idxmax() if op == "max" else df_nonovertrained[metric].idxmin()
            else:
                best_index = df[metric].idxmax() if op == "max" else df[metric].idxmin()

            # if every epoch is overtrained, best index will be NaN
            if not np.isnan(best_index):
                metric_best = df.iloc[best_index][metric]
                best_epoch = df.iloc[best_index]["training_iteration"]
                best_checkpoint = (
                    df.iloc[best_index]["iterations_since_restore"] - 1
                )  # when restoring during PBT, checkpoint numbering will be restarted from 0, overwriting previous checkpoints
                if (
                    metric_best > best_trials.loc[metric, metric]
                    if op == "max"
                    else metric_best < best_trials.loc[metric, metric]
                ) or np.isnan(best_trials.loc[metric, metric]):
                    best_trials.loc[[metric], metric_names_ungrouped] = df.loc[
                        best_index, metric_names_ungrouped
                    ].to_numpy()
                    best_trials.loc[[metric], ["trial", "best epoch", "best index", "best checkpoint"]] = [
                        trial,
                        best_epoch,
                        best_index,
                        best_checkpoint,
                    ]

    # plotting the evolution of the metrics for best trial
    for target_metric_name, best_trial, best_epoch, best_index in zip(
        optimize_name, best_trials["trial"], best_trials["best epoch"], best_trials["best index"]
    ):
        df = dfs[best_trial]
        alpha_array = copy.deepcopy(
            df["overtrained"].to_numpy(dtype=np.float64)
        )  # need the copy here, otherwise we modify df
        alpha_array[alpha_array == True] = 0.2  # noqa: E712
        alpha_array[alpha_array == False] = 1.0  # noqa: E712
        for metric_group in metric_names_grouped:
            for metric in metric_group:
                plt.gcf().set_figheight(6)
                plt.gcf().set_figwidth(8)
                plt.scatter(df["training_iteration"], df[metric], s=11.0, label=metric, alpha=alpha_array)
                plt.scatter(
                    best_epoch, df.iloc[int(best_index)][metric], s=40.0, marker="x", c="r"
                )  # need to use the index because for PBT, epochs don't always increase monotonically
            plt.title(f"target: {target_metric_name}")
            plt.xlabel("training iteration")
            plt.legend()
            plt.gcf().set_figheight(6)
            plt.gcf().set_figwidth(8)
            plt.tight_layout()
            if figs_dir is not None:
                if not os.path.exists(
                    os.path.join(
                        figs_dir,
                        target_metric_name if len(optimize_name) > 1 else "",
                        "best_value",
                        "plots",
                        "optimization",
                    )
                ):
                    os.makedirs(
                        os.path.join(
                            figs_dir,
                            target_metric_name if len(optimize_name) > 1 else "",
                            "best_value",
                            "plots",
                            "optimization",
                        ),
                        exist_ok=True,
                    )
                plt.savefig(
                    os.path.join(
                        figs_dir,
                        target_metric_name if len(optimize_name) > 1 else "",
                        "best_value",
                        "plots",
                        "optimization",
                        "+".join(metric_group) + ".png",
                    ),
                    dpi=600,
                )
            else:
                plt.show()
            plt.clf()

    return best_trials

def get_best_trials_from_fit(run_config: module, dfs, optimize_name, optimize_op, metric_names, fit_function='custom_2', fit_xtol=1e-05, fit_maxfev=10000, figs_dir=None, overtraining_conditions=None, reject_overtrained=False, min_R_squared=0.9)
Determines the best trial and epoch for each target metric by fitting a smooth function to the reported metric values and using the extremum of the fit instead of the raw values.
Parameters
run_config : ModuleType - Reference to the imported run-config file.
dfs : dict - dictionary mapping trial names to pandas DataFrames containing the reports of each trial
optimize_name : str or list of str - name(s) of the target metric(s) to optimize
optimize_op : str or list of str - 'min' or 'max' for each target metric, of the same length as optimize_name
metric_names : list - metric names to include; they can be grouped by giving a list of lists or a list of tuples
fit_function : str or callable - defines which function should be used for the fit; one of the built-in options 'custom', 'custom_2' or 'custom_3', or a callable that is passed directly to scipy's curve_fit (a custom fit can also be supplied via the run-config, see the sketch below) (Default value = 'custom_2')
fit_xtol : float - xtol passed to scipy's curve_fit (Default value = 1e-05)
fit_maxfev : int - maxfev passed to scipy's curve_fit (Default value = 10000)
figs_dir : str - directory to save the plots to; if None, the plots are shown instead (Default value = None)
overtraining_conditions : list - overtraining conditions evaluated on the fitted metric values; if given, overtrained epochs are excluded from the search for the best epoch (Default value = None)
reject_overtrained : bool - if True and no overtraining conditions are given, epochs marked as overtrained in the raw reports are excluded (Default value = False)
min_R_squared : float - minimum R-squared of the fit; trials whose fit falls below this threshold are rejected (Default value = 0.9)
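If the run-config defines a fit_function attribute, it is called as fit_function(df, metric_name, metric_op, overtraining_conditions) and is expected to return a callable that maps training iterations to fitted metric values, or None if no satisfactory fit could be found. The following is a minimal sketch of such a run-config function; the exponential-convergence model and the initial guesses are illustrative assumptions, not part of OPTIMA.

# Minimal sketch of a run-config `fit_function`; the model form is an assumption.
import numpy as np
from scipy.optimize import curve_fit


def fit_function(df, metric_name, metric_op, overtraining_conditions):
    """Fit an exponential convergence curve to one metric; return the fitted callable or None."""

    def _model(x, amplitude, rate, plateau):
        # metric converges towards `plateau` with decay constant `rate`
        return amplitude * np.exp(-rate * x) + plateau

    x = df["training_iteration"].to_numpy(dtype=np.float64)
    y = df[metric_name].to_numpy(dtype=np.float64)
    try:
        params, _ = curve_fit(_model, x, y, p0=[y[0] - y[-1], 0.1, y[-1]], maxfev=10000)
    except RuntimeError:
        return None  # fit did not converge; the calling code then skips this trial
    return lambda x_eval: _model(np.asarray(x_eval, dtype=np.float64), *params)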
Expand source code
def get_best_trials_from_fit( run_config: ModuleType, dfs, optimize_name, optimize_op, metric_names, fit_function="custom_2", fit_xtol=1e-5, fit_maxfev=10000, figs_dir=None, overtraining_conditions=None, reject_overtrained=False, min_R_squared=0.9, ): """_summary_. Parameters ---------- run_config : ModuleType Reference to the imported run-config file. dfs : _type_ dict of pandas dataframes optimize_name : _type_ string or list of strings optimize_op : _type_ string ('min'/'max') or list of strings of same length as metric_name metric_names : _type_ _description_ fit_function : _type_ defining which function should be used for fitting. Can either be single value or list of same length (Default value = 'custom_2') fit_xtol : _type_ _description_ (Default value = 1e-5) fit_maxfev : _type_ _description_ (Default value = 10000) figs_dir : _type_ _description_ (Default value = None) overtraining_conditions : _type_ _description_ (Default value = None) reject_overtrained : _type_ _description_ (Default value = False) min_R_squared : _type_ _description_ (Default value = 0.9) """ def _custom_fit_function(x, A, a, c, B, b, C, D, const): """_summary_. Parameters ---------- x : _type_ _description_ A : _type_ _description_ a : _type_ _description_ c : _type_ _description_ B : _type_ _description_ b : _type_ _description_ C : _type_ _description_ D : _type_ _description_ const : _type_ _description_ Returns ------- _type_ _description_ """ return A * np.exp(-a / 1e-4 * (x - c) ** 2) + B * np.exp(-b / 100 * x) + C * x + D * x * x + const def _custom_fit_function_2(x, A, B, C, D, const): """_summary_. Parameters ---------- x : _type_ _description_ A : _type_ _description_ B : _type_ _description_ C : _type_ _description_ D : _type_ _description_ const : _type_ _description_ Returns ------- _type_ _description_ """ return A / x**2 + B / x + C * x**2 + D * x + const def _custom_fit_function_3(x, A, a, c, B, b, C, D, E, F, const): """_summary_. Parameters ---------- x : _type_ _description_ A : _type_ _description_ a : _type_ _description_ c : _type_ _description_ B : _type_ _description_ b : _type_ _description_ C : _type_ _description_ D : _type_ _description_ E : _type_ _description_ F : _type_ _description_ const : _type_ _description_ Returns ------- _type_ _description_ """ return ( A * np.exp(-a / 1e-4 * (x - c) ** 2) + B * np.exp(-b / 100 * x) + C * x + D * x * x + E / x**2 + F / x + const ) def _crossval_fit(fit_function, x, y, bounds=(-np.inf, np.inf), p0=None, **kwargs): """Function to perform a crossvalidation-like fit to assess the goodness of fit. It applies scipy.optimize's curve_fit multiple times to a subset of the data (using kfold splitting), calculates the average deviation between the fit function and the fitting and testing data, gets the ratio between the two errors for each splitting, and then looks at the median value. 
if that is > 3, the fit is rejected, otherwise the fit is repeated on the full dataset and the fit parameters are returned Parameters ---------- fit_function : _type_ _description_ x : _type_ _description_ y : _type_ _description_ bounds : _type_ _description_ (Default value = (-np.inf, np.inf)) p0 : _type_ _description_ (Default value = None) **kwargs : _type_ _description_ """ fit_errors = [] test_errors = [] error_ratios = [] fit_R_squared = [] kfold = KFold(n_splits=5, shuffle=True, random_state=1234) for fit_indices, test_indices in kfold.split(x): x_fit = x[fit_indices] x_test = x[test_indices] y_fit = y[fit_indices] y_test = y[test_indices] parameters, covariance = curve_fit(fit_function, x_fit, y_fit, bounds=bounds, p0=p0, **kwargs) fitted_func = lambda x, p=parameters: fit_function(x, *p) y_fit_pred = fitted_func(x_fit) y_test_pred = fitted_func(x_test) mean_abs_fit_error = np.mean(np.abs(y_fit_pred - y_fit)) mean_abs_test_error = np.mean(np.abs(y_test_pred - y_test)) error_fraction = mean_abs_test_error / mean_abs_fit_error fit_errors.append(mean_abs_fit_error) test_errors.append(mean_abs_test_error) error_ratios.append(error_fraction) R_squared_fit = np.sum((y_fit - y_fit_pred) ** 2) / np.sum((y_fit - np.mean(y_fit)) ** 2) fit_R_squared.append(R_squared_fit) parameters, covariance = curve_fit(fit_function, x, y, bounds=bounds, p0=p0, **kwargs) fitted_func = lambda x: fit_function(x, *parameters) R_squared = 1 - np.sum((y - fitted_func(x)) ** 2) / np.sum((y - np.mean(y)) ** 2) if R_squared < min_R_squared: # np.median(error_ratios) > 3.: return None, None else: # plt.scatter(x, y, s=11.) # xs_plot = np.linspace(x.iloc[0], x.iloc[-1], 1000) # ys_plot = fitted_func(xs_plot) # plt.plot(xs_plot, ys_plot, color='blue') # plt.title("MedER: {:.4f}, MedFE: {:.5f}, MedTE: {:.5f}, \n" # "MedRS: {:.4f}, MedRSF: {:.4f}".format(np.median(error_ratios), # np.median(fit_errors), # np.median(test_errors), # np.median(R_squared), # np.median(fit_R_squared))) # plt.show() return parameters, covariance def _fit_and_plot( df, metric_name, fit_function, xtol, maxfev, metric_op=None, return_at_x=None, color="C0", plot=True, figs_dir=None, reject_overtrained=False, overtraining_conditions=None, crossval=False, return_plotting_df=False, ): """Either metric_op or return_at_x must not be None!. Parameters ---------- df : _type_ _description_ metric_name : _type_ _description_ fit_function : _type_ _description_ xtol : _type_ _description_ maxfev : _type_ _description_ metric_op : _type_ _description_ (Default value = None) return_at_x : _type_ if given, will return the value of the fit function at this point instead of the extreme value (Default value = None) color : _type_ _description_ (Default value = "C0") plot : _type_ _description_ (Default value = True) figs_dir : _type_ _description_ (Default value = None) reject_overtrained : _type_ _description_ (Default value = False) overtraining_conditions : _type_ _description_ (Default value = None) crossval : _type_ _description_ (Default value = False) return_plotting_df : _type_ _description_ (Default value = False) """ def _fit(metric_to_fit, metric_to_fit_op=None): """Helper function that fits the fitting function to a metric in df. 
Parameters ---------- metric_to_fit : _type_ _description_ metric_to_fit_op : _type_ _description_ (Default value = None) """ if hasattr(run_config, "fit_function"): fitted_func = run_config.fit_function(df, metric_to_fit, metric_to_fit_op, overtraining_conditions) elif fit_function == "custom": # fewer data points than twice the number of function parameters if len(df) < 16: return # fit curve to metric; choose initial values to be exponential convergence to the observed extreme value if metric_to_fit_op is not None: best_metric_guess = ( df[metric_to_fit].max() if metric_to_fit_op == "max" else df[metric_to_fit].min() ) best_epoch_guess = ( df.loc[df[metric_to_fit].idxmax()]["training_iteration"] if metric_to_fit_op == "max" else df.loc[df[metric_to_fit].idxmin()]["training_iteration"] ) else: # assume the progress is in the correct direction progress = df[metric_to_fit].iloc[-1] - df[metric_to_fit].iloc[0] best_metric_guess = df[metric_to_fit].max() if progress > 0 else df[metric_to_fit].min() best_epoch_guess = df["training_iteration"].iloc[-1] metric_first = df[metric_to_fit][0] B_init = -(best_metric_guess - metric_first) b_init = 5 * 100 / (best_epoch_guess) const_init = best_metric_guess init = [0, 0, 0, B_init, b_init, 0, 0, const_init] # f = A * exp(-a * (x-c)^2) + B * exp(-bx) + Cx + Dx^2 + const # --> make sure the exponentials are always getting smaller with larger x! bounds = ( [-np.inf, 0, -np.inf, -np.inf, 0, -np.inf, -np.inf, -np.inf], [np.inf, np.inf, 0, np.inf, np.inf, np.inf, np.inf, np.inf], ) if crossval: parameters, covariance = _crossval_fit( _custom_fit_function, df["training_iteration"], df[metric_to_fit], bounds=bounds, p0=init, xtol=xtol, maxfev=maxfev, ) if parameters is None: return else: parameters, covariance = curve_fit( _custom_fit_function, df["training_iteration"], df[metric_to_fit], bounds=bounds, p0=init, xtol=xtol, maxfev=maxfev, ) fitted_func = lambda x: _custom_fit_function(x, *parameters) elif fit_function == "custom_2": # fewer data points than twice the number of function parameters if len(df) < 10: return if crossval: parameters, covariance = _crossval_fit( _custom_fit_function_2, df["training_iteration"], df[metric_to_fit], xtol=xtol, maxfev=maxfev ) if parameters is None: return else: parameters, covariance = curve_fit( _custom_fit_function_2, df["training_iteration"], df[metric_to_fit], xtol=xtol, maxfev=maxfev ) fitted_func = lambda x: _custom_fit_function_2(x, *parameters) elif fit_function == "custom_3": # fewer data points than twice the number of function parameters if len(df) < 20: return # fit curve to metric; choose initial values to be exponential convergence to the observed extreme value if metric_to_fit_op is not None: best_metric_guess = ( df[metric_to_fit].max() if metric_to_fit_op == "max" else df[metric_to_fit].min() ) best_epoch_guess = ( df.loc[df[metric_to_fit].idxmax()]["training_iteration"] if metric_to_fit_op == "max" else df.loc[df[metric_to_fit].idxmin()]["training_iteration"] ) else: best_metric_guess = df[metric_to_fit].median() best_epoch_guess = df["training_iteration"].iloc[-1] metric_first = df[metric_to_fit][0] B_init = -(best_metric_guess - metric_first) b_init = 5 * 100 / (best_epoch_guess) const_init = best_metric_guess init = [0, 0, 0, B_init, b_init, 0, 0, 0, 0, const_init] # f = A * exp(-a * (x-c)^2) + B * exp(-bx) + Cx + Dx^2 + const # --> make sure the exponentials are always getting smaller with larger x! 
bounds = ( [-np.inf, 0, -np.inf, -np.inf, 0, -np.inf, -np.inf, -np.inf, -np.inf, -np.inf], [np.inf, np.inf, 0, np.inf, np.inf, np.inf, np.inf, np.inf, np.inf, np.inf], ) if crossval: parameters, covariance = _crossval_fit( _custom_fit_function_3, df["training_iteration"], df[metric_to_fit], bounds=bounds, p0=init, xtol=xtol, maxfev=maxfev, ) if parameters is None: return else: parameters, covariance = curve_fit( _custom_fit_function_3, df["training_iteration"], df[metric_to_fit], bounds=bounds, p0=init, xtol=xtol, maxfev=maxfev, ) fitted_func = lambda x: _custom_fit_function_3(x, *parameters) else: if crossval: parameters, covariance = _crossval_fit( fit_function, df["training_iteration"], df[metric_to_fit], xtol=xtol, maxfev=maxfev ) if parameters is None: return else: parameters, covariance = curve_fit( fit_function, df["training_iteration"], df[metric_to_fit], xtol=xtol, maxfev=maxfev ) fitted_func = lambda x: fit_function(x, *parameters) return fitted_func if len(df[~df["overtrained"]]) == 0: return None, None, None fitted_func = _fit(metric_name, metric_op) if fitted_func is None: return None, None, None if plot: # create a df that contains all values for plotting the fit df_for_plotting = pd.DataFrame() df_for_plotting["training_iteration"] = np.linspace( df["training_iteration"].iloc[0], df["training_iteration"].iloc[-1], 1000 ) df_for_plotting[metric_name] = fitted_func(df_for_plotting["training_iteration"]) # when overtraining conditions are given, also fit all metrics necessary to evaluate the overtraining conditions # using the fit values if overtraining_conditions is not None: df_for_plotting.loc[:, "overtrained"] = 0.0 # iterate over all overtraining conditions to get a full set of metrics that need to be fitted metrics_for_overtraining_check = [] for _, input_metric_names, _ in overtraining_conditions: metrics_for_overtraining_check += list(input_metric_names) metrics_for_overtraining_check = list(set(metrics_for_overtraining_check)) # remove duplicate entries if metric_name in metrics_for_overtraining_check: metrics_for_overtraining_check.remove( metric_name ) # remove metric_name as we already have done the fit for that # iterate over all metrics for the overtraining conditions and apply a fit to each, then fill the dfs for metric_to_fit in metrics_for_overtraining_check: fitted_func_overtraining = _fit(metric_to_fit) # when one of the metrics could not be fitted well, we'll skip this trial (one could also fall back # to simply checking the overtraining flag of the raw values, but that would not be consistent with # the over trials) if fitted_func_overtraining is None: return None, None, None df_for_plotting[metric_to_fit] = fitted_func_overtraining(df_for_plotting["training_iteration"]) # update the overtrained column using the fit values df_for_plotting = check_overtraining(df_for_plotting, overtraining_conditions) # plot the raw metric values alpha_array = copy.deepcopy(df["overtrained"].to_numpy(dtype=np.float64)) alpha_array[alpha_array == True] = 0.2 # noqa: E712 alpha_array[alpha_array == False] = 1.0 # noqa: E712 plt.gcf().set_figheight(6) plt.gcf().set_figwidth(8) plt.scatter( df["training_iteration"], df[metric_name], s=11.0, color=color, alpha=alpha_array, label=metric_name ) plt.xlabel("training iteration") # find the best epoch according to the fit (while checking for overtraining, if requested) if return_at_x is None: fit_values_epochs = fitted_func(df["training_iteration"]) # when overtraining conditions are given, also fit all metrics necessary to evaluate 
the overtraining conditions # using the fit values if overtraining_conditions is not None: # build a new df and fill all necessary columns with fit values; also create a df with intermediate values # when plotting is requested df_from_fit = df.copy() df_from_fit[metric_name] = fit_values_epochs df_from_fit.loc[:, "overtrained"] = 0.0 # iterate over all overtraining conditions to get a full set of metrics that need to be fitted metrics_for_overtraining_check = [] for _, input_metric_names, _ in overtraining_conditions: metrics_for_overtraining_check += list(input_metric_names) metrics_for_overtraining_check = list(set(metrics_for_overtraining_check)) # remove duplicate entries if metric_name in metrics_for_overtraining_check: metrics_for_overtraining_check.remove( metric_name ) # remove metric_name as we already have done the fit for that # iterate over all metrics for the overtraining conditions and apply a fit to each, then fill the dfs for metric_to_fit in metrics_for_overtraining_check: fitted_func_overtraining = _fit(metric_to_fit) # when one of the metrics could not be fitted well, we'll skip this trial (one could also fall back # to simply checking the overtraining flag of the raw values, but that would not be consistent with # the other trials) if fitted_func_overtraining is None: return None, None, None df_from_fit[metric_to_fit] = fitted_func_overtraining(df_from_fit["training_iteration"]) # update the overtrained column using the fit values df_from_fit = check_overtraining(df_from_fit, overtraining_conditions) # replace values for overtrained epochs with NaN --> will never be extremum fit_values_epochs = fit_values_epochs.where(df_from_fit["overtrained"] == False) # noqa: E712 # when no overtraining conditions are given, but reject_overtrained is True, then check the overtraining # conditions using the raw metric values (by looking at the "overtrained" column) elif reject_overtrained: # replace values for overtrained epochs with NaN --> will never be extremum fit_values_epochs = fit_values_epochs.where(df["overtrained"] == False) # noqa: E712 # find the extremum; need to use the indices because for PBT, epochs don't always increase monotonically best_index_fit = fit_values_epochs.idxmax() if metric_op == "max" else fit_values_epochs.idxmin() if np.isnan(best_index_fit): return None, None, None # if every epoch is overtrained, best index will be NaN best_epoch_fit = df.iloc[best_index_fit]["training_iteration"] best_checkpoint_fit = df.iloc[best_index_fit]["iterations_since_restore"] - 1 best_metric_fit = fit_values_epochs.iloc[best_index_fit] if plot: # plot the data plt.scatter(best_epoch_fit, best_metric_fit, s=40.0, marker="x", c="r") # plot the fit; when the overtraining conditions are given, use the created plotting df that includes # "continous" overtrained flag so that the plotted line can be colored accordingly if overtraining_conditions: # get indices where overtraining label changes value and the values it changes to diffs = df_for_plotting["overtrained"][df_for_plotting["overtrained"].diff() != 0] diff_indices = diffs.index.tolist() diff_values = diffs.values.tolist() for i, (diff_index, diff_value) in enumerate(zip(diff_indices, diff_values)): if i + 1 == len(diff_indices): iterations = df_for_plotting["training_iteration"][diff_index:] metrics = df_for_plotting[metric_name][diff_index:] else: next_diff_index = diff_indices[i + 1] iterations = df_for_plotting["training_iteration"][diff_index:next_diff_index] metrics = 
df_for_plotting[metric_name][diff_index:next_diff_index] if diff_value is False: alpha = 1.0 else: alpha = 0.2 label = f"{metric_name} fit" if i == 0 else None plt.plot(iterations, metrics, color=color, alpha=alpha, label=label) else: plt.plot(df_for_plotting["training_iteration"], df_for_plotting[metric_name], color=color) if figs_dir is not None: if not os.path.exists(figs_dir): os.makedirs(figs_dir, exist_ok=True) plt.gcf().set_figheight(6) plt.gcf().set_figwidth(8) plt.tight_layout() plt.savefig(figs_dir, dpi=600) plt.clf() return best_epoch_fit, best_checkpoint_fit, best_metric_fit else: if plot: if overtraining_conditions: # get indices where overtraining label changes value and the values it changes to diffs = df_for_plotting["overtrained"][df_for_plotting["overtrained"].diff() != 0] diff_indices = diffs.index.tolist() diff_values = diffs.values.tolist() for i, (diff_index, diff_value) in enumerate(zip(diff_indices, diff_values)): if i + 1 == len(diff_indices): iterations = df_for_plotting["training_iteration"][diff_index:] metrics = df_for_plotting[metric_name][diff_index:] else: next_diff_index = diff_indices[i + 1] iterations = df_for_plotting["training_iteration"][diff_index:next_diff_index] metrics = df_for_plotting[metric_name][diff_index:next_diff_index] if diff_value is False: alpha = 1.0 else: alpha = 0.2 label = f"{metric_name} fit" if i == 0 else None plt.plot(iterations, metrics, color=color, alpha=alpha, label=label) else: plt.plot( df_for_plotting["training_iteration"], df_for_plotting[metric_name], color=color, label=f"{metric_name} fit", ) if figs_dir is not None: if not os.path.exists(figs_dir): os.makedirs(figs_dir, exist_ok=True) plt.gcf().set_figheight(6) plt.gcf().set_figwidth(8) plt.tight_layout() plt.savefig(figs_dir, dpi=600) plt.clf() return fitted_func(return_at_x) if not isinstance(optimize_name, list): optimize_name = [optimize_name] if not isinstance(optimize_op, list): optimize_op = [optimize_op] # ungroup the metric names (to give e.g. 
both train_loss and val_loss their own columns) metric_names_ungrouped = [] metric_names_grouped = [] for group in metric_names: if not isinstance(group, list) and not isinstance(group, tuple): metric_names_ungrouped.append(group) metric_names_grouped.append([group]) else: for metric in group: metric_names_ungrouped.append(metric) metric_names_grouped.append(group) fit_best = pd.DataFrame( index=optimize_name, columns=["trial", "best epoch", "best checkpoint", "best metric fit"], dtype=np.float64 ) for metric, op in zip(optimize_name, optimize_op): for trial, df in dfs.items(): # skip if df is empty if len(df) == 0: continue best_epoch_fit, best_checkpoint_fit, best_metric_fit = _fit_and_plot( df, metric, fit_function, fit_xtol, fit_maxfev, metric_op=op, plot=False, reject_overtrained=reject_overtrained, overtraining_conditions=overtraining_conditions, crossval=True, ) if best_epoch_fit is None: continue # can be None if every epoch is overtrained or NaN if ( best_metric_fit > fit_best.loc[metric, "best metric fit"] if op == "max" else best_metric_fit < fit_best.loc[metric, "best metric fit"] ) or np.isnan(fit_best.loc[metric, "best metric fit"]): fit_best.loc[metric] = [trial, best_epoch_fit, best_checkpoint_fit, best_metric_fit] # for all the best trials, now also fit the remaining metrics metrics_fit_best = pd.DataFrame( index=optimize_name, columns=metric_names_ungrouped + ["trial", "best epoch", "best checkpoint"], dtype=np.float64, ) metrics_fit_best.index.name = "target" for target_metric_name in optimize_name: trial, epoch, checkpoint_num, _ = fit_best.loc[target_metric_name] if not isinstance(trial, str): return # no fit succeeded for group in metric_names_grouped: for metric, plot_color in zip(group, [f"C{i}" for i in range(10)]): df = dfs[trial] fit_value = _fit_and_plot( df, metric, fit_function, fit_xtol, fit_maxfev, return_at_x=epoch, color=plot_color, overtraining_conditions=overtraining_conditions, ) plt.gcf().set_figheight(6) plt.gcf().set_figwidth(8) plt.plot(epoch, fit_value, "rx") metrics_fit_best.loc[target_metric_name, metric] = fit_value plt.title(f"target: {target_metric_name}") plt.legend() plt.gcf().set_figheight(6) plt.gcf().set_figwidth(8) plt.tight_layout() if figs_dir is not None: if not os.path.exists( os.path.join( figs_dir, target_metric_name if len(optimize_name) > 1 else "", "best_fit", "plots", "optimization", ) ): os.makedirs( os.path.join( figs_dir, target_metric_name if len(optimize_name) > 1 else "", "best_fit", "plots", "optimization", ), exist_ok=True, ) plt.savefig( os.path.join( figs_dir, target_metric_name if len(optimize_name) > 1 else "", "best_fit", "plots", "optimization", "+".join(group) + ".png", ), dpi=600, ) else: plt.show() plt.clf() metrics_fit_best.loc[target_metric_name, ["trial", "best epoch", "best checkpoint"]] = [ trial, epoch, checkpoint_num, ] return metrics_fit_best def scientific_rounding(value, err, notation='separate')-
Helper function to perform scientific rounding based on the provided uncertainty.
Notation can be 'bracket', meaning 0.0123 +- 0.0234 will become 0.012(23), or 'separate', in which case the rounded value and the rounded error are returned separately (see the usage sketch below).
Parameters
value : float - the value to round
err : float - the uncertainty that determines the number of significant digits
notation : str - either 'bracket' or 'separate' (Default value = 'separate')
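The example from the description can be reproduced directly; this is a usage sketch, not part of the module.

# Reproduces the rounding example given in the description above.
scientific_rounding(0.0123, 0.0234, notation="bracket")   # -> '0.012(23)'
scientific_rounding(0.0123, 0.0234, notation="separate")  # -> ('0.012', '0.023')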
Expand source code
def scientific_rounding(value, err, notation="separate"):
    """Helper function to perform scientific rounding based on the provided uncertainty.

    Notation can be 'bracket', meaning 0.0123 +- 0.0234 will become 0.012(23), or 'separate' where the rounded value
    and error will be returned separately

    Parameters
    ----------
    value : _type_
        _description_
    err : _type_
        _description_
    notation : _type_
        _description_ (Default value = 'separate')
    """
    if abs(err) > 0:
        significant_digit = max(-int(np.floor(np.log10(abs(err)))), 0)
        error_digits = round(err * 10**significant_digit)
        if error_digits < 4:
            significant_digit += 1
            error_digits = round(err * 10**significant_digit)
        if notation == "bracket":
            value_rounded = "{{:.{}f}}({{}})".format(significant_digit).format(value, error_digits)
            return value_rounded
        elif notation == "separate":
            value_rounded = "{{:.{}f}}".format(significant_digit).format(value)
            err_rounded = "{{:.{}f}}".format(significant_digit).format(err)
            return value_rounded, err_rounded
    else:
        if notation == "bracket":
            return f"{value}(0)"
        elif notation == "separate":
            return value, err