Coverage for src/jquantstats/_stats.py: 99%
205 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-05 07:23 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-05 07:23 +0000
1import dataclasses
2from collections.abc import Callable, Iterable
3from functools import wraps
4from typing import cast
6import numpy as np
7import polars as pl
8from scipy.stats import norm
@dataclasses.dataclass(frozen=True)
class Stats:
    """Statistical analysis tools for financial returns data.

    This class provides a comprehensive set of methods for calculating various
    financial metrics and statistics on returns data, including:

    - Basic statistics (mean, skew, kurtosis)
    - Risk metrics (volatility, value-at-risk, drawdown)
    - Performance ratios (Sharpe, Sortino, information ratio)
    - Win/loss metrics (win rate, profit factor, payoff ratio)
    - Rolling calculations (rolling volatility, rolling Sharpe)
    - Factor analysis (alpha, beta, R-squared)

    Scalar metrics are decorated with ``columnwise_stat`` and return one value
    per column of ``data.items()``; rolling/series metrics are decorated with
    ``to_frame`` and return a Polars DataFrame that keeps the date column(s).

    Attributes:
        data: The data object containing returns and benchmark data.
        all: A DataFrame combining all data (index, returns, benchmark) for easy access.

    """

    data: "Data"  # type: ignore
    all: pl.DataFrame | None = None  # Default is None; will be set in __post_init__

    def __post_init__(self):
        """Copy ``data.all`` onto this instance as ``all``."""
        # The dataclass is frozen, so regular attribute assignment is blocked.
        object.__setattr__(self, "all", self.data.all)

    @staticmethod
    def _mean_positive_expr(series: pl.Series) -> float:
        """Return the mean of the strictly positive values of ``series``."""
        return series.filter(series > 0).mean()

    @staticmethod
    def _mean_negative_expr(series: pl.Series) -> float:
        """Return the mean of the strictly negative values of ``series``."""
        return series.filter(series < 0).mean()

    @staticmethod
    def columnwise_stat(func: Callable) -> Callable:
        """Apply a column-wise statistical function to every column of ``self.data.items()``.

        Args:
            func (Callable): The function to decorate. It is called as
                ``func(self, series, *args, **kwargs)`` once per column.

        Returns:
            Callable: A wrapper returning a dict mapping column name to the result of ``func``.

        """

        @wraps(func)
        def wrapper(self, *args, **kwargs) -> dict[str, float]:
            return {col: func(self, series, *args, **kwargs) for col, series in self.data.items()}

        return wrapper

    @staticmethod
    def to_frame(func: Callable) -> Callable:
        """Apply a per-column function and evaluate the results with ``self.all.select(...)``.

        The resulting frame contains the date column(s) listed in
        ``self.data.date_col`` followed by one column per entry of
        ``self.data.items()``, each aliased to the original column name.

        Args:
            func (Callable): The function to decorate. It is called as
                ``func(self, series, *args, **kwargs)`` once per column.

        Returns:
            Callable: A wrapper returning a ``pl.DataFrame``.

        """

        @wraps(func)
        def wrapper(self, *args, **kwargs) -> pl.DataFrame:
            return self.all.select(
                [pl.col(name) for name in self.data.date_col]
                + [func(self, series, *args, **kwargs).alias(col) for col, series in self.data.items()]
            )

        return wrapper

    @columnwise_stat
    def skew(self, series: pl.Series) -> int | float | None:
        """Calculate skewness (asymmetry) for each numeric column.

        Args:
            series (pl.Series): The series to calculate skewness for.

        Returns:
            float: The bias-corrected skewness value.

        """
        return series.skew(bias=False)

    @columnwise_stat
    def kurtosis(self, series: pl.Series) -> int | float | None:
        """Calculate the kurtosis of returns.

        The degree to which a distribution peaks compared to a normal distribution.

        Args:
            series (pl.Series): The series to calculate kurtosis for.

        Returns:
            float: The bias-corrected kurtosis value.

        """
        return series.kurtosis(bias=False)

    @columnwise_stat
    def avg_return(self, series: pl.Series) -> float:
        """Calculate average return per non-zero, non-null value.

        Args:
            series (pl.Series): The series to calculate average return for.

        Returns:
            float: The average return value.

        """
        return series.filter(series.is_not_null() & (series != 0)).mean()

    @columnwise_stat
    def avg_win(self, series: pl.Series) -> float:
        """Calculate the average winning return/trade for an asset.

        Args:
            series (pl.Series): The series to calculate average win for.

        Returns:
            float: The mean of the strictly positive returns.

        """
        return self._mean_positive_expr(series)

    @columnwise_stat
    def avg_loss(self, series: pl.Series) -> float:
        """Calculate the average loss return/trade for a period.

        Args:
            series (pl.Series): The series to calculate average loss for.

        Returns:
            float: The mean of the strictly negative returns.

        """
        return self._mean_negative_expr(series)

    @columnwise_stat
    def volatility(self, series: pl.Series, periods: int | float | None = None, annualize: bool = True) -> float:
        """Calculate the volatility of returns.

        - Std dev of returns
        - Annualized by sqrt(periods) if `annualize` is True.

        Args:
            series (pl.Series): The series to calculate volatility for.
            periods (int | float, optional): Number of periods per year. Falls back to
                ``self.data._periods_per_year`` when omitted (or falsy).
            annualize (bool, optional): Whether to annualize the result. Defaults to True.

        Returns:
            float: The volatility value.

        Raises:
            TypeError: If the resolved number of periods is not an int or float.

        """
        raw_periods = periods or self.data._periods_per_year

        # Ensure it's numeric before taking the square root.
        if not isinstance(raw_periods, int | float):
            raise TypeError(f"Expected int or float for periods, got {type(raw_periods).__name__}")

        factor = np.sqrt(raw_periods) if annualize else 1.0
        return float(series.std()) * factor

    @columnwise_stat
    def payoff_ratio(self, series: pl.Series) -> float:
        """Measure the payoff ratio.

        The payoff ratio is calculated as average win / abs(average loss).

        Args:
            series (pl.Series): The series to calculate payoff ratio for.

        Returns:
            float: The payoff ratio value, or NaN when the series has no
            strictly positive or no strictly negative returns.

        """
        avg_win = self._mean_positive_expr(series)
        avg_loss = self._mean_negative_expr(series)

        # The mean of an empty selection is None; the ratio is undefined then.
        if avg_win is None or avg_loss is None:
            return np.nan

        return avg_win / np.abs(avg_loss)

    def win_loss_ratio(self) -> dict[str, float]:
        """Shorthand for payoff_ratio().

        Returns:
            dict[str, float]: Dictionary mapping asset names to win/loss ratios.

        """
        return self.payoff_ratio()

    @columnwise_stat
    def profit_ratio(self, series: pl.Series) -> float:
        """Measure the profit ratio.

        The profit ratio is calculated as win ratio / loss ratio, where each
        ratio is abs(mean / count) of the respective subset. Returns that are
        exactly zero are counted on the winning side.

        Args:
            series (pl.Series): The series to calculate profit ratio for.

        Returns:
            float: The profit ratio value, or NaN when either subset is empty.

        """
        wins = series.filter(series >= 0)
        losses = series.filter(series < 0)

        try:
            win_ratio = np.abs(wins.mean() / wins.count())
            loss_ratio = np.abs(losses.mean() / losses.count())

            return win_ratio / loss_ratio

        except TypeError:
            # An empty subset has a mean of None, which cannot be divided.
            return np.nan

    @columnwise_stat
    def profit_factor(self, series: pl.Series) -> float:
        """Measure the profit factor.

        The profit factor is calculated as abs(sum of wins / sum of losses).

        Args:
            series (pl.Series): The series to calculate profit factor for.

        Returns:
            float: The profit factor value.

        """
        wins = series.filter(series > 0)
        losses = series.filter(series < 0)

        return np.abs(wins.sum() / losses.sum())

    @columnwise_stat
    def value_at_risk(self, series: pl.Series, sigma: float = 1.0, alpha: float = 0.05) -> float:
        """Calculate the daily value-at-risk.

        Uses variance-covariance calculation with confidence level.

        Args:
            series (pl.Series): The series to calculate value at risk for.
            sigma (float, optional): Standard deviation multiplier. Defaults to 1.0.
            alpha (float, optional): Confidence level. Defaults to 0.05.

        Returns:
            float: The value at risk.

        """
        mu = float(series.mean())
        scaled_sigma = sigma * float(series.std())

        # Quantile `alpha` of a normal distribution fitted to the returns.
        return norm.ppf(alpha, mu, scaled_sigma)

    @columnwise_stat
    def conditional_value_at_risk(self, series: pl.Series, sigma: float = 1.0, alpha: float = 0.05) -> float:
        """Calculate the conditional value-at-risk.

        Also known as CVaR or expected shortfall, calculated for each numeric column.

        Args:
            series (pl.Series): The series to calculate conditional value at risk for.
            sigma (float, optional): Standard deviation multiplier. Defaults to 1.0.
            alpha (float, optional): Confidence level. Defaults to 0.05.

        Returns:
            float: The conditional value at risk.

        """
        mu = float(series.mean())
        scaled_sigma = sigma * float(series.std())

        var = norm.ppf(alpha, mu, scaled_sigma)

        # Mean of the returns strictly below VaR. The cast only exists to
        # satisfy the type checker; it has no runtime effect.
        mask = cast(Iterable[bool], series < var)
        return series.filter(mask).mean()

    @columnwise_stat
    def win_rate(self, series: pl.Series) -> float:
        """Calculate the win ratio for a period.

        Args:
            series (pl.Series): The series to calculate win rate for.

        Returns:
            float: Number of strictly positive returns divided by the number of non-zero returns.

        """
        num_pos = series.filter(series > 0).count()
        num_nonzero = series.filter(series != 0).count()
        return num_pos / num_nonzero

    @columnwise_stat
    def gain_to_pain_ratio(self, series: pl.Series) -> float:
        """Calculate Jack Schwager's Gain-to-Pain Ratio.

        The ratio is calculated as total return / sum of losses (in absolute value).

        Args:
            series (pl.Series): The series to calculate gain to pain ratio for.

        Returns:
            float: The gain to pain ratio value, or NaN when the division by
            the total loss raises ZeroDivisionError.

        """
        total_gain = series.sum()
        total_pain = series.filter(series < 0).abs().sum()
        try:
            return total_gain / total_pain
        except ZeroDivisionError:
            return np.nan

    @columnwise_stat
    def risk_return_ratio(self, series: pl.Series) -> float:
        """Calculate the return/risk ratio.

        This is equivalent to the Sharpe ratio without a risk-free rate.

        Args:
            series (pl.Series): The series to calculate risk return ratio for.

        Returns:
            float: The mean return divided by the standard deviation.

        """
        return float(series.mean()) / float(series.std())

    def kelly_criterion(self) -> dict[str, float]:
        """Calculate the optimal capital allocation per column.

        Uses the Kelly Criterion formula: f* = [(b * p) - q] / b
        where:
        - b = payoff ratio
        - p = win rate
        - q = 1 - p.

        Returns:
            dict[str, float]: Dictionary mapping asset names to Kelly criterion values.

        """
        b = self.payoff_ratio()
        p = self.win_rate()

        return {col: ((b[col] * p[col]) - (1 - p[col])) / b[col] for col in b}

    @columnwise_stat
    def best(self, series: pl.Series) -> float | None:
        """Find the maximum return per column (best period).

        Args:
            series (pl.Series): The series to find the best return for.

        Returns:
            float: The maximum return value.

        """
        return series.max()

    @columnwise_stat
    def worst(self, series: pl.Series) -> float | None:
        """Find the minimum return per column (worst period).

        Args:
            series (pl.Series): The series to find the worst return for.

        Returns:
            float: The minimum return value.

        """
        return series.min()

    @columnwise_stat
    def exposure(self, series: pl.Series) -> float:
        """Calculate the market exposure time (returns != 0).

        Args:
            series (pl.Series): The series to calculate exposure for.

        Returns:
            float: Share of rows of ``self.all`` with a non-zero return, rounded to two decimals.

        """
        return np.round((series.filter(series != 0).count() / self.all.height), decimals=2)

    @columnwise_stat
    def sharpe(self, series: pl.Series, periods: int | float | None = None) -> float:
        """Calculate the Sharpe ratio of asset returns.

        Args:
            series (pl.Series): The series to calculate Sharpe ratio for.
            periods (int | float, optional): Number of periods per year. Falls back to
                ``self.data._periods_per_year`` when omitted (or falsy).

        Returns:
            float: The Sharpe ratio value.

        """
        periods = periods or self.data._periods_per_year

        divisor = float(series.std(ddof=1))

        res = float(series.mean()) / divisor
        # If no period count could be resolved at all, leave the ratio unscaled.
        factor = periods or 1
        return res * np.sqrt(factor)

    @columnwise_stat
    def sortino(self, series: pl.Series, periods: int | float | None = None) -> float:
        """Calculate the Sortino ratio.

        The Sortino ratio is the mean return divided by downside deviation.
        Based on Red Rock Capital's Sortino ratio paper.

        Args:
            series (pl.Series): The series to calculate Sortino ratio for.
            periods (int | float, optional): Number of periods per year. Falls back to
                ``self.data._periods_per_year`` when omitted (or falsy).

        Returns:
            float: The Sortino ratio value.

        """
        periods = periods or self.data._periods_per_year
        # Squared negative returns are summed but divided by the count of ALL
        # non-null observations, not just the negative ones.
        downside_deviation = np.sqrt(((series.filter(series < 0)) ** 2).sum() / series.count())
        ratio = series.mean() / downside_deviation
        return ratio * np.sqrt(periods)

    @to_frame
    def rolling_sortino(
        self, series: pl.Expr, rolling_period: int = 126, periods_per_year: int | float | None = None
    ) -> pl.Expr:
        """Calculate the rolling Sortino ratio.

        Args:
            series (pl.Expr): The expression to calculate rolling Sortino ratio for.
            rolling_period (int, optional): The rolling window size. Defaults to 126.
            periods_per_year (int | float, optional): Number of periods per year. Falls back
                to ``self.data._periods_per_year`` when omitted (or falsy).

        Returns:
            pl.Expr: The rolling Sortino ratio expression.

        """
        periods_per_year = periods_per_year or self.data._periods_per_year

        mean_ret = series.rolling_mean(window_size=rolling_period)

        # Rolling downside deviation (squared negative returns averaged over window)
        downside = series.map_elements(lambda x: x**2 if x < 0 else 0.0).rolling_mean(window_size=rolling_period)

        # NOTE(review): the fills below apply to the denominator only and
        # replace NaN/null with 0. They do not guard against division by
        # zero: a window without negative returns still has a zero denominator.
        sortino = mean_ret / downside.sqrt().fill_nan(0).fill_null(0)
        return sortino * (periods_per_year**0.5)

    @to_frame
    def rolling_sharpe(
        self, series: pl.Expr, rolling_period: int = 126, periods_per_year: int | float | None = None
    ) -> pl.Expr:
        """Calculate the rolling Sharpe ratio.

        Args:
            series (pl.Expr): The expression to calculate rolling Sharpe ratio for.
            rolling_period (int, optional): The rolling window size. Defaults to 126.
            periods_per_year (int | float, optional): Number of periods per year. Falls back
                to ``self.data._periods_per_year`` when omitted (or falsy).

        Returns:
            pl.Expr: The rolling Sharpe ratio expression.

        """
        periods_per_year = periods_per_year or self.data._periods_per_year
        res = series.rolling_mean(window_size=rolling_period) / series.rolling_std(window_size=rolling_period)
        return res * np.sqrt(periods_per_year)

    @to_frame
    def rolling_volatility(
        self, series: pl.Expr, rolling_period: int = 126, periods_per_year: int | float | None = None
    ) -> pl.Expr:
        """Calculate the rolling volatility of returns.

        Args:
            series (pl.Expr): The expression to calculate rolling volatility for.
            rolling_period (int, optional): The rolling window size. Defaults to 126.
            periods_per_year (int | float, optional): Number of periods per year. Falls back
                to ``self.data._periods_per_year`` when omitted (or falsy).

        Returns:
            pl.Expr: The rolling volatility expression.

        """
        # Without this fallback the default of None would reach np.sqrt and fail.
        periods_per_year = periods_per_year or self.data._periods_per_year
        return series.rolling_std(window_size=rolling_period) * np.sqrt(periods_per_year)

    @to_frame
    def drawdown(self, series: pl.Series) -> pl.Series:
        """Calculate the drawdown series for returns.

        Drawdowns are reported as non-negative magnitudes: the relative
        distance of the compounded price below its running maximum.

        Args:
            series (pl.Series): The series to calculate drawdown for.

        Returns:
            pl.Series: The drawdown series.

        """
        equity = self.prices(series)
        d = (equity / equity.cum_max()) - 1
        return -d

    @staticmethod
    def prices(series: pl.Series) -> pl.Series:
        """Convert returns series to price series.

        Args:
            series (pl.Series): The returns series to convert.

        Returns:
            pl.Series: The cumulative product of ``1 + returns``.

        """
        return (1.0 + series).cum_prod()

    @staticmethod
    def max_drawdown_single_series(series: pl.Series) -> float:
        """Calculate the maximum drawdown of a single returns series.

        Args:
            series (pl.Series): The returns series.

        Returns:
            float: The largest peak-to-trough decline of the compounded price,
            reported as a non-negative number.

        """
        price = Stats.prices(series)
        peak = price.cum_max()
        drawdown = price / peak - 1
        return -drawdown.min()

    @columnwise_stat
    def max_drawdown(self, series: pl.Series) -> float:
        """Calculate the maximum drawdown for each column.

        Args:
            series (pl.Series): The series to calculate maximum drawdown for.

        Returns:
            float: The maximum drawdown value.

        """
        return Stats.max_drawdown_single_series(series)

    def adjusted_sortino(self, periods: int | float | None = None) -> dict[str, float]:
        """Calculate Jack Schwager's adjusted Sortino ratio.

        This adjustment allows for direct comparison to Sharpe ratio.
        See: https://archive.is/wip/2rwFW.

        Args:
            periods (int | float, optional): Number of periods per year, forwarded to ``sortino``.

        Returns:
            dict[str, float]: Dictionary mapping asset names to adjusted Sortino ratios.

        """
        sortino_data = self.sortino(periods=periods)
        return {k: v / np.sqrt(2) for k, v in sortino_data.items()}

    @columnwise_stat
    def r_squared(self, series: pl.Series, benchmark: str | None = None) -> float:
        """Measure how well the returns are explained by the benchmark.

        Computed as the squared correlation coefficient between the series and
        the benchmark column, using only rows where both are non-null.

        Args:
            series (pl.Series): The series to calculate R-squared for.
            benchmark (str, optional): The benchmark column name. Defaults to
                the first benchmark column.

        Returns:
            float: The R-squared value.

        Raises:
            AttributeError: If no benchmark data is available.

        """
        if self.data.benchmark is None:
            raise AttributeError("No benchmark data available")

        benchmark_col = benchmark or self.data.benchmark.columns[0]

        # Evaluate both series and benchmark side by side and drop incomplete rows.
        dframe = self.all.select([series, pl.col(benchmark_col).alias("benchmark")])
        dframe = dframe.drop_nulls()

        matrix = dframe.to_numpy()
        strategy_np = matrix[:, 0]
        benchmark_np = matrix[:, 1]

        corr_matrix = np.corrcoef(strategy_np, benchmark_np)
        r = corr_matrix[0, 1]
        return r**2

    def r2(self) -> dict[str, float]:
        """Shorthand for r_squared().

        Returns:
            dict[str, float]: Dictionary mapping asset names to R-squared values.

        """
        return self.r_squared()

    @columnwise_stat
    def information_ratio(
        self, series: pl.Series, periods_per_year: int | float | None = None, benchmark: str | None = None
    ) -> float:
        """Calculate the information ratio.

        This is essentially the risk return ratio of the net profits, i.e. of
        the returns in excess of the benchmark.

        Args:
            series (pl.Series): The series to calculate information ratio for.
            periods_per_year (int | float, optional): Number of periods per year. Falls back
                to ``self.data._periods_per_year`` when omitted (or falsy).
            benchmark (str, optional): The benchmark column name. Defaults to
                the first benchmark column.

        Returns:
            float: The information ratio value, or 0.0 when the division by
            the standard deviation raises ZeroDivisionError.

        Raises:
            AttributeError: If no benchmark data is available.

        """
        if self.data.benchmark is None:
            raise AttributeError("No benchmark data available")

        periods_per_year = periods_per_year or self.data._periods_per_year

        benchmark_col = benchmark or self.data.benchmark.columns[0]

        active = series - self.data.benchmark[benchmark_col]

        mean = active.mean()
        std = active.std()

        try:
            return (mean / std) * (periods_per_year**0.5)
        except ZeroDivisionError:
            return 0.0

    @columnwise_stat
    def greeks(
        self, series: pl.Series, periods_per_year: int | float | None = None, benchmark: str | None = None
    ) -> dict[str, float]:
        """Calculate alpha and beta of the portfolio.

        Args:
            series (pl.Series): The series to calculate greeks for.
            periods_per_year (int | float, optional): Number of periods per year. Falls back
                to ``self.data._periods_per_year`` when omitted (or falsy).
            benchmark (str, optional): The benchmark column name. Defaults to
                the first benchmark column.

        Returns:
            dict[str, float]: Dictionary containing alpha (scaled by the
            number of periods per year) and beta values.

        Raises:
            AttributeError: If no benchmark data is available.

        """
        if self.data.benchmark is None:
            raise AttributeError("No benchmark data available")

        periods_per_year = periods_per_year or self.data._periods_per_year

        benchmark_col = benchmark or self.data.benchmark.columns[0]

        # Evaluate both series and benchmark side by side and drop incomplete rows.
        dframe = self.all.select([series, pl.col(benchmark_col).alias("benchmark")])
        dframe = dframe.drop_nulls()
        matrix = dframe.to_numpy()

        strategy_np = matrix[:, 0]
        benchmark_np = matrix[:, 1]

        # 2x2 covariance matrix: [[var_strategy, cov], [cov, var_benchmark]]
        cov_matrix = np.cov(strategy_np, benchmark_np)

        cov = cov_matrix[0, 1]
        var_benchmark = cov_matrix[1, 1]

        beta = cov / var_benchmark if var_benchmark != 0 else float("nan")
        alpha = np.mean(strategy_np) - beta * np.mean(benchmark_np)

        return {"alpha": alpha * periods_per_year, "beta": beta}