Skip to content

Make basic INLA interface and simple marginalisation routine #533

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pymc_extras/inference/fit.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,17 @@ def fit(method: str, **kwargs) -> az.InferenceData:

return fit_pathfinder(**kwargs)

if method == "laplace":
elif method == "laplace":
from pymc_extras.inference.laplace import fit_laplace

return fit_laplace(**kwargs)

elif method == "INLA":
from pymc_extras.inference.laplace import fit_INLA

return fit_INLA(**kwargs)

else:
raise ValueError(
f"method '{method}' not supported. Use one of 'pathfinder', 'laplace' or 'INLA'."
)
164 changes: 164 additions & 0 deletions pymc_extras/inference/inla.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import arviz as az
import numpy as np
import pymc as pm
import pytensor
import pytensor.tensor as pt

from better_optimize.constants import minimize_method
from numpy.typing import ArrayLike
from pytensor.tensor import TensorVariable
from pytensor.tensor.optimize import minimize


def get_conditional_gaussian_approximation(
x: TensorVariable,
Q: TensorVariable | ArrayLike,
mu: TensorVariable | ArrayLike,
model: pm.Model | None = None,
method: minimize_method = "BFGS",
use_jac: bool = True,
use_hess: bool = False,
optimizer_kwargs: dict | None = None,
) -> list[TensorVariable]:
"""
Returns an estimate the a posteriori probability of a latent Gaussian field x and its mode x0 using the Laplace approximation.

That is:
y | x, sigma ~ N(Ax, sigma^2 W)
x | params ~ N(mu, Q(params)^-1)

We seek to estimate p(x | y, params) with a Gaussian:

log(p(x | y, params)) = log(p(y | x, params)) + log(p(x | params)) + const

Let f(x) = log(p(y | x, params)). From the definition of our model above, we have log(p(x | params)) = -0.5*(x - mu).T Q (x - mu) + 0.5*logdet(Q).

This gives log(p(x | y, params)) = f(x) - 0.5*(x - mu).T Q (x - mu) + 0.5*logdet(Q). We will estimate this using the Laplace approximation by Taylor expanding f(x) about the mode.

Thus:

1. Maximize log(p(x | y, params)) = f(x) - 0.5*(x - mu).T Q (x - mu) wrt x (note that logdet(Q) does not depend on x) to find the mode x0.

2. Use the Laplace approximation expanded about the mode: p(x | y, params) ~= N(mu=x0, tau=Q - f''(x0)).

Parameters
----------
x: TensorVariable
The parameter with which to maximize wrt (that is, find the mode in x). In INLA, this is the latent Gaussian field x~N(mu,Q^-1).
Q: TensorVariable | ArrayLike
The precision matrix of the latent field x.
mu: TensorVariable | ArrayLike
The mean of the latent field x.
model: Model
PyMC model to use.
method: minimize_method
Which minimization algorithm to use.
use_jac: bool
If true, the minimizer will compute the gradient of log(p(x | y, params)).
use_hess: bool
If true, the minimizer will compute the Hessian log(p(x | y, params)).
optimizer_kwargs: dict
Kwargs to pass to scipy.optimize.minimize.

Returns
-------
x0, p(x | y, params): list[TensorVariable]
Mode and Laplace approximation for posterior.
"""
model = pm.modelcontext(model)

# f = log(p(y | x, params))
f_x = model.logp()

# log(p(x | y, params)) only including terms that depend on x for the minimization step (logdet(Q) ignored as it is a constant wrt x)
log_x_posterior = f_x - 0.5 * (x - mu).T @ Q @ (x - mu)

# Maximize log(p(x | y, params)) wrt x to find mode x0
x0, _ = minimize(
objective=-log_x_posterior,
x=x,
method=method,
jac=use_jac,
hess=use_hess,
optimizer_kwargs=optimizer_kwargs,
)

# require f''(x0) for Laplace approx
hess = pytensor.gradient.hessian(f_x, x)
hess = pytensor.graph.replace.graph_replace(hess, {x: x0})

# Could be made more efficient with adding diagonals only
tau = Q - hess

