Deterministic example, much more generic importance sampler
parent
bd8d2ea2c1
commit
021ee1a5d9
|
|
@ -82,10 +82,12 @@ class TestHybridGaussianFactorGraph(GtsamTestCase):
|
||||||
self.assertEqual(hv.atDiscrete(C(0)), 1)
|
self.assertEqual(hv.atDiscrete(C(0)), 1)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def tiny(num_measurements: int = 1) -> HybridBayesNet:
|
def tiny(num_measurements: int = 1, prior_mean: float = 5.0,
|
||||||
|
prior_sigma: float = 0.5) -> HybridBayesNet:
|
||||||
"""
|
"""
|
||||||
Create a tiny two variable hybrid model which represents
|
Create a tiny two variable hybrid model which represents
|
||||||
the generative probability P(z, x, n) = P(z | x, n)P(x)P(n).
|
the generative probability P(Z, x0, mode) = P(Z|x0, mode)P(x0)P(mode).
|
||||||
|
num_measurements: number of measurements in Z = {z0, z1...}
|
||||||
"""
|
"""
|
||||||
# Create hybrid Bayes net.
|
# Create hybrid Bayes net.
|
||||||
bayesNet = HybridBayesNet()
|
bayesNet = HybridBayesNet()
|
||||||
|
|
@ -110,7 +112,8 @@ class TestHybridGaussianFactorGraph(GtsamTestCase):
|
||||||
[conditional0, conditional1])
|
[conditional0, conditional1])
|
||||||
|
|
||||||
# Create prior on X(0).
|
# Create prior on X(0).
|
||||||
prior_on_x0 = GaussianConditional.FromMeanAndStddev(X(0), [5.0], 0.5)
|
prior_on_x0 = GaussianConditional.FromMeanAndStddev(
|
||||||
|
X(0), [prior_mean], prior_sigma)
|
||||||
bayesNet.addGaussian(prior_on_x0)
|
bayesNet.addGaussian(prior_on_x0)
|
||||||
|
|
||||||
# Add prior on mode.
|
# Add prior on mode.
|
||||||
|
|
@ -118,6 +121,28 @@ class TestHybridGaussianFactorGraph(GtsamTestCase):
|
||||||
|
|
||||||
return bayesNet
|
return bayesNet
|
||||||
|
|
||||||
|
def test_evaluate(self):
|
||||||
|
"""Test evaluate with two different prior noise models."""
|
||||||
|
# TODO(dellaert): really a HBN test
|
||||||
|
# Create a tiny Bayes net P(x0) P(m0) P(z0|x0)
|
||||||
|
bayesNet1 = self.tiny(prior_sigma=0.5, num_measurements=1)
|
||||||
|
bayesNet2 = self.tiny(prior_sigma=5.0, num_measurements=1)
|
||||||
|
# bn1: # 1/sqrt(2*pi*0.5^2)
|
||||||
|
# bn2: # 1/sqrt(2*pi*5.0^2)
|
||||||
|
expected_ratio = np.sqrt(2*np.pi*5.0**2)/np.sqrt(2*np.pi*0.5**2)
|
||||||
|
mean0 = HybridValues()
|
||||||
|
mean0.insert(X(0), [5.0])
|
||||||
|
mean0.insert(Z(0), [5.0])
|
||||||
|
mean0.insert(M(0), 0)
|
||||||
|
self.assertAlmostEqual(bayesNet1.evaluate(mean0) /
|
||||||
|
bayesNet2.evaluate(mean0), expected_ratio, delta=1e-9)
|
||||||
|
mean1 = HybridValues()
|
||||||
|
mean1.insert(X(0), [5.0])
|
||||||
|
mean1.insert(Z(0), [5.0])
|
||||||
|
mean1.insert(M(0), 1)
|
||||||
|
self.assertAlmostEqual(bayesNet1.evaluate(mean1) /
|
||||||
|
bayesNet2.evaluate(mean1), expected_ratio, delta=1e-9)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def measurements(sample: HybridValues, indices) -> gtsam.VectorValues:
|
def measurements(sample: HybridValues, indices) -> gtsam.VectorValues:
|
||||||
"""Create measurements from a sample, grabbing Z(i) where i in indices."""
|
"""Create measurements from a sample, grabbing Z(i) where i in indices."""
|
||||||
|
|
@ -143,21 +168,20 @@ class TestHybridGaussianFactorGraph(GtsamTestCase):
|
||||||
return fg
|
return fg
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def estimate_marginals(cls, bayesNet: HybridBayesNet, sample: HybridValues, N=10000):
|
def estimate_marginals(cls, target, proposal_density: HybridBayesNet,
|
||||||
|
N=10000):
|
||||||
"""Do importance sampling to get an estimate of the discrete marginal P(mode)."""
|
"""Do importance sampling to get an estimate of the discrete marginal P(mode)."""
|
||||||
# Use prior on x0, mode as proposal density.
|
# Allocate space for marginals on mode.
|
||||||
prior = cls.tiny(num_measurements=0) # just P(x0)P(mode)
|
|
||||||
|
|
||||||
# Allocate space for marginals.
|
|
||||||
marginals = np.zeros((2,))
|
marginals = np.zeros((2,))
|
||||||
|
|
||||||
# Do importance sampling.
|
# Do importance sampling.
|
||||||
num_measurements = bayesNet.size() - 2
|
|
||||||
measurements = cls.measurements(sample, range(num_measurements))
|
|
||||||
for s in range(N):
|
for s in range(N):
|
||||||
proposed = prior.sample()
|
proposed = proposal_density.sample() # sample from proposal
|
||||||
proposed.insert(measurements)
|
target_proposed = target(proposed) # evaluate target
|
||||||
weight = bayesNet.evaluate(proposed) / prior.evaluate(proposed)
|
# print(target_proposed, proposal_density.evaluate(proposed))
|
||||||
|
weight = target_proposed / proposal_density.evaluate(proposed)
|
||||||
|
# print weight:
|
||||||
|
# print(f"weight: {weight}")
|
||||||
marginals[proposed.atDiscrete(M(0))] += weight
|
marginals[proposed.atDiscrete(M(0))] += weight
|
||||||
|
|
||||||
# print marginals:
|
# print marginals:
|
||||||
|
|
@ -166,23 +190,68 @@ class TestHybridGaussianFactorGraph(GtsamTestCase):
|
||||||
|
|
||||||
def test_tiny(self):
|
def test_tiny(self):
|
||||||
"""Test a tiny two variable hybrid model."""
|
"""Test a tiny two variable hybrid model."""
|
||||||
bayesNet = self.tiny()
|
prior_sigma = 0.5
|
||||||
sample = bayesNet.sample()
|
# P(x0)P(mode)P(z0|x0,mode)
|
||||||
# print(sample)
|
bayesNet = self.tiny(prior_sigma=prior_sigma)
|
||||||
|
|
||||||
|
# Deterministic values exactly at the mean, for both x and z:
|
||||||
|
values = HybridValues()
|
||||||
|
values.insert(X(0), [5.0])
|
||||||
|
values.insert(M(0), 0) # low-noise, standard deviation 0.5
|
||||||
|
z0: float = 5.0
|
||||||
|
values.insert(Z(0), [z0])
|
||||||
|
|
||||||
|
def unnormalized_posterior(x):
|
||||||
|
"""Posterior is proportional to joint, centered at 5.0 as well."""
|
||||||
|
x.insert(Z(0), [z0])
|
||||||
|
# print(x)
|
||||||
|
return bayesNet.evaluate(x)
|
||||||
|
|
||||||
|
# Create proposal density on (x0, mode), making sure it has same mean:
|
||||||
|
posterior_information = 1/(prior_sigma**2) + 1/(0.5**2)
|
||||||
|
posterior_sigma = posterior_information**(-0.5)
|
||||||
|
print(f"Posterior sigma: {posterior_sigma}")
|
||||||
|
proposal_density = self.tiny(
|
||||||
|
num_measurements=0, prior_mean=5.0, prior_sigma=posterior_sigma)
|
||||||
|
|
||||||
# Estimate marginals using importance sampling.
|
# Estimate marginals using importance sampling.
|
||||||
marginals = self.estimate_marginals(bayesNet, sample)
|
marginals = self.estimate_marginals(
|
||||||
# print(f"True mode: {sample.atDiscrete(M(0))}")
|
target=unnormalized_posterior, proposal_density=proposal_density, N=10_000)
|
||||||
# print(f"P(mode=0; z0) = {marginals[0]}")
|
print(f"True mode: {values.atDiscrete(M(0))}")
|
||||||
# print(f"P(mode=1; z0) = {marginals[1]}")
|
print(f"P(mode=0; z0) = {marginals[0]}")
|
||||||
|
print(f"P(mode=1; z0) = {marginals[1]}")
|
||||||
|
|
||||||
# Check that the estimate is close to the true value.
|
# Check that the estimate is close to the true value.
|
||||||
self.assertAlmostEqual(marginals[0], 0.4, delta=0.1)
|
self.assertAlmostEqual(marginals[0], 0.74, delta=0.01)
|
||||||
self.assertAlmostEqual(marginals[1], 0.6, delta=0.1)
|
self.assertAlmostEqual(marginals[1], 0.26, delta=0.01)
|
||||||
|
|
||||||
fg = self.factor_graph_from_bayes_net(bayesNet, sample)
|
fg = self.factor_graph_from_bayes_net(bayesNet, values)
|
||||||
self.assertEqual(fg.size(), 3)
|
self.assertEqual(fg.size(), 3)
|
||||||
|
|
||||||
|
# Test elimination.
|
||||||
|
ordering = gtsam.Ordering()
|
||||||
|
ordering.push_back(X(0))
|
||||||
|
ordering.push_back(M(0))
|
||||||
|
posterior = fg.eliminateSequential(ordering)
|
||||||
|
print(posterior)
|
||||||
|
|
||||||
|
def true_posterior(x):
|
||||||
|
"""Posterior from elimination."""
|
||||||
|
x.insert(Z(0), [z0])
|
||||||
|
# print(x)
|
||||||
|
return posterior.evaluate(x)
|
||||||
|
|
||||||
|
# Estimate marginals using importance sampling.
|
||||||
|
marginals = self.estimate_marginals(
|
||||||
|
target=true_posterior, proposal_density=proposal_density)
|
||||||
|
print(f"True mode: {values.atDiscrete(M(0))}")
|
||||||
|
print(f"P(mode=0; z0) = {marginals[0]}")
|
||||||
|
print(f"P(mode=1; z0) = {marginals[1]}")
|
||||||
|
|
||||||
|
# Check that the estimate is close to the true value.
|
||||||
|
self.assertAlmostEqual(marginals[0], 0.74, delta=0.01)
|
||||||
|
self.assertAlmostEqual(marginals[1], 0.26, delta=0.01)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def calculate_ratio(bayesNet: HybridBayesNet,
|
def calculate_ratio(bayesNet: HybridBayesNet,
|
||||||
fg: HybridGaussianFactorGraph,
|
fg: HybridGaussianFactorGraph,
|
||||||
|
|
@ -190,6 +259,7 @@ class TestHybridGaussianFactorGraph(GtsamTestCase):
|
||||||
"""Calculate ratio between Bayes net probability and the factor graph."""
|
"""Calculate ratio between Bayes net probability and the factor graph."""
|
||||||
return bayesNet.evaluate(sample) / fg.probPrime(sample) if fg.probPrime(sample) > 0 else 0
|
return bayesNet.evaluate(sample) / fg.probPrime(sample) if fg.probPrime(sample) > 0 else 0
|
||||||
|
|
||||||
|
@unittest.skip("This test is too slow.")
|
||||||
def test_ratio(self):
|
def test_ratio(self):
|
||||||
"""
|
"""
|
||||||
Given a tiny two variable hybrid model, with 2 measurements,
|
Given a tiny two variable hybrid model, with 2 measurements,
|
||||||
|
|
@ -269,5 +339,6 @@ class TestHybridGaussianFactorGraph(GtsamTestCase):
|
||||||
if (ratio > 0):
|
if (ratio > 0):
|
||||||
self.assertAlmostEqual(ratio, expected_ratio)
|
self.assertAlmostEqual(ratio, expected_ratio)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue