Coverage for src/jsharpe/sharpe/clustering.py: 100%

67 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-29 13:57 +0000

1"""Clustering and effective-dimensionality utilities. 

2 

3This module groups the routines used to assess the structure of a 

4correlation matrix: the effective rank and the optimal number of 

5clusters (with silhouette-based quality scoring). 

6""" 

7# ruff: noqa: N803, N806, S101, TRY003 

8 

9import math 

10import warnings 

11 

12import numpy as np 

13import scipy 

14 

15 

16def effective_rank(C: np.ndarray) -> float: 

17 """Compute the effective rank of a positive semi-definite matrix. 

18 

19 The effective rank measures the "effective dimensionality" of a matrix 

20 by computing the exponential of the entropy of its normalized eigenvalues. 

21 This provides a continuous measure between 1 (perfectly correlated) and 

22 n (perfectly uncorrelated/identity matrix). 

23 

24 Algorithm: 

25 1. Compute eigenvalues (non-negative for PSD matrices) 

26 2. Discard zero eigenvalues 

27 3. Normalize to form a probability distribution 

28 4. Compute entropy H = -Σ p_i log(p_i) 

29 5. Return exp(H) 

30 

31 Args: 

32 C: Positive semi-definite matrix (e.g., correlation matrix). 

33 Shape (n, n). 

34 

35 Returns: 

36 Effective rank, a value in [1, n] where n is the matrix dimension. 

37 

38 References: 

39 Roy, O. and Vetterli, M. (2007). "The effective rank: a measure of 

40 effective dimensionality." EURASIP Journal on Advances in Signal 

41 Processing. http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.177.2721 

42 

43 Example: 

44 >>> import numpy as np 

45 >>> # Identity matrix has effective rank equal to its dimension 

46 >>> abs(effective_rank(np.eye(3)) - 3.0) < 1e-10 

47 True 

48 >>> # Perfectly correlated matrix has effective rank 1 

49 >>> C = np.ones((3, 3)) 

50 >>> abs(effective_rank(C) - 1.0) < 1e-10 

51 True 

52 """ 

53 p = np.linalg.eigvalsh(C) 

54 p = p[p > 0] 

55 p = p / sum(p) 

56 H = np.sum(-p * np.log(p)) 

57 return math.exp(H) 

58 

59 

60def _silhouette_samples(X: np.ndarray, labels: np.ndarray) -> np.ndarray: 

61 """Compute silhouette coefficients for each sample. 

62 

63 For each sample i: 

64 a(i) = mean distance from i to all other samples in the same cluster 

65 b(i) = min over other clusters c of mean distance from i to samples in c 

66 s(i) = (b(i) - a(i)) / max(a(i), b(i)) 

67 

68 Args: 

69 X: Feature matrix of shape (n_samples, n_features). 

70 labels: Cluster labels of shape (n_samples,). 

71 

72 Returns: 

73 Silhouette coefficients of shape (n_samples,). 

74 """ 

75 n = X.shape[0] 

76 unique_labels = np.unique(labels) 

77 n_clusters = len(unique_labels) 

78 

79 # Pairwise Euclidean distances: dist[i,j] = ||X[i] - X[j]|| 

80 diff = X[:, np.newaxis, :] - X[np.newaxis, :, :] 

81 dist = np.sqrt(np.einsum("ijk,ijk->ij", diff, diff)) 

82 

83 # For each cluster c, compute mean distance from every sample to cluster c. 

84 # For samples IN cluster c, the self-distance (zero) is excluded. 

85 label_to_idx = {c: idx for idx, c in enumerate(unique_labels)} 

86 cluster_mean_dist = np.zeros((n, n_clusters)) 

87 for c_idx, c in enumerate(unique_labels): 

88 mask = labels == c 

89 count = int(mask.sum()) 

90 dist_to_cluster = dist[:, mask].sum(axis=1) 

91 cluster_mean_dist[:, c_idx] = dist_to_cluster / max(count, 1) 

92 # For members of cluster c, exclude self-distance (which is 0) 

93 in_cluster = np.where(mask)[0] 

94 if count > 1: 

95 cluster_mean_dist[in_cluster, c_idx] = dist_to_cluster[in_cluster] / (count - 1) 

96 else: 

97 cluster_mean_dist[in_cluster, c_idx] = 0.0 

98 

99 # a(i): mean distance to same cluster 

100 own_cluster_idx = np.array([label_to_idx[lbl] for lbl in labels]) 

