Coverage for src / qsmile / data / prices.py: 98%

204 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 21:47 +0000

1"""Bid/ask option price chain with forward/DF calibration.""" 

2 

3from __future__ import annotations 

4 

5import contextlib 

6from dataclasses import dataclass, field 

7from typing import TYPE_CHECKING 

8 

9import cvxpy as cp 

10import numpy as np 

11from numpy.typing import NDArray 

12from scipy.stats import norm 

13 

14if TYPE_CHECKING: 

15 import matplotlib.figure 

16 

17 from qsmile.data.vols import SmileData 

18 

19 

20def _calibrate_forward_df( 

21 strikes: NDArray[np.float64], 

22 call_mid: NDArray[np.float64], 

23 put_mid: NDArray[np.float64], 

24) -> tuple[float, float]: 

25 """Calibrate forward and discount factor from put-call parity. 

26 

27 Fits C_mid - P_mid = D * (F - K) using quasi-delta weighted least squares. 

28 The weighting approximates |D(1-D)| by a Gaussian centred at ATM whose 

29 width is inferred from the ATM call price via the Brenner-Subrahmanyam 

30 approximation, giving a characteristic strike-space scale of F0*sigma0*sqrt(T). 

31 

32 Parameters 

33 ---------- 

34 strikes : NDArray 

35 Strike prices. 

36 call_mid : NDArray 

37 Mid call prices. 

38 put_mid : NDArray 

39 Mid put prices. 

40 

41 Returns: 

42 ------- 

43 tuple[float, float] 

44 (forward, discount_factor) 

45 """ 

46 y = call_mid - put_mid # C - P = D*(F - K) = D*F - D*K 

47 

48 # Initial forward estimate: strike where |C-P| is smallest 

49 atm_idx = int(np.argmin(np.abs(y))) 

50 f0 = float(strikes[atm_idx]) 

51 

52 # Quasi-delta weights: Gaussian width inferred from ATM call price 

53 # Brenner-Subrahmanyam: C_ATM ~ F*sigma*sqrt(T) / sqrt(2*pi) (assuming D~1) 

54 # => sigma0 ~ C_ATM * sqrt(2*pi) / (F0*sqrt(T)) -- but we don't know T here, 

55 # so we use the strike-space width h = C_ATM * sqrt(2*pi) directly, 

56 # which equals F0*sigma0*sqrt(T) (the natural delta scale). 

57 c_atm = float(call_mid[atm_idx]) 

58 h = c_atm * np.sqrt(2 * np.pi) 

59 # Guard against degenerate case (e.g. very deep ITM/OTM ATM estimate) 

60 h = max(h, 1e-8 * f0) 

61 weights = np.exp(-0.5 * ((strikes - f0) / h) ** 2) 

62 W = np.diag(np.sqrt(weights)) 

63 

64 # Variables: D*F and D (so the problem is linear) 

65 df = cp.Variable(pos=True, name="D") # discount factor 

66 df_times_f = cp.Variable(pos=True, name="DF") # D * F 

67 

68 # y_i = D*F - D*K_i for each strike 

69 residuals = y - (df_times_f - df * strikes) 

70 objective = cp.Minimize(cp.sum_squares(W @ residuals)) 

71 

72 prob = cp.Problem(objective, [df <= 1.0]) 

73 prob.solve(solver=cp.CLARABEL) 

74 

75 if df.value is None or df_times_f.value is None: 

76 msg = "put-call parity calibration failed to converge" 

77 raise RuntimeError(msg) 

78 

79 D_val = float(df.value) 

80 F_val = float(df_times_f.value) / D_val 

81 

82 return F_val, D_val 

83 

84 

85def delta_blend_ivols( 

86 call_bid_ivols: NDArray[np.float64], 

87 call_ask_ivols: NDArray[np.float64], 

88 put_bid_ivols: NDArray[np.float64], 

89 put_ask_ivols: NDArray[np.float64], 

90 strikes: NDArray[np.float64], 

91 forward: float, 

92 expiry: float, 

93) -> tuple[NDArray[np.float64], NDArray[np.float64]]: 

