data = pd.read_csv(repo_root() / "ding_w_source" / "repl" / "nhanes_bmi.csv")
y = data["BMI"].to_numpy(dtype=float)
d = data["School_meal"].to_numpy(dtype=np.int32)
feature_names = [
"age",
"ChildSex",
"black",
"mexam",
"pir200_plus",
"WIC",
"Food_Stamp",
"fsdchbi",
"AnyIns",
"RefSex",
"RefAge",
]
x = data[feature_names].to_numpy(dtype=float)
x = (x - x.mean(axis=0)) / np.where(x.std(axis=0) > 1e-12, x.std(axis=0), 1.0)
naive = cm.OLS()
naive.fit(d.astype(float)[:, None], y)
adjusted = cm.OLS()
adjusted.fit(np.column_stack([d.astype(float), x]), y)
lin_design = np.column_stack([d.astype(float), x, d[:, None].astype(float) * x])
lin = cm.OLS()
lin.fit(lin_design, y)
ps_model = cm.Logit(alpha=1.0, max_iterations=300)
ps_model.fit(x, d)
ps_summary = ps_model.summary()
pscore = expit(ps_summary["intercept"] + x @ np.asarray(ps_summary["coef"]))
quantiles = np.quantile(pscore, [0.2, 0.4, 0.6, 0.8])
quintile = np.digitize(pscore, quantiles, right=True)
def stratified_ate(y, d, strata):
total = 0.0
for level in np.unique(strata):
idx = strata == level
if np.any(d[idx] == 1) and np.any(d[idx] == 0):
total += idx.mean() * (y[idx & (d == 1)].mean() - y[idx & (d == 0)].mean())
return float(total)
stratified = stratified_ate(y, d, quintile)
def stratified_neyman(y, d, strata):
estimate = 0.0
variance = 0.0
for level in np.unique(strata):
idx = strata == level
treated_k = idx & (d == 1)
control_k = idx & (d == 0)
if not (treated_k.any() and control_k.any()):
continue
weight = idx.mean()
tau_k = y[treated_k].mean() - y[control_k].mean()
var_k = y[treated_k].var(ddof=1) / treated_k.sum() + y[control_k].var(ddof=1) / control_k.sum()
estimate += weight * tau_k
variance += weight**2 * var_k
return estimate, np.sqrt(variance)
def strata_from_score(score, n_strata):
cutpoints = np.quantile(score, np.arange(1, n_strata) / n_strata)
return np.digitize(score, cutpoints, right=True)
stratification_rows = []
for n_strata in [5, 10, 20, 50, 80]:
est, se = stratified_neyman(y, d, strata_from_score(pscore, n_strata))
stratification_rows.append({"strata": n_strata, "estimate": est, "se": se})
def ipw_estimates(y, d, pscore, lower=0.0, upper=1.0):
p = np.clip(pscore, lower, upper)
ht = float(np.mean(d * y / p - (1 - d) * y / (1 - p)))
hajek = float(
np.mean(d * y / p) / np.mean(d / p)
- np.mean((1 - d) * y / (1 - p)) / np.mean((1 - d) / (1 - p))
)
return ht, hajek
ipw_rows = []
for label, bounds in {
"none": (1e-6, 1.0 - 1e-6),
"0.01-0.99": (0.01, 0.99),
"0.05-0.95": (0.05, 0.95),
"0.10-0.90": (0.10, 0.90),
}.items():
ht, hajek = ipw_estimates(y, d, pscore, *bounds)
ipw_rows.append({"truncation": label, "HT": ht, "Hajek": hajek})
treated = d == 1
control = ~treated
bal = cm.BalancingWeights(objective="entropy", autoscale=True, max_iterations=300, tolerance=1e-8, l2_norm=0.02)
bal.fit(x[control], x[treated])
weights = np.asarray(bal.summary()["weights"])
att_bal = float(y[treated].mean() - np.dot(weights, y[control]))
pd.DataFrame(
{
"estimate": [
float(naive.summary()["coef"][0]),
float(adjusted.summary()["coef"][0]),
float(lin.summary()["coef"][0]),
stratified,
att_bal,
]
},
index=[
"Naive OLS",
"Fisher OLS",
"Lin OLS",
"Propensity-stratified ATE",
"Entropy-balancing ATT",
],
)