# Currently x is passed both as the query point for f(x, args) = logp(x | y, params) AND as an initial guess for x0. This may cause issues if the query point is
# far from the mode x0 or in a neighbourhood which results in poor convergence.
return x0, pm.MvNormal(f"{x.name}_laplace_approx", mu=x0, tau=tau)


def get_log_marginal_likelihood(
x: TensorVariable,
Q: TensorVariable | ArrayLike,
mu: TensorVariable | ArrayLike,
model: pm.Model | None = None,
method: minimize_method = "BFGS",
use_jac: bool = True,
use_hess: bool = False,
optimizer_kwargs: dict | None = None,
) -> TensorVariable:
model = pm.modelcontext(model)

x0, laplace_approx = get_conditional_gaussian_approximation(
x, Q, mu, model, method, use_jac, use_hess, optimizer_kwargs
)
log_laplace_approx = pm.logp(laplace_approx, model.rvs_to_values[x])

_, logdetQ = pt.nlinalg.slogdet(Q)
log_x_likelihood = (
-0.5 * (x - mu).T @ Q @ (x - mu) + 0.5 * logdetQ - 0.5 * x.shape[0] * np.log(2 * np.pi)
)

log_likelihood = ( # logp(y | params) =
model.logp() # logp(y | x, params)
+ log_x_likelihood # * logp(x | params)
- log_laplace_approx # / logp(x | y, params)
)

return log_likelihood


def fit_INLA(
x: TensorVariable,
Q: TensorVariable | ArrayLike,
mu: TensorVariable | ArrayLike,
model: pm.Model | None = None,
method: minimize_method = "BFGS",
use_jac: bool = True,
use_hess: bool = False,
optimizer_kwargs: dict | None = None,
) -> az.InferenceData:
model = pm.modelcontext(model)

# logp(y | params)
log_likelihood = get_log_marginal_likelihood(
x, Q, mu, model, method, use_jac, use_hess, optimizer_kwargs
)

# TODO How to obtain prior? It can parametrise Q, mu, y, etc. Not sure if we could extract from model.logp somehow. Otherwise simply specify as a user input
prior = None
params = None
log_prior = pm.logp(prior, model.rvs_to_values[params])

# logp(params | y) = logp(y | params) + logp(params) + const
log_posterior = log_likelihood + log_prior

# TODO log_marginal_x_likelihood is almost the same as log_likelihood, but need to do some sampling?
log_marginal_x_likelihood = None
log_marginal_x_posterior = log_marginal_x_likelihood + log_prior

# TODO can we sample over log likelihoods?
# Marginalize params
idata_params = log_posterior.sample() # TODO something like NUTS, QMC, etc.?
idata_x = log_marginal_x_posterior.sample()

# Bundle up idatas somehow
return idata_params, idata_x
100 changes: 0 additions & 100 deletions pymc_extras/inference/laplace.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import logging

from collections.abc import Callable
from functools import reduce
from importlib.util import find_spec
from itertools import product
Expand All @@ -30,7 +29,6 @@

