Coverage for src / qsmile / data / prices.py: 97%
235 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 22:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 22:47 +0000
1"""Bid/ask option price chain with forward/DF calibration."""
3from __future__ import annotations
5import contextlib
6from dataclasses import dataclass, replace
7from typing import TYPE_CHECKING
9import cvxpy as cp
10import numpy as np
11from numpy.typing import NDArray
12from scipy.stats import norm
14from qsmile.data.meta import SmileMetadata
15from qsmile.data.strikes import StrikeArray
17if TYPE_CHECKING:
18 import matplotlib.figure
20 from qsmile.data.vols import VolData
23def _calibrate_forward_df(
24 strikes: NDArray[np.float64],
25 call_mid: NDArray[np.float64],
26 put_mid: NDArray[np.float64],
27) -> tuple[float, float]:
28 """Calibrate forward and discount factor from put-call parity.
30 Fits C_mid - P_mid = D * (F - K) using quasi-delta weighted least squares.
31 The weighting approximates |D(1-D)| by a Gaussian centred at ATM whose
32 width is inferred from the ATM call price via the Brenner-Subrahmanyam
33 approximation, giving a characteristic strike-space scale of F0*sigma0*sqrt(T).
35 Parameters
36 ----------
37 strikes : NDArray
38 Strike prices.
39 call_mid : NDArray
40 Mid call prices.
41 put_mid : NDArray
42 Mid put prices.
44 Returns:
45 -------
46 tuple[float, float]
47 (forward, discount_factor)
48 """
49 y = call_mid - put_mid # C - P = D*(F - K) = D*F - D*K
51 # Initial forward estimate: strike where |C-P| is smallest
52 atm_idx = int(np.argmin(np.abs(y)))
53 f0 = float(strikes[atm_idx])
55 # Quasi-delta weights: Gaussian width inferred from ATM call price
56 # Brenner-Subrahmanyam: C_ATM ~ F*sigma*sqrt(T) / sqrt(2*pi) (assuming D~1)
57 # => sigma0 ~ C_ATM * sqrt(2*pi) / (F0*sqrt(T)) -- but we don't know T here,
58 # so we use the strike-space width h = C_ATM * sqrt(2*pi) directly,
59 # which equals F0*sigma0*sqrt(T) (the natural delta scale).
60 c_atm = float(call_mid[atm_idx])
61 h = c_atm * np.sqrt(2 * np.pi)
62 # Guard against degenerate case (e.g. very deep ITM/OTM ATM estimate)
63 h = max(h, 1e-8 * f0)
64 weights = np.exp(-0.5 * ((strikes - f0) / h) ** 2)
65 W = np.diag(np.sqrt(weights))
67 # Variables: D*F and D (so the problem is linear)
68 df = cp.Variable(pos=True, name="D") # discount factor
69 df_times_f = cp.Variable(pos=True, name="DF") # D * F
71 # y_i = D*F - D*K_i for each strike
72 residuals = y - (df_times_f - df * strikes)
73 objective = cp.Minimize(cp.sum_squares(W @ residuals))
75 prob = cp.Problem(objective, [df <= 1.0])
76 prob.solve(solver=cp.CLARABEL)
78 if df.value is None or df_times_f.value is None:
79 msg = "put-call parity calibration failed to converge"
80 raise RuntimeError(msg)
82 D_val = float(df.value)
83 F_val = float(df_times_f.value) / D_val
85 return F_val, D_val
88def delta_blend_ivols(
89 call_bid_ivols: NDArray[np.float64],
90 call_ask_ivols: NDArray[np.float64],
91 put_bid_ivols: NDArray[np.float64],
92 put_ask_ivols: NDArray[np.float64],
93 strikes: NDArray[np.float64],
94 forward: float,
95 expiry: float,
96) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
97 """Blend call and put implied vols using Black76 undiscounted call-delta weights.
99 At each strike K, the blending weight is w(K) = Phi(d1) where
100 d1 = [ln(F/K) + 0.5 * sigma_C^2 * t] / (sigma_C * sqrt(t))
101 and sigma_C is the mid call-implied vol at that strike.
103 The blended vol is: sigma = w * sigma_C + (1 - w) * sigma_P.
104 Bid and ask are blended independently with the same weights.
106 NaN values in input vols indicate inversion failures. At such strikes
107 the blended vol falls back to the available option type. If neither
108 is available, the strike is excluded (NaN in output).
110 Parameters
111 ----------
112 call_bid_ivols : NDArray[np.float64]
113 Call-implied bid vols (NaN where inversion failed).
114 call_ask_ivols : NDArray[np.float64]
115 Call-implied ask vols (NaN where inversion failed).
116 put_bid_ivols : NDArray[np.float64]
117 Put-implied bid vols (NaN where inversion failed).
118 put_ask_ivols : NDArray[np.float64]
119 Put-implied ask vols (NaN where inversion failed).
120 strikes : NDArray[np.float64]
121 Strike prices.
122 forward : float
123 Forward price.
124 expiry : float
125 Time to expiry in years.
127 Returns:
128 -------
129 tuple[NDArray[np.float64], NDArray[np.float64]]
130 (blended_bid_ivols, blended_ask_ivols). Strikes where neither
131 call nor put vol is available will have NaN.
132 """
133 call_mid = (call_bid_ivols + call_ask_ivols) / 2.0
134 sqrt_t = np.sqrt(expiry)
136 # Compute delta weights from call mid vol
137 # Where call vol is NaN, use put mid vol for delta calc; if both NaN, weight = NaN
138 put_mid = (put_bid_ivols + put_ask_ivols) / 2.0
139 sigma_for_delta = np.where(np.isnan(call_mid), put_mid, call_mid)
140 safe_sigma = np.where(np.isnan(sigma_for_delta), 1.0, sigma_for_delta)
141 safe_sigma = np.where(safe_sigma <= 0, 1e-8, safe_sigma)
143 d1 = (np.log(forward / strikes) + 0.5 * safe_sigma**2 * expiry) / (safe_sigma * sqrt_t)
144 # w = 1 - Phi(d1): ≈ 0 at low strikes (OTM put region), ≈ 1 at high strikes (OTM call region)
145 # Used as the call weight so OTM options dominate in each wing.
146 w = 1.0 - norm.cdf(d1)
148 # Mask NaN vols: if call is NaN, force w=0 (use put); if put is NaN, force w=1 (use call)
149 call_available = ~np.isnan(call_bid_ivols)
150 put_available = ~np.isnan(put_bid_ivols)
152 # Both missing -> NaN
153 neither = ~call_available & ~put_available
154 w = np.where(call_available & ~put_available, 1.0, w)
155 w = np.where(~call_available & put_available, 0.0, w)
156 w = np.where(neither, np.nan, w)
158 # Replace NaN vols with 0 for arithmetic (masked by weight)
159 cb = np.where(np.isnan(call_bid_ivols), 0.0, call_bid_ivols)
160 ca = np.where(np.isnan(call_ask_ivols), 0.0, call_ask_ivols)
161 pb = np.where(np.isnan(put_bid_ivols), 0.0, put_bid_ivols)
162 pa = np.where(np.isnan(put_ask_ivols), 0.0, put_ask_ivols)
164 # w weights calls, (1-w) weights puts: OTM dominates in each wing
165 blended_bid = w * cb + (1.0 - w) * pb
166 blended_ask = w * ca + (1.0 - w) * pa
168 return blended_bid, blended_ask
171@dataclass(repr=False)
172class OptionChain:
173 """Bid/ask option price chain for a single expiry.
175 Parameters
176 ----------
177 strikedata : StrikeArray
178 Strike-indexed columnar data containing at least ``call_bid``,
179 ``call_ask``, ``put_bid``, and ``put_ask`` columns.
180 Optional ``volume`` and ``open_interest`` columns are supported.
181 metadata : SmileMetadata
182 Smile metadata. ``expiry`` must be provided.
183 ``forward`` and ``discount_factor`` are calibrated from
184 put-call parity if ``None``.
185 """
187 strikedata: StrikeArray
188 metadata: SmileMetadata
190 def __post_init__(self) -> None:
191 """Validate inputs and calibrate forward/DF if needed."""
192 sd = self.strikedata
193 strikes = sd.strikes
194 n = len(strikes)
196 if n < 3:
197 msg = f"at least 3 strikes required, got {n}"
198 raise ValueError(msg)
199 if np.any(strikes <= 0):
200 msg = "all strikes must be positive"
201 raise ValueError(msg)
203 for key in (("call", "bid"), ("call", "ask"), ("put", "bid"), ("put", "ask")):
204 arr = sd.get_values(key)
205 if arr is not None and np.any(arr < 0):
206 msg = f"{key[0]}_{key[1]} must be non-negative"
207 raise ValueError(msg)
209 call_bid = sd.get_values(("call", "bid"))
210 call_ask = sd.get_values(("call", "ask"))
211 put_bid = sd.get_values(("put", "bid"))
212 put_ask = sd.get_values(("put", "ask"))
214 if call_bid is not None and call_ask is not None and np.any(call_bid > call_ask):
215 msg = "call_bid must not exceed call_ask"
216 raise ValueError(msg)
217 if put_bid is not None and put_ask is not None and np.any(put_bid > put_ask):
218 msg = "put_bid must not exceed put_ask"
219 raise ValueError(msg)
221 for key in (("meta", "volume"), ("meta", "open_interest")):
222 arr = sd.get_values(key)
223 if arr is not None and np.any(arr < 0):
224 msg = f"{key[1]} must be non-negative"
225 raise ValueError(msg)
227 # Calibrate forward and discount factor if not provided
228 meta = self.metadata
229 if meta.forward is None or meta.discount_factor is None:
230 f_cal, d_cal = _calibrate_forward_df(strikes, self.call_mid, self.put_mid)
231 self.metadata = replace(
232 meta,
233 forward=meta.forward if meta.forward is not None else f_cal,
234 discount_factor=meta.discount_factor if meta.discount_factor is not None else d_cal,
235 )
237 def __repr__(self) -> str:
238 """Compact repr with date, expiry, forward, and discount factor."""
239 m = self.metadata
240 date = m.date.strftime("%Y-%m-%d")
241 expiry = m.expiry.strftime("%Y-%m-%d")
242 fwd = f"{m.forward:.2f}" if m.forward is not None else "None"
243 df = f"{m.discount_factor:.2f}" if m.discount_factor is not None else "None"
244 return f"OptionChain(date={date}, expiry={expiry}, forward={fwd}, discount_factor={df})"
246 # ── convenience accessors ─────────────────────────────────────
248 @property
249 def strikes(self) -> NDArray[np.float64]:
250 """Strike prices."""
251 return self.strikedata.strikes
253 @property
254 def call_bid(self) -> NDArray[np.float64]:
255 """Call bid prices."""
256 return self.strikedata.values(("call", "bid"))
258 @property
259 def call_ask(self) -> NDArray[np.float64]:
260 """Call ask prices."""
261 return self.strikedata.values(("call", "ask"))
263 @property
264 def put_bid(self) -> NDArray[np.float64]:
265 """Put bid prices."""
266 return self.strikedata.values(("put", "bid"))
268 @property
269 def put_ask(self) -> NDArray[np.float64]:
270 """Put ask prices."""
271 return self.strikedata.values(("put", "ask"))
273 @property
274 def volume(self) -> NDArray[np.float64] | None:
275 """Per-strike traded volume, or None."""
276 return self.strikedata.get_values(("meta", "volume"))
278 @property
279 def open_interest(self) -> NDArray[np.float64] | None:
280 """Per-strike open interest, or None."""
281 return self.strikedata.get_values(("meta", "open_interest"))
283 @property
284 def call_mid(self) -> NDArray[np.float64]:
285 """Midpoint of call bid and ask prices."""
286 return (self.call_bid + self.call_ask) / 2.0
288 @property
289 def put_mid(self) -> NDArray[np.float64]:
290 """Midpoint of put bid and ask prices."""
291 return (self.put_bid + self.put_ask) / 2.0
293 def to_vols(self) -> VolData:
294 """Convert to a VolData with (FixedStrike, Volatility) using delta-blended vols.
296 Inverts both call and put prices to implied vols at every strike, then
297 blends them using Black76 undiscounted call-delta weights. OTM options
298 dominate in each wing; ATM is approximately equal-weighted.
300 Strikes where neither call nor put vol can be computed are excluded.
301 """
302 from qsmile.core.black76 import black76_implied_vol
303 from qsmile.core.coords import XCoord, YCoord
304 from qsmile.data.vols import VolData
306 meta = self.metadata
307 if meta.forward is None or meta.discount_factor is None:
308 msg = "forward and discount_factor must be calibrated before to_vols()"
309 raise TypeError(msg)
311 n = len(self.strikes)
313 # Invert call and put prices to implied vols (NaN on failure)
314 call_bid_iv = np.full(n, np.nan)
315 call_ask_iv = np.full(n, np.nan)
316 put_bid_iv = np.full(n, np.nan)
317 put_ask_iv = np.full(n, np.nan)
319 for i in range(n):
320 k = float(self.strikes[i])
321 with contextlib.suppress(ValueError):
322 call_bid_iv[i] = black76_implied_vol(
323 float(self.call_bid[i]), meta.forward, k, meta.discount_factor, meta.texpiry, is_call=True
324 )
325 with contextlib.suppress(ValueError):
326 call_ask_iv[i] = black76_implied_vol(
327 float(self.call_ask[i]), meta.forward, k, meta.discount_factor, meta.texpiry, is_call=True
328 )
329 with contextlib.suppress(ValueError):
330 put_bid_iv[i] = black76_implied_vol(
331 float(self.put_bid[i]), meta.forward, k, meta.discount_factor, meta.texpiry, is_call=False
332 )
333 with contextlib.suppress(ValueError):
334 put_ask_iv[i] = black76_implied_vol(
335 float(self.put_ask[i]), meta.forward, k, meta.discount_factor, meta.texpiry, is_call=False
336 )
338 # Blend using delta weights
339 blended_bid, blended_ask = delta_blend_ivols(
340 call_bid_iv,
341 call_ask_iv,
342 put_bid_iv,
343 put_ask_iv,
344 self.strikes,
345 meta.forward,
346 meta.texpiry,
347 )
349 # Exclude strikes where neither vol is available
350 valid = ~np.isnan(blended_bid) & ~np.isnan(blended_ask)
351 strikes_out = self.strikes[valid]
352 bid_out = blended_bid[valid]
353 ask_out = blended_ask[valid]
355 # Derive sigma_atm from blended mid at ATM strike
356 mid_out = (bid_out + ask_out) / 2.0
357 atm_idx = int(np.argmin(np.abs(strikes_out - meta.forward)))
358 sigma_atm = float(mid_out[atm_idx])
360 import pandas as pd
362 sa = StrikeArray()
363 idx = pd.Index(strikes_out, dtype=np.float64)
364 sa.set(("y", "bid"), pd.Series(bid_out, index=idx))
365 sa.set(("y", "ask"), pd.Series(ask_out, index=idx))
366 if self.volume is not None:
367 sa.set(("meta", "volume"), pd.Series(self.volume[valid], index=idx))
368 if self.open_interest is not None:
369 sa.set(("meta", "open_interest"), pd.Series(self.open_interest[valid], index=idx))
371 return VolData(
372 strikearray=sa,
373 current_x_coord=XCoord.FixedStrike,
374 current_y_coord=YCoord.Volatility,
375 metadata=replace(meta, sigma_atm=sigma_atm),
376 )
378 def filter(self) -> OptionChain:
379 """Return a cleaned copy with stale and implausible quotes removed.
381 Applies five filters in sequence:
383 1. **Zero-bid filter** -- removes strikes where either the call or put
384 bid is zero (no genuine two-sided market).
385 2. **Put-call parity monotonicity** -- C_mid - P_mid must be strictly
386 decreasing in strike (since it equals D*(F - K)). Strikes that
387 break monotonicity carry stale or mismarked quotes and are dropped.
388 3. **Call- and put-mid monotonicity** -- call mids must be non-increasing
389 and put mids non-decreasing in strike. Any remaining violations are
390 removed.
391 4. **Sub-intrinsic filter** -- removes strikes where the call or put
392 **bid** falls below intrinsic value (using the calibrated forward),
393 which indicates illiquid deep-ITM or stale quotes.
394 5. **Parity residual filter** -- removes strikes where the put-call
395 parity residual |C_mid - P_mid - D*(F - K)| exceeds 3x the
396 combined half bid-ask spread, indicating a stale or mispriced
397 deep-ITM quote.
399 Returns:
400 -------
401 OptionChain
402 A new ``OptionChain`` with the noisy strikes removed and
403 ``forward`` / ``discount_factor`` re-calibrated on the clean data.
404 """
405 keep = np.ones(len(self.strikes), dtype=bool)
407 # 1. Both sides must quote a positive bid
408 keep &= self.call_bid > 0
409 keep &= self.put_bid > 0
411 # Work on the surviving subset for the monotonicity checks
412 def _non_monotone_mask(values: NDArray[np.float64], decreasing: bool) -> NDArray[np.bool_]:
413 """Return a mask (True = keep) that removes non-monotone points.
415 Iteratively drops the point at each violation until the sequence
416 is monotone, working from the largest violation first.
417 """
418 mask = np.ones(len(values), dtype=bool)
419 while True:
420 vals = values[mask]
421 diff = np.diff(vals)
422 bad = diff > 0 if decreasing else diff < 0
423 if not np.any(bad):
424 break
425 # Find the worst violation in the filtered view
426 magnitudes = np.abs(diff) * bad
427 worst_idx = int(np.argmax(magnitudes))
428 # Map back to the full-array index and remove the point
429 # (remove the second element of the pair that violates)
430 full_indices = np.where(mask)[0]
431 mask[full_indices[worst_idx + 1]] = False
432 return mask
434 # 2. Put-call parity: C_mid - P_mid must be strictly decreasing
435 parity = self.call_mid[keep] - self.put_mid[keep]
436 parity_keep = _non_monotone_mask(parity, decreasing=True)
437 keep_indices = np.where(keep)[0]
438 keep[keep_indices[~parity_keep]] = False
440 # 3a. Call mids must be non-increasing in strike
441 call_keep = _non_monotone_mask(self.call_mid[keep], decreasing=True)
442 keep_indices = np.where(keep)[0]
443 keep[keep_indices[~call_keep]] = False
445 # 3b. Put mids must be non-decreasing in strike
446 put_keep = _non_monotone_mask(self.put_mid[keep], decreasing=False)
447 keep_indices = np.where(keep)[0]
448 keep[keep_indices[~put_keep]] = False
450 # 4. Sub-intrinsic filter: calibrate F on clean data, then remove
451 # strikes where the bid price is below intrinsic value
452 clean_strikes = self.strikes[keep]
453 clean_c_mid = self.call_mid[keep]
454 clean_p_mid = self.put_mid[keep]
455 f_est, d_est = _calibrate_forward_df(clean_strikes, clean_c_mid, clean_p_mid)
456 call_intrinsic = d_est * np.maximum(f_est - clean_strikes, 0.0)
457 put_intrinsic = d_est * np.maximum(clean_strikes - f_est, 0.0)
458 intrinsic_ok = (self.call_bid[keep] >= call_intrinsic) & (self.put_bid[keep] >= put_intrinsic)
459 keep_indices = np.where(keep)[0]
460 keep[keep_indices[~intrinsic_ok]] = False
462 # 5. Parity residual filter (iterative): |C-P - D*(F-K)| must be
463 # within a small multiple of the combined half bid-ask spread.
464 # Iteratively remove the worst outlier and recalibrate F/DF,
465 # since a single stale deep-ITM quote can bias the calibration.
466 while np.sum(keep) >= 3:
467 ks = self.strikes[keep]
468 cm = self.call_mid[keep]
469 pm = self.put_mid[keep]
470 f_est, d_est = _calibrate_forward_df(ks, cm, pm)
471 parity_actual = cm - pm
472 parity_predicted = d_est * (f_est - ks)
473 half_spread = (
474 (self.call_ask[keep] - self.call_bid[keep]) + (self.put_ask[keep] - self.put_bid[keep])
475 ) / 2.0
476 ratio = np.abs(parity_actual - parity_predicted) / np.maximum(half_spread, 1e-10)
477 worst = int(np.argmax(ratio))
478 if ratio[worst] <= 3.0:
479 break
480 keep_indices = np.where(keep)[0]
481 keep[keep_indices[worst]] = False
483 filtered_sd = self.strikedata.filter(keep)
484 return OptionChain(
485 strikedata=filtered_sd,
486 metadata=SmileMetadata(
487 date=self.metadata.date, expiry=self.metadata.expiry, daycount=self.metadata.daycount
488 ),
489 )
491 def plot(self, *, title: str = "Option Chain Prices", ax=None, **kwargs) -> matplotlib.figure.Figure:
492 """Plot bid/ask prices as error bars for calls and puts."""
493 from qsmile.core.plot import _require_matplotlib, plot_bid_ask
495 _require_matplotlib()
496 import matplotlib.pyplot as plt
498 if ax is None:
499 _, ax = plt.subplots()
500 plot_bid_ask(
501 self.strikes,
502 self.call_mid,
503 self.call_bid,
504 self.call_ask,
505 label="Calls",
506 color="tab:blue",
507 fmt="none",
508 ax=ax,
509 **kwargs,
510 )
511 plot_bid_ask(
512 self.strikes,
513 self.put_mid,
514 self.put_bid,
515 self.put_ask,
516 label="Puts",
517 color="tab:red",
518 fmt="none",
519 ax=ax,
520 **kwargs,
521 )
522 ax.set_xlabel("Strike")
523 ax.set_ylabel("Price")
524 ax.set_title(title)
525 ax.legend()
526 fig = ax.get_figure()
527 if not isinstance(fig, plt.Figure):
528 msg = "Expected a matplotlib Figure from ax.get_figure()"
529 raise TypeError(msg)
530 return fig