"""A collection of point sampling algorithms."""importwarningsfromcollectionsimportCounterfromcollections.abcimportCollectionfromenumimportEnumfromtypingimportLiteralimportnumpyasnpimportpandasaspdfrombaybe.utils.basicimportis_all_instance
def farthest_point_sampling(
    points: np.ndarray,
    n_samples: int = 1,
    initialization: Literal["farthest", "random"] | Collection[int] = "farthest",
    random_tie_break: bool = True,
) -> list[int]:
    """Select a subset of points using farthest point sampling.

    Creates a subset of a given collection of points by successively adding points
    with the largest Euclidean distance to intermediate point selections encountered
    during the algorithmic process. The mechanism used for the initial point
    selection is configurable.

    Args:
        points: The points that are available for selection, represented as a 2-D
            array of shape ``(n, k)``, where ``n`` is the number of points and ``k``
            is the dimensionality of the points.
        n_samples: The total number of points to be selected.
        initialization: Determines how the first points are selected:

            * ``"farthest"``: The first two selected points are those with the
              largest distance. If only a single point is requested, a
              deterministic choice is made based on the point coordinates.
            * ``"random"``: The first point is selected uniformly at random.
            * Indices: Points corresponding to these indices will be pre-selected.
              The indices refer to the rows of ``points`` as passed by the caller.
        random_tie_break: Determines if points are chosen deterministically or
            randomly in equidistant situations. If ``True``, a random point is
            selected from the candidates, otherwise the first point (with respect
            to the internal coordinate-based ordering) is selected. For
            non-equidistant points, the point with the largest minimum distance is
            always selected.

    Returns:
        A list containing the positional indices of the selected points.

    Raises:
        ValueError: If the number of requested samples is less than 1.
        ValueError: If the provided array is not two-dimensional.
        ValueError: If the array contains no points.
        ValueError: If the input space has no dimensions.
        ValueError: Indices for initialization are not unique.
        ValueError: More initialization indices than available points are provided.
        ValueError: Initialization indices are out of bounds.
        ValueError: Unknown initialization method.
        ValueError: More points are requested than available.
    """
    # Input validation
    if n_samples < 1:
        raise ValueError(
            f"The number of requested samples must be at least 1. "
            f"Provided: {n_samples=}."
        )
    if (n_dims := np.ndim(points)) != 2:
        raise ValueError(
            f"The provided array must be two-dimensional but the given input had "
            f"{n_dims} dimensions."
        )
    if (n_points := len(points)) == 0:
        raise ValueError("The provided array must contain at least one row.")
    if points.shape[-1] == 0:
        raise ValueError("The provided input space must be at least one-dimensional.")

    # Evaluate the "collection of integer indices" condition once and reuse it below
    index_initialization = isinstance(initialization, Collection) and is_all_instance(
        initialization, int
    )
    if index_initialization:
        if duplicates := {k for k, v in Counter(initialization).items() if v > 1}:
            raise ValueError(
                f"The provided collection of initialization indices must be unique but "
                f"contains duplicates: {duplicates}"
            )
        if len(initialization) > n_points:
            raise ValueError(
                f"The number of provided initialization indices "
                f"({len(initialization)}) cannot be larger than the total number of "
                f"points provided ({n_points})."
            )
        if problematic_indices := [
            idx for idx in initialization if not (0 <= idx < n_points)
        ]:
            raise ValueError(
                f"The provided collection of initialization indices must be within the "
                f"range of available points (0 to {n_points-1}) but contains "
                f"out-of-bounds indices: {problematic_indices}"
            )
    elif initialization not in {"farthest", "random"}:
        raise ValueError(
            f"Unknown initialization type. Expected 'farthest', 'random', or a "
            f"collection of integers. Provided: {initialization=}"
        )
    if n_samples > n_points:
        raise ValueError(
            f"The number of requested samples ({n_samples}) cannot be larger than the "
            f"total number of points provided ({n_points})."
        )

    # Catch the pathological case upfront
    # NOTE(review): this shortcut does not take pre-selected initialization indices
    #   into account -- confirm that this is intended.
    if len(np.unique(points, axis=0)) == 1:
        warnings.warn("All points are identical.", UserWarning)
        return list(range(n_samples))

    # Sort the points to produce the same result regardless of the input order.
    # From here on, all indices are positions in the *sorted* array; position ``j``
    # corresponds to the original row ``sort_idx[j]``.
    sort_idx = np.lexsort(tuple(points.T))
    points = points[sort_idx]

    # Pre-compute the pairwise distances between all points
    from sklearn.metrics import pairwise_distances

    dist_matrix = pairwise_distances(points)

    # Avoid wrong behavior situations where all (remaining) points are duplicates
    np.fill_diagonal(dist_matrix, -np.inf)

    # Initialize the point selection
    if index_initialization:
        # The caller's indices refer to the original row order, so they must be
        # translated to positions in the sorted array via the inverse permutation
        inverse_sort_idx = np.argsort(sort_idx)
        selected_point_indices = [int(inverse_sort_idx[idx]) for idx in initialization]
    elif initialization == "random":
        selected_point_indices = [np.random.randint(0, n_points)]
    else:
        # initialization == "farthest": start with the pair of maximum distance
        idx_1d = np.argmax(dist_matrix)
        selected_point_indices = list(
            map(int, np.unravel_index(idx_1d, dist_matrix.shape))
        )
        if n_samples == 1:
            # Cast to a builtin int to honor the declared ``list[int]`` return type
            return [int(sort_idx[selected_point_indices[0]])]

    # Initialize the list of remaining points
    remaining_point_indices = list(range(n_points))
    for idx in selected_point_indices:
        remaining_point_indices.remove(idx)

    # Successively add the points with the largest distance
    while len(selected_point_indices) < n_samples:
        # Collect distances between selected and remaining points
        dist = dist_matrix[np.ix_(remaining_point_indices, selected_point_indices)]

        # Find for each candidate point the smallest distance to the selected points
        min_dists = np.min(dist, axis=1)
        max_val = np.max(min_dists)
        max_indices = np.where(min_dists == max_val)[0]

        if random_tie_break:
            # Select a random point that has the "largest smallest distance"
            choice = np.random.choice(max_indices)
        else:
            # Choose the first point with the "largest smallest distance"
            choice = max_indices[0]
        selected_point_index = remaining_point_indices[choice]

        # Add the chosen point to the selection
        selected_point_indices.append(selected_point_index)
        remaining_point_indices.remove(selected_point_index)

    # Undo the initial point reordering
    return sort_idx[selected_point_indices].tolist()
