Coverage for src/ifunnel/models/ScenarioGeneration.py: 63%

150 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-12 09:14 +0000

1import math 

2 

3import numpy as np 

4import pandas as pd 

5from loguru import logger 

6from sklearn.covariance import MinCovDet 

7 

8 

9class MomentGenerator: 

10 """ 

11 Provides methods for mean, variance generation. 

12 """ 

13 

14 @staticmethod 

15 def _alpha_numerator(Z, S): 

16 s = 0 

17 T = Z.shape[1] 

18 for k in range(T): 

19 z = Z[:, k][:, np.newaxis] 

20 X = z @ z.T - S 

21 s += np.trace(X @ X) 

22 s /= T**2 

23 return s 

24 

25 @staticmethod 

26 def _ledoit_wolf_shrinkage(X, S): 

27 """ 

28 Computes the Ledoit--Wolf shrinkage, using a target of scaled identity. 

29 """ 

30 N = len(X.columns) 

31 # In case only one asset in the matrix, for example for benchmark with one asset, no shrinkage is needed 

32 if N == 1: 

33 return S 

34 

35 # Center the data 

36 X = (X - X.mean(0)).to_numpy().T 

37 

38 # Target. 

39 s_avg2 = np.trace(S) / N 

40 B = s_avg2 * np.eye(N) 

41 

42 # Shrinkage coefficient. 

43 alpha_num = MomentGenerator._alpha_numerator(X, S) 

44 alpha_den = np.trace((S - B) @ (S - B)) 

45 alpha = alpha_num / alpha_den 

46 

47 # Shrunk covariance 

48 shrunk = (1 - alpha) * S + alpha * B 

49 

50 return shrunk 

51 

52 @staticmethod 

53 def _jorion_shrinkage(MU, MU_STAR, _lambda): 

54 """ 

55 Applies shrinkage to the mean of weekly returns. 

56 

57 Parameters: 

58 - weekly_returns: numpy array or a pandas series of weekly returns. 

59 - target_mean: float, the target mean to shrink towards. 

60 - lambda_shrinkage: float, the shrinkage intensity, between 0 and 1. 

61 

62 Returns: 

63 - shrunk_mean: float, the shrunk mean of the weekly returns. 

64 """ 

65 

66 # Apply the shrinkage formula 

67 shrunk_mean = _lambda * MU_STAR + (1 - _lambda) * MU 

68 

69 return shrunk_mean 

70 

71 @staticmethod 

72 def compute_annualized_covariance(X): 

73 """ 

74 Computes the annualized covariance matrix from weekly return data, 

75 incorporating robust estimation (MCD), Ledoit-Wolf shrinkage. 

76 

77 :param X: A pandas DataFrame with weekly returns for each asset. 

78 :return: Annualized covariance matrix as a pandas DataFrame. 

79 """ 

80 # Step 1: Compute Robust Covariance Matrix using MCD 

81 mcd = MinCovDet().fit(X) 

82 robust_cov_matrix = mcd.covariance_ 

83 

84 # Convert to DataFrame for compatibility with Ledoit-Wolf function 

85 robust_cov_df = pd.DataFrame(robust_cov_matrix, index=X.columns, columns=X.columns) 

86 

87 # Step 2: Apply Ledoit-Wolf Shrinkage 

88 shrunk_cov_df = MomentGenerator._ledoit_wolf_shrinkage(X, robust_cov_df) 

89 

90 return shrunk_cov_df 

91 

92 @staticmethod 

93 def generate_sigma_mu_for_test_periods(data: pd.DataFrame, n_test: int) -> tuple[list, list]: 

94 logger.info("⏳ Computing covariance matrix and mean array for each investment period") 

95 

96 # Initialize variables 

97 sigma_lst = [] 

98 mu_lst = [] 

99 

100 n_iter = 4 # we work with 4-week periods 

101 n_train_weeks = len(data.index) - n_test 

102 n_rolls = math.floor(n_test / n_iter) + 1 

103 

104 for p in range(int(n_rolls)): 

105 rolling_train_dataset = data.iloc[(n_iter * p) : (n_train_weeks + n_iter * p), :] 

106 

107 sigma = np.atleast_2d( 

108 np.cov(rolling_train_dataset, rowvar=False, bias=True) 

109 ) # The sample covariance matrix 

110 

