Coverage for src/jsharpe/sharpe/clustering.py: 100%
67 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-29 13:57 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-29 13:57 +0000
1"""Clustering and effective-dimensionality utilities.
3This module groups the routines used to assess the structure of a
4correlation matrix: the effective rank and the optimal number of
5clusters (with silhouette-based quality scoring).
6"""
7# ruff: noqa: N803, N806, S101, TRY003
9import math
10import warnings
12import numpy as np
13import scipy
16def effective_rank(C: np.ndarray) -> float:
17 """Compute the effective rank of a positive semi-definite matrix.
19 The effective rank measures the "effective dimensionality" of a matrix
20 by computing the exponential of the entropy of its normalized eigenvalues.
21 This provides a continuous measure between 1 (perfectly correlated) and
22 n (perfectly uncorrelated/identity matrix).
24 Algorithm:
25 1. Compute eigenvalues (non-negative for PSD matrices)
26 2. Discard zero eigenvalues
27 3. Normalize to form a probability distribution
28 4. Compute entropy H = -Σ p_i log(p_i)
29 5. Return exp(H)
31 Args:
32 C: Positive semi-definite matrix (e.g., correlation matrix).
33 Shape (n, n).
35 Returns:
36 Effective rank, a value in [1, n] where n is the matrix dimension.
38 References:
39 Roy, O. and Vetterli, M. (2007). "The effective rank: a measure of
40 effective dimensionality." EURASIP Journal on Advances in Signal
41 Processing. http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.177.2721
43 Example:
44 >>> import numpy as np
45 >>> # Identity matrix has effective rank equal to its dimension
46 >>> abs(effective_rank(np.eye(3)) - 3.0) < 1e-10
47 True
48 >>> # Perfectly correlated matrix has effective rank 1
49 >>> C = np.ones((3, 3))
50 >>> abs(effective_rank(C) - 1.0) < 1e-10
51 True
52 """
53 p = np.linalg.eigvalsh(C)
54 p = p[p > 0]
55 p = p / sum(p)
56 H = np.sum(-p * np.log(p))
57 return math.exp(H)
60def _silhouette_samples(X: np.ndarray, labels: np.ndarray) -> np.ndarray:
61 """Compute silhouette coefficients for each sample.
63 For each sample i:
64 a(i) = mean distance from i to all other samples in the same cluster
65 b(i) = min over other clusters c of mean distance from i to samples in c
66 s(i) = (b(i) - a(i)) / max(a(i), b(i))
68 Args:
69 X: Feature matrix of shape (n_samples, n_features).
70 labels: Cluster labels of shape (n_samples,).
72 Returns:
73 Silhouette coefficients of shape (n_samples,).
74 """
75 n = X.shape[0]
76 unique_labels = np.unique(labels)
77 n_clusters = len(unique_labels)
79 # Pairwise Euclidean distances: dist[i,j] = ||X[i] - X[j]||
80 diff = X[:, np.newaxis, :] - X[np.newaxis, :, :]
81 dist = np.sqrt(np.einsum("ijk,ijk->ij", diff, diff))
83 # For each cluster c, compute mean distance from every sample to cluster c.
84 # For samples IN cluster c, the self-distance (zero) is excluded.
85 label_to_idx = {c: idx for idx, c in enumerate(unique_labels)}
86 cluster_mean_dist = np.zeros((n, n_clusters))
87 for c_idx, c in enumerate(unique_labels):
88 mask = labels == c
89 count = int(mask.sum())
90 dist_to_cluster = dist[:, mask].sum(axis=1)
91 cluster_mean_dist[:, c_idx] = dist_to_cluster / max(count, 1)
92 # For members of cluster c, exclude self-distance (which is 0)
93 in_cluster = np.where(mask)[0]
94 if count > 1:
95 cluster_mean_dist[in_cluster, c_idx] = dist_to_cluster[in_cluster] / (count - 1)
96 else:
97 cluster_mean_dist[in_cluster, c_idx] = 0.0
99 # a(i): mean distance to same cluster
100 own_cluster_idx = np.array([label_to_idx[lbl] for lbl in labels])
101 a = cluster_mean_dist[np.arange(n), own_cluster_idx]
103 # b(i): min mean distance to any other cluster
104 b_mat = cluster_mean_dist.copy()
105 b_mat[np.arange(n), own_cluster_idx] = np.inf
106 b = b_mat.min(axis=1)
107 b = np.where(b == np.inf, 0.0, b)
109 max_ab = np.maximum(a, b)
110 return np.where(max_ab > 0, (b - a) / max_ab, 0.0)
113def number_of_clusters(
114 C: np.ndarray,
115 *,
116 retries: int = 10,
117 max_clusters: int = 100,
118) -> tuple[int, dict[int, float], np.ndarray]:
119 """Compute the optimal number of clusters from a correlation matrix.
121 Implements the algorithm from section 8.1 of Lopez de Prado (2018):
122 1. Convert the correlation matrix into a distance matrix.
123 2. Using the columns of the distance matrix as features, run the
124 k-means algorithm for each k and compute the quality of the
125 clustering.
126 3. Return the clustering with the highest quality.
128 Quality is defined as the mean of the silhouette scores divided by
129 their standard deviation.
131 Args:
132 C: Correlation matrix. Must be square, symmetric, finite, with ones
133 on the diagonal and all entries in [-1, 1].
134 retries: Number of times to run k-means for each k to reduce the
135 impact of random initialisation. Default 10.
136 max_clusters: Maximum number of clusters to evaluate. Capped at
137 ``C.shape[0] - 1``. Default 100.
139 Returns:
140 Tuple of (n_clusters, qualities, labels):
141 - n_clusters: Optimal number of clusters.
142 - qualities: Dict mapping k to its quality score.
143 - labels: Cluster assignment for each observation (shape (n,)).
145 References:
146 Lopez de Prado, M. (2018). "Detection of false investment strategies
147 using unsupervised learning methods." SSRN 3167017.
148 https://papers.ssrn.com/sol3/papers.cfm?abstract_id=3167017
150 Example:
151 >>> from jsharpe.sharpe.generators import get_random_correlation_matrix
152 >>> np.random.seed(42)
153 >>> C, _, _ = get_random_correlation_matrix(
154 ... number_of_trials=20, effective_number_of_trials=4
155 ... )
156 >>> n, qualities, labels = number_of_clusters(C, retries=3, max_clusters=8)
157 >>> 2 <= n <= 8
158 True
159 >>> labels.shape
160 (20,)
161 """
162 assert isinstance(C, np.ndarray)
163 assert np.all(C >= -1)
164 assert np.all(C <= 1)
165 assert np.all(np.diag(C) == 1)
166 assert np.all(np.isfinite(C))
168 max_clusters = min(max_clusters, C.shape[0] - 1)
170 # Convert correlations to distances
171 D = np.sqrt((1 - C) / 2)
172 assert np.all(np.isfinite(D))
174 qualities: dict[int, float] = {}
175 best_labels: dict[int, np.ndarray] = {}
176 for k in range(2, max_clusters + 1):
177 qualities[k] = -np.inf
178 for _ in range(retries):
179 with warnings.catch_warnings():
180 warnings.simplefilter("ignore")
181 _centroids, labels = scipy.cluster.vq.kmeans2(D, k, minit="points", iter=300)
182 # Skip degenerate solutions with empty clusters
183 if len(np.unique(labels)) < k:
184 continue
185 silhouette_vals = _silhouette_samples(D, labels)
186 std = silhouette_vals.std()
187 if std == 0:
188 continue
189 q = float(silhouette_vals.mean() / std)
190 if q > qualities[k]:
191 qualities[k] = q
192 best_labels[k] = labels.copy()
194 # Select the best k among those for which a valid solution was found
195 valid_k = {k: q for k, q in qualities.items() if k in best_labels}
196 if not valid_k:
197 raise RuntimeError("No valid clustering solution found; try increasing retries or reducing max_clusters.")
198 best_k = max(valid_k, key=lambda x: valid_k[x])
199 return best_k, qualities, best_labels[best_k]