Source code for baybe.recommenders.pure.nonpredictive.clustering

"""Recommenders based on clustering."""

from __future__ import annotations

import gc
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, ClassVar

import numpy as np
import pandas as pd
from attrs import define, field
from typing_extensions import override

from baybe.recommenders.pure.nonpredictive.base import NonPredictiveRecommender
from baybe.searchspace import SearchSpaceType, SubspaceDiscrete
from baybe.utils.conversion import to_string

if TYPE_CHECKING:
    from sklearn.base import ClusterMixin


@define
class SKLearnClusteringRecommender(NonPredictiveRecommender, ABC):
    """Base class for recommenders that pick discrete candidates via clustering.

    Works with ``sklearn``-like models, i.e. models that offer a ``fit`` and a
    ``predict`` method. Derived classes declare the concrete model class, its
    parameters and, optionally, a model-specific rule for choosing the candidate
    that represents each cluster.
    """

    # Class variables
    compatibility: ClassVar[SearchSpaceType] = SearchSpaceType.DISCRETE
    # See base class.

    # TODO: `use_custom_selector` can probably be replaced with a fallback mechanism
    #   that checks if a custom mechanism is implemented and uses default otherwise
    #   (similar to what is done in the recommenders)

    model_cluster_num_parameter_name: ClassVar[str]
    """Class variable holding the name of the model's constructor argument that
    controls the number of clusters."""

    _use_custom_selector: ClassVar[bool] = False
    """Class variable flagging whether the custom selector is used instead of the
    default one."""

    # Object variables
    model_params: dict = field(factory=dict)
    """Optional keyword arguments forwarded to the model constructor.
    Derived classes may initialize this with their own default values."""

    @staticmethod
    @abstractmethod
    def _get_model_cls() -> type[ClusterMixin]:
        """Return the class of the clustering model to be instantiated."""

    def _make_selection_default(
        self,
        model: ClusterMixin,
        candidates_scaled: pd.DataFrame | np.ndarray,
    ) -> list[int]:
        """Draw one candidate uniformly at random from every predicted cluster.

        This selector only relies on ``model.predict`` and is therefore usable by
        any derived class.

        Args:
            model: The fitted model.
            candidates_scaled: The already scaled candidates.

        Returns:
            A list with positional indices of the selected candidates.
        """
        labels = model.predict(candidates_scaled)

        picks = []
        for label in np.unique(labels):
            # Positions of all candidates that were assigned to this cluster
            members = np.argwhere(labels == label).ravel()
            picks.append(np.random.choice(members))
        return picks

    def _make_selection_custom(
        self,
        model: ClusterMixin,
        candidates_scaled: pd.DataFrame | np.ndarray,
    ) -> list[int]:
        """Select candidates from the computed clustering (model-specific hook).

        Derived classes that set ``_use_custom_selector`` are expected to override
        this method.

        Args:
            model: The fitted model.
            candidates_scaled: The already scaled candidates.

        Returns:
            A list with positional indices of the selected candidates.

        Raises:
            NotImplementedError: Always, unless overridden by a derived class.
        """
        raise NotImplementedError("This line in the code should be unreachable. Sry.")

    @override
    def _recommend_discrete(
        self,
        subspace_discrete: SubspaceDiscrete,
        candidates_exp: pd.DataFrame,
        batch_size: int,
    ) -> pd.Index:
        from sklearn.preprocessing import StandardScaler

        # TODO [Scaling]: scaling should be handled by search space object
        # The scaler is fitted on the entire search space, not only the candidates
        scaler = StandardScaler().fit(subspace_discrete.comp_rep)

        # Bring the candidates into scaled computational representation
        candidates_comp = subspace_discrete.transform(candidates_exp)
        candidates_scaled = np.ascontiguousarray(scaler.transform(candidates_comp))

        # Build the model with one cluster per requested recommendation and fit it.
        # Both mappings are unpacked separately so that a cluster-count entry in
        # `model_params` still surfaces as a duplicate-keyword error.
        model_cls = self._get_model_cls()
        cluster_num_kwarg = {self.model_cluster_num_parameter_name: batch_size}
        model = model_cls(**cluster_num_kwarg, **self.model_params)
        model.fit(candidates_scaled)

        # Choose the selection strategy and pick the cluster representatives
        selector = (
            self._make_selection_custom
            if self._use_custom_selector
            else self._make_selection_default
        )
        selection = selector(model, candidates_scaled)

        # Translate positional indices into DataFrame index labels
        return candidates_comp.index[selection]

    @override
    def __str__(self) -> str:
        compatibility = to_string(
            "Compatibility", self.compatibility, single_line=True
        )
        parameter_name = to_string(
            "Name of clustering parameter",
            self.model_cluster_num_parameter_name,
            single_line=True,
        )
        parameters = to_string("Model parameters", self.model_params, single_line=True)
        return to_string(
            self.__class__.__name__, compatibility, parameter_name, parameters
        )
