import numpy as np
import scipy as sp
import pandas as pd
import warnings
from typing import Optional, Protocol, Callable, List, Dict, Type, Union, Any
from abc import ABC, abstractmethod
from functools import partial
from ..base import prediction_model, PredictionModel, prediction_likelihood_model, variance_model
from ..utils import *
[docs]
class linear_model(prediction_model, prediction_likelihood_model):
"""Ordinary least squares linear regression.
Inherits from:
- prediction_model: supervised regression base.
- prediction_likelihood_model: provides log-likelihood evaluation.
Attributes:
params (np.ndarray): Estimated regression coefficients.
"""
def __init__(self, x, y, *args, **kwargs):
"""Fit a linear regression model.
Args:
x (np.ndarray): Design matrix of shape (n_obs, n_features).
y (np.ndarray): Response vector.
*args: Additional arguments.
**kwargs: Additional keyword arguments.
"""
super(linear_model, self).__init__(x, y)
self.params = self._fit(x, y, *args, **kwargs)
def _fit(self, x, y, *args, **kwargs):
"""Estimate regression coefficients via normal equations."""
return np.linalg.solve(x.T @ x, x.T @ y)
[docs]
def predict(self, newdata: np.ndarray) -> np.ndarray:
"""Predict responses for new data.
Args:
newdata (np.ndarray): Design matrix for prediction.
Returns:
np.ndarray: Predicted values.
"""
return newdata @ self.params
[docs]
def evaluate_lnL(self, pred: np.ndarray) -> float:
"""Evaluate log-likelihood of predictions under Gaussian errors.
Args:
pred (np.ndarray): Predicted values.
Returns:
float: Log-likelihood value.
"""
return -self.n_obs / 2 * (np.log(2 * np.pi * (self.y - pred).var()) + 1)
[docs]
class logistic_regressor(linear_model, variance_model):
"""Logistic regression with Newton–Raphson estimation.
Provides likelihood-based fit, variance-covariance matrix,
and pseudo-R² measures (McFadden, Ben-Akiva–Lerman).
"""
def __init__(self, *args, **kwargs):
"""Fit a logistic regression model using Newton–Raphson."""
super().__init__(*args, **kwargs)
self.glance = pd.DataFrame(
{
'mcfaddens.r.squared': self.mcfaddens_r_squared,
'ben.akiva.lerman.r.squared': self.ben_akiva_lerman_r_squared,
'self.df': self.n_feat,
'resid.df': self.degrees_of_freedom,
'aic': self.aic,
'bic': self.bic,
'log.likelihood': self.log_likelihood,
'deviance': self.deviance,
},
index=['']
)
def _fit(self, x, y):
"""Fit coefficients by Newton–Raphson optimization."""
params, self.iters = newton(self.gradient, self.hessian, np.zeros(self.n_feat))
return params
@property
def vcov_params(self) -> np.ndarray:
"""Variance-covariance matrix of parameters."""
H = self.hessian(self.params)
try:
return -np.linalg.inv(H)
except np.linalg.LinAlgError:
return -np.linalg.pinv(H)
[docs]
def evaluate_lnL(self, pred: np.ndarray) -> float:
"""Log-likelihood for Bernoulli outcomes.
Args:
pred (np.ndarray): Predicted probabilities.
Returns:
float: Log-likelihood value.
"""
eps = 1e-15
pred = np.clip(pred, eps, 1 - eps)
return self.y.T @ np.log(pred) + (1 - self.y).T @ np.log(1 - pred)
[docs]
def gradient(self, coefs: np.ndarray) -> np.ndarray:
"""Gradient of the log-likelihood."""
return self.x.T @ (self.y - expit(self.x @ coefs))
[docs]
def hessian(self, coefs: np.ndarray) -> np.ndarray:
"""Hessian matrix of the log-likelihood."""
x = self.x.values if isinstance(self.x, (pd.DataFrame, pd.Series)) else self.x
Fx = expit(x @ coefs)
inside = np.diagflat(Fx * (1 - Fx))
return -x.T @ inside @ x
[docs]
def predict(self, target: np.ndarray) -> np.ndarray:
"""Predict probabilities for new data."""
return expit(target @ self.params)
@property
def mcfaddens_r_squared(self) -> float:
"""McFadden's pseudo-R²."""
return 1 - self.log_likelihood / self.null_likelihood
@property
def ben_akiva_lerman_r_squared(self) -> float:
"""Ben-Akiva–Lerman pseudo-R²."""
return (self.y.T @ self.fitted + (1 - self.y).T @ (1 - self.fitted)) / self.n_obs
[docs]
def marginal_effects(
self,
newx: Optional[Union[np.ndarray, pd.DataFrame]] = None,
average: bool = True,
) -> np.ndarray:
"""Compute marginal effects of predictors.
Args:
newx (np.ndarray | pd.DataFrame, optional): New design matrix.
If None, use training data.
average (bool): If True, return average marginal effects.
If False, return case-specific effects.
Returns:
np.ndarray: Marginal effects.
"""
X = self.x if newx is None else newx
if isinstance(X, (pd.DataFrame, pd.Series)):
X = X.values
if X.ndim == 2 and X.shape[1] != self.n_feat:
if self.x.ndim == 2 and np.allclose(self.x[:, 0], 1) and X.shape[1] == self.n_feat - 1:
X = np.hstack([np.ones((X.shape[0], 1)), X])
else:
raise ValueError(f"newx has shape {X.shape}, but model expects {self.n_feat} features.")
xb = X @ self.params
Fx = expit(xb)
slope = Fx * (1.0 - Fx)
effects = slope.reshape(-1, 1) * self.params.reshape(1, -1)
return effects.mean(0) if average else effects
[docs]
class least_squares_regressor(linear_model, variance_model):
"""Ordinary least squares regression with optional robust SEs."""
def __init__(self, x, y, white: bool = False, hc: int = 3, *args, **kwargs):
"""Initialize least squares regressor.
Args:
x (np.ndarray): Design matrix.
y (np.ndarray): Response vector.
white (bool): If True, use heteroskedasticity-consistent SEs.
hc (int): HC variant (1–5).
"""
super(least_squares_regressor, self).__init__(x, y, *args, **kwargs)
self.white = white
self.hc = hc
self.glance = pd.DataFrame(
{
'r.squared': self.r_squared,
'adjusted.r.squared': self.adjusted_r_squared,
'self.df': self.n_feat,
'resid.df': self.degrees_of_freedom,
'aic': self.aic,
'bic': self.bic,
'log.likelihood': self.log_likelihood,
'deviance': self.deviance,
'resid.var': self.residual_variance,
}, index=['']
)
@property
def vcov_params(self):
"""Variance-covariance matrix of parameters."""
if self.white:
return self.__white(self.hc)
return np.linalg.inv(self.x.T @ self.x) * self.residual_variance
def __white(self, hc: int) -> np.ndarray:
"""White’s heteroskedasticity-consistent covariance estimator."""
e = self.residuals.values if isinstance(self.residuals, pd.Series) else self.residuals
esq = self.__hc_correction(e ** 2, hc)
meat = np.diagflat(esq)
bread = np.linalg.inv(self.x.T @ self.x) @ self.x.T
return bread @ meat @ bread.T
def __hc_correction(self, esq, hc):
"""Apply HC1–HC5 finite-sample corrections."""
mx = 1 - np.diagonal(self.x @ np.linalg.solve(self.x.T @ self.x, self.x.T))
match hc:
case 1:
esq *= self.n_obs / (self.n_obs - self.n_feat)
case 2:
esq /= mx
case 3:
esq /= mx ** 2
case 4:
p = int(np.round((1 - mx).sum()))
delta = 4 * np.ones((self.n_obs, 1))
delta = hstack(delta, self.n_obs * (1 - mx) / p)
delta = delta.min(1)
esq /= np.power(mx, delta)
case 5:
p = int(np.round((1 - mx).sum()))
delta = max(4, self.n_obs * 0.7 * (1 - mx).max() / p) * np.ones((self.n_obs, 1))
delta = hstack(delta.reshape(-1, 1), self.n_obs * (1 - mx.reshape(-1, 1)) / p)
delta = delta.min(1) / 2
esq /= np.power(mx, delta)
return esq
[docs]
class poisson_regressor(linear_model):
"""Poisson regression for count data."""
def __init__(self, x, y, *args, **kwargs):
"""Fit a Poisson regression model."""
super().__init__(x, y, *args, **kwargs)
self.glance = pd.DataFrame(self._glance_dict, index=[''])
def _fit(self, x, y):
"""Fit coefficients by Newton–Raphson optimization."""
params, self.iters = newton(self.gradient, self.hessian, np.zeros(self.n_feat))
return params
@property
def vcov_params(self) -> np.ndarray:
"""Variance-covariance matrix of parameters."""
return -np.linalg.inv(self.hessian(self.params))
[docs]
def evaluate_lnL(self, pred: np.ndarray) -> float:
"""Log-likelihood for Poisson-distributed outcomes."""
return (
self.y.T @ np.log(pred)
- np.ones((1, self.n_obs)) @ pred
+ np.ones((1, self.n_obs)) @ np.log(sp.special.factorial(self.y))
)
[docs]
def gradient(self, coefs: np.ndarray) -> np.ndarray:
"""Gradient of the log-likelihood."""
return self.x.T @ (self.y - np.exp(self.x @ coefs))
[docs]
def hessian(self, coefs: np.ndarray) -> np.ndarray:
"""Hessian matrix of the log-likelihood."""
Fx = np.exp(self.x @ coefs)
if isinstance(Fx, (pd.DataFrame, pd.Series)):
Fx = Fx.values
return -self.x.T @ np.diagflat(Fx) @ self.x
[docs]
def predict(self, target: np.ndarray) -> np.ndarray:
"""Predict expected counts for new data."""
return np.exp(target @ self.params)
@property
def _glance_dict(self) -> dict:
"""Model summary statistics for glance output."""
return {
'self.df': self.n_feat,
'resid.df': self.degrees_of_freedom,
'aic': self.aic,
'bic': self.bic,
'log.likelihood': self.log_likelihood,
'deviance': self.deviance
}
[docs]
def backward_stepwise(model: Any,
criterion: str = "aic",
keep_vars: list[str] = None,
min_features: int = 1,
verbose: bool = False) -> Dict[str, Any]:
"""Perform backward stepwise feature selection.
Iteratively removes features to optimize a model fit according to an
information criterion (e.g., AIC, BIC, MSE).
Args:
model: Model object. Either:
- Raw supervised_model with `.x` and `.y`.
- SupervisedModel wrapper with `.x_vars`, `.y_var`, `.data`, `.model_type`.
criterion: Model selection criterion ("aic", "bic", "mse", etc.).
keep_vars: Variable names that must not be dropped.
min_features: Minimum number of features to retain.
verbose: If True, print progress messages.
Returns:
Dict[str, Any]: A dictionary with:
- "model": The best fitted model.
- "selected_features": List of selected feature names.
- "history": pd.DataFrame with stepwise history.
"""
keep_vars = set(keep_vars or [])
# unwrap
if hasattr(model, "x_vars"): # SupervisedModel
x_vars, y_var, data = model.x_vars.copy(), model.y_var, model.data
model_type = model.model_type
def fit(subset):
X = data[subset].values
y = data[y_var].values
return model_type(X, y)
feature_names = x_vars
else: # supervised_model
x, y = model.x, model.y
feature_names = [f"x{i}" for i in range(x.shape[1])]
def fit(subset_idx):
return type(model)(x[:, subset_idx], y)
# scoring
def score(m):
val = getattr(m, criterion)
lower_is_better = criterion.lower() in ["aic","bic","mse","misclassification_probability"]
return val, lower_is_better
# initialize
current = list(range(len(feature_names)))
fitted = model
best_score, lower = score(fitted)
history = [{"step":0,"removed":None,"score":best_score,"features":feature_names.copy()}]
step = 0
while len(current) > max(min_features, len(keep_vars)):
trial = []
for j in current:
if feature_names[j] in keep_vars:
continue
cand = [idx for idx in current if idx != j]
try:
cand_model = fit([feature_names[i] for i in cand] if hasattr(model,"x_vars") else cand)
sc, _ = score(cand_model)
trial.append((j, sc, cand_model))
except Exception as e:
if verbose: print("skip", feature_names[j], e)
continue
if not trial: break
j_best, sc_best, best_model = (min(trial, key=lambda t: t[1]) if lower else max(trial, key=lambda t: t[1]))
improved = (sc_best < best_score if lower else sc_best > best_score)
if not improved: break
step += 1
if verbose:
print(f"removed {feature_names[j_best]} -> {criterion}={sc_best}")
current.remove(j_best)
best_score, fitted = sc_best, best_model
history.append({"step":step,"removed":feature_names[j_best],"score":best_score,
"features":[feature_names[i] for i in current]})
return {"model":fitted,
"selected_features":[feature_names[i] for i in current],
"history":pd.DataFrame(history)}
[docs]
def forward_stepwise(
model: Any,
criterion: str = "aic",
keep_vars: Optional[List[str]] = None,
max_features: Optional[int] = None,
prefer_intercept: bool = True,
verbose: bool = False
) -> Dict[str, Any]:
"""Perform forward stepwise feature selection.
Iteratively adds features to optimize a model according to a selection
criterion (e.g., AIC, BIC, MSE). Supports both raw models and SupervisedModel
wrappers, with optional intercept preference.
Args:
model: Model object. Either:
- Raw supervised_model with `.x` and `.y`.
- SupervisedModel wrapper with `.x_vars`, `.y_var`, `.data`, `.model_type`.
criterion: Model selection criterion ("aic", "bic", "mse", etc.).
keep_vars: Variables that must always be included.
max_features: Maximum number of features allowed to be selected.
prefer_intercept: If True, attempt to start with an intercept term (if detected).
verbose: If True, print progress messages.
Returns:
Dict[str, Any]: A dictionary with:
- "model": The best fitted model.
- "selected_features": List of selected feature names.
- "history": pd.DataFrame with stepwise history.
"""
keep_vars = set(keep_vars or [])
# -------------------------
# Unwrap the provided model
# -------------------------
if hasattr(model, "x_vars"): # SupervisedModel wrapper
x_vars: List[str] = model.x_vars.copy()
y_var: str = model.y_var
data: pd.DataFrame = model.data
model_type = model.model_type
feature_names = x_vars
def fit(subset_vars: List[str]):
X = data[subset_vars].values
y = data[y_var].values
return model_type(X, y)
# Find intercept by name or const column
intercept_name = None
if "(intercept)" in feature_names:
intercept_name = "(intercept)"
elif prefer_intercept:
for name in feature_names:
col = data[name].values
if _is_intercept_col(col, name):
intercept_name = name
break
all_pool = feature_names.copy()
start_set_vars: List[str] = []
if prefer_intercept and intercept_name:
start_set_vars = [intercept_name]
keep_vars.add(intercept_name)
current_vars = sorted(set(start_set_vars) | keep_vars, key=lambda v: feature_names.index(v) if v in feature_names else 10**9)
# initial fit on the full model we were given (for baseline score)
full_model_score, lower_is_better = _score_generic(model.model if hasattr(model, "model") else model, criterion)
else: # raw supervised_model
X = model.x
y = model.y
p = X.shape[1]
feature_names = [f"x{i}" for i in range(p)]
def fit(subset_idx: List[int]):
if len(subset_idx) == 0:
# Can't fit a 0-column model; fallback to best single variable start
raise ValueError("Empty model: no columns to fit.")
return type(model)(X[:, subset_idx], y)
# Detect intercept column (constant)
intercept_idx = None
if prefer_intercept:
for j in range(p):
if _is_intercept_col(X[:, j], feature_names[j]):
intercept_idx = j
break
all_pool_idx = list(range(p))
current_idx: List[int] = []
if prefer_intercept and intercept_idx is not None:
current_idx = [intercept_idx]
# baseline score from supplied model
full_model_score, lower_is_better = _score_generic(model, criterion)
# ----------------------------------------
# Helper to get candidates and do 1-step add
# ----------------------------------------
history = []
step = 0
def current_feature_list():
if hasattr(model, "x_vars"):
return current_vars.copy()
return [feature_names[i] for i in current_idx]
# Initialize with starting (possibly intercept-only or keep_vars)
try:
if hasattr(model, "x_vars"):
init = fit(current_vars if current_vars else [])
else:
init = fit(current_idx if current_idx else [])
best_score, _ = _score_generic(init, criterion)
best_model = init
except Exception:
# If intercept-only (or empty) fails, fall back to greedy best single variable
trial = []
if hasattr(model, "x_vars"):
for v in [v for v in all_pool if v not in keep_vars]:
try:
m = fit([v] + sorted(list(keep_vars)))
s, _ = _score_generic(m, criterion)
trial.append((v, s, m))
except Exception:
continue
if not trial:
raise
v_best, s_best, m_best = (min if lower_is_better else max)(trial, key=lambda t: t[1])
current_vars = sorted(set([v_best]) | keep_vars, key=lambda v: feature_names.index(v) if v in feature_names else 10**9)
best_score, best_model = s_best, m_best
else:
pool = [j for j in all_pool_idx if j not in (current_idx)]
trial = []
for j in pool:
try:
m = fit(sorted(list(set([j]) | set(current_idx))))
s, _ = _score_generic(m, criterion)
trial.append((j, s, m))
except Exception:
continue
if not trial:
raise
j_best, s_best, m_best = (min if lower_is_better else max)(trial, key=lambda t: t[1])
current_idx = sorted(set([j_best]) | set(current_idx))
best_score, best_model = s_best, m_best
history.append({"step": step, "added": None, "score": best_score, "features": current_feature_list()})
# Limit size
target_max = max_features if max_features is not None else (len(feature_names))
# ------------------------------------------------
# Greedy forward: add the single best new variable
# ------------------------------------------------
while True:
if hasattr(model, "x_vars"):
pool = [v for v in all_pool if v not in current_vars]
if len(current_vars) >= target_max or len(pool) == 0:
break
trials = []
for v in pool:
try:
cand = current_vars + [v]
m = fit(cand)
sc, _ = _score_generic(m, criterion)
trials.append((v, sc, m))
except Exception as e:
if verbose:
print(f"skip add {v}: {e}")
continue
if not trials:
break
v_best, s_best, m_best = (min if lower_is_better else max)(trials, key=lambda t: t[1])
improved = (s_best < best_score) if lower_is_better else (s_best > best_score)
if not improved:
break
current_vars.append(v_best)
best_score, best_model = s_best, m_best
step += 1
if verbose:
print(f"[+] add {v_best} -> {criterion}={best_score:.6g} (k={len(current_vars)})")
history.append({"step": step, "added": v_best, "score": best_score, "features": current_feature_list()})
else:
pool = [j for j in all_pool_idx if j not in current_idx]
if len(current_idx) >= target_max or len(pool) == 0:
break
trials = []
for j in pool:
try:
cand = current_idx + [j]
m = fit(cand)
sc, _ = _score_generic(m, criterion)
trials.append((j, sc, m))
except Exception as e:
if verbose:
print(f"skip add {feature_names[j]}: {e}")
continue
if not trials:
break
j_best, s_best, m_best = (min if lower_is_better else max)(trials, key=lambda t: t[1])
improved = (s_best < best_score) if lower_is_better else (s_best > best_score)
if not improved:
break
current_idx.append(j_best)
best_score, best_model = s_best, m_best
step += 1
if verbose:
print(f"[+] add {feature_names[j_best]} -> {criterion}={best_score:.6g} (k={len(current_idx)})")
history.append({"step": step, "added": feature_names[j_best], "score": best_score, "features": current_feature_list()})
# Wrap result consistently
if hasattr(model, "x_vars"):
selected = current_vars
final_model = best_model
else:
selected = [feature_names[i] for i in current_idx]
final_model = best_model
return {
"model": final_model,
"selected_features": selected,
"history": pd.DataFrame(history),
}
def _metric_value(m: Any, metric: str) -> float:
"""Extract a metric value from a model or wrapper.
Tries, in order:
1) Direct attribute on the object.
2) Attribute on a wrapped `.model`.
3) Column in a `.glance` DataFrame.
Args:
m: Model object or wrapper.
metric: Metric name (e.g., "aic", "bic", "r_squared").
Returns:
float: The extracted metric value.
Raises:
AttributeError: If the metric cannot be found anywhere.
"""
metric = metric.lower()
# direct attribute
if hasattr(m, metric):
return float(getattr(m, metric))
# wrapped model
if hasattr(m, "model") and hasattr(m.model, metric):
return float(getattr(m.model, metric))
# glance fallback
if hasattr(m, "glance"):
g = m.glance
if isinstance(g, pd.DataFrame) and metric in g.columns:
return float(g.iloc[0][metric])
raise AttributeError(f"Could not find metric '{metric}' on model.")
def _lower_is_better(metric: str) -> bool:
"""Determine if the given metric is minimized.
Args:
metric: Metric name.
Returns:
bool: True if lower values indicate better fit (e.g., AIC, BIC, MSE).
"""
metric = metric.lower()
return metric in ("aic", "bic", "mse", "misclassification_probability")
def _compare_models(m1: Any, m2: Any, metrics: List[str], tol: float = 1e-12) -> Tuple[Any, Dict[str, str]]:
"""Compare two models across multiple metrics by majority vote.
For each metric, the better model receives one vote. Ties on a metric
give no votes. If the total vote is tied, the first metric in `metrics`
is used as the final tie-breaker.
Args:
m1: First model to compare.
m2: Second model to compare.
metrics: List of metrics to evaluate (e.g., ["aic", "bic"]).
tol: Absolute tolerance for considering two values a tie.
Returns:
Tuple[Any, Dict[str, str]]: (winner_model, per_metric_winner)
- winner_model: The better of (m1, m2) by the voting rule.
- per_metric_winner: Dict mapping metric → {"m1","m2","tie"}.
"""
votes = {"m1": 0, "m2": 0, "ties": 0}
per_metric = {}
for met in metrics:
try:
v1 = _metric_value(m1, met)
v2 = _metric_value(m2, met)
except Exception:
# if metric not available on either, skip
continue
better_is_lower = _lower_is_better(met)
diff = v1 - v2
if abs(diff) <= tol:
votes["ties"] += 1
per_metric[met] = "tie"
else:
if (diff < 0) == better_is_lower: # m1 better
votes["m1"] += 1
per_metric[met] = "m1"
else:
votes["m2"] += 1
per_metric[met] = "m2"
if votes["m1"] > votes["m2"]:
winner = m1
elif votes["m2"] > votes["m1"]:
winner = m2
else:
# tie across votes: fall back to first metric in the list (if available)
first = metrics[0]
try:
v1 = _metric_value(m1, first)
v2 = _metric_value(m2, first)
better_is_lower = _lower_is_better(first)
if (v1 - v2 < 0) == better_is_lower:
winner = m1
else:
winner = m2
except Exception:
# if even that fails, default to m1
winner = m1
return winner, per_metric
[docs]
def stepwise(
model: Any,
direction: str = "both",
criterion: str = "aic",
keep_vars: List[str] | None = None,
min_features: int = 1,
max_features: int | None = None,
prefer_intercept: bool = True,
verbose: bool = False,
) -> Dict[str, Any]:
"""Unified stepwise selection wrapper.
Routes to forward, backward, or both directions and returns the
best model by a vote across metrics when `direction="both"`.
Args:
model: Model object. Either:
- Raw supervised_model with `.x` and `.y`.
- SupervisedModel wrapper with `.x_vars`, `.y_var`, `.data`, `.model_type`.
direction: Stepwise direction:
- "forwards": Forward selection.
- "backwards": Backward elimination.
- "both": Run both and select the better.
criterion: Model selection criterion ("aic", "bic", "mse", etc.).
keep_vars: Variable names that must always be included.
min_features: Minimum number of features (for backward).
max_features: Maximum number of features (for forward).
prefer_intercept: If True, prefer/include an intercept where applicable.
verbose: If True, print selection progress.
Returns:
Dict[str, Any]: A dictionary with:
- "model": Best fitted model.
- "selected_features": List of chosen features.
- "history": pd.DataFrame of the chosen direction's history.
- "direction_chosen": One of {"forwards","backwards"}.
- "comparison": Dict of per-metric winners (only if direction="both").
"""
direction = direction.lower()
keep_vars = keep_vars or []
if direction == "forwards":
fwd = forward_stepwise(
model,
criterion=criterion,
keep_vars=keep_vars,
max_features=max_features,
prefer_intercept=prefer_intercept,
verbose=verbose,
)
return {
"model": fwd["model"],
"selected_features": fwd["selected_features"],
"history": fwd["history"],
"direction_chosen": "forwards",
}
if direction == "backwards":
bwd = backward_stepwise(
model,
criterion=criterion,
keep_vars=keep_vars,
min_features=min_features,
verbose=verbose,
)
return {
"model": bwd["model"],
"selected_features": bwd["selected_features"],
"history": bwd["history"],
"direction_chosen": "backwards",
}
if direction != "both":
raise ValueError("direction must be one of {'forwards','backwards','both'}")
# run both
fwd = forward_stepwise(
model,
criterion=criterion,
keep_vars=keep_vars,
max_features=max_features,
prefer_intercept=prefer_intercept,
verbose=verbose,
)
bwd = backward_stepwise(
model,
criterion=criterion,
keep_vars=keep_vars,
min_features=min_features,
verbose=verbose,
)
m_fwd = fwd["model"]
m_bwd = bwd["model"]
# build metric set: {criterion, aic, bic} (deduped, criterion first)
metrics: List[str] = []
for met in [criterion.lower(), "aic", "bic"]:
if met not in metrics:
metrics.append(met)
winner_model, per_metric = _compare_models(m_fwd, m_bwd, metrics)
if winner_model is m_fwd:
chosen = {
"model": fwd["model"],
"selected_features": fwd["selected_features"],
"history": fwd["history"],
"direction_chosen": "forwards",
"comparison": per_metric,
}
else:
chosen = {
"model": bwd["model"],
"selected_features": bwd["selected_features"],
"history": bwd["history"],
"direction_chosen": "backwards",
"comparison": per_metric,
}
return chosen
[docs]
class LeastSquaresRegressor(PredictionModel):
"""Ordinary least squares (OLS) regression.
A high-level wrapper around :class:`least_squares_regressor` that provides a
formula interface and pandas-aware prediction methods. Fits a linear model
by minimizing the sum of squared residuals.
This class inherits from :class:`PredictionModel`, which handles parsing the
formula, extracting variables from a DataFrame, and exposing tidy/glance
summaries consistent with the rest of the package.
Examples:
Fit an OLS regression from a formula:
>>> model = LeastSquaresRegressor("y ~ x1 + x2", data=df)
>>> model.tidy # coefficient table
>>> model.glance # model summary
>>> preds = model.predict(df)
Attributes:
MODEL_TYPE (Type[supervised_model]): The underlying implementation
(:class:`least_squares_regressor`).
formula (str): Formula string used to specify the model.
x_vars (list[str]): Predictor variable names.
y_var (str): Response variable name.
data (pd.DataFrame): Parsed DataFrame containing predictors and response.
model (least_squares_regressor): Fitted underlying OLS model.
"""
MODEL_TYPE = least_squares_regressor
[docs]
class LogisticRegressor(PredictionModel):
"""Logistic regression for binary outcomes.
A high-level wrapper around :class:`logistic_regressor` that provides a
formula interface and pandas-aware prediction methods. Fits a generalized
linear model with a logit link, estimating probabilities for binary response
variables.
This class inherits from :class:`PredictionModel`, which handles parsing the
formula, extracting variables from a DataFrame, and exposing tidy/glance
summaries consistent with the rest of the package.
Examples:
Fit a logistic regression model from a formula:
>>> model = LogisticRegressor("y ~ x1 + x2", data=df)
>>> model.tidy # coefficient table with log-odds
>>> model.glance # model fit summary (AIC, log-likelihood, etc.)
>>> probs = model.predict(df) # predicted probabilities
Attributes:
MODEL_TYPE (Type[supervised_model]): The underlying implementation
(:class:`logistic_regressor`).
formula (str): Formula string used to specify the model.
x_vars (list[str]): Predictor variable names.
y_var (str): Response variable name.
data (pd.DataFrame): Parsed DataFrame containing predictors and response.
model (logistic_regressor): Fitted underlying logistic regression model.
"""
MODEL_TYPE = logistic_regressor
[docs]
class PoissonRegressor(PredictionModel):
"""Poisson regression for count outcomes.
A high-level wrapper around :class:`poisson_regressor` that provides a
formula interface and pandas-aware prediction methods. Fits a generalized
linear model with a log link, appropriate for count data where the variance
is proportional to the mean.
This class inherits from :class:`PredictionModel`, which handles parsing the
formula, extracting variables from a DataFrame, and exposing tidy/glance
summaries consistent with the rest of the package.
Examples:
Fit a Poisson regression model from a formula:
>>> model = PoissonRegressor("y ~ x1 + x2", data=df)
>>> model.tidy # coefficient table with log-incidence ratios
>>> model.glance # model summary (deviance, AIC, etc.)
>>> rates = model.predict(df) # expected counts
Attributes:
MODEL_TYPE (Type[supervised_model]): The underlying implementation
(:class:`poisson_regressor`).
formula (str): Formula string used to specify the model.
x_vars (list[str]): Predictor variable names.
y_var (str): Response variable name.
data (pd.DataFrame): Parsed DataFrame containing predictors and response.
model (poisson_regressor): Fitted underlying Poisson regression model.
"""
MODEL_TYPE = poisson_regressor