111 # Add a shrinkage term (Ledoit--Wolf multiple of identity) 

112 sigma = MomentGenerator._ledoit_wolf_shrinkage(rolling_train_dataset, sigma) 

113 

114 # Make sure sigma is positive semidefinite 

115 # sigma = np.atleast_2d(0.5 * (sigma + sigma.T)) 

116 # min_eig = np.min(np.linalg.eigvalsh(sigma)) 

117 # if min_eig < 0: 

118 # sigma -= 5 * min_eig * np.eye(*sigma.shape) 

119 

120 # RHO = np.corrcoef(ret_train, rowvar=False) # The correlation matrix 

121 mu = np.mean(rolling_train_dataset, axis=0) # The mean array 

122 # sd = np.sqrt(np.diagonal(SIGMA)) # The standard deviation 

123 

124 sigma_lst.append(sigma) 

125 mu_lst.append(mu) 

126 

127 return sigma_lst, mu_lst 

128 

129 @staticmethod 

130 def split_dataset(data: pd.DataFrame, sampling_ratio: float = 0.6): 

131 """ 

132 Splits the dataset into a sampling (training) set and an estimating (testing) set. 

133 

134 Parameters: 

135 - data: The dataset to be split, provided as a pandas DataFrame. 

136 - sampling_ratio: The ratio of the dataset to be used for sampling (training), 

137 with the remainder used for estimating (testing). 

138 

139 Returns: 

140 - A tuple containing two DataFrames: (sampling_set, estimating_set). 

141 """ 

142 

143 # Ensure the sampling ratio is between 0 and 1 

144 if not (0 < sampling_ratio < 1): 

145 raise ValueError("Sampling ratio must be between 0 and 1.") 

146 

147 # Calculate the split index 

148 split_index = int(len(data) * sampling_ratio) 

149 

150 # Split the dataset 

151 sampling_set = data.iloc[:split_index] 

152 estimating_set = data.iloc[split_index:] 

153 

154 return sampling_set, estimating_set 

155 

156 @staticmethod 

157 def generate_annual_sigma_mu_with_risk_free( 

158 data: pd.DataFrame, risk_free_rate_annual: float = 0.015 

159 ) -> tuple[pd.DataFrame, pd.Series, pd.DataFrame, pd.Series]: 

160 """ 

161 Computes the annualized and weekly covariance matrix (sigma) and mean return array (mu) 

162 for the entire historical dataset, including a risk-free asset. 

163 

164 Parameters: 

165 - data: A pandas DataFrame with weekly returns for each asset. 

166 - risk_free_rate_annual: Annual return rate of the risk-free asset, default is 2%. 

167 

168 Returns: 

169 - A tuple containing: 

170 - sigma_annual: Annualized covariance matrix including the risk-free asset. 

171 - mu_annual: Annualized mean return vector including the risk-free asset. 

172 - sigma_weekly: Weekly covariance matrix including the risk-free asset. 

173 - mu_weekly: Weekly mean return vector including the risk-free asset. 

174 """ 

175 logger.debug("⏳ Generating annual Sigma and Mu parameter estimations for the optimization model.") 

176 # Compute the sample covariance matrix for the entire dataset 

177 

178 sigma_weekly_np = np.atleast_2d( 

179 # np.cov(data, rowvar=False, bias=True) 

180 MomentGenerator.compute_annualized_covariance(data) 

181 ) # The sample covariance matrix 

182 

183 # Add a shrinkage term (Ledoit--Wolf multiple of identity) 

184 # sigma_weekly_np = MomentGenerator._ledoit_wolf_shrinkage(data, sigma_weekly_np) 

185 

186 # Compute the mean return array for the entire dataset 

187 mu_weekly_np = np.mean(data, axis=0) 

188 

189 # Convert the annual risk-free rate to a weekly rate 

190 risk_free_rate_weekly = (1 + risk_free_rate_annual) ** (1 / 52) - 1 

191 

192 # Append the risk-free rate to the weekly mean return array 

193 mu_weekly_np = np.append(mu_weekly_np, risk_free_rate_weekly) 

194 

195 # Append a row and column of zeros for the risk-free asset in the covariance matrix 

196 sigma_weekly_np = np.pad(sigma_weekly_np, ((0, 1), (0, 1)), "constant") 

197 