@define
class PAMClusteringRecommender(SKLearnClusteringRecommender):
    """Recommender based on Partitioning Around Medoids (PAM) clustering."""

    # Class variables
    model_cluster_num_parameter_name: ClassVar[str] = "n_clusters"
    # See base class.

    _use_custom_selector: ClassVar[bool] = True
    # See base class.

    # Object variables
    model_params: dict = field()
    # See base class.

    @model_params.default
    def _default_model_params(self) -> dict:
        """Create the default model parameters."""
        defaults = {"max_iter": 100, "init": "k-medoids++"}
        return defaults

    @override
    @staticmethod
    def _get_model_cls() -> type[ClusterMixin]:
        # Imported lazily, i.e. only once a model class is actually requested
        from baybe.utils.clustering_algorithms import KMedoids

        return KMedoids

    @override
    def _make_selection_custom(
        self,
        model: ClusterMixin,
        candidates_scaled: pd.DataFrame | np.ndarray,
    ) -> list[int]:
        """Return the medoids of the fitted model as the selection.

        In PAM, the cluster centers (medoids) are actual data points, so their
        indices can be used for the selection directly.

        Args:
            model: The fitted model.
            candidates_scaled: The already scaled candidates. Unused.

        Returns:
            A list with positional indices of the selected candidates.
        """
        return model.medoid_indices_.tolist()
@define
class KMeansClusteringRecommender(SKLearnClusteringRecommender):
    """Recommender based on K-means clustering."""

    # Class variables
    model_cluster_num_parameter_name: ClassVar[str] = "n_clusters"
    # See base class.

    _use_custom_selector: ClassVar[bool] = True
    # See base class.

    # Object variables
    model_params: dict = field()
    # See base class.

    @model_params.default
    def _default_model_params(self) -> dict:
        """Create the default model parameters."""
        defaults = {"max_iter": 1000, "n_init": 50}
        return defaults

    @override
    @staticmethod
    def _get_model_cls() -> type[ClusterMixin]:
        # Imported lazily, i.e. only once a model class is actually requested
        from sklearn.cluster import KMeans

        return KMeans

    @override
    def _make_selection_custom(
        self,
        model: ClusterMixin,
        candidates_scaled: pd.DataFrame | np.ndarray,
    ) -> list[int]:
        """Pick, for every cluster, the member closest to the cluster center.

        Args:
            model: The fitted model.
            candidates_scaled: The already scaled candidates.

        Returns:
            A list with positional indices of the selected candidates.
        """
        from sklearn.metrics import pairwise_distances

        centers = model.cluster_centers_
        # Rows correspond to candidates, columns to cluster centers
        center_distances = pairwise_distances(candidates_scaled, centers)

        # A candidate may only represent the cluster it was assigned to. Entries of
        # all other (candidate, cluster) pairs are pushed to infinity, so that the
        # column-wise minimum is attained by a member of the respective cluster.
        labels = model.predict(candidates_scaled)
        is_foreign = labels[:, np.newaxis] != np.arange(centers.shape[0])
        center_distances[is_foreign] = np.inf

        return center_distances.argmin(axis=0).tolist()
@define
class GaussianMixtureClusteringRecommender(SKLearnClusteringRecommender):
    """Gaussian mixture model (GMM) clustering recommender."""

    # Class variables
    model_cluster_num_parameter_name: ClassVar[str] = "n_components"
    # See base class.

    # NOTE(review): `_use_custom_selector` is not overridden in this class, so the
    #   value inherited from the base class applies and `_make_selection_custom`
    #   below is only used if a subclass (or a later change) enables the flag.
    #   Confirm whether this is intended.

    @override
    @staticmethod
    def _get_model_cls() -> type[ClusterMixin]:
        # Imported lazily, i.e. only once a model class is actually requested
        from sklearn.mixture import GaussianMixture

        return GaussianMixture

    @override
    def _make_selection_custom(
        self,
        model: ClusterMixin,
        candidates_scaled: pd.DataFrame | np.ndarray,
    ) -> list[int]:
        """Select candidates from the computed clustering.

        In a GMM, a reasonable choice is to pick, for each component, the point
        with the highest probability density among the points assigned to it.

        Args:
            model: The fitted model.
            candidates_scaled: The already scaled candidates.

        Returns:
            A list with positional indices of the selected candidates, one entry
            per mixture component.
        """
        from scipy.stats import multivariate_normal

        predicted_clusters = model.predict(candidates_scaled)
        selection = []
        for k_cluster in range(model.n_components):
            log_density = multivariate_normal(
                cov=model.covariances_[k_cluster],
                mean=model.means_[k_cluster],
            ).logpdf(candidates_scaled)

            # Only points assigned to the current component are eligible. The
            # scores are *log*-densities and thus frequently negative, so
            # ineligible points must be masked with -inf: masking with 0 would
            # let any outsider beat every member with a negative score.
            # `np.where` additionally broadcasts the scalar that `logpdf` returns
            # for a single candidate.
            masked_log_density = np.where(
                predicted_clusters == k_cluster, log_density, -np.inf
            )
            selection.append(np.argmax(masked_log_density).item())

        return selection
# Collect leftover original slotted classes processed by `attrs.define` gc.collect()