"""Extensions to scipy.stats functions."""
from copy import deepcopy
import numpy as np
import scipy.stats
from numpy.linalg import cholesky, inv
from scipy.special import erf, logsumexp
from lsbi.utils import bisect, logdet
[docs]class multivariate_normal(object):
"""Vectorised multivariate normal distribution.
This extends scipy.stats.multivariate_normal to allow for vectorisation across
the distribution parameters mean and cov.
Implemented with the same style as scipy.stats.multivariate_normal, except that
results are not squeezed.
mean and cov are lazily broadcasted to the same shape to improve performance.
Parameters
----------
mean : array_like, shape `(..., dim)`
Mean of each component.
cov array_like, shape `(..., dim, dim)`if diagonal is False else shape `(..., dim)`
Covariance matrix of each component.
shape: tuple, optional, default=()
Shape of the distribution. Useful for forcing a broadcast beyond that
inferred by mean and cov shapes
dim: int, optional, default=0
Dimension of the distribution. Useful for forcing a broadcast beyond that
inferred by mean and cov dimensions
diagonal: bool, optional, default=False
If True, cov is interpreted as the diagonal of the covariance matrix.
"""
def __init__(self, mean=0, cov=1, shape=(), dim=1, diagonal=False):
self.mean = mean
self.cov = cov
self._shape = shape
self._dim = dim
self.diagonal = diagonal
if len(np.shape(self.cov)) < 2:
self.diagonal = True
@property
def shape(self):
"""Shape of the distribution."""
return np.broadcast_shapes(
np.shape(self.mean)[:-1],
np.shape(self.cov)[: -2 + self.diagonal],
self._shape,
)
@property
def dim(self):
"""Dimension of the distribution."""
return np.max(
[
*np.shape(self.mean)[-1:],
*np.shape(self.cov)[-2 + self.diagonal :],
self._dim,
]
)
[docs] def logpdf(self, x, broadcast=False):
"""Log of the probability density function.
Parameters
----------
x : array_like, shape `(*size, dim)`
Points at which to evaluate the log of the probability density
function.
broadcast : bool, optional, default=False
If True, broadcast x across the shape of the distribution
Returns
-------
logpdf : array_like
Log of the probability density function evaluated at x.
if not broadcast: shape `(*size, *self.shape)`
else: shape broadcast of `(*size,) and `self.shape`
"""
x = np.array(x)
if broadcast:
dx = x - self.mean
else:
size = x.shape[:-1]
mean = np.broadcast_to(self.mean, (*self.shape, self.dim))
dx = x.reshape(*size, *np.ones_like(self.shape), self.dim) - mean
if self.diagonal:
chi2 = (dx**2 / self.cov).sum(axis=-1)
norm = -np.log(2 * np.pi * np.ones(self.dim) * self.cov).sum(axis=-1) / 2
else:
chi2 = np.einsum("...j,...jk,...k->...", dx, inv(self.cov), dx)
norm = -logdet(2 * np.pi * self.cov) / 2
return norm - chi2 / 2
[docs] def pdf(self, x, broadcast=False):
"""Probability density function.
Parameters
----------
x : array_like, shape `(*size, dim)`
Points at which to evaluate the probability density function.
broadcast : bool, optional, default=False
If True, broadcast x across the distribution parameters.
Returns
-------
pdf : array_like
Probability density function evaluated at x.
if not broadcast: shape `(*size, *self.shape)`
else: shape broadcast of `(*size,) and `self.shape`
"""
return np.exp(self.logpdf(x, broadcast=broadcast))
[docs] def rvs(self, size=(), broadcast=False):
"""Draw random samples from the distribution.
Parameters
----------
size : int or tuple of ints, optional, default=()
Number of samples to draw.
broadcast : bool, optional, default=False
If True, broadcast x across the distribution parameters.
Returns
-------
rvs : ndarray, shape `(*size, *shape, dim)`
Random samples from the distribution.
"""
size = np.atleast_1d(size)
if broadcast:
x = np.random.randn(*size, self.dim)
else:
x = np.random.randn(*size, *self.shape, self.dim)
if self.diagonal:
return self.mean + np.sqrt(self.cov) * x
else:
return self.mean + np.einsum("...jk,...k->...j", cholesky(self.cov), x)
[docs] def predict(self, A=1, b=0, diagonal=False):
"""Predict the mean and covariance of a linear transformation.
if: x ~ N(μ, Σ)
then: Ax + b ~ N(A μ + b, A Σ A^T)
Parameters
----------
A : array_like, shape `(..., k, dim)`
Linear transformation matrix.
b : array_like, shape `(..., k)`, optional
Linear transformation vector.
diagonal : bool, optional, default=False
If True, A is interpreted as the diagonal of the transformation matrix.
where self.shape is broadcastable to ...
Returns
-------
transformed distribution shape `(..., k)`
"""
if len(np.shape(A)) < 2:
diagonal = True
dist = deepcopy(self)
if diagonal:
dist.mean = A * self.mean + b
if self.diagonal:
dist.cov = A * self.cov * A
else:
dist.cov = (
self.cov
* np.atleast_1d(A)[..., None]
* np.atleast_1d(A)[..., None, :]
)
else:
dist.mean = (
np.einsum("...qn,...n->...q", A, np.ones(self.dim) * self.mean) + b
)
if self.diagonal:
dist.cov = np.einsum(
"...qn,...pn->...qp", A, A * np.atleast_1d(self.cov)[..., None, :]
)
dist.diagonal = False
else:
dist.cov = np.einsum("...qn,...nm,...pm->...qp", A, self.cov, A)
dist._dim = np.shape(A)[-2]
return dist
[docs] def marginalise(self, indices):
"""Marginalise over indices.
Parameters
----------
indices : array_like
Indices to marginalise.
Returns
-------
marginalised distribution, shape `(*shape, dim - len(indices))`
"""
dist = deepcopy(self)
i = self._bar(indices)
dist.mean = (np.ones(self.dim) * self.mean)[..., i]
if self.diagonal:
dist.cov = (np.ones(self.dim) * self.cov)[..., i]
else:
dist.cov = self.cov[..., i, :][..., i]
dist._dim = sum(i)
return dist
[docs] def condition(self, indices, values):
"""Condition on indices with values.
Parameters
----------
indices : array_like
Indices to condition over.
values : array_like shape `(..., len(indices))`
Values to condition on.
where where self.shape is broadcastable to ...
Returns
-------
conditioned distribution shape `(..., len(indices))`
"""
i = self._bar(indices)
k = indices
dist = deepcopy(self)
dist.mean = (np.ones(self.dim) * self.mean)[..., i]
if self.diagonal:
dist.cov = (np.ones(self.dim) * self.cov)[..., i]
dist._shape = np.broadcast_shapes(self.shape, values.shape[:-1])
else:
dist.mean = dist.mean + np.einsum(
"...ja,...ab,...b->...j",
self.cov[..., i, :][..., :, k],
inv(self.cov[..., k, :][..., :, k]),
values - (np.ones(self.dim) * self.mean)[..., k],
)
dist.cov = self.cov[..., i, :][..., :, i] - np.einsum(
"...ja,...ab,...bk->...jk",
self.cov[..., i, :][..., :, k],
inv(self.cov[..., k, :][..., :, k]),
self.cov[..., k, :][..., :, i],
)
dist._dim = sum(i)
return dist
def _bar(self, indices):
"""Return the indices not in the given indices."""
k = np.ones(self.dim, dtype=bool)
k[indices] = False
return k
[docs] def bijector(self, x, inverse=False):
"""Bijector between U([0, 1])^d and the distribution.
- x in [0, 1]^d is the hypercube space.
- theta in R^d is the physical space.
Computes the transformation from x to theta or theta to x depending on
the value of inverse.
Parameters
----------
x : array_like, shape `(..., dim)`
if inverse: x is theta
else: x is x
inverse : bool, optional, default=False
If True: compute the inverse transformation from physical to
hypercube space.
where self.shape is broadcastable to ...
Returns
-------
transformed x or theta: array_like, shape (..., dim)
"""
x = np.array(x)
mean = np.broadcast_to(self.mean, (*self.shape, self.dim))
if inverse:
if self.diagonal:
y = (x - mean) / np.sqrt(self.cov)
else:
y = np.einsum("...jk,...k->...j", inv(cholesky(self.cov)), x - mean)
return scipy.stats.norm.cdf(y)
else:
y = scipy.stats.norm.ppf(x)
if self.diagonal:
return mean + np.sqrt(self.cov) * y
else:
L = cholesky(self.cov)
return mean + np.einsum("...jk,...k->...j", L, y)
def __getitem__(self, arg):
"""Access a subset of the distributions.
Parameters
----------
arg : int or slice or tuple of ints or tuples
Indices to access.
Returns
-------
dist : distribution
A subset of the distribution
Examples
--------
>>> dist = multivariate_normal(shape=(2, 3), dim=4)
>>> dist.shape
(2, 3)
>>> dist.dim
4
>>> dist[0].shape
(3,)
>>> dist[0, 0].shape
()
>>> dist[:, 0].shape
(2,)
"""
dist = deepcopy(self)
dist.mean = np.broadcast_to(self.mean, (*self.shape, self.dim))[arg]
if self.diagonal:
dist.cov = np.broadcast_to(self.cov, (*self.shape, self.dim))[arg]
else:
dist.cov = np.broadcast_to(self.cov, (*self.shape, self.dim, self.dim))[arg]
dist._shape = dist.mean.shape[:-1]
dist._dim = dist.mean.shape[-1]
return dist
[docs]class mixture_normal(multivariate_normal):
"""Mixture of multivariate normal distributions.
Broadcastable multivariate mixture model.
Parameters
----------
mean : array_like, shape `(..., n, dim)`
Mean of each component.
cov: array_like, shape `(..., n, dim, dim)`
Covariance matrix of each component.
logw: array_like, shape `(..., n)`
Log of the mixing weights.
shape: tuple, optional, default=()
Shape of the distribution. Useful for forcing a broadcast beyond that
inferred by mean and cov shapes
dim: int, optional, default=0
Dimension of the distribution. Useful for forcing a broadcast beyond that
inferred by mean and cov shapes
diagonal: bool, optional, default=False
If True, cov is interpreted as the diagonal of the covariance matrix.
"""
def __init__(self, logw=0, mean=0, cov=1, shape=(), dim=1, diagonal=False):
self.logw = logw
super().__init__(mean, cov, shape, dim, diagonal)
@property
def shape(self):
"""Shape of the distribution."""
return np.broadcast_shapes(np.shape(self.logw), super().shape)
@property
def k(self):
"""Number of components."""
if self.shape == ():
return 1
return self.shape[-1]
[docs] def logpdf(self, x, broadcast=False, joint=False):
"""Log of the probability density function.
Parameters
----------
x : array_like, shape `(*size, dim)`
Points at which to evaluate the log of the probability density
function.
broadcast : bool, optional, default=False
If True, broadcast x across the distribution parameters.
joint : bool, optional, default=False
If True, return the joint logpdf of the mixture P(x, N)
Returns
-------
logpdf : array_like
Log of the probability density function evaluated at x.
if not broadcast and not joint: shape `(*size, *shape[:-1])`
elif not broadcast and joint: shape `(*size, *shape)`
elif not joint: shape the broadcast of `(*size,) and `shape[:-1]`
else: shape the broadcast of `(*size,) and `shape`
"""
if broadcast:
x = np.expand_dims(x, -2)
logpdf = super().logpdf(x, broadcast=broadcast)
if self.shape == ():
return logpdf
logw = np.broadcast_to(self.logw, self.shape).copy()
logw = logw - logsumexp(logw, axis=-1, keepdims=True)
if joint:
return logpdf + logw
return logsumexp(logpdf + logw, axis=-1)
[docs] def pdf(self, x, broadcast=False, joint=False):
"""Probability density function.
Parameters
----------
x : array_like, shape `(*size, dim)`
Points at which to evaluate the probability density function.
broadcast : bool, optional, default=False
If True, broadcast x across the distribution parameters.
joint : bool, optional, default=False
If True, return the joint pdf of the mixture P(x, N)
Returns
-------
pdf :
Probability density function evaluated at x.
if not broadcast and not joint: shape `(*size, *shape[:-1])`
elif not broadcast and joint: shape `(*size, *shape)`
elif not joint: shape the broadcast of `(*size,) and `shape[:-1]`
else: shape the broadcast of `(*size,) and `shape`
"""
return np.exp(self.logpdf(x, broadcast=broadcast, joint=joint))
[docs] def rvs(self, size=(), broadcast=False):
"""Draw random samples from the distribution.
Parameters
----------
size : int or tuple of ints, optional, default=1
Number of samples to draw.
broadcast : bool, optional, default=False
If True, broadcast x across the distribution parameters.
Returns
-------
rvs : array_like, shape `(*size, *shape[:-1], dim)`
"""
if self.shape == ():
return super().rvs(size=size, broadcast=broadcast)
size = np.atleast_1d(size)
logw = np.broadcast_to(self.logw, self.shape).copy()
logw = logw - logsumexp(logw, axis=-1, keepdims=True)
p = np.exp(logw)
cump = np.cumsum(p, axis=-1)
if broadcast:
u = np.random.rand(*size).reshape(-1, *p.shape[:-1])
else:
u = np.random.rand(np.prod(size, dtype=int), *p.shape[:-1])
i = np.argmax(np.array(u)[..., None] < cump, axis=-1)
mean = np.broadcast_to(self.mean, (*self.shape, self.dim))
mean = np.take_along_axis(np.moveaxis(mean, -2, 0), i[..., None], axis=0)
if broadcast:
x = np.random.randn(*size, self.dim)
else:
x = np.random.randn(np.prod(size, dtype=int), *self.shape[:-1], self.dim)
if self.diagonal:
L = np.sqrt(self.cov)
L = np.broadcast_to(L, (*self.shape, self.dim))
L = np.take_along_axis(np.moveaxis(L, -2, 0), i[..., None], axis=0)
rvs = mean + L * x
else:
L = cholesky(self.cov)
L = np.broadcast_to(L, (*self.shape, self.dim, self.dim))
L = np.take_along_axis(np.moveaxis(L, -3, 0), i[..., None, None], axis=0)
rvs = mean + np.einsum("...ij,...j->...i", L, x)
if broadcast:
return rvs.reshape(*size, self.dim)
else:
return rvs.reshape(*size, *self.shape[:-1], self.dim)
[docs] def condition(self, indices, values):
"""Condition on indices with values.
Parameters
----------
indices : array_like
Indices to condition over.
values : array_like shape `(..., len(indices))`
Values to condition on.
where self.shape[:-1] is broadcastable to ...
Returns
-------
conditioned distribution, shape `(*shape, len(indices))`
"""
dist = super().condition(indices, np.expand_dims(values, -2))
dist.__class__ = mixture_normal
marg = self.marginalise(self._bar(indices))
dist.logw = marg.logpdf(values, broadcast=True, joint=True)
dist.logw = dist.logw - logsumexp(dist.logw, axis=-1, keepdims=True)
return dist
[docs] def bijector(self, x, inverse=False):
"""Bijector between U([0, 1])^d and the distribution.
- x in [0, 1]^d is the hypercube space.
- theta in R^d is the physical space.
Computes the transformation from x to theta or theta to x depending on
the value of inverse.
Parameters
----------
x : array_like, shape `(..., d)`
if inverse: x is theta
else: x is x
inverse : bool, optional, default=False
If True: compute the inverse transformation from physical to
hypercube space.
where self.shape[:-1] is broadcastable to ...
Returns
-------
transformed x or theta: array_like, shape `(..., d)`
"""
x = np.array(x)
theta = np.empty(np.broadcast_shapes(x.shape, self.shape[:-1] + (self.dim,)))
if inverse:
theta[:] = x
x = np.empty(np.broadcast_shapes(x.shape, self.shape[:-1] + (self.dim,)))
for i in range(self.dim):
dist = self.marginalise(np.s_[i + 1 :]).condition(
np.s_[:-1], theta[..., :i]
)
m = np.atleast_1d(dist.mean)[..., 0]
if dist.diagonal:
c = np.atleast_1d(dist.cov)[..., 0]
else:
c = np.atleast_2d(dist.cov)[..., 0, 0]
A = np.exp(dist.logw - logsumexp(dist.logw, axis=-1)[..., None])
m = np.broadcast_to(m, dist.shape)
def f(t):
return (A * 0.5 * (1 + erf((t[..., None] - m) / np.sqrt(2 * c)))).sum(
axis=-1
) - y
if inverse:
y = 0
x[..., i] = f(theta[..., i])
else:
y = x[..., i]
a = (m - 10 * np.sqrt(c)).min(axis=-1)
b = (m + 10 * np.sqrt(c)).max(axis=-1)
theta[..., i] = bisect(f, a, b)
if inverse:
return x
else:
return theta
def __getitem__(self, arg): # noqa: D105
dist = super().__getitem__(arg)
dist.__class__ = mixture_normal
dist.logw = np.broadcast_to(self.logw, self.shape)[arg]
return dist
[docs]def dkl(p, q, n=0):
"""Kullback-Leibler divergence between two distributions.
Parameters
----------
p : lsbi.stats.multivariate_normal
q : lsbi.stats.multivariate_normal
n : int, optional, default=0
Number of samples to mcmc estimate the divergence.
Returns
-------
dkl : array_like
Kullback-Leibler divergence between p and q.
"""
shape = np.broadcast_shapes(p.shape, q.shape)
if n:
x = p.rvs(size=(n, *shape), broadcast=True)
return (p.logpdf(x, broadcast=True) - q.logpdf(x, broadcast=True)).mean(axis=0)
dkl = -p.dim * np.ones(shape)
dkl = dkl + logdet(q.cov * np.ones(q.dim), q.diagonal)
dkl = dkl - logdet(p.cov * np.ones(p.dim), p.diagonal)
pq = (p.mean - q.mean) * np.ones(p.dim)
if q.diagonal:
dkl = dkl + (pq**2 / q.cov).sum(axis=-1)
if p.diagonal:
dkl = dkl + (p.cov / q.cov * np.ones(q.dim)).sum(axis=-1)
else:
dkl = dkl + (np.diagonal(p.cov, 0, -2, -1) / q.cov).sum(axis=-1)
else:
invq = inv(q.cov)
dkl = dkl + np.einsum("...i,...ij,...j->...", pq, invq, pq)
if p.diagonal:
dkl = dkl + (p.cov * np.diagonal(invq, 0, -2, -1)).sum(axis=-1)
else:
dkl = dkl + np.einsum("...ij,...ji->...", invq, p.cov)
return dkl / 2