Coverage for src/jquantstats/_stats.py: 99%

205 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-05 07:23 +0000

1import dataclasses 

2from collections.abc import Callable, Iterable 

3from functools import wraps 

4from typing import cast 

5 

6import numpy as np 

7import polars as pl 

8from scipy.stats import norm 

9 

10 

11@dataclasses.dataclass(frozen=True) 

12class Stats: 

13 """Statistical analysis tools for financial returns data. 

14 

15 This class provides a comprehensive set of methods for calculating various 

16 financial metrics and statistics on returns data, including: 

17 

18 - Basic statistics (mean, skew, kurtosis) 

19 - Risk metrics (volatility, value-at-risk, drawdown) 

20 - Performance ratios (Sharpe, Sortino, information ratio) 

21 - Win/loss metrics (win rate, profit factor, payoff ratio) 

22 - Rolling calculations (rolling volatility, rolling Sharpe) 

23 - Factor analysis (alpha, beta, R-squared) 

24 

25 The class is designed to work with the _Data class and operates on Polars DataFrames 

26 for efficient computation. 

27 

28 Attributes: 

29 data: The _Data object containing returns and benchmark data. 

30 all: A DataFrame combining all data (index, returns, benchmark) for easy access. 

31 

32 """ 

33 

34 data: "Data" # type: ignore 

35 all: pl.DataFrame | None = None # Default is None; will be set in __post_init__ 

36 

37 def __post_init__(self): 

38 object.__setattr__(self, "all", self.data.all) 

39 

40 @staticmethod 

41 def _mean_positive_expr(series: pl.Series) -> float: 

42 return series.filter(series > 0).mean() 

43 

44 @staticmethod 

45 def _mean_negative_expr(series: pl.Series) -> float: 

46 return series.filter(series < 0).mean() 

47 

48 @staticmethod 

49 def columnwise_stat(func: Callable) -> Callable: 

50 """Apply a column-wise statistical function to all numeric columns. 

51 

52 Args: 

53 func (Callable): The function to decorate. 

54 

55 Returns: 

56 Callable: The decorated function. 

57 

58 """ 

59 

60 @wraps(func) 

61 def wrapper(self, *args, **kwargs) -> dict[str, float]: 

62 return {col: func(self, series, *args, **kwargs) for col, series in self.data.items()} 

63 

64 return wrapper 

65 

66 @staticmethod 

67 def to_frame(func: Callable) -> Callable: 

68 """Apply per-column expressions and evaluates with .with_columns(...). 

69 

70 Args: 

71 func (Callable): The function to decorate. 

72 

73 Returns: 

74 Callable: The decorated function. 

75 

76 """ 

77 

78 @wraps(func) 

79 def wrapper(self, *args, **kwargs) -> pl.DataFrame: 

80 return self.all.select( 

81 [pl.col(name) for name in self.data.date_col] 

82 + [func(self, series, *args, **kwargs).alias(col) for col, series in self.data.items()] 

83 ) 

84 

85 return wrapper 

86 

87 @columnwise_stat 

88 def skew(self, series: pl.Series) -> int | float | None: 

89 """Calculate skewness (asymmetry) for each numeric column. 

90 

91 Args: 

92 series (pl.Series): The series to calculate skewness for. 

93 

94 Returns: 

95 float: The skewness value. 

96 

97 """ 

98 return series.skew(bias=False) 

99 

100 @columnwise_stat 

101 def kurtosis(self, series: pl.Series) -> int | float | None: 

102 """Calculate the kurtosis of returns. 

103 

104 The degree to which a distribution peak compared to a normal distribution. 

105 

106 Args: 

107 series (pl.Series): The series to calculate kurtosis for. 

108 

109 Returns: 

110 float: The kurtosis value. 

111 

112 """ 

113 return series.kurtosis(bias=False) 

114 

115 @columnwise_stat 

116 def avg_return(self, series: pl.Series) -> float: 

117 """Calculate average return per non-zero, non-null value. 

118 

119 Args: 

120 series (pl.Series): The series to calculate average return for. 

121 

122 Returns: 

123 float: The average return value. 

124 

125 """ 