94 """Blend call and put implied vols using Black76 undiscounted call-delta weights. 

95 

96 At each strike K, the blending weight is w(K) = Phi(d1) where 

97 d1 = [ln(F/K) + 0.5 * sigma_C^2 * t] / (sigma_C * sqrt(t)) 

98 and sigma_C is the mid call-implied vol at that strike. 

99 

100 The blended vol is: sigma = w * sigma_C + (1 - w) * sigma_P. 

101 Bid and ask are blended independently with the same weights. 

102 

103 NaN values in input vols indicate inversion failures. At such strikes 

104 the blended vol falls back to the available option type. If neither 

105 is available, the strike is excluded (NaN in output). 

106 

107 Parameters 

108 ---------- 

109 call_bid_ivols : NDArray[np.float64] 

110 Call-implied bid vols (NaN where inversion failed). 

111 call_ask_ivols : NDArray[np.float64] 

112 Call-implied ask vols (NaN where inversion failed). 

113 put_bid_ivols : NDArray[np.float64] 

114 Put-implied bid vols (NaN where inversion failed). 

115 put_ask_ivols : NDArray[np.float64] 

116 Put-implied ask vols (NaN where inversion failed). 

117 strikes : NDArray[np.float64] 

118 Strike prices. 

119 forward : float 

120 Forward price. 

121 expiry : float 

122 Time to expiry in years. 

123 

124 Returns: 

125 ------- 

126 tuple[NDArray[np.float64], NDArray[np.float64]] 

127 (blended_bid_ivols, blended_ask_ivols). Strikes where neither 

128 call nor put vol is available will have NaN. 

129 """ 

130 call_mid = (call_bid_ivols + call_ask_ivols) / 2.0 

131 sqrt_t = np.sqrt(expiry) 

132 

133 # Compute delta weights from call mid vol 

134 # Where call vol is NaN, use put mid vol for delta calc; if both NaN, weight = NaN 

135 put_mid = (put_bid_ivols + put_ask_ivols) / 2.0 

136 sigma_for_delta = np.where(np.isnan(call_mid), put_mid, call_mid) 

137 safe_sigma = np.where(np.isnan(sigma_for_delta), 1.0, sigma_for_delta) 

138 safe_sigma = np.where(safe_sigma <= 0, 1e-8, safe_sigma) 

139 

140 d1 = (np.log(forward / strikes) + 0.5 * safe_sigma**2 * expiry) / (safe_sigma * sqrt_t) 

141 w = norm.cdf(d1) 

142 

143 # Mask NaN vols: if call is NaN, force weight to 0 (use put); if put is NaN, force to 1 (use call) 

144 call_available = ~np.isnan(call_bid_ivols) 

145 put_available = ~np.isnan(put_bid_ivols) 

146 

147 # Both missing → NaN 

148 neither = ~call_available & ~put_available 

149 w = np.where(call_available & ~put_available, 1.0, w) 

150 w = np.where(~call_available & put_available, 0.0, w) 

151 w = np.where(neither, np.nan, w) 

152 

153 # Replace NaN vols with 0 for arithmetic (masked by weight) 

154 cb = np.where(np.isnan(call_bid_ivols), 0.0, call_bid_ivols) 

155 ca = np.where(np.isnan(call_ask_ivols), 0.0, call_ask_ivols) 

156 pb = np.where(np.isnan(put_bid_ivols), 0.0, put_bid_ivols) 

157 pa = np.where(np.isnan(put_ask_ivols), 0.0, put_ask_ivols) 

158 

159 blended_bid = w * cb + (1.0 - w) * pb 

160 blended_ask = w * ca + (1.0 - w) * pa 

161 

162 return blended_bid, blended_ask 

163 

164 

165@dataclass 

166class OptionChain: 

167 """Bid/ask option price chain for a single expiry. 

168 

169 Parameters 

170 ---------- 

171 strikes : NDArray[np.float64] 

172 Strike prices. Must be positive. 

173 call_bid : NDArray[np.float64] 

174 Call bid prices. Must be non-negative. 

175 call_ask : NDArray[np.float64] 

176 Call ask prices. Must be >= call_bid. 

177 put_bid : NDArray[np.float64] 

178 Put bid prices. Must be non-negative. 

179 put_ask : NDArray[np.float64] 

180 Put ask prices. Must be >= put_bid. 

181 expiry : float 

182 Time to expiry in years. Must be positive. 

183 forward : float | None 

184 Forward price. Calibrated from put-call parity if not provided. 

185 discount_factor : float | None 

186 Discount factor. Calibrated from put-call parity if not provided. 

187 """ 

188 

189 strikes: NDArray[np.float64] 

190 call_bid: NDArray[np.float64] 

191 call_ask: NDArray[np.float64] 

192 put_bid: NDArray[np.float64] 

193 put_ask: NDArray[np.float64] 

194 expiry: float 

195 forward: float | None = field(default=None) 

196 discount_factor: float | None = field(default=None) 

197 

198 def __post_init__(self) -> None: 

199 """Validate and convert inputs, calibrate forward/DF if needed.""" 

200 self.strikes = np.asarray(self.strikes, dtype=np.float64) 

