Source code for genericROM.BasicAlgorithms.Clustering

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE', which is part of this source code package.
#
#

import numpy as np
import time
from BasicTools.Helpers.ProgressBar import printProgressBar



[docs]class ClusteringToolbox(object): """ Class for clustering problems. Attributes ---------- clusteringAlgo: Object containing a clustering algorithm. All clustering algorithms available in Scikit-Learn can be used. If defined by the user, the clustering algorithm must follow Scikit-Learn's API for clustering. clusters: dict clusters[k] is an array containing the indices of points belonging to cluster k. """ def __init__(self, clusteringAlgo=None): self.clusteringAlgo = clusteringAlgo self.clusters = {}
[docs] def SetClusteringAlgo(self, clusteringAlgo): self.clusteringAlgo = clusteringAlgo
[docs] def GetClusteringAlgo(self): return self.clusteringAlgo
[docs] def GetClusters(self): return self.clusters
[docs] def GetLabels(self): return GetLabelsVectorFromClusters(self.clusters)
[docs] def GetNumberOfClusters(self): return len(self.clusters)
[docs] def fit(self, X, **kwargs): ''' Computes clusters. Parameters ---------- X: array of shape (n_samples, n_features) Training instances to cluster. Note: if your clustering algorithm works on a distance matrix, then X is the distance matrix of shape (n_samples, n_samples). ''' self.clusteringAlgo.fit(X, **kwargs)
[docs] def fit_predict(self, X, returnLabels=False, **kwargs): ''' Computes clusters and predicts cluster index for each sample. Parameters ---------- X: array of shape (n_samples, n_features) Training instances to cluster. Note: if your clustering algorithm works on a distance matrix, then X is the distance matrix of shape (n_samples, n_samples). returnLabels: boolean If True, returns labels. If false, it only updates the object's attributes (self.clusters). Returns --------- labels: 1D array of length n_samples Array of integers containing the index of the cluster each sample belongs to. Returned only if returnLabels is True. ''' labels = self.clusteringAlgo.fit_predict(X, **kwargs) self.clusters = GetClustersFromLabelsVector(labels) if returnLabels: return labels
[docs] def predict(self, X, returnLabels=False): ''' Predicts the cluster index for each sample in X. Parameters ---------- X: array of shape (n_samples, n_features) Training instances to cluster. Note: if your clustering algorithm works on a distance matrix, then X is the distance matrix of shape (n_samples, n_samples). returnLabels: boolean If True, returns labels. If false, it only updates the object's attributes (self.clusters). Returns --------- labels: 1D array of length n_samples Array of integers containing the index of the cluster each sample belongs to. Returned only if returnLabels is True. ''' labels = self.clusteringAlgo.predict(X) self.clusters = GetClustersFromLabelsVector(labels) if returnLabels: return labels
[docs] def predictTest(self, X, returnLabels=False): ''' Predicts the cluster index for each sample in X, where X contains new unseen data. Parameters ---------- X: array of shape (n_samples, n_features) Training instances to cluster. Note: if your clustering algorithm works on a distance matrix, then X is the distance matrix of shape (n_samples, n_clusters). returnLabels: boolean If True, returns labels. If false, it only updates the object's attributes (self.clusters). Returns --------- labels: 1D array of length n_samples Array of integers containing the index of the cluster each sample belongs to. Returned only if returnLabels is True. ''' labels = self.clusteringAlgo.predictTest(X) self.clusters = GetClustersFromLabelsVector(labels) if returnLabels: return labels
[docs] def WriteClusteringResults(self, outputFileName): ''' Writes clustering results in a text file. Parameters ---------- outputFileName: str Name of the txt file in which clustering results are written. ''' nClusters = len(self.clusters) if outputFileName[-4:]=='.txt': fileName = outputFileName else:# pragma: no cover fileName = outputFileName + '.txt' with open(fileName, "w") as outFile: outFile.write("Clustering results\n") outFile.write("ClusterLabel DataPoint\n") for k in range(nClusters): for j in range(len(self.clusters[k])): line = str(k) + " " + str(self.clusters[k][j]) + "\n" outFile.write(line)
[docs] def ReadClusteringResults(self, resultsFile): ''' Reads clustering results from a text file. Parameters ---------- resultsFile: str Name of the txt file containing the clustering results. ''' if resultsFile[-4:]=='.txt': fileName = resultsFile else:# pragma: no cover fileName = resultsFile + '.txt' with open(fileName, 'r') as f: content = f.readlines() nClusters = 1+int(content[-1].split()[0]) for i in range(nClusters): self.clusters[i] = [] for i in range(2,len(content)): clusterId, point = content[i].split() clusterId = int(clusterId) point = int(point) self.clusters[clusterId].append(point) for i in range(nClusters): self.clusters[i] = np.asarray(self.clusters[i]).astype(int)
[docs] def ClusterRenumbering(self, clusterIdPermutation): ''' Changes the numerotation of clusters. Parameters ---------- clusterIdPermutation: list List of integers such that clusterIdPermutation[k] is the new index for cluster k. ''' newClusters = {} for k in range(len(self.clusters)): newClusters[clusterIdPermutation[k]] = self.clusters[k] self.clusters = newClusters
[docs]def GetLabelsVectorFromClusters(clusters): ''' Returns a labels vector "labels". Parameters ---------- clusters: dict clusters[k] is an array containing the indices of points belonging to cluster k. Returns ------- labels: 1D array of integers labels[j] = k if example j belongs to cluster k. ''' nClusters = len(clusters) nExamples = 0 for i in range(nClusters): nExamples += len(clusters[i]) labels = np.zeros(nExamples) for i in range(nClusters): nMembers = len(clusters[i]) for j in range(nMembers): labels[clusters[i][j]] = i return labels.astype(int)
[docs]def GetClustersFromLabelsVector(labels): ''' Returns a dictionary containing clustering results. Parameters ---------- labels: 1D array of integers labels[j] = k if example j belongs to cluster k. Returns ------- clusters: dict clusters[k] is an array containing the indices of points belonging to cluster k. ''' clusters = {} nClusters = labels.max()+1 for k in range(nClusters): clusters[k] = np.nonzero(labels==k)[0] return clusters
[docs]def GetAdjacentClustersFromLabelsVector(labels, localNbSnapshots = None): ''' Returns a dictionary with keys the cluster number and values the numbers of cluster adjacent from the data used in the clustering (through the labels). Parameters ---------- labels: 1D array of integers labels[j] = k if example j belongs to cluster k. localNbSnapshots: 1D array or list of integers localNbSnapshots[j] = is the size of j-th group of values for which adjence is well-defined. Returns ------- adjacentClusters: dict adjacentClusters[k] is an array containing the indices of the clusters adjacent to cluster k. snapshotsOfAdjacentClusters: dict snapshotsOfAdjacentClusters[k] is an array containing the indices of points belonging to cluster k and its adjacent clusters. ''' if localNbSnapshots is not None: assert np.sum(np.array(localNbSnapshots)) == len(labels), "localNbSnapshots not consistent with labels" else: localNbSnapshots = [len(labels)] localLabels = [] rank = 0 for l in localNbSnapshots: localLabels.append(labels[rank:rank+l]) rank += l #print("localLabels =", localLabels) adjacentClusters = {} nClusters = labels.max()+1 for k in range(nClusters): adjacentClusters[k] = set() adjacentClusters[k].add(k) for label in localLabels: nLabel = len(label) adjacentClusters[label[0]].add(label[1]) adjacentClusters[label[nLabel-1]].add(label[nLabel-2]) for i in range(1,nLabel-1): adjacentClusters[label[i]].add(label[i+1]) adjacentClusters[label[i]].add(label[i-1]) snapshotsOfAdjacentClusters = {} for k in range(nClusters): tempRes = np.array([]) for label in adjacentClusters[k]: tempRes = np.union1d(tempRes, np.nonzero(labels==label)[0]) snapshotsOfAdjacentClusters[k] = np.array(tempRes, dtype = int) return adjacentClusters, snapshotsOfAdjacentClusters
[docs]class KMedoids(object): """ Class for k-medoids clustering. Attributes ---------- nClusters: int Number of clusters. nIter: int, default 100 Maximum number of iterations. init: str, 'k-meds++' or 'random', default 'k-meds++' Medoids initialization method. Random selection if 'random'. If 'k-meds++', we use the method described in the following article: Hae-Sang Park, Chi-Hyuck Jun, "A simple and fast algorithm for K-medoids clustering", 2009. If 'multipleRuns', the clustering algorithm is run self.runs times with random initial medoids. The best solution in terms of the cost function is returned. medoids: 1D array of length nClusters Array of integers containings the ids of the medoids. algo: 'ParkJun' or 'PAM', default 'PAM' Algorithm for k-medoids. Park & Jun's algorithm is simpler and faster but explores a smaller search space than PAM (Partitioning around medoids). squaredDist: boolean, default True. Says whether the cost function and the medoid update rule use squared dissimilarities. runs: integer, default 10. Number of times the clustering algorithm is run when using init='multipleRuns'. """ def __init__(self, nClusters, nIter=100, init='k-meds++', algo='PAM', squaredDist=False, runs=10): self.nClusters = nClusters self.nIter = nIter self.init = init self.medoids = None self.algo = algo self.squaredDist = squaredDist self.runs = runs self.costFunction = None
[docs] def InitializeMedoids(self, distanceMatrix): ''' Initial medoids selection. Method described in Hae-Sang Park, Chi-Hyuck Jun, "A simple and fast algorithm for K-medoids clustering", 2009. Parameters ---------- distanceMatrix: 2D array of shape (n_samples,n_samples) ''' if self.init == 'k-meds++': a = np.sum(distanceMatrix, axis=1) b = np.transpose(np.transpose(distanceMatrix)/a) v = np.sum(b, axis=0) self.medoids = np.sort(np.argsort(v)[:self.nClusters]) # array of indices elif self.init == 'random' or self.init == 'multipleRuns': self.medoids = np.random.choice(distanceMatrix.shape[0],self.nClusters, replace=False)
[docs] def SetCostFunction(self, costFunction): self.costFunction = costFunction
[docs] def GetMedoids(self): return self.medoids
[docs] def EvalCostFunction(self, medoids, distMatrix, isAlreadySquared=True, **kwargs): if self.costFunction != None: return self.costFunction(medoids, distMatrix, **kwargs) if self.squaredDist: if isAlreadySquared: distanceMatrix = distMatrix else: distanceMatrix = distMatrix*distMatrix else: distanceMatrix = distMatrix labels = np.argmin(distanceMatrix[:,medoids], axis=1) #start = time.time() cost = 0 for i in range(distanceMatrix.shape[0]): cost += distanceMatrix[i,medoids[labels[i]]] #print("old", time.time() - start, cost) #start = time.time() #cost2 = np.trace(distanceMatrix[:,medoids[labels]]) #print("new", time.time() - start, cost2) #print("==") return cost
[docs] def fit_ParkJun(self, distMatrix, printCostFct=False, verbose = False): ''' Implementation of k-medoids clustering based on the Voronoi iteration approach (Park and Jun 2009). This code is a slightly modified version of the code presented in: "NumPy/SciPy recipes for data science: k-Medoids clustering", C. Bauckhage. Parameters ---------- distMatrix: 2D array of shape (n_samples,n_samples) printCostFct: boolean ''' # Examples in the dataset are represented by an index. # medoids is an array of length nClusters, such that medoids[i] # is the index of the medoid of the i-th cluster. # clusters is a dictionary. clusters[k] is an array containing # the indices of points belonging to cluster k. t0 = time.time() if self.medoids is None: # pragma: no cover self.InitializeMedoids(distMatrix) medoidsNew = np.copy(self.medoids) clusters = {} if self.squaredDist: # pragma: no cover distanceMatrix = distMatrix*distMatrix # we work on squared distance matrix else: distanceMatrix = distMatrix if verbose: printProgressBar(0, self.nIter, prefix = 'Progress clustering ParkJun:', suffix = 'Complete', length = 50) if printCostFct: cost = self.EvalCostFunction(medoidsNew, distanceMatrix, isAlreadySquared=True) print("Iter: 0 , Cost: %f " % cost) for i in range(self.nIter): # Cluster assignment (for each point, determine the closest medoid) clusterAssignments = np.argmin(distanceMatrix[:,self.medoids], axis=1) # clusterAssignments[j] = index of the cluster that example j belongs to. for k in range(self.nClusters): clusters[k] = np.where(clusterAssignments==k)[0] # array of indices of points belonging to cluster k # Update cluster medoids for k in range(self.nClusters): clusterMembers1,clusterMembers2 = np.meshgrid(clusters[k],clusters[k]) intraClusterDistances = np.mean(distanceMatrix[clusterMembers1,clusterMembers2],axis=1) #print("intraClusterDistances =", intraClusterDistances) #print("clusterMembers1 =", clusterMembers1) #print("clusterMembers2 =", clusterMembers2) medoidId = np.argmin(intraClusterDistances) # the medoid minimizes the sum of squared distances to all other points in the cluster medoidsNew[k] = clusters[k][medoidId] np.sort(medoidsNew) if printCostFct: cost = self.EvalCostFunction(medoidsNew, distanceMatrix, isAlreadySquared=True) print("Iter: %d , Cost: %f " %(i+1, cost)) if verbose: printProgressBar(i, self.nIter, prefix = 'Progress clustering ParkJun:', suffix = 'Complete', length = 50) # Checking convergence if np.array_equal(self.medoids, medoidsNew): print("The algorithm converged after ", i+1, " iterations.") break self.medoids = np.copy(medoidsNew) else:# pragma: no cover print("The algorithm did not converge after ", i+1, " iterations.") print("Computation time: ", time.time()-t0, " s")
[docs] def fit_PAM(self, distMatrix, printCostFct=False, verbose = False): ''' Implementation of Partitioning Around Medoids (PAM) algorithm for k-medoids. Parameters ---------- distMatrix: 2D array of shape (n_samples,n_samples) printCostFct: boolean ''' t0 = time.time() if self.medoids is None:# pragma: no cover self.InitializeMedoids(distMatrix) medoidsNew = np.copy(self.medoids) if self.squaredDist: distanceMatrix = distMatrix*distMatrix # we work on squared distance matrix else: distanceMatrix = distMatrix if verbose: printProgressBar(0, self.nIter, prefix = 'Progress clustering PAM:', suffix = 'Complete', length = 50) if printCostFct: cost = self.EvalCostFunction(medoidsNew, distanceMatrix, isAlreadySquared=True) print("Iter: 0 , Cost: %f " % cost) bestMedoidForSwap = 0 # id of cluster with the best swap bestNonMedoidForSwap = 0 # id of the best point for the swap for i in range(self.nIter): costFunction = self.EvalCostFunction(medoidsNew, distanceMatrix, isAlreadySquared=True) for k in range(self.nClusters): medoidsTested = np.copy(medoidsNew) for j in range(distanceMatrix.shape[0]): if j not in medoidsNew: # Swap: medoidsTested[k] = j cost = self.EvalCostFunction(medoidsTested, distanceMatrix, isAlreadySquared=True) if cost < costFunction: costFunction = cost bestMedoidForSwap = k bestNonMedoidForSwap = j medoidsNew[bestMedoidForSwap] = bestNonMedoidForSwap if printCostFct: cost = self.EvalCostFunction(medoidsNew, distanceMatrix, isAlreadySquared=True) print("Iter: %d , Cost: %f " %(i+1, cost)) if verbose: printProgressBar(i, self.nIter, prefix = 'Progress clustering PAM:', suffix = 'Complete', length = 50) # Checking convergence if np.array_equal(self.medoids, medoidsNew): print("The algorithm converged after ", i+1, " iterations.") break self.medoids = np.copy(medoidsNew) else:# pragma: no cover print("The algorithm did not converge after ", i+1, " iterations.") print("Computation time: ", time.time()-t0, " s")
[docs] def fit(self, distMatrix, printCostFct=False, verbose = False): if self.init == 'multipleRuns': costFunction = 10000000000 bestMeds = None for r in range(self.runs): self.InitializeMedoids(distMatrix) if self.algo == "ParkJun":# pragma: no cover self.fit_ParkJun(distMatrix, printCostFct=printCostFct, verbose=verbose) elif self.algo == "PAM": self.fit_PAM(distMatrix, printCostFct=printCostFct, verbose=verbose) else:# pragma: no cover print("Unknown algorithm name. Please use either 'ParkJun' or 'PAM'.") newMeds = self.GetMedoids() cost = self.EvalCostFunction(self.GetMedoids(), distMatrix, isAlreadySquared=False) if cost < costFunction: costFunction = cost bestMeds = np.copy(newMeds) self.medoids = np.copy(bestMeds) else: self.InitializeMedoids(distMatrix) if self.algo == "ParkJun": self.fit_ParkJun(distMatrix, printCostFct=printCostFct, verbose=verbose) elif self.algo == "PAM": self.fit_PAM(distMatrix, printCostFct=printCostFct, verbose=verbose) else:# pragma: no cover print("Unknown algorithm name. Please use either 'ParkJun' or 'PAM'.")
[docs] def predict(self, distMatrix): ''' Gives cluster assignments for training data. Parameters ---------- distMatrix: 2D array of shape (n_samples,n_samples) Returns --------- labels: 1D array of length n_samples Array of integers containing the index of the cluster each sample belongs to. ''' labels = np.argmin(distMatrix[:,self.medoids], axis=1) return labels
[docs] def predictTest(self, distMatrix): ''' Gives cluster assignments for test data. Parameters ---------- distMatrix: 2D array of shape (n_samples,n_clusters) such that distMatrix[i,k] is the distance between the i-th test example with the k-th medoid. Returns --------- labels: 1D array of length n_samples Array of integers containing the index of the cluster each sample belongs to. ''' labels = np.argmin(distMatrix, axis=1) return labels
[docs] def fit_predict(self, distMatrix, printCostFct=False, verbose = False): ''' Computes clusters and gives cluster assignments. Parameters ---------- distMatrix: 2D array of shape (n_samples,n_samples) Returns --------- labels: 1D array of length n_samples Array of integers containing the index of the cluster each sample belongs to. ''' self.fit(distMatrix, printCostFct, verbose) return self.predict(distMatrix)
if __name__ == "__main__":# pragma: no cover from genericROM import RunTestFile RunTestFile(__file__)