Coverage for src / qsmile / data / prices.py: 97%

235 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 22:47 +0000

1"""Bid/ask option price chain with forward/DF calibration.""" 

2 

3from __future__ import annotations 

4 

5import contextlib 

6from dataclasses import dataclass, replace 

7from typing import TYPE_CHECKING 

8 

9import cvxpy as cp 

10import numpy as np 

11from numpy.typing import NDArray 

12from scipy.stats import norm 

13 

14from qsmile.data.meta import SmileMetadata 

15from qsmile.data.strikes import StrikeArray 

16 

17if TYPE_CHECKING: 

18 import matplotlib.figure 

19 

20 from qsmile.data.vols import VolData 

21 

22 

23def _calibrate_forward_df( 

24 strikes: NDArray[np.float64], 

25 call_mid: NDArray[np.float64], 

26 put_mid: NDArray[np.float64], 

27) -> tuple[float, float]: 

28 """Calibrate forward and discount factor from put-call parity. 

29 

30 Fits C_mid - P_mid = D * (F - K) using quasi-delta weighted least squares. 

31 The weighting approximates |D(1-D)| by a Gaussian centred at ATM whose 

32 width is inferred from the ATM call price via the Brenner-Subrahmanyam 

33 approximation, giving a characteristic strike-space scale of F0*sigma0*sqrt(T). 

34 

35 Parameters 

36 ---------- 

37 strikes : NDArray 

38 Strike prices. 

39 call_mid : NDArray 

40 Mid call prices. 

41 put_mid : NDArray 

42 Mid put prices. 

43 

44 Returns: 

45 ------- 

46 tuple[float, float] 

47 (forward, discount_factor) 

48 """ 

49 y = call_mid - put_mid # C - P = D*(F - K) = D*F - D*K 

50 

51 # Initial forward estimate: strike where |C-P| is smallest 

52 atm_idx = int(np.argmin(np.abs(y))) 

53 f0 = float(strikes[atm_idx]) 

54 

55 # Quasi-delta weights: Gaussian width inferred from ATM call price 

56 # Brenner-Subrahmanyam: C_ATM ~ F*sigma*sqrt(T) / sqrt(2*pi) (assuming D~1) 

57 # => sigma0 ~ C_ATM * sqrt(2*pi) / (F0*sqrt(T)) -- but we don't know T here, 

58 # so we use the strike-space width h = C_ATM * sqrt(2*pi) directly, 

59 # which equals F0*sigma0*sqrt(T) (the natural delta scale). 

60 c_atm = float(call_mid[atm_idx]) 

61 h = c_atm * np.sqrt(2 * np.pi) 

62 # Guard against degenerate case (e.g. very deep ITM/OTM ATM estimate) 

63 h = max(h, 1e-8 * f0) 

64 weights = np.exp(-0.5 * ((strikes - f0) / h) ** 2) 

65 W = np.diag(np.sqrt(weights)) 

66 

67 # Variables: D*F and D (so the problem is linear) 

68 df = cp.Variable(pos=True, name="D") # discount factor 

69 df_times_f = cp.Variable(pos=True, name="DF") # D * F 

70 

71 # y_i = D*F - D*K_i for each strike 

72 residuals = y - (df_times_f - df * strikes) 

73 objective = cp.Minimize(cp.sum_squares(W @ residuals)) 

74 

75 prob = cp.Problem(objective, [df <= 1.0]) 

76 prob.solve(solver=cp.CLARABEL) 

77 

78 if df.value is None or df_times_f.value is None: 

79 msg = "put-call parity calibration failed to converge" 

80 raise RuntimeError(msg) 

81 

82 D_val = float(df.value) 

83 F_val = float(df_times_f.value) / D_val 

84 

85 return F_val, D_val 

86 

87 

88def delta_blend_ivols( 

89 call_bid_ivols: NDArray[np.float64], 

90 call_ask_ivols: NDArray[np.float64], 

91 put_bid_ivols: NDArray[np.float64], 

92 put_ask_ivols: NDArray[np.float64], 

93 strikes: NDArray[np.float64], 

94 forward: float, 

95 expiry: float, 

96) -> tuple[NDArray[np.float64], NDArray[np.float64]]: 

