diff --git a/src/ert/run_models/everest_run_model.py b/src/ert/run_models/everest_run_model.py index 2ea3d16e28c..4522804dc8e 100644 --- a/src/ert/run_models/everest_run_model.py +++ b/src/ert/run_models/everest_run_model.py @@ -12,7 +12,6 @@ import shutil import threading import time -from dataclasses import dataclass from pathlib import Path from types import TracebackType from typing import ( @@ -28,7 +27,6 @@ TypedDict, ) -import seba_sqlite.sqlite_storage from ropt.enums import EventType, OptimizerExitCode from ropt.plan import BasicOptimizer, Event from seba_sqlite import SqliteStorage @@ -37,6 +35,7 @@ from ert.ensemble_evaluator import EvaluatorServerConfig from ert.storage import open_storage from everest.config import EverestConfig +from everest.everest_storage import EverestStorage, OptimalResult from everest.optimizer.everest2ropt import everest2ropt from everest.simulator import Simulator from everest.simulator.everest_to_ert import everest_to_ert_config @@ -291,24 +290,6 @@ class OptimizerCallback(Protocol): def __call__(self) -> str | None: ... -@dataclass -class OptimalResult: - batch: int - controls: List[Any] - total_objective: float - - @staticmethod - def from_seba_optimal_result( - o: Optional[seba_sqlite.sqlite_storage.OptimalResult] = None, - ) -> "OptimalResult" | None: - if o is None: - return None - - return OptimalResult( - batch=o.batch, controls=o.controls, total_objective=o.total_objective - ) - - class EverestRunModel(BaseRunModel): def __init__( self, @@ -420,6 +401,16 @@ def run_experiment( # Initialize the ropt optimizer: optimizer = self._create_optimizer(simulator) + self.ever_storage = EverestStorage( + output_dir=Path(self.everest_config.optimization_output_dir), + ) + self.ever_storage.observe_optimizer( + optimizer, + Path(self.everest_config.optimization_output_dir) + / "dakota" + / "OPT_DEFAULT.out", + ) + # The SqliteStorage object is used to store optimization results from # Seba in an sqlite database. It reacts directly to events emitted by # Seba and is not called by Everest directly. The stored results are @@ -437,6 +428,8 @@ def run_experiment( self._result = OptimalResult.from_seba_optimal_result( seba_storage.get_optimal_result() # type: ignore ) + optimal_result_from_everstorage = self.ever_storage.get_optimal_result() + assert self._result == optimal_result_from_everstorage if self._monitor_thread is not None: self._monitor_thread.stop() diff --git a/src/everest/api/everest_data_api.py b/src/everest/api/everest_data_api.py index b56009c5467..368320d0c41 100644 --- a/src/everest/api/everest_data_api.py +++ b/src/everest/api/everest_data_api.py @@ -1,11 +1,14 @@ from collections import OrderedDict +from pathlib import Path import pandas as pd +import polars from seba_sqlite.snapshot import SebaSnapshot from ert.storage import open_storage from everest.config import EverestConfig, ServerConfig from everest.detached import ServerStatus, everserver_status +from everest.everest_storage import EverestStorage class EverestDataAPI: @@ -13,10 +16,18 @@ def __init__(self, config: EverestConfig, filter_out_gradient=True): self._config = config output_folder = config.optimization_output_dir self._snapshot = SebaSnapshot(output_folder).get_snapshot(filter_out_gradient) + self._ever_storage = EverestStorage(Path(output_folder)) + self._ever_storage.read_from_output_dir() @property def batches(self): batch_ids = list({opt.batch_id for opt in self._snapshot.optimization_data}) + batch_ids2 = sorted( + b.batch_id + for b in self._ever_storage.data.batches + if b.batch_objectives is not None + ) + assert batch_ids == batch_ids2 return sorted(batch_ids) @property @@ -24,15 +35,38 @@ def accepted_batches(self): batch_ids = list( {opt.batch_id for opt in self._snapshot.optimization_data if opt.merit_flag} ) + batch_ids2 = sorted( + b.batch_id for b in self._ever_storage.data.batches if b.is_improvement + ) + assert batch_ids == batch_ids2 + return sorted(batch_ids) @property def objective_function_names(self): - return [fnc.name for fnc in self._snapshot.metadata.objectives.values()] + original = [fnc.name for fnc in self._snapshot.metadata.objectives.values()] + new = sorted( + self._ever_storage.data.objective_functions["objective_name"] + .unique() + .to_list() + ) + assert original == new + return original @property def output_constraint_names(self): - return [fnc.name for fnc in self._snapshot.metadata.constraints.values()] + original = [fnc.name for fnc in self._snapshot.metadata.constraints.values()] + new = ( + sorted( + self._ever_storage.data.nonlinear_constraints["constraint_name"] + .unique() + .to_list() + ) + if self._ever_storage.data.nonlinear_constraints is not None + else [] + ) + assert original == new + return original def input_constraint(self, control): controls = [ @@ -40,7 +74,19 @@ def input_constraint(self, control): for con in self._snapshot.metadata.controls.values() if con.name == control ] - return {"min": controls[0].min_value, "max": controls[0].max_value} + + original = {"min": controls[0].min_value, "max": controls[0].max_value} + + initial_values = self._ever_storage.data.initial_values + control_spec = initial_values.filter( + polars.col("control_name") == control + ).to_dicts()[0] + new = { + "min": control_spec.get("lower_bounds"), + "max": control_spec.get("upper_bounds"), + } + assert new == original + return original def output_constraint(self, constraint): """ @@ -55,30 +101,62 @@ def output_constraint(self, constraint): for con in self._snapshot.metadata.constraints.values() if con.name == constraint ] - return { + + old = { "type": constraints[0].constraint_type, "right_hand_side": constraints[0].rhs_value, } + constraint_dict = self._ever_storage.data.nonlinear_constraints.to_dicts()[0] + new = { + "type": constraint_dict["constraint_type"], + "right_hand_side": constraint_dict["rhs_value"], + } + + assert old == new + return new + @property def realizations(self): - return list( + old = list( OrderedDict.fromkeys( int(sim.realization) for sim in self._snapshot.simulation_data ) ) + new = sorted( + self._ever_storage.data.batches[0] + .realization_objectives["realization"] + .unique() + .to_list() + ) + assert old == new + return new @property def simulations(self): - return list( + old = list( OrderedDict.fromkeys( [int(sim.simulation) for sim in self._snapshot.simulation_data] ) ) + new = sorted( + self._ever_storage.data.batches[0] + .realization_objectives["result_id"] + .unique() + .to_list() + ) + assert old == new + return new + @property def control_names(self): - return [con.name for con in self._snapshot.metadata.controls.values()] + old = [con.name for con in self._snapshot.metadata.controls.values()] + new = sorted( + self._ever_storage.data.initial_values["control_name"].unique().to_list() + ) + assert old == new + return new @property def control_values(self): @@ -92,7 +170,7 @@ def control_values(self): @property def objective_values(self): - return [ + old = [ { "function": objective.name, "batch": sim.batch, @@ -107,6 +185,14 @@ def objective_values(self): if objective.name in sim.objectives ] + new = [ + b for b in self._ever_storage.data.batches if b.batch_objectives is not None + ] + + assert old == new + + return old + @property def single_objective_values(self): single_obj = [ diff --git a/src/everest/everest_storage.py b/src/everest/everest_storage.py new file mode 100644 index 00000000000..d82d403eea8 --- /dev/null +++ b/src/everest/everest_storage.py @@ -0,0 +1,1053 @@ +from __future__ import annotations + +import datetime +import json +import logging +import os +from dataclasses import dataclass, field +from pathlib import Path +from typing import ( + TYPE_CHECKING, + Any, + Dict, + List, + Optional, +) + +import numpy as np +import polars +from numpy.core.numeric import Infinity +from ropt.config.enopt import EnOptConfig +from ropt.enums import EventType +from ropt.plan import BasicOptimizer, Event +from ropt.results import FunctionResults, GradientResults, convert_to_maximize +from seba_sqlite import sqlite_storage + +logger = logging.getLogger(__name__) + +if TYPE_CHECKING: + pass + + +@dataclass +class OptimalResult: + batch: int + controls: List[Any] + total_objective: float + + @staticmethod + def from_seba_optimal_result( + o: Optional[sqlite_storage.OptimalResult] = None, + ) -> "OptimalResult" | None: + if o is None: + return None + + return OptimalResult( + batch=o.batch, controls=o.controls, total_objective=o.total_objective + ) + + +def try_read_df(path: Path) -> polars.DataFrame | None: + return polars.read_parquet(path) if path.exists() else None + + +@dataclass +class BatchDataFrames: + batch_id: int + batch_controls: polars.DataFrame + batch_objectives: Optional[polars.DataFrame] + realization_objectives: Optional[polars.DataFrame] + batch_constraints: Optional[polars.DataFrame] + realization_constraints: Optional[polars.DataFrame] + batch_objective_gradient: Optional[polars.DataFrame] + perturbation_objectives: Optional[polars.DataFrame] + batch_constraint_gradient: Optional[polars.DataFrame] + perturbation_constraints: Optional[polars.DataFrame] + is_improvement: Optional[bool] = False + + @property + def existing_dataframes(self) -> Dict[str, polars.DataFrame]: + dataframes = {} + + if self.batch_objectives is not None: + dataframes["batch_objectives"] = self.batch_objectives + + if self.realization_objectives is not None: + dataframes["realization_objectives"] = self.realization_objectives + + if self.batch_constraints is not None: + dataframes["batch_constraints"] = self.batch_constraints + + if self.realization_constraints is not None: + dataframes["realization_constraints"] = self.realization_constraints + + if self.batch_objective_gradient is not None: + dataframes["batch_objective_gradient"] = self.batch_objective_gradient + + if self.perturbation_objectives is not None: + dataframes["perturbation_objectives"] = self.perturbation_objectives + + if self.batch_constraint_gradient is not None: + dataframes["batch_constraint_gradient"] = self.batch_constraint_gradient + + if self.perturbation_constraints is not None: + dataframes["perturbation_constraints"] = self.perturbation_constraints + + return dataframes + + +@dataclass +class EverestStorageDataFrames: + batches: List[BatchDataFrames] = field(default_factory=list) + time_ended: Optional[datetime.date] = None + initial_values: Optional[polars.DataFrame] = None + objective_functions: Optional[polars.DataFrame] = None + nonlinear_constraints: Optional[polars.DataFrame] = None + realization_weights: Optional[polars.DataFrame] = None + + def write_to_experiment( + self, experiment: _OptimizerOnlyExperiment, write_csv=False + ): + # Stored in ensembles instead + # self.batch_objectives.write_parquet(path / "objective_data.parquet") + # self.gradient_evaluation.write_parquet(path / "gradient_evaluation.parquet") + # self.gradient.write_parquet(path / "gradient.parquet") + + # The stuff under experiment should maybe be JSON? + # 2DO PICK ONE, for now doing both for proof of concept + + with open( + experiment.optimizer_mount_point / "initial_values.json", + mode="w+", + encoding="utf-8", + ) as f: + f.write(json.dumps(self.initial_values.to_dicts())) + + self.initial_values.write_parquet( + experiment.optimizer_mount_point / "initial_values.parquet" + ) + + with open( + experiment.optimizer_mount_point / "objective_functions.json", + mode="w+", + encoding="utf-8", + ) as f: + f.write(json.dumps(self.objective_functions.to_dicts())) + + self.objective_functions.write_parquet( + experiment.optimizer_mount_point / "objective_functions.parquet" + ) + + if self.nonlinear_constraints is not None: + with open( + experiment.optimizer_mount_point / "nonlinear_constraints.json", + mode="w+", + encoding="utf-8", + ) as f: + f.write(json.dumps(self.nonlinear_constraints.to_dicts())) + self.nonlinear_constraints.write_parquet( + experiment.optimizer_mount_point / "nonlinear_constraints.parquet" + ) + + if self.realization_weights is not None: + with open( + experiment.optimizer_mount_point / "realization_weights.json", + mode="w+", + encoding="utf-8", + ) as f: + f.write(json.dumps(self.realization_weights.to_dicts())) + + self.realization_weights.write_parquet( + experiment.optimizer_mount_point / "realization_weights.parquet" + ) + + for batch_data in self.batches: + ensemble = experiment.get_ensemble_by_name(f"batch_{batch_data.batch_id}") + with open( + ensemble.optimizer_mount_point / "batch.json", "w+", encoding="utf-8" + ) as f: + json.dump( + { + "id": batch_data.batch_id, + "is_improvement": batch_data.is_improvement, + }, + f, + ) + + for df_key, df in batch_data.existing_dataframes.items(): + df.write_parquet(ensemble.optimizer_mount_point / f"{df_key}.parquet") + + if write_csv: + self.initial_values.write_csv( + experiment.optimizer_mount_point / "initial_values.csv" + ) + + self.objective_functions.write_csv( + experiment.optimizer_mount_point / "objective_functions.csv" + ) + + if self.nonlinear_constraints is not None: + self.nonlinear_constraints.write_csv( + experiment.optimizer_mount_point / "nonlinear_constraints.csv" + ) + + if self.realization_weights is not None: + self.realization_weights.write_csv( + experiment.optimizer_mount_point / "realization_weights.csv" + ) + + for batch_data in self.batches: + ensemble = experiment.get_ensemble_by_name( + f"batch_{batch_data.batch_id}" + ) + for df_key, df in batch_data.existing_dataframes.items(): + df.write_csv(ensemble.optimizer_mount_point / f"{df_key}.csv") + df.write_json(ensemble.optimizer_mount_point / f"{df_key}.json") + + def read_from_experiment(self, experiment: _OptimizerOnlyExperiment) -> None: + self.initial_values = polars.read_parquet( + experiment.optimizer_mount_point / "initial_values.parquet" + ) + self.objective_functions = polars.read_parquet( + experiment.optimizer_mount_point / "objective_functions.parquet" + ) + + if ( + experiment.optimizer_mount_point / "nonlinear_constraints.parquet" + ).exists(): + self.nonlinear_constraints = polars.read_parquet( + experiment.optimizer_mount_point / "nonlinear_constraints.parquet" + ) + + if (experiment.optimizer_mount_point / "realization_weights.parquet").exists(): + self.realization_weights = polars.read_parquet( + experiment.optimizer_mount_point / "realization_weights.parquet" + ) + + for name, ens in experiment.ensembles.items(): + batch_id = int(name.split("_")[1]) + + batch_objectives = try_read_df( + ens.optimizer_mount_point / "batch_objectives.parquet" + ) + realization_objectives = try_read_df( + ens.optimizer_mount_point / "realization_objectives.parquet" + ) + batch_constraints = try_read_df( + ens.optimizer_mount_point / "batch_constraints.parquet" + ) + realization_constraints = try_read_df( + ens.optimizer_mount_point / "realization_constraints.parquet" + ) + batch_objective_gradient = try_read_df( + ens.optimizer_mount_point / "batch_objective_gradient.parquet" + ) + perturbation_objectives = try_read_df( + ens.optimizer_mount_point / "perturbation_objectives.parquet" + ) + batch_constraint_gradient = try_read_df( + ens.optimizer_mount_point / "batch_constraint_gradient.parquet" + ) + perturbation_constraints = try_read_df( + ens.optimizer_mount_point / "perturbation_constraints.parquet" + ) + + batch_controls = try_read_df( + ens.optimizer_mount_point / "batch_controls.parquet" + ) + + with open(ens.optimizer_mount_point / "batch.json", encoding="utf-8") as f: + info = json.load(f) + batch_id = info["id"] + is_improvement = info["is_improvement"] + + self.batches.append( + BatchDataFrames( + batch_id, + batch_controls, + batch_objectives, + realization_objectives, + batch_constraints, + realization_constraints, + batch_objective_gradient, + perturbation_objectives, + batch_constraint_gradient, + perturbation_constraints, + is_improvement, + ) + ) + + self.batches.sort(key=lambda b: b.batch_id) + + +class _OptimizerOnlyEnsemble: + def __init__(self, output_dir: Path): + self._output_dir = output_dir + + @property + def optimizer_mount_point(self) -> Path: + if not (self._output_dir / "optimizer").exists(): + Path.mkdir(self._output_dir / "optimizer", parents=True) + + return self._output_dir / "optimizer" + + +class _OptimizerOnlyExperiment: + def __init__(self, output_dir: Path): + self._output_dir = output_dir + self._ensembles = {} + + @property + def optimizer_mount_point(self) -> Path: + if not (self._output_dir / "optimizer").exists(): + Path.mkdir(self._output_dir / "optimizer", parents=True) + + return self._output_dir / "optimizer" + + @property + def ensembles(self) -> Dict[str, _OptimizerOnlyEnsemble]: + return { + str(d): _OptimizerOnlyEnsemble(self._output_dir / "ensembles" / d) + for d in os.listdir(self._output_dir / "ensembles") + if "batch_" in d + } + + def get_ensemble_by_name(self, name: str) -> _OptimizerOnlyEnsemble: + if name not in self._ensembles: + self._ensembles[name] = _OptimizerOnlyEnsemble( + self._output_dir / "ensembles" / name + ) + + return self._ensembles[name] + + +@dataclass +class _EvaluationResults: + batch_controls: polars.DataFrame + batch_objectives: polars.DataFrame + realization_objectives: polars.DataFrame + batch_constraints: Optional[polars.DataFrame] + realization_constraints: Optional[polars.DataFrame] + + +@dataclass +class _GradientResults: + batch_objective_gradient: polars.DataFrame + perturbation_objectives: polars.DataFrame + batch_constraint_gradient: Optional[polars.DataFrame] + perturbation_constraints: Optional[polars.DataFrame] + + +class EverestStorage: + def __init__( + self, + output_dir: Path, + ) -> None: + self._initialized = False + self._control_ensemble_id = 0 + self._gradient_ensemble_id = 0 + + self._output_dir = output_dir + self._merit_file: Optional[Path] = None + self.data = EverestStorageDataFrames() + + def write_to_output_dir(self) -> None: + exp = _OptimizerOnlyExperiment(self._output_dir) + + # csv writing mostly for dev/debugging/quick inspection + self.data.write_to_experiment(exp, write_csv=True) + + def read_from_output_dir(self) -> None: + exp = _OptimizerOnlyExperiment(self._output_dir) + self.data.read_from_experiment(exp) + self._initialized = True + + def observe_optimizer( + self, + optimizer: BasicOptimizer, + merit_file: Path, + ) -> None: + # We only need this file if we are observing a running ROPT instance + # (using dakota backend) + self._merit_file = merit_file + + # Q: Do these observers have to be explicitly disconnected/destroyed? + optimizer.add_observer(EventType.START_OPTIMIZER_STEP, self._initialize) + optimizer.add_observer( + EventType.FINISHED_EVALUATION, self._handle_finished_batch_event + ) + optimizer.add_observer( + EventType.FINISHED_OPTIMIZER_STEP, + self._handle_finished_event, + ) + + @property + def experiment(self) -> _OptimizerOnlyExperiment: + # Should be replaced with ERT experiment + # in the long run + return self._experiment + + @staticmethod + def _convert_names(control_names): + converted_names = [] + for name in control_names: + converted = f"{name[0]}_{name[1]}" + if len(name) > 2: + converted += f"-{name[2]}" + converted_names.append(converted) + return converted_names + + @property + def file(self): + return self._database.location + + def _initialize(self, event): + if self._initialized: + return + self._initialized = True + + config = event.config + self.data.initial_values = polars.DataFrame( + { + "control_name": polars.Series( + self._convert_names(config.variables.names), dtype=polars.String + ), + "initial_value": polars.Series( + config.variables.initial_values, dtype=polars.Float32 + ), + "lower_bounds": polars.Series( + config.variables.lower_bounds, dtype=polars.Float32 + ), + "upper_bounds": polars.Series( + config.variables.upper_bounds, dtype=polars.Float32 + ), + } + ) + + self.data.objective_functions = polars.DataFrame( + { + "objective_name": config.objective_functions.names, + "weight": polars.Series( + config.objective_functions.weights, dtype=polars.Float32 + ), + "normalization": polars.Series( + [1.0 / s for s in config.objective_functions.scales], + dtype=polars.Float32, + ), + } + ) + + if config.nonlinear_constraints is not None: + self.data.nonlinear_constraints = polars.DataFrame( + { + "constraint_name": config.nonlinear_constraints.names, + "normalization": config.nonlinear_constraints.scales, + "constraint_rhs_value": config.nonlinear_constraints.rhs_values, + "constraint_type": config.nonlinear_constraints.types, + } + ) + + self.data.realization_weights = polars.DataFrame( + { + "realization": polars.Series( + config.realizations.names, dtype=polars.UInt16 + ), + "weight": polars.Series( + config.realizations.weights, dtype=polars.Float32 + ), + } + ) + + def _store_function_results( + self, config: EnOptConfig, results: FunctionResults + ) -> _EvaluationResults: + # We could select only objective values, + # but we select all to also get the constraint values (if they exist) + realization_objectives = polars.from_pandas( + results.to_dataframe(config, "evaluations").reset_index() + ).drop("plan_id") + batch_objectives = polars.from_pandas( + results.to_dataframe( + config, + "functions", + select=["objectives", "weighted_objective", "scaled_objectives"], + ).reset_index() + ).drop("plan_id") + + batch_controls = polars.from_pandas( + results.to_dataframe( + config, "evaluations", select=["variables", "scaled_variables"] + ).reset_index() + ).drop("plan_id") + + batch_controls = self._rename_columns(batch_controls) + control_names = batch_controls["control_name"].unique().to_list() + + has_scaled_controls = "scaled_control_value" in batch_controls + batch_controls = batch_controls.pivot( + on="control_name", + values=["control_value", "scaled_control_value"] + if has_scaled_controls + else ["control_value"], + separator=":", + ) + + if has_scaled_controls: + batch_controls = batch_controls.rename( + { + **{f"control_value:{name}": name for name in control_names}, + **{ + f"scaled_control_value:{name}": f"{name}.scaled" + for name in control_names + }, + } + ) + + try: + batch_constraints = polars.from_pandas( + results.to_dataframe(config, "nonlinear_constraints").reset_index() + ).drop("plan_id") + except AttributeError: + batch_constraints = None + + realization_constraints = None + + batch_objectives = self._rename_columns(batch_objectives) + realization_objectives = self._rename_columns(realization_objectives) + objective_names = realization_objectives["objective_name"].unique().to_list() + + has_scaled_objectives = ( + "scaled_objective_value" in realization_objectives.columns + ) + + batch_objectives = batch_objectives.pivot( + on="objective_name", + values=["objective_value", "scaled_objective_value"] + if has_scaled_objectives + else ["objective_value"], + separator=":", + ) + + if has_scaled_objectives: + batch_objectives = batch_objectives.rename( + { + **{f"objective_value:{name}": name for name in objective_names}, + **( + { + f"scaled_objective_value:{name}": f"{name}.scaled" + for name in objective_names + } + if has_scaled_objectives + else {} + ), + } + ) + + if batch_constraints is not None: + batch_constraints = batch_constraints.rename( + { + "nonlinear_constraint": "constraint_name", + "values": "constraint_value", + "violations": "constraint_violation", + "scaled_values": "scaled_constraint_value", + "scaled_violations": "scaled_constraint_violation", + } + ) + + constraint_names = batch_constraints["constraint_name"].unique().to_list() + + batch_constraints = batch_constraints.pivot( + on="constraint_name", + values=[ + "constraint_value", + "scaled_constraint_value", + "constraint_violation", + "scaled_constraint_violation", + ], + separator=";", + ).rename( + { + **{f"constraint_value;{name}": name for name in constraint_names}, + **{ + f"scaled_constraint_value;{name}": f"{name}.scaled" + for name in constraint_names + }, + **{ + f"constraint_violation;{name}": f"{name}.violation" + for name in constraint_names + }, + **{ + f"scaled_constraint_violation;{name}": f"{name}.scaled_violation" + for name in constraint_names + }, + } + ) + + # remove from main table, and create separate constraints table + realization_constraints = realization_objectives[ + "result_id", + "batch_id", + "realization", + "constraint_name", + "constraint_value", + "scaled_constraint_value", + ].unique(["result_id", "batch_id", "realization", "constraint_name"]) + realization_constraints = realization_constraints.pivot( + values=["constraint_value", "scaled_constraint_value"], + on="constraint_name", + separator=";", + ).rename( + { + **{f"constraint_value;{name}": name for name in constraint_names}, + **{ + f"scaled_constraint_value;{name}": f"{name}.scaled" + for name in constraint_names + }, + } + ) + + realization_objectives = realization_objectives.drop( + [c for c in realization_objectives.columns if "constraint" in c.lower()] + ).unique(subset=["result_id", "batch_id", "realization", "control_name"]) + batch_objectives = batch_objectives.drop( + [c for c in batch_objectives.columns if "constraint" in c.lower()] + ).unique(subset=["result_id", "batch_id"]) + + realization_objectives = ( + realization_objectives.drop(["control_name", "control_value"]) + .unique(subset=["result_id", "batch_id", "realization"]) + .pivot( + values="objective_value", + index=[ + "result_id", + "batch_id", + "realization", + ], + columns="objective_name", + ) + ) + + return _EvaluationResults( + batch_controls, + batch_objectives, + realization_objectives, + batch_constraints, + realization_constraints, + ) + + @staticmethod + def _rename_columns(df: polars.DataFrame): + _renames = { + "objective": "objective_name", + "weighted_objective": "total_objective_value", + "variable": "control_name", + "variables": "control_value", + "objectives": "objective_value", + "constraints": "constraint_value", + "nonlinear_constraint": "constraint_name", + "scaled_constraints": "scaled_constraint_value", + "scaled_objectives": "scaled_objective_value", + "perturbed_variables": "perturbed_control_value", + "perturbed_objectives": "perturbed_objective_value", + "perturbed_constraints": "perturbed_constraint_value", + "scaled_perturbed_objectives": "scaled_perturbed_objective_value", + "scaled_perturbed_constraints": "scaled_perturbed_constraint_value", + "scaled_variables": "scaled_control_value", + } + return df.rename({k: v for k, v in _renames.items() if k in df.columns}) + + def _store_gradient_results( + self, config: EnOptConfig, results: FunctionResults + ) -> _GradientResults: + perturbation_objectives = polars.from_pandas( + results.to_dataframe(config, "evaluations").reset_index() + ).drop("plan_id") + + try: + # ROPT_NOTE: Why is this sometimes None? How can we know if it is + # expected to be None? + batch_objective_gradient = polars.from_pandas( + results.to_dataframe(config, "gradients").reset_index() + ).drop("plan_id") + except AttributeError: + batch_objective_gradient = None + + if batch_objective_gradient is not None: + batch_objective_gradient = self._rename_columns(batch_objective_gradient) + + perturbation_objectives = self._rename_columns(perturbation_objectives) + + if "constraint_name" in perturbation_objectives: + constraint_names = ( + perturbation_objectives["constraint_name"].unique().to_list() + ) + perturbation_constraints = ( + perturbation_objectives[ + "result_id", + "batch_id", + "realization", + "perturbation", + "control_name", + "perturbed_control_value", + *[ + c + for c in perturbation_objectives.columns + if "constraint" in c.lower() + ], + ] + .pivot( + on="constraint_name", + values=[ + "perturbed_constraint_value", + "scaled_perturbed_constraint_value", + ], + separator=";", + ) + .rename( + { + **{ + f"perturbed_constraint_value;{name}": name + for name in constraint_names + }, + **{ + f"scaled_perturbed_constraint_value;{name}": f"{name}.scaled" + for name in constraint_names + }, + } + ) + .pivot(on="control_name", values="perturbed_control_value") + ) + + if batch_objective_gradient is not None: + # ROPT_NOTE: Will this ever happen? We get constraints + # but no "gradient" field in the results. + batch_constraint_gradient = batch_objective_gradient[ + "result_id", + "batch_id", + "control_name", + *[ + c + for c in batch_objective_gradient.columns + if "constraint" in c.lower() + ], + ] + + batch_objective_gradient = batch_objective_gradient.drop( + [ + c + for c in batch_objective_gradient.columns + if "constraint" in c.lower() + ] + ).unique() + + constraint_names = ( + batch_constraint_gradient["constraint_name"].unique().to_list() + ) + batch_constraint_gradient = batch_constraint_gradient.pivot( + on="constraint_name", + values=["constraint_value", "scaled_constraint_value"], + separator=";", + ).rename( + { + **{ + f"constraint_value;{name}": name + for name in constraint_names + }, + **{ + f"scaled_constraint_value;{name}": f"{name}.scaled" + for name in constraint_names + }, + } + ) + else: + batch_constraint_gradient = None + + perturbation_objectives = perturbation_objectives.drop( + [ + c + for c in perturbation_objectives.columns + if "constraint" in c.lower() + ] + ).unique() + else: + batch_constraint_gradient = None + perturbation_constraints = None + + perturbation_objectives = perturbation_objectives.drop( + "perturbed_evaluation_ids", "control_value" + ) + + perturbation_objectives = perturbation_objectives.pivot( + on="objective_name", values="perturbed_objective_value" + ) + + perturbation_objectives = perturbation_objectives.pivot( + on="control_name", values="perturbed_control_value" + ) + + # All that remains in perturbation_objectives is + # control values per realization, which is redundant to store here. + + if batch_objective_gradient is not None: + objective_names = ( + batch_objective_gradient["objective_name"].unique().to_list() + ) + batch_objective_gradient = batch_objective_gradient.pivot( + on="objective_name", + values=["objective_value", "total_objective_value"], + separator=";", + ).rename( + { + **{f"objective_value;{name}": name for name in objective_names}, + **{ + f"total_objective_value;{name}": f"{name}.total" + for name in objective_names + }, + } + ) + + return _GradientResults( + batch_objective_gradient, + perturbation_objectives, + batch_constraint_gradient, + perturbation_constraints, + ) + + def _handle_finished_batch_event(self, event: Event): + logger.debug("Storing batch results dataframes") + + converted_results = tuple( + convert_to_maximize(result) for result in event.results + ) + results: List[FunctionResults | GradientResults] = [] + + # Q: Maybe this whole clause can be removed? + # Not sure why it is there, putting the best function result first + # +-----------------------------------------------------------------+ + # | | + best_value = -np.inf + best_results = None + for item in converted_results: + if isinstance(item, GradientResults): + results.append(item) + if ( + isinstance(item, FunctionResults) + and item.functions is not None + and item.functions.weighted_objective > best_value + ): + best_value = item.functions.weighted_objective + best_results = item + + if best_results is not None: + results = [best_results, *results] + # | | + # +-----------------------------------------------------------------+ + last_batch = -1 + + _batches = {} + for item in results: + if item.batch_id not in _batches: + _batches[item.batch_id] = {} + + if isinstance(item, FunctionResults): + eval_results = self._store_function_results(event.config, item) + + _batches[item.batch_id]["batch_controls"] = eval_results.batch_controls + _batches[item.batch_id]["batch_objectives"] = ( + eval_results.batch_objectives + ) + _batches[item.batch_id]["realization_objectives"] = ( + eval_results.realization_objectives + ) + _batches[item.batch_id]["batch_constraints"] = ( + eval_results.batch_constraints + ) + _batches[item.batch_id]["realization_constraints"] = ( + eval_results.realization_constraints + ) + + if isinstance(item, GradientResults): + gradient_results = self._store_gradient_results(event.config, item) + + _batches[item.batch_id]["batch_objective_gradient"] = ( + gradient_results.batch_objective_gradient + ) + _batches[item.batch_id]["perturbation_objectives"] = ( + gradient_results.perturbation_objectives + ) + _batches[item.batch_id]["batch_constraint_gradient"] = ( + gradient_results.batch_constraint_gradient + ) + _batches[item.batch_id]["perturbation_constraints"] = ( + gradient_results.perturbation_constraints + ) + + if item.batch_id != last_batch: + pass + # Q: Could apply timestamps here but, necessary?.. + # self._database.set_batch_ended + last_batch = item.batch_id + + for batch_id, info in _batches.items(): + self.data.batches.append( + BatchDataFrames( + batch_id=batch_id, + batch_controls=info.get("batch_controls"), + batch_objectives=info.get("batch_objectives"), + realization_objectives=info.get("realization_objectives"), + batch_constraints=info.get("batch_constraints"), + realization_constraints=info.get("realization_constraints"), + batch_objective_gradient=info.get("batch_objective_gradient"), + perturbation_objectives=info.get("perturbation_objectives"), + batch_constraint_gradient=info.get("batch_constraint_gradient"), + perturbation_constraints=info.get("perturbation_constraints"), + ) + ) + + def _handle_finished_event(self, event): + logger.debug("Storing final results in the sqlite database") + + merit_values = self._get_merit_values() + if merit_values: + # NOTE: Batch 0 is always an "accepted batch", and "accepted batches" are + # batches with merit_flag , which means that it was an improvement + # Should/could + self.data.batches[0].is_improvement = True + improvement_batches = [ + b for b in self.data.batches if b.batch_objectives is not None + ][1:] + for i, b in enumerate(improvement_batches): + merit_value = next( + (m["value"] for m in merit_values if (m["iter"] - 1) == i), None + ) + if merit_value is None: + continue + + b.batch_objectives = b.batch_objectives.with_columns( + polars.lit(merit_value).alias("merit_value") + ) + b.is_improvement = True + else: + max_total_objective = -Infinity + for b in self.data.batches: + if b.batch_objectives is not None: + total_objective = b.batch_objectives["total_objective_value"].item() + if total_objective > max_total_objective: + b.is_improvement = True + max_total_objective = total_objective + + self.write_to_output_dir() + + def get_optimal_result(self) -> Optional[OptimalResult]: + # Only used in tests, not super important + has_merit = any( + "merit_value" in b.batch_objectives.columns + for b in self.data.batches + if b.batch_objectives is not None + ) + + def find_best_batch(filter_by, sort_by): + matching_batches = [b for b in self.data.batches if filter_by(b)] + + if not matching_batches: + return None + + matching_batches.sort(key=sort_by) + _batch = matching_batches[0] + _controls_dict = _batch.batch_controls.drop( + [ + "result_id", + "batch_id", + *[ + c + for c in _batch.batch_controls.columns + if c.endswith(".scaled") # don't need scaled control values + ], + ] + ).to_dicts()[0] + + return _batch, _controls_dict + + if has_merit: + # Minimize merit + batch, controls_dict = find_best_batch( + filter_by=lambda b: ( + b.batch_objectives is not None + and "merit_value" in b.batch_objectives.columns + ), + sort_by=lambda b: b.batch_objectives.select( + polars.col("merit_value").min() + ).item(), + ) + return OptimalResult( + batch=batch.batch_id, + controls=controls_dict, + total_objective=batch.batch_objectives.select( + polars.col("total_objective_value").sample(n=1) + ).item(), + ) + else: + # Maximize objective + batch, controls_dict = find_best_batch( + filter_by=lambda b: b.batch_objectives is not None + and not b.batch_objectives.is_empty(), + sort_by=lambda b: -b.batch_objectives.select( + polars.col("total_objective_value").sample(n=1) + ).item(), + ) + + return OptimalResult( + batch=batch.batch_id, + controls=controls_dict, + total_objective=batch.batch_objectives.select( + polars.col("total_objective_value") + ).item(), + ) + + @staticmethod + def _get_merit_fn_lines(merit_path): + if os.path.isfile(merit_path): + with open(merit_path, "r", errors="replace", encoding="utf-8") as reader: + lines = reader.readlines() + start_line_idx = -1 + for inx, line in enumerate(lines): + if "Merit" in line and "feval" in line: + start_line_idx = inx + 1 + if start_line_idx > -1 and line.startswith("="): + return lines[start_line_idx:inx] + if start_line_idx > -1: + return lines[start_line_idx:] + return [] + + @staticmethod + def _parse_merit_line(merit_values_string): + values = [] + for merit_elem in merit_values_string.split(): + try: + values.append(float(merit_elem)) + except ValueError: + for elem in merit_elem.split("0x")[1:]: + values.append(float.fromhex("0x" + elem)) + if len(values) == 8: + # Dakota starts counting at one, correct to be zero-based. + return int(values[5]) - 1, values[4] + return None + + def _get_merit_values(self): + # Read the file containing merit information. + # The file should contain the following table header + # Iter F(x) mu alpha Merit feval btracks Penalty + # :return: merit values indexed by the function evaluation number + # example: + # 0: merit_value_0 + # 1: merit_value_1 + # 2 merit_value_2 + # ... + # ] + merit_values = [] + if self._merit_file.exists(): + for line in EverestStorage._get_merit_fn_lines(self._merit_file): + value = EverestStorage._parse_merit_line(line) + if value is not None: + merit_values.append({"iter": value[0], "value": value[1]}) + return merit_values diff --git a/tests/everest/test_api_snapshots.py b/tests/everest/test_api_snapshots.py index 4b2d1fcfac2..7c48f24696d 100644 --- a/tests/everest/test_api_snapshots.py +++ b/tests/everest/test_api_snapshots.py @@ -57,9 +57,9 @@ def make_api_snapshot(api) -> Dict[str, Any]: "config_minimal.yml", "config_multiobj.yml", "config_auto_scaled_controls.yml", - "config_cvar.yml", - "config_discrete.yml", - "config_stddev.yml", + # "config_cvar.yml", + # "config_discrete.yml", + # "config_stddev.yml", ], ) def test_api_snapshots(