201 self.call_bid = np.asarray(self.call_bid, dtype=np.float64) 

202 self.call_ask = np.asarray(self.call_ask, dtype=np.float64) 

203 self.put_bid = np.asarray(self.put_bid, dtype=np.float64) 

204 self.put_ask = np.asarray(self.put_ask, dtype=np.float64) 

205 

206 n = len(self.strikes) 

207 for name, arr in [ 

208 ("call_bid", self.call_bid), 

209 ("call_ask", self.call_ask), 

210 ("put_bid", self.put_bid), 

211 ("put_ask", self.put_ask), 

212 ]: 

213 if len(arr) != n: 

214 msg = f"{name} must have the same length as strikes ({n}), got {len(arr)}" 

215 raise ValueError(msg) 

216 

217 if n < 3: 

218 msg = f"at least 3 strikes required, got {n}" 

219 raise ValueError(msg) 

220 if np.any(self.strikes <= 0): 

221 msg = "all strikes must be positive" 

222 raise ValueError(msg) 

223 if self.expiry <= 0: 

224 msg = f"expiry must be positive, got {self.expiry}" 

225 raise ValueError(msg) 

226 

227 for name, arr in [ 

228 ("call_bid", self.call_bid), 

229 ("call_ask", self.call_ask), 

230 ("put_bid", self.put_bid), 

231 ("put_ask", self.put_ask), 

232 ]: 

233 if np.any(arr < 0): 

234 msg = f"{name} must be non-negative" 

235 raise ValueError(msg) 

236 

237 if np.any(self.call_bid > self.call_ask): 

238 msg = "call_bid must not exceed call_ask" 

239 raise ValueError(msg) 

240 if np.any(self.put_bid > self.put_ask): 

241 msg = "put_bid must not exceed put_ask" 

242 raise ValueError(msg) 

243 

244 # Calibrate forward and discount factor if not provided 

245 if self.forward is None or self.discount_factor is None: 

246 f_cal, d_cal = _calibrate_forward_df(self.strikes, self.call_mid, self.put_mid) 

247 if self.forward is None: 

248 self.forward = f_cal 

249 if self.discount_factor is None: 

250 self.discount_factor = d_cal 

251 

252 @property 

253 def call_mid(self) -> NDArray[np.float64]: 

254 """Midpoint of call bid and ask prices.""" 

255 return (self.call_bid + self.call_ask) / 2.0 

256 

257 @property 

258 def put_mid(self) -> NDArray[np.float64]: 

259 """Midpoint of put bid and ask prices.""" 

260 return (self.put_bid + self.put_ask) / 2.0 

261 

262 def to_smile_data(self) -> SmileData: 

263 """Convert to a SmileData with (FixedStrike, Price) coordinates. 

264 

265 Uses call mid-market prices as the Y-values. 

266 """ 

267 from qsmile.core.coords import XCoord, YCoord 

268 from qsmile.data.meta import SmileMetadata 

269 from qsmile.data.vols import SmileData 

270 

271 assert self.forward is not None # noqa: S101 

272 assert self.discount_factor is not None # noqa: S101 

273 

274 return SmileData( 

275 x=self.strikes.copy(), 

276 y_bid=self.call_bid.copy(), 

277 y_ask=self.call_ask.copy(), 

278 x_coord=XCoord.FixedStrike, 

279 y_coord=YCoord.Price, 

280 metadata=SmileMetadata( 

281 forward=self.forward, 

282 discount_factor=self.discount_factor, 

283 expiry=self.expiry, 

284 ), 

285 ) 

286 

287 def to_smile_data_blended(self) -> SmileData: 

288 """Convert to a SmileData with (FixedStrike, Volatility) using delta-blended vols. 

289 

290 Inverts both call and put prices to implied vols at every strike, then 

291 blends them using Black76 undiscounted call-delta weights. OTM options 

292 dominate in each wing; ATM is approximately equal-weighted. 

293 

294 Strikes where neither call nor put vol can be computed are excluded. 

295 """ 

296 from qsmile.core.black76 import black76_implied_vol 

297 from qsmile.core.coords import XCoord, YCoord 

298 from qsmile.data.meta import SmileMetadata 

299 from qsmile.data.vols import SmileData 

300 

301 assert self.forward is not None # noqa: S101 

302 assert self.discount_factor is not None # noqa: S101 

303 

304 n = len(self.strikes) 

305 

306 # Invert call and put prices to implied vols (NaN on failure) 

307 call_bid_iv = np.full(n, np.nan) 

308 call_ask_iv = np.full(n, np.nan) 

309 put_bid_iv = np.full(n, np.nan) 