97 """Blend call and put implied vols using Black76 undiscounted call-delta weights. 

98 

99 At each strike K, the blending weight is w(K) = Phi(d1) where 

100 d1 = [ln(F/K) + 0.5 * sigma_C^2 * t] / (sigma_C * sqrt(t)) 

101 and sigma_C is the mid call-implied vol at that strike. 

102 

103 The blended vol is: sigma = w * sigma_C + (1 - w) * sigma_P. 

104 Bid and ask are blended independently with the same weights. 

105 

106 NaN values in input vols indicate inversion failures. At such strikes 

107 the blended vol falls back to the available option type. If neither 

108 is available, the strike is excluded (NaN in output). 

109 

110 Parameters 

111 ---------- 

112 call_bid_ivols : NDArray[np.float64] 

113 Call-implied bid vols (NaN where inversion failed). 

114 call_ask_ivols : NDArray[np.float64] 

115 Call-implied ask vols (NaN where inversion failed). 

116 put_bid_ivols : NDArray[np.float64] 

117 Put-implied bid vols (NaN where inversion failed). 

118 put_ask_ivols : NDArray[np.float64] 

119 Put-implied ask vols (NaN where inversion failed). 

120 strikes : NDArray[np.float64] 

121 Strike prices. 

122 forward : float 

123 Forward price. 

124 expiry : float 

125 Time to expiry in years. 

126 

127 Returns: 

128 ------- 

129 tuple[NDArray[np.float64], NDArray[np.float64]] 

130 (blended_bid_ivols, blended_ask_ivols). Strikes where neither 

131 call nor put vol is available will have NaN. 

132 """ 

133 call_mid = (call_bid_ivols + call_ask_ivols) / 2.0 

134 sqrt_t = np.sqrt(expiry) 

135 

136 # Compute delta weights from call mid vol 

137 # Where call vol is NaN, use put mid vol for delta calc; if both NaN, weight = NaN 

138 put_mid = (put_bid_ivols + put_ask_ivols) / 2.0 

139 sigma_for_delta = np.where(np.isnan(call_mid), put_mid, call_mid) 

140 safe_sigma = np.where(np.isnan(sigma_for_delta), 1.0, sigma_for_delta) 

141 safe_sigma = np.where(safe_sigma <= 0, 1e-8, safe_sigma) 

142 

143 d1 = (np.log(forward / strikes) + 0.5 * safe_sigma**2 * expiry) / (safe_sigma * sqrt_t) 

144 # w = 1 - Phi(d1): ≈ 0 at low strikes (OTM put region), ≈ 1 at high strikes (OTM call region) 

145 # Used as the call weight so OTM options dominate in each wing. 

146 w = 1.0 - norm.cdf(d1) 

147 

148 # Mask NaN vols: if call is NaN, force w=0 (use put); if put is NaN, force w=1 (use call) 

149 call_available = ~np.isnan(call_bid_ivols) 

150 put_available = ~np.isnan(put_bid_ivols) 

151 

152 # Both missing -> NaN 

153 neither = ~call_available & ~put_available 

154 w = np.where(call_available & ~put_available, 1.0, w) 

155 w = np.where(~call_available & put_available, 0.0, w) 

156 w = np.where(neither, np.nan, w) 

157 

158 # Replace NaN vols with 0 for arithmetic (masked by weight) 

159 cb = np.where(np.isnan(call_bid_ivols), 0.0, call_bid_ivols) 

160 ca = np.where(np.isnan(call_ask_ivols), 0.0, call_ask_ivols) 

161 pb = np.where(np.isnan(put_bid_ivols), 0.0, put_bid_ivols) 

162 pa = np.where(np.isnan(put_ask_ivols), 0.0, put_ask_ivols) 

163 

164 # w weights calls, (1-w) weights puts: OTM dominates in each wing 

165 blended_bid = w * cb + (1.0 - w) * pb 

166 blended_ask = w * ca + (1.0 - w) * pa 

167 

