"""Recommenders based on clustering."""
from __future__ import annotations
import gc
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, ClassVar
import numpy as np
import pandas as pd
from attrs import define, field
from typing_extensions import override
from baybe.recommenders.pure.nonpredictive.base import NonPredictiveRecommender
from baybe.searchspace import SearchSpaceType, SubspaceDiscrete
from baybe.utils.conversion import to_string
if TYPE_CHECKING:
from sklearn.base import ClusterMixin
[docs]
@define
class SKLearnClusteringRecommender(NonPredictiveRecommender, ABC):
    """Base class for recommenders that select discrete candidates via clustering.

    Works with ``sklearn``-like estimators that expose ``fit`` and ``predict``.
    Derived classes provide the concrete model class, its parameters and,
    optionally, a model-specific rule for picking candidates from the clusters.
    """

    # Class variables
    compatibility: ClassVar[SearchSpaceType] = SearchSpaceType.DISCRETE
    # See base class.

    # TODO: `use_custom_selector` can probably be replaced with a fallback mechanism
    #   that checks if a custom mechanism is implemented and uses default otherwise
    #   (similar to what is done in the recommenders)

    model_cluster_num_parameter_name: ClassVar[str]
    """Class variable holding the name of the model's cluster-count parameter."""

    _use_custom_selector: ClassVar[bool] = False
    """Class variable flagging whether the custom selector is used."""

    # Object variables
    model_params: dict = field(factory=dict)
    """Optional keyword arguments forwarded to the model constructor.
    Derived classes may initialize this with their own default values."""

    @staticmethod
    @abstractmethod
    def _get_model_cls() -> type[ClusterMixin]:
        """Return the class of the clustering model to instantiate."""

    def _make_selection_default(
        self,
        model: ClusterMixin,
        candidates_scaled: pd.DataFrame | np.ndarray,
    ) -> list[int]:
        """Draw one candidate uniformly at random from every predicted cluster.

        This selector does not rely on model internals and therefore works for
        any child class.

        Args:
            model: The fitted model.
            candidates_scaled: The already scaled candidates.

        Returns:
            A list with positional indices of the selected candidates.
        """
        labels = model.predict(candidates_scaled)

        picks = []
        # Clusters are visited in the sorted order produced by `np.unique`
        for label in np.unique(labels):
            member_positions = np.argwhere(label == labels).flatten()
            picks.append(np.random.choice(member_positions))
        return picks

    def _make_selection_custom(
        self,
        model: ClusterMixin,
        candidates_scaled: pd.DataFrame | np.ndarray,
    ) -> list[int]:
        """Pick candidates using a model-specific rule.

        Derived classes that enable the custom selector are expected to
        override this method.

        Args:
            model: The fitted model.
            candidates_scaled: The already scaled candidates.

        Returns:
            A list with positional indices of the selected candidates.

        Raises:
            NotImplementedError: Always, in this base implementation. Should be
                unreachable.
        """
        raise NotImplementedError("This line in the code should be unreachable. Sry.")

    @override
    def _recommend_discrete(
        self,
        subspace_discrete: SubspaceDiscrete,
        candidates_exp: pd.DataFrame,
        batch_size: int,
    ) -> pd.Index:
        """Cluster the scaled candidates and return the index of the selection.

        The number of clusters requested from the model equals ``batch_size``.
        """
        from sklearn.preprocessing import StandardScaler

        # TODO [Scaling]: scaling should be handled by search space object
        # The scaler is fitted on the entire search space, not only the candidates
        scaler = StandardScaler().fit(subspace_discrete.comp_rep)

        # Bring the candidates into the scaled computational representation
        candidates_comp = subspace_discrete.transform(candidates_exp)
        candidates_scaled = np.ascontiguousarray(scaler.transform(candidates_comp))

        # Instantiate the model with one cluster per requested recommendation
        model_cls = self._get_model_cls()
        cluster_count_kwarg = {self.model_cluster_num_parameter_name: batch_size}
        model = model_cls(**cluster_count_kwarg, **self.model_params)
        model.fit(candidates_scaled)

        # Dispatch to the configured selection strategy
        selector = (
            self._make_selection_custom
            if self._use_custom_selector
            else self._make_selection_default
        )
        positions = selector(model, candidates_scaled)

        # Translate positional indices into DataFrame index labels
        return candidates_comp.index[positions]

    @override
    def __str__(self) -> str:
        compatibility_line = to_string(
            "Compatibility", self.compatibility, single_line=True
        )
        parameter_name_line = to_string(
            "Name of clustering parameter",
            self.model_cluster_num_parameter_name,
            single_line=True,
        )
        model_params_line = to_string(
            "Model parameters", self.model_params, single_line=True
        )
        return to_string(
            self.__class__.__name__,
            compatibility_line,
            parameter_name_line,
            model_params_line,
        )
