diff --git a/diff_diff/efficient_did.py b/diff_diff/efficient_did.py index 5ae23f63..aa047a0a 100644 --- a/diff_diff/efficient_did.py +++ b/diff_diff/efficient_did.py @@ -1,13 +1,13 @@ """ Efficient Difference-in-Differences estimator. -Implements the semiparametrically efficient ATT estimator from -Chen, Sant'Anna & Xie (2025), Phase 1 (no covariates). +Implements the ATT estimator from Chen, Sant'Anna & Xie (2025). +Without covariates, achieves the semiparametric efficiency bound via +closed-form within-group covariances. With covariates, uses a doubly +robust path with OLS outcome regression, sieve propensity ratios, and +kernel-smoothed conditional Omega*(X) (see class docstring for caveats). -The estimator achieves the efficiency bound by optimally weighting -across pre-treatment periods and comparison groups via the inverse of -the within-group covariance matrix Omega*. Under the stronger PT-All -assumption the model is overidentified and EDiD exploits this for +Under PT-All the model is overidentified and EDiD exploits this for tighter inference; under PT-Post it reduces to the standard single-baseline estimator (Callaway-Sant'Anna). """ @@ -22,6 +22,15 @@ EDiDBootstrapResults, EfficientDiDBootstrapMixin, ) +from diff_diff.efficient_did_covariates import ( + compute_eif_cov, + compute_generated_outcomes_cov, + compute_omega_star_conditional, + compute_per_unit_weights, + estimate_inverse_propensity_sieve, + estimate_outcome_regression, + estimate_propensity_ratio_sieve, +) from diff_diff.efficient_did_results import EfficientDiDResults from diff_diff.efficient_did_weights import ( compute_efficient_weights, @@ -39,10 +48,18 @@ class EfficientDiD(EfficientDiDBootstrapMixin): """Efficient DiD estimator (Chen, Sant'Anna & Xie 2025). - Achieves the semiparametric efficiency bound for ATT(g,t) in - difference-in-differences settings with staggered treatment adoption. - Phase 1 supports the **no-covariates** path only — a closed-form - estimator using within-group sample means and covariances. + Without covariates, achieves the semiparametric efficiency bound for + ATT(g,t) using a closed-form estimator based on within-group sample + means and covariances. + + With covariates, uses a doubly robust path: sieve-based propensity + score ratios (Eq 4.1-4.2), OLS outcome regression, sieve-estimated + inverse propensities (algorithm step 4), and kernel-smoothed + conditional Omega*(X) with per-unit efficient weights (Eq 3.12). + The DR property ensures consistency if either the OLS outcome model + or the sieve propensity ratio is correctly specified. The OLS + working model for outcome regressions does not generically guarantee + the semiparametric efficiency bound (see REGISTRY.md). Parameters ---------- @@ -64,6 +81,16 @@ class EfficientDiD(EfficientDiDBootstrapMixin): anticipation : int, default 0 Number of anticipation periods (shifts the effective treatment boundary forward by this amount). + sieve_k_max : int or None + Maximum polynomial degree for sieve ratio estimation. None = auto + (``min(floor(n_gp^{1/5}), 5)``). Only used with covariates. + sieve_criterion : str, default ``"bic"`` + Information criterion for sieve degree selection: ``"aic"`` or ``"bic"``. + ratio_clip : float, default 20.0 + Clip sieve propensity ratios to ``[1/ratio_clip, ratio_clip]``. + kernel_bandwidth : float or None + Bandwidth for Gaussian kernel in conditional Omega* estimation. + None = Silverman's rule-of-thumb (automatic). Examples -------- @@ -83,6 +110,10 @@ def __init__( bootstrap_weights: str = "rademacher", seed: Optional[int] = None, anticipation: int = 0, + sieve_k_max: Optional[int] = None, + sieve_criterion: str = "bic", + ratio_clip: float = 20.0, + kernel_bandwidth: Optional[float] = None, ): if cluster is not None: raise NotImplementedError( @@ -96,6 +127,10 @@ def __init__( self.bootstrap_weights = bootstrap_weights self.seed = seed self.anticipation = anticipation + self.sieve_k_max = sieve_k_max + self.sieve_criterion = sieve_criterion + self.ratio_clip = ratio_clip + self.kernel_bandwidth = kernel_bandwidth self.is_fitted_ = False self.results_: Optional[EfficientDiDResults] = None self._validate_params() @@ -110,6 +145,24 @@ def _validate_params(self) -> None: f"bootstrap_weights must be one of {valid_weights}, " f"got '{self.bootstrap_weights}'" ) + if self.sieve_criterion not in ("aic", "bic"): + raise ValueError( + f"sieve_criterion must be 'aic' or 'bic', got '{self.sieve_criterion}'" + ) + if not (np.isfinite(self.ratio_clip) and self.ratio_clip > 1.0): + raise ValueError(f"ratio_clip must be finite and > 1.0, got {self.ratio_clip}") + if self.kernel_bandwidth is not None: + if not (np.isfinite(self.kernel_bandwidth) and self.kernel_bandwidth > 0): + raise ValueError( + f"kernel_bandwidth must be finite and > 0 (or None for auto), " + f"got {self.kernel_bandwidth}" + ) + if self.sieve_k_max is not None: + if not (isinstance(self.sieve_k_max, (int, np.integer)) and self.sieve_k_max > 0): + raise ValueError( + f"sieve_k_max must be a positive integer (or None for auto), " + f"got {self.sieve_k_max}" + ) # -- sklearn compatibility ------------------------------------------------ @@ -123,6 +176,10 @@ def get_params(self) -> Dict[str, Any]: "n_bootstrap": self.n_bootstrap, "bootstrap_weights": self.bootstrap_weights, "seed": self.seed, + "sieve_k_max": self.sieve_k_max, + "sieve_criterion": self.sieve_criterion, + "ratio_clip": self.ratio_clip, + "kernel_bandwidth": self.kernel_bandwidth, } def set_params(self, **params: Any) -> "EfficientDiD": @@ -164,7 +221,9 @@ def fit( Column indicating first treatment period. Use 0 or ``np.inf`` for never-treated units. covariates : list of str, optional - Not implemented in Phase 1. Raises ``NotImplementedError``. + Column names for time-invariant unit-level covariates. + When provided, uses the doubly robust path (outcome regression + + propensity score ratios). aggregate : str, optional ``None``, ``"simple"``, ``"event_study"``, ``"group"``, or ``"all"``. @@ -180,16 +239,13 @@ def fit( ValueError Missing columns, unbalanced panel, non-absorbing treatment, or PT-Post without a never-treated group. - NotImplementedError - If ``covariates`` is provided (Phase 2). """ self._validate_params() - if covariates is not None: - raise NotImplementedError( - "Covariates are not yet supported in EfficientDiD (Phase 1). " - "The with-covariates path will be added in Phase 2." - ) + # Normalize empty covariates list to None (use nocov path) + if covariates is not None and len(covariates) == 0: + covariates = None + use_covariates = covariates is not None # ----- Validate inputs ----- required_cols = [outcome, unit, time, first_treat] @@ -299,6 +355,46 @@ def fit( cohort_fractions[g] = float(np.sum(cohort_masks[g])) / n_units cohort_fractions[np.inf] = float(np.sum(never_treated_mask)) / n_units + # ----- Covariate preparation (if provided) ----- + covariate_matrix: Optional[np.ndarray] = None + m_hat_cache: Dict[Tuple, np.ndarray] = {} + r_hat_cache: Dict[Tuple[float, float], np.ndarray] = {} + s_hat_cache: Dict[float, np.ndarray] = {} # inverse propensities per group + + if use_covariates: + assert covariates is not None # for type narrowing + + # Validate covariate columns exist + missing_cov = [c for c in covariates if c not in data.columns] + if missing_cov: + raise ValueError(f"Missing covariate columns: {missing_cov}") + + # Validate no NaN/Inf in covariates + for col_name in covariates: + non_finite_cov = ~np.isfinite(pd.to_numeric(df[col_name], errors="coerce")) + if non_finite_cov.any(): + n_bad = int(non_finite_cov.sum()) + raise ValueError( + f"Found {n_bad} non-finite value(s) in covariate column " + f"'{col_name}'. Covariates must be finite." + ) + + # Validate time-invariance: covariates must be constant within each unit + for col_name in covariates: + cov_nunique = df.groupby(unit)[col_name].nunique() + varying = cov_nunique[cov_nunique > 1] + if len(varying) > 0: + uid = varying.index[0] + raise ValueError( + f"Covariate '{col_name}' varies over time for unit {uid}. " + "EfficientDiD requires time-invariant covariates. " + "Extract base-period values before calling fit()." + ) + + # Extract unit-level covariate matrix from period_1 observations + base_df = df[df[time] == period_1].set_index(unit).reindex(all_units) + covariate_matrix = base_df[list(covariates)].values.astype(float) + # ----- Core estimation: ATT(g, t) for each target ----- # Precompute per-group unit counts (avoid repeated np.sum in loop) n_treated_per_g = {g: int(np.sum(cohort_masks[g])) for g in treatment_groups} @@ -368,55 +464,158 @@ def fit( eif_by_gt[(g, t)] = np.zeros(n_units) continue - # Omega* matrix - omega = compute_omega_star_nocov( - target_g=g, - target_t=t, - valid_pairs=pairs, - outcome_wide=outcome_wide, - cohort_masks=cohort_masks, - never_treated_mask=never_treated_mask, - period_to_col=period_to_col, - period_1_col=effective_p1_col, - cohort_fractions=cohort_fractions, - ) + if use_covariates: + assert covariate_matrix is not None + t_col_val = period_to_col[t] + + # Lazily populate nuisance caches for this (g, t) + for gp, tpre in pairs: + tpre_col_val = period_to_col[tpre] + # m_{inf, t, tpre}(X) + key_inf_t = (np.inf, t_col_val, tpre_col_val) + if key_inf_t not in m_hat_cache: + m_hat_cache[key_inf_t] = estimate_outcome_regression( + outcome_wide, + covariate_matrix, + never_treated_mask, + t_col_val, + tpre_col_val, + ) + # m_{g', tpre, 1}(X) + key_gp_tpre = (gp, tpre_col_val, effective_p1_col) + if key_gp_tpre not in m_hat_cache: + gp_mask_for_reg = ( + never_treated_mask if np.isinf(gp) else cohort_masks[gp] + ) + m_hat_cache[key_gp_tpre] = estimate_outcome_regression( + outcome_wide, + covariate_matrix, + gp_mask_for_reg, + tpre_col_val, + effective_p1_col, + ) + # r_{g, inf}(X) and r_{g, g'}(X) via sieve (Eq 4.1-4.2) + for comp in {np.inf, gp}: + rkey = (g, comp) + if rkey not in r_hat_cache: + comp_mask = ( + never_treated_mask if np.isinf(comp) else cohort_masks[comp] + ) + r_hat_cache[rkey] = estimate_propensity_ratio_sieve( + covariate_matrix, + cohort_masks[g], + comp_mask, + k_max=self.sieve_k_max, + criterion=self.sieve_criterion, + ratio_clip=self.ratio_clip, + ) + + # Per-unit DR generated outcomes: shape (n_units, H) + gen_out = compute_generated_outcomes_cov( + target_g=g, + target_t=t, + valid_pairs=pairs, + outcome_wide=outcome_wide, + cohort_masks=cohort_masks, + never_treated_mask=never_treated_mask, + period_to_col=period_to_col, + period_1_col=effective_p1_col, + cohort_fractions=cohort_fractions, + m_hat_cache=m_hat_cache, + r_hat_cache=r_hat_cache, + ) - # Efficient weights (also returns condition number) - weights, _, cond_num = compute_efficient_weights(omega) - stored_weights[(g, t)] = weights - if omega.size > 0: - stored_cond[(g, t)] = cond_num + y_hat = np.mean(gen_out, axis=0) # shape (H,) + + # Inverse propensity estimation (algorithm step 4) + # s_hat_{g'}(X) = 1/p_{g'}(X) for Eq 3.12 scaling + for group_id in {g, np.inf} | {gp for gp, _ in pairs}: + if group_id not in s_hat_cache: + group_mask_s = ( + never_treated_mask if np.isinf(group_id) else cohort_masks[group_id] + ) + s_hat_cache[group_id] = estimate_inverse_propensity_sieve( + covariate_matrix, + group_mask_s, + k_max=self.sieve_k_max, + criterion=self.sieve_criterion, + ) + + # Conditional Omega*(X) with per-unit propensities (Eq 3.12) + omega_cond = compute_omega_star_conditional( + target_g=g, + target_t=t, + valid_pairs=pairs, + outcome_wide=outcome_wide, + cohort_masks=cohort_masks, + never_treated_mask=never_treated_mask, + period_to_col=period_to_col, + period_1_col=effective_p1_col, + cohort_fractions=cohort_fractions, + covariate_matrix=covariate_matrix, + s_hat_cache=s_hat_cache, + bandwidth=self.kernel_bandwidth, + ) - # Generated outcomes - y_hat = compute_generated_outcomes_nocov( - target_g=g, - target_t=t, - valid_pairs=pairs, - outcome_wide=outcome_wide, - cohort_masks=cohort_masks, - never_treated_mask=never_treated_mask, - period_to_col=period_to_col, - period_1_col=effective_p1_col, - ) + # Per-unit weights: (n_units, H) + per_unit_w = compute_per_unit_weights(omega_cond) + + # ATT = mean_i( w(X_i) @ gen_out[i] ) + if per_unit_w.shape[1] > 0: + att_gt = float(np.mean(np.sum(per_unit_w * gen_out, axis=1))) + else: + att_gt = np.nan + + # EIF with per-unit weights (Remark 4.2: plug-in valid) + # Center on scalar ATT, not per-pair means (ensures mean(EIF) ≈ 0) + eif_vals = compute_eif_cov(per_unit_w, gen_out, att_gt, n_units) + eif_by_gt[(g, t)] = eif_vals + else: + # No-covariates path (closed-form) + omega = compute_omega_star_nocov( + target_g=g, + target_t=t, + valid_pairs=pairs, + outcome_wide=outcome_wide, + cohort_masks=cohort_masks, + never_treated_mask=never_treated_mask, + period_to_col=period_to_col, + period_1_col=effective_p1_col, + cohort_fractions=cohort_fractions, + ) - # ATT(g,t) = w @ y_hat - att_gt = float(weights @ y_hat) if len(weights) > 0 else np.nan + weights, _, cond_num = compute_efficient_weights(omega) + stored_weights[(g, t)] = weights + if omega.size > 0: + stored_cond[(g, t)] = cond_num + + y_hat = compute_generated_outcomes_nocov( + target_g=g, + target_t=t, + valid_pairs=pairs, + outcome_wide=outcome_wide, + cohort_masks=cohort_masks, + never_treated_mask=never_treated_mask, + period_to_col=period_to_col, + period_1_col=effective_p1_col, + ) - # EIF - eif_vals = compute_eif_nocov( - target_g=g, - target_t=t, - weights=weights, - valid_pairs=pairs, - outcome_wide=outcome_wide, - cohort_masks=cohort_masks, - never_treated_mask=never_treated_mask, - period_to_col=period_to_col, - period_1_col=effective_p1_col, - cohort_fractions=cohort_fractions, - n_units=n_units, - ) - eif_by_gt[(g, t)] = eif_vals + att_gt = float(weights @ y_hat) if len(weights) > 0 else np.nan + + eif_vals = compute_eif_nocov( + target_g=g, + target_t=t, + weights=weights, + valid_pairs=pairs, + outcome_wide=outcome_wide, + cohort_masks=cohort_masks, + never_treated_mask=never_treated_mask, + period_to_col=period_to_col, + period_1_col=effective_p1_col, + cohort_fractions=cohort_fractions, + n_units=n_units, + ) + eif_by_gt[(g, t)] = eif_vals # Analytical SE = sqrt(mean(EIF^2) / n) [paper p.21] se_gt = float(np.sqrt(np.mean(eif_vals**2) / n_units)) @@ -557,6 +756,11 @@ def fit( omega_condition_numbers=stored_cond if stored_cond else None, influence_functions=None, # can store full EIF matrix if needed bootstrap_results=bootstrap_results, + estimation_path="dr" if use_covariates else "nocov", + sieve_k_max=self.sieve_k_max, + sieve_criterion=self.sieve_criterion, + ratio_clip=self.ratio_clip, + kernel_bandwidth=self.kernel_bandwidth, ) self.is_fitted_ = True return self.results_ diff --git a/diff_diff/efficient_did_covariates.py b/diff_diff/efficient_did_covariates.py new file mode 100644 index 00000000..9e3052c2 --- /dev/null +++ b/diff_diff/efficient_did_covariates.py @@ -0,0 +1,814 @@ +""" +Doubly robust math for the Efficient DiD estimator (with covariates). + +Implements the with-covariates path from Chen, Sant'Anna & Xie (2025): +OLS outcome regression (linear working model), sieve-based propensity +score ratios (Eq 4.1-4.2), sieve-based inverse propensities (step 4), +kernel-smoothed conditional Omega*(X) for per-unit efficient weights, +doubly robust generated outcomes (Eq 4.4), and the efficient influence +function for analytical standard errors. + +The DR property ensures consistency if either the OLS outcome model or +the sieve propensity ratio is correctly specified. The OLS working model +does not generically guarantee the semiparametric efficiency bound unless +the conditional mean is linear in covariates (see REGISTRY.md). + +All functions are pure (no state), operating on pre-pivoted numpy arrays. +""" + +import warnings +from itertools import combinations_with_replacement +from math import comb +from typing import Dict, List, Optional, Tuple + +import numpy as np +from scipy.spatial.distance import cdist + +from diff_diff.linalg import solve_ols + +# --------------------------------------------------------------------------- +# Outcome regression +# --------------------------------------------------------------------------- + + +def estimate_outcome_regression( + outcome_wide: np.ndarray, + covariate_matrix: np.ndarray, + group_mask: np.ndarray, + t_col: int, + tpre_col: int, +) -> np.ndarray: + """Estimate conditional mean outcome change m_hat(X) for a comparison group. + + Regresses ``(Y_t - Y_{tpre})`` on ``X`` within the units identified by + ``group_mask`` using OLS. Returns predicted values ``m_hat(X_i)`` for + **all** units (extrapolated from the within-group fit). + + This implements ``m_hat_{g',t,tpre}(X) = E[Y_t - Y_{tpre} | G=g', X]``. + + Parameters + ---------- + outcome_wide : ndarray, shape (n_units, n_periods) + Pivoted outcome matrix. + covariate_matrix : ndarray, shape (n_units, n_covariates) + Unit-level (time-invariant) covariates. + group_mask : ndarray of bool, shape (n_units,) + Mask selecting units in the comparison group. + t_col, tpre_col : int + Column indices in ``outcome_wide`` for the two time periods. + + Returns + ------- + m_hat : ndarray, shape (n_units,) + Predicted ``E[Y_t - Y_{tpre} | X]`` for every unit. + """ + Y_group = outcome_wide[group_mask] + delta_y = Y_group[:, t_col] - Y_group[:, tpre_col] + + X_group = covariate_matrix[group_mask] + X_design = np.column_stack([np.ones(len(X_group)), X_group]) + + coef, _, _ = solve_ols( + X_design, + delta_y, + return_vcov=False, + rank_deficient_action="warn", + ) + + X_all = np.column_stack([np.ones(len(covariate_matrix)), covariate_matrix]) + coef_safe = np.where(np.isfinite(coef), coef, 0.0) + m_hat = X_all @ coef_safe + + non_finite = ~np.isfinite(m_hat) + if non_finite.any(): + n_bad = int(non_finite.sum()) + warnings.warn( + f"Outcome regression produced {n_bad} non-finite prediction(s). " + "Setting to 0.0 (equivalent to no covariate adjustment).", + UserWarning, + stacklevel=2, + ) + m_hat[non_finite] = 0.0 + + return m_hat + + +# --------------------------------------------------------------------------- +# Sieve-based propensity ratio estimation (Eq 4.1-4.2) +# --------------------------------------------------------------------------- + + +def _polynomial_sieve_basis(X: np.ndarray, degree: int) -> np.ndarray: + """Build polynomial sieve basis up to total degree K. + + For d covariates and degree K, includes all monomials + ``X_1^{a_1} * ... * X_d^{a_d}`` where ``a_1 + ... + a_d <= K``, + including the intercept term (degree 0). + + Standardizes X to zero mean, unit variance for numerical stability. + + Parameters + ---------- + X : ndarray, shape (n, d) + Covariate matrix. + degree : int + Maximum total polynomial degree. + + Returns + ------- + basis : ndarray, shape (n, n_basis) + Sieve basis matrix. ``n_basis = C(K+d, d)``. + """ + n, d = X.shape + + # Standardize for numerical stability + X_mean = X.mean(axis=0) + X_std = X.std(axis=0) + X_std[X_std < 1e-10] = 1.0 # avoid division by zero for constant columns + X_s = (X - X_mean) / X_std + + # Build monomials: enumerate all (a_1, ..., a_d) with sum <= degree + columns = [np.ones(n)] # degree-0 (intercept) + for total_deg in range(1, degree + 1): + for exponents in combinations_with_replacement(range(d), total_deg): + col = np.ones(n) + for idx in exponents: + col = col * X_s[:, idx] + columns.append(col) + + return np.column_stack(columns) + + +def estimate_propensity_ratio_sieve( + covariate_matrix: np.ndarray, + mask_g: np.ndarray, + mask_gp: np.ndarray, + k_max: Optional[int] = None, + criterion: str = "bic", + ratio_clip: float = 20.0, +) -> np.ndarray: + r"""Estimate propensity ratio via sieve convex minimization (Eq 4.1-4.2). + + Solves for each sieve degree K = 1, ..., k_max: + + .. math:: + \hat\beta_K = \arg\min_{\beta} \frac{1}{n} + \sum_i \bigl[ G_{g',i} (\psi^K(X_i)'\beta)^2 + - 2 G_{g,i} (\psi^K(X_i)'\beta) \bigr] + + The FOC gives a closed-form linear system (no iterative optimization): + ``(Psi_{g'}' Psi_{g'}) beta = Psi_g.sum(axis=0)``. + + Selects K via AIC/BIC: ``IC(K) = 2*loss(K) + C_n*K/n``. + + On singular basis: tries lower K. Short-circuits r_{g,g}(X) = 1. + + Parameters + ---------- + covariate_matrix : ndarray, shape (n_units, n_covariates) + mask_g : ndarray of bool, shape (n_units,) + Target treatment group mask. + mask_gp : ndarray of bool, shape (n_units,) + Comparison group mask. + k_max : int or None + Maximum polynomial degree. None = ``min(floor(n_gp^{1/5}), 5)``. + criterion : str + ``"aic"`` or ``"bic"``. + ratio_clip : float + Clip ratios to ``[1/ratio_clip, ratio_clip]``. + + Returns + ------- + ratio : ndarray, shape (n_units,) + Estimated ``r_{g,g'}(X_i)`` for every unit. + """ + n_units = len(covariate_matrix) + n_gp = int(np.sum(mask_gp)) + + # Short-circuit: r_{g,g}(X) = 1 for same-cohort comparisons (PT-All) + if np.array_equal(mask_g, mask_gp): + return np.ones(n_units) + + d = covariate_matrix.shape[1] + + # Default k_max: use comparison group size, not total n + if k_max is None: + k_max = min(int(n_gp**0.2), 5) + k_max = max(k_max, 1) + + # Penalty multiplier for IC + n_total = int(np.sum(mask_g)) + n_gp + c_n = 2.0 if criterion == "aic" else np.log(max(n_total, 2)) + + best_ic = np.inf + best_ratio = np.ones(n_units) # fallback: constant ratio 1 + + for K in range(1, k_max + 1): + n_basis = comb(K + d, d) + + # Cap K so basis dimension < n_gp (avoid singular system) + if n_basis >= n_gp: + break + + basis_all = _polynomial_sieve_basis(covariate_matrix, K) + Psi_gp = basis_all[mask_gp] # (n_gp, n_basis) + Psi_g = basis_all[mask_g] # (n_g, n_basis) + + # Normal equations: (Psi_gp' Psi_gp) beta = Psi_g.sum(axis=0) + A = Psi_gp.T @ Psi_gp + b = Psi_g.sum(axis=0) + + try: + beta = np.linalg.solve(A, b) + except np.linalg.LinAlgError: + continue # singular — try next K + + # Check for NaN/Inf in solution + if not np.all(np.isfinite(beta)): + continue + + # Predicted ratio for all units + r_hat = basis_all @ beta + + # IC selection: loss at optimum = -(1/n) * b'beta + # Derivation: L(beta) = (1/n)(beta'A*beta - 2*b'beta). + # At optimum A*beta = b, so beta'A*beta = b'beta. + # Therefore L = (1/n)(b'beta - 2*b'beta) = -(1/n)*b'beta. + loss = -float(b @ beta) / n_total + ic_val = 2.0 * loss + c_n * n_basis / n_total + + if ic_val < best_ic: + best_ic = ic_val + best_ratio = r_hat.copy() + + # Warn if no sieve fit succeeded (falling back to constant ratio 1) + if best_ic == np.inf: + warnings.warn( + "Propensity ratio sieve estimation failed for all K values. " + "Falling back to constant ratio of 1 (no ratio adjustment). " + "The DR estimator relies on outcome regression only.", + UserWarning, + stacklevel=2, + ) + + # Overlap diagnostics: warn if ratios require significant clipping + n_extreme = int(np.sum((best_ratio < 1.0 / ratio_clip) | (best_ratio > ratio_clip))) + if n_extreme > 0: + pct = 100.0 * n_extreme / n_units + warnings.warn( + f"Sieve propensity ratios for {n_extreme} of {n_units} units " + f"({pct:.1f}%) were outside [{1.0/ratio_clip:.2f}, {ratio_clip:.1f}] " + f"and will be clipped. This may indicate overlap assumption " + f"violations (near-zero propensity scores for some covariate values).", + UserWarning, + stacklevel=2, + ) + + # Clip: population ratio p_g(X)/p_{g'}(X) is non-negative + best_ratio = np.clip(best_ratio, 1.0 / ratio_clip, ratio_clip) + + return best_ratio + + +# --------------------------------------------------------------------------- +# Sieve-based inverse propensity estimation (Algorithm step 4) +# --------------------------------------------------------------------------- + + +def estimate_inverse_propensity_sieve( + covariate_matrix: np.ndarray, + group_mask: np.ndarray, + k_max: Optional[int] = None, + criterion: str = "bic", +) -> np.ndarray: + r"""Estimate s_{g'}(X) = 1/p_{g'}(X) via sieve convex minimization. + + Solves for each sieve degree K: + + .. math:: + \hat\beta_K = \arg\min_\beta \frac{1}{n} + \sum_i \bigl[ G_{g',i} (\psi^K(X_i)'\beta)^2 + - 2 (\psi^K(X_i)'\beta) \bigr] + + FOC: ``(Psi_{g'}' Psi_{g'}) beta = Psi_all.sum(axis=0)`` + + This is the same structure as the ratio estimator but with all + units on the RHS (not just group g), following the paper's + algorithm step 4. + + Parameters + ---------- + covariate_matrix : ndarray, shape (n_units, n_covariates) + group_mask : ndarray of bool, shape (n_units,) + Mask for the group whose inverse propensity to estimate. + k_max : int or None + Maximum polynomial degree. None = auto. + criterion : str + ``"aic"`` or ``"bic"``. + + Returns + ------- + s_hat : ndarray, shape (n_units,) + Estimated ``1/p_{g'}(X_i)`` for every unit. Clipped to [1, n]. + """ + n_units = len(covariate_matrix) + n_group = int(np.sum(group_mask)) + d = covariate_matrix.shape[1] + + if n_group == 0: + return np.ones(n_units) + + if k_max is None: + k_max = min(int(n_group**0.2), 5) + k_max = max(k_max, 1) + + c_n = 2.0 if criterion == "aic" else np.log(max(n_units, 2)) + + best_ic = np.inf + best_s = np.full(n_units, n_units / n_group) # fallback: unconditional + + for K in range(1, k_max + 1): + n_basis = comb(K + d, d) + if n_basis >= n_group: + break + + basis_all = _polynomial_sieve_basis(covariate_matrix, K) + Psi_gp = basis_all[group_mask] + + A = Psi_gp.T @ Psi_gp + # RHS: sum of basis over ALL units (not just one group) + b = basis_all.sum(axis=0) + + try: + beta = np.linalg.solve(A, b) + except np.linalg.LinAlgError: + continue + if not np.all(np.isfinite(beta)): + continue + + s_hat = basis_all @ beta + + # IC: loss = -(1/n) * b'beta (same derivation as ratio estimator) + loss = -float(b @ beta) / n_units + ic_val = 2.0 * loss + c_n * n_basis / n_units + + if ic_val < best_ic: + best_ic = ic_val + best_s = s_hat.copy() + + # Warn if no sieve fit succeeded (falling back to unconditional) + if best_ic == np.inf: + warnings.warn( + "Inverse propensity sieve estimation failed for all K values. " + "Falling back to unconditional n/n_group scaling.", + UserWarning, + stacklevel=2, + ) + + # Overlap diagnostics: warn if s_hat values require clipping + n_clipped = int(np.sum((best_s < 1.0) | (best_s > float(n_units)))) + if n_clipped > 0: + pct = 100.0 * n_clipped / n_units + warnings.warn( + f"Inverse propensity estimates for {n_clipped} of {n_units} units " + f"({pct:.1f}%) were outside [1, {n_units}] and will be clipped. " + f"This may indicate overlap assumption violations.", + UserWarning, + stacklevel=2, + ) + + # s = 1/p must be >= 1 (since p <= 1) and bounded above + best_s = np.clip(best_s, 1.0, float(n_units)) + return best_s + + +# --------------------------------------------------------------------------- +# Doubly robust generated outcomes (Eq 4.4) +# --------------------------------------------------------------------------- + + +def compute_generated_outcomes_cov( + target_g: float, + target_t: float, + valid_pairs: List[Tuple[float, float]], + outcome_wide: np.ndarray, + cohort_masks: Dict[float, np.ndarray], + never_treated_mask: np.ndarray, + period_to_col: Dict[float, int], + period_1_col: int, + cohort_fractions: Dict[float, float], + m_hat_cache: Dict[Tuple, np.ndarray], + r_hat_cache: Dict[Tuple[float, float], np.ndarray], + never_treated_val: float = np.inf, +) -> np.ndarray: + """Compute per-unit doubly robust generated outcomes (Eq 4.4). + + For each valid pair ``(g', t_pre)`` and each unit ``i``, three terms:: + + Term 1 (treated): + (G_{g,i} / pi_g) * (Y_{i,t} - Y_{i,1} + - m_{inf,t,tpre}(X_i) - m_{g',tpre,1}(X_i)) + + Term 2 (never-treated): + -r_{g,inf}(X_i) * (G_{inf,i} / pi_g) + * (Y_{i,t} - Y_{i,tpre} - m_{inf,t,tpre}(X_i)) + + Term 3 (comparison cohort): + -r_{g,g'}(X_i) * (G_{g',i} / pi_g) + * (Y_{i,tpre} - Y_{i,1} - m_{g',tpre,1}(X_i)) + + Returns + ------- + gen_out : ndarray, shape (n_units, H) + Per-unit generated outcome for each valid pair. + """ + H = len(valid_pairs) + n_units = outcome_wide.shape[0] + if H == 0: + return np.empty((n_units, 0)) + + t_col = period_to_col[target_t] + y1_col = period_1_col + + g_mask = cohort_masks[target_g] + pi_g = cohort_fractions[target_g] + + gen_out = np.zeros((n_units, H)) + + for j, (gp, tpre) in enumerate(valid_pairs): + tpre_col = period_to_col[tpre] + + m_inf_t_tpre = m_hat_cache[(never_treated_val, t_col, tpre_col)] + m_gp_tpre_1 = m_hat_cache[(gp, tpre_col, y1_col)] + r_g_inf = r_hat_cache[(target_g, never_treated_val)] + r_g_gp = r_hat_cache[(target_g, gp)] + + # Term 1: treated units + if pi_g > 0: + Y_t_minus_Y1 = outcome_wide[g_mask, t_col] - outcome_wide[g_mask, y1_col] + residual_treated = Y_t_minus_Y1 - m_inf_t_tpre[g_mask] - m_gp_tpre_1[g_mask] + gen_out[g_mask, j] += (1.0 / pi_g) * residual_treated + + # Term 2: never-treated units + pi_inf = cohort_fractions.get(never_treated_val, 0.0) + if pi_inf > 0: + Y_t_minus_Ytpre = ( + outcome_wide[never_treated_mask, t_col] - outcome_wide[never_treated_mask, tpre_col] + ) + residual_inf = Y_t_minus_Ytpre - m_inf_t_tpre[never_treated_mask] + gen_out[never_treated_mask, j] -= ( + r_g_inf[never_treated_mask] * (1.0 / pi_g) * residual_inf + ) + + # Term 3: comparison cohort units + if np.isinf(gp): + gp_mask = never_treated_mask + else: + gp_mask = cohort_masks[gp] + pi_gp = cohort_fractions.get(gp, 0.0) + if pi_gp > 0: + Y_tpre_minus_Y1 = outcome_wide[gp_mask, tpre_col] - outcome_wide[gp_mask, y1_col] + residual_gp = Y_tpre_minus_Y1 - m_gp_tpre_1[gp_mask] + gen_out[gp_mask, j] -= r_g_gp[gp_mask] * (1.0 / pi_g) * residual_gp + + return gen_out + + +# --------------------------------------------------------------------------- +# Kernel-smoothed conditional Omega* (Eq 3.12) +# --------------------------------------------------------------------------- + + +def _silverman_bandwidth(X: np.ndarray) -> float: + """Silverman's rule-of-thumb bandwidth for d-dimensional X. + + ``h = (4 / (d + 2))^{1/(d+4)} * median_std * n^{-1/(d+4)}`` + """ + n, d = X.shape + stds = np.std(X, axis=0) + stds[stds < 1e-10] = 1.0 + median_std = float(np.median(stds)) + h = (4.0 / (d + 2)) ** (1.0 / (d + 4)) * median_std * n ** (-1.0 / (d + 4)) + return max(h, 1e-10) + + +def _kernel_weights_matrix( + X_all: np.ndarray, + X_group: np.ndarray, + bandwidth: float, +) -> np.ndarray: + """Gaussian kernel weight matrix. + + Returns shape ``(n_all, n_group)`` where entry ``[i, j]`` is the + normalized kernel weight ``K_h(X_group[j], X_all[i])``. + + Each row sums to 1 (Nadaraya-Watson normalization). + """ + # Squared distances: (n_all, n_group) + dist_sq = cdist(X_all, X_group, metric="sqeuclidean") + # Gaussian kernel + raw = np.exp(-dist_sq / (2.0 * bandwidth**2)) + # Normalize each row + row_sums = raw.sum(axis=1, keepdims=True) + row_sums[row_sums < 1e-15] = 1.0 # avoid division by zero + return raw / row_sums + + +def _kernel_weighted_cov( + A: np.ndarray, + B: np.ndarray, + W: np.ndarray, +) -> np.ndarray: + """Kernel-weighted local covariance. + + Parameters + ---------- + A : ndarray, shape (n_group,) + B : ndarray, shape (n_group,) + W : ndarray, shape (n_all, n_group) + Normalized kernel weights (rows sum to 1). + + Returns + ------- + cov : ndarray, shape (n_all,) + ``Cov_hat(A, B | X_i)`` for each target unit i. + """ + # Local means: (n_all,) + A_local = W @ A + B_local = W @ B + + # Centered products: (n_all, n_group) + A_centered = A[np.newaxis, :] - A_local[:, np.newaxis] # (n_all, n_group) + B_centered = B[np.newaxis, :] - B_local[:, np.newaxis] + + # Weighted local covariance: (n_all,) + cov = np.sum(W * A_centered * B_centered, axis=1) + return cov + + +def compute_omega_star_conditional( + target_g: float, + target_t: float, + valid_pairs: List[Tuple[float, float]], + outcome_wide: np.ndarray, + cohort_masks: Dict[float, np.ndarray], + never_treated_mask: np.ndarray, + period_to_col: Dict[float, int], + period_1_col: int, + cohort_fractions: Dict[float, float], + covariate_matrix: np.ndarray, + s_hat_cache: Dict[float, np.ndarray], + bandwidth: Optional[float] = None, + never_treated_val: float = np.inf, +) -> np.ndarray: + r"""Kernel-smoothed conditional Omega\*(X_i) for each unit (Eq 3.12). + + Estimates the five-term conditional covariance matrix using + Nadaraya-Watson kernel regression with Gaussian kernel and + local (kernel-weighted) means. Scales each term by per-unit + conditional inverse propensities ``s_hat_g(X_i) = 1/p_g(X_i)`` + (algorithm step 4), matching the paper's Eq 3.12. + + Parameters + ---------- + target_g, target_t : float + Target group-time. + valid_pairs : list of (g', t_pre) + outcome_wide : ndarray, shape (n_units, n_periods) + cohort_masks, never_treated_mask, period_to_col, period_1_col, + cohort_fractions : pre-computed data structures + covariate_matrix : ndarray, shape (n_units, n_covariates) + s_hat_cache : dict + Inverse propensity estimates ``{group: s_hat(X_i)}`` where each + value is shape ``(n_units,)``. Keyed by group identifier. + bandwidth : float or None + Kernel bandwidth. None = Silverman's rule. + never_treated_val : float + + Returns + ------- + omega : ndarray, shape (n_units, H, H) + Per-unit conditional covariance matrices. + """ + H = len(valid_pairs) + n_units = outcome_wide.shape[0] + if H == 0: + return np.empty((n_units, 0, 0)) + + if bandwidth is None: + bandwidth = _silverman_bandwidth(covariate_matrix) + + t_col = period_to_col[target_t] + y1_col = period_1_col + + g_mask = cohort_masks[target_g] + + Y_inf = outcome_wide[never_treated_mask] + X_inf = covariate_matrix[never_treated_mask] + + # Per-unit inverse propensities from sieve estimation (Eq 3.12) + s_g = s_hat_cache.get(target_g, np.full(n_units, 1.0 / max(cohort_fractions[target_g], 1e-10))) + s_inf = s_hat_cache.get( + never_treated_val, + np.full(n_units, 1.0 / max(cohort_fractions.get(never_treated_val, 1e-10), 1e-10)), + ) + + # Scalability warning + if n_units > 5000: + warnings.warn( + f"Conditional Omega* estimation with n={n_units} is expensive " + f"(O(n^2 * H^2)). Consider using fewer units.", + UserWarning, + stacklevel=2, + ) + + # Pre-compute kernel weight matrices per group + Y_g = outcome_wide[g_mask] + X_g = covariate_matrix[g_mask] + Yg_t_minus_1 = Y_g[:, t_col] - Y_g[:, y1_col] + + W_g = _kernel_weights_matrix(covariate_matrix, X_g, bandwidth) + W_inf = _kernel_weights_matrix(covariate_matrix, X_inf, bandwidth) + + inf_t_minus_tpre = {} + for _, tpre in valid_pairs: + tpre_col = period_to_col[tpre] + if tpre_col not in inf_t_minus_tpre: + inf_t_minus_tpre[tpre_col] = Y_inf[:, t_col] - Y_inf[:, tpre_col] + + W_gp_cache: Dict[float, np.ndarray] = {} + gp_outcomes_cache: Dict[float, np.ndarray] = {} + + omega = np.zeros((n_units, H, H)) + + # Term 1: s_g(X) * Cov(Y_t-Y_1, Y_t-Y_1 | G=g, X) — same for all (j,k) + term1 = s_g * _kernel_weighted_cov(Yg_t_minus_1, Yg_t_minus_1, W_g) + + for j in range(H): + gp_j, tpre_j = valid_pairs[j] + tpre_j_col = period_to_col[tpre_j] + + for k in range(j, H): + gp_k, tpre_k = valid_pairs[k] + tpre_k_col = period_to_col[tpre_k] + + val = term1.copy() + + # Term 2: s_inf(X) * Cov(Y_t-Y_{tpre_j}, Y_t-Y_{tpre_k} | G=inf, X) + val += s_inf * _kernel_weighted_cov( + inf_t_minus_tpre[tpre_j_col], + inf_t_minus_tpre[tpre_k_col], + W_inf, + ) + + # Term 3: -1{g==g'_j} * s_g(X) * Cov(Y_t-Y_1, Y_{tpre_j}-Y_1 | G=g, X) + if gp_j == target_g: + g_tpre_j = Y_g[:, tpre_j_col] - Y_g[:, y1_col] + val -= s_g * _kernel_weighted_cov(Yg_t_minus_1, g_tpre_j, W_g) + + # Term 4: -1{g==g'_k} * s_g(X) * Cov(Y_t-Y_1, Y_{tpre_k}-Y_1 | G=g, X) + if gp_k == target_g: + g_tpre_k = Y_g[:, tpre_k_col] - Y_g[:, y1_col] + val -= s_g * _kernel_weighted_cov(Yg_t_minus_1, g_tpre_k, W_g) + + # Term 5: 1{g'_j==g'_k} * s_{g'_j}(X) * Cov(...) + if gp_j == gp_k: + if np.isinf(gp_j): + inf_tpre_j = Y_inf[:, tpre_j_col] - Y_inf[:, y1_col] + inf_tpre_k = Y_inf[:, tpre_k_col] - Y_inf[:, y1_col] + val += s_inf * _kernel_weighted_cov(inf_tpre_j, inf_tpre_k, W_inf) + else: + s_gp_j = s_hat_cache.get( + gp_j, np.full(n_units, 1.0 / max(cohort_fractions.get(gp_j, 1e-10), 1e-10)) + ) + if gp_j not in W_gp_cache: + X_gp = covariate_matrix[cohort_masks[gp_j]] + W_gp_cache[gp_j] = _kernel_weights_matrix(covariate_matrix, X_gp, bandwidth) + gp_outcomes_cache[gp_j] = outcome_wide[cohort_masks[gp_j]] + W_gp = W_gp_cache[gp_j] + Y_gp = gp_outcomes_cache[gp_j] + gp_tpre_j = Y_gp[:, tpre_j_col] - Y_gp[:, y1_col] + gp_tpre_k = Y_gp[:, tpre_k_col] - Y_gp[:, y1_col] + val += s_gp_j * _kernel_weighted_cov(gp_tpre_j, gp_tpre_k, W_gp) + + omega[:, j, k] = val + if j != k: + omega[:, k, j] = val + + return omega + + +# --------------------------------------------------------------------------- +# Per-unit efficient weights from conditional Omega* +# --------------------------------------------------------------------------- + + +def compute_per_unit_weights( + omega_conditional: np.ndarray, + cond_threshold: float = 1e12, +) -> np.ndarray: + """Per-unit efficient weights from conditional Omega* inverse. + + ``w(X_i) = 1' Omega*(X_i)^{-1} / (1' Omega*(X_i)^{-1} 1)`` + + Falls back to pseudoinverse per unit if condition number exceeds threshold. + + Parameters + ---------- + omega_conditional : ndarray, shape (n_units, H, H) + Per-unit conditional covariance matrices. + cond_threshold : float + Condition number threshold for pseudoinverse fallback. + + Returns + ------- + weights : ndarray, shape (n_units, H) + Per-unit efficient combination weights (each row sums to 1). + """ + n_units, H, _ = omega_conditional.shape + if H == 0: + return np.empty((n_units, 0)) + if H == 1: + return np.ones((n_units, 1)) + + ones = np.ones(H) + weights = np.zeros((n_units, H)) + + for i in range(n_units): + omega_i = omega_conditional[i] + + if np.allclose(omega_i, 0.0): + weights[i] = ones / H + continue + + cond = float(np.linalg.cond(omega_i)) + if cond > cond_threshold: + omega_inv = np.linalg.pinv(omega_i) + else: + try: + omega_inv = np.linalg.inv(omega_i) + except np.linalg.LinAlgError: + omega_inv = np.linalg.pinv(omega_i) + + numerator = ones @ omega_inv + denominator = numerator @ ones + + if abs(denominator) < 1e-15: + weights[i] = ones / H + else: + weights[i] = numerator / denominator + + return weights + + +# --------------------------------------------------------------------------- +# EIF computation +# --------------------------------------------------------------------------- + + +def compute_eif_cov( + weights: np.ndarray, + generated_outcomes: np.ndarray, + att_gt: float, + n_units: int, +) -> np.ndarray: + """Per-unit efficient influence function from DR generated outcomes. + + Supports both global weights ``(H,)`` and per-unit weights ``(n_units, H)``. + + For global weights: ``EIF_i = w @ (gen_out_i - y_bar) = w @ gen_out_i - ATT`` + For per-unit weights: ``EIF_i = w(X_i) @ gen_out_i - ATT`` + + In both cases the EIF centers on the scalar ATT estimate, ensuring + ``mean(EIF) ≈ 0``. The plug-in EIF treats estimated per-unit weights + as fixed, valid under Neyman orthogonality (Remark 4.2). + + Parameters + ---------- + weights : ndarray, shape (H,) or (n_units, H) + Efficient combination weights. + generated_outcomes : ndarray, shape (n_units, H) + Per-unit generated outcomes. + att_gt : float + Scalar ATT estimate for this (g, t) cell. + n_units : int + Total number of units. + + Returns + ------- + eif : ndarray, shape (n_units,) + EIF value for every unit. Sample mean is approximately zero. + """ + if weights.size == 0: + return np.zeros(n_units) + + if weights.ndim == 1: + # Global weights: w @ gen_out_i for each unit + weighted_scores = generated_outcomes @ weights # (n_units,) + else: + # Per-unit weights: w_i @ gen_out_i for each unit + weighted_scores = np.sum(weights * generated_outcomes, axis=1) + + # Center on the scalar ATT estimate (ensures mean(EIF) ≈ 0) + eif = weighted_scores - att_gt + return eif diff --git a/diff_diff/efficient_did_results.py b/diff_diff/efficient_did_results.py index 1135f9b4..35393466 100644 --- a/diff_diff/efficient_did_results.py +++ b/diff_diff/efficient_did_results.py @@ -75,6 +75,16 @@ class EfficientDiDResults: Stored EIF matrix for bootstrap / manual SE computation. bootstrap_results : EDiDBootstrapResults, optional Bootstrap inference results. + estimation_path : str + ``"nocov"`` or ``"dr"`` — which estimation path was used. + sieve_k_max : int or None + Maximum polynomial degree for sieve ratio estimation. + sieve_criterion : str + Information criterion used (``"aic"`` or ``"bic"``). + ratio_clip : float + Clipping bound for sieve propensity ratios. + kernel_bandwidth : float or None + Bandwidth used for kernel-smoothed conditional Omega*. """ group_time_effects: Dict[Tuple[Any, Any], Dict[str, Any]] @@ -104,13 +114,19 @@ class EfficientDiDResults: ) influence_functions: Optional["np.ndarray"] = field(default=None, repr=False) bootstrap_results: Optional["EDiDBootstrapResults"] = field(default=None, repr=False) + estimation_path: str = "nocov" + sieve_k_max: Optional[int] = None + sieve_criterion: str = "bic" + ratio_clip: float = 20.0 + kernel_bandwidth: Optional[float] = None def __repr__(self) -> str: sig = _get_significance_stars(self.overall_p_value) + path = "DR" if self.estimation_path == "dr" else "nocov" return ( f"EfficientDiDResults(ATT={self.overall_att:.4f}{sig}, " f"SE={self.overall_se:.4f}, " - f"pt={self.pt_assumption}, " + f"pt={self.pt_assumption}, path={path}, " f"n_groups={len(self.groups)}, " f"n_periods={len(self.time_periods)})" ) @@ -131,6 +147,7 @@ def summary(self, alpha: Optional[float] = None) -> str: f"{'Treatment cohorts:':<30} {len(self.groups):>10}", f"{'Time periods:':<30} {len(self.time_periods):>10}", f"{'PT assumption:':<30} {self.pt_assumption:>10}", + f"{'Estimation path:':<30} {'doubly robust' if self.estimation_path == 'dr' else 'no covariates':>10}", ] if self.anticipation > 0: lines.append(f"{'Anticipation periods:':<30} {self.anticipation:>10}") diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index 9ba4d511..289acc43 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -618,6 +618,8 @@ where `q_{g,e} = pi_g / sum_{g' in G_{trt,e}} pi_{g'}`. - **Single pre-treatment period (g=2)**: `V*_{gt}(X)` is 1x1, efficient weights are trivially 1, estimator collapses to standard DiD with single baseline - **Rank deficiency in `V*_{gt}(X)` or `Omega*_{gt}(X)`**: Inverse does not exist if outcome changes are linearly dependent conditional on covariates. Detect via matrix condition number; fall back to pseudoinverse or standard estimator - **Near-zero propensity scores**: Ratio `p_g(X)/p_{g'}(X)` explodes. Overlap assumption (O) rules this out in population; implement trimming or warn on finite-sample instability +- **Note:** When no sieve degree K succeeds for ratio estimation (basis dimension exceeds comparison group size, or all linear systems are singular), the estimator falls back to a constant ratio of 1 for all units with a UserWarning. The outcome regression adjustment remains active, so the generated outcomes (Eq 4.4) still incorporate covariate information via the m_hat terms. The DR property ensures consistency as long as the outcome regression is correctly specified. +- **Note:** When no sieve degree K succeeds for inverse propensity estimation (algorithm step 4), the estimator falls back to unconditional n/n_group scaling with a UserWarning, which reduces to the unconditional Omega* approximation for the affected group. - **All units eventually treated**: Last cohort serves as "never-treated" by dropping last time period (Phase 1: raises ValueError; last-cohort-as-control fallback planned for Phase 2) - **Negative weights**: Explicitly stated as harmless for bias and beneficial for precision; arise from efficiency optimization under overidentification (Section 5.2) - **PT-Post regime (just-identified)**: Under PT-Post, EDiD automatically reduces to standard single-baseline estimator (Corollary 3.2). No downside to using EDiD -- it subsumes standard estimators @@ -657,17 +659,21 @@ where `q_{g,e} = pi_g / sum_{g' in G_{trt,e}} pi_{g'}`. - [x] Implements two-step semiparametric estimator (Equation 4.3) - [x] Supports both PT-Post (just-identified) and PT-All (overidentified) regimes - [x] Computes efficient weights from conditional covariance matrix inverse -- [ ] Doubly robust: consistent if either outcome regression or propensity score ratio is correct +- [x] Doubly robust: consistent if either outcome regression or propensity score ratio is correct - [x] No-covariates case uses closed-form sample means/covariances (no tuning) -- [ ] With covariates: sieve-based propensity ratio estimation with AIC/BIC selection -- [ ] Kernel-smoothed conditional covariance estimation +- [x] With covariates: sieve-based propensity ratio estimation with AIC/BIC selection +- [x] Kernel-smoothed conditional covariance estimation - [x] Analytical SE from EIF sample variance - [ ] Cluster bootstrap SE option (recommended for small samples) - [x] Event-study aggregation ES(e) with cohort-size weights - [ ] Hausman-type pre-test for PT-All vs PT-Post (Theorem A.1) - [x] Each ATT(g,t) can be estimated independently (parallelizable) - [x] Absorbing treatment validation -- [ ] Overlap diagnostics for propensity score ratios +- [x] Overlap diagnostics for propensity score ratios +- **Note:** Sieve ratio estimation uses polynomial basis functions (total degree up to K) with AIC/BIC model selection. The paper describes sieve estimators generally without specifying a particular basis family; polynomial sieves are a standard choice (Section 4, Eq 4.2). Negative sieve ratio predictions are clipped to a small positive value since the population ratio p_g(X)/p_{g'}(X) is non-negative. +- **Note:** Kernel-smoothed conditional covariance Omega*(X) uses Gaussian kernel with Silverman's rule-of-thumb bandwidth by default. The paper specifies kernel smoothing (step 5, Section 4) without mandating a particular kernel or bandwidth selection method. +- **Note:** Conditional covariance Omega*(X) scales each term by per-unit sieve-estimated inverse propensities s_hat_{g'}(X) = 1/p_{g'}(X) (algorithm step 4), matching Eq 3.12. The inverse propensity estimation uses the same polynomial sieve convex minimization as the ratio estimator. Estimated s_hat values are clipped to [1, n] with a UserWarning when clipping binds, mirroring the ratio path's overlap diagnostics. +- **Note:** Outcome regressions m_hat_{g',t,tpre}(X) use linear OLS working models. The paper's Section 4 describes flexible nonparametric nuisance estimation (sieve regression, kernel smoothing, or ML methods). The DR property ensures consistency if either the OLS outcome model or the sieve propensity ratio is correctly specified, but the linear OLS specification does not generically guarantee attainment of the semiparametric efficiency bound unless the conditional mean is linear in the covariates. --- diff --git a/tests/helpers/edid_dgp.py b/tests/helpers/edid_dgp.py index 6fa6200c..b79959ed 100644 --- a/tests/helpers/edid_dgp.py +++ b/tests/helpers/edid_dgp.py @@ -13,11 +13,30 @@ N_PERIODS = 11 -def make_compustat_dgp(n_units=400, n_periods=N_PERIODS, rho=0.0, seed=42): +def make_compustat_dgp( + n_units=400, + n_periods=N_PERIODS, + rho=0.0, + seed=42, + add_covariates=False, + covariate_effect=0.5, + confounding_strength=0.0, +): """Simplified Compustat-style DGP from Section 5.2. Groups: G=5 (~1/3), G=8 (~1/3), G=inf (~1/3). ATT(5,t) = 0.154*(t-4), ATT(8,t) = 0.093*(t-7). + + Parameters + ---------- + add_covariates : bool + If True, adds ``x1`` (continuous) and ``x2`` (binary) columns. + covariate_effect : float + Coefficient of ``x1`` on outcome (explains residual variance). + confounding_strength : float + ``x1`` also shifts the group-mean outcome differentially by group, + creating confounding where parallel trends hold conditional on X + but not unconditionally. Set > 0 to test the DR path. """ rng = np.random.default_rng(seed) n_t = n_periods @@ -28,6 +47,18 @@ def make_compustat_dgp(n_units=400, n_periods=N_PERIODS, rho=0.0, seed=42): ft[:n_g5] = 5 ft[n_g5 : n_g5 + n_g8] = 8 + # Generate unit-level covariates (time-invariant) + # When confounding_strength > 0, shift x1 distribution by group so + # treated units have higher x1 on average (selection on observables). + if confounding_strength > 0: + x1 = np.empty(n_units) + x1[:n_g5] = rng.normal(1.0, 1.0, n_g5) # G=5: higher x1 + x1[n_g5 : n_g5 + n_g8] = rng.normal(0.5, 1.0, n_g8) # G=8: moderate + x1[n_g5 + n_g8 :] = rng.normal(0.0, 1.0, n_units - n_g5 - n_g8) # G=inf: baseline + else: + x1 = rng.normal(0, 1, n_units) + x2 = rng.binomial(1, 0.5, n_units) + units = np.repeat(np.arange(n_units), n_t) times = np.tile(np.arange(1, n_t + 1), n_units) ft_col = np.repeat(ft, n_t) @@ -57,9 +88,26 @@ def make_compustat_dgp(n_units=400, n_periods=N_PERIODS, rho=0.0, seed=42): y = unit_fe + time_fe + tau + eps_flat - return pd.DataFrame( - {"unit": units, "time": times, "first_treat": ft_col, "y": y} - ) + if add_covariates: + # Covariate effect on outcome (explains residual variance) + x1_expanded = np.repeat(x1, n_t) + y += covariate_effect * x1_expanded + + if confounding_strength > 0: + # Confounding: x1-dependent time trend applied to ALL units. + # Conditional on x1, all groups share the same trend → conditional PT holds. + # Unconditional PT fails because treated/control have different x1 distributions. + x1_expanded = np.repeat(x1, n_t) + time_trend = np.tile(np.arange(n_t), n_units) / n_t + y += confounding_strength * x1_expanded * time_trend + + result = pd.DataFrame({"unit": units, "time": times, "first_treat": ft_col, "y": y}) + + if add_covariates: + result["x1"] = np.repeat(x1, n_t) + result["x2"] = np.repeat(x2, n_t) + + return result def true_es_avg(): @@ -68,11 +116,7 @@ def true_es_avg(): all_e = range(0, max(max_e.values()) + 1) es_values = [] for e in all_e: - contributing = [ - coef * (e + 1) - for g, coef in ATT_COEFS.items() - if e <= max_e[g] - ] + contributing = [coef * (e + 1) for g, coef in ATT_COEFS.items() if e <= max_e[g]] if contributing: es_values.append(np.mean(contributing)) return np.mean(es_values) diff --git a/tests/test_efficient_did.py b/tests/test_efficient_did.py index 3b727348..e304f033 100644 --- a/tests/test_efficient_did.py +++ b/tests/test_efficient_did.py @@ -13,6 +13,7 @@ import numpy as np import pandas as pd import pytest +from edid_dgp import make_compustat_dgp from diff_diff import CallawaySantAnna, EDiD, EfficientDiD from diff_diff.efficient_did_results import EfficientDiDResults @@ -112,9 +113,6 @@ def _make_staggered_panel( ) -from edid_dgp import make_compustat_dgp - - def _make_compustat_dgp(n_units=400, n_periods=11, rho=0.0, seed=42): """Delegate to shared DGP in edid_dgp.py.""" return make_compustat_dgp(n_units=n_units, n_periods=n_periods, rho=rho, seed=seed) @@ -250,10 +248,10 @@ def test_absorbing_treatment_validation(self): with pytest.raises(ValueError, match="Non-absorbing"): EfficientDiD().fit(df, "y", "unit", "time", "first_treat") - def test_covariates_not_implemented(self): + def test_missing_covariate_column_raises(self): df = _make_simple_panel() - with pytest.raises(NotImplementedError, match="covariates"): - EfficientDiD().fit(df, "y", "unit", "time", "first_treat", covariates=["y"]) + with pytest.raises(ValueError, match="Missing covariate columns"): + EfficientDiD().fit(df, "y", "unit", "time", "first_treat", covariates=["nonexistent"]) def test_missing_columns(self): df = _make_simple_panel() @@ -1061,3 +1059,425 @@ def test_cohort_drop_warning(self): groups_present = {g for (g, t) in result.group_time_effects} assert 2.0 not in groups_present, "g=2 should have been dropped" assert 4.0 in groups_present, "g=4 should still be present" + + +# ============================================================================= +# Covariate Tests +# ============================================================================= + + +def _make_covariate_panel( + n_units=300, + n_periods=11, + seed=42, + covariate_effect=0.5, + confounding_strength=0.0, +): + """Helper: staggered panel with time-invariant covariates. + + Uses n_periods=11 (default) so both treatment groups g=5 and g=8 are valid. + """ + return make_compustat_dgp( + n_units=n_units, + n_periods=n_periods, + rho=0.0, + seed=seed, + add_covariates=True, + covariate_effect=covariate_effect, + confounding_strength=confounding_strength, + ) + + +class TestCovariatesBasic: + """Tier 1: basic covariate path correctness.""" + + def test_covariates_fit_produces_results(self): + """Smoke test: fit with covariates returns valid results.""" + df = _make_covariate_panel() + result = EfficientDiD(pt_assumption="post").fit( + df, "y", "unit", "time", "first_treat", covariates=["x1", "x2"] + ) + assert isinstance(result, EfficientDiDResults) + assert result.estimation_path == "dr" + assert np.isfinite(result.overall_att) + assert result.overall_se > 0 + assert len(result.group_time_effects) > 0 + for (g, t), eff in result.group_time_effects.items(): + assert np.isfinite(eff["effect"]) + # Baseline cells (t == g-1 under PT-Post) have SE=0 by construction + if t >= g: + assert eff["se"] > 0, f"SE=0 for post-treatment cell ({g}, {t})" + + def test_nocov_match_when_irrelevant(self): + """Random noise covariates should give ~same ATT as nocov.""" + df = _make_covariate_panel(covariate_effect=0.0) + edid = EfficientDiD(pt_assumption="post") + r_nocov = edid.fit(df, "y", "unit", "time", "first_treat") + r_cov = EfficientDiD(pt_assumption="post").fit( + df, "y", "unit", "time", "first_treat", covariates=["x1", "x2"] + ) + # ATT should be close (not identical due to nuisance estimation noise) + assert ( + abs(r_cov.overall_att - r_nocov.overall_att) < 0.3 + ), f"DR ATT {r_cov.overall_att:.4f} too far from nocov {r_nocov.overall_att:.4f}" + + def test_covariates_produce_valid_se(self): + """DR path with covariates explaining variance produces valid SE.""" + df = _make_covariate_panel(covariate_effect=2.0, n_units=600) + r_cov = EfficientDiD(pt_assumption="post").fit( + df, "y", "unit", "time", "first_treat", covariates=["x1"] + ) + # DR SE should be positive and finite + assert r_cov.overall_se > 0 + assert np.isfinite(r_cov.overall_se) + # ATT should be close to the nocov estimate (no confounding) + r_nocov = EfficientDiD(pt_assumption="post").fit(df, "y", "unit", "time", "first_treat") + assert abs(r_cov.overall_att - r_nocov.overall_att) < 0.2 + + def test_covariates_recover_effect_under_confounding(self): + """DGP with confounding: DR should recover true ATT closer to truth than nocov. + + The DGP adds x1-dependent time trends to ALL units and shifts x1 + distribution by group, so unconditional PT fails but conditional PT holds. + True ATT is unchanged by confounding (only levels shift, not treatment). + """ + from edid_dgp import true_overall_att + + true_att = true_overall_att() + df = _make_covariate_panel( + n_units=900, + covariate_effect=1.0, + confounding_strength=2.0, + seed=123, + ) + r_nocov = EfficientDiD(pt_assumption="post").fit(df, "y", "unit", "time", "first_treat") + r_cov = EfficientDiD(pt_assumption="post").fit( + df, "y", "unit", "time", "first_treat", covariates=["x1"] + ) + assert np.isfinite(r_nocov.overall_att) + assert np.isfinite(r_cov.overall_att) + # DR should be closer to the true ATT than nocov + bias_nocov = abs(r_nocov.overall_att - true_att) + bias_cov = abs(r_cov.overall_att - true_att) + assert ( + bias_cov < bias_nocov + ), f"DR bias ({bias_cov:.4f}) should be smaller than nocov bias ({bias_nocov:.4f})" + + def test_empty_covariates_uses_nocov(self): + """covariates=[] should normalize to nocov path.""" + df = _make_covariate_panel() + result = EfficientDiD(pt_assumption="post").fit( + df, "y", "unit", "time", "first_treat", covariates=[] + ) + assert result.estimation_path == "nocov" + + +class TestCovariateValidation: + """Tier 1: input validation for covariates.""" + + def test_missing_covariate_column_raises(self): + df = _make_covariate_panel() + with pytest.raises(ValueError, match="Missing covariate columns"): + EfficientDiD().fit(df, "y", "unit", "time", "first_treat", covariates=["nonexistent"]) + + def test_nan_covariates_raises(self): + df = _make_covariate_panel() + df.loc[0, "x1"] = np.nan + with pytest.raises(ValueError, match="non-finite"): + EfficientDiD().fit(df, "y", "unit", "time", "first_treat", covariates=["x1"]) + + def test_ratio_clip_validation(self): + with pytest.raises(ValueError, match="ratio_clip"): + EfficientDiD(ratio_clip=0.5) + with pytest.raises(ValueError, match="ratio_clip"): + EfficientDiD(ratio_clip=1.0) + with pytest.raises(ValueError, match="ratio_clip"): + EfficientDiD(ratio_clip=np.nan) + with pytest.raises(ValueError, match="ratio_clip"): + EfficientDiD(ratio_clip=np.inf) + + def test_kernel_bandwidth_validation(self): + with pytest.raises(ValueError, match="kernel_bandwidth"): + EfficientDiD(kernel_bandwidth=0.0) + with pytest.raises(ValueError, match="kernel_bandwidth"): + EfficientDiD(kernel_bandwidth=-1.0) + with pytest.raises(ValueError, match="kernel_bandwidth"): + EfficientDiD(kernel_bandwidth=np.nan) + with pytest.raises(ValueError, match="kernel_bandwidth"): + EfficientDiD(kernel_bandwidth=np.inf) + # None is valid (auto bandwidth) + edid = EfficientDiD(kernel_bandwidth=None) + assert edid.kernel_bandwidth is None + + def test_sieve_k_max_validation(self): + with pytest.raises(ValueError, match="sieve_k_max"): + EfficientDiD(sieve_k_max=0) + with pytest.raises(ValueError, match="sieve_k_max"): + EfficientDiD(sieve_k_max=-1) + # None is valid (auto) + edid = EfficientDiD(sieve_k_max=None) + assert edid.sieve_k_max is None + + def test_sieve_criterion_validation(self): + with pytest.raises(ValueError, match="sieve_criterion"): + EfficientDiD(sieve_criterion="invalid") + + def test_new_params_in_get_params(self): + edid = EfficientDiD(sieve_k_max=3, sieve_criterion="aic", ratio_clip=10.0) + params = edid.get_params() + assert params["sieve_k_max"] == 3 + assert params["sieve_criterion"] == "aic" + assert params["ratio_clip"] == 10.0 + assert "kernel_bandwidth" in params + + def test_time_varying_covariates_raises(self): + df = _make_covariate_panel() + # Make x1 vary over time for one unit + mask = (df["unit"] == 0) & (df["time"] == 2) + df.loc[mask, "x1"] = 999.0 + with pytest.raises(ValueError, match="varies over time"): + EfficientDiD().fit(df, "y", "unit", "time", "first_treat", covariates=["x1"]) + + +class TestCovariatesPTAssumptions: + """Tier 2: covariates under different PT assumptions.""" + + def test_covariates_pt_post(self): + df = _make_covariate_panel() + result = EfficientDiD(pt_assumption="post").fit( + df, "y", "unit", "time", "first_treat", covariates=["x1"] + ) + assert isinstance(result, EfficientDiDResults) + assert result.estimation_path == "dr" + assert np.isfinite(result.overall_att) + + def test_covariates_pt_all(self): + df = _make_covariate_panel() + result = EfficientDiD(pt_assumption="all").fit( + df, "y", "unit", "time", "first_treat", covariates=["x1"] + ) + assert isinstance(result, EfficientDiDResults) + assert result.estimation_path == "dr" + assert np.isfinite(result.overall_att) + + def test_covariates_aggregate_event_study(self): + df = _make_covariate_panel() + result = EfficientDiD(pt_assumption="post").fit( + df, + "y", + "unit", + "time", + "first_treat", + covariates=["x1"], + aggregate="event_study", + ) + assert result.event_study_effects is not None + assert len(result.event_study_effects) > 0 + for e, eff in result.event_study_effects.items(): + assert np.isfinite(eff["effect"]) + + def test_covariates_aggregate_group(self): + df = _make_covariate_panel() + result = EfficientDiD(pt_assumption="post").fit( + df, + "y", + "unit", + "time", + "first_treat", + covariates=["x1"], + aggregate="group", + ) + assert result.group_effects is not None + assert len(result.group_effects) > 0 + + def test_covariates_aggregate_all(self): + df = _make_covariate_panel() + result = EfficientDiD(pt_assumption="post").fit( + df, + "y", + "unit", + "time", + "first_treat", + covariates=["x1"], + aggregate="all", + ) + assert result.event_study_effects is not None + assert result.group_effects is not None + assert np.isfinite(result.overall_att) + + +class TestCovariatesEdgeCases: + """Tier 2: edge cases for covariate path.""" + + def test_single_covariate(self): + df = _make_covariate_panel() + result = EfficientDiD(pt_assumption="post").fit( + df, "y", "unit", "time", "first_treat", covariates=["x1"] + ) + assert np.isfinite(result.overall_att) + + def test_binary_covariate(self): + df = _make_covariate_panel() + result = EfficientDiD(pt_assumption="post").fit( + df, "y", "unit", "time", "first_treat", covariates=["x2"] + ) + assert np.isfinite(result.overall_att) + + def test_many_covariates(self): + """Multiple covariates including derived ones.""" + df = _make_covariate_panel() + # Create a unit-level covariate (must be time-invariant) + rng = np.random.default_rng(99) + units = df["unit"].unique() + x3_map = dict( + zip(units, df.groupby("unit")["x1"].first() * 0.5 + rng.normal(0, 0.1, len(units))) + ) + df["x3"] = df["unit"].map(x3_map) + result = EfficientDiD(pt_assumption="post").fit( + df, "y", "unit", "time", "first_treat", covariates=["x1", "x2", "x3"] + ) + assert np.isfinite(result.overall_att) + + def test_sieve_ratio_produces_valid_results(self): + """Sieve ratio estimation produces finite ATT with valid ratios.""" + df = _make_covariate_panel(n_units=300, seed=88) + result = EfficientDiD(pt_assumption="post", sieve_k_max=3, sieve_criterion="bic").fit( + df, "y", "unit", "time", "first_treat", covariates=["x1"] + ) + assert np.isfinite(result.overall_att) + assert result.overall_se > 0 + + def test_shuffled_units_match_ordered(self): + """Shuffled unit ordering must produce same ATT as original ordering. + + Regression test for P0 label-alignment bug in estimate_propensity_ratio: + D labels must follow the row order of combined_mask, not assume + g-units come before g'-units. + """ + df_ordered = _make_covariate_panel(n_units=300, seed=55) + # Shuffle: randomize unit IDs so cohorts are interleaved + rng = np.random.default_rng(55) + df_shuffled = df_ordered.copy() + units = df_shuffled["unit"].unique() + perm = rng.permutation(len(units)) + unit_map = dict(zip(units, perm)) + df_shuffled["unit"] = df_shuffled["unit"].map(unit_map) + df_shuffled = df_shuffled.sort_values(["unit", "time"]).reset_index(drop=True) + + edid = EfficientDiD(pt_assumption="post") + r_ordered = edid.fit(df_ordered, "y", "unit", "time", "first_treat", covariates=["x1"]) + r_shuffled = EfficientDiD(pt_assumption="post").fit( + df_shuffled, "y", "unit", "time", "first_treat", covariates=["x1"] + ) + assert abs(r_ordered.overall_att - r_shuffled.overall_att) < 1e-10, ( + f"ATT mismatch: ordered={r_ordered.overall_att:.6f} " + f"vs shuffled={r_shuffled.overall_att:.6f}" + ) + + def test_extreme_covariates_warns_overlap(self): + """Extreme covariates should trigger overlap warning and still produce valid results.""" + df = _make_covariate_panel(n_units=300, seed=77) + rng = np.random.default_rng(77) + units = df["unit"].unique() + n_units = len(units) + ft_map = df.groupby("unit")["first_treat"].first() + sep_vals = np.where( + ft_map.values < np.inf, + 5.0 + rng.normal(0, 0.01, n_units), + -5.0 + rng.normal(0, 0.01, n_units), + ) + sep_map = dict(zip(units, sep_vals)) + df["x_sep"] = df["unit"].map(sep_map) + with pytest.warns(UserWarning, match="overlap|clipped|propensity"): + result = EfficientDiD(pt_assumption="post").fit( + df, "y", "unit", "time", "first_treat", covariates=["x_sep"] + ) + assert np.isfinite(result.overall_att) + assert result.overall_se > 0 + + def test_eif_mean_approximately_zero(self): + """EIF with per-unit weights should have sample mean ≈ 0.""" + from diff_diff.efficient_did_covariates import compute_eif_cov + + rng = np.random.default_rng(42) + n, H = 200, 3 + gen_out = rng.normal(0, 1, (n, H)) + # Non-constant per-unit weights (each row sums to 1) + raw_w = rng.exponential(1, (n, H)) + per_unit_w = raw_w / raw_w.sum(axis=1, keepdims=True) + att = float(np.mean(np.sum(per_unit_w * gen_out, axis=1))) + eif = compute_eif_cov(per_unit_w, gen_out, att, n) + assert abs(np.mean(eif)) < 1e-10, f"EIF mean should be ≈ 0, got {np.mean(eif):.2e}" + + +class TestCovariatesBootstrap: + """Tier 2: bootstrap with covariates.""" + + def test_bootstrap_with_covariates_smoke(self): + """Bootstrap with covariates produces valid inference.""" + df = _make_covariate_panel(n_units=300) + result = EfficientDiD(pt_assumption="post", n_bootstrap=99, seed=42).fit( + df, "y", "unit", "time", "first_treat", covariates=["x1"] + ) + assert result.bootstrap_results is not None + assert np.isfinite(result.overall_att) + assert result.overall_se > 0 + ci = result.overall_conf_int + assert ci[0] < ci[1], "CI lower must be less than upper" + assert np.isfinite(result.overall_p_value) + + def test_covariates_pt_all_bootstrap(self): + """PT-All + bootstrap + covariates end-to-end.""" + df = _make_covariate_panel(n_units=300) + result = EfficientDiD(pt_assumption="all", n_bootstrap=99, seed=42).fit( + df, + "y", + "unit", + "time", + "first_treat", + covariates=["x1"], + aggregate="all", + ) + assert result.bootstrap_results is not None + assert result.event_study_effects is not None + assert result.group_effects is not None + assert np.isfinite(result.overall_att) + assert result.overall_se > 0 + + +class TestSieveFallbacks: + """Tier 2: sieve estimation failure fallbacks.""" + + def test_ratio_sieve_fallback_tiny_group_warns(self): + """When comparison group is too small for any basis, fall back with warning.""" + from diff_diff.efficient_did_covariates import estimate_propensity_ratio_sieve + + rng = np.random.default_rng(42) + n = 100 + X = rng.normal(0, 1, (n, 3)) # 3 covariates + mask_g = np.zeros(n, dtype=bool) + mask_g[:50] = True + # Tiny comparison group: only 2 units (fewer than any basis dimension) + mask_gp = np.zeros(n, dtype=bool) + mask_gp[50:52] = True + with pytest.warns(UserWarning, match="Propensity ratio sieve estimation failed"): + ratio = estimate_propensity_ratio_sieve(X, mask_g, mask_gp, k_max=3) + assert np.all(np.isfinite(ratio)) + # Fallback: constant ratio of 1 (clipped to [1/ratio_clip, ratio_clip]) + assert np.allclose(ratio, 1.0) + + def test_inverse_propensity_sieve_fallback_warns(self): + """When group is too small for sieve, fall back with warning.""" + from diff_diff.efficient_did_covariates import estimate_inverse_propensity_sieve + + rng = np.random.default_rng(42) + n = 100 + X = rng.normal(0, 1, (n, 5)) # 5 covariates + # Tiny group: only 2 units + mask = np.zeros(n, dtype=bool) + mask[:2] = True + with pytest.warns(UserWarning, match="Inverse propensity sieve estimation failed"): + s_hat = estimate_inverse_propensity_sieve(X, mask, k_max=3) + assert np.all(np.isfinite(s_hat)) + # Should fall back to unconditional n/n_group = 100/2 = 50 + assert np.allclose(s_hat, 50.0)