diff --git a/pymc_extras/distributions/__init__.py b/pymc_extras/distributions/__init__.py
index 783154ba3..abdbc721e 100644
--- a/pymc_extras/distributions/__init__.py
+++ b/pymc_extras/distributions/__init__.py
@@ -22,6 +22,7 @@
     BetaNegativeBinomial,
     GeneralizedPoisson,
     Skellam,
+    GrassiaIIGeometric,
 )
 from pymc_extras.distributions.histogram_utils import histogram_approximation
 from pymc_extras.distributions.multivariate import R2D2M2CP
@@ -38,5 +39,6 @@
     "R2D2M2CP",
     "Skellam",
     "histogram_approximation",
+    "GrassiaIIGeometric",
     "PartialOrder",
 ]
diff --git a/pymc_extras/distributions/discrete.py b/pymc_extras/distributions/discrete.py
index fc810c415..d337f0c84 100644
--- a/pymc_extras/distributions/discrete.py
+++ b/pymc_extras/distributions/discrete.py
@@ -16,6 +16,7 @@
 import pymc as pm

 from pymc.distributions.dist_math import betaln, check_parameters, factln, logpow
+from pymc.distributions.distribution import Discrete
 from pymc.distributions.shape_utils import rv_size_is_none
 from pytensor import tensor as pt
 from pytensor.tensor.random.op import RandomVariable
@@ -399,3 +400,186 @@ def dist(cls, mu1, mu2, **kwargs):
             class_name="Skellam",
             **kwargs,
         )
+
+
+class GrassiaIIGeometricRV(RandomVariable):
+    name = "g2g"
+    signature = "(),(),()->()"
+
+    dtype = "int64"
+    _print_name = ("GrassiaIIGeometric", "\\operatorname{GrassiaIIGeometric}")
+
+    @classmethod
+    def rng_fn(cls, rng, r, alpha, time_covariate_vector, size):
+        # Determine output size
+        if size is None:
+            size = np.broadcast_shapes(r.shape, alpha.shape, time_covariate_vector.shape)
+
+        # Broadcast parameters to output size
+        r = np.broadcast_to(r, size)
+        alpha = np.broadcast_to(alpha, size)
+        time_covariate_vector = np.broadcast_to(time_covariate_vector, size)
+
+        # Mixing distribution: lambda ~ Gamma(shape=r, scale=1/alpha)
+        lam = rng.gamma(shape=r, scale=1 / alpha, size=size)
+
+        # Calculate exp(time_covariate_vector) for all samples
+        exp_time_covar = np.exp(
+            time_covariate_vector
+        ).mean()  # must average over time for correct broadcasting
+        lam_covar = lam * exp_time_covar
+
+        # Inverse-CDF sampling: ceil of an Exponential(rate=lam) is a
+        # Geometric with p = 1 - exp(-lam); cast to match dtype = "int64"
+        samples = np.ceil(rng.exponential(size=size) / lam_covar).astype(np.int64)
+
+        return samples
+
+
+g2g = GrassiaIIGeometricRV()
+
+
+# TODO: Add covariate expressions to docstrings.
+class GrassiaIIGeometric(Discrete):
+    r"""Grassia(II)-Geometric distribution.
+
+    This distribution is a flexible alternative to the Geometric distribution for the number of
+    trials until a discrete event, and can be extended to support both static and time-varying
+    covariates.
+
+    Fader and Hardie describe this distribution with the following PMF and survival function
+    in [1]_:
+
+    .. math::
+        \mathbb{P}(T=t \mid r, \alpha, \beta; Z(t)) =
+            \left(\frac{\alpha}{\alpha+C(t-1)}\right)^{r} - \left(\frac{\alpha}{\alpha+C(t)}\right)^{r} \\
+        \mathbb{S}(t \mid r, \alpha, \beta; Z(t)) =
+            \left(\frac{\alpha}{\alpha+C(t)}\right)^{r}
+
+    where :math:`C(t) = \sum_{s=1}^{t} \exp(\beta \cdot Z(s))` accumulates the time-varying
+    covariate effects and reduces to :math:`C(t) = t` when no covariates are supplied.
+
+    .. plot::
+        :context: close-figs
+
+        import matplotlib.pyplot as plt
+        import numpy as np
+        import scipy.stats as st
+        import arviz as az
+        plt.style.use('arviz-darkgrid')
+        t = np.arange(1, 11)
+        alpha_vals = [1., 1., 2., 2.]
+        r_vals = [.1, .25, .5, 1.]
+        for alpha, r in zip(alpha_vals, r_vals):
+            pmf = (alpha/(alpha + t - 1))**r - (alpha/(alpha + t))**r
+            plt.plot(t, pmf, '-o', label=r'$\alpha$ = {}, $r$ = {}'.format(alpha, r))
+        plt.xlabel('t', fontsize=12)
+        plt.ylabel('p(t)', fontsize=12)
+        plt.legend(loc=1)
+        plt.show()
+
+    ========  ===============================================
+    Support   :math:`t \in \mathbb{N}_{>0}`
+    ========  ===============================================
+
+    Parameters
+    ----------
+    r : tensor_like of float
+        Shape parameter (r > 0).
+    alpha : tensor_like of float
+        Scale parameter (alpha > 0).
+    time_covariate_vector : tensor_like of float, optional
+        Optional vector containing dot products of time-varying covariates and coefficients.
+
+    References
+    ----------
+    .. [1] Fader, Peter S. and Hardie, Bruce G. S. (2020).
+           "Incorporating Time-Varying Covariates in a Simple Mixture Model for Discrete-Time
+           Duration Data."
+           https://www.brucehardie.com/notes/037/time-varying_covariates_in_BG.pdf
+    """
+
+    rv_op = g2g
+
+    @classmethod
+    def dist(cls, r, alpha, time_covariate_vector=None, *args, **kwargs):
+        r = pt.as_tensor_variable(r)
+        alpha = pt.as_tensor_variable(alpha)
+        if time_covariate_vector is None:
+            # A scalar zero signals "no covariates", so C(t) reduces to t
+            time_covariate_vector = pt.constant(0.0)
+        time_covariate_vector = pt.as_tensor_variable(time_covariate_vector)
+        return super().dist([r, alpha, time_covariate_vector], *args, **kwargs)
+
+    def logp(value, r, alpha, time_covariate_vector):
+        # PMF is the difference of survival functions: S(t-1) - S(t)
+        logp = pt.log(
+            pt.pow(alpha / (alpha + C_t(value - 1, time_covariate_vector)), r)
+            - pt.pow(alpha / (alpha + C_t(value, time_covariate_vector)), r)
+        )
+
+        # Handle invalid values
+        logp = pt.switch(
+            pt.or_(
+                value < 1,  # Value must be >= 1
+                pt.isnan(logp),  # Handle NaN cases
+            ),
+            -np.inf,
+            logp,
+        )
+
+        return check_parameters(
+            logp,
+            r > 0,
+            alpha > 0,
+            msg="r > 0, alpha > 0",
+        )
+
+    def logcdf(value, r, alpha, time_covariate_vector):
+        # CDF(t) = 1 - S(t) = 1 - (alpha / (alpha + C(t)))**r, evaluated on the
+        # log scale via log1mexp of the log survival function
+        log_survival = r * (
+            pt.log(alpha) - pt.log(alpha + C_t(value, time_covariate_vector))
+        )
+        logcdf = pt.log1mexp(log_survival)
+
+        return check_parameters(
+            logcdf,
+            r > 0,
+            alpha > 0,
+            msg="r > 0, alpha > 0",
+        )
+
+    def support_point(rv, size, r, alpha, time_covariate_vector):
+        """Calculate a reasonable starting point for sampling.
+
+        For the GrassiaIIGeometric distribution, we use a point estimate based on
+        the expected value of the mixing distribution. Since the mixing distribution
+        is Gamma(r, 1/alpha), its mean is r/alpha. We then transform this through
+        the geometric link function and round to ensure an integer value.
+
+        When time_covariate_vector is provided, it affects the expected value through
+        the exponential link function: exp(time_covariate_vector).
+ """ + + base_lambda = r / alpha + + # Approximate expected value of geometric distribution + mean = pt.switch( + base_lambda < 0.1, + 1.0 / base_lambda, # Approximation for small lambda + 1.0 / (1.0 - pt.exp(-base_lambda)), # Full expression for larger lambda + ) + + # Apply time covariates if provided + mean = mean * pt.exp(time_covariate_vector) + + # Round up to nearest integer and ensure >= 1 + mean = pt.maximum(pt.ceil(mean), 1.0) + + # Handle size parameter + if not rv_size_is_none(size): + mean = pt.full(size, mean) + + return mean + + +def C_t(t: pt.TensorVariable, time_covariate_vector: pt.TensorVariable) -> pt.TensorVariable: + """Utility for processing time-varying covariates in GrassiaIIGeometric distribution.""" + if time_covariate_vector.ndim == 0: + return t + else: + # Ensure t is a valid index + t_idx = pt.maximum(0, t - 1) # Convert to 0-based indexing + # If t_idx exceeds length of time_covariate_vector, use last value + max_idx = pt.shape(time_covariate_vector)[0] - 1 + safe_idx = pt.minimum(t_idx, max_idx) + covariate_value = time_covariate_vector[safe_idx] + return t * pt.exp(covariate_value) diff --git a/tests/distributions/test_discrete.py b/tests/distributions/test_discrete.py index 8e803a31b..0945b9408 100644 --- a/tests/distributions/test_discrete.py +++ b/tests/distributions/test_discrete.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import numpy as np import pymc as pm import pytensor @@ -26,6 +27,7 @@ Rplus, assert_support_point_is_expected, check_logp, + check_selfconsistency_discrete_logcdf, discrete_random_tester, ) from pytensor import config @@ -33,6 +35,7 @@ from pymc_extras.distributions import ( BetaNegativeBinomial, GeneralizedPoisson, + GrassiaIIGeometric, Skellam, ) @@ -208,3 +211,161 @@ def test_logp(self): {"mu1": Rplus_small, "mu2": Rplus_small}, lambda value, mu1, mu2: scipy.stats.skellam.logpmf(value, mu1, mu2), ) + + +class TestGrassiaIIGeometric: + class TestRandomVariable(BaseTestDistributionRandom): + pymc_dist = GrassiaIIGeometric + pymc_dist_params = {"r": 0.5, "alpha": 2.0, "time_covariate_vector": 0.0} + expected_rv_op_params = {"r": 0.5, "alpha": 2.0, "time_covariate_vector": 0.0} + tests_to_run = [ + "check_pymc_params_match_rv_op", + "check_rv_size", + ] + + def test_random_basic_properties(self): + """Test basic random sampling properties""" + # Test with standard parameter values + r_vals = [0.5, 1.0, 2.0] + alpha_vals = [0.5, 1.0, 2.0] + time_cov_vals = [-1.0, 1.0, 2.0] + + for r in r_vals: + for alpha in alpha_vals: + for time_cov in time_cov_vals: + dist = self.pymc_dist.dist( + r=r, alpha=alpha, time_covariate_vector=time_cov, size=1000 + ) + draws = dist.eval() + + # Check basic properties + assert np.all(draws > 0) + assert np.all(draws.astype(int) == draws) + assert np.mean(draws) > 0 + assert np.var(draws) > 0 + + def test_random_edge_cases(self): + """Test edge cases with more reasonable parameter values""" + # Test with small r and large alpha values + r_vals = [0.1, 0.5] + alpha_vals = [5.0, 10.0] + time_cov_vals = [0.0, 1.0] + + for r in r_vals: + for alpha in alpha_vals: + for time_cov in time_cov_vals: + dist = self.pymc_dist.dist( + r=r, alpha=alpha, time_covariate_vector=time_cov, size=1000 + ) + draws = dist.eval() + + # Check basic properties + assert np.all(draws > 0) + assert np.all(draws.astype(int) == draws) + assert np.mean(draws) > 0 + assert 
+        def test_random_none_covariates(self):
+            """Test random sampling with None time_covariate_vector"""
+            r_vals = [0.5, 1.0, 2.0]
+            alpha_vals = [0.5, 1.0, 2.0]
+
+            for r in r_vals:
+                for alpha in alpha_vals:
+                    dist = self.pymc_dist.dist(
+                        r=r,
+                        alpha=alpha,
+                        time_covariate_vector=0.0,  # Changed from None to avoid zip issues
+                        size=1000,
+                    )
+                    draws = dist.eval()
+
+                    # Check basic properties
+                    assert np.all(draws > 0)
+                    assert np.all(draws.astype(int) == draws)
+                    assert np.mean(draws) > 0
+                    assert np.var(draws) > 0
+
+        @pytest.mark.parametrize(
+            "r,alpha,time_covariate_vector",
+            [
+                (0.5, 1.0, 0.0),
+                (1.0, 2.0, 1.0),
+                (2.0, 0.5, -1.0),
+                (5.0, 1.0, 0.0),  # Changed from None to avoid zip issues
+            ],
+        )
+        def test_random_moments(self, r, alpha, time_covariate_vector):
+            dist = self.pymc_dist.dist(
+                r=r, alpha=alpha, time_covariate_vector=time_covariate_vector, size=10_000
+            )
+            draws = dist.eval()
+
+            assert np.all(draws > 0)
+            assert np.all(draws.astype(int) == draws)
+            assert np.mean(draws) > 0
+            assert np.var(draws) > 0
+
+    def test_logp_basic(self):
+        # Create PyTensor variables with explicit values to ensure proper initialization
+        r = pt.as_tensor_variable(1.0)
+        alpha = pt.as_tensor_variable(2.0)
+        time_covariate_vector = pt.as_tensor_variable(0.5)
+        value = pt.vector("value", dtype="int64")
+
+        # Create the distribution with the PyTensor variables
+        dist = GrassiaIIGeometric.dist(r, alpha, time_covariate_vector)
+        logp = pm.logp(dist, value)
+        logp_fn = pytensor.function([value], logp)
+
+        # Test basic properties of logp
+        test_value = np.array([1, 2, 3, 4, 5])
+
+        logp_vals = logp_fn(test_value)
+        assert not np.any(np.isnan(logp_vals))
+        assert np.all(np.isfinite(logp_vals))
+
+        # Test invalid values
+        assert logp_fn(np.array([0])) == -np.inf  # Support requires value >= 1
+
+        with pytest.raises(TypeError):
+            logp_fn(np.array([1.5]))  # Value must be an integer
+
+    def test_logcdf(self):
+        # Check that logcdf is self-consistent with cumulative sums of the PMF
+        # across parameter values
+        check_selfconsistency_discrete_logcdf(
+            GrassiaIIGeometric,
+            I,
+            {"r": Rplus, "alpha": Rplus, "time_covariate_vector": I},
+        )
+
+    @pytest.mark.parametrize(
+        "r, alpha, time_covariate_vector, size, expected_shape",
+        [
+            (1.0, 1.0, 0.0, None, ()),  # Scalar output with no covariates (0.0 instead of None)
+            ([1.0, 2.0], 1.0, 0.0, None, (2,)),  # Vector output from r
+            (1.0, [1.0, 2.0], 0.0, None, (2,)),  # Vector output from alpha
+            (1.0, 1.0, [1.0, 2.0], None, (2,)),  # Vector output from time covariates
+            (1.0, 1.0, 1.0, (3, 2), (3, 2)),  # Explicit size with scalar time covariates
+        ],
+    )
+    def test_support_point(self, r, alpha, time_covariate_vector, size, expected_shape):
+        """Test that support_point returns reasonable values with correct shapes"""
+        with pm.Model() as model:
+            GrassiaIIGeometric(
+                "x", r=r, alpha=alpha, time_covariate_vector=time_covariate_vector, size=size
+            )
+
+        init_point = model.initial_point()["x"]
+
+        # Check shape
+        assert init_point.shape == expected_shape
+
+        # Check values are positive integers
+        assert np.all(init_point > 0)
+        assert np.all(init_point.astype(int) == init_point)
+
+        # Check values are finite and reasonable
+        assert np.all(np.isfinite(init_point))
+        assert np.all(init_point < 1e6)  # Should not be extremely large
+
+        # TODO: expected values must be provided
+        # assert_support_point_is_expected(model, init_point)
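
For reviewers who want to exercise the new distribution end to end, a minimal usage sketch follows. It is not part of the diff: the data, priors, and variable names are illustrative only, and it assumes `GrassiaIIGeometric` is importable from `pymc_extras.distributions` exactly as registered above.

```python
import numpy as np
import pymc as pm

from pymc_extras.distributions import GrassiaIIGeometric

# Hypothetical durations: number of discrete periods until each unit's event
observed_t = np.array([1, 1, 2, 2, 3, 4, 6, 9])

# Forward sampling from fixed parameters; with no time_covariate_vector,
# C(t) reduces to t
draws = pm.draw(GrassiaIIGeometric.dist(r=0.5, alpha=2.0), draws=1000, random_seed=42)
assert draws.min() >= 1  # support is t >= 1

# Fitting r and alpha to the observed durations (priors are illustrative)
with pm.Model() as model:
    r = pm.HalfNormal("r", sigma=2.0)  # shape of the gamma mixing distribution
    alpha = pm.HalfNormal("alpha", sigma=2.0)  # scale of the gamma mixing distribution
    GrassiaIIGeometric("t", r=r, alpha=alpha, observed=observed_t)
    idata = pm.sample(random_seed=42)
```

Because the default `time_covariate_vector` is a scalar zero, this exercises the plain Grassia(II)-Geometric; passing a vector of covariate-coefficient dot products would exercise the `C_t` indexing path instead.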