Coverage for src / jquantstats / _stats / _reporting.py: 100%
159 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-26 18:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-26 18:44 +0000
1"""Temporal reporting, capture ratios, and summary statistics."""
3from __future__ import annotations
5from typing import TYPE_CHECKING, Any, cast
7import polars as pl
9from ._core import _drawdown_series, _to_float, columnwise_stat
11# ── Reporting statistics mixin ───────────────────────────────────────────────
class _ReportingStatsMixin:
    """Mixin providing temporal, capture, and summary reporting metrics.

    Covers: periods per year, average drawdown, Calmar ratio, recovery factor,
    max drawdown duration, monthly win rate, worst-N periods, up/down capture
    ratios, annual breakdown, and summary statistics table.

    Attributes (provided by the concrete subclass):
        data: The :class:`~jquantstats._data.Data` object.
        all: Combined DataFrame for efficient column selection.
    """

    if TYPE_CHECKING:
        from ._protocol import DataLike

        # Declarations only; the concrete subclass supplies the values.
        data: DataLike
        all: pl.DataFrame | None

    # Declaration-only stubs for metrics implemented on sibling mixins, so
    # that ``summary`` type-checks when referencing them from this mixin.
    # NOTE(review): their docstring-only bodies return ``None`` if actually
    # invoked — the concrete class is presumably composed so the real
    # implementations win in the MRO; confirm against the class definition.
    def avg_return(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def avg_win(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def avg_loss(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def win_rate(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def profit_factor(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def payoff_ratio(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def best(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def worst(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def volatility(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def sharpe(self) -> dict[str, float]:
        """Defined on _PerformanceStatsMixin."""

    def skew(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def kurtosis(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def value_at_risk(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def conditional_value_at_risk(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def max_drawdown(self) -> dict[str, float]:
        """Defined on _PerformanceStatsMixin."""
77 # ── Temporal & reporting ──────────────────────────────────────────────────
79 @property
80 def periods_per_year(self) -> float:
81 """Estimate the number of periods per year from the data index spacing.
83 Returns:
84 float: Estimated number of observations per calendar year.
85 """
86 return self.data._periods_per_year
88 @columnwise_stat
89 def avg_drawdown(self, series: pl.Series) -> float:
90 """Average drawdown across all underwater periods.
92 Returns 0.0 when there are no underwater periods.
94 Args:
95 series (pl.Series): Series of additive daily returns.
97 Returns:
98 float: Mean drawdown in [0, 1].
99 """
100 dd = _drawdown_series(series)
101 in_dd = dd.filter(dd > 0)
102 if in_dd.is_empty():
103 return 0.0
104 return _to_float(in_dd.mean())
106 @columnwise_stat
107 def calmar(self, series: pl.Series, periods: int | float | None = None) -> float:
108 """Calmar ratio (annualised return divided by maximum drawdown).
110 Returns ``nan`` when the maximum drawdown is zero.
112 Args:
113 series (pl.Series): Series of additive daily returns.
114 periods: Annualisation factor. Defaults to ``periods_per_year``.
116 Returns:
117 float: Calmar ratio, or ``nan`` if max drawdown is zero.
118 """
119 raw_periods = periods or self.data._periods_per_year
120 max_dd = _to_float(_drawdown_series(series).max())
121 if max_dd <= 0:
122 return float("nan")
123 ann_return = _to_float(series.mean()) * raw_periods
124 return ann_return / max_dd
126 @columnwise_stat
127 def recovery_factor(self, series: pl.Series) -> float:
128 """Recovery factor (total return divided by maximum drawdown).
130 Returns ``nan`` when the maximum drawdown is zero.
132 Args:
133 series (pl.Series): Series of additive daily returns.
135 Returns:
136 float: Recovery factor, or ``nan`` if max drawdown is zero.
137 """
138 max_dd = _to_float(_drawdown_series(series).max())
139 if max_dd <= 0:
140 return float("nan")
141 total_return = _to_float(series.sum())
142 return total_return / max_dd
    def max_drawdown_duration(self) -> dict[str, float | int | None]:
        """Maximum drawdown duration in calendar days (or periods) per asset.

        When the index is a temporal column (``Date`` / ``Datetime``) the
        duration is expressed as calendar days spanned by the longest
        underwater run. For integer-indexed data each row counts as one
        period.

        Returns:
            dict[str, float | int | None]: Asset → max drawdown duration.
            Returns 0 when there are no underwater periods.
        """
        all_df = cast(pl.DataFrame, self.all)
        date_col_name = self.data.date_col[0] if self.data.date_col else None
        has_date = date_col_name is not None and all_df[date_col_name].dtype.is_temporal()
        result: dict[str, float | int | None] = {}
        for col, series in self.data.items():
            # NAV from additive returns; the running high-water mark marks
            # where the strategy is (not) underwater.
            nav = 1.0 + series.cast(pl.Float64).cum_sum()
            hwm = nav.cum_max()
            in_dd = nav < hwm

            if not in_dd.any():
                result[col] = 0
                continue

            # Pair each observation with either its date or its row position
            # so run boundaries can be measured in the right unit.
            if has_date and date_col_name is not None:
                frame = pl.DataFrame({"date": all_df[date_col_name], "in_dd": in_dd})
            else:
                frame = pl.DataFrame({"date": pl.Series(list(range(len(series))), dtype=pl.Int64), "in_dd": in_dd})

            # rle_id assigns one id per run of consecutive equal values, so
            # grouping by it isolates each contiguous underwater stretch.
            frame = frame.with_columns(pl.col("in_dd").rle_id().alias("run_id"))
            dd_runs = (
                frame.filter(pl.col("in_dd"))
                .group_by("run_id")
                .agg([pl.col("date").min().alias("start"), pl.col("date").max().alias("end")])
            )

            if has_date:
                # Inclusive span in calendar days (+1 so a one-day run counts as 1).
                dd_runs = dd_runs.with_columns(
                    ((pl.col("end") - pl.col("start")).dt.total_days() + 1).alias("duration")
                )
            else:
                # Integer index: inclusive number of rows in the run.
                dd_runs = dd_runs.with_columns((pl.col("end") - pl.col("start") + 1).alias("duration"))

            result[col] = int(_to_float(dd_runs["duration"].max()))
        return result
191 def monthly_win_rate(self) -> dict[str, float]:
192 """Fraction of calendar months with a positive compounded return per asset.
194 Requires a temporal (Date / Datetime) index. Returns ``nan`` per
195 asset when no temporal index is present.
197 Returns:
198 dict[str, float]: Monthly win rate in [0, 1] per asset.
199 """
200 all_df = cast(pl.DataFrame, self.all)
201 date_col_name = self.data.date_col[0] if self.data.date_col else None
202 if date_col_name is None or not all_df[date_col_name].dtype.is_temporal():
203 return {col: float("nan") for col, _ in self.data.items()}
205 result: dict[str, float] = {}
206 for col, _ in self.data.items():
207 df = (
208 all_df.select([date_col_name, col])
209 .drop_nulls()
210 .with_columns(
211 [
212 pl.col(date_col_name).dt.year().alias("_year"),
213 pl.col(date_col_name).dt.month().alias("_month"),
214 ]
215 )
216 )
217 monthly = (
218 df.group_by(["_year", "_month"])
219 .agg((pl.col(col) + 1.0).product().alias("gross"))
220 .with_columns((pl.col("gross") - 1.0).alias("monthly_return"))
221 )
222 n_total = len(monthly)
223 if n_total == 0:
224 result[col] = float("nan")
225 else:
226 n_positive = int((monthly["monthly_return"] > 0).sum())
227 result[col] = n_positive / n_total
228 return result
230 def worst_n_periods(self, n: int = 5) -> dict[str, list[float | None]]:
231 """Return the N worst return periods per asset.
233 If a series has fewer than ``n`` non-null observations the list is
234 padded with ``None`` on the right.
236 Args:
237 n: Number of worst periods to return. Defaults to 5.
239 Returns:
240 dict[str, list[float | None]]: Sorted worst returns per asset.
241 """
242 result: dict[str, list[float | None]] = {}
243 for col, series in self.data.items():
244 nonnull = series.drop_nulls()
245 worst: list[float | None] = nonnull.sort(descending=False).head(n).to_list()
246 while len(worst) < n:
247 worst.append(None)
248 result[col] = worst
249 return result
251 # ── Capture ratios ────────────────────────────────────────────────────────
253 def up_capture(self, benchmark: pl.Series) -> dict[str, float]:
254 """Up-market capture ratio relative to an explicit benchmark series.
256 Measures the fraction of the benchmark's upside that the strategy
257 captures. A value greater than 1.0 means the strategy outperformed
258 the benchmark in rising markets.
260 Args:
261 benchmark: Benchmark return series aligned row-by-row with the data.
263 Returns:
264 dict[str, float]: Up capture ratio per asset.
265 """
266 up_mask = benchmark > 0
267 bench_up = benchmark.filter(up_mask).drop_nulls()
268 if bench_up.is_empty():
269 return {col: float("nan") for col, _ in self.data.items()}
270 bench_geom = float((bench_up + 1.0).product()) ** (1.0 / len(bench_up)) - 1.0
271 if bench_geom == 0.0: # pragma: no cover
272 return {col: float("nan") for col, _ in self.data.items()}
273 result: dict[str, float] = {}
274 for col, series in self.data.items():
275 strat_up = series.filter(up_mask).drop_nulls()
276 if strat_up.is_empty():
277 result[col] = float("nan")
278 else:
279 strat_geom = float((strat_up + 1.0).product()) ** (1.0 / len(strat_up)) - 1.0
280 result[col] = strat_geom / bench_geom
281 return result
283 def down_capture(self, benchmark: pl.Series) -> dict[str, float]:
284 """Down-market capture ratio relative to an explicit benchmark series.
286 A value less than 1.0 means the strategy lost less than the benchmark
287 in falling markets (a desirable property).
289 Args:
290 benchmark: Benchmark return series aligned row-by-row with the data.
292 Returns:
293 dict[str, float]: Down capture ratio per asset.
294 """
295 down_mask = benchmark < 0
296 bench_down = benchmark.filter(down_mask).drop_nulls()
297 if bench_down.is_empty():
298 return {col: float("nan") for col, _ in self.data.items()}
299 bench_geom = float((bench_down + 1.0).product()) ** (1.0 / len(bench_down)) - 1.0
300 if bench_geom == 0.0: # pragma: no cover
301 return {col: float("nan") for col, _ in self.data.items()}
302 result: dict[str, float] = {}
303 for col, series in self.data.items():
304 strat_down = series.filter(down_mask).drop_nulls()
305 if strat_down.is_empty():
306 result[col] = float("nan")
307 else:
308 strat_geom = float((strat_down + 1.0).product()) ** (1.0 / len(strat_down)) - 1.0
309 result[col] = strat_geom / bench_geom
310 return result
312 # ── Summary & breakdown ────────────────────────────────────────────────────
314 def annual_breakdown(self) -> pl.DataFrame:
315 """Summary statistics broken down by calendar year.
317 Groups the data by calendar year using the date index, computes a
318 full :py:meth:`summary` for each year, and stacks the results with an
319 additional ``year`` column.
321 Returns:
322 pl.DataFrame: Columns ``year``, ``metric``, one per asset, sorted
323 by ``year``.
325 Raises:
326 ValueError: If the data has no date index.
327 """
328 all_df = cast(pl.DataFrame, self.all)
329 date_col_name = self.data.date_col[0] if self.data.date_col else None
330 has_temporal = date_col_name is not None and all_df[date_col_name].dtype.is_temporal()
332 from ..data import Data
334 if not has_temporal:
335 # Integer-index fallback: group by chunks of ~_periods_per_year rows
336 chunk = round(self.data._periods_per_year)
337 total = all_df.height
338 frames_int: list[pl.DataFrame] = []
339 for i, start in enumerate(range(0, total, chunk), start=1):
340 chunk_all = all_df.slice(start, chunk)
341 if chunk_all.height < max(5, chunk // 4):
342 continue
343 chunk_index = chunk_all.select(self.data.date_col)
344 chunk_returns = chunk_all.select(self.data.returns.columns)
345 chunk_benchmark = (
346 chunk_all.select(self.data.benchmark.columns) if self.data.benchmark is not None else None
347 )
348 chunk_data = Data(returns=chunk_returns, index=chunk_index, benchmark=chunk_benchmark)
349 chunk_summary = cast(Any, type(self))(chunk_data).summary()
350 chunk_summary = chunk_summary.with_columns(pl.lit(i).alias("year"))
351 frames_int.append(chunk_summary)
352 if not frames_int:
353 return pl.DataFrame()
354 result_int = pl.concat(frames_int)
355 ordered_int = ["year", "metric", *[c for c in result_int.columns if c not in ("year", "metric")]]
356 return result_int.select(ordered_int)
358 if date_col_name is None: # unreachable: has_temporal guarantees non-None # pragma: no cover
359 return pl.DataFrame() # pragma: no cover
360 years = all_df[date_col_name].dt.year().unique().sort().to_list()
362 frames: list[pl.DataFrame] = []
363 for year in years:
364 year_all = all_df.filter(pl.col(date_col_name).dt.year() == year)
365 if year_all.height < 2:
366 continue
367 year_index = year_all.select([date_col_name])
368 year_returns = year_all.select(self.data.returns.columns)
369 year_benchmark = year_all.select(self.data.benchmark.columns) if self.data.benchmark is not None else None
370 year_data = Data(returns=year_returns, index=year_index, benchmark=year_benchmark)
371 year_summary = cast(Any, type(self))(year_data).summary()
372 year_summary = year_summary.with_columns(pl.lit(year).alias("year"))
373 frames.append(year_summary)
375 if not frames:
376 asset_cols = list(self.data.returns.columns)
377 schema: dict[str, type[pl.DataType]] = {
378 "year": pl.Int32,
379 "metric": pl.String,
380 **dict.fromkeys(asset_cols, pl.Float64),
381 }
382 return pl.DataFrame(schema=schema)
384 result = pl.concat(frames)
385 ordered = ["year", "metric", *[c for c in result.columns if c not in ("year", "metric")]]
386 return result.select(ordered)
388 def summary(self) -> pl.DataFrame:
389 """Summary statistics for each asset as a tidy DataFrame.
391 Each row is one metric; each column beyond ``metric`` is one asset.
393 Returns:
394 pl.DataFrame: A DataFrame with a ``metric`` column followed by one
395 column per asset.
396 """
397 assets = [col for col, _ in self.data.items()]
399 def _safe(fn: Any) -> dict[str, Any]:
400 """Call *fn()* and return its result; return NaN for each asset on any exception."""
401 try:
402 return fn()
403 except Exception:
404 return dict.fromkeys(assets, float("nan"))
406 metrics: dict[str, dict[str, Any]] = {
407 "avg_return": _safe(self.avg_return),
408 "avg_win": _safe(self.avg_win),
409 "avg_loss": _safe(self.avg_loss),
410 "win_rate": _safe(self.win_rate),
411 "profit_factor": _safe(self.profit_factor),
412 "payoff_ratio": _safe(self.payoff_ratio),
413 "monthly_win_rate": _safe(self.monthly_win_rate),
414 "best": _safe(self.best),
415 "worst": _safe(self.worst),
416 "volatility": _safe(self.volatility),
417 "sharpe": _safe(self.sharpe),
418 "skew": _safe(self.skew),
419 "kurtosis": _safe(self.kurtosis),
420 "value_at_risk": _safe(self.value_at_risk),
421 "conditional_value_at_risk": _safe(self.conditional_value_at_risk),
422 "max_drawdown": _safe(self.max_drawdown),
423 "avg_drawdown": _safe(self.avg_drawdown),
424 "max_drawdown_duration": _safe(self.max_drawdown_duration),
425 "calmar": _safe(self.calmar),
426 "recovery_factor": _safe(self.recovery_factor),
427 }
429 rows: list[dict[str, object]] = [
430 {"metric": name, **{asset: values.get(asset) for asset in assets}} for name, values in metrics.items()
431 ]
432 return pl.DataFrame(rows)