-
Notifications
You must be signed in to change notification settings - Fork 4.6k
/
partitioning_around_medoids.py
123 lines (108 loc) · 4.79 KB
/
partitioning_around_medoids.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from __future__ import print_function, division
import numpy as np
from mlfromscratch.utils import normalize, euclidean_distance, Plot
from mlfromscratch.unsupervised_learning import PCA
class PAM():
"""A simple clustering method that forms k clusters by first assigning
samples to the closest medoids, and then swapping medoids with non-medoid
samples if the total distance (cost) between the cluster members and their medoid
is smaller than prevoisly.
Parameters:
-----------
k: int
The number of clusters the algorithm will form.
"""
def __init__(self, k=2):
self.k = k
def _init_random_medoids(self, X):
""" Initialize the medoids as random samples """
n_samples, n_features = np.shape(X)
medoids = np.zeros((self.k, n_features))
for i in range(self.k):
medoid = X[np.random.choice(range(n_samples))]
medoids[i] = medoid
return medoids
def _closest_medoid(self, sample, medoids):
""" Return the index of the closest medoid to the sample """
closest_i = None
closest_distance = float("inf")
for i, medoid in enumerate(medoids):
distance = euclidean_distance(sample, medoid)
if distance < closest_distance:
closest_i = i
closest_distance = distance
return closest_i
def _create_clusters(self, X, medoids):
""" Assign the samples to the closest medoids to create clusters """
clusters = [[] for _ in range(self.k)]
for sample_i, sample in enumerate(X):
medoid_i = self._closest_medoid(sample, medoids)
clusters[medoid_i].append(sample_i)
return clusters
def _calculate_cost(self, X, clusters, medoids):
""" Calculate the cost (total distance between samples and their medoids) """
cost = 0
# For each cluster
for i, cluster in enumerate(clusters):
medoid = medoids[i]
for sample_i in cluster:
# Add distance between sample and medoid as cost
cost += euclidean_distance(X[sample_i], medoid)
return cost
def _get_non_medoids(self, X, medoids):
""" Returns a list of all samples that are not currently medoids """
non_medoids = []
for sample in X:
if not sample in medoids:
non_medoids.append(sample)
return non_medoids
def _get_cluster_labels(self, clusters, X):
""" Classify samples as the index of their clusters """
# One prediction for each sample
y_pred = np.zeros(np.shape(X)[0])
for cluster_i in range(len(clusters)):
cluster = clusters[cluster_i]
for sample_i in cluster:
y_pred[sample_i] = cluster_i
return y_pred
def predict(self, X):
""" Do Partitioning Around Medoids and return the cluster labels """
# Initialize medoids randomly
medoids = self._init_random_medoids(X)
# Assign samples to closest medoids
clusters = self._create_clusters(X, medoids)
# Calculate the initial cost (total distance between samples and
# corresponding medoids)
cost = self._calculate_cost(X, clusters, medoids)
# Iterate until we no longer have a cheaper cost
while True:
best_medoids = medoids
lowest_cost = cost
for medoid in medoids:
# Get all non-medoid samples
non_medoids = self._get_non_medoids(X, medoids)
# Calculate the cost when swapping medoid and samples
for sample in non_medoids:
# Swap sample with the medoid
new_medoids = medoids.copy()
new_medoids[medoids == medoid] = sample
# Assign samples to new medoids
new_clusters = self._create_clusters(X, new_medoids)
# Calculate the cost with the new set of medoids
new_cost = self._calculate_cost(
X, new_clusters, new_medoids)
# If the swap gives us a lower cost we save the medoids and cost
if new_cost < lowest_cost:
lowest_cost = new_cost
best_medoids = new_medoids
# If there was a swap that resultet in a lower cost we save the
# resulting medoids from the best swap and the new cost
if lowest_cost < cost:
cost = lowest_cost
medoids = best_medoids
# Else finished
else:
break
final_clusters = self._create_clusters(X, medoids)
# Return the samples cluster indices as labels
return self._get_cluster_labels(final_clusters, X)