126 return series.filter(series.is_not_null() & (series != 0)).mean() 

127 

128 @columnwise_stat 

129 def avg_win(self, series: pl.Series) -> float: 

130 """Calculate the average winning return/trade for an asset. 

131 

132 Args: 

133 series (pl.Series): The series to calculate average win for. 

134 

135 Returns: 

136 float: The average winning return. 

137 

138 """ 

139 return self._mean_positive_expr(series) 

140 

141 @columnwise_stat 

142 def avg_loss(self, series: pl.Series) -> float: 

143 """Calculate the average loss return/trade for a period. 

144 

145 Args: 

146 series (pl.Series): The series to calculate average loss for. 

147 

148 Returns: 

149 float: The average loss return. 

150 

151 """ 

152 return self._mean_negative_expr(series) 

153 

154 @columnwise_stat 

155 def volatility(self, series: pl.Series, periods: int | float | None = None, annualize: bool = True) -> float: 

156 """Calculate the volatility of returns. 

157 

158 - Std dev of returns 

159 - Annualized by sqrt(periods) if `annualize` is True. 

160 

161 Args: 

162 series (pl.Series): The series to calculate volatility for. 

163 periods (int, optional): Number of periods per year. Defaults to 252. 

164 annualize (bool, optional): Whether to annualize the result. Defaults to True. 

165 

166 Returns: 

167 float: The volatility value. 

168 

169 """ 

170 raw_periods = periods or self.data._periods_per_year 

171 

172 # Ensure it's numeric 

173 if not isinstance(raw_periods, int | float): 

174 raise TypeError(f"Expected int or float for periods, got {type(raw_periods).__name__}") 

175 

176 factor = np.sqrt(raw_periods) if annualize else 1.0 

177 return float(series.std()) * factor 

178 

179 # periods = periods or self.data._periods_per_year 

180 # factor = np.sqrt(periods) if annualize else 1 

181 # return series.std() * factor 

182 

183 @columnwise_stat 

184 def payoff_ratio(self, series: pl.Series) -> float: 

185 """Measure the payoff ratio. 

186 

187 The payoff ratio is calculated as average win / abs(average loss). 

188 

189 Args: 

190 series (pl.Series): The series to calculate payoff ratio for. 

191 

192 Returns: 

193 float: The payoff ratio value. 

194 

195 """ 

196 avg_win = series.filter(series > 0).mean() 

197 # avg_win = self.avg_win(series) 

198 avg_loss = np.abs(series.filter(series < 0).mean()) 

199 return avg_win / avg_loss 

200 

201 def win_loss_ratio(self) -> dict[str, float]: 

202 """Shorthand for payoff_ratio(). 

203 

204 Returns: 

205 dict[str, float]: Dictionary mapping asset names to win/loss ratios. 

206 

207 """ 

208 return self.payoff_ratio() 

209 

210 @columnwise_stat 

211 def profit_ratio(self, series: pl.Series) -> float: 

212 """Measure the profit ratio. 

213 

214 The profit ratio is calculated as win ratio / loss ratio. 

215 

216 Args: 

217 series (pl.Series): The series to calculate profit ratio for. 

218 

219 Returns: 

220 float: The profit ratio value. 

221 

222 """ 

223 wins = series.filter(series >= 0) 

224 losses = series.filter(series < 0) 

225 

226 try: 

227 win_ratio = np.abs(wins.mean() / wins.count()) 

228 loss_ratio = np.abs(losses.mean() / losses.count()) 

229 

230 return win_ratio / loss_ratio 

231 

232 except TypeError: 

233 return np.nan 

234 

235 @columnwise_stat 

236 def profit_factor(self, series: pl.Series) -> float: 

237 """Measure the profit factor. 

238 

239 The profit factor is calculated as wins / loss. 

240 

241 Args: 

242 series (pl.Series): The series to calculate profit factor for. 

243 

244 Returns: 

245 float: The profit factor value. 

246 

247 """ 

248 wins = series.filter(series > 0) 

249 losses = series.filter(series < 0) 

