"""Functionality for managing DOE campaigns. Main point of interaction via Python."""
from __future__ import annotations
import gc
import json
from collections.abc import Callable, Collection, Sequence
from functools import reduce
from typing import TYPE_CHECKING, Any
import cattrs
import numpy as np
import pandas as pd
from attrs import Attribute, Factory, define, evolve, field, fields
from attrs.converters import optional
from attrs.validators import instance_of
from typing_extensions import override
from baybe.constraints.base import DiscreteConstraint
from baybe.exceptions import (
IncompatibilityError,
NoMeasurementsError,
NotEnoughPointsLeftError,
)
from baybe.objectives.base import Objective, to_objective
from baybe.parameters.base import Parameter
from baybe.recommenders.base import RecommenderProtocol
from baybe.recommenders.meta.base import MetaRecommender
from baybe.recommenders.meta.sequential import TwoPhaseMetaRecommender
from baybe.recommenders.pure.bayesian.base import BayesianRecommender
from baybe.recommenders.pure.nonpredictive.base import NonPredictiveRecommender
from baybe.searchspace._filtered import FilteredSubspaceDiscrete
from baybe.searchspace.core import (
SearchSpace,
SearchSpaceType,
to_searchspace,
validate_searchspace_from_config,
)
from baybe.serialization import SerialMixin, converter
from baybe.surrogates.base import PosteriorStatistic, SurrogateProtocol
from baybe.targets.base import Target
from baybe.utils.basic import UNSPECIFIED, UnspecifiedType, is_all_instance
from baybe.utils.boolean import eq_dataframe
from baybe.utils.conversion import to_string
from baybe.utils.dataframe import (
_ValidatedDataFrame,
filter_df,
fuzzy_row_match,
normalize_input_dtypes,
)
from baybe.utils.validation import (
validate_object_names,
validate_objective_input,
validate_parameter_input,
validate_target_input,
)
if TYPE_CHECKING:
from botorch.acquisition import AcquisitionFunction
from botorch.posteriors import Posterior
# Metadata columns
_RECOMMENDED = "recommended"
_MEASURED = "measured"
_EXCLUDED = "excluded"
_METADATA_COLUMNS = [_RECOMMENDED, _MEASURED, _EXCLUDED]
def _make_allow_flag_default_factory(
default: bool,
) -> Callable[[Campaign], bool | UnspecifiedType]:
"""Make a default factory for allow_* flags."""
def default_allow_flag(campaign: Campaign) -> bool | UnspecifiedType:
"""Attrs-compatible default factory for allow_* flags."""
if campaign.searchspace.type is SearchSpaceType.DISCRETE:
return default
return UNSPECIFIED
return default_allow_flag
def _validate_allow_flag(campaign: Campaign, attribute: Attribute, value: Any) -> None:
"""Attrs-compatible validator for context-aware validation of allow_* flags."""
match campaign.searchspace.type:
case SearchSpaceType.DISCRETE:
if not isinstance(value, bool):
raise ValueError(
f"For search spaces of '{SearchSpaceType.DISCRETE}', "
f"'{attribute.name}' must be a Boolean."
)
case _:
if value is not UNSPECIFIED:
raise ValueError(
f"For search spaces of type other than "
f"'{SearchSpaceType.DISCRETE}', '{attribute.name}' cannot be set "
f"since the flag is meaningless in such contexts.",
)
@define
class Campaign(SerialMixin):
"""Main class for interaction with BayBE.
Campaigns define and record an experimentation process, i.e. the execution of a
series of measurements and the iterative sequence of events involved.
In particular, a campaign:
* Defines the objective of an experimentation process.
* Defines the search space over which the experimental parameters may vary.
* Defines a recommender for exploring the search space.
* Records the measurement data collected during the process.
* Records metadata about the progress of the experimentation process.
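A minimal usage sketch (the parameter, target, and value names below are
illustrative only; exact constructor signatures may differ between versions):
>>> from baybe import Campaign
>>> from baybe.parameters import NumericalDiscreteParameter
>>> from baybe.searchspace import SearchSpace
>>> from baybe.targets import NumericalTarget
>>> parameter = NumericalDiscreteParameter("Temperature", values=(10, 20, 30))
>>> campaign = Campaign(
...     searchspace=SearchSpace.from_product([parameter]),
...     objective=NumericalTarget("Yield", mode="MAX"),
... )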
"""
# DOE specifications
searchspace: SearchSpace = field(converter=to_searchspace)
"""The search space in which the experiments are conducted.
When passing a :class:`baybe.parameters.base.Parameter`,
a :class:`baybe.searchspace.discrete.SubspaceDiscrete`, or a
:class:`baybe.searchspace.continuous.SubspaceContinuous`, conversion to
:class:`baybe.searchspace.core.SearchSpace` is automatically applied."""
objective: Objective | None = field(default=None, converter=optional(to_objective))
"""The optimization objective.
When passing a :class:`baybe.targets.base.Target`, conversion to
:class:`baybe.objectives.single.SingleTargetObjective` is automatically applied."""
@objective.validator
def _validate_objective( # noqa: DOC101, DOC103
self, _: Any, value: Objective | None
) -> None:
"""Validate no overlapping names between targets and parameters."""
if value is None:
return
validate_object_names(self.searchspace.parameters + value.targets)
recommender: RecommenderProtocol = field(
factory=TwoPhaseMetaRecommender,
validator=instance_of(RecommenderProtocol),
)
"""The employed recommender"""
allow_recommending_already_measured: bool | UnspecifiedType = field(
default=Factory(
_make_allow_flag_default_factory(default=True), takes_self=True
),
validator=_validate_allow_flag,
kw_only=True,
)
"""Allow to recommend experiments that were already measured earlier.
Can only be set for discrete search spaces."""
allow_recommending_already_recommended: bool | UnspecifiedType = field(
default=Factory(
_make_allow_flag_default_factory(default=False), takes_self=True
),
validator=_validate_allow_flag,
kw_only=True,
)
"""Allow to recommend experiments that were already recommended earlier.
Can only be set for discrete search spaces."""
allow_recommending_pending_experiments: bool | UnspecifiedType = field(
default=Factory(
_make_allow_flag_default_factory(default=False), takes_self=True
),
validator=_validate_allow_flag,
kw_only=True,
)
"""Allow pending experiments to be part of the recommendations.
Can only be set for discrete search spaces."""
# Metadata
_searchspace_metadata: pd.DataFrame = field(init=False, eq=eq_dataframe)
"""Metadata tracking the experimentation status of the search space."""
n_batches_done: int = field(default=0, init=False)
"""The number of already processed batches."""
n_fits_done: int = field(default=0, init=False)
"""The number of fits already done."""
# Private
_measurements_exp: pd.DataFrame = field(
factory=pd.DataFrame, eq=eq_dataframe, init=False
)
"""The experimental representation of the conducted experiments."""
_cached_recommendation: pd.DataFrame = field(
factory=pd.DataFrame, eq=eq_dataframe, init=False
)
"""The cached recommendations."""
@_searchspace_metadata.default
def _default_searchspace_metadata(self) -> pd.DataFrame:
"""Create a fresh metadata object."""
df = pd.DataFrame(
False,
index=self.searchspace.discrete.exp_rep.index,
columns=_METADATA_COLUMNS,
)
return df
@override
def __str__(self) -> str:
recommended_count = sum(self._searchspace_metadata[_RECOMMENDED])
measured_count = sum(self._searchspace_metadata[_MEASURED])
excluded_count = sum(self._searchspace_metadata[_EXCLUDED])
n_elements = len(self._searchspace_metadata)
searchspace_fields = [
to_string(
"Recommended:",
f"{recommended_count}/{n_elements}",
single_line=True,
),
to_string(
"Measured:",
f"{measured_count}/{n_elements}",
single_line=True,
),
to_string(
"Excluded:",
f"{excluded_count}/{n_elements}",
single_line=True,
),
]
metadata_fields = [
to_string("Batches done", self.n_batches_done, single_line=True),
to_string("Fits done", self.n_fits_done, single_line=True),
to_string("Discrete Subspace Meta Data", *searchspace_fields),
]
metadata = to_string("Meta Data", *metadata_fields)
campaign_fields = [metadata, self.searchspace, self.objective, self.recommender]
return to_string(self.__class__.__name__, *campaign_fields)
@property
def measurements(self) -> pd.DataFrame:
"""The experimental data added to the Campaign."""
return self._measurements_exp
@property
def parameters(self) -> tuple[Parameter, ...]:
"""The parameters of the underlying search space."""
return self.searchspace.parameters
@property
def targets(self) -> tuple[Target, ...]:
"""The targets of the underlying objective."""
return self.objective.targets if self.objective is not None else ()
@classmethod
def from_config(cls, config_json: str) -> Campaign:
"""Create a campaign from a configuration JSON.
Args:
config_json: The string with the configuration JSON.
Returns:
The constructed campaign.
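A sketch of the expected top-level structure (field contents elided; the
keys correspond to the attributes of this class):
>>> config_json = '{"searchspace": {...}, "objective": {...}, "recommender": {...}}'
>>> campaign = Campaign.from_config(config_json)  # doctest: +SKIP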
"""
config = json.loads(config_json)
return converter.structure(config, Campaign)
@classmethod
def validate_config(cls, config_json: str) -> None:
"""Validate a given campaign configuration JSON.
Args:
config_json: The JSON that should be validated.
"""
config = json.loads(config_json)
_validation_converter.structure(config, Campaign)
def add_measurements(
self,
data: pd.DataFrame,
numerical_measurements_must_be_within_tolerance: bool = True,
) -> None:
"""Add results from a dataframe to the internal database.
Each addition of data is considered a new batch. Added results are checked for
validity. Categorical values need to have an exact match. For numerical values,
the ``numerical_measurements_must_be_within_tolerance`` flag determines
whether values that lie outside a specified tolerance
are accepted. Possible validation exceptions are documented in
:func:`baybe.utils.validation.validate_target_input` and
:func:`baybe.utils.validation.validate_parameter_input`.
Args:
data: The data to be added (with filled values for targets). Preferably
created via :func:`baybe.campaign.Campaign.recommend`.
numerical_measurements_must_be_within_tolerance: Flag indicating if
numerical parameters need to be within their tolerances.
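A sketch of the typical measurement loop ("Yield" is an illustrative
target name):
>>> recommendation = campaign.recommend(batch_size=3)  # doctest: +SKIP
>>> recommendation["Yield"] = [0.8, 0.5, 0.7]  # enter the measured values
>>> campaign.add_measurements(recommendation)  # doctest: +SKIP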
"""
# Invalidate recommendation cache first (in case of uncaught exceptions below)
self._cached_recommendation = pd.DataFrame()
# Validate target and parameter input values
validate_target_input(data, self.targets)
if self.objective is not None:
validate_objective_input(data, self.objective)
validate_parameter_input(
data, self.parameters, numerical_measurements_must_be_within_tolerance
)
data = normalize_input_dtypes(data, self.parameters + self.targets)
data.__class__ = _ValidatedDataFrame
# Read in measurements and add them to the database
self.n_batches_done += 1
to_insert = data.copy()
to_insert["BatchNr"] = self.n_batches_done
to_insert["FitNr"] = np.nan
self._measurements_exp = pd.concat(
[self._measurements_exp, to_insert], axis=0, ignore_index=True
)
# Update metadata
if self.searchspace.type in (SearchSpaceType.DISCRETE, SearchSpaceType.HYBRID):
idxs_matched = fuzzy_row_match(
self.searchspace.discrete.exp_rep, data, self.parameters
)
self._searchspace_metadata.loc[idxs_matched, _MEASURED] = True
def update_measurements(
self,
data: pd.DataFrame,
numerical_measurements_must_be_within_tolerance: bool = True,
) -> None:
"""Update previously added measurements.
This can be useful to correct mistakes or update target measurements. The
match to existing data entries is made based on the dataframe index. This resets
the ``FitNr`` of the corresponding measurements and invalidates cached recommendations.
Args:
data: The measurement data to be updated (with filled values for targets).
numerical_measurements_must_be_within_tolerance: Flag indicating if
numerical parameters need to be within their tolerances.
Raises:
ValueError: If the given data contains duplicated indices.
ValueError: If the given data contains indices not present in existing
measurements.
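A sketch of a typical correction (the row index selects the measurement
to overwrite; "Yield" is an illustrative target name):
>>> fixed = campaign.measurements.loc[[0]].copy()  # doctest: +SKIP
>>> fixed["Yield"] = 0.9  # corrected target value
>>> campaign.update_measurements(fixed)  # doctest: +SKIP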
"""
# Validate target and parameter input values
validate_target_input(data, self.targets)
if self.objective is not None:
validate_objective_input(data, self.objective)
validate_parameter_input(
data, self.parameters, numerical_measurements_must_be_within_tolerance
)
data = normalize_input_dtypes(data, self.parameters + self.targets)
data.__class__ = _ValidatedDataFrame
# Block duplicate input indices
if data.index.has_duplicates:
raise ValueError(
"The input dataframe containing the measurement updates has duplicated "
"indices. Please ensure that all updates for a given measurement are "
"made in a single combined entry."
)
# Allow only existing indices
if nonmatching_idxs := set(data.index).difference(self._measurements_exp.index):
raise ValueError(
f"Updating measurements requires indices matching the "
f"existing measurements. The following indices were in the input, but "
f"are not found in the existing entries: {nonmatching_idxs}"
)
# Perform the update
cols = [p.name for p in self.parameters] + [t.name for t in self.targets]
self._measurements_exp.loc[data.index, cols] = data[cols]
# Reset fit number and cached recommendations
self._measurements_exp.loc[data.index, "FitNr"] = np.nan
self._cached_recommendation = pd.DataFrame()
def toggle_discrete_candidates( # noqa: DOC501
self,
constraints: Collection[DiscreteConstraint] | pd.DataFrame,
exclude: bool,
complement: bool = False,
dry_run: bool = False,
) -> pd.DataFrame:
"""In-/exclude certain discrete points in/from the candidate set.
Args:
constraints: A filtering mechanism determining the candidates subset to be
in-/excluded. Can be either a collection of
:class:`~baybe.constraints.base.DiscreteConstraint` or a dataframe.
For the latter, see :func:`~baybe.utils.dataframe.filter_df`
for details.
exclude: If ``True``, the specified candidates are excluded.
If ``False``, the candidates are considered for recommendation.
complement: If ``True``, the filtering mechanism is inverted so that
the complement of the candidate subset specified by the filter is
toggled. For details, see :func:`~baybe.utils.dataframe.filter_df`.
dry_run: If ``True``, the target subset is only extracted but not
affected. If ``False``, the candidate set is updated correspondingly.
Useful for setting up the correct filtering mechanism.
Returns:
A new dataframe containing the discrete candidate set passing through the
specified filter.
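A sketch using a dataframe filter (assuming the search space contains a
discrete parameter named "Temperature"):
>>> import pandas as pd
>>> df_filter = pd.DataFrame({"Temperature": [10]})
>>> campaign.toggle_discrete_candidates(df_filter, exclude=True)  # doctest: +SKIP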
"""
# Clear cache
self._cached_recommendation = pd.DataFrame()
df = self.searchspace.discrete.exp_rep
if isinstance(constraints, pd.DataFrame):
# Determine the candidate subset to be toggled
points = filter_df(df, constraints, complement)
elif isinstance(constraints, Collection) and is_all_instance(
constraints, DiscreteConstraint
):
# TODO: Should be taken over by upcoming `SubspaceDiscrete.filter` method,
# automatically choosing the appropriate backend (polars/pandas/...)
# Filter the search space dataframe according to the given constraint
idx = reduce(
lambda x, y: x.intersection(y), (c.get_valid(df) for c in constraints)
)
# Determine the candidate subset to be toggled
points = df.drop(index=idx) if complement else df.loc[idx].copy()
else:
raise TypeError(
"Candidate toggling is not implemented for the given type of "
"constraint specifications."
)
if not dry_run:
self._searchspace_metadata.loc[points.index, _EXCLUDED] = exclude
return points
def recommend(
self,
batch_size: int,
pending_experiments: pd.DataFrame | None = None,
) -> pd.DataFrame:
"""Provide the recommendations for the next batch of experiments.
Args:
batch_size: Number of requested recommendations.
pending_experiments: Parameter configurations specifying experiments
that are currently pending.
Returns:
Dataframe containing the recommendations in experimental representation.
Raises:
ValueError: If ``batch_size`` is smaller than 1.
NotEnoughPointsLeftError: If the candidate set does not contain enough
points to draw the requested batch.
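Example (assuming a campaign with sufficiently many candidates left):
>>> recommendation = campaign.recommend(batch_size=2)  # doctest: +SKIP
>>> recommendation  # one row per recommended experiment  # doctest: +SKIP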
"""
if batch_size < 1:
raise ValueError(
f"You must at least request one recommendation per batch, but provided "
f"{batch_size=}."
)
# Invalidate cached recommendation if pending experiments are provided
if (pending_experiments is not None) and not pending_experiments.empty:
self._cached_recommendation = pd.DataFrame()
validate_parameter_input(pending_experiments, self.parameters)
pending_experiments = normalize_input_dtypes(
pending_experiments, self.parameters
)
pending_experiments.__class__ = _ValidatedDataFrame
# If there are cached recommendations whose batch size matches the currently
# requested one, we just return those
if len(self._cached_recommendation) == batch_size:
return self._cached_recommendation
# Update recommendation metadata
if len(self._measurements_exp) > 0:
self.n_fits_done += 1
self._measurements_exp.fillna({"FitNr": self.n_fits_done}, inplace=True)
# Prepare the search space according to the current campaign state
if self.searchspace.type is SearchSpaceType.DISCRETE:
# TODO: This implementation should at some point be hidden behind an
# appropriate public interface, like `SubspaceDiscrete.filter()`
mask_todrop = self._searchspace_metadata[_EXCLUDED].copy()
if not self.allow_recommending_already_recommended:
mask_todrop |= self._searchspace_metadata[_RECOMMENDED]
if not self.allow_recommending_already_measured:
mask_todrop |= self._searchspace_metadata[_MEASURED]
if (
not self.allow_recommending_pending_experiments
and pending_experiments is not None
):
mask_todrop |= pd.merge(
self.searchspace.discrete.exp_rep,
pending_experiments,
indicator=True,
how="left",
)["_merge"].eq("both")
searchspace = evolve(
self.searchspace,
discrete=FilteredSubspaceDiscrete.from_subspace(
self.searchspace.discrete, ~mask_todrop.to_numpy()
),
)
else:
searchspace = self.searchspace
# Pending experiments should not be passed to non-predictive recommenders
# to avoid complaints about unused arguments, so we need to know of what
# type the next recommender will be
recommender = self.recommender
if isinstance(recommender, MetaRecommender):
recommender = recommender.get_non_meta_recommender(
batch_size,
searchspace,
self.objective,
self._measurements_exp,
pending_experiments,
)
is_nonpredictive = isinstance(recommender, NonPredictiveRecommender)
# Get the recommended search space entries
try:
# NOTE: The `recommend` call must happen on `self.recommender` to update
# potential inner states in case of meta recommenders!
rec = self.recommender.recommend(
batch_size,
searchspace,
self.objective,
self._measurements_exp,
None if is_nonpredictive else pending_experiments,
)
except NotEnoughPointsLeftError as ex:
# Aliases for code compactness
f = fields(Campaign)
ok_m = self.allow_recommending_already_measured
ok_r = self.allow_recommending_already_recommended
ok_p = self.allow_recommending_pending_experiments
ok_m_name = f.allow_recommending_already_measured.name
ok_r_name = f.allow_recommending_already_recommended.name
ok_p_name = f.allow_recommending_pending_experiments.name
no_blocked_pending_points = ok_p or (pending_experiments is None)
# If there are no candidate restrictions to be relaxed
if ok_m and ok_r and no_blocked_pending_points:
raise ex
# Otherwise, extract possible relaxations
solution = [
f"'{name}=True'"
for name, value in [
(ok_m_name, ok_m),
(ok_r_name, ok_r),
(ok_p_name, no_blocked_pending_points),
]
if not value
]
message = solution[0] if len(solution) == 1 else " and/or ".join(solution)
raise NotEnoughPointsLeftError(
f"{str(ex)} Consider setting {message}."
) from ex
# Cache the recommendations
self._cached_recommendation = rec.copy()
# Update metadata
if self.searchspace.type in (SearchSpaceType.DISCRETE, SearchSpaceType.HYBRID):
self._searchspace_metadata.loc[rec.index, _RECOMMENDED] = True
return rec
def posterior(self, candidates: pd.DataFrame | None = None) -> Posterior:
"""Get the posterior predictive distribution for the given candidates.
Args:
candidates: The candidate points in experimental representation. If not
provided, the posterior for the existing campaign measurements is
returned. For details, see
:meth:`baybe.surrogates.base.Surrogate.posterior`.
Raises:
IncompatibilityError: If the underlying surrogate model exposes no
method for computing the posterior distribution.
Returns:
Posterior: The corresponding posterior object.
For details, see :meth:`baybe.surrogates.base.Surrogate.posterior`.
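A sketch (requires a Bayesian recommender and existing measurements):
>>> posterior = campaign.posterior()  # doctest: +SKIP
>>> posterior.mean  # tensor of posterior means  # doctest: +SKIP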
"""
if candidates is None:
candidates = self.measurements[[p.name for p in self.parameters]]
surrogate = self.get_surrogate()
if not hasattr(surrogate, method_name := "posterior"):
raise IncompatibilityError(
f"The used surrogate type '{surrogate.__class__.__name__}' does not "
f"provide a '{method_name}' method."
)
return surrogate.posterior(candidates)
def posterior_stats(
self,
candidates: pd.DataFrame | None = None,
stats: Sequence[PosteriorStatistic] = ("mean", "std"),
) -> pd.DataFrame:
"""Return posterior statistics for each target.
Args:
candidates: The candidate points in experimental representation. If not
provided, the statistics of the existing campaign measurements are
calculated. For details, see
:meth:`baybe.surrogates.base.Surrogate.posterior_stats`.
stats: Sequence indicating which statistics to compute. Also accepts
floats, for which the corresponding quantile point will be computed.
Raises:
NoMeasurementsError: If no candidates are provided and the campaign does
not contain any measurements yet.
IncompatibilityError: If the underlying surrogate model exposes no method
for computing posterior statistics.
ValueError: If a requested quantile is outside the open interval (0,1).
TypeError: If the posterior utilized by the surrogate does not support
a requested statistic.
Returns:
A dataframe with posterior statistics for each target and candidate.
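A sketch (the float entry requests the corresponding quantile point):
>>> campaign.posterior_stats(stats=("mean", "std", 0.05))  # doctest: +SKIP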
"""
if candidates is None:
if self.measurements.empty:
raise NoMeasurementsError(
f"No candidates were provided and the campaign has no measurements "
f"yet. '{self.posterior_stats.__name__}' has no candidates to "
f"compute statistics for in this case."
)
candidates = self.measurements[[p.name for p in self.parameters]]
surrogate = self.get_surrogate()
if not hasattr(surrogate, method_name := "posterior_stats"):
raise IncompatibilityError(
f"The used surrogate type '{surrogate.__class__.__name__}' does not "
f"provide a '{method_name}' method."
)
return surrogate.posterior_stats(candidates, stats)
def get_surrogate(
self,
batch_size: int | None = None,
pending_experiments: pd.DataFrame | None = None,
) -> SurrogateProtocol:
"""Get the current surrogate model.
Args:
batch_size: See :meth:`recommend`.
Only required when using meta recommenders that demand it.
pending_experiments: See :meth:`recommend`.
Only required when using meta recommenders that demand it.
Raises:
IncompatibilityError: If the current recommender does not provide a
surrogate model.
Returns:
Surrogate: The surrogate of the current recommender.
Note:
Currently, this method always returns the surrogate model with respect to
the transformed target(s) / objective. This means that if you are using a
``SingleTargetObjective`` with a transformed target or a
``DesirabilityObjective``, the model's output will correspond to the
transformed quantities and not the original untransformed target(s).
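A sketch (``candidates`` stands for a hypothetical dataframe in
experimental representation):
>>> surrogate = campaign.get_surrogate()  # doctest: +SKIP
>>> posterior = surrogate.posterior(candidates)  # doctest: +SKIP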
"""
if self.objective is None:
raise IncompatibilityError(
f"No surrogate is available since no '{Objective.__name__}' is defined."
)
recommender = self._get_non_meta_recommender(batch_size, pending_experiments)
if isinstance(recommender, BayesianRecommender):
return recommender.get_surrogate(
self.searchspace, self.objective, self.measurements
)
else:
raise IncompatibilityError(
f"The current recommender is of type "
f"'{recommender.__class__.__name__}', which does not provide "
f"a surrogate model. Surrogate models are only available for "
f"recommender subclasses of '{BayesianRecommender.__name__}'."
)
def _get_non_meta_recommender(
self,
batch_size: int | None = None,
pending_experiments: pd.DataFrame | None = None,
) -> RecommenderProtocol:
"""Get the current recommender.
Args:
batch_size: See :meth:`recommend`.
Only required when using meta recommenders that demand it.
pending_experiments: See :meth:`recommend`.
Only required when using meta recommenders that demand it.
Returns:
The recommender for the current recommendation context.
"""
if not isinstance(self.recommender, MetaRecommender):
return self.recommender
return self.recommender.get_non_meta_recommender(
batch_size,
self.searchspace,
self.objective,
self.measurements,
pending_experiments,
)
def _get_bayesian_recommender(
self,
batch_size: int | None = None,
pending_experiments: pd.DataFrame | None = None,
) -> BayesianRecommender:
"""Get the current Bayesian recommender (if available).
For details on the method arguments, see :meth:`_get_non_meta_recommender`.
"""
recommender = self._get_non_meta_recommender(batch_size, pending_experiments)
if not isinstance(recommender, BayesianRecommender):
raise IncompatibilityError(
f"The current recommender is of type "
f"'{recommender.__class__.__name__}', which does not provide "
f"a surrogate model or acquisition values. Both objects are "
f"only available for recommender subclasses of "
f"'{BayesianRecommender.__name__}'."
)
return recommender
def get_acquisition_function(
self,
batch_size: int | None = None,
pending_experiments: pd.DataFrame | None = None,
) -> AcquisitionFunction:
"""Get the current acquisition function.
Args:
batch_size: See :meth:`recommend`.
Only required when using meta recommenders that demand it.
pending_experiments: See :meth:`recommend`.
Only required when using meta recommenders that demand it.
Raises:
IncompatibilityError: If no objective has been specified.
IncompatibilityError: If the current recommender does not use an acquisition
function.
Returns:
The acquisition function of the current recommender.
"""
if self.objective is None:
raise IncompatibilityError(
"Acquisition values can only be computed if an objective has "
"been defined."
)
recommender = self._get_bayesian_recommender(batch_size, pending_experiments)
return recommender.get_acquisition_function(
self.searchspace, self.objective, self.measurements, pending_experiments
)
def acquisition_values(
self,
candidates: pd.DataFrame,
acquisition_function: AcquisitionFunction | None = None,
*,
batch_size: int | None = None,
pending_experiments: pd.DataFrame | None = None,
) -> pd.Series:
"""Compute the acquisition values for the given candidates.
Args:
candidates: The candidate points in experimental representation.
For details, see :meth:`baybe.surrogates.base.Surrogate.posterior`.
acquisition_function: The acquisition function to be evaluated.
If not provided, the acquisition function of the recommender is used.
batch_size: See :meth:`recommend`.
Only required when using meta recommenders that demand it.
pending_experiments: See :meth:`recommend`.
Only required when using meta recommenders that demand it.
Returns:
A series of individual acquisition values, one for each candidate.
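A sketch (``candidates`` stands for a hypothetical dataframe in
experimental representation):
>>> campaign.acquisition_values(candidates)  # doctest: +SKIP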
"""
recommender = self._get_bayesian_recommender(batch_size, pending_experiments)
assert self.objective is not None
return recommender.acquisition_values(
candidates,
self.searchspace,
self.objective,
self.measurements,
pending_experiments,
acquisition_function,
)
def joint_acquisition_value( # noqa: DOC101, DOC103
self,
candidates: pd.DataFrame,
acquisition_function: AcquisitionFunction | None = None,
*,
batch_size: int | None = None,
pending_experiments: pd.DataFrame | None = None,
) -> float:
"""Compute the joint acquisition values for the given candidate batch.
For details on the method arguments, see :meth:`acquisition_values`.
Returns:
The joint acquisition value of the batch.
"""
recommender = self._get_bayesian_recommender(batch_size, pending_experiments)
assert self.objective is not None
return recommender.joint_acquisition_value(
candidates,
self.searchspace,
self.objective,
self.measurements,
pending_experiments,
acquisition_function,
)
def _add_version(dict_: dict) -> dict:
"""Add the package version to the given dictionary."""
from baybe import __version__
return {**dict_, "version": __version__}
def _drop_version(dict_: dict) -> dict:
"""Drop the package version from the given dictionary."""
dict_.pop("version", None)
return dict_
# Register (un-)structure hooks
unstructure_hook = cattrs.gen.make_dict_unstructure_fn(
Campaign, converter, _cattrs_include_init_false=True
)
structure_hook = cattrs.gen.make_dict_structure_fn(
Campaign, converter, _cattrs_include_init_false=True
)
converter.register_unstructure_hook(
Campaign, lambda x: _add_version(unstructure_hook(x))
)
converter.register_structure_hook(
Campaign, lambda d, cl: structure_hook(_drop_version(d), cl)
)
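# The registered hooks enable the JSON round trip inherited from `SerialMixin`.
# A sketch (assuming an existing `campaign` instance):
#
#     campaign_json = campaign.to_json()
#     restored = Campaign.from_json(campaign_json)
#     assert restored == campaign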
# Converter for config validation
_validation_converter = converter.copy()
_validation_converter.register_structure_hook(
SearchSpace, validate_searchspace_from_config
)
# Collect leftover original slotted classes processed by `attrs.define`
gc.collect()