198 # Convert numpy arrays to pandas DataFrame/Series and set appropriate asset names 

199 assets_with_rf = data.columns.tolist() + ["Cash"] 

200 sigma_weekly = pd.DataFrame(sigma_weekly_np, index=assets_with_rf, columns=assets_with_rf) 

201 mu_weekly = pd.Series(mu_weekly_np, index=assets_with_rf) 

202 

203 # Annualize the covariance matrix and mean return array 

204 sigma_annual = sigma_weekly * 52 

205 mu_annual = mu_weekly.copy() 

206 mu_annual.iloc[:-1] = mu_annual.iloc[:-1] * 52 # Annualize only the risky assets 

207 

208 return sigma_annual, mu_annual, sigma_weekly, mu_weekly 

209 

210 

211class ScenarioGenerator: 

212 """ 

213 Provides methods for scenario generation. 

214 """ 

215 

216 def __init__(self, rng: np.random.Generator): 

217 self.rng = rng 

218 

219 # ---------------------------------------------------------------------- 

220 # Scenario Generation: THE MONTE CARLO METHOD 

221 # ---------------------------------------------------------------------- 

222 def monte_carlo( 

223 self, 

224 data: pd.DataFrame, 

225 n_simulations: int, 

226 n_test: int, 

227 sigma_lst: list, 

228 mu_lst: list, 

229 ) -> np.ndarray: 

230 logger.info(f"⏳ Generating {n_simulations} scenarios for each investment period with Monte Carlo method") 

231 

232 n_iter = 4 # we work with 4-week periods 

233 n_indices = data.shape[1] 

234 n_rolls = math.floor(n_test / n_iter) + 1 

235 sim = np.zeros((n_rolls * 4, n_simulations, n_indices), dtype=float) # Match GAMS format 

236 

237 # First generate the weekly simulations for each rolling period 

238 for p in range(int(n_rolls)): 

239 sigma = sigma_lst[p] 

240 mu = mu_lst[p] 

241 

242 for week in range(n_iter * p, n_iter * p + n_iter): 

243 sim[week, :, :] = self.rng.multivariate_normal(mean=mu, cov=sigma, size=n_simulations) 

244 

245 # Now create the monthly (4-weeks) simulations for each rolling period 

246 monthly_sim = np.zeros((n_rolls, n_simulations, n_indices)) 

247 for roll in range(n_rolls): 

248 roll_mult = roll * n_iter 

249 for s in range(n_simulations): 

250 for index in range(n_indices): 

251 tmp_rets = 1 + sim[roll_mult : (roll_mult + n_iter), s, index] 

252 monthly_sim[roll, s, index] = np.prod(tmp_rets) - 1 

253 

254 return monthly_sim 

255 

256 # ---------------------------------------------------------------------- 

257 # Scenario Generation: THE BOOTSTRAPPING METHOD 

258 # ---------------------------------------------------------------------- 

259 def bootstrapping(self, data: pd.DataFrame, n_simulations: int, n_test: int) -> np.ndarray: 

260 logger.info(f"⏳ Generating {n_simulations} scenarios for each investment period with Bootstrapping method") 

261 

262 n_iter = 4 # 4 weeks compounded in our scenario 

263 n_train_weeks = len(data.index) - n_test 

264 n_indices = data.shape[1] 

265 n_simulations = n_simulations 

266 n_rolls = math.floor(n_test / n_iter) + 1 

267 

268 sim = np.zeros((int(n_rolls), n_simulations, n_indices, n_iter), dtype=float) 

269 monthly_sim = np.ones( 

270 ( 

271 int(n_rolls), 

272 n_simulations, 

273 n_indices, 

274 ) 

275 ) 

276 for p in range(int(n_rolls)): 

277 for s in range(n_simulations): 

278 for w in range(n_iter): 

279 random_num = self.rng.integers(n_iter * p, n_train_weeks + n_iter * p) 

280 sim[p, s, :, w] = data.iloc[random_num, :] 

281 monthly_sim[p, s, :] *= 1 + sim[p, s, :, w] 

282 monthly_sim[p, s, :] += -1 

283 

284 return monthly_sim 

285 

286 def MC_simulation_annual_from_weekly( 

287 self, 

288 weekly_mu: pd.Series, 

289 weekly_sigma: pd.DataFrame, 

290 n_simulations: int, 

291 n_years: int, 

292 cash_return_annual: float = 0.015, 

293 ): 