250 

251 return np.abs(wins.sum() / losses.sum()) 

252 

253 @columnwise_stat 

254 def value_at_risk(self, series: pl.Series, sigma: float = 1.0, alpha: float = 0.05) -> float: 

255 """Calculate the daily value-at-risk. 

256 

257 Uses variance-covariance calculation with confidence level. 

258 

259 Args: 

260 series (pl.Series): The series to calculate value at risk for. 

261 alpha (float, optional): Confidence level. Defaults to 0.05. 

262 sigma (float, optional): Standard deviation multiplier. Defaults to 1.0. 

263 

264 Returns: 

265 float: The value at risk. 

266 

267 """ 

268 mu = float(series.mean()) 

269 sigma *= float(series.std()) 

270 

271 return norm.ppf(alpha, mu, sigma) 

272 

273 @columnwise_stat 

274 def conditional_value_at_risk(self, series: pl.Series, sigma: float = 1.0, alpha: float = 0.05) -> float: 

275 """Calculate the conditional value-at-risk. 

276 

277 Also known as CVaR or expected shortfall, calculated for each numeric column. 

278 

279 Args: 

280 series (pl.Series): The series to calculate conditional value at risk for. 

281 alpha (float, optional): Confidence level. Defaults to 0.05. 

282 sigma (float, optional): Standard deviation multiplier. Defaults to 1.0. 

283 

284 Returns: 

285 float: The conditional value at risk. 

286 

287 """ 

288 mu = float(series.mean()) 

289 sigma *= float(series.std()) 

290 

291 var = norm.ppf(alpha, mu, sigma) 

292 

293 # Compute mean of returns less than or equal to VaR 

294 # Cast to Any or pl.Series to suppress Ty error 

295 # Cast the mask to pl.Expr to satisfy type checker 

296 mask = cast(Iterable[bool], series < var) 

297 return series.filter(mask).mean() 

298 

299 # filtered_series = cast(pl.Series, series.filter(series < var)) 

300 # return filtered_series.mean() 

301 

302 @columnwise_stat 

303 def win_rate(self, series: pl.Series) -> float: 

304 """Calculate the win ratio for a period. 

305 

306 Args: 

307 series (pl.Series): The series to calculate win rate for. 

308 

309 Returns: 

310 float: The win rate value. 

311 

312 """ 

313 num_pos = series.filter(series > 0).count() 

314 num_nonzero = series.filter(series != 0).count() 

315 return num_pos / num_nonzero 

316 

317 @columnwise_stat 

318 def gain_to_pain_ratio(self, series: pl.Series) -> float: 

319 """Calculate Jack Schwager's Gain-to-Pain Ratio. 

320 

321 The ratio is calculated as total return / sum of losses (in absolute value). 

322 

323 Args: 

324 series (pl.Series): The series to calculate gain to pain ratio for. 

325 

326 Returns: 

327 float: The gain to pain ratio value. 

328 

329 """ 

330 total_gain = series.sum() 

331 total_pain = series.filter(series < 0).abs().sum() 

332 try: 

333 return total_gain / total_pain 

334 except ZeroDivisionError: 

335 return np.nan 

336 

337 @columnwise_stat 

338 def risk_return_ratio(self, series: pl.Series) -> float: 

339 """Calculate the return/risk ratio. 

340 

341 This is equivalent to the Sharpe ratio without a risk-free rate. 

342 

343 Args: 

344 series (pl.Series): The series to calculate risk return ratio for. 

345 

346 Returns: 

347 float: The risk return ratio value. 

348 

349 """ 

350 return float(series.mean()) / float(series.std()) 

351 

352 def kelly_criterion(self) -> dict[str, float]: 

353 """Calculate the optimal capital allocation per column. 

354 

355 Uses the Kelly Criterion formula: f* = [(b * p) - q] / b 

356 where: 

357 - b = payoff ratio 

358 - p = win rate 

359 - q = 1 - p. 

360 

361 Returns: 

362 dict[str, float]: Dictionary mapping asset names to Kelly criterion values. 

363 

364 """ 