168 return blended_bid, blended_ask 

169 

170 

171@dataclass(repr=False) 

172class OptionChain: 

173 """Bid/ask option price chain for a single expiry. 

174 

175 Parameters 

176 ---------- 

177 strikedata : StrikeArray 

178 Strike-indexed columnar data containing at least ``call_bid``, 

179 ``call_ask``, ``put_bid``, and ``put_ask`` columns. 

180 Optional ``volume`` and ``open_interest`` columns are supported. 

181 metadata : SmileMetadata 

182 Smile metadata. ``expiry`` must be provided. 

183 ``forward`` and ``discount_factor`` are calibrated from 

184 put-call parity if ``None``. 

185 """ 

186 

187 strikedata: StrikeArray 

188 metadata: SmileMetadata 

189 

190 def __post_init__(self) -> None: 

191 """Validate inputs and calibrate forward/DF if needed.""" 

192 sd = self.strikedata 

193 strikes = sd.strikes 

194 n = len(strikes) 

195 

196 if n < 3: 

197 msg = f"at least 3 strikes required, got {n}" 

198 raise ValueError(msg) 

199 if np.any(strikes <= 0): 

200 msg = "all strikes must be positive" 

201 raise ValueError(msg) 

202 

203 for key in (("call", "bid"), ("call", "ask"), ("put", "bid"), ("put", "ask")): 

204 arr = sd.get_values(key) 

205 if arr is not None and np.any(arr < 0): 

206 msg = f"{key[0]}_{key[1]} must be non-negative" 

207 raise ValueError(msg) 

208 

209 call_bid = sd.get_values(("call", "bid")) 

210 call_ask = sd.get_values(("call", "ask")) 

211 put_bid = sd.get_values(("put", "bid")) 

212 put_ask = sd.get_values(("put", "ask")) 

213 

214 if call_bid is not None and call_ask is not None and np.any(call_bid > call_ask): 

215 msg = "call_bid must not exceed call_ask" 

216 raise ValueError(msg) 

217 if put_bid is not None and put_ask is not None and np.any(put_bid > put_ask): 

218 msg = "put_bid must not exceed put_ask" 

219 raise ValueError(msg) 

220 

221 for key in (("meta", "volume"), ("meta", "open_interest")): 

222 arr = sd.get_values(key) 

223 if arr is not None and np.any(arr < 0): 

224 msg = f"{key[1]} must be non-negative" 

225 raise ValueError(msg) 

226 

227 # Calibrate forward and discount factor if not provided 

228 meta = self.metadata 

229 if meta.forward is None or meta.discount_factor is None: 

230 f_cal, d_cal = _calibrate_forward_df(strikes, self.call_mid, self.put_mid) 

231 self.metadata = replace( 

232 meta, 

233 forward=meta.forward if meta.forward is not None else f_cal, 

234 discount_factor=meta.discount_factor if meta.discount_factor is not None else d_cal, 

235 ) 

236 

237 def __repr__(self) -> str: 

238 """Compact repr with date, expiry, forward, and discount factor.""" 

239 m = self.metadata 

240 date = m.date.strftime("%Y-%m-%d") 

241 expiry = m.expiry.strftime("%Y-%m-%d") 

242 fwd = f"{m.forward:.2f}" if m.forward is not None else "None" 

243 df = f"{m.discount_factor:.2f}" if m.discount_factor is not None else "None" 

244 return f"OptionChain(date={date}, expiry={expiry}, forward={fwd}, discount_factor={df})" 

245 

246 # ── convenience accessors ───────────────────────────────────── 

247 

248 @property 

249 def strikes(self) -> NDArray[np.float64]: 

250 """Strike prices.""" 

251 return self.strikedata.strikes 

252 

253 @property 

254 def call_bid(self) -> NDArray[np.float64]: 

255 """Call bid prices.""" 

256 return self.strikedata.values(("call", "bid")) 

257 

258 @property 

259 def call_ask(self) -> NDArray[np.float64]: 

260 """Call ask prices.""" 

261 return self.strikedata.values(("call", "ask")) 

262 

263 @property 