class DiscreteSamplingMethod(Enum):
    """Enumeration of the sampling strategies offered for discrete point sets."""

    Random = "Random"
    """Points are drawn at random."""

    FPS = "FPS"
    """Points are chosen via farthest point sampling."""
def sample_numerical_df(
    df: pd.DataFrame,
    n_points: int,
    *,
    method: DiscreteSamplingMethod = DiscreteSamplingMethod.Random,
) -> pd.DataFrame:
    """Sample data points from a numerical dataframe.

    If the requested amount of points is larger than the number of available
    points, the entire dataframe will be returned as many times as it fits into the
    requested number and the specified sampling method will only return the
    remainder of points.

    Args:
        df: Dataframe with purely numerical entries.
        n_points: Number of points to sample. Must be non-negative.
        method: Sampling method. Only consulted when a remainder of points needs to
            be sampled, i.e. when ``n_points`` is not a multiple of ``len(df)``.

    Returns:
        The sampled points.

    Raises:
        TypeError: If the provided dataframe has non-numerical content.
        ValueError: If a negative number of points is requested.
        ValueError: If points are requested from a dataframe without rows.
        ValueError: When an invalid sampling method was provided.
    """
    # Iterate the dtypes directly so that duplicate column labels are handled, too
    if any(dtype.kind not in "iufb" for dtype in df.dtypes):
        raise TypeError(
            f"'{sample_numerical_df.__name__}' only supports purely numerical "
            f"dataframes."
        )

    # A negative request would otherwise silently yield a positive remainder
    if n_points < 0:
        raise ValueError(
            f"The number of points to sample must be non-negative. "
            f"Provided: {n_points=}."
        )

    # Guard against the division by zero in the split below
    n_available = len(df)
    if n_available == 0:
        if n_points > 0:
            raise ValueError(
                f"Cannot sample {n_points} point(s) from a dataframe without rows."
            )
        return df.iloc[[]]

    # Split points in trivial and sampled parts
    n_trivial, n_sampled = divmod(n_points, n_available)
    ilocs = list(range(n_available)) * n_trivial
    if n_sampled > 0:
        if method is DiscreteSamplingMethod.FPS:
            ilocs += farthest_point_sampling(df.values, n_sampled)
        elif method is DiscreteSamplingMethod.Random:
            # Reset the index so that the sampled labels equal positional indices
            ilocs += df.reset_index(drop=True).sample(n_sampled).index.tolist()
        else:
            raise ValueError(f"Unrecognized sampling method: '{method}'.")

    return df.iloc[ilocs]