Coverage for src/pyhrp/algos.py: 98%

84 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 16:29 +0000

1"""Portfolio optimization algorithms for hierarchical risk parity. 

2 

3This module implements various portfolio optimization algorithms: 

4- risk_parity: The main hierarchical risk parity algorithm 

5- schur_risk_parity: Schur Complementary Allocation (Cotton, arXiv:2411.05807) 

6- one_over_n: A simple equal-weight allocation strategy 

7""" 

8 

9from __future__ import annotations 

10 

11from collections.abc import Callable, Generator 

12from copy import deepcopy 

13from typing import TYPE_CHECKING 

14 

15import numpy as np 

16import polars as pl 

17 

18from .cluster import Cluster, Portfolio 

19 

20if TYPE_CHECKING: 

21 from .hrp import Dendrogram 

22 

23__all__ = ["one_over_n", "risk_parity", "schur_risk_parity"] 

24 

25 

def risk_parity(root: Cluster, cov: pl.DataFrame) -> Cluster:
    """Assign hierarchical risk parity (HRP) weights to a cluster tree.

    The tree is processed bottom-up: every leaf holds its single asset with
    weight 1.0 and every inner node blends the portfolios of its two children
    so that the child with the smaller variance receives the larger share.

    Note:
        The allocation works in place and replaces the portfolio of every
        node instead of adding to it. Running it again on the same tree,
        possibly with another covariance matrix, therefore starts from a
        clean slate.

    Args:
        root (Cluster): Root node of the cluster tree
        cov (pl.DataFrame): Covariance matrix of asset returns; its column
            order defines the asset positions

    Returns:
        Cluster: The root node, now carrying the portfolio weights

    Examples:
        >>> import polars as pl
        >>> from pyhrp.cluster import Cluster
        >>> from pyhrp.algos import risk_parity
        >>> cov = pl.DataFrame({"A": [4.0, 0.0], "B": [0.0, 1.0]})
        >>> root = Cluster(2, left=Cluster(0), right=Cluster(1))
        >>> cluster = risk_parity(root=root, cov=cov)
        >>> round(cluster.portfolio["B"], 1)
        0.8
    """
    # Convert once up front so the recursion only touches plain numpy data.
    covariance = cov.to_numpy()
    position = {name: pos for pos, name in enumerate(cov.columns)}

    def merge(node: Cluster) -> Cluster:
        """Blend the two children of ``node`` by an inverse-variance split."""
        variances = [_block_variance(child.portfolio, covariance, position) for child in _children(node)]
        return _split(node, *variances)

    return _allocate(root, cov.columns, merge)

66 

67 