101 a = cluster_mean_dist[np.arange(n), own_cluster_idx] 

102 

103 # b(i): min mean distance to any other cluster 

104 b_mat = cluster_mean_dist.copy() 

105 b_mat[np.arange(n), own_cluster_idx] = np.inf 

106 b = b_mat.min(axis=1) 

107 b = np.where(b == np.inf, 0.0, b) 

108 

109 max_ab = np.maximum(a, b) 

110 return np.where(max_ab > 0, (b - a) / max_ab, 0.0) 

111 

112 

113def number_of_clusters( 

114 C: np.ndarray, 

115 *, 

116 retries: int = 10, 

117 max_clusters: int = 100, 

118) -> tuple[int, dict[int, float], np.ndarray]: 

119 """Compute the optimal number of clusters from a correlation matrix. 

120 

121 Implements the algorithm from section 8.1 of Lopez de Prado (2018): 

122 1. Convert the correlation matrix into a distance matrix. 

123 2. Using the columns of the distance matrix as features, run the 

124 k-means algorithm for each k and compute the quality of the 

125 clustering. 

126 3. Return the clustering with the highest quality. 

127 

128 Quality is defined as the mean of the silhouette scores divided by 

129 their standard deviation. 

130 

131 Args: 

132 C: Correlation matrix. Must be square, symmetric, finite, with ones 

133 on the diagonal and all entries in [-1, 1]. 

134 retries: Number of times to run k-means for each k to reduce the 

135 impact of random initialisation. Default 10. 

136 max_clusters: Maximum number of clusters to evaluate. Capped at 

137 ``C.shape[0] - 1``. Default 100. 

138 

139 Returns: 

140 Tuple of (n_clusters, qualities, labels): 

141 - n_clusters: Optimal number of clusters. 

142 - qualities: Dict mapping k to its quality score. 

143 - labels: Cluster assignment for each observation (shape (n,)). 

144 

145 References: 

146 Lopez de Prado, M. (2018). "Detection of false investment strategies 

147 using unsupervised learning methods." SSRN 3167017. 

148 https://papers.ssrn.com/sol3/papers.cfm?abstract_id=3167017 

149 

150 Example: 

151 >>> from jsharpe.sharpe.generators import get_random_correlation_matrix 

152 >>> np.random.seed(42) 

153 >>> C, _, _ = get_random_correlation_matrix( 

154 ... number_of_trials=20, effective_number_of_trials=4 

155 ... ) 

156 >>> n, qualities, labels = number_of_clusters(C, retries=3, max_clusters=8) 

157 >>> 2 <= n <= 8 

158 True 

159 >>> labels.shape 

160 (20,) 

161 """ 

162 assert isinstance(C, np.ndarray) 

163 assert np.all(C >= -1) 

164 assert np.all(C <= 1) 

165 assert np.all(np.diag(C) == 1) 

166 assert np.all(np.isfinite(C)) 

167 

168 max_clusters = min(max_clusters, C.shape[0] - 1) 

169 

170 # Convert correlations to distances 

171 D = np.sqrt((1 - C) / 2) 

172 assert np.all(np.isfinite(D)) 

173 

174 qualities: dict[int, float] = {} 

175 best_labels: dict[int, np.ndarray] = {} 

176 for k in range(2, max_clusters + 1): 

177 qualities[k] = -np.inf 

178 for _ in range(retries): 

179 with warnings.catch_warnings(): 

180 warnings.simplefilter("ignore") 

181 _centroids, labels = scipy.cluster.vq.kmeans2(D, k, minit="points", iter=300) 

182 # Skip degenerate solutions with empty clusters 

183 if len(np.unique(labels)) < k: 

184 continue 

185 silhouette_vals = _silhouette_samples(D, labels) 

186 std = silhouette_vals.std() 

187 if std == 0: 

188 continue 

189 q = float(silhouette_vals.mean() / std) 

190 if q > qualities[k]: 

191 qualities[k] = q 

192 best_labels[k] = labels.copy() 

193 

194 # Select the best k among those for which a valid solution was found 

195 valid_k = {k: q for k, q in qualities.items() if k in best_labels} 

196 if not valid_k: 

197 raise RuntimeError("No valid clustering solution found; try increasing retries or reducing max_clusters.") 

198 best_k = max(valid_k, key=lambda x: valid_k[x]) 

199 return best_k, qualities, best_labels[best_k]