Source code for Tars.distributions.distribution_samples

import math
from abc import ABCMeta, abstractmethod

import numpy as np
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams

from ..utils import epsilon

__all__ = [
    'DeterministicSample',
    'BernoulliSample',
    'CategoricalSample',
    'GaussianSample',
    'GaussianConstantVarSample',
    'LaplaceSample',
    'GumbelSample',
    'ConcreteSample',
    'BetaSample',
    'DirichletSample',
    'KumaraswamySample',
    'UnitGaussianSample',
    'UnitBernoulliSample',
    'UnitCategoricalSample',
    'UnitBetaSample',
    'UnitDirichletSample',
]


class DistributionSample(object):
    __metaclass__ = ABCMeta

    def __init__(self, seed=1, **kwargs):
        self.srng = RandomStreams(seed)

    def set_seed(self, seed=1):
        self.srng = RandomStreams(seed)

    @abstractmethod
    def sample(self):
        pass

    @abstractmethod
    def log_likelihood(self):
        pass


class DeterministicSample(DistributionSample):
    """
    Deterministic function
    p(x) = f(x)
    """

    def __init__(self, **kwargs):
        pass

    def sample(self, mean, *args):
        """
        Parameters
        ----------
        mean : Theano variable, the output of a fully connected layer
           (any activation function)
        """
        return mean

    def log_likelihood(self, sample, mean):
        raise NotImplementedError


class GumbelSample(DistributionSample):
    """
    Gumbel distribution
    p(x) = \frac{1}{beta} * exp{-(z + exp(-z))}, where z = \frac{x - mu}{beta}
    """

    def sample(self, mu, beta):
        U = self.srng.uniform(mu.shape, low=0, high=1, dtype=mu.dtype)
        return mu - beta * T.log(-T.log(U + epsilon()) + epsilon())

    def log_likelihood(self, samples, mu, beta):
        """
        Parameters
        ----------
        samples : Theano variable
        mu : Theano variable, the output of a fully connected layer (Linear)
        beta : Theano variable, the output of a fully connected layer (Softplus)
        """
        # for numerical stability
        beta += epsilon()
        z = (samples - mu) / beta
        loglike = -T.log(beta) - (z + T.exp(-z))
        return mean_sum_samples(loglike)
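

# Illustrative sketch (not part of the original module): the inverse-CDF trick
# used by GumbelSample.sample above, written with plain NumPy so it can be
# checked without building a Theano graph.  The values of mu, beta, n and seed
# are arbitrary example choices.
def _gumbel_numpy_sketch(mu=0.5, beta=2.0, n=100000, seed=0):
    rng = np.random.RandomState(seed)
    u = rng.uniform(size=n)
    x = mu - beta * np.log(-np.log(u))     # x ~ Gumbel(mu, beta)
    # E[x] = mu + beta * (Euler-Mascheroni constant ~ 0.5772)
    return x.mean(), mu + beta * 0.5772156649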


class ConcreteSample(GumbelSample):
    """
    Concrete distribution (Gumbel-softmax)
    https://arxiv.org/abs/1611.01144
    https://arxiv.org/abs/1611.00712
    """

    def __init__(self, temp=0.1, seed=1):
        super(ConcreteSample, self).__init__(seed=seed)
        self.temp = np.float32(temp)

    def sample(self, mean):
        """
        Parameters
        ----------
        mean : Theano variable, the output of a fully connected layer
           (Sigmoid or Softmax)
        """
        if self.temp != 0:
            output = super(ConcreteSample,
                           self).sample(T.zeros_like(mean),
                                        T.ones_like(mean))
            output += T.log(mean + epsilon())
            if output.ndim == 1 or output.ndim == 2:
                return T.nnet.softmax(output / self.temp)
            elif output.ndim == 3:
                _shape = output.shape
                output = output.reshape((_shape[0] * _shape[1], _shape[2]))
                return T.nnet.softmax(output / self.temp).reshape(_shape)
            raise ValueError('Input must be a 1-d, 2-d or 3-d tensor. '
                             'Got %s' % output.ndim)
        raise NotImplementedError

    def log_likelihood(self):
        raise NotImplementedError
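

# Illustrative sketch (not part of the original module): the Gumbel-softmax
# relaxation that ConcreteSample.sample expresses as a Theano graph, written
# in NumPy.  The probabilities, temperature and seed are example values.
def _concrete_numpy_sketch(probs=(0.2, 0.3, 0.5), temp=0.1, seed=0):
    rng = np.random.RandomState(seed)
    probs = np.asarray(probs, dtype=np.float64)
    g = -np.log(-np.log(rng.uniform(size=probs.shape)))   # Gumbel(0, 1) noise
    logits = (np.log(probs) + g) / temp
    e = np.exp(logits - logits.max())                      # numerically stable softmax
    return e / e.sum()                                      # relaxed one-hot sample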


class BernoulliSample(GumbelSample):
    """
    Bernoulli distribution
    p(x) = mean^x * (1-mean)^(1-x)
    """

    def __init__(self, temp=0.1, seed=1):
        super(BernoulliSample, self).__init__(seed=seed)
        self.temp = np.float32(temp)

    def sample(self, mean):
        """
        Parameters
        ----------
        mean : Theano variable, the output of a fully connected layer (Sigmoid)
           The parameter (mean value) of this distribution.

        Returns
        -------
        Theano variable, shape (mean.shape)
           A sample drawn from this distribution, i.e. sample ~ p(x|mean).
        """
        if self.temp != 0:
            z1 = super(BernoulliSample, self).sample(T.zeros_like(mean),
                                                     T.ones_like(mean))
            z0 = super(BernoulliSample, self).sample(T.zeros_like(mean),
                                                     T.ones_like(mean))
            z1 += T.log(mean + epsilon())
            z0 += T.log(1 - mean + epsilon())
            return T.nnet.sigmoid((z1 - z0) / self.temp)
        raise NotImplementedError

    def log_likelihood(self, sample, mean):
        """
        Parameters
        ----------
        sample : Theano variable
           Test samples used to estimate a test log-likelihood.
        mean : Theano variable, the output of a fully connected layer (Sigmoid)
           A reconstruction of the test samples. This must have the same
           shape as 'sample'.

        Returns
        -------
        Theano variable, shape (n_samples,)
           A log-likelihood, which is equivalent to a negative binary
           cross-entropy error.
        """
        # For numerical stability
        # (T.clip would also work, but it makes the computation very slow.)
        loglike = sample * T.log(mean + epsilon()) +\
            (1 - sample) * T.log(1 - mean + epsilon())
        return mean_sum_samples(loglike)
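

# Illustrative sketch (not part of the original module): the binary Concrete
# (Gumbel-sigmoid) relaxation used by BernoulliSample.sample, in NumPy.  As
# temp -> 0 the output approaches a hard {0, 1} sample with P(x=1) = mean.
def _bernoulli_numpy_sketch(mean=0.7, temp=0.1, seed=0):
    rng = np.random.RandomState(seed)
    g1, g0 = -np.log(-np.log(rng.uniform(size=2)))   # two Gumbel(0, 1) draws
    z1 = np.log(mean) + g1
    z0 = np.log(1 - mean) + g0
    return 1.0 / (1.0 + np.exp(-(z1 - z0) / temp))   # sigmoid((z1 - z0) / temp)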


class CategoricalSample(ConcreteSample):
    """
    Categorical distribution
    p(x) = \prod mean^x
    """

    def __init__(self, temp=0.1, seed=1):
        super(CategoricalSample, self).__init__(temp=temp, seed=seed)

    def sample(self, mean, onehot=True, flatten=True):
        """
        Parameters
        ----------
        mean : Theano variable, the output of a fully connected layer (Softmax)
           The parameter (mean value) of this distribution.
        onehot : bool
           If False, return class indices (argmax) instead of one-hot samples.
        flatten : bool
           If True, flatten a 3-d output to 2-d.

        Returns
        -------
        Theano variable, shape (mean.shape)
           A sample drawn from this distribution, i.e. sample ~ p(x|mean).
        """
        if mean.ndim == 1 or mean.ndim == 2:
            output = super(CategoricalSample, self).sample(mean)
            if not onehot:
                output = T.argmax(output, axis=-1)
            return output

        elif mean.ndim == 3:
            _shape = mean.shape
            mean = mean.reshape((_shape[0] * _shape[1], _shape[2]))
            output = super(CategoricalSample, self).sample(
                mean).reshape(_shape)
            if not onehot:
                output = T.argmax(output, axis=-1)
            if flatten:
                output = T.flatten(output, outdim=2)
            return output

        raise ValueError('Input must be a 1-d, 2-d or 3-d tensor.')

    def log_likelihood(self, samples, mean):
        """
        Parameters
        ----------
        samples : Theano variable
           Test samples used to estimate a test log-likelihood.
        mean : Theano variable, the output of a fully connected layer (Softmax)
           A reconstruction of the test samples. This must have the same
           shape as 'samples'.

        Returns
        -------
        Theano variable, shape (n_samples,)
           A log-likelihood, which is equivalent to a negative categorical
           cross-entropy error.
        """
        # For numerical stability
        loglike = samples * T.log(mean + epsilon())
        return mean_sum_samples(loglike)


class GaussianSample(DistributionSample):
    """
    Gaussian distribution
    p(x) = \frac{1}{\sqrt{2*\pi*var}} * exp{-\frac{(x-mean)^2}{2*var}}
    """

    def __init__(self, seed=1):
        super(GaussianSample, self).__init__(seed=seed)

    def sample(self, mean, var):
        """
        Parameters
        ----------
        mean : Theano variable, the output of a fully connected layer (Linear)
        var : Theano variable, the output of a fully connected layer (Softplus)
        """
        eps = self.srng.normal(mean.shape, dtype=mean.dtype)
        return mean + T.sqrt(var) * eps

    def log_likelihood(self, samples, mean, var):
        """
        Parameters
        ----------
        samples : Theano variable
        mean : Theano variable, the output of a fully connected layer (Linear)
        var : Theano variable, the output of a fully connected layer (Softplus)
        """
        loglike = self._gaussian_like(samples, mean, var)
        return mean_sum_samples(loglike)

    def _gaussian_like(self, x, mean, var):
        c = - 0.5 * math.log(2 * math.pi)
        _var = var + epsilon()  # avoid NaN
        return c - T.log(_var) / 2 - (x - mean)**2 / (2 * _var)
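

# Illustrative usage sketch (not part of the original module): how the
# reparameterisation trick x = mean + sqrt(var) * eps in GaussianSample is
# typically wired into a Theano graph.  The variable names are example choices.
def _gaussian_usage_sketch():
    gaussian = GaussianSample(seed=42)
    mean = T.matrix('mean')    # e.g. the output of a Linear layer
    var = T.matrix('var')      # e.g. the output of a Softplus layer
    x = gaussian.sample(mean, var)                     # x = mean + sqrt(var) * eps
    loglike = gaussian.log_likelihood(x, mean, var)    # shape: (n_samples,)
    return x, loglike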


class GaussianConstantVarSample(GaussianSample):
    """
    Gaussian distribution with a fixed variance
    """

    def __init__(self, constant_var=1, seed=1):
        self.constant_var = constant_var
        super(GaussianConstantVarSample, self).__init__(seed=seed)

    def sample(self, mean):
        return super(GaussianConstantVarSample,
                     self).sample(mean, T.ones_like(mean) * self.constant_var)

    def log_likelihood(self, samples, mean):
        return super(GaussianConstantVarSample,
                     self).log_likelihood(samples, mean,
                                          T.ones_like(samples) *
                                          self.constant_var)


class LaplaceSample(DistributionSample):
    """
    Laplace distribution
    p(x) = \frac{1}{2*b} * exp{-\frac{|x-mean|}{b}}
    """

    def sample(self, mean, b):
        """
        Parameters
        ----------
        mean : Theano variable, the output of a fully connected layer (Linear)
        b : Theano variable, the output of a fully connected layer (Softplus)
        """
        U = self.srng.uniform(mean.shape, low=-0.5, high=0.5,
                              dtype=mean.dtype)
        return mean - b * T.sgn(U) * T.log(1 - 2 * abs(U))

    def log_likelihood(self, samples, mean, b):
        """
        Parameters
        ----------
        samples : Theano variable
        mean : Theano variable, the output of a fully connected layer (Linear)
        b : Theano variable, the output of a fully connected layer (Softplus)
        """
        # for numerical stability
        b += epsilon()
        loglike = -abs(samples - mean) / b - T.log(b) - T.log(2)
        return mean_sum_samples(loglike)
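

# Illustrative sketch (not part of the original module): the inverse-CDF
# sampler used by LaplaceSample.sample, in NumPy.  With U ~ Uniform(-0.5, 0.5),
# x = mean - b * sign(U) * log(1 - 2|U|) follows Laplace(mean, b).
def _laplace_numpy_sketch(mean=0.0, b=1.5, n=100000, seed=0):
    rng = np.random.RandomState(seed)
    u = rng.uniform(-0.5, 0.5, size=n)
    x = mean - b * np.sign(u) * np.log(1 - 2 * np.abs(u))
    # Var[x] = 2 * b**2 for Laplace(mean, b)
    return x.var(), 2 * b ** 2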


class KumaraswamySample(DistributionSample):
    """
    [Nalisnick+ 2016] Deep Generative Models with Stick-Breaking Priors
    Kumaraswamy distribution
    p(x) = a*b*x^(a-1)*(1-x^a)^(b-1)
    """

    def sample(self, a, b):
        """
        Parameters
        ----------
        a : Theano variable, the output of a fully connected layer (Softplus)
        b : Theano variable, the output of a fully connected layer (Softplus)
        """
        eps = self.srng.uniform(a.shape, low=epsilon(), high=1 - epsilon(),
                                dtype=a.dtype)
        return (1 - eps**(1. / b))**(1. / a)

    def log_likelihood(self, samples, a, b):
        """
        Parameters
        ----------
        samples : Theano variable
        a : Theano variable, the output of a fully connected layer (Softplus)
        b : Theano variable, the output of a fully connected layer (Softplus)
        """
        loglike = T.log(a * b + epsilon())\
            + (a - 1) * T.log(samples + epsilon())\
            + (b - 1) * T.log(1 - samples**a + epsilon())
        return mean_sum_samples(loglike)
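

# Illustrative sketch (not part of the original module): the inverse-CDF
# sampler used by KumaraswamySample.sample, in NumPy.  The CDF is
# F(x) = 1 - (1 - x^a)^b, so with u ~ Uniform(0, 1) (u and 1-u are equal in
# distribution) x = (1 - u**(1/b))**(1/a) follows Kumaraswamy(a, b).
def _kumaraswamy_numpy_sketch(a=2.0, b=3.0, n=5, seed=0):
    rng = np.random.RandomState(seed)
    u = rng.uniform(size=n)
    return (1 - u ** (1.0 / b)) ** (1.0 / a)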


class GammaSample(DistributionSample):
    """
    Gamma distribution
    p(x) = (beta^alpha / Gamma(alpha)) * x^(alpha-1) * exp(-beta*x)

    [Naesseth+ 2016] Rejection Sampling Variational Inference
    http://www.hongliangjie.com/2012/12/19/how-to-generate-gamma-random-variables/
    """

    def __init__(self, iter_sampling=6, rejection_sampling=True, seed=1):
        super(GammaSample, self).__init__(seed=seed)
        self.iter_sampling = iter_sampling
        self.rejection_sampling = rejection_sampling

    def sample(self, alpha, beta):
        _shape = alpha.shape
        alpha = alpha.flatten()
        output_sample = -T.ones_like(alpha, dtype=alpha.dtype)
        index = T.arange(output_sample.shape[0])

        # alpha < 1 is handled by sampling from Gamma(alpha + 1, 1) and
        # multiplying the result by U^(1/alpha) afterwards.
        under_one_idx = T.gt(1, alpha[index]).nonzero()
        added_alpha = T.inc_subtensor(alpha[under_one_idx], 1)
        U = self.srng.uniform(under_one_idx[0].shape, low=epsilon(),
                              high=1 - epsilon(), dtype=alpha.dtype)

        if self.rejection_sampling:
            # We don't use theano.scan here in order to avoid updates.
            for _ in range(self.iter_sampling):
                output_sample, index = self._rejection_sampling(output_sample,
                                                                added_alpha,
                                                                index)
        else:
            output_sample = self._not_rejection_sampling(alpha)

        output_sample = T.clip(output_sample, 0, output_sample)
        output_sample = T.set_subtensor(
            output_sample[under_one_idx],
            (U ** (1 / alpha[under_one_idx])) * output_sample[under_one_idx])

        return output_sample.reshape(_shape) / beta

    def log_likelihood(self, samples, alpha, beta):
        output = alpha * T.log(beta + epsilon()) - T.gammaln(alpha)
        output += (alpha - 1) * T.log(samples + epsilon())
        output += -beta * samples
        return mean_sum_samples(output)

    def _h(self, alpha, eps):
        d = alpha - 1 / 3.
        c = 1 / T.sqrt(9 * d)
        v = (1 + c * eps)**3
        judge_1 = T.exp(0.5 * eps**2 + d - d * v + d * T.log(v))
        judge_2 = -1 / c
        output = d * v
        return output, judge_1, judge_2

    def _rejection_sampling(self, output_z, alpha, idx):
        eps = self.srng.normal(idx.shape, dtype=alpha.dtype)
        U = self.srng.uniform(idx.shape, low=epsilon(),
                              high=1 - epsilon(), dtype=alpha.dtype)
        z, judge1, judge2 = self._h(alpha[idx], eps)
        _idx_binary = T.and_(T.lt(U, judge1), T.gt(eps, judge2))
        output_z = T.set_subtensor(output_z[idx[_idx_binary.nonzero()]],
                                   z[_idx_binary.nonzero()])
        # update idx (keep only the entries that have not been accepted yet)
        idx = idx[T.eq(0, _idx_binary).nonzero()]
        return output_z, idx

    def _not_rejection_sampling(self, alpha):
        eps = self.srng.normal(alpha.shape, dtype=alpha.dtype)
        z, _, _ = self._h(alpha, eps)
        return z
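

# Illustrative sketch (not part of the original module): the Marsaglia-Tsang
# style transform behind GammaSample._h and its rejection step, in NumPy, for
# a single shape value alpha >= 1 (alpha < 1 is handled in sample() by the
# boosting trick).  The parameter values are example choices.
def _gamma_numpy_sketch(alpha=3.0, seed=0):
    rng = np.random.RandomState(seed)
    d = alpha - 1.0 / 3.0
    c = 1.0 / np.sqrt(9.0 * d)
    while True:
        eps = rng.normal()
        v = (1.0 + c * eps) ** 3
        if v <= 0:                     # reject: corresponds to eps <= -1/c
            continue
        u = rng.uniform()
        if u < np.exp(0.5 * eps ** 2 + d - d * v + d * np.log(v)):
            return d * v               # a Gamma(alpha, 1) sample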


class BetaSample(GammaSample):
    """
    Beta distribution
    p(x) = x^(alpha-1) * (1-x)^(beta-1) / B(alpha, beta)
    """

    def __init__(self, iter_sampling=6, rejection_sampling=True, seed=1):
        super(BetaSample,
              self).__init__(iter_sampling=iter_sampling,
                             rejection_sampling=rejection_sampling,
                             seed=seed)

    def sample(self, alpha, beta):
        z_1 = super(BetaSample, self).sample(alpha, T.ones_like(alpha))
        z_2 = super(BetaSample, self).sample(beta, T.ones_like(beta))
        return z_1 / (z_1 + z_2)

    def log_likelihood(self, samples, alpha, beta):
        output = (alpha - 1) * T.log(samples + epsilon())
        output += (beta - 1) * T.log(1 - samples + epsilon())
        output -= self._log_beta_func(alpha, beta)
        return mean_sum_samples(output)

    def _log_beta_func(self, alpha, beta):
        return T.gammaln(alpha) + T.gammaln(beta) - T.gammaln(alpha + beta)
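

# Illustrative sketch (not part of the original module): BetaSample builds
# x ~ Beta(alpha, beta) as z1 / (z1 + z2) with z1 ~ Gamma(alpha, 1) and
# z2 ~ Gamma(beta, 1).  NumPy's gamma sampler stands in for GammaSample here.
def _beta_numpy_sketch(alpha=2.0, beta=5.0, n=100000, seed=0):
    rng = np.random.RandomState(seed)
    z1 = rng.gamma(alpha, size=n)
    z2 = rng.gamma(beta, size=n)
    x = z1 / (z1 + z2)
    # E[x] = alpha / (alpha + beta) for Beta(alpha, beta)
    return x.mean(), alpha / (alpha + beta)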


class DirichletSample(GammaSample):
    """
    Dirichlet distribution
    p(x) = \prod_k x_k^(alpha_k-1) / B(alpha)
    """

    def __init__(self, k, iter_sampling=6, rejection_sampling=True, seed=1):
        super(DirichletSample,
              self).__init__(iter_sampling=iter_sampling,
                             rejection_sampling=rejection_sampling,
                             seed=seed)
        self.k = k

    def sample(self, alpha, flatten=True):
        z = T.zeros_like(alpha)
        for _k in range(self.k):
            _alpha = self._slice_last(alpha, _k)
            _z = super(DirichletSample, self).sample(_alpha,
                                                     T.ones_like(_alpha))
            z = T.set_subtensor(self._slice_last(z, _k), _z)
        z = z / T.sum(z, axis=-1, keepdims=True)

        if flatten and alpha.ndim == 3:
            z = T.flatten(z, outdim=2)

        return z

    def log_likelihood(self, samples, alpha):
        samples = samples.reshape((samples.shape[0],
                                   samples.shape[1] // self.k, self.k))
        alpha = alpha.reshape((alpha.shape[0],
                               alpha.shape[1] // self.k, self.k))
        output = 0
        for _k in range(self.k):
            _alpha = self._slice_last(alpha, _k)
            _samples = self._slice_last(samples, _k)
            output += (_alpha - 1) * T.log(_samples + epsilon())
        output -= self._log_beta_vec_func(alpha)
        return mean_sum_samples(output)

    def _log_beta_vec_func(self, alpha):
        output = 0
        for _k in range(self.k):
            output += T.gammaln(self._slice_last(alpha, _k))
        output -= T.gammaln(T.sum(alpha, axis=-1))
        return output

    def _slice_last(self, a, k):
        if a.ndim == 1:
            return a[k]
        elif a.ndim == 2:
            return a[:, k]
        elif a.ndim == 3:
            return a[:, :, k]
        raise ValueError('Input must be a 1-d, 2-d or 3-d tensor.')
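

# Illustrative sketch (not part of the original module): DirichletSample
# normalises k independent Gamma(alpha_k, 1) draws, which is the standard
# construction of a Dirichlet sample.  NumPy's gamma sampler stands in for
# GammaSample here.
def _dirichlet_numpy_sketch(alpha=(1.0, 2.0, 3.0), seed=0):
    rng = np.random.RandomState(seed)
    z = np.array([rng.gamma(a) for a in alpha])
    return z / z.sum()    # non-negative, sums to 1, distributed Dirichlet(alpha)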


class UnitGaussianSample(GaussianSample):
    """
    Standard (unit) Gaussian distribution
    p(x) = \frac{1}{\sqrt{2*\pi}} * exp{-\frac{x^2}{2}}
    """

    def sample(self, shape):
        """
        Parameters
        ----------
        shape : tuple
           Sets the shape of the output sample.
        """
        return self.srng.normal(shape)

    def log_likelihood(self, samples):
        return super(UnitGaussianSample,
                     self).log_likelihood(samples, T.zeros_like(samples),
                                          T.ones_like(samples))


class UnitBernoulliSample(BernoulliSample):
    """
    Unit Bernoulli distribution
    """

    def sample(self, shape):
        return super(UnitBernoulliSample, self).sample(T.ones(shape) * 0.5)

    def log_likelihood(self, samples):
        return super(UnitBernoulliSample,
                     self).log_likelihood(samples,
                                          T.ones_like(samples) * 0.5)


class UnitCategoricalSample(CategoricalSample):
    """
    Unit Categorical distribution
    """

    def __init__(self, k=1, seed=1):
        super(UnitCategoricalSample, self).__init__(seed=seed)
        self.k = k

    def sample(self, shape):
        if self.k == shape[-1]:
            return super(UnitCategoricalSample,
                         self).sample(T.ones(shape) / self.k)
        raise ValueError("self.k and shape don't match.")

    def log_likelihood(self, samples):
        return super(UnitCategoricalSample,
                     self).log_likelihood(samples,
                                          T.ones_like(samples) / self.k)


class UnitGammaSample(GammaSample):

    def sample(self, shape):
        """
        Parameters
        ----------
        shape : tuple
           Sets the shape of the output sample.
        """
        return super(UnitGammaSample, self).sample(T.ones(shape),
                                                   T.ones(shape))

    def log_likelihood(self, samples):
        return super(UnitGammaSample,
                     self).log_likelihood(samples, T.ones_like(samples),
                                          T.ones_like(samples))


class UnitBetaSample(BetaSample):
    """
    Unit Beta distribution
    """

    def __init__(self, alpha=1., beta=1., iter_sampling=6,
                 rejection_sampling=True, seed=1):
        super(UnitBetaSample,
              self).__init__(iter_sampling=iter_sampling,
                             rejection_sampling=rejection_sampling,
                             seed=seed)
        self.alpha = alpha
        self.beta = beta

    def sample(self, shape):
        return super(UnitBetaSample, self).sample(T.ones(shape) * self.alpha,
                                                  T.ones(shape) * self.beta)

    def log_likelihood(self, samples):
        alpha = T.ones_like(samples) * self.alpha
        beta = T.ones_like(samples) * self.beta
        return super(UnitBetaSample, self).log_likelihood(samples, alpha,
                                                          beta)


class UnitDirichletSample(DirichletSample):
    """
    Unit Dirichlet distribution
    """

    def __init__(self, k, alpha=1., iter_sampling=6,
                 rejection_sampling=True, seed=1):
        super(UnitDirichletSample,
              self).__init__(k, iter_sampling=iter_sampling,
                             rejection_sampling=rejection_sampling,
                             seed=seed)
        self.alpha = alpha

    def sample(self, shape):
        return super(UnitDirichletSample,
                     self).sample(T.ones(shape) * self.alpha)

    def log_likelihood(self, samples):
        alpha = T.ones_like(samples) * self.alpha
        return super(UnitDirichletSample, self).log_likelihood(samples, alpha)


def mean_sum_samples(samples):
    n_dim = samples.ndim
    if n_dim == 4:
        return T.mean(T.sum(T.sum(samples, axis=2), axis=2), axis=1)
    elif n_dim == 3:
        return T.sum(T.sum(samples, axis=-1), axis=-1)
    elif n_dim == 2:
        return T.sum(samples, axis=-1)
    raise ValueError("The dim of samples must be any of 2, 3, or 4, "
                     "got dim %s." % n_dim)