diff --git a/python/gtsam/tests/test_HybridFactorGraph.py b/python/gtsam/tests/test_HybridFactorGraph.py index 75ab3e02c..a7657e411 100644 --- a/python/gtsam/tests/test_HybridFactorGraph.py +++ b/python/gtsam/tests/test_HybridFactorGraph.py @@ -82,10 +82,12 @@ class TestHybridGaussianFactorGraph(GtsamTestCase): self.assertEqual(hv.atDiscrete(C(0)), 1) @staticmethod - def tiny(num_measurements: int = 1) -> HybridBayesNet: + def tiny(num_measurements: int = 1, prior_mean: float = 5.0, + prior_sigma: float = 0.5) -> HybridBayesNet: """ Create a tiny two variable hybrid model which represents - the generative probability P(z, x, n) = P(z | x, n)P(x)P(n). + the generative probability P(Z, x0, mode) = P(Z|x0, mode)P(x0)P(mode). + num_measurements: number of measurements in Z = {z0, z1...} """ # Create hybrid Bayes net. bayesNet = HybridBayesNet() @@ -110,7 +112,8 @@ class TestHybridGaussianFactorGraph(GtsamTestCase): [conditional0, conditional1]) # Create prior on X(0). - prior_on_x0 = GaussianConditional.FromMeanAndStddev(X(0), [5.0], 0.5) + prior_on_x0 = GaussianConditional.FromMeanAndStddev( + X(0), [prior_mean], prior_sigma) bayesNet.addGaussian(prior_on_x0) # Add prior on mode. @@ -118,6 +121,28 @@ class TestHybridGaussianFactorGraph(GtsamTestCase): return bayesNet + def test_evaluate(self): + """Test evaluate with two different prior noise models.""" + # TODO(dellaert): really a HBN test + # Create a tiny Bayes net P(x0) P(m0) P(z0|x0) + bayesNet1 = self.tiny(prior_sigma=0.5, num_measurements=1) + bayesNet2 = self.tiny(prior_sigma=5.0, num_measurements=1) + # bn1: # 1/sqrt(2*pi*0.5^2) + # bn2: # 1/sqrt(2*pi*5.0^2) + expected_ratio = np.sqrt(2*np.pi*5.0**2)/np.sqrt(2*np.pi*0.5**2) + mean0 = HybridValues() + mean0.insert(X(0), [5.0]) + mean0.insert(Z(0), [5.0]) + mean0.insert(M(0), 0) + self.assertAlmostEqual(bayesNet1.evaluate(mean0) / + bayesNet2.evaluate(mean0), expected_ratio, delta=1e-9) + mean1 = HybridValues() + mean1.insert(X(0), [5.0]) + mean1.insert(Z(0), [5.0]) + mean1.insert(M(0), 1) + self.assertAlmostEqual(bayesNet1.evaluate(mean1) / + bayesNet2.evaluate(mean1), expected_ratio, delta=1e-9) + @staticmethod def measurements(sample: HybridValues, indices) -> gtsam.VectorValues: """Create measurements from a sample, grabbing Z(i) where i in indices.""" @@ -143,21 +168,20 @@ class TestHybridGaussianFactorGraph(GtsamTestCase): return fg @classmethod - def estimate_marginals(cls, bayesNet: HybridBayesNet, sample: HybridValues, N=10000): + def estimate_marginals(cls, target, proposal_density: HybridBayesNet, + N=10000): """Do importance sampling to get an estimate of the discrete marginal P(mode).""" - # Use prior on x0, mode as proposal density. - prior = cls.tiny(num_measurements=0) # just P(x0)P(mode) - - # Allocate space for marginals. + # Allocate space for marginals on mode. marginals = np.zeros((2,)) # Do importance sampling. - num_measurements = bayesNet.size() - 2 - measurements = cls.measurements(sample, range(num_measurements)) for s in range(N): - proposed = prior.sample() - proposed.insert(measurements) - weight = bayesNet.evaluate(proposed) / prior.evaluate(proposed) + proposed = proposal_density.sample() # sample from proposal + target_proposed = target(proposed) # evaluate target + # print(target_proposed, proposal_density.evaluate(proposed)) + weight = target_proposed / proposal_density.evaluate(proposed) + # print weight: + # print(f"weight: {weight}") marginals[proposed.atDiscrete(M(0))] += weight # print marginals: @@ -166,23 +190,68 @@ class TestHybridGaussianFactorGraph(GtsamTestCase): def test_tiny(self): """Test a tiny two variable hybrid model.""" - bayesNet = self.tiny() - sample = bayesNet.sample() - # print(sample) + prior_sigma = 0.5 + # P(x0)P(mode)P(z0|x0,mode) + bayesNet = self.tiny(prior_sigma=prior_sigma) + + # Deterministic values exactly at the mean, for both x and z: + values = HybridValues() + values.insert(X(0), [5.0]) + values.insert(M(0), 0) # low-noise, standard deviation 0.5 + z0: float = 5.0 + values.insert(Z(0), [z0]) + + def unnormalized_posterior(x): + """Posterior is proportional to joint, centered at 5.0 as well.""" + x.insert(Z(0), [z0]) + # print(x) + return bayesNet.evaluate(x) + + # Create proposal density on (x0, mode), making sure it has same mean: + posterior_information = 1/(prior_sigma**2) + 1/(0.5**2) + posterior_sigma = posterior_information**(-0.5) + print(f"Posterior sigma: {posterior_sigma}") + proposal_density = self.tiny( + num_measurements=0, prior_mean=5.0, prior_sigma=posterior_sigma) # Estimate marginals using importance sampling. - marginals = self.estimate_marginals(bayesNet, sample) - # print(f"True mode: {sample.atDiscrete(M(0))}") - # print(f"P(mode=0; z0) = {marginals[0]}") - # print(f"P(mode=1; z0) = {marginals[1]}") + marginals = self.estimate_marginals( + target=unnormalized_posterior, proposal_density=proposal_density, N=10_000) + print(f"True mode: {values.atDiscrete(M(0))}") + print(f"P(mode=0; z0) = {marginals[0]}") + print(f"P(mode=1; z0) = {marginals[1]}") # Check that the estimate is close to the true value. - self.assertAlmostEqual(marginals[0], 0.4, delta=0.1) - self.assertAlmostEqual(marginals[1], 0.6, delta=0.1) + self.assertAlmostEqual(marginals[0], 0.74, delta=0.01) + self.assertAlmostEqual(marginals[1], 0.26, delta=0.01) - fg = self.factor_graph_from_bayes_net(bayesNet, sample) + fg = self.factor_graph_from_bayes_net(bayesNet, values) self.assertEqual(fg.size(), 3) + # Test elimination. + ordering = gtsam.Ordering() + ordering.push_back(X(0)) + ordering.push_back(M(0)) + posterior = fg.eliminateSequential(ordering) + print(posterior) + + def true_posterior(x): + """Posterior from elimination.""" + x.insert(Z(0), [z0]) + # print(x) + return posterior.evaluate(x) + + # Estimate marginals using importance sampling. + marginals = self.estimate_marginals( + target=true_posterior, proposal_density=proposal_density) + print(f"True mode: {values.atDiscrete(M(0))}") + print(f"P(mode=0; z0) = {marginals[0]}") + print(f"P(mode=1; z0) = {marginals[1]}") + + # Check that the estimate is close to the true value. + self.assertAlmostEqual(marginals[0], 0.74, delta=0.01) + self.assertAlmostEqual(marginals[1], 0.26, delta=0.01) + @staticmethod def calculate_ratio(bayesNet: HybridBayesNet, fg: HybridGaussianFactorGraph, @@ -190,6 +259,7 @@ class TestHybridGaussianFactorGraph(GtsamTestCase): """Calculate ratio between Bayes net probability and the factor graph.""" return bayesNet.evaluate(sample) / fg.probPrime(sample) if fg.probPrime(sample) > 0 else 0 + @unittest.skip("This test is too slow.") def test_ratio(self): """ Given a tiny two variable hybrid model, with 2 measurements, @@ -269,5 +339,6 @@ class TestHybridGaussianFactorGraph(GtsamTestCase): if (ratio > 0): self.assertAlmostEqual(ratio, expected_ratio) + if __name__ == "__main__": unittest.main()