Skip to content

Commit 547d518

Browse files
igerberclaude
andcommitted
Add research-grade DGP parameters to generate_survey_did_data()
Add 6 new optional parameters (icc, weight_cv, informative_sampling, heterogeneous_te_by_strata, strata_sizes, return_true_population_att) that enable coverage studies and tutorials demonstrating when survey- design methods outperform naive estimators. All default to current behavior for backward compatibility. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 53b0e6f commit 547d518

2 files changed

Lines changed: 389 additions & 14 deletions

File tree

diff_diff/prep_dgp.py

Lines changed: 215 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1129,6 +1129,36 @@ def generate_staggered_ddd_data(
11291129
return pd.DataFrame(records)
11301130

11311131

1132+
def _rank_pair_weights(
1133+
unit_weight: np.ndarray,
1134+
unit_stratum: np.ndarray,
1135+
y0: np.ndarray,
1136+
n_strata: int,
1137+
) -> None:
1138+
"""Rank-pair weights with Y(0) within each stratum (in-place).
1139+
1140+
High-outcome units receive lower weights, modeling informative sampling
1141+
where hard-to-reach (high-outcome) subpopulations are under-covered.
1142+
"""
1143+
for s in range(n_strata):
1144+
mask = unit_stratum == s
1145+
n_s = mask.sum()
1146+
if n_s <= 1:
1147+
continue
1148+
idx_s = np.where(mask)[0]
1149+
w_vals = unit_weight[idx_s].copy()
1150+
if w_vals.std() < 1e-10:
1151+
# No weight variation: create inverse-rank weights, mean=1
1152+
ranks = np.argsort(np.argsort(y0[idx_s]))
1153+
inv_rank = (n_s - ranks).astype(float)
1154+
unit_weight[idx_s] = inv_rank / inv_rank.mean()
1155+
else:
1156+
# Rank-pair: highest Y(0) gets lightest weight
1157+
y0_order = np.argsort(-y0[idx_s])
1158+
w_sorted = np.sort(w_vals)
1159+
unit_weight[idx_s[y0_order]] = w_sorted
1160+
1161+
11321162
def generate_survey_did_data(
11331163
n_units: int = 200,
11341164
n_periods: int = 8,
@@ -1149,6 +1179,13 @@ def generate_survey_did_data(
11491179
add_covariates: bool = False,
11501180
panel: bool = True,
11511181
seed: Optional[int] = None,
1182+
# --- Research-grade DGP parameters ---
1183+
icc: Optional[float] = None,
1184+
weight_cv: Optional[float] = None,
1185+
informative_sampling: bool = False,
1186+
heterogeneous_te_by_strata: bool = False,
1187+
strata_sizes: Optional[List[int]] = None,
1188+
return_true_population_att: bool = False,
11521189
) -> pd.DataFrame:
11531190
"""
11541191
Generate synthetic staggered DiD data with survey structure.
@@ -1215,13 +1252,46 @@ def generate_survey_did_data(
12151252
CallawaySantAnna(panel=False)).
12161253
seed : int, optional
12171254
Random seed for reproducibility.
1255+
icc : float, optional
1256+
Target intra-class correlation coefficient (0 < icc < 1). Overrides
1257+
``psu_re_sd`` via the variance decomposition:
1258+
``psu_re_sd = sqrt(icc * (unit_fe_sd^2 + noise_sd^2) /
1259+
((1 - icc) * (1 + psu_period_factor^2)))``.
1260+
Cannot be combined with a non-default ``psu_re_sd``.
1261+
weight_cv : float, optional
1262+
Target coefficient of variation for sampling weights. Generates
1263+
LogNormal weights normalized to mean 1, bypassing ``weight_variation``.
1264+
Cannot be combined with a non-default ``weight_variation``.
1265+
informative_sampling : bool, default=False
1266+
If True, sampling weights correlate with Y(0) — high-outcome units
1267+
receive lower weights (under-coverage). Uses rank-pairing within
1268+
each stratum. For panel data, ranking is done once from period-1
1269+
outcomes. For repeated cross-sections, ranking is refreshed each
1270+
period. When ``weight_variation="none"`` and no ``weight_cv``,
1271+
creates inverse-rank weights normalized to mean 1.
1272+
heterogeneous_te_by_strata : bool, default=False
1273+
If True, treatment effect varies by stratum:
1274+
``TE_h = TE * (1 + 0.5 * (h - mean) / std)``. Creates a gap
1275+
between unweighted and population ATT. With ``n_strata=1``,
1276+
all units receive the base ``treatment_effect``.
1277+
strata_sizes : list of int, optional
1278+
Custom per-stratum unit counts. Must have length ``n_strata`` and
1279+
sum to ``n_units``. Replaces equal allocation across strata.
1280+
return_true_population_att : bool, default=False
1281+
If True, attaches a diagnostic dict to ``df.attrs["dgp_truth"]``
1282+
with keys: ``population_att`` (weight-weighted average of treated
1283+
true effects), ``deff_kish`` (1 + CV(w)^2), ``stratum_effects``
1284+
(dict mapping stratum index to TE), ``icc_realized`` (between/total
1285+
variance ratio from generated data).
12181286
12191287
Returns
12201288
-------
12211289
pd.DataFrame
12221290
Columns: unit, period, outcome, first_treat, treated, true_effect,
12231291
stratum, psu, fpc, weight. Also rep_0..rep_K if
12241292
include_replicate_weights=True, and x1, x2 if add_covariates=True.
1293+
If ``return_true_population_att=True``, ``df.attrs["dgp_truth"]``
1294+
contains DGP diagnostics.
12251295
"""
12261296
rng = np.random.default_rng(seed)
12271297

@@ -1284,30 +1354,82 @@ def generate_survey_did_data(
12841354
f"weight_variation must be one of {valid_wv}, got {weight_variation!r}"
12851355
)
12861356

1357+
# --- Validate research-grade DGP parameters ---
1358+
if icc is not None:
1359+
if not (0 < icc < 1):
1360+
raise ValueError(f"icc must be between 0 and 1 (exclusive), got {icc}")
1361+
if psu_re_sd != 2.0:
1362+
raise ValueError(
1363+
"Cannot specify both icc and a non-default psu_re_sd. "
1364+
"icc overrides psu_re_sd via the ICC formula."
1365+
)
1366+
1367+
if weight_cv is not None:
1368+
if weight_cv <= 0:
1369+
raise ValueError(f"weight_cv must be positive, got {weight_cv}")
1370+
if weight_variation != "moderate":
1371+
raise ValueError(
1372+
"Cannot specify both weight_cv and a non-default "
1373+
"weight_variation. weight_cv overrides weight_variation."
1374+
)
1375+
1376+
if strata_sizes is not None:
1377+
strata_sizes = list(strata_sizes)
1378+
if len(strata_sizes) != n_strata:
1379+
raise ValueError(
1380+
f"strata_sizes must have length n_strata={n_strata}, "
1381+
f"got {len(strata_sizes)}"
1382+
)
1383+
if any(s < 1 for s in strata_sizes):
1384+
raise ValueError("All strata_sizes must be >= 1")
1385+
if sum(strata_sizes) != n_units:
1386+
raise ValueError(
1387+
f"strata_sizes must sum to n_units={n_units}, "
1388+
f"got {sum(strata_sizes)}"
1389+
)
1390+
1391+
# --- ICC -> psu_re_sd resolution ---
1392+
if icc is not None:
1393+
psu_re_sd = np.sqrt(
1394+
icc * (unit_fe_sd**2 + noise_sd**2)
1395+
/ ((1 - icc) * (1 + psu_period_factor**2))
1396+
)
1397+
12871398
# --- Survey structure: assign units to strata and PSUs ---
12881399
n_psu_total = n_strata * psu_per_stratum
1289-
units_per_stratum = n_units // n_strata
1290-
remainder = n_units % n_strata
1400+
1401+
if strata_sizes is not None:
1402+
stratum_n = strata_sizes
1403+
else:
1404+
units_per_stratum = n_units // n_strata
1405+
remainder = n_units % n_strata
1406+
stratum_n = [
1407+
units_per_stratum + (1 if s < remainder else 0)
1408+
for s in range(n_strata)
1409+
]
12911410

12921411
unit_stratum = np.empty(n_units, dtype=int)
12931412
unit_psu = np.empty(n_units, dtype=int)
12941413
idx = 0
12951414
for s in range(n_strata):
1296-
# Distribute remainder units across first strata
1297-
n_s = units_per_stratum + (1 if s < remainder else 0)
1415+
n_s = stratum_n[s]
12981416
unit_stratum[idx : idx + n_s] = s
1299-
1300-
# Assign PSUs within this stratum
13011417
psu_start = s * psu_per_stratum
13021418
for j in range(n_s):
13031419
unit_psu[idx + j] = psu_start + (j % psu_per_stratum)
13041420
idx += n_s
13051421

1306-
# Sampling weights: vary by stratum (inverse selection probability)
1307-
scale_map = {"none": 0.0, "moderate": 1.0, "high": 3.0}
1308-
scale = scale_map.get(weight_variation, 1.0)
1309-
denom = max(n_strata - 1, 1)
1310-
unit_weight = 1.0 + scale * (unit_stratum / denom)
1422+
# Sampling weights
1423+
if weight_cv is not None:
1424+
sigma_ln = np.sqrt(np.log(1 + weight_cv**2))
1425+
raw_w = rng.lognormal(-sigma_ln**2 / 2, sigma_ln, size=n_units)
1426+
unit_weight = raw_w / raw_w.mean()
1427+
else:
1428+
# Stratum-based weights (inverse selection probability)
1429+
scale_map = {"none": 0.0, "moderate": 1.0, "high": 3.0}
1430+
scale = scale_map.get(weight_variation, 1.0)
1431+
denom = max(n_strata - 1, 1)
1432+
unit_weight = 1.0 + scale * (unit_stratum / denom)
13111433

13121434
# --- Treatment assignment (cohort structure) ---
13131435
n_never = int(n_units * never_treated_frac)
@@ -1344,18 +1466,58 @@ def generate_survey_did_data(
13441466
0, psu_re_sd * psu_period_factor, size=(n_psu_total, n_periods)
13451467
)
13461468

1469+
# --- Informative sampling (panel path): pre-draw FEs, rank-pair weights ---
1470+
if informative_sampling and panel:
1471+
_panel_unit_fe = rng.normal(0, unit_fe_sd, size=n_units)
1472+
y0_period1 = (
1473+
_panel_unit_fe
1474+
+ psu_re[unit_psu]
1475+
+ psu_period_re[unit_psu, 0]
1476+
+ 0.5
1477+
)
1478+
_rank_pair_weights(unit_weight, unit_stratum, y0_period1, n_strata)
1479+
1480+
# Save base weights for cross-section informative sampling (reset each period)
1481+
if informative_sampling and not panel:
1482+
_base_weight = unit_weight.copy()
1483+
1484+
# --- Heterogeneous treatment effects by stratum ---
1485+
if heterogeneous_te_by_strata:
1486+
if n_strata == 1:
1487+
te_by_stratum = np.array([treatment_effect])
1488+
else:
1489+
strata_idx = np.arange(n_strata, dtype=float)
1490+
te_by_stratum = treatment_effect * (
1491+
1 + 0.5 * (strata_idx - strata_idx.mean()) / strata_idx.std()
1492+
)
1493+
else:
1494+
te_by_stratum = None
1495+
13471496
# --- Generate panel or repeated cross-sections ---
13481497
records = []
13491498
for t in range(1, n_periods + 1):
13501499
# For repeated cross-sections, draw fresh respondent effects each period
13511500
unit_fe = rng.normal(0, unit_fe_sd, size=n_units)
13521501
if panel and t > 1:
13531502
pass # reuse unit_fe from first period (set below)
1354-
if panel and t == 1:
1503+
if informative_sampling and panel:
1504+
unit_fe = _panel_unit_fe # use pre-drawn FEs
1505+
elif panel and t == 1:
13551506
_panel_unit_fe = unit_fe # save for reuse
1356-
if panel and t > 1:
1507+
elif panel and t > 1:
13571508
unit_fe = _panel_unit_fe # type: ignore[possibly-undefined]
13581509

1510+
# Cross-section informative sampling: re-rank weights each period
1511+
if informative_sampling and not panel:
1512+
unit_weight = _base_weight.copy() # type: ignore[possibly-undefined]
1513+
y0_t = (
1514+
unit_fe
1515+
+ psu_re[unit_psu]
1516+
+ psu_period_re[unit_psu, t - 1]
1517+
+ 0.5 * t
1518+
)
1519+
_rank_pair_weights(unit_weight, unit_stratum, y0_t, n_strata)
1520+
13591521
x1 = rng.normal(0, 1, size=n_units) if add_covariates else None
13601522
if panel and t > 1 and add_covariates:
13611523
x1 = _panel_x1 # type: ignore[possibly-undefined]
@@ -1379,7 +1541,10 @@ def generate_survey_did_data(
13791541
treated = int(g_i > 0 and t >= g_i)
13801542
true_eff = 0.0
13811543
if treated:
1382-
true_eff = treatment_effect
1544+
if te_by_stratum is not None:
1545+
true_eff = float(te_by_stratum[unit_stratum[i]])
1546+
else:
1547+
true_eff = treatment_effect
13831548
if dynamic_effects:
13841549
true_eff *= 1 + effect_growth * (t - g_i)
13851550
y += true_eff
@@ -1426,4 +1591,40 @@ def generate_survey_did_data(
14261591
w_r[w_r > 0] *= n_rep / (n_rep - 1)
14271592
df[f"rep_{r}"] = w_r
14281593

1594+
# --- DGP truth diagnostics ---
1595+
if return_true_population_att:
1596+
treated_mask = df["treated"] == 1
1597+
if treated_mask.any():
1598+
w_treated = df.loc[treated_mask, "weight"].values
1599+
te_treated = df.loc[treated_mask, "true_effect"].values
1600+
population_att = float(np.average(te_treated, weights=w_treated))
1601+
else:
1602+
population_att = 0.0
1603+
1604+
if te_by_stratum is not None:
1605+
stratum_effects = {
1606+
int(s): float(te_by_stratum[s]) for s in range(n_strata)
1607+
}
1608+
else:
1609+
stratum_effects = {
1610+
int(s): float(treatment_effect) for s in range(n_strata)
1611+
}
1612+
1613+
# Kish DEFF from weight variation
1614+
w_all = df.groupby("unit")["weight"].first().values
1615+
cv_w = float(w_all.std() / w_all.mean()) if w_all.mean() > 0 else 0.0
1616+
deff_kish = 1 + cv_w**2
1617+
1618+
# Realized ICC (between-PSU / total variance ratio)
1619+
psu_means = df.groupby("psu")["outcome"].mean()
1620+
total_var = df["outcome"].var()
1621+
icc_realized = float(psu_means.var() / total_var) if total_var > 0 else 0.0
1622+
1623+
df.attrs["dgp_truth"] = {
1624+
"population_att": population_att,
1625+
"deff_kish": float(deff_kish),
1626+
"stratum_effects": stratum_effects,
1627+
"icc_realized": icc_realized,
1628+
}
1629+
14291630
return df

0 commit comments

Comments
 (0)