365 b = self.payoff_ratio() 

366 p = self.win_rate() 

367 

368 return { 

369 col: ((b[col] * p[col]) - (1 - p[col])) / b[col] 

370 # if b[col] not in (None, 0) and p[col] is not None else None 

371 for col in b 

372 } 

373 

374 @columnwise_stat 

375 def best(self, series: pl.Series) -> float | None: 

376 """Find the maximum return per column (best period). 

377 

378 Args: 

379 series (pl.Series): The series to find the best return for. 

380 

381 Returns: 

382 float: The maximum return value. 

383 

384 """ 

385 return series.max() # .alias(series.meta.output_name) 

386 

387 @columnwise_stat 

388 def worst(self, series: pl.Series) -> float | None: 

389 """Find the minimum return per column (worst period). 

390 

391 Args: 

392 series (pl.Series): The series to find the worst return for. 

393 

394 Returns: 

395 float: The minimum return value. 

396 

397 """ 

398 return series.min() # .alias(series.meta.output_name) 

399 

400 @columnwise_stat 

401 def exposure(self, series: pl.Series) -> float: 

402 """Calculate the market exposure time (returns != 0). 

403 

404 Args: 

405 series (pl.Series): The series to calculate exposure for. 

406 

407 Returns: 

408 float: The exposure value. 

409 

410 """ 

411 return np.round((series.filter(series != 0).count() / self.all.height), decimals=2) 

412 

413 @columnwise_stat 

414 def sharpe(self, series: pl.Series, periods: int | float | None = None) -> float: 

415 """Calculate the Sharpe ratio of asset returns. 

416 

417 Args: 

418 series (pl.Series): The series to calculate Sharpe ratio for. 

419 periods (int, optional): Number of periods per year. Defaults to 252. 

420 

421 Returns: 

422 float: The Sharpe ratio value. 

423 

424 """ 

425 periods = periods or self.data._periods_per_year 

426 

427 divisor = float(series.std(ddof=1)) 

428 

429 res = float(series.mean()) / divisor 

430 factor = periods or 1 

431 return res * np.sqrt(factor) 

432 

433 @columnwise_stat 

434 def sortino(self, series: pl.Series, periods: int | float | None = None) -> float: 

435 """Calculate the Sortino ratio. 

436 

437 The Sortino ratio is the mean return divided by downside deviation. 

438 Based on Red Rock Capital's Sortino ratio paper. 

439 

440 Args: 

441 series (pl.Series): The series to calculate Sortino ratio for. 

442 periods (int, optional): Number of periods per year. Defaults to 252. 

443 

444 Returns: 

445 float: The Sortino ratio value. 

446 

447 """ 

448 periods = periods or self.data._periods_per_year 

449 downside_deviation = np.sqrt(((series.filter(series < 0)) ** 2).sum() / series.count()) 

450 ratio = series.mean() / downside_deviation 

451 return ratio * np.sqrt(periods) 

452 

453 @to_frame 

454 def rolling_sortino( 

455 self, series: pl.Expr, rolling_period: int = 126, periods_per_year: int | float | None = None 

456 ) -> pl.Expr: 

457 """Calculate the rolling Sortino ratio. 

458 

459 Args: 

460 series (pl.Expr): The expression to calculate rolling Sortino ratio for. 

461 rolling_period (int, optional): The rolling window size. Defaults to 126. 

462 periods_per_year (int, optional): Number of periods per year. Defaults to 252. 

463 

464 Returns: 

465 pl.Expr: The rolling Sortino ratio expression. 

466 

467 """ 

468 periods_per_year = periods_per_year or self.data._periods_per_year 

469 

470 mean_ret = series.rolling_mean(window_size=rolling_period) 

471 

472 # Rolling downside deviation (squared negative returns averaged over window) 

473 downside = series.map_elements(lambda x: x**2 if x < 0 else 0.0).rolling_mean(window_size=rolling_period) 

474 

475 # Avoid division by zero 

476 sortino = mean_ret / downside.sqrt().fill_nan(0).fill_null(0) 

477 return sortino * (periods_per_year**0.5) 

