Coverage for src / qsmile / data / prices.py: 98%
204 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 21:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 21:47 +0000
1"""Bid/ask option price chain with forward/DF calibration."""
3from __future__ import annotations
5import contextlib
6from dataclasses import dataclass, field
7from typing import TYPE_CHECKING
9import cvxpy as cp
10import numpy as np
11from numpy.typing import NDArray
12from scipy.stats import norm
14if TYPE_CHECKING:
15 import matplotlib.figure
17 from qsmile.data.vols import SmileData
20def _calibrate_forward_df(
21 strikes: NDArray[np.float64],
22 call_mid: NDArray[np.float64],
23 put_mid: NDArray[np.float64],
24) -> tuple[float, float]:
25 """Calibrate forward and discount factor from put-call parity.
27 Fits C_mid - P_mid = D * (F - K) using quasi-delta weighted least squares.
28 The weighting approximates |D(1-D)| by a Gaussian centred at ATM whose
29 width is inferred from the ATM call price via the Brenner-Subrahmanyam
30 approximation, giving a characteristic strike-space scale of F0*sigma0*sqrt(T).
32 Parameters
33 ----------
34 strikes : NDArray
35 Strike prices.
36 call_mid : NDArray
37 Mid call prices.
38 put_mid : NDArray
39 Mid put prices.
41 Returns:
42 -------
43 tuple[float, float]
44 (forward, discount_factor)
45 """
46 y = call_mid - put_mid # C - P = D*(F - K) = D*F - D*K
48 # Initial forward estimate: strike where |C-P| is smallest
49 atm_idx = int(np.argmin(np.abs(y)))
50 f0 = float(strikes[atm_idx])
52 # Quasi-delta weights: Gaussian width inferred from ATM call price
53 # Brenner-Subrahmanyam: C_ATM ~ F*sigma*sqrt(T) / sqrt(2*pi) (assuming D~1)
54 # => sigma0 ~ C_ATM * sqrt(2*pi) / (F0*sqrt(T)) -- but we don't know T here,
55 # so we use the strike-space width h = C_ATM * sqrt(2*pi) directly,
56 # which equals F0*sigma0*sqrt(T) (the natural delta scale).
57 c_atm = float(call_mid[atm_idx])
58 h = c_atm * np.sqrt(2 * np.pi)
59 # Guard against degenerate case (e.g. very deep ITM/OTM ATM estimate)
60 h = max(h, 1e-8 * f0)
61 weights = np.exp(-0.5 * ((strikes - f0) / h) ** 2)
62 W = np.diag(np.sqrt(weights))
64 # Variables: D*F and D (so the problem is linear)
65 df = cp.Variable(pos=True, name="D") # discount factor
66 df_times_f = cp.Variable(pos=True, name="DF") # D * F
68 # y_i = D*F - D*K_i for each strike
69 residuals = y - (df_times_f - df * strikes)
70 objective = cp.Minimize(cp.sum_squares(W @ residuals))
72 prob = cp.Problem(objective, [df <= 1.0])
73 prob.solve(solver=cp.CLARABEL)
75 if df.value is None or df_times_f.value is None:
76 msg = "put-call parity calibration failed to converge"
77 raise RuntimeError(msg)
79 D_val = float(df.value)
80 F_val = float(df_times_f.value) / D_val
82 return F_val, D_val
85def delta_blend_ivols(
86 call_bid_ivols: NDArray[np.float64],
87 call_ask_ivols: NDArray[np.float64],
88 put_bid_ivols: NDArray[np.float64],
89 put_ask_ivols: NDArray[np.float64],
90 strikes: NDArray[np.float64],
91 forward: float,
92 expiry: float,
93) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
94 """Blend call and put implied vols using Black76 undiscounted call-delta weights.
96 At each strike K, the blending weight is w(K) = Phi(d1) where
97 d1 = [ln(F/K) + 0.5 * sigma_C^2 * t] / (sigma_C * sqrt(t))
98 and sigma_C is the mid call-implied vol at that strike.
100 The blended vol is: sigma = w * sigma_C + (1 - w) * sigma_P.
101 Bid and ask are blended independently with the same weights.
103 NaN values in input vols indicate inversion failures. At such strikes
104 the blended vol falls back to the available option type. If neither
105 is available, the strike is excluded (NaN in output).
107 Parameters
108 ----------
109 call_bid_ivols : NDArray[np.float64]
110 Call-implied bid vols (NaN where inversion failed).
111 call_ask_ivols : NDArray[np.float64]
112 Call-implied ask vols (NaN where inversion failed).
113 put_bid_ivols : NDArray[np.float64]
114 Put-implied bid vols (NaN where inversion failed).
115 put_ask_ivols : NDArray[np.float64]
116 Put-implied ask vols (NaN where inversion failed).
117 strikes : NDArray[np.float64]
118 Strike prices.
119 forward : float
120 Forward price.
121 expiry : float
122 Time to expiry in years.
124 Returns:
125 -------
126 tuple[NDArray[np.float64], NDArray[np.float64]]
127 (blended_bid_ivols, blended_ask_ivols). Strikes where neither
128 call nor put vol is available will have NaN.
129 """
130 call_mid = (call_bid_ivols + call_ask_ivols) / 2.0
131 sqrt_t = np.sqrt(expiry)
133 # Compute delta weights from call mid vol
134 # Where call vol is NaN, use put mid vol for delta calc; if both NaN, weight = NaN
135 put_mid = (put_bid_ivols + put_ask_ivols) / 2.0
136 sigma_for_delta = np.where(np.isnan(call_mid), put_mid, call_mid)
137 safe_sigma = np.where(np.isnan(sigma_for_delta), 1.0, sigma_for_delta)
138 safe_sigma = np.where(safe_sigma <= 0, 1e-8, safe_sigma)
140 d1 = (np.log(forward / strikes) + 0.5 * safe_sigma**2 * expiry) / (safe_sigma * sqrt_t)
141 w = norm.cdf(d1)
143 # Mask NaN vols: if call is NaN, force weight to 0 (use put); if put is NaN, force to 1 (use call)
144 call_available = ~np.isnan(call_bid_ivols)
145 put_available = ~np.isnan(put_bid_ivols)
147 # Both missing → NaN
148 neither = ~call_available & ~put_available
149 w = np.where(call_available & ~put_available, 1.0, w)
150 w = np.where(~call_available & put_available, 0.0, w)
151 w = np.where(neither, np.nan, w)
153 # Replace NaN vols with 0 for arithmetic (masked by weight)
154 cb = np.where(np.isnan(call_bid_ivols), 0.0, call_bid_ivols)
155 ca = np.where(np.isnan(call_ask_ivols), 0.0, call_ask_ivols)
156 pb = np.where(np.isnan(put_bid_ivols), 0.0, put_bid_ivols)
157 pa = np.where(np.isnan(put_ask_ivols), 0.0, put_ask_ivols)
159 blended_bid = w * cb + (1.0 - w) * pb
160 blended_ask = w * ca + (1.0 - w) * pa
162 return blended_bid, blended_ask
165@dataclass
166class OptionChain:
167 """Bid/ask option price chain for a single expiry.
169 Parameters
170 ----------
171 strikes : NDArray[np.float64]
172 Strike prices. Must be positive.
173 call_bid : NDArray[np.float64]
174 Call bid prices. Must be non-negative.
175 call_ask : NDArray[np.float64]
176 Call ask prices. Must be >= call_bid.
177 put_bid : NDArray[np.float64]
178 Put bid prices. Must be non-negative.
179 put_ask : NDArray[np.float64]
180 Put ask prices. Must be >= put_bid.
181 expiry : float
182 Time to expiry in years. Must be positive.
183 forward : float | None
184 Forward price. Calibrated from put-call parity if not provided.
185 discount_factor : float | None
186 Discount factor. Calibrated from put-call parity if not provided.
187 """
189 strikes: NDArray[np.float64]
190 call_bid: NDArray[np.float64]
191 call_ask: NDArray[np.float64]
192 put_bid: NDArray[np.float64]
193 put_ask: NDArray[np.float64]
194 expiry: float
195 forward: float | None = field(default=None)
196 discount_factor: float | None = field(default=None)
198 def __post_init__(self) -> None:
199 """Validate and convert inputs, calibrate forward/DF if needed."""
200 self.strikes = np.asarray(self.strikes, dtype=np.float64)
201 self.call_bid = np.asarray(self.call_bid, dtype=np.float64)
202 self.call_ask = np.asarray(self.call_ask, dtype=np.float64)
203 self.put_bid = np.asarray(self.put_bid, dtype=np.float64)
204 self.put_ask = np.asarray(self.put_ask, dtype=np.float64)
206 n = len(self.strikes)
207 for name, arr in [
208 ("call_bid", self.call_bid),
209 ("call_ask", self.call_ask),
210 ("put_bid", self.put_bid),
211 ("put_ask", self.put_ask),
212 ]:
213 if len(arr) != n:
214 msg = f"{name} must have the same length as strikes ({n}), got {len(arr)}"
215 raise ValueError(msg)
217 if n < 3:
218 msg = f"at least 3 strikes required, got {n}"
219 raise ValueError(msg)
220 if np.any(self.strikes <= 0):
221 msg = "all strikes must be positive"
222 raise ValueError(msg)
223 if self.expiry <= 0:
224 msg = f"expiry must be positive, got {self.expiry}"
225 raise ValueError(msg)
227 for name, arr in [
228 ("call_bid", self.call_bid),
229 ("call_ask", self.call_ask),
230 ("put_bid", self.put_bid),
231 ("put_ask", self.put_ask),
232 ]:
233 if np.any(arr < 0):
234 msg = f"{name} must be non-negative"
235 raise ValueError(msg)
237 if np.any(self.call_bid > self.call_ask):
238 msg = "call_bid must not exceed call_ask"
239 raise ValueError(msg)
240 if np.any(self.put_bid > self.put_ask):
241 msg = "put_bid must not exceed put_ask"
242 raise ValueError(msg)
244 # Calibrate forward and discount factor if not provided
245 if self.forward is None or self.discount_factor is None:
246 f_cal, d_cal = _calibrate_forward_df(self.strikes, self.call_mid, self.put_mid)
247 if self.forward is None:
248 self.forward = f_cal
249 if self.discount_factor is None:
250 self.discount_factor = d_cal
252 @property
253 def call_mid(self) -> NDArray[np.float64]:
254 """Midpoint of call bid and ask prices."""
255 return (self.call_bid + self.call_ask) / 2.0
257 @property
258 def put_mid(self) -> NDArray[np.float64]:
259 """Midpoint of put bid and ask prices."""
260 return (self.put_bid + self.put_ask) / 2.0
262 def to_smile_data(self) -> SmileData:
263 """Convert to a SmileData with (FixedStrike, Price) coordinates.
265 Uses call mid-market prices as the Y-values.
266 """
267 from qsmile.core.coords import XCoord, YCoord
268 from qsmile.data.meta import SmileMetadata
269 from qsmile.data.vols import SmileData
271 assert self.forward is not None # noqa: S101
272 assert self.discount_factor is not None # noqa: S101
274 return SmileData(
275 x=self.strikes.copy(),
276 y_bid=self.call_bid.copy(),
277 y_ask=self.call_ask.copy(),
278 x_coord=XCoord.FixedStrike,
279 y_coord=YCoord.Price,
280 metadata=SmileMetadata(
281 forward=self.forward,
282 discount_factor=self.discount_factor,
283 expiry=self.expiry,
284 ),
285 )
287 def to_smile_data_blended(self) -> SmileData:
288 """Convert to a SmileData with (FixedStrike, Volatility) using delta-blended vols.
290 Inverts both call and put prices to implied vols at every strike, then
291 blends them using Black76 undiscounted call-delta weights. OTM options
292 dominate in each wing; ATM is approximately equal-weighted.
294 Strikes where neither call nor put vol can be computed are excluded.
295 """
296 from qsmile.core.black76 import black76_implied_vol
297 from qsmile.core.coords import XCoord, YCoord
298 from qsmile.data.meta import SmileMetadata
299 from qsmile.data.vols import SmileData
301 assert self.forward is not None # noqa: S101
302 assert self.discount_factor is not None # noqa: S101
304 n = len(self.strikes)
306 # Invert call and put prices to implied vols (NaN on failure)
307 call_bid_iv = np.full(n, np.nan)
308 call_ask_iv = np.full(n, np.nan)
309 put_bid_iv = np.full(n, np.nan)
310 put_ask_iv = np.full(n, np.nan)
312 for i in range(n):
313 k = float(self.strikes[i])
314 with contextlib.suppress(ValueError):
315 call_bid_iv[i] = black76_implied_vol(
316 float(self.call_bid[i]), self.forward, k, self.discount_factor, self.expiry, is_call=True
317 )
318 with contextlib.suppress(ValueError):
319 call_ask_iv[i] = black76_implied_vol(
320 float(self.call_ask[i]), self.forward, k, self.discount_factor, self.expiry, is_call=True
321 )
322 with contextlib.suppress(ValueError):
323 put_bid_iv[i] = black76_implied_vol(
324 float(self.put_bid[i]), self.forward, k, self.discount_factor, self.expiry, is_call=False
325 )
326 with contextlib.suppress(ValueError):
327 put_ask_iv[i] = black76_implied_vol(
328 float(self.put_ask[i]), self.forward, k, self.discount_factor, self.expiry, is_call=False
329 )
331 # Blend using delta weights
332 blended_bid, blended_ask = delta_blend_ivols(
333 call_bid_iv,
334 call_ask_iv,
335 put_bid_iv,
336 put_ask_iv,
337 self.strikes,
338 self.forward,
339 self.expiry,
340 )
342 # Exclude strikes where neither vol is available
343 valid = ~np.isnan(blended_bid) & ~np.isnan(blended_ask)
344 strikes_out = self.strikes[valid]
345 bid_out = blended_bid[valid]
346 ask_out = blended_ask[valid]
348 # Derive sigma_atm from blended mid at ATM strike
349 mid_out = (bid_out + ask_out) / 2.0
350 atm_idx = int(np.argmin(np.abs(strikes_out - self.forward)))
351 sigma_atm = float(mid_out[atm_idx])
353 return SmileData(
354 x=strikes_out,
355 y_bid=bid_out,
356 y_ask=ask_out,
357 x_coord=XCoord.FixedStrike,
358 y_coord=YCoord.Volatility,
359 metadata=SmileMetadata(
360 forward=self.forward,
361 discount_factor=self.discount_factor,
362 expiry=self.expiry,
363 sigma_atm=sigma_atm,
364 ),
365 )
367 def denoise(self) -> OptionChain:
368 """Return a cleaned copy with stale and implausible quotes removed.
370 Applies five filters in sequence:
372 1. **Zero-bid filter** -- removes strikes where either the call or put
373 bid is zero (no genuine two-sided market).
374 2. **Put-call parity monotonicity** -- C_mid - P_mid must be strictly
375 decreasing in strike (since it equals D*(F - K)). Strikes that
376 break monotonicity carry stale or mismarked quotes and are dropped.
377 3. **Call- and put-mid monotonicity** -- call mids must be non-increasing
378 and put mids non-decreasing in strike. Any remaining violations are
379 removed.
380 4. **Sub-intrinsic filter** -- removes strikes where call or put mid
381 prices fall below their intrinsic value (using the calibrated
382 forward), which indicates illiquid deep-ITM quotes.
383 5. **Parity residual filter** -- removes strikes where the put-call
384 parity residual |C_mid - P_mid - D*(F - K)| exceeds 3x the
385 combined half bid-ask spread, indicating a stale or mispriced
386 deep-ITM quote.
388 Returns:
389 -------
390 OptionChain
391 A new ``OptionChain`` with the noisy strikes removed and
392 ``forward`` / ``discount_factor`` re-calibrated on the clean data.
393 """
394 keep = np.ones(len(self.strikes), dtype=bool)
396 # 1. Both sides must quote a positive bid
397 keep &= self.call_bid > 0
398 keep &= self.put_bid > 0
400 # Work on the surviving subset for the monotonicity checks
401 def _non_monotone_mask(values: NDArray[np.float64], decreasing: bool) -> NDArray[np.bool_]:
402 """Return a mask (True = keep) that removes non-monotone points.
404 Iteratively drops the point at each violation until the sequence
405 is monotone, working from the largest violation first.
406 """
407 mask = np.ones(len(values), dtype=bool)
408 while True:
409 vals = values[mask]
410 diff = np.diff(vals)
411 bad = diff > 0 if decreasing else diff < 0
412 if not np.any(bad):
413 break
414 # Find the worst violation in the filtered view
415 magnitudes = np.abs(diff) * bad
416 worst_idx = int(np.argmax(magnitudes))
417 # Map back to the full-array index and remove the point
418 # (remove the second element of the pair that violates)
419 full_indices = np.where(mask)[0]
420 mask[full_indices[worst_idx + 1]] = False
421 return mask
423 # 2. Put-call parity: C_mid - P_mid must be strictly decreasing
424 parity = self.call_mid[keep] - self.put_mid[keep]
425 parity_keep = _non_monotone_mask(parity, decreasing=True)
426 keep_indices = np.where(keep)[0]
427 keep[keep_indices[~parity_keep]] = False
429 # 3a. Call mids must be non-increasing in strike
430 call_keep = _non_monotone_mask(self.call_mid[keep], decreasing=True)
431 keep_indices = np.where(keep)[0]
432 keep[keep_indices[~call_keep]] = False
434 # 3b. Put mids must be non-decreasing in strike
435 put_keep = _non_monotone_mask(self.put_mid[keep], decreasing=False)
436 keep_indices = np.where(keep)[0]
437 keep[keep_indices[~put_keep]] = False
439 # 4. Sub-intrinsic filter: calibrate F on clean data, then remove
440 # strikes where the mid price is below intrinsic value
441 clean_strikes = self.strikes[keep]
442 clean_c_mid = self.call_mid[keep]
443 clean_p_mid = self.put_mid[keep]
444 f_est, d_est = _calibrate_forward_df(clean_strikes, clean_c_mid, clean_p_mid)
445 call_intrinsic = np.maximum(f_est - clean_strikes, 0.0)
446 put_intrinsic = np.maximum(clean_strikes - f_est, 0.0)
447 intrinsic_ok = (clean_c_mid >= call_intrinsic) & (clean_p_mid >= put_intrinsic)
448 keep_indices = np.where(keep)[0]
449 keep[keep_indices[~intrinsic_ok]] = False
451 # 5. Parity residual filter (iterative): |C-P - D*(F-K)| must be
452 # within a small multiple of the combined half bid-ask spread.
453 # Iteratively remove the worst outlier and recalibrate F/DF,
454 # since a single stale deep-ITM quote can bias the calibration.
455 while np.sum(keep) >= 3:
456 ks = self.strikes[keep]
457 cm = self.call_mid[keep]
458 pm = self.put_mid[keep]
459 f_est, d_est = _calibrate_forward_df(ks, cm, pm)
460 parity_actual = cm - pm
461 parity_predicted = d_est * (f_est - ks)
462 half_spread = (
463 (self.call_ask[keep] - self.call_bid[keep]) + (self.put_ask[keep] - self.put_bid[keep])
464 ) / 2.0
465 ratio = np.abs(parity_actual - parity_predicted) / np.maximum(half_spread, 1e-10)
466 worst = int(np.argmax(ratio))
467 if ratio[worst] <= 3.0:
468 break
469 keep_indices = np.where(keep)[0]
470 keep[keep_indices[worst]] = False
472 return OptionChain(
473 strikes=self.strikes[keep],
474 call_bid=self.call_bid[keep],
475 call_ask=self.call_ask[keep],
476 put_bid=self.put_bid[keep],
477 put_ask=self.put_ask[keep],
478 expiry=self.expiry,
479 # Re-calibrate forward/DF on the clean data
480 )
482 def plot(self, *, title: str = "Option Chain Prices") -> matplotlib.figure.Figure:
483 """Plot bid/ask prices as error bars for calls and puts."""
484 from qsmile.core.plot import _require_matplotlib, plot_bid_ask
486 _require_matplotlib()
487 import matplotlib.pyplot as plt
489 fig, ax = plt.subplots()
490 plot_bid_ask(
491 self.strikes,
492 self.call_mid,
493 self.call_bid,
494 self.call_ask,
495 label="Calls",
496 color="tab:blue",
497 ax=ax,
498 )
499 plot_bid_ask(
500 self.strikes,
501 self.put_mid,
502 self.put_bid,
503 self.put_ask,
504 label="Puts",
505 color="tab:red",
506 ax=ax,
507 )
508 ax.set_xlabel("Strike")
509 ax.set_ylabel("Price")
510 ax.set_title(title)
511 ax.legend()
512 return fig