def schur_risk_parity(root: Cluster, cov: pl.DataFrame, gamma: float = 0.5) -> Cluster:
    """Assign Schur Complementary Allocation weights to a cluster tree.

    This variant of HRP goes back to Peter Cotton (arXiv:2411.05807). Before
    the variance of a child portfolio is measured, its covariance block is
    reduced by ``gamma`` times the Schur-complement correction built from the
    off-diagonal block shared with its sibling. With gamma=0 the correction
    vanishes and the result equals standard HRP; gamma=1 is the setting the
    paper associates with the minimum-variance portfolio.

    Note:
        The allocation works in place and replaces the portfolio of every
        node instead of adding to it. Running it again on the same tree,
        possibly with another covariance matrix or gamma, therefore starts
        from a clean slate.

    Args:
        root (Cluster): Root node of the cluster tree
        cov (pl.DataFrame): Covariance matrix of asset returns; its column
            order defines the asset positions
        gamma (float): Interpolation parameter in [0, 1]. 0 = HRP, 1 = minimum variance.

    Returns:
        Cluster: The root node, now carrying the portfolio weights

    Raises:
        ValueError: If gamma is outside the interval [0, 1].

    Examples:
        >>> import polars as pl
        >>> from pyhrp.cluster import Cluster
        >>> from pyhrp.algos import schur_risk_parity
        >>> cov = pl.DataFrame({"A": [4.0, 0.0], "B": [0.0, 1.0]})
        >>> root = Cluster(2, left=Cluster(0), right=Cluster(1))
        >>> cluster = schur_risk_parity(root=root, cov=cov, gamma=0.5)
        >>> round(cluster.portfolio["B"], 1)
        0.8
    """
    # Kept as a chained comparison so that a NaN gamma is rejected as well.
    in_unit_interval = 0.0 <= gamma <= 1.0
    if not in_unit_interval:
        message = f"gamma must be in [0, 1], got {gamma}"
        raise ValueError(message)

    covariance = cov.to_numpy()
    position = {name: pos for pos, name in enumerate(cov.columns)}

    def merge(node: Cluster) -> Cluster:
        """Blend the two children of ``node`` using Schur-augmented variances."""
        left, right = _children(node)
        left_names = left.portfolio.assets
        right_names = right.portfolio.assets

        rows_left = [position[name] for name in left_names]
        rows_right = [position[name] for name in right_names]

        # Partition the covariance into the blocks spanned by the two children.
        cov_ll = covariance[np.ix_(rows_left, rows_left)]
        cov_lr = covariance[np.ix_(rows_left, rows_right)]
        cov_rr = covariance[np.ix_(rows_right, rows_right)]
        cov_rl = cov_lr.T

        weights_left = np.array([left.portfolio[name] for name in left_names])
        weights_right = np.array([right.portfolio[name] for name in right_names])

        # Condition each child on its sibling, scaled by gamma.
        reduced_ll = cov_ll - gamma * (cov_lr @ _solve(cov_rr, cov_rl))
        reduced_rr = cov_rr - gamma * (cov_rl @ _solve(cov_ll, cov_lr))

        return _split(
            node,
            float(weights_left @ reduced_ll @ weights_left),
            float(weights_right @ reduced_rr @ weights_right),
        )

    return _allocate(root, cov.columns, merge)

132 

133 

def _allocate(root: Cluster, assets: list[str], combine: Callable[[Cluster], Cluster]) -> Cluster:
    """Allocate weights for a (sub)tree by post-order recursion.

    Children are allocated before their parent. A leaf receives a fresh
    portfolio holding its own asset with weight 1.0; an inner node is handed
    to ``combine`` once both of its children carry a portfolio. Portfolios are
    always replaced rather than extended, so allocating the same tree twice
    does not pile up weights.

    Args:
        root (Cluster): The (sub)tree to allocate weights for
        assets (list[str]): Asset names; the value of a leaf is used as the
            position in this list
        combine (Callable[[Cluster], Cluster]): Builds the portfolio of a node
            from the portfolios of its two children

    Returns:
        Cluster: The input node with portfolio weights assigned
    """
    if not root.is_leaf:
        # Left subtree first, then right, then the node itself.
        left_child, right_child = _children(root)
        root.left = _allocate(left_child, assets, combine)
        root.right = _allocate(right_child, assets, combine)
        return combine(root)

    # Leaf: start from an empty portfolio and put everything on one asset.
    root.portfolio = Portfolio()
    asset_name = assets[int(root.value)]
    root.portfolio[asset_name] = 1.0
    return root

158 

159 

def _children(cluster: Cluster) -> tuple[Cluster, Cluster]:
    """Fetch both children of an inner node, insisting that each is a Cluster.

    Args:
        cluster (Cluster): The node whose children are requested

    Returns:
        tuple[Cluster, Cluster]: The left and the right child, in that order

    Raises:
        TypeError: If the left or the right child is not a Cluster instance
    """
    left_child = cluster.left
    if not isinstance(left_child, Cluster):
        raise TypeError("Expected left child to be a Cluster")  # noqa: TRY003

    right_child = cluster.right
    if not isinstance(right_child, Cluster):
        raise TypeError("Expected right child to be a Cluster")  # noqa: TRY003

    return left_child, right_child

167 

168 

def _block_variance(portfolio: Portfolio, cov_np: np.ndarray, index: dict[str, int]) -> float:
    """Evaluate the quadratic form w' C w for the assets held in a portfolio.

    Args:
        portfolio (Portfolio): Portfolio providing the asset names and weights
        cov_np (np.ndarray): Covariance matrix as a numpy array
        index (dict[str, int]): Maps an asset name to its row/column in ``cov_np``

    Returns:
        float: Variance of the portfolio restricted to its own assets
    """
    names = portfolio.assets
    rows = [index[name] for name in names]
    weights = np.array([portfolio[name] for name in names])
    # Cut out the square sub-matrix that belongs to the portfolio's assets.
    sub_cov = cov_np[np.ix_(rows, rows)]
    return float(weights @ sub_cov @ weights)