478 

479 @to_frame 

480 def rolling_sharpe( 

481 self, series: pl.Expr, rolling_period: int = 126, periods_per_year: int | float | None = None 

482 ) -> pl.Expr: 

483 """Calculate the rolling Sharpe ratio. 

484 

485 Args: 

486 series (pl.Expr): The expression to calculate rolling Sharpe ratio for. 

487 rolling_period (int, optional): The rolling window size. Defaults to 126. 

488 periods_per_year (int, optional): Number of periods per year. Defaults to 252. 

489 

490 Returns: 

491 pl.Expr: The rolling Sharpe ratio expression. 

492 

493 """ 

494 periods_per_year = periods_per_year or self.data._periods_per_year 

495 res = series.rolling_mean(window_size=rolling_period) / series.rolling_std(window_size=rolling_period) 

496 return res * np.sqrt(periods_per_year) 

497 

498 @to_frame 

499 def rolling_volatility( 

500 self, series: pl.Expr, rolling_period=126, periods_per_year: int | float | None = None 

501 ) -> pl.Expr: 

502 """Calculate the rolling volatility of returns. 

503 

504 Args: 

505 series (pl.Expr): The expression to calculate rolling volatility for. 

506 rolling_period (int, optional): The rolling window size. Defaults to 126. 

507 periods_per_year (float, optional): Number of periods per year. Defaults to None. 

508 

509 Returns: 

510 pl.Expr: The rolling volatility expression. 

511 

512 """ 

513 return series.rolling_std(window_size=rolling_period) * np.sqrt(periods_per_year) 

514 

515 @to_frame 

516 def drawdown(self, series: pl.Series) -> pl.Series: 

517 """Calculate the drawdown series for returns. 

518 

519 Args: 

520 series (pl.Series): The series to calculate drawdown for. 

521 

522 Returns: 

523 pl.Series: The drawdown series. 

524 

525 """ 

526 equity = self.prices(series) 

527 d = (equity / equity.cum_max()) - 1 

528 return -d 

529 

530 @staticmethod 

531 def prices(series: pl.Series) -> pl.Series: 

532 """Convert returns series to price series. 

533 

534 Args: 

535 series (pl.Series): The returns series to convert. 

536 

537 Returns: 

538 pl.Series: The price series. 

539 

540 """ 

541 return (1.0 + series).cum_prod() 

542 

543 @staticmethod 

544 def max_drawdown_single_series(series: pl.Series) -> float: 

545 price = Stats.prices(series) 

546 peak = price.cum_max() 

547 drawdown = price / peak - 1 

548 return -drawdown.min() 

549 

550 @columnwise_stat 

551 def max_drawdown(self, series: pl.Series) -> float: 

552 """Calculate the maximum drawdown for each column. 

553 

554 Args: 

555 series (pl.Series): The series to calculate maximum drawdown for. 

556 

557 Returns: 

558 float: The maximum drawdown value. 

559 

560 """ 

561 return Stats.max_drawdown_single_series(series) 

562 

563 def adjusted_sortino(self, periods: int | float | None = None) -> dict[str, float]: 

564 """Calculate Jack Schwager's adjusted Sortino ratio. 

565 

566 This adjustment allows for direct comparison to Sharpe ratio. 

567 See: https://archive.is/wip/2rwFW. 

568 

569 Args: 

570 periods (int, optional): Number of periods per year. Defaults to 252. 

571 

572 Returns: 

573 dict[str, float]: Dictionary mapping asset names to adjusted Sortino ratios. 

574 

575 """ 

576 sortino_data = self.sortino(periods=periods) 

577 return {k: v / np.sqrt(2) for k, v in sortino_data.items()} 

578 

579 @columnwise_stat 

580 def r_squared(self, series: pl.Series, benchmark: str | None = None) -> float: 

581 """Measure the straight line fit of the equity curve. 

582 

583 Args: 

584 series (pl.Series): The series to calculate R-squared for. 

585 benchmark (str, optional): The benchmark column name. Defaults to None. 

586 

587 Returns: 

588 float: The R-squared value. 

589 

590 Raises: 

591 AttributeError: If no benchmark data is available. 

592 

593 """ 

