Coverage for src/pyhrp/algos.py: 98%
84 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 16:29 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 16:29 +0000
1"""Portfolio optimization algorithms for hierarchical risk parity.
3This module implements various portfolio optimization algorithms:
4- risk_parity: The main hierarchical risk parity algorithm
5- schur_risk_parity: Schur Complementary Allocation (Cotton, arXiv:2411.05807)
6- one_over_n: A simple equal-weight allocation strategy
7"""
9from __future__ import annotations
11from collections.abc import Callable, Generator
12from copy import deepcopy
13from typing import TYPE_CHECKING
15import numpy as np
16import polars as pl
18from .cluster import Cluster, Portfolio
20if TYPE_CHECKING:
21 from .hrp import Dendrogram
23__all__ = ["one_over_n", "risk_parity", "schur_risk_parity"]
26def risk_parity(root: Cluster, cov: pl.DataFrame) -> Cluster:
27 """Compute hierarchical risk parity weights for a cluster tree.
29 This is the main algorithm for hierarchical risk parity. It recursively
30 traverses the cluster tree and assigns weights to each node based on
31 the risk parity principle.
33 Note:
34 The tree is modified in place: the portfolio of every node is rebuilt
35 from scratch, so the function is idempotent and a tree can be reused
36 with a different covariance matrix.
38 Args:
39 root (Cluster): The root node of the cluster tree
40 cov (pl.DataFrame): Covariance matrix of asset returns
42 Returns:
43 Cluster: The root node with portfolio weights assigned
45 Examples:
46 >>> import polars as pl
47 >>> from pyhrp.cluster import Cluster
48 >>> from pyhrp.algos import risk_parity
49 >>> cov = pl.DataFrame({"A": [4.0, 0.0], "B": [0.0, 1.0]})
50 >>> root = Cluster(2, left=Cluster(0), right=Cluster(1))
51 >>> cluster = risk_parity(root=root, cov=cov)
52 >>> round(cluster.portfolio["B"], 1)
53 0.8
54 """
55 cov_np = cov.to_numpy()
56 index = {name: i for i, name in enumerate(cov.columns)}
58 def combine(cluster: Cluster) -> Cluster:
59 """Combine the child portfolios of a cluster via inverse-variance split."""
60 left, right = _children(cluster)
61 v_left = _block_variance(left.portfolio, cov_np, index)
62 v_right = _block_variance(right.portfolio, cov_np, index)
63 return _split(cluster, v_left, v_right)
65 return _allocate(root, cov.columns, combine)
68def schur_risk_parity(root: Cluster, cov: pl.DataFrame, gamma: float = 0.5) -> Cluster:
69 """Compute Schur Complementary Allocation weights for a cluster tree.
71 An extension of HRP introduced by Peter Cotton (arXiv:2411.05807) that augments
72 sub-covariance matrices with off-diagonal block information via Schur complements.
73 At gamma=0 this recovers standard HRP; at gamma=1 it recovers the minimum-variance
74 portfolio through the same recursive structure.
76 Note:
77 The tree is modified in place: the portfolio of every node is rebuilt
78 from scratch, so the function is idempotent and a tree can be reused
79 with a different covariance matrix or gamma.
81 Args:
82 root (Cluster): The root node of the cluster tree
83 cov (pl.DataFrame): Covariance matrix of asset returns
84 gamma (float): Interpolation parameter in [0, 1]. 0 = HRP, 1 = minimum variance.
86 Returns:
87 Cluster: The root node with portfolio weights assigned
89 Raises:
90 ValueError: If gamma is outside the interval [0, 1].
92 Examples:
93 >>> import polars as pl
94 >>> from pyhrp.cluster import Cluster
95 >>> from pyhrp.algos import schur_risk_parity
96 >>> cov = pl.DataFrame({"A": [4.0, 0.0], "B": [0.0, 1.0]})
97 >>> root = Cluster(2, left=Cluster(0), right=Cluster(1))
98 >>> cluster = schur_risk_parity(root=root, cov=cov, gamma=0.5)
99 >>> round(cluster.portfolio["B"], 1)
100 0.8
101 """
102 if not 0.0 <= gamma <= 1.0:
103 msg = f"gamma must be in [0, 1], got {gamma}"
104 raise ValueError(msg)
106 cov_np = cov.to_numpy()
107 index = {name: i for i, name in enumerate(cov.columns)}
109 def combine(cluster: Cluster) -> Cluster:
110 """Combine the child portfolios of a cluster via a Schur-augmented split."""
111 left, right = _children(cluster)
113 li = [index[a] for a in left.portfolio.assets]
114 ri = [index[a] for a in right.portfolio.assets]
116 a_mat = cov_np[np.ix_(li, li)]
117 b_mat = cov_np[np.ix_(li, ri)]
118 d_mat = cov_np[np.ix_(ri, ri)]
120 w_left = np.array([left.portfolio[a] for a in left.portfolio.assets])
121 w_right = np.array([right.portfolio[a] for a in right.portfolio.assets])
123 # Schur-augmented blocks: condition each group on the other
124 a_aug = a_mat - gamma * (b_mat @ _solve(d_mat, b_mat.T))
125 d_aug = d_mat - gamma * (b_mat.T @ _solve(a_mat, b_mat))
127 v_left = float(w_left @ a_aug @ w_left)
128 v_right = float(w_right @ d_aug @ w_right)
129 return _split(cluster, v_left, v_right)
131 return _allocate(root, cov.columns, combine)
134def _allocate(root: Cluster, assets: list[str], combine: Callable[[Cluster], Cluster]) -> Cluster:
135 """Traverse the tree bottom-up, assigning leaf portfolios and combining children.
137 Every node's portfolio is replaced, never accumulated into, which keeps
138 repeated allocations on the same tree idempotent.
140 Args:
141 root (Cluster): The (sub)tree to allocate weights for
142 assets (list[str]): Asset names; a leaf's value indexes into this list
143 combine (Callable[[Cluster], Cluster]): Combines the two child portfolios
144 of a node into the node's own portfolio
146 Returns:
147 Cluster: The input node with portfolio weights assigned
148 """
149 if root.is_leaf:
150 root.portfolio = Portfolio()
151 root.portfolio[assets[int(root.value)]] = 1.0
152 return root
154 left, right = _children(root)
155 root.left = _allocate(left, assets, combine)
156 root.right = _allocate(right, assets, combine)
157 return combine(root)
160def _children(cluster: Cluster) -> tuple[Cluster, Cluster]:
161 """Return the validated left and right children of a non-leaf cluster."""
162 if not isinstance(cluster.left, Cluster):
163 raise TypeError("Expected left child to be a Cluster") # noqa: TRY003
164 if not isinstance(cluster.right, Cluster):
165 raise TypeError("Expected right child to be a Cluster") # noqa: TRY003
166 return cluster.left, cluster.right
169def _block_variance(portfolio: Portfolio, cov_np: np.ndarray, index: dict[str, int]) -> float:
170 """Compute the variance of a portfolio against a precomputed covariance array."""
171 assets = portfolio.assets
172 idx = [index[a] for a in assets]
173 w = np.array([portfolio[a] for a in assets])
174 return float(w @ cov_np[np.ix_(idx, idx)] @ w)
177def _split(cluster: Cluster, v_left: float, v_right: float) -> Cluster:
178 """Distribute weight between the two children inversely proportional to risk.
180 The split satisfies v_left * alpha_left == v_right * alpha_right with
181 alpha_left + alpha_right == 1. If both variances are zero (e.g. riskless
182 sub-portfolios), the weight is split equally.
184 Args:
185 cluster (Cluster): The parent cluster with left and right children
186 v_left (float): Variance of the left sub-portfolio
187 v_right (float): Variance of the right sub-portfolio
189 Returns:
190 Cluster: The parent cluster with portfolio weights assigned
191 """
192 left, right = _children(cluster)
193 total = v_left + v_right
194 alpha_left = v_right / total if total > 0 else 0.5
195 alpha_right = 1.0 - alpha_left
197 cluster.portfolio = Portfolio()
198 for asset, weight in left.portfolio.weights.items():
199 cluster.portfolio[asset] = alpha_left * weight
200 for asset, weight in right.portfolio.weights.items():
201 cluster.portfolio[asset] = alpha_right * weight
203 return cluster
206def _solve(m: np.ndarray, b: np.ndarray) -> np.ndarray:
207 """Solve m @ x = b, falling back to least squares for singular matrices.
209 Covariance blocks of collinear assets are singular; the minimum-norm
210 least-squares solution keeps the Schur augmentation well-defined there.
211 """
212 try:
213 return np.linalg.solve(m, b)
214 except np.linalg.LinAlgError:
215 return np.asarray(np.linalg.lstsq(m, b, rcond=None)[0])
218def one_over_n(dendrogram: Dendrogram) -> Generator[tuple[int, Portfolio]]:
219 """Generate portfolios using the 1/N (equal weight) strategy at each tree level.
221 This function implements a hierarchical 1/N strategy where weights are
222 distributed equally among assets within each cluster at each level of the tree.
223 The weight assigned to each cluster decreases by half at each level.
225 Args:
226 dendrogram: A dendrogram object containing the hierarchical clustering tree
227 and the list of assets
229 Yields:
230 tuple[int, Portfolio]: A tuple containing the level number and the portfolio
231 at that level
233 Examples:
234 >>> import polars as pl
235 >>> from pyhrp.hrp import build_tree
236 >>> from pyhrp.algos import one_over_n
237 >>> cor = pl.DataFrame({"A": [1.0, 0.3], "B": [0.3, 1.0]})
238 >>> dg = build_tree(cor, method="ward")
239 >>> levels = list(one_over_n(dg))
240 >>> len(levels) > 0
241 True
242 """
243 root = dendrogram.root
244 assets = dendrogram.assets
246 # Initial weight to distribute
247 w: float = 1.0
249 # Process each level of the tree
250 for n, level in enumerate(root.levels):
251 for node in level:
252 # Distribute weight equally among all leaves in this node
253 for leaf in node.leaves:
254 root.portfolio[assets[leaf.value]] = w / node.leaf_count
256 # Reduce weight for the next level
257 w *= 0.5
259 # Yield the current level number and a deep copy of the portfolio
260 yield n, deepcopy(root.portfolio)