175 

176 

def _split(cluster: Cluster, v_left: float, v_right: float) -> Cluster:
    """Build a node's portfolio by sharing weight between its two children.

    Each child receives a share proportional to the variance of its sibling,
    which gives v_left * alpha_left == v_right * alpha_right together with
    alpha_left + alpha_right == 1. Whenever the summed variance is not
    positive (for instance two riskless sub-portfolios), both children
    receive one half.

    Args:
        cluster (Cluster): The parent cluster with left and right children
        v_left (float): Variance of the left sub-portfolio
        v_right (float): Variance of the right sub-portfolio

    Returns:
        Cluster: The parent cluster with portfolio weights assigned
    """
    left, right = _children(cluster)

    combined = v_left + v_right
    if combined > 0:
        share_left = v_right / combined
    else:
        share_left = 0.5
    share_right = 1.0 - share_left

    # Start from an empty portfolio so earlier allocations never leak through.
    cluster.portfolio = Portfolio()
    for child, share in ((left, share_left), (right, share_right)):
        for asset, weight in child.portfolio.weights.items():
            cluster.portfolio[asset] = share * weight

    return cluster

204 

205 

def _solve(m: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Return x with m @ x = b, using least squares when ``m`` is singular.

    The direct solver is tried first. If it raises ``LinAlgError``, as it does
    for singular covariance blocks of collinear assets, the least-squares
    solution is returned instead so that the Schur augmentation stays defined.

    Args:
        m (np.ndarray): Coefficient matrix
        b (np.ndarray): Right-hand side

    Returns:
        np.ndarray: The exact solution, or the least-squares solution if the
            direct solve fails
    """
    try:
        solution = np.linalg.solve(m, b)
    except np.linalg.LinAlgError:
        # lstsq returns (solution, residuals, rank, singular values).
        fitted, *_ = np.linalg.lstsq(m, b, rcond=None)
        solution = np.asarray(fitted)
    return solution

216 

217 

def one_over_n(dendrogram: Dendrogram) -> Generator[tuple[int, Portfolio]]:
    """Generate portfolios using the 1/N (equal weight) strategy at each tree level.

    The tree is visited level by level. Every node on a level receives the
    same weight budget and spreads it equally over the leaves below it; the
    budget is halved before the next level is processed. After each level a
    snapshot of the portfolio is yielded.

    Note:
        The weights are written into ``dendrogram.root.portfolio`` in place.
        The yielded portfolios are deep copies, so they are not affected by
        the updates made for later levels.

    Args:
        dendrogram: A dendrogram object containing the hierarchical clustering tree
            and the list of assets

    Yields:
        tuple[int, Portfolio]: A tuple containing the level number and the portfolio
            at that level

    Examples:
        >>> import polars as pl
        >>> from pyhrp.hrp import build_tree
        >>> from pyhrp.algos import one_over_n
        >>> cor = pl.DataFrame({"A": [1.0, 0.3], "B": [0.3, 1.0]})
        >>> dg = build_tree(cor, method="ward")
        >>> levels = list(one_over_n(dg))
        >>> len(levels) > 0
        True
    """
    root = dendrogram.root
    assets = dendrogram.assets

    # Weight budget handed to every node of the current level
    w: float = 1.0

    for n, level in enumerate(root.levels):
        for node in level:
            # The per-leaf share is the same for all leaves of this node.
            share = w / node.leaf_count
            for leaf in node.leaves:
                # Cast like _allocate does: a leaf value that is not a plain
                # int (e.g. 0.0) cannot be used as a list index directly.
                root.portfolio[assets[int(leaf.value)]] = share

        # Halve the budget for the next level
        w *= 0.5

        # Snapshot, so later levels do not alter what the caller received
        yield n, deepcopy(root.portfolio)