from arviz import dict_to_dataset
from better_optimize.constants import minimize_method
from numpy.typing import ArrayLike
from pymc import DictToArrayBijection
from pymc.backends.arviz import (
coords_and_dims_for_inferencedata,
Expand All @@ -41,8 +39,6 @@
from pymc.model.transform.conditioning import remove_value_transforms
from pymc.model.transform.optimization import freeze_dims_and_data
from pymc.util import get_default_varnames
from pytensor.tensor import TensorVariable
from pytensor.tensor.optimize import minimize
from scipy import stats

from pymc_extras.inference.find_map import (
Expand All @@ -56,102 +52,6 @@
_log = logging.getLogger(__name__)


def get_conditional_gaussian_approximation(
x: TensorVariable,
Q: TensorVariable | ArrayLike,
mu: TensorVariable | ArrayLike,
args: list[TensorVariable] | None = None,
model: pm.Model | None = None,
method: minimize_method = "BFGS",
use_jac: bool = True,
use_hess: bool = False,
optimizer_kwargs: dict | None = None,
) -> Callable:
"""
Returns a function to estimate the a posteriori log probability of a latent Gaussian field x and its mode x0 using the Laplace approximation.

That is:
y | x, sigma ~ N(Ax, sigma^2 W)
x | params ~ N(mu, Q(params)^-1)

We seek to estimate log(p(x | y, params)):

log(p(x | y, params)) = log(p(y | x, params)) + log(p(x | params)) + const

Let f(x) = log(p(y | x, params)). From the definition of our model above, we have log(p(x | params)) = -0.5*(x - mu).T Q (x - mu) + 0.5*logdet(Q).

This gives log(p(x | y, params)) = f(x) - 0.5*(x - mu).T Q (x - mu) + 0.5*logdet(Q). We will estimate this using the Laplace approximation by Taylor expanding f(x) about the mode.

Thus:

1. Maximize log(p(x | y, params)) = f(x) - 0.5*(x - mu).T Q (x - mu) wrt x (note that logdet(Q) does not depend on x) to find the mode x0.

2. Substitute x0 into the Laplace approximation expanded about the mode: log(p(x | y, params)) ~= -0.5*x.T (-f''(x0) + Q) x + x.T (Q.mu + f'(x0) - f''(x0).x0) + 0.5*logdet(Q).

Parameters
----------
x: TensorVariable
The parameter with which to maximize wrt (that is, find the mode in x). In INLA, this is the latent field x~N(mu,Q^-1).
Q: TensorVariable | ArrayLike
The precision matrix of the latent field x.
mu: TensorVariable | ArrayLike
The mean of the latent field x.
args: list[TensorVariable]
Args to supply to the compiled function. That is, (x0, logp) = f(x, *args). If set to None, assumes the model RVs are args.
model: Model
PyMC model to use.
method: minimize_method
Which minimization algorithm to use.
use_jac: bool
If true, the minimizer will compute the gradient of log(p(x | y, params)).
use_hess: bool
If true, the minimizer will compute the Hessian log(p(x | y, params)).
optimizer_kwargs: dict
Kwargs to pass to scipy.optimize.minimize.

Returns
-------
f: Callable
A function which accepts a value of x and args and returns [x0, log(p(x | y, params))], where x0 is the mode. x is currently both the point at which to evaluate logp and the initial guess for the minimizer.
"""
model = pm.modelcontext(model)

if args is None:
args = model.continuous_value_vars + model.discrete_value_vars

# f = log(p(y | x, params))
f_x = model.logp()
jac = pytensor.gradient.grad(f_x, x)
hess = pytensor.gradient.jacobian(jac.flatten(), x)

# log(p(x | y, params)) only including terms that depend on x for the minimization step (logdet(Q) ignored as it is a constant wrt x)
log_x_posterior = f_x - 0.5 * (x - mu).T @ Q @ (x - mu)

# Maximize log(p(x | y, params)) wrt x to find mode x0
x0, _ = minimize(
objective=-log_x_posterior,
x=x,
method=method,
jac=use_jac,
hess=use_hess,
optimizer_kwargs=optimizer_kwargs,
)

# require f'(x0) and f''(x0) for Laplace approx
jac = pytensor.graph.replace.graph_replace(jac, {x: x0})
hess = pytensor.graph.replace.graph_replace(hess, {x: x0})

# Full log(p(x | y, params)) using the Laplace approximation (up to a constant)
_, logdetQ = pt.nlinalg.slogdet(Q)
conditional_gaussian_approx = (
-0.5 * x.T @ (-hess + Q) @ x + x.T @ (Q @ mu + jac - hess @ x0) + 0.5 * logdetQ
)

# Currently x is passed both as the query point for f(x, args) = logp(x | y, params) AND as an initial guess for x0. This may cause issues if the query point is
# far from the mode x0 or in a neighbourhood which results in poor convergence.
return pytensor.function(args, [x0, conditional_gaussian_approx])


def laplace_draws_to_inferencedata(
posterior_draws: list[np.ndarray[float | int]], model: pm.Model | None = None
) -> az.InferenceData:
Expand Down
Loading