264 def put_bid(self) -> NDArray[np.float64]: 

265 """Put bid prices.""" 

266 return self.strikedata.values(("put", "bid")) 

267 

268 @property 

269 def put_ask(self) -> NDArray[np.float64]: 

270 """Put ask prices.""" 

271 return self.strikedata.values(("put", "ask")) 

272 

273 @property 

274 def volume(self) -> NDArray[np.float64] | None: 

275 """Per-strike traded volume, or None.""" 

276 return self.strikedata.get_values(("meta", "volume")) 

277 

278 @property 

279 def open_interest(self) -> NDArray[np.float64] | None: 

280 """Per-strike open interest, or None.""" 

281 return self.strikedata.get_values(("meta", "open_interest")) 

282 

283 @property 

284 def call_mid(self) -> NDArray[np.float64]: 

285 """Midpoint of call bid and ask prices.""" 

286 return (self.call_bid + self.call_ask) / 2.0 

287 

288 @property 

289 def put_mid(self) -> NDArray[np.float64]: 

290 """Midpoint of put bid and ask prices.""" 

291 return (self.put_bid + self.put_ask) / 2.0 

292 

293 def to_vols(self) -> VolData: 

294 """Convert to a VolData with (FixedStrike, Volatility) using delta-blended vols. 

295 

296 Inverts both call and put prices to implied vols at every strike, then 

297 blends them using Black76 undiscounted call-delta weights. OTM options 

298 dominate in each wing; ATM is approximately equal-weighted. 

299 

300 Strikes where neither call nor put vol can be computed are excluded. 

301 """ 

302 from qsmile.core.black76 import black76_implied_vol 

303 from qsmile.core.coords import XCoord, YCoord 

304 from qsmile.data.vols import VolData 

305 

306 meta = self.metadata 

307 if meta.forward is None or meta.discount_factor is None: 

308 msg = "forward and discount_factor must be calibrated before to_vols()" 

309 raise TypeError(msg) 

310 

311 n = len(self.strikes) 

312 

313 # Invert call and put prices to implied vols (NaN on failure) 

314 call_bid_iv = np.full(n, np.nan) 

315 call_ask_iv = np.full(n, np.nan) 

316 put_bid_iv = np.full(n, np.nan) 

317 put_ask_iv = np.full(n, np.nan) 

318 

319 for i in range(n): 

320 k = float(self.strikes[i]) 

321 with contextlib.suppress(ValueError): 

322 call_bid_iv[i] = black76_implied_vol( 

323 float(self.call_bid[i]), meta.forward, k, meta.discount_factor, meta.texpiry, is_call=True 

324 ) 

325 with contextlib.suppress(ValueError): 

326 call_ask_iv[i] = black76_implied_vol( 

327 float(self.call_ask[i]), meta.forward, k, meta.discount_factor, meta.texpiry, is_call=True 

328 ) 

329 with contextlib.suppress(ValueError): 

330 put_bid_iv[i] = black76_implied_vol( 

331 float(self.put_bid[i]), meta.forward, k, meta.discount_factor, meta.texpiry, is_call=False 

332 ) 

333 with contextlib.suppress(ValueError): 

334 put_ask_iv[i] = black76_implied_vol( 

335 float(self.put_ask[i]), meta.forward, k, meta.discount_factor, meta.texpiry, is_call=False 

336 ) 

337 

338 # Blend using delta weights 

339 blended_bid, blended_ask = delta_blend_ivols( 

340 call_bid_iv, 

341 call_ask_iv, 

342 put_bid_iv, 

343 put_ask_iv, 

344 self.strikes, 

345 meta.forward, 

346 meta.texpiry, 

347 ) 

348 

349 # Exclude strikes where neither vol is available 

350 valid = ~np.isnan(blended_bid) & ~np.isnan(blended_ask) 

351 strikes_out = self.strikes[valid] 

352 bid_out = blended_bid[valid] 

353 ask_out = blended_ask[valid] 

354 

355 # Derive sigma_atm from blended mid at ATM strike 

356 mid_out = (bid_out + ask_out) / 2.0 