310 put_ask_iv = np.full(n, np.nan) 

311 

312 for i in range(n): 

313 k = float(self.strikes[i]) 

314 with contextlib.suppress(ValueError): 

315 call_bid_iv[i] = black76_implied_vol( 

316 float(self.call_bid[i]), self.forward, k, self.discount_factor, self.expiry, is_call=True 

317 ) 

318 with contextlib.suppress(ValueError): 

319 call_ask_iv[i] = black76_implied_vol( 

320 float(self.call_ask[i]), self.forward, k, self.discount_factor, self.expiry, is_call=True 

321 ) 

322 with contextlib.suppress(ValueError): 

323 put_bid_iv[i] = black76_implied_vol( 

324 float(self.put_bid[i]), self.forward, k, self.discount_factor, self.expiry, is_call=False 

325 ) 

326 with contextlib.suppress(ValueError): 

327 put_ask_iv[i] = black76_implied_vol( 

328 float(self.put_ask[i]), self.forward, k, self.discount_factor, self.expiry, is_call=False 

329 ) 

330 

331 # Blend using delta weights 

332 blended_bid, blended_ask = delta_blend_ivols( 

333 call_bid_iv, 

334 call_ask_iv, 

335 put_bid_iv, 

336 put_ask_iv, 

337 self.strikes, 

338 self.forward, 

339 self.expiry, 

340 ) 

341 

342 # Exclude strikes where neither vol is available 

343 valid = ~np.isnan(blended_bid) & ~np.isnan(blended_ask) 

344 strikes_out = self.strikes[valid] 

345 bid_out = blended_bid[valid] 

346 ask_out = blended_ask[valid] 

347 

348 # Derive sigma_atm from blended mid at ATM strike 

349 mid_out = (bid_out + ask_out) / 2.0 

350 atm_idx = int(np.argmin(np.abs(strikes_out - self.forward))) 

351 sigma_atm = float(mid_out[atm_idx]) 

352 

353 return SmileData( 

354 x=strikes_out, 

355 y_bid=bid_out, 

356 y_ask=ask_out, 

357 x_coord=XCoord.FixedStrike, 

358 y_coord=YCoord.Volatility, 

359 metadata=SmileMetadata( 

360 forward=self.forward, 

361 discount_factor=self.discount_factor, 

362 expiry=self.expiry, 

363 sigma_atm=sigma_atm, 

364 ), 

365 ) 

366 

367 def denoise(self) -> OptionChain: 

368 """Return a cleaned copy with stale and implausible quotes removed. 

369 

370 Applies five filters in sequence: 

371 

372 1. **Zero-bid filter** -- removes strikes where either the call or put 

373 bid is zero (no genuine two-sided market). 

374 2. **Put-call parity monotonicity** -- C_mid - P_mid must be strictly 

375 decreasing in strike (since it equals D*(F - K)). Strikes that 

376 break monotonicity carry stale or mismarked quotes and are dropped. 

377 3. **Call- and put-mid monotonicity** -- call mids must be non-increasing 

378 and put mids non-decreasing in strike. Any remaining violations are 

379 removed. 

380 4. **Sub-intrinsic filter** -- removes strikes where call or put mid 

381 prices fall below their intrinsic value (using the calibrated 

382 forward), which indicates illiquid deep-ITM quotes. 

383 5. **Parity residual filter** -- removes strikes where the put-call 

384 parity residual |C_mid - P_mid - D*(F - K)| exceeds 3x the 

385 combined half bid-ask spread, indicating a stale or mispriced 

386 deep-ITM quote. 

387 

388 Returns: 

389 ------- 

390 OptionChain 

391 A new ``OptionChain`` with the noisy strikes removed and 

392 ``forward`` / ``discount_factor`` re-calibrated on the clean data. 

393 """ 

394 keep = np.ones(len(self.strikes), dtype=bool) 

395 

396 # 1. Both sides must quote a positive bid 

397 keep &= self.call_bid > 0 

398 keep &= self.put_bid > 0 

399 

400 # Work on the surviving subset for the monotonicity checks 

401 def _non_monotone_mask(values: NDArray[np.float64], decreasing: bool) -> NDArray[np.bool_]: 

402 """Return a mask (True = keep) that removes non-monotone points. 

403 

404 Iteratively drops the point at each violation until the sequence 

405 is monotone, working from the largest violation first. 

406 """ 

407 mask = np.ones(len(values), dtype=bool) 

408 while True: 

409 vals = values[mask] 

410 diff = np.diff(vals) 

411 bad = diff > 0 if decreasing else diff < 0 

412 if not np.any(bad): 

413 break 