[docs]
@define
class PAMClusteringRecommender(SKLearnClusteringRecommender):
    """Clustering recommender based on Partitioning Around Medoids (PAM)."""

    # Class variables
    model_cluster_num_parameter_name: ClassVar[str] = "n_clusters"
    # See base class.

    _use_custom_selector: ClassVar[bool] = True
    # See base class.

    # Object variables
    model_params: dict = field()
    # See base class.

    @model_params.default
    def _default_model_params(self) -> dict:
        """Build the default model parameters."""
        defaults = {"max_iter": 100, "init": "k-medoids++"}
        return defaults

    @override
    @staticmethod
    def _get_model_cls() -> type[ClusterMixin]:
        # Imported lazily, only when the model class is actually requested
        from baybe.utils.clustering_algorithms import KMedoids

        return KMedoids

    @override
    def _make_selection_custom(
        self,
        model: ClusterMixin,
        candidates_scaled: pd.DataFrame | np.ndarray,
    ) -> list[int]:
        """Return the medoids of the fitted model as the selection.

        PAM cluster centers (medoids) are actual data points, so their indices
        can serve directly as the selected candidates.

        Args:
            model: The fitted model.
            candidates_scaled: The already scaled candidates. Unused.

        Returns:
            A list with positional indices of the selected candidates.
        """
        return model.medoid_indices_.tolist()
[docs]
@define
class KMeansClusteringRecommender(SKLearnClusteringRecommender):
    """Clustering recommender based on K-means."""

    # Class variables
    model_cluster_num_parameter_name: ClassVar[str] = "n_clusters"
    # See base class.

    _use_custom_selector: ClassVar[bool] = True
    # See base class.

    # Object variables
    model_params: dict = field()
    # See base class.

    @model_params.default
    def _default_model_params(self) -> dict:
        """Build the default model parameters."""
        defaults = {"max_iter": 1000, "n_init": 50}
        return defaults

    @override
    @staticmethod
    def _get_model_cls() -> type[ClusterMixin]:
        # Imported lazily, only when the model class is actually requested
        from sklearn.cluster import KMeans

        return KMeans

    @override
    def _make_selection_custom(
        self,
        model: ClusterMixin,
        candidates_scaled: pd.DataFrame | np.ndarray,
    ) -> list[int]:
        """Pick, for every cluster, the member closest to the cluster center.

        Args:
            model: The fitted model.
            candidates_scaled: The already scaled candidates.

        Returns:
            A list with positional indices of the selected candidates.
        """
        from sklearn.metrics import pairwise_distances

        centers = model.cluster_centers_
        # Entry (i, k) holds the distance of candidate i to center k
        center_distances = pairwise_distances(candidates_scaled, centers)

        # Candidates may only compete for the cluster the model assigned them to.
        # Entries for all other clusters are set to infinity, so each column's
        # minimum is taken over that cluster's own members.
        labels = model.predict(candidates_scaled)
        cluster_ids = np.arange(centers.shape[0])
        not_member = labels[:, np.newaxis] != cluster_ids[np.newaxis, :]
        center_distances[not_member] = np.inf

        return np.argmin(center_distances, axis=0).tolist()
[docs]
@define
class GaussianMixtureClusteringRecommender(SKLearnClusteringRecommender):
    """Gaussian mixture model (GMM) clustering recommender."""

    # Class variables
    model_cluster_num_parameter_name: ClassVar[str] = "n_components"
    # See base class.

    # NOTE(review): `_use_custom_selector` is not overridden here, so the value
    #   inherited from the base class decides whether `_make_selection_custom`
    #   below is used at all. Confirm whether that is intended.

    @override
    @staticmethod
    def _get_model_cls() -> type[ClusterMixin]:
        """Return the ``sklearn`` Gaussian mixture model class."""
        from sklearn.mixture import GaussianMixture

        return GaussianMixture

    @override
    def _make_selection_custom(
        self,
        model: ClusterMixin,
        candidates_scaled: pd.DataFrame | np.ndarray,
    ) -> list[int]:
        """Select candidates from the computed clustering.

        In a GMM, a reasonable choice is to pick, for each component, the
        assigned point with the highest (log) probability density under that
        component.

        Args:
            model: The fitted model.
            candidates_scaled: The already scaled candidates.

        Returns:
            A list with positional indices of the selected candidates, one entry
            per mixture component.
        """
        from scipy.stats import multivariate_normal

        predicted_clusters = model.predict(candidates_scaled)
        selection = []
        for k_cluster in range(model.n_components):
            # NOTE(review): indexing `covariances_` by component presumes a
            #   per-component covariance layout (e.g. the "full" covariance
            #   type) -- verify for other covariance types.
            log_density = multivariate_normal(
                cov=model.covariances_[k_cluster],
                mean=model.means_[k_cluster],
            ).logpdf(candidates_scaled)

            # Only points the model assigned to the current component may be
            # selected for it. Log-densities can be negative, so the remaining
            # points are masked with -inf: a mask value of 0 could exceed every
            # in-cluster value and make `argmax` return a foreign point.
            log_density[predicted_clusters != k_cluster] = -np.inf
            selection.append(np.argmax(log_density).item())

        return selection
# Collect leftover original slotted classes processed by `attrs.define`.
# NOTE(review): presumably `attrs.define` replaces each decorated class with a new
# slotted class, leaving the undecorated originals as garbage until a collection
# pass runs -- confirm against the attrs documentation.
gc.collect()