594 if self.data.benchmark is None: 

595 raise AttributeError("No benchmark data available") 

596 

597 benchmark_col = benchmark or self.data.benchmark.columns[0] 

598 

599 # if self.data.benchmark is None: 

600 # raise AttributeError("No benchmark data available") 

601 # Evaluate both series and benchmark as Series 

602 dframe = self.all.select([series, pl.col(benchmark_col).alias("benchmark")]) 

603 

604 # Drop nulls 

605 dframe = dframe.drop_nulls() 

606 

607 matrix = dframe.to_numpy() 

608 # Get actual Series 

609 

610 strategy_np = matrix[:, 0] 

611 benchmark_np = matrix[:, 1] 

612 

613 corr_matrix = np.corrcoef(strategy_np, benchmark_np) 

614 r = corr_matrix[0, 1] 

615 return r**2 

616 

617 def r2(self) -> dict[str, float]: 

618 """Shorthand for r_squared(). 

619 

620 Returns: 

621 dict[str, float]: Dictionary mapping asset names to R-squared values. 

622 

623 """ 

624 return self.r_squared() 

625 

626 @columnwise_stat 

627 def information_ratio( 

628 self, series: pl.Series, periods_per_year: int | float | None = None, benchmark: str | None = None 

629 ) -> float: 

630 """Calculate the information ratio. 

631 

632 This is essentially the risk return ratio of the net profits. 

633 

634 Args: 

635 series (pl.Series): The series to calculate information ratio for. 

636 periods_per_year (int, optional): Number of periods per year. Defaults to 252. 

637 benchmark (str, optional): The benchmark column name. Defaults to None. 

638 

639 Returns: 

640 float: The information ratio value. 

641 

642 """ 

643 periods_per_year = periods_per_year or self.data.periods_per_year 

644 

645 benchmark_col = benchmark or self.data.benchmark.columns[0] 

646 

647 active = series - self.data.benchmark[benchmark_col] 

648 

649 mean = active.mean() 

650 std = active.std() 

651 

652 try: 

653 return (mean / std) * (periods_per_year**0.5) 

654 except ZeroDivisionError: 

655 return 0.0 

656 

657 @columnwise_stat 

658 def greeks( 

659 self, series: pl.Series, periods_per_year: int | float | None = None, benchmark: str | None = None 

660 ) -> dict[str, float]: 

661 """Calculate alpha and beta of the portfolio. 

662 

663 Args: 

664 series (pl.Series): The series to calculate greeks for. 

665 periods_per_year (int, optional): Number of periods per year. Defaults to 252. 

666 benchmark (str, optional): The benchmark column name. Defaults to None. 

667 

668 Returns: 

669 dict[str, float]: Dictionary containing alpha and beta values. 

670 

671 """ 

672 periods_per_year = periods_per_year or self.data._periods_per_year 

673 

674 # period_col = benchmark or self.data.benchmark.columns[0] 

675 

676 # find covariance 

677 benchmark_col = benchmark or self.data.benchmark.columns[0] 

678 

679 # Evaluate both series and benchmark as Series 

680 dframe = self.all.select([series, pl.col(benchmark_col).alias("benchmark")]) 

681 

682 # Drop nulls 

683 dframe = dframe.drop_nulls() 

684 matrix = dframe.to_numpy() 

685 

686 # Get actual Series 

687 strategy_np = matrix[:, 0] 

688 benchmark_np = matrix[:, 1] 

689 

690 # 2x2 covariance matrix: [[var_strategy, cov], [cov, var_benchmark]] 

691 cov_matrix = np.cov(strategy_np, benchmark_np) 

692 

693 cov = cov_matrix[0, 1] 

694 var_benchmark = cov_matrix[1, 1] 

695 

696 beta = cov / var_benchmark if var_benchmark != 0 else float("nan") 

697 alpha = np.mean(strategy_np) - beta * np.mean(benchmark_np) 

698 

699 return {"alpha": alpha * periods_per_year, "beta": beta}