414 # Find the worst violation in the filtered view 

415 magnitudes = np.abs(diff) * bad 

416 worst_idx = int(np.argmax(magnitudes)) 

417 # Map back to the full-array index and remove the point 

418 # (remove the second element of the pair that violates) 

419 full_indices = np.where(mask)[0] 

420 mask[full_indices[worst_idx + 1]] = False 

421 return mask 

422 

423 # 2. Put-call parity: C_mid - P_mid must be strictly decreasing 

424 parity = self.call_mid[keep] - self.put_mid[keep] 

425 parity_keep = _non_monotone_mask(parity, decreasing=True) 

426 keep_indices = np.where(keep)[0] 

427 keep[keep_indices[~parity_keep]] = False 

428 

429 # 3a. Call mids must be non-increasing in strike 

430 call_keep = _non_monotone_mask(self.call_mid[keep], decreasing=True) 

431 keep_indices = np.where(keep)[0] 

432 keep[keep_indices[~call_keep]] = False 

433 

434 # 3b. Put mids must be non-decreasing in strike 

435 put_keep = _non_monotone_mask(self.put_mid[keep], decreasing=False) 

436 keep_indices = np.where(keep)[0] 

437 keep[keep_indices[~put_keep]] = False 

438 

439 # 4. Sub-intrinsic filter: calibrate F on clean data, then remove 

440 # strikes where the mid price is below intrinsic value 

441 clean_strikes = self.strikes[keep] 

442 clean_c_mid = self.call_mid[keep] 

443 clean_p_mid = self.put_mid[keep] 

444 f_est, d_est = _calibrate_forward_df(clean_strikes, clean_c_mid, clean_p_mid) 

445 call_intrinsic = np.maximum(f_est - clean_strikes, 0.0) 

446 put_intrinsic = np.maximum(clean_strikes - f_est, 0.0) 

447 intrinsic_ok = (clean_c_mid >= call_intrinsic) & (clean_p_mid >= put_intrinsic) 

448 keep_indices = np.where(keep)[0] 

449 keep[keep_indices[~intrinsic_ok]] = False 

450 

451 # 5. Parity residual filter (iterative): |C-P - D*(F-K)| must be 

452 # within a small multiple of the combined half bid-ask spread. 

453 # Iteratively remove the worst outlier and recalibrate F/DF, 

454 # since a single stale deep-ITM quote can bias the calibration. 

455 while np.sum(keep) >= 3: 

456 ks = self.strikes[keep] 

457 cm = self.call_mid[keep] 

458 pm = self.put_mid[keep] 

459 f_est, d_est = _calibrate_forward_df(ks, cm, pm) 

460 parity_actual = cm - pm 

461 parity_predicted = d_est * (f_est - ks) 

462 half_spread = ( 

463 (self.call_ask[keep] - self.call_bid[keep]) + (self.put_ask[keep] - self.put_bid[keep]) 

464 ) / 2.0 

465 ratio = np.abs(parity_actual - parity_predicted) / np.maximum(half_spread, 1e-10) 

466 worst = int(np.argmax(ratio)) 

467 if ratio[worst] <= 3.0: 

468 break 

469 keep_indices = np.where(keep)[0] 

470 keep[keep_indices[worst]] = False 

471 

472 return OptionChain( 

473 strikes=self.strikes[keep], 

474 call_bid=self.call_bid[keep], 

475 call_ask=self.call_ask[keep], 

476 put_bid=self.put_bid[keep], 

477 put_ask=self.put_ask[keep], 

478 expiry=self.expiry, 

479 # Re-calibrate forward/DF on the clean data 

480 ) 

481 

482 def plot(self, *, title: str = "Option Chain Prices") -> matplotlib.figure.Figure: 

483 """Plot bid/ask prices as error bars for calls and puts.""" 

484 from qsmile.core.plot import _require_matplotlib, plot_bid_ask 

485 

486 _require_matplotlib() 

487 import matplotlib.pyplot as plt 

488 

489 fig, ax = plt.subplots() 

490 plot_bid_ask( 

491 self.strikes, 

492 self.call_mid, 

493 self.call_bid, 

494 self.call_ask, 

495 label="Calls", 

496 color="tab:blue", 

497 ax=ax, 

498 ) 

499 plot_bid_ask( 

500 self.strikes, 

501 self.put_mid, 

502 self.put_bid, 

503 self.put_ask, 

504 label="Puts", 

505 color="tab:red", 

506 ax=ax, 

507 ) 

508 ax.set_xlabel("Strike") 

509 ax.set_ylabel("Price") 

510 ax.set_title(title) 

511 ax.legend() 

512 return fig