Coverage for src / jquantstats / _stats / _reporting.py: 100%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-26 18:44 +0000

1"""Temporal reporting, capture ratios, and summary statistics.""" 

2 

3from __future__ import annotations 

4 

5from typing import TYPE_CHECKING, Any, cast 

6 

7import polars as pl 

8 

9from ._core import _drawdown_series, _to_float, columnwise_stat 

10 

11# ── Reporting statistics mixin ─────────────────────────────────────────────── 

12 

13 

class _ReportingStatsMixin:
    """Mixin providing temporal, capture, and summary reporting metrics.

    Covers: periods per year, average drawdown, Calmar ratio, recovery factor,
    max drawdown duration, monthly win rate, worst-N periods, up/down capture
    ratios, annual breakdown, and summary statistics table.

    Attributes (provided by the concrete subclass):
        data: The :class:`~jquantstats._data.Data` object.
        all: Combined DataFrame for efficient column selection.
    """

    if TYPE_CHECKING:
        from ._protocol import DataLike

        # Declared for static analysis only: the concrete subclass supplies
        # ``data`` and ``all`` at runtime, so these annotations exist purely
        # so the mixin type-checks stand-alone.
        data: DataLike
        all: pl.DataFrame | None

31 

    # ── Cross-mixin interface stubs ──────────────────────────────────────────
    # The methods below are implemented by sibling mixins (_BasicStatsMixin /
    # _PerformanceStatsMixin, per their docstrings); they are declared here so
    # that this mixin type-checks on its own (``summary`` calls all of them).
    # NOTE(review): each stub body is docstring-only and therefore returns
    # ``None`` if the implementing mixin does not take precedence in the
    # concrete subclass's MRO — confirm base-class ordering in the subclass.

    def avg_return(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def avg_win(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def avg_loss(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def win_rate(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def profit_factor(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def payoff_ratio(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def best(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def worst(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def volatility(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def sharpe(self) -> dict[str, float]:
        """Defined on _PerformanceStatsMixin."""

    def skew(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def kurtosis(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def value_at_risk(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def conditional_value_at_risk(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def max_drawdown(self) -> dict[str, float]:
        """Defined on _PerformanceStatsMixin."""

76 

77 # ── Temporal & reporting ────────────────────────────────────────────────── 

78 

79 @property 

80 def periods_per_year(self) -> float: 

81 """Estimate the number of periods per year from the data index spacing. 

82 

83 Returns: 

84 float: Estimated number of observations per calendar year. 

85 """ 

86 return self.data._periods_per_year 

87 

88 @columnwise_stat 

89 def avg_drawdown(self, series: pl.Series) -> float: 

90 """Average drawdown across all underwater periods. 

91 

92 Returns 0.0 when there are no underwater periods. 

93 

94 Args: 

95 series (pl.Series): Series of additive daily returns. 

96 

97 Returns: 

98 float: Mean drawdown in [0, 1]. 

99 """ 

100 dd = _drawdown_series(series) 

101 in_dd = dd.filter(dd > 0) 

102 if in_dd.is_empty(): 

103 return 0.0 

104 return _to_float(in_dd.mean()) 

105 

106 @columnwise_stat 

107 def calmar(self, series: pl.Series, periods: int | float | None = None) -> float: 

108 """Calmar ratio (annualised return divided by maximum drawdown). 

109 

110 Returns ``nan`` when the maximum drawdown is zero. 

111 

112 Args: 

113 series (pl.Series): Series of additive daily returns. 

114 periods: Annualisation factor. Defaults to ``periods_per_year``. 

115 

116 Returns: 

117 float: Calmar ratio, or ``nan`` if max drawdown is zero. 

118 """ 

119 raw_periods = periods or self.data._periods_per_year 

120 max_dd = _to_float(_drawdown_series(series).max()) 

121 if max_dd <= 0: 

122 return float("nan") 

123 ann_return = _to_float(series.mean()) * raw_periods 

124 return ann_return / max_dd 

125 

126 @columnwise_stat 

127 def recovery_factor(self, series: pl.Series) -> float: 

128 """Recovery factor (total return divided by maximum drawdown). 

129 

130 Returns ``nan`` when the maximum drawdown is zero. 

131 

132 Args: 

133 series (pl.Series): Series of additive daily returns. 

134 

135 Returns: 

136 float: Recovery factor, or ``nan`` if max drawdown is zero. 

137 """ 

138 max_dd = _to_float(_drawdown_series(series).max()) 

139 if max_dd <= 0: 

140 return float("nan") 

141 total_return = _to_float(series.sum()) 

142 return total_return / max_dd 

143 

    def max_drawdown_duration(self) -> dict[str, float | int | None]:
        """Maximum drawdown duration in calendar days (or periods) per asset.

        When the index is a temporal column (``Date`` / ``Datetime``) the
        duration is expressed as calendar days spanned by the longest
        underwater run. For integer-indexed data each row counts as one
        period.

        Returns:
            dict[str, float | int | None]: Asset → max drawdown duration.
                Returns 0 when there are no underwater periods.
        """
        all_df = cast(pl.DataFrame, self.all)
        date_col_name = self.data.date_col[0] if self.data.date_col else None
        has_date = date_col_name is not None and all_df[date_col_name].dtype.is_temporal()
        result: dict[str, float | int | None] = {}
        for col, series in self.data.items():
            # Returns are additive, so NAV is 1 + cumulative sum (not a product).
            nav = 1.0 + series.cast(pl.Float64).cum_sum()
            hwm = nav.cum_max()  # running high-water mark
            in_dd = nav < hwm  # True on every row strictly below the peak

            if not in_dd.any():
                # Never underwater: duration is zero by definition.
                result[col] = 0
                continue

            if has_date and date_col_name is not None:
                frame = pl.DataFrame({"date": all_df[date_col_name], "in_dd": in_dd})
            else:
                # No temporal index: substitute row positions so each row
                # contributes exactly one period to the duration.
                frame = pl.DataFrame({"date": pl.Series(list(range(len(series))), dtype=pl.Int64), "in_dd": in_dd})

            # rle_id assigns one id per consecutive run of equal in_dd values;
            # after filtering to underwater rows, each run_id group is one
            # contiguous underwater stretch.
            frame = frame.with_columns(pl.col("in_dd").rle_id().alias("run_id"))
            dd_runs = (
                frame.filter(pl.col("in_dd"))
                .group_by("run_id")
                .agg([pl.col("date").min().alias("start"), pl.col("date").max().alias("end")])
            )

            if has_date:
                # +1 makes the span inclusive of both endpoint days.
                dd_runs = dd_runs.with_columns(
                    ((pl.col("end") - pl.col("start")).dt.total_days() + 1).alias("duration")
                )
            else:
                dd_runs = dd_runs.with_columns((pl.col("end") - pl.col("start") + 1).alias("duration"))

            result[col] = int(_to_float(dd_runs["duration"].max()))
        return result

190 

191 def monthly_win_rate(self) -> dict[str, float]: 

192 """Fraction of calendar months with a positive compounded return per asset. 

193 

194 Requires a temporal (Date / Datetime) index. Returns ``nan`` per 

195 asset when no temporal index is present. 

196 

197 Returns: 

198 dict[str, float]: Monthly win rate in [0, 1] per asset. 

199 """ 

200 all_df = cast(pl.DataFrame, self.all) 

201 date_col_name = self.data.date_col[0] if self.data.date_col else None 

202 if date_col_name is None or not all_df[date_col_name].dtype.is_temporal(): 

203 return {col: float("nan") for col, _ in self.data.items()} 

204 

205 result: dict[str, float] = {} 

206 for col, _ in self.data.items(): 

207 df = ( 

208 all_df.select([date_col_name, col]) 

209 .drop_nulls() 

210 .with_columns( 

211 [ 

212 pl.col(date_col_name).dt.year().alias("_year"), 

213 pl.col(date_col_name).dt.month().alias("_month"), 

214 ] 

215 ) 

216 ) 

217 monthly = ( 

218 df.group_by(["_year", "_month"]) 

219 .agg((pl.col(col) + 1.0).product().alias("gross")) 

220 .with_columns((pl.col("gross") - 1.0).alias("monthly_return")) 

221 ) 

222 n_total = len(monthly) 

223 if n_total == 0: 

224 result[col] = float("nan") 

225 else: 

226 n_positive = int((monthly["monthly_return"] > 0).sum()) 

227 result[col] = n_positive / n_total 

228 return result 

229 

230 def worst_n_periods(self, n: int = 5) -> dict[str, list[float | None]]: 

231 """Return the N worst return periods per asset. 

232 

233 If a series has fewer than ``n`` non-null observations the list is 

234 padded with ``None`` on the right. 

235 

236 Args: 

237 n: Number of worst periods to return. Defaults to 5. 

238 

239 Returns: 

240 dict[str, list[float | None]]: Sorted worst returns per asset. 

241 """ 

242 result: dict[str, list[float | None]] = {} 

243 for col, series in self.data.items(): 

244 nonnull = series.drop_nulls() 

245 worst: list[float | None] = nonnull.sort(descending=False).head(n).to_list() 

246 while len(worst) < n: 

247 worst.append(None) 

248 result[col] = worst 

249 return result 

250 

251 # ── Capture ratios ──────────────────────────────────────────────────────── 

252 

253 def up_capture(self, benchmark: pl.Series) -> dict[str, float]: 

254 """Up-market capture ratio relative to an explicit benchmark series. 

255 

256 Measures the fraction of the benchmark's upside that the strategy 

257 captures. A value greater than 1.0 means the strategy outperformed 

258 the benchmark in rising markets. 

259 

260 Args: 

261 benchmark: Benchmark return series aligned row-by-row with the data. 

262 

263 Returns: 

264 dict[str, float]: Up capture ratio per asset. 

265 """ 

266 up_mask = benchmark > 0 

267 bench_up = benchmark.filter(up_mask).drop_nulls() 

268 if bench_up.is_empty(): 

269 return {col: float("nan") for col, _ in self.data.items()} 

270 bench_geom = float((bench_up + 1.0).product()) ** (1.0 / len(bench_up)) - 1.0 

271 if bench_geom == 0.0: # pragma: no cover 

272 return {col: float("nan") for col, _ in self.data.items()} 

273 result: dict[str, float] = {} 

274 for col, series in self.data.items(): 

275 strat_up = series.filter(up_mask).drop_nulls() 

276 if strat_up.is_empty(): 

277 result[col] = float("nan") 

278 else: 

279 strat_geom = float((strat_up + 1.0).product()) ** (1.0 / len(strat_up)) - 1.0 

280 result[col] = strat_geom / bench_geom 

281 return result 

282 

283 def down_capture(self, benchmark: pl.Series) -> dict[str, float]: 

284 """Down-market capture ratio relative to an explicit benchmark series. 

285 

286 A value less than 1.0 means the strategy lost less than the benchmark 

287 in falling markets (a desirable property). 

288 

289 Args: 

290 benchmark: Benchmark return series aligned row-by-row with the data. 

291 

292 Returns: 

293 dict[str, float]: Down capture ratio per asset. 

294 """ 

295 down_mask = benchmark < 0 

296 bench_down = benchmark.filter(down_mask).drop_nulls() 

297 if bench_down.is_empty(): 

298 return {col: float("nan") for col, _ in self.data.items()} 

299 bench_geom = float((bench_down + 1.0).product()) ** (1.0 / len(bench_down)) - 1.0 

300 if bench_geom == 0.0: # pragma: no cover 

301 return {col: float("nan") for col, _ in self.data.items()} 

302 result: dict[str, float] = {} 

303 for col, series in self.data.items(): 

304 strat_down = series.filter(down_mask).drop_nulls() 

305 if strat_down.is_empty(): 

306 result[col] = float("nan") 

307 else: 

308 strat_geom = float((strat_down + 1.0).product()) ** (1.0 / len(strat_down)) - 1.0 

309 result[col] = strat_geom / bench_geom 

310 return result 

311 

312 # ── Summary & breakdown ──────────────────────────────────────────────────── 

313 

314 def annual_breakdown(self) -> pl.DataFrame: 

315 """Summary statistics broken down by calendar year. 

316 

317 Groups the data by calendar year using the date index, computes a 

318 full :py:meth:`summary` for each year, and stacks the results with an 

319 additional ``year`` column. 

320 

321 Returns: 

322 pl.DataFrame: Columns ``year``, ``metric``, one per asset, sorted 

323 by ``year``. 

324 

325 Raises: 

326 ValueError: If the data has no date index. 

327 """ 

328 all_df = cast(pl.DataFrame, self.all) 

329 date_col_name = self.data.date_col[0] if self.data.date_col else None 

330 has_temporal = date_col_name is not None and all_df[date_col_name].dtype.is_temporal() 

331 

332 from ..data import Data 

333 

334 if not has_temporal: 

335 # Integer-index fallback: group by chunks of ~_periods_per_year rows 

336 chunk = round(self.data._periods_per_year) 

337 total = all_df.height 

338 frames_int: list[pl.DataFrame] = [] 

339 for i, start in enumerate(range(0, total, chunk), start=1): 

340 chunk_all = all_df.slice(start, chunk) 

341 if chunk_all.height < max(5, chunk // 4): 

342 continue 

343 chunk_index = chunk_all.select(self.data.date_col) 

344 chunk_returns = chunk_all.select(self.data.returns.columns) 

345 chunk_benchmark = ( 

346 chunk_all.select(self.data.benchmark.columns) if self.data.benchmark is not None else None 

347 ) 

348 chunk_data = Data(returns=chunk_returns, index=chunk_index, benchmark=chunk_benchmark) 

349 chunk_summary = cast(Any, type(self))(chunk_data).summary() 

350 chunk_summary = chunk_summary.with_columns(pl.lit(i).alias("year")) 

351 frames_int.append(chunk_summary) 

352 if not frames_int: 

353 return pl.DataFrame() 

354 result_int = pl.concat(frames_int) 

355 ordered_int = ["year", "metric", *[c for c in result_int.columns if c not in ("year", "metric")]] 

356 return result_int.select(ordered_int) 

357 

358 if date_col_name is None: # unreachable: has_temporal guarantees non-None # pragma: no cover 

359 return pl.DataFrame() # pragma: no cover 

360 years = all_df[date_col_name].dt.year().unique().sort().to_list() 

361 

362 frames: list[pl.DataFrame] = [] 

363 for year in years: 

364 year_all = all_df.filter(pl.col(date_col_name).dt.year() == year) 

365 if year_all.height < 2: 

366 continue 

367 year_index = year_all.select([date_col_name]) 

368 year_returns = year_all.select(self.data.returns.columns) 

369 year_benchmark = year_all.select(self.data.benchmark.columns) if self.data.benchmark is not None else None 

370 year_data = Data(returns=year_returns, index=year_index, benchmark=year_benchmark) 

371 year_summary = cast(Any, type(self))(year_data).summary() 

372 year_summary = year_summary.with_columns(pl.lit(year).alias("year")) 

373 frames.append(year_summary) 

374 

375 if not frames: 

376 asset_cols = list(self.data.returns.columns) 

377 schema: dict[str, type[pl.DataType]] = { 

378 "year": pl.Int32, 

379 "metric": pl.String, 

380 **dict.fromkeys(asset_cols, pl.Float64), 

381 } 

382 return pl.DataFrame(schema=schema) 

383 

384 result = pl.concat(frames) 

385 ordered = ["year", "metric", *[c for c in result.columns if c not in ("year", "metric")]] 

386 return result.select(ordered) 

387 

388 def summary(self) -> pl.DataFrame: 

389 """Summary statistics for each asset as a tidy DataFrame. 

390 

391 Each row is one metric; each column beyond ``metric`` is one asset. 

392 

393 Returns: 

394 pl.DataFrame: A DataFrame with a ``metric`` column followed by one 

395 column per asset. 

396 """ 

397 assets = [col for col, _ in self.data.items()] 

398 

399 def _safe(fn: Any) -> dict[str, Any]: 

400 """Call *fn()* and return its result; return NaN for each asset on any exception.""" 

401 try: 

402 return fn() 

403 except Exception: 

404 return dict.fromkeys(assets, float("nan")) 

405 

406 metrics: dict[str, dict[str, Any]] = { 

407 "avg_return": _safe(self.avg_return), 

408 "avg_win": _safe(self.avg_win), 

409 "avg_loss": _safe(self.avg_loss), 

410 "win_rate": _safe(self.win_rate), 

411 "profit_factor": _safe(self.profit_factor), 

412 "payoff_ratio": _safe(self.payoff_ratio), 

413 "monthly_win_rate": _safe(self.monthly_win_rate), 

414 "best": _safe(self.best), 

415 "worst": _safe(self.worst), 

416 "volatility": _safe(self.volatility), 

417 "sharpe": _safe(self.sharpe), 

418 "skew": _safe(self.skew), 

419 "kurtosis": _safe(self.kurtosis), 

420 "value_at_risk": _safe(self.value_at_risk), 

421 "conditional_value_at_risk": _safe(self.conditional_value_at_risk), 

422 "max_drawdown": _safe(self.max_drawdown), 

423 "avg_drawdown": _safe(self.avg_drawdown), 

424 "max_drawdown_duration": _safe(self.max_drawdown_duration), 

425 "calmar": _safe(self.calmar), 

426 "recovery_factor": _safe(self.recovery_factor), 

427 } 

428 

429 rows: list[dict[str, object]] = [ 

430 {"metric": name, **{asset: values.get(asset) for asset in assets}} for name, values in metrics.items() 

431 ] 

432 return pl.DataFrame(rows)