@dataclass
class Estimate:
name: str
att: float
effect_path: np.ndarray
counterfactual: np.ndarray | None = None
pre_rmse: float | None = None
def split_panel(panel, treated_units):
treated_units = list(treated_units)
controls = [i for i in range(panel.shape[0]) if i not in set(treated_units)]
treated_mean = panel[treated_units].mean(axis=0)
control_mean = panel[controls].mean(axis=0)
return treated_mean, control_mean, controls
def treatment_matrix(panel, treated_units, t_pre):
w = np.zeros_like(panel, dtype=float)
w[np.ix_(list(treated_units), np.arange(t_pre, panel.shape[1]))] = 1.0
return w
def panel_counterfactual_mean(summary, treated_units):
return np.asarray(summary["counterfactual"])[list(treated_units)].mean(axis=0)
def panel_effect_path(summary, treated_units, t_pre):
return np.asarray(summary["treatment_effect"])[list(treated_units), t_pre:].mean(axis=0)
def ridge_solve(x, y, alpha=1.0, penalize_intercept=False):
x = np.asarray(x, dtype=float)
y = np.asarray(y, dtype=float)
penalty = alpha * np.eye(x.shape[1])
if not penalize_intercept:
penalty[0, 0] = 0.0
return np.linalg.solve(x.T @ x + penalty, x.T @ y)
def estimate_did(panel, treated_units, t_pre):
treated_mean, control_mean, _ = split_panel(panel, treated_units)
pre_gap = treated_mean[:t_pre].mean() - control_mean[:t_pre].mean()
effect = treated_mean[t_pre:] - control_mean[t_pre:] - pre_gap
return Estimate("DID", float(effect.mean()), effect, control_mean + pre_gap)
def estimate_twfe(panel, treated_units, t_pre):
n, tt = panel.shape
unit = np.repeat(np.arange(n, dtype=np.uint32), tt)
time = np.tile(np.arange(tt, dtype=np.uint32), n)
treated = np.zeros(n)
treated[treated_units] = 1.0
post = (np.arange(tt) >= t_pre).astype(float)
d = np.repeat(treated, tt) * np.tile(post, n)
fe = np.column_stack([unit, time])
model = cm.FixedEffectsOLS()
model.fit(d[:, None], fe, panel.reshape(-1))
att = float(np.asarray(model.summary()["coef"])[0])
# For a single adoption date in a balanced panel, TWFE and DID have the same
# coefficient. Use the DID post-period effect path for the time-series
# inference diagnostic rather than a degenerate constant coefficient path.
effect = estimate_did(panel, treated_units, t_pre).effect_path
return Estimate("TWFE", att, effect)
def estimate_horizontal_ridge(panel, treated_units, t_pre, alpha=0.25):
model = cm.HorizontalPanelRidge(penalty=alpha)
model.fit(panel, treatment_matrix(panel, treated_units, t_pre))
summary = model.summary()
return Estimate(
"Horizontal ridge",
float(summary["att"]),
panel_effect_path(summary, treated_units, t_pre),
panel_counterfactual_mean(summary, treated_units),
float(summary["pre_rmse"]),
)
def estimate_vertical_ridge(panel, treated_units, t_pre, alpha=1.0):
treated_mean, _, controls = split_panel(panel, treated_units)
x_controls = np.column_stack([np.ones(len(controls)), panel[controls, :t_pre]])
x_treated = np.r_[1.0, treated_mean[:t_pre]]
counter_post = []
for t in range(t_pre, panel.shape[1]):
beta = ridge_solve(x_controls, panel[controls, t], alpha=alpha)
counter_post.append(float(x_treated @ beta))
counter_post = np.asarray(counter_post)
effect = treated_mean[t_pre:] - counter_post
counterfactual = np.r_[treated_mean[:t_pre], counter_post]
return Estimate("Vertical ridge", float(effect.mean()), effect, counterfactual)
def estimate_sc(panel, treated_units, t_pre):
treated_mean, _, controls = split_panel(panel, treated_units)
donors_pre = panel[controls, :t_pre].T
donors_all = panel[controls].T
model = cm.SyntheticControl(max_iterations=800)
model.fit(donors_pre, treated_mean[:t_pre])
y0_hat = np.asarray(model.predict(donors_all))
effect = treated_mean[t_pre:] - y0_hat[t_pre:]
return Estimate("Synthetic control", float(effect.mean()), effect, y0_hat, float(model.summary()["pre_rmse"]))
def estimate_sdid(panel, treated_units, t_pre):
model = cm.SyntheticDID(zeta_omega=1e-8, zeta_lambda=1e-8, max_iterations=1200)
model.fit(panel, treatment_matrix(panel, treated_units, t_pre))
summary = model.summary()
y0_hat = panel_counterfactual_mean(summary, treated_units)
effect = panel_effect_path(summary, treated_units, t_pre)
return Estimate("Synthetic DID", float(summary["att"]), effect, y0_hat, float(summary["pre_rmse"]))
def estimate_mc(panel, treated_units, t_pre):
treated_mean, _, _ = split_panel(panel, treated_units)
model = cm.MatrixCompletion(lambda_fraction=0.035, max_iterations=350, tolerance=1e-7)
model.fit(panel, treatment_matrix(panel, treated_units, t_pre))
summary = model.summary()
y0_hat = panel_counterfactual_mean(summary, treated_units)
effect = panel_effect_path(summary, treated_units, t_pre)
pre_rmse = float(np.sqrt(np.mean((treated_mean[:t_pre] - y0_hat[:t_pre]) ** 2)))
return Estimate("Matrix completion", float(summary["att"]), effect, y0_hat, pre_rmse)
ESTIMATORS = [
("DID", estimate_did),
("TWFE", estimate_twfe),
("Horizontal ridge", estimate_horizontal_ridge),
("Vertical ridge", estimate_vertical_ridge),
("Synthetic control", estimate_sc),
("Synthetic DID", estimate_sdid),
("Matrix completion", estimate_mc),
]