Coverage for src/ifunnel/models/ScenarioGeneration.py: 63%
150 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-12 09:14 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-12 09:14 +0000
1import math
3import numpy as np
4import pandas as pd
5from loguru import logger
6from sklearn.covariance import MinCovDet
9class MomentGenerator:
10 """
11 Provides methods for mean, variance generation.
12 """
14 @staticmethod
15 def _alpha_numerator(Z, S):
16 s = 0
17 T = Z.shape[1]
18 for k in range(T):
19 z = Z[:, k][:, np.newaxis]
20 X = z @ z.T - S
21 s += np.trace(X @ X)
22 s /= T**2
23 return s
25 @staticmethod
26 def _ledoit_wolf_shrinkage(X, S):
27 """
28 Computes the Ledoit--Wolf shrinkage, using a target of scaled identity.
29 """
30 N = len(X.columns)
31 # In case only one asset in the matrix, for example for benchmark with one asset, no shrinkage is needed
32 if N == 1:
33 return S
35 # Center the data
36 X = (X - X.mean(0)).to_numpy().T
38 # Target.
39 s_avg2 = np.trace(S) / N
40 B = s_avg2 * np.eye(N)
42 # Shrinkage coefficient.
43 alpha_num = MomentGenerator._alpha_numerator(X, S)
44 alpha_den = np.trace((S - B) @ (S - B))
45 alpha = alpha_num / alpha_den
47 # Shrunk covariance
48 shrunk = (1 - alpha) * S + alpha * B
50 return shrunk
52 @staticmethod
53 def _jorion_shrinkage(MU, MU_STAR, _lambda):
54 """
55 Applies shrinkage to the mean of weekly returns.
57 Parameters:
58 - weekly_returns: numpy array or a pandas series of weekly returns.
59 - target_mean: float, the target mean to shrink towards.
60 - lambda_shrinkage: float, the shrinkage intensity, between 0 and 1.
62 Returns:
63 - shrunk_mean: float, the shrunk mean of the weekly returns.
64 """
66 # Apply the shrinkage formula
67 shrunk_mean = _lambda * MU_STAR + (1 - _lambda) * MU
69 return shrunk_mean
71 @staticmethod
72 def compute_annualized_covariance(X):
73 """
74 Computes the annualized covariance matrix from weekly return data,
75 incorporating robust estimation (MCD), Ledoit-Wolf shrinkage.
77 :param X: A pandas DataFrame with weekly returns for each asset.
78 :return: Annualized covariance matrix as a pandas DataFrame.
79 """
80 # Step 1: Compute Robust Covariance Matrix using MCD
81 mcd = MinCovDet().fit(X)
82 robust_cov_matrix = mcd.covariance_
84 # Convert to DataFrame for compatibility with Ledoit-Wolf function
85 robust_cov_df = pd.DataFrame(robust_cov_matrix, index=X.columns, columns=X.columns)
87 # Step 2: Apply Ledoit-Wolf Shrinkage
88 shrunk_cov_df = MomentGenerator._ledoit_wolf_shrinkage(X, robust_cov_df)
90 return shrunk_cov_df
92 @staticmethod
93 def generate_sigma_mu_for_test_periods(data: pd.DataFrame, n_test: int) -> tuple[list, list]:
94 logger.info("⏳ Computing covariance matrix and mean array for each investment period")
96 # Initialize variables
97 sigma_lst = []
98 mu_lst = []
100 n_iter = 4 # we work with 4-week periods
101 n_train_weeks = len(data.index) - n_test
102 n_rolls = math.floor(n_test / n_iter) + 1
104 for p in range(int(n_rolls)):
105 rolling_train_dataset = data.iloc[(n_iter * p) : (n_train_weeks + n_iter * p), :]
107 sigma = np.atleast_2d(
108 np.cov(rolling_train_dataset, rowvar=False, bias=True)
109 ) # The sample covariance matrix
111 # Add a shrinkage term (Ledoit--Wolf multiple of identity)
112 sigma = MomentGenerator._ledoit_wolf_shrinkage(rolling_train_dataset, sigma)
114 # Make sure sigma is positive semidefinite
115 # sigma = np.atleast_2d(0.5 * (sigma + sigma.T))
116 # min_eig = np.min(np.linalg.eigvalsh(sigma))
117 # if min_eig < 0:
118 # sigma -= 5 * min_eig * np.eye(*sigma.shape)
120 # RHO = np.corrcoef(ret_train, rowvar=False) # The correlation matrix
121 mu = np.mean(rolling_train_dataset, axis=0) # The mean array
122 # sd = np.sqrt(np.diagonal(SIGMA)) # The standard deviation
124 sigma_lst.append(sigma)
125 mu_lst.append(mu)
127 return sigma_lst, mu_lst
129 @staticmethod
130 def split_dataset(data: pd.DataFrame, sampling_ratio: float = 0.6):
131 """
132 Splits the dataset into a sampling (training) set and an estimating (testing) set.
134 Parameters:
135 - data: The dataset to be split, provided as a pandas DataFrame.
136 - sampling_ratio: The ratio of the dataset to be used for sampling (training),
137 with the remainder used for estimating (testing).
139 Returns:
140 - A tuple containing two DataFrames: (sampling_set, estimating_set).
141 """
143 # Ensure the sampling ratio is between 0 and 1
144 if not (0 < sampling_ratio < 1):
145 raise ValueError("Sampling ratio must be between 0 and 1.")
147 # Calculate the split index
148 split_index = int(len(data) * sampling_ratio)
150 # Split the dataset
151 sampling_set = data.iloc[:split_index]
152 estimating_set = data.iloc[split_index:]
154 return sampling_set, estimating_set
156 @staticmethod
157 def generate_annual_sigma_mu_with_risk_free(
158 data: pd.DataFrame, risk_free_rate_annual: float = 0.015
159 ) -> tuple[pd.DataFrame, pd.Series, pd.DataFrame, pd.Series]:
160 """
161 Computes the annualized and weekly covariance matrix (sigma) and mean return array (mu)
162 for the entire historical dataset, including a risk-free asset.
164 Parameters:
165 - data: A pandas DataFrame with weekly returns for each asset.
166 - risk_free_rate_annual: Annual return rate of the risk-free asset, default is 2%.
168 Returns:
169 - A tuple containing:
170 - sigma_annual: Annualized covariance matrix including the risk-free asset.
171 - mu_annual: Annualized mean return vector including the risk-free asset.
172 - sigma_weekly: Weekly covariance matrix including the risk-free asset.
173 - mu_weekly: Weekly mean return vector including the risk-free asset.
174 """
175 logger.debug("⏳ Generating annual Sigma and Mu parameter estimations for the optimization model.")
176 # Compute the sample covariance matrix for the entire dataset
178 sigma_weekly_np = np.atleast_2d(
179 # np.cov(data, rowvar=False, bias=True)
180 MomentGenerator.compute_annualized_covariance(data)
181 ) # The sample covariance matrix
183 # Add a shrinkage term (Ledoit--Wolf multiple of identity)
184 # sigma_weekly_np = MomentGenerator._ledoit_wolf_shrinkage(data, sigma_weekly_np)
186 # Compute the mean return array for the entire dataset
187 mu_weekly_np = np.mean(data, axis=0)
189 # Convert the annual risk-free rate to a weekly rate
190 risk_free_rate_weekly = (1 + risk_free_rate_annual) ** (1 / 52) - 1
192 # Append the risk-free rate to the weekly mean return array
193 mu_weekly_np = np.append(mu_weekly_np, risk_free_rate_weekly)
195 # Append a row and column of zeros for the risk-free asset in the covariance matrix
196 sigma_weekly_np = np.pad(sigma_weekly_np, ((0, 1), (0, 1)), "constant")
198 # Convert numpy arrays to pandas DataFrame/Series and set appropriate asset names
199 assets_with_rf = data.columns.tolist() + ["Cash"]
200 sigma_weekly = pd.DataFrame(sigma_weekly_np, index=assets_with_rf, columns=assets_with_rf)
201 mu_weekly = pd.Series(mu_weekly_np, index=assets_with_rf)
203 # Annualize the covariance matrix and mean return array
204 sigma_annual = sigma_weekly * 52
205 mu_annual = mu_weekly.copy()
206 mu_annual.iloc[:-1] = mu_annual.iloc[:-1] * 52 # Annualize only the risky assets
208 return sigma_annual, mu_annual, sigma_weekly, mu_weekly
211class ScenarioGenerator:
212 """
213 Provides methods for scenario generation.
214 """
216 def __init__(self, rng: np.random.Generator):
217 self.rng = rng
219 # ----------------------------------------------------------------------
220 # Scenario Generation: THE MONTE CARLO METHOD
221 # ----------------------------------------------------------------------
222 def monte_carlo(
223 self,
224 data: pd.DataFrame,
225 n_simulations: int,
226 n_test: int,
227 sigma_lst: list,
228 mu_lst: list,
229 ) -> np.ndarray:
230 logger.info(f"⏳ Generating {n_simulations} scenarios for each investment period with Monte Carlo method")
232 n_iter = 4 # we work with 4-week periods
233 n_indices = data.shape[1]
234 n_rolls = math.floor(n_test / n_iter) + 1
235 sim = np.zeros((n_rolls * 4, n_simulations, n_indices), dtype=float) # Match GAMS format
237 # First generate the weekly simulations for each rolling period
238 for p in range(int(n_rolls)):
239 sigma = sigma_lst[p]
240 mu = mu_lst[p]
242 for week in range(n_iter * p, n_iter * p + n_iter):
243 sim[week, :, :] = self.rng.multivariate_normal(mean=mu, cov=sigma, size=n_simulations)
245 # Now create the monthly (4-weeks) simulations for each rolling period
246 monthly_sim = np.zeros((n_rolls, n_simulations, n_indices))
247 for roll in range(n_rolls):
248 roll_mult = roll * n_iter
249 for s in range(n_simulations):
250 for index in range(n_indices):
251 tmp_rets = 1 + sim[roll_mult : (roll_mult + n_iter), s, index]
252 monthly_sim[roll, s, index] = np.prod(tmp_rets) - 1
254 return monthly_sim
256 # ----------------------------------------------------------------------
257 # Scenario Generation: THE BOOTSTRAPPING METHOD
258 # ----------------------------------------------------------------------
259 def bootstrapping(self, data: pd.DataFrame, n_simulations: int, n_test: int) -> np.ndarray:
260 logger.info(f"⏳ Generating {n_simulations} scenarios for each investment period with Bootstrapping method")
262 n_iter = 4 # 4 weeks compounded in our scenario
263 n_train_weeks = len(data.index) - n_test
264 n_indices = data.shape[1]
265 n_simulations = n_simulations
266 n_rolls = math.floor(n_test / n_iter) + 1
268 sim = np.zeros((int(n_rolls), n_simulations, n_indices, n_iter), dtype=float)
269 monthly_sim = np.ones(
270 (
271 int(n_rolls),
272 n_simulations,
273 n_indices,
274 )
275 )
276 for p in range(int(n_rolls)):
277 for s in range(n_simulations):
278 for w in range(n_iter):
279 random_num = self.rng.integers(n_iter * p, n_train_weeks + n_iter * p)
280 sim[p, s, :, w] = data.iloc[random_num, :]
281 monthly_sim[p, s, :] *= 1 + sim[p, s, :, w]
282 monthly_sim[p, s, :] += -1
284 return monthly_sim
286 def MC_simulation_annual_from_weekly(
287 self,
288 weekly_mu: pd.Series,
289 weekly_sigma: pd.DataFrame,
290 n_simulations: int,
291 n_years: int,
292 cash_return_annual: float = 0.015,
293 ):
294 """
295 Generates Monte Carlo simulations for annual returns based on provided weekly mu and sigma.
296 Assumes 'Cash' or risk-free asset is already included and sets its annual return to a constant value.
298 Parameters:
299 - weekly_mu: Weekly mean returns as a pandas Series, including 'Cash'.
300 - weekly_sigma: Weekly covariance matrix as a pandas DataFrame, including 'Cash'.
301 - n_simulations: Number of simulations to generate.
302 - n_years: Number of years to simulate.
303 - cash_return_annual: Annual return rate of the 'Cash' or risk-free asset, default is 2%.
305 Returns:
306 - annual_simulations: An array of simulated annual returns (n_simulations, n_years, n_assets).
307 """
308 logger.debug(
309 f"⏳ Simulating annual returns with Monte Carlo method based on weekly mu and weekly sigma. "
310 f"We are generating {n_simulations} simulations for {n_years} years."
311 )
312 n_assets = len(weekly_mu)
313 weeks_per_year = 52
314 weekly_scenarios = np.zeros((n_simulations, n_years * weeks_per_year, n_assets), dtype=float)
316 # Generate weekly simulations
317 for week in range(n_years * weeks_per_year):
318 weekly_returns = self.rng.multivariate_normal(
319 mean=weekly_mu.values, cov=weekly_sigma.values, size=n_simulations
320 )
321 weekly_scenarios[:, week, :] = weekly_returns
323 # Convert weekly simulations to annual simulations
324 annual_simulations = np.zeros((n_simulations, n_years, n_assets), dtype=float)
325 for year in range(n_years):
326 start_week = year * weeks_per_year
327 end_week = (year + 1) * weeks_per_year
328 # Accumulate weekly returns to get annual returns
329 for simulation in range(n_simulations):
330 # Convert weekly returns to cumulative product for each asset
331 for asset in range(n_assets):
332 if weekly_mu.index[asset] == "Cash": # Assume 'Cash' represents the risk-free asset
333 # Set 'Cash' returns to a constant annual rate
334 annual_simulations[simulation, year, asset] = cash_return_annual
335 else:
336 annual_simulations[simulation, year, asset] = (
337 np.prod(1 + weekly_scenarios[simulation, start_week:end_week, asset]) - 1
338 )
340 return annual_simulations
342 def bootstrap_simulation_annual_from_weekly(
343 self,
344 historical_weekly_returns: pd.DataFrame,
345 n_simulations: int,
346 n_years: int,
347 cash_return_annual: float = 0.015,
348 ) -> np.ndarray:
349 """
350 Generates bootstrap simulations for annual returns based on historical weekly returns,
351 correctly handling weekly data to compound into annual returns.
353 Parameters:
354 - historical_weekly_returns: DataFrame containing historical weekly returns for each asset.
355 - n_simulations: Number of simulations to generate.
356 - n_years: Number of years to simulate.
358 Returns:
359 - annual_simulations: An array of simulated annual returns (n_simulations, n_years, n_assets).
360 """
361 weeks_per_year = 52
362 n_assets = historical_weekly_returns.shape[1] # Number of assets
363 # Initialize the array for annual simulations
364 annual_simulations = np.zeros((n_simulations, n_years, n_assets + 1), dtype=float)
366 for simulation in range(n_simulations):
367 for year in range(n_years):
368 # For each year in each simulation, sample weeks and compound
369 annual_return = np.ones(n_assets) # Start with a base of 1 for compounding
370 for week in range(weeks_per_year):
371 # Sample a random week
372 random_week_index = self.rng.integers(0, len(historical_weekly_returns))
373 weekly_return = historical_weekly_returns.iloc[random_week_index].values
374 # Compound the returns
375 annual_return *= 1 + weekly_return
376 # Calculate the annual return for this year, subtract 1 to account for the base
377 annual_simulations[simulation, year, :-1] = annual_return - 1
378 annual_simulations[simulation, year, -1] = cash_return_annual
380 return annual_simulations