357 atm_idx = int(np.argmin(np.abs(strikes_out - meta.forward))) 

358 sigma_atm = float(mid_out[atm_idx]) 

359 

360 import pandas as pd 

361 

362 sa = StrikeArray() 

363 idx = pd.Index(strikes_out, dtype=np.float64) 

364 sa.set(("y", "bid"), pd.Series(bid_out, index=idx)) 

365 sa.set(("y", "ask"), pd.Series(ask_out, index=idx)) 

366 if self.volume is not None: 

367 sa.set(("meta", "volume"), pd.Series(self.volume[valid], index=idx)) 

368 if self.open_interest is not None: 

369 sa.set(("meta", "open_interest"), pd.Series(self.open_interest[valid], index=idx)) 

370 

371 return VolData( 

372 strikearray=sa, 

373 current_x_coord=XCoord.FixedStrike, 

374 current_y_coord=YCoord.Volatility, 

375 metadata=replace(meta, sigma_atm=sigma_atm), 

376 ) 

377 

378 def filter(self) -> OptionChain: 

379 """Return a cleaned copy with stale and implausible quotes removed. 

380 

381 Applies five filters in sequence: 

382 

383 1. **Zero-bid filter** -- removes strikes where either the call or put 

384 bid is zero (no genuine two-sided market). 

385 2. **Put-call parity monotonicity** -- C_mid - P_mid must be strictly 

386 decreasing in strike (since it equals D*(F - K)). Strikes that 

387 break monotonicity carry stale or mismarked quotes and are dropped. 

388 3. **Call- and put-mid monotonicity** -- call mids must be non-increasing 

389 and put mids non-decreasing in strike. Any remaining violations are 

390 removed. 

391 4. **Sub-intrinsic filter** -- removes strikes where the call or put 

392 **bid** falls below intrinsic value (using the calibrated forward), 

393 which indicates illiquid deep-ITM or stale quotes. 

394 5. **Parity residual filter** -- removes strikes where the put-call 

395 parity residual |C_mid - P_mid - D*(F - K)| exceeds 3x the 

396 combined half bid-ask spread, indicating a stale or mispriced 

397 deep-ITM quote. 

398 

399 Returns: 

400 ------- 

401 OptionChain 

402 A new ``OptionChain`` with the noisy strikes removed and 

403 ``forward`` / ``discount_factor`` re-calibrated on the clean data. 

404 """ 

405 keep = np.ones(len(self.strikes), dtype=bool) 

406 

407 # 1. Both sides must quote a positive bid 

408 keep &= self.call_bid > 0 

409 keep &= self.put_bid > 0 

410 

411 # Work on the surviving subset for the monotonicity checks 

412 def _non_monotone_mask(values: NDArray[np.float64], decreasing: bool) -> NDArray[np.bool_]: 

413 """Return a mask (True = keep) that removes non-monotone points. 

414 

415 Iteratively drops the point at each violation until the sequence 

416 is monotone, working from the largest violation first. 

417 """ 

418 mask = np.ones(len(values), dtype=bool) 

419 while True: 

420 vals = values[mask] 

421 diff = np.diff(vals) 

422 bad = diff > 0 if decreasing else diff < 0 

423 if not np.any(bad): 

424 break 

425 # Find the worst violation in the filtered view 

426 magnitudes = np.abs(diff) * bad 

427 worst_idx = int(np.argmax(magnitudes)) 

428 # Map back to the full-array index and remove the point 

429 # (remove the second element of the pair that violates) 

430 full_indices = np.where(mask)[0] 

431 mask[full_indices[worst_idx + 1]] = False 

432 return mask 

433 

434 # 2. Put-call parity: C_mid - P_mid must be strictly decreasing 

435 parity = self.call_mid[keep] - self.put_mid[keep] 

436 parity_keep = _non_monotone_mask(parity, decreasing=True) 

437 keep_indices = np.where(keep)[0] 

438 keep[keep_indices[~parity_keep]] = False 

439 

440 # 3a. Call mids must be non-increasing in strike 

441 call_keep = _non_monotone_mask(self.call_mid[keep], decreasing=True) 

