"""
ldv.py — Limited Dependent Variable (LDV) Models
================================================
This module implements models for **limited dependent variables** where the
outcome is only partially observed due to censoring, truncation, or selection
processes. These models extend standard regression by explicitly accounting
for restricted observability of the dependent variable.
Models currently implemented
----------------------------
Tobit regression (two-limit censored normal)
Latent variable:
y* = Xβ + ε, ε ~ N(0, σ²)
Observed variable:
y = L if y* ≤ L (left-censored)
y = y* if L < y* < R (uncensored)
y = R if y* ≥ R (right-censored)
Features:
* Supports left- and/or right-censoring (finite or infinite).
* Fits parameters by maximum likelihood (L-BFGS-B).
* Returns estimates of β and σ with variance-covariance matrix.
* Provides log-likelihood, AIC, BIC, deviance, convergence info.
* Includes:
- predict() for latent mean μ = Xβ
- expected_observed() for E[y | X] under censoring
- censoring_probs() for P_left, P_uncensored, P_right.
Planned models
--------------
Truncated regression
* Similar to Tobit but assumes data outside [L, R] are unobserved
(not censored).
* Log-likelihood excludes truncated cases entirely.
* Useful for survey data where only responses in a restricted range
are collected.
Heckman selection model (two-step / full MLE)
* Jointly models outcome and selection equations to correct for
sample selection bias.
* Outcome observed only if selection variable exceeds threshold.
* Widely applied in labor economics, health economics, and marketing.
Classes
-------
tobit_regressor
Core implementation of two-limit Tobit regression with MLE fitting,
prediction, and inference utilities.
Factory Aliases
---------------
TobitRegressor
Partial wrapper that exposes `tobit_regressor` through the
`PredictionModel` interface for pandas DataFrame/formula use.
Examples
--------
>>> import numpy as np, pandas as pd
>>> from cleands.Prediction.ldv import TobitRegressor
>>> df = pd.DataFrame({"x1": np.random.randn(100), "y": np.random.randn(100)})
>>> model = TobitRegressor(x_vars=["x1"], y_var="y", data=df, L=0.0)
>>> model.glance
"""
import numpy as np
import pandas as pd
import scipy as sp
from scipy.optimize import minimize
from typing import Optional, Tuple
from functools import partial
from ..base import prediction_model, prediction_likelihood_model, variance_model, PredictionModel
from ..utils import hstack # optional if you want to help users add an intercept
[docs]
class tobit_regressor(prediction_model, prediction_likelihood_model, variance_model):
"""
Two-limit Tobit (censored normal) regression model.
Latent model:
y* = Xβ + ε, ε ~ N(0, σ²)
Observed:
- y = L if y* ≤ L (left-censored)
- y = y* if L < y* < R (uncensored)
- y = R if y* ≥ R (right-censored)
Parameters are stored as:
params = [β, σ] with σ > 0
`predict(X)` returns the latent mean μ = Xβ.
Use `expected_observed(X)` to compute E[y | X] under censoring.
"""
def __init__(
self,
x: np.ndarray,
y: np.ndarray,
L: Optional[float] = 0.0,
R: Optional[float] = None,
add_intercept: bool = False,
start: Optional[Tuple[np.ndarray, float]] = None,
tol: float = 1e-8
) -> None:
"""
Initialize a Tobit regression model and fit parameters via MLE.
Args:
x (np.ndarray): Design matrix of shape (n_obs, n_features).
y (np.ndarray): Observed outcomes.
L (float, optional): Left-censoring limit. Defaults to 0.0.
R (float, optional): Right-censoring limit. Defaults to None (= +∞).
add_intercept (bool, optional): Whether to prepend an intercept column of ones. Defaults to False.
start (tuple, optional): Optional initial (β, σ) guess. Defaults to OLS-based warm start.
tol (float, optional): Numerical tolerance for censoring classification. Defaults to 1e-8.
"""
if add_intercept:
x = hstack(np.ones(x.shape[0]), x)
super().__init__(x, y)
self.L = -np.inf if L is None else float(L)
self.R = np.inf if R is None else float(R)
self._tol = tol
# Censoring masks
self._left = (self.y <= self.L + self._tol)
self._right = (self.y >= self.R - self._tol)
self._unc = ~(self._left | self._right)
# Initialize β and σ
if start is not None:
beta0, sigma0 = start
else:
try:
beta0, *_ = np.linalg.lstsq(self.x, self.y, rcond=None)
except np.linalg.LinAlgError:
beta0 = np.zeros(self.n_feat)
resid = self.y - self.x @ beta0
sigma0 = float(np.sqrt(np.maximum(resid.var(), 1e-3)))
theta0 = np.concatenate([beta0, np.array([np.log(sigma0)])])
# Optimize log-likelihood
res = minimize(self._objective_from_theta, theta0, method="L-BFGS-B", tol=1e-9)
self.converged = bool(res.success)
self.message = res.message
self.nit = res.nit
# Store fitted parameters
self.beta = res.x[:-1]
self.log_sigma = res.x[-1]
self.sigma = float(np.exp(self.log_sigma))
self.params = np.concatenate([self.beta, np.array([self.sigma])])
# Covariance of parameters via delta method
self._vcov_theta = None
try:
Hinv = res.hess_inv.todense() if hasattr(res.hess_inv, "todense") else np.array(res.hess_inv)
self._vcov_theta = np.array(Hinv)
except Exception:
self._vcov_theta = np.full((self.n_feat + 1, self.n_feat + 1), np.nan)
r = self.n_feat
V = np.zeros((r + 1, r + 1))
V[:r, :r] = self._vcov_theta[:r, :r]
V_beta_s = self._vcov_theta[:r, r] * self.sigma
V[:r, r] = V_beta_s
V[r, :r] = V_beta_s
V[r, r] = (self.sigma ** 2) * self._vcov_theta[r, r]
self._vcov_params = V
# Summary glance DataFrame
k = self.n_feat + 1
self.glance = pd.DataFrame({
'self.df': [k],
'resid.df': [max(self.n_obs - k, 1)],
'aic': [self.aic],
'bic': [self.bic],
'log.likelihood': [self.log_likelihood],
'deviance': [self.deviance],
'converged': [self.converged],
'nit': [self.nit],
'message': [str(self.message)]
})
def _objective_from_theta(self, theta: np.ndarray) -> float:
"""
Internal optimizer hook. Computes negative log-likelihood at θ.
Args:
theta (np.ndarray): Candidate parameter vector [β, logσ].
Returns:
float: Negative log-likelihood value.
"""
beta = theta[:-1]
sigma = float(np.exp(theta[-1]))
mu = self.x @ beta
old_sigma, had_sigma = getattr(self, "sigma", None), hasattr(self, "sigma")
self.sigma = sigma
try:
ll = self.evaluate_lnL(mu)
finally:
if had_sigma:
self.sigma = old_sigma
else:
del self.sigma
return -float(ll)
def _split_sets(self, mu: np.ndarray, sigma: float):
"""
Split observations into left-, right-, and uncensored sets.
Args:
mu (np.ndarray): Predicted latent mean values.
sigma (float): Current standard deviation.
Returns:
tuple: (Lmask, Rmask, Umask, yU, muU, muL, muR)
"""
Lmask, Rmask, Umask = self._left, self._right, self._unc
yU, muU = self.y[Umask], mu[Umask]
muL, muR = mu[Lmask], mu[Rmask]
return (Lmask, Rmask, Umask, yU, muU, muL, muR)
[docs]
def predict(self, target: np.ndarray) -> np.ndarray:
"""
Predict latent mean μ = Xβ.
Args:
target (np.ndarray): New design matrix.
Returns:
np.ndarray: Latent means.
"""
return target @ self.beta
[docs]
def expected_observed(self, target: np.ndarray) -> np.ndarray:
"""
Predict expected observed y under censoring.
Args:
target (np.ndarray): New design matrix.
Returns:
np.ndarray: Expected observed values, E[y | X].
"""
mu = self.predict(target)
sigma = self.sigma
zL, zR = (self.L - mu) / sigma, (self.R - mu) / sigma
PhiL = np.where(np.isfinite(self.L), sp.stats.norm.cdf(zL), 0.0)
PhiR = np.where(np.isfinite(self.R), sp.stats.norm.cdf(zR), 1.0)
phiL = np.where(np.isfinite(self.L), sp.stats.norm.pdf(zL), 0.0)
phiR = np.where(np.isfinite(self.R), sp.stats.norm.pdf(zR), 0.0)
return (self.L * PhiL) + (self.R * (1 - PhiR)) + mu * (PhiR - PhiL) + sigma * (phiL - phiR)
[docs]
def censoring_probs(self, target: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Compute probabilities of being left-censored, uncensored, or right-censored.
Args:
target (np.ndarray): New design matrix.
Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray]:
(P_left, P_uncensored, P_right) for each observation.
"""
mu = self.predict(target)
sigma = self.sigma
zL, zR = (self.L - mu) / sigma, (self.R - mu) / sigma
P_left = np.where(np.isfinite(self.L), sp.stats.norm.cdf(zL), 0.0)
P_right = np.where(np.isfinite(self.R), sp.stats.norm.sf(zR), 0.0)
P_unc = 1.0 - P_left - P_right
return P_left, P_unc, P_right
[docs]
def evaluate_lnL(self, pred: np.ndarray) -> float:
"""
Evaluate the log-likelihood at a given latent mean μ.
Args:
pred (np.ndarray): Latent mean predictions μ = Xβ.
Returns:
float: Log-likelihood value.
"""
mu = np.asarray(pred).reshape(-1)
sigma = self.sigma
Lmask, Rmask, Umask, yU, muU, muL, muR = self._split_sets(mu, sigma)
ll = 0.0
if Umask.any():
t = (yU - muU) / sigma
ll += (-0.5 * t**2 - np.log(sigma) - 0.5*np.log(2*np.pi)).sum()
if Lmask.any() and np.isfinite(self.L):
ll += sp.stats.norm.logcdf((self.L - muL) / sigma).sum()
if Rmask.any() and np.isfinite(self.R):
ll += sp.stats.norm.logsf((self.R - muR) / sigma).sum()
return float(ll)
@property
def vcov_params(self) -> np.ndarray:
"""
Variance-covariance matrix of parameter estimates.
Returns:
np.ndarray: (r+1) x (r+1) covariance matrix for [β, σ].
"""
return self._vcov_params
[docs]
class TobitRegressor(PredictionModel):
"""Convenience wrapper for Tobit regression.
The Tobit model is used for censored dependent variables, where
observations below (or above) a threshold are censored rather than
fully observed. This wrapper provides a formula/DataFrame interface
for the :class:`tobit_regressor`.
Attributes:
MODEL_TYPE: Underlying model type, fixed to
:class:`tobit_regressor`.
Example:
>>> model = TobitRegressor.from_formula("y ~ x1 + x2", data=df)
>>> model.predict(df[["x1", "x2"]])
"""
MODEL_TYPE = tobit_regressor