294 """ 

295 Generates Monte Carlo simulations for annual returns based on provided weekly mu and sigma. 

296 Assumes 'Cash' or risk-free asset is already included and sets its annual return to a constant value. 

297 

298 Parameters: 

299 - weekly_mu: Weekly mean returns as a pandas Series, including 'Cash'. 

300 - weekly_sigma: Weekly covariance matrix as a pandas DataFrame, including 'Cash'. 

301 - n_simulations: Number of simulations to generate. 

302 - n_years: Number of years to simulate. 

303 - cash_return_annual: Annual return rate of the 'Cash' or risk-free asset, default is 2%. 

304 

305 Returns: 

306 - annual_simulations: An array of simulated annual returns (n_simulations, n_years, n_assets). 

307 """ 

308 logger.debug( 

309 f"⏳ Simulating annual returns with Monte Carlo method based on weekly mu and weekly sigma. " 

310 f"We are generating {n_simulations} simulations for {n_years} years." 

311 ) 

312 n_assets = len(weekly_mu) 

313 weeks_per_year = 52 

314 weekly_scenarios = np.zeros((n_simulations, n_years * weeks_per_year, n_assets), dtype=float) 

315 

316 # Generate weekly simulations 

317 for week in range(n_years * weeks_per_year): 

318 weekly_returns = self.rng.multivariate_normal( 

319 mean=weekly_mu.values, cov=weekly_sigma.values, size=n_simulations 

320 ) 

321 weekly_scenarios[:, week, :] = weekly_returns 

322 

323 # Convert weekly simulations to annual simulations 

324 annual_simulations = np.zeros((n_simulations, n_years, n_assets), dtype=float) 

325 for year in range(n_years): 

326 start_week = year * weeks_per_year 

327 end_week = (year + 1) * weeks_per_year 

328 # Accumulate weekly returns to get annual returns 

329 for simulation in range(n_simulations): 

330 # Convert weekly returns to cumulative product for each asset 

331 for asset in range(n_assets): 

332 if weekly_mu.index[asset] == "Cash": # Assume 'Cash' represents the risk-free asset 

333 # Set 'Cash' returns to a constant annual rate 

334 annual_simulations[simulation, year, asset] = cash_return_annual 

335 else: 

336 annual_simulations[simulation, year, asset] = ( 

337 np.prod(1 + weekly_scenarios[simulation, start_week:end_week, asset]) - 1 

338 ) 

339 

340 return annual_simulations 

341 

342 def bootstrap_simulation_annual_from_weekly( 

343 self, 

344 historical_weekly_returns: pd.DataFrame, 

345 n_simulations: int, 

346 n_years: int, 

347 cash_return_annual: float = 0.015, 

348 ) -> np.ndarray: 

349 """ 

350 Generates bootstrap simulations for annual returns based on historical weekly returns, 

351 correctly handling weekly data to compound into annual returns. 

352 

353 Parameters: 

354 - historical_weekly_returns: DataFrame containing historical weekly returns for each asset. 

355 - n_simulations: Number of simulations to generate. 

356 - n_years: Number of years to simulate. 

357 

358 Returns: 

359 - annual_simulations: An array of simulated annual returns (n_simulations, n_years, n_assets). 

360 """ 

361 weeks_per_year = 52 

362 n_assets = historical_weekly_returns.shape[1] # Number of assets 

363 # Initialize the array for annual simulations 

364 annual_simulations = np.zeros((n_simulations, n_years, n_assets + 1), dtype=float) 

365 

366 for simulation in range(n_simulations): 

367 for year in range(n_years): 

368 # For each year in each simulation, sample weeks and compound 

369 annual_return = np.ones(n_assets) # Start with a base of 1 for compounding 

370 for week in range(weeks_per_year): 

371 # Sample a random week 

372 random_week_index = self.rng.integers(0, len(historical_weekly_returns)) 

373 weekly_return = historical_weekly_returns.iloc[random_week_index].values 

374 # Compound the returns 

375 annual_return *= 1 + weekly_return 

376 # Calculate the annual return for this year, subtract 1 to account for the base 

377 annual_simulations[simulation, year, :-1] = annual_return - 1 

378 annual_simulations[simulation, year, -1] = cash_return_annual 

379 

380 return annual_simulations