442 keep_indices = np.where(keep)[0] 

443 keep[keep_indices[~call_keep]] = False 

444 

445 # 3b. Put mids must be non-decreasing in strike 

446 put_keep = _non_monotone_mask(self.put_mid[keep], decreasing=False) 

447 keep_indices = np.where(keep)[0] 

448 keep[keep_indices[~put_keep]] = False 

449 

450 # 4. Sub-intrinsic filter: calibrate F on clean data, then remove 

451 # strikes where the bid price is below intrinsic value 

452 clean_strikes = self.strikes[keep] 

453 clean_c_mid = self.call_mid[keep] 

454 clean_p_mid = self.put_mid[keep] 

455 f_est, d_est = _calibrate_forward_df(clean_strikes, clean_c_mid, clean_p_mid) 

456 call_intrinsic = d_est * np.maximum(f_est - clean_strikes, 0.0) 

457 put_intrinsic = d_est * np.maximum(clean_strikes - f_est, 0.0) 

458 intrinsic_ok = (self.call_bid[keep] >= call_intrinsic) & (self.put_bid[keep] >= put_intrinsic) 

459 keep_indices = np.where(keep)[0] 

460 keep[keep_indices[~intrinsic_ok]] = False 

461 

462 # 5. Parity residual filter (iterative): |C-P - D*(F-K)| must be 

463 # within a small multiple of the combined half bid-ask spread. 

464 # Iteratively remove the worst outlier and recalibrate F/DF, 

465 # since a single stale deep-ITM quote can bias the calibration. 

466 while np.sum(keep) >= 3: 

467 ks = self.strikes[keep] 

468 cm = self.call_mid[keep] 

469 pm = self.put_mid[keep] 

470 f_est, d_est = _calibrate_forward_df(ks, cm, pm) 

471 parity_actual = cm - pm 

472 parity_predicted = d_est * (f_est - ks) 

473 half_spread = ( 

474 (self.call_ask[keep] - self.call_bid[keep]) + (self.put_ask[keep] - self.put_bid[keep]) 

475 ) / 2.0 

476 ratio = np.abs(parity_actual - parity_predicted) / np.maximum(half_spread, 1e-10) 

477 worst = int(np.argmax(ratio)) 

478 if ratio[worst] <= 3.0: 

479 break 

480 keep_indices = np.where(keep)[0] 

481 keep[keep_indices[worst]] = False 

482 

483 filtered_sd = self.strikedata.filter(keep) 

484 return OptionChain( 

485 strikedata=filtered_sd, 

486 metadata=SmileMetadata( 

487 date=self.metadata.date, expiry=self.metadata.expiry, daycount=self.metadata.daycount 

488 ), 

489 ) 

490 

491 def plot(self, *, title: str = "Option Chain Prices", ax=None, **kwargs) -> matplotlib.figure.Figure: 

492 """Plot bid/ask prices as error bars for calls and puts.""" 

493 from qsmile.core.plot import _require_matplotlib, plot_bid_ask 

494 

495 _require_matplotlib() 

496 import matplotlib.pyplot as plt 

497 

498 if ax is None: 

499 _, ax = plt.subplots() 

500 plot_bid_ask( 

501 self.strikes, 

502 self.call_mid, 

503 self.call_bid, 

504 self.call_ask, 

505 label="Calls", 

506 color="tab:blue", 

507 fmt="none", 

508 ax=ax, 

509 **kwargs, 

510 ) 

511 plot_bid_ask( 

512 self.strikes, 

513 self.put_mid, 

514 self.put_bid, 

515 self.put_ask, 

516 label="Puts", 

517 color="tab:red", 

518 fmt="none", 

519 ax=ax, 

520 **kwargs, 

521 ) 

522 ax.set_xlabel("Strike") 

523 ax.set_ylabel("Price") 

524 ax.set_title(title) 

525 ax.legend() 

526 fig = ax.get_figure() 

527 if not isinstance(fig, plt.Figure): 

528 msg = "Expected a matplotlib Figure from ax.get_figure()" 

529 raise TypeError(msg) 

530 return fig