diff --git a/python/gtsam/tests/test_HybridFactorGraph.py b/python/gtsam/tests/test_HybridFactorGraph.py index 37bb5b93c..37243b937 100644 --- a/python/gtsam/tests/test_HybridFactorGraph.py +++ b/python/gtsam/tests/test_HybridFactorGraph.py @@ -13,73 +13,152 @@ Author: Fan Jiang import unittest import numpy as np -from gtsam.symbol_shorthand import C, X +from gtsam.symbol_shorthand import C, M, X, Z from gtsam.utils.test_case import GtsamTestCase import gtsam +from gtsam import ( + DecisionTreeFactor, + DiscreteConditional, + DiscreteKeys, + GaussianConditional, + GaussianMixture, + GaussianMixtureFactor, + HybridGaussianFactorGraph, + JacobianFactor, + Ordering, + noiseModel, +) class TestHybridGaussianFactorGraph(GtsamTestCase): """Unit tests for HybridGaussianFactorGraph.""" + def test_create(self): """Test construction of hybrid factor graph.""" - noiseModel = gtsam.noiseModel.Unit.Create(3) - dk = gtsam.DiscreteKeys() + model = noiseModel.Unit.Create(3) + dk = DiscreteKeys() dk.push_back((C(0), 2)) - jf1 = gtsam.JacobianFactor(X(0), np.eye(3), np.zeros((3, 1)), - noiseModel) - jf2 = gtsam.JacobianFactor(X(0), np.eye(3), np.ones((3, 1)), - noiseModel) + jf1 = JacobianFactor(X(0), np.eye(3), np.zeros((3, 1)), model) + jf2 = JacobianFactor(X(0), np.eye(3), np.ones((3, 1)), model) - gmf = gtsam.GaussianMixtureFactor.FromFactors([X(0)], dk, [jf1, jf2]) + gmf = GaussianMixtureFactor([X(0)], dk, [jf1, jf2]) - hfg = gtsam.HybridGaussianFactorGraph() - hfg.add(jf1) - hfg.add(jf2) + hfg = HybridGaussianFactorGraph() + hfg.push_back(jf1) + hfg.push_back(jf2) hfg.push_back(gmf) hbn = hfg.eliminateSequential( - gtsam.Ordering.ColamdConstrainedLastHybridGaussianFactorGraph( - hfg, [C(0)])) + Ordering.ColamdConstrainedLastHybridGaussianFactorGraph(hfg, [C(0)]) + ) self.assertEqual(hbn.size(), 2) mixture = hbn.at(0).inner() - self.assertIsInstance(mixture, gtsam.GaussianMixture) + self.assertIsInstance(mixture, GaussianMixture) self.assertEqual(len(mixture.keys()), 2) discrete_conditional = hbn.at(hbn.size() - 1).inner() - self.assertIsInstance(discrete_conditional, gtsam.DiscreteConditional) + self.assertIsInstance(discrete_conditional, DiscreteConditional) def test_optimize(self): """Test construction of hybrid factor graph.""" - noiseModel = gtsam.noiseModel.Unit.Create(3) - dk = gtsam.DiscreteKeys() + model = noiseModel.Unit.Create(3) + dk = DiscreteKeys() dk.push_back((C(0), 2)) - jf1 = gtsam.JacobianFactor(X(0), np.eye(3), np.zeros((3, 1)), - noiseModel) - jf2 = gtsam.JacobianFactor(X(0), np.eye(3), np.ones((3, 1)), - noiseModel) + jf1 = JacobianFactor(X(0), np.eye(3), np.zeros((3, 1)), model) + jf2 = JacobianFactor(X(0), np.eye(3), np.ones((3, 1)), model) - gmf = gtsam.GaussianMixtureFactor.FromFactors([X(0)], dk, [jf1, jf2]) + gmf = GaussianMixtureFactor([X(0)], dk, [jf1, jf2]) - hfg = gtsam.HybridGaussianFactorGraph() - hfg.add(jf1) - hfg.add(jf2) + hfg = HybridGaussianFactorGraph() + hfg.push_back(jf1) + hfg.push_back(jf2) hfg.push_back(gmf) dtf = gtsam.DecisionTreeFactor([(C(0), 2)], "0 1") - hfg.add(dtf) + hfg.push_back(dtf) hbn = hfg.eliminateSequential( - gtsam.Ordering.ColamdConstrainedLastHybridGaussianFactorGraph( - hfg, [C(0)])) + Ordering.ColamdConstrainedLastHybridGaussianFactorGraph(hfg, [C(0)]) + ) hv = hbn.optimize() self.assertEqual(hv.atDiscrete(C(0)), 1) + @staticmethod + def tiny(num_measurements: int = 1): + """Create a tiny two variable hybrid model.""" + # Create hybrid Bayes net. + bayesNet = gtsam.HybridBayesNet() + + # Create mode key: 0 is low-noise, 1 is high-noise. + modeKey = M(0) + mode = (modeKey, 2) + + # Create Gaussian mixture Z(0) = X(0) + noise for each measurement. + I = np.eye(1) + keys = DiscreteKeys() + keys.push_back(mode) + for i in range(num_measurements): + conditional0 = GaussianConditional.FromMeanAndStddev( + Z(i), I, X(0), [0], sigma=0.5 + ) + conditional1 = GaussianConditional.FromMeanAndStddev( + Z(i), I, X(0), [0], sigma=3 + ) + bayesNet.emplaceMixture([Z(i)], [X(0)], keys, [conditional0, conditional1]) + + # Create prior on X(0). + prior_on_x0 = GaussianConditional.FromMeanAndStddev(X(0), [5.0], 5.0) + bayesNet.addGaussian(prior_on_x0) + + # Add prior on mode. + bayesNet.emplaceDiscrete(mode, "1/1") + + return bayesNet + + def test_tiny(self): + """Test a tiny two variable hybrid model.""" + bayesNet = self.tiny() + sample = bayesNet.sample() + # print(sample) + + # Create a factor graph from the Bayes net with sampled measurements. + fg = HybridGaussianFactorGraph() + conditional = bayesNet.atMixture(0) + measurement = gtsam.VectorValues() + measurement.insert(Z(0), sample.at(Z(0))) + factor = conditional.likelihood(measurement) + fg.push_back(factor) + fg.push_back(bayesNet.atGaussian(1)) + fg.push_back(bayesNet.atDiscrete(2)) + + self.assertEqual(fg.size(), 3) + + def test_tiny2(self): + """Test a tiny two variable hybrid model, with 2 measurements.""" + # Create the Bayes net and sample from it. + bayesNet = self.tiny(num_measurements=2) + sample = bayesNet.sample() + # print(sample) + + # Create a factor graph from the Bayes net with sampled measurements. + fg = HybridGaussianFactorGraph() + for i in range(2): + conditional = bayesNet.atMixture(i) + measurement = gtsam.VectorValues() + measurement.insert(Z(i), sample.at(Z(i))) + factor = conditional.likelihood(measurement) + fg.push_back(factor) + fg.push_back(bayesNet.atGaussian(2)) + fg.push_back(bayesNet.atDiscrete(3)) + + self.assertEqual(fg.size(), 4) + if __name__ == "__main__": unittest.main()