Merge pull request #2063 from mkielo3/python-ekf

Exposed ExtendedKalmanFilter to Python with examples
release/4.3a0
Frank Dellaert 2025-03-17 21:15:10 -04:00 committed by GitHub
commit 65a80a4b19
3 changed files with 330 additions and 0 deletions

@@ -725,5 +725,22 @@ virtual class BatchFixedLagSmoother : gtsam::FixedLagSmoother {
VALUE calculateEstimate(size_t key) const;
};
#include <gtsam/nonlinear/ExtendedKalmanFilter.h>
template <T = {gtsam::Point2,
gtsam::Point3,
gtsam::Rot2,
gtsam::Rot3,
gtsam::Pose2,
gtsam::Pose3,
gtsam::NavState,
gtsam::imuBias::ConstantBias}>
virtual class ExtendedKalmanFilter {
ExtendedKalmanFilter(gtsam::Key key_initial, const T& x_initial, const gtsam::noiseModel::Gaussian* P_initial);
T predict(const gtsam::NoiseModelFactor& motionFactor);
T update(const gtsam::NoiseModelFactor& measurementFactor);
gtsam::JacobianFactor::shared_ptr Density() const;
};
} // namespace gtsam
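
Each template instantiation is exposed to Python as a concrete class named after the value type, e.g. gtsam.ExtendedKalmanFilterPoint2, which is what the notebooks added below use. A minimal sketch of the resulting Python API, assuming the Point2 instantiation and the standard gtsam noise models:

import numpy as np
import gtsam

# Prior belief on the initial state: x0 ~ N([0, 0], diag(0.1^2, 0.1^2))
x0 = gtsam.symbol('x', 0)
ekf = gtsam.ExtendedKalmanFilterPoint2(
    x0, gtsam.Point2(0, 0), gtsam.noiseModel.Isotropic.Sigma(2, 0.1))

# predict() takes any NoiseModelFactor relating the previous key to the next one
x1 = gtsam.symbol('x', 1)
motion = gtsam.BetweenFactorPoint2(
    x0, x1, gtsam.Point2(1, 0), gtsam.noiseModel.Isotropic.Sigma(2, 0.1))
x1_predict = ekf.predict(motion)

# update() takes a unary factor on the current key (here a GPS-like prior)
measurement = gtsam.PriorFactorPoint2(
    x1, gtsam.Point2(1, 0), gtsam.noiseModel.Isotropic.Sigma(2, 0.25))
x1_update = ekf.update(measurement)

# Density() returns the posterior as a JacobianFactor in square-root information form
A = ekf.Density().getA()
covariance = np.linalg.inv(A.T @ A)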

@@ -0,0 +1,122 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"\"\"\"\n",
"Extended Kalman filter on a moving 2D point, but done using factor graphs.\n",
"This example uses the ExtendedKalmanFilter class to perform filtering\n",
"on a linear system, demonstrating the same operations as in elaboratePoint2KalmanFilter.\n",
"\"\"\"\n",
"\n",
"import gtsam\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Create the Kalman Filter initialization point\n",
"X0 = gtsam.Point2(0.0, 0.0)\n",
"P0 = gtsam.noiseModel.Diagonal.Sigmas(np.array([0.1, 0.1]))\n",
"\n",
"# Create Key for initial pose\n",
"x0 = gtsam.symbol('x', 0)\n",
"\n",
"# Create an ExtendedKalmanFilter object\n",
"ekf = gtsam.ExtendedKalmanFilterPoint2(x0, X0, P0)\n",
"\n",
"# For this example, we use a constant-position model where\n",
"# controls drive the point to the right at 1 m/s\n",
"# F = [1 0; 0 1], B = [1 0; 0 1], and u = [1; 0]\n",
"# Process noise Q = [0.1 0; 0 0.1]\n",
"Q = gtsam.noiseModel.Diagonal.Sigmas(np.array([0.1, 0.1]), True)\n",
"\n",
"# Measurement noise, assuming a GPS-like sensor\n",
"R = gtsam.noiseModel.Diagonal.Sigmas(np.array([0.25, 0.25]), True)\n",
"\n",
"# Motion model - move right by 1.0 units\n",
"dX = gtsam.Point2(1.0, 0.0)\n",
"\n",
"last_symbol = x0"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"X1 Predict: [1. 0.]\n",
"X1 Update: [1. 0.]\n",
"X2 Predict: [2. 0.]\n",
"X2 Update: [2. 0.]\n",
"X3 Predict: [3. 0.]\n",
"X3 Update: [3. 0.]\n",
"\n",
"Easy Final Covariance (after update):\n",
" [[0.0193 0. ]\n",
" [0. 0.0193]]\n"
]
}
],
"source": [
"for i in range(1, 4):\n",
" # Create symbol for new state\n",
" xi = gtsam.symbol('x', i)\n",
" \n",
" # Prediction step: P(x_i) ~ P(x_i|x_{i-1}) P(x_{i-1})\n",
" # In Kalman Filter notation: x_{t+1|t} and P_{t+1|t}\n",
" motion = gtsam.BetweenFactorPoint2(last_symbol, xi, dX, Q)\n",
" Xi_predict = ekf.predict(motion)\n",
" print(f\"X{i} Predict:\", Xi_predict)\n",
" \n",
" # Update step: P(x_i|z_i) ~ P(z_i|x_i)*P(x_i)\n",
" # Assuming a measurement model h(x_{t}) = H*x_{t} + v\n",
" # where H is the observation model/matrix and v is noise with covariance R\n",
" measurement = gtsam.Point2(float(i), 0.0)\n",
" meas_factor = gtsam.PriorFactorPoint2(xi, measurement, R)\n",
" Xi_update = ekf.update(meas_factor)\n",
" print(f\"X{i} Update:\", Xi_update)\n",
" \n",
" # Move to next state\n",
" last_symbol = xi\n",
"\n",
"A = ekf.Density().getA()\n",
"information_matrix = A.transpose() @ A\n",
"covariance_matrix = np.linalg.inv(information_matrix)\n",
"print (\"\\nEasy Final Covariance (after update):\\n\", covariance_matrix)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
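
The first code cell of this notebook parameterizes the usual linear-Gaussian model x_{k+1} = F x_k + B u_k + w, z_k = H x_k + v through GTSAM factors. As a plain-NumPy cross-check (a sketch for illustration, not part of the changed files), the textbook covariance-form Kalman recursion with those F, B, u, Q, R values reproduces the printed covariance:

import numpy as np

F = np.eye(2)                      # state transition
B = np.eye(2)                      # control matrix
u = np.array([1.0, 0.0])           # move 1 unit right per step
H = np.eye(2)                      # observation matrix
Q = np.diag([0.1**2, 0.1**2])      # process noise covariance (sigmas of 0.1)
R = np.diag([0.25**2, 0.25**2])    # measurement noise covariance (sigmas of 0.25)

x = np.zeros(2)                    # prior mean
P = np.diag([0.1**2, 0.1**2])      # prior covariance

for i in range(1, 4):
    # Predict
    x = F @ x + B @ u
    P = F @ P @ F.T + Q
    # Update with measurement z_i = (i, 0)
    z = np.array([float(i), 0.0])
    S = H @ P @ H.T + R
    K = P @ H.T @ np.linalg.inv(S)
    x = x + K @ (z - H @ x)
    P = (np.eye(2) - K @ H) @ P

print(x)  # [3. 0.]
print(P)  # approx diag(0.0193, 0.0193), matching the notebook output above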

@@ -0,0 +1,191 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"\"\"\"\n",
"Simple linear Kalman filter on a moving 2D point using factor graphs in GTSAM.\n",
"This example manually creates all of the needed data structures to show how\n",
"the Kalman filter works under the hood using factor graphs, but uses a loop\n",
"to handle the repetitive prediction and update steps.\n",
"\n",
"Based on the C++ example by Frank Dellaert and Stephen Williams\n",
"\"\"\"\n",
"\n",
"import gtsam\n",
"import numpy as np\n",
"from gtsam import Point2, noiseModel\n",
"from gtsam.symbol_shorthand import X"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# [code below basically does SRIF with Cholesky]\n",
"\n",
"# Setup containers for linearization points\n",
"linearization_points = gtsam.Values()\n",
"\n",
"# Initialize state x0 at origin\n",
"x_initial = Point2(0, 0)\n",
"p_initial = noiseModel.Isotropic.Sigma(2, 0.1)\n",
"\n",
"# Add x0 to linearization points\n",
"linearization_points.insert(X(0), x_initial)\n",
"\n",
"# Initial factor graph with prior on X(0)\n",
"gfg = gtsam.GaussianFactorGraph()\n",
"ordering = gtsam.Ordering()\n",
"ordering.push_back(X(0))\n",
"gfg.add(X(0), p_initial.R(), np.zeros(2), noiseModel.Unit.Create(2))\n",
"\n",
"# Common parameters for all steps\n",
"motion_delta = Point2(1, 0) # Always move 1 unit to the right\n",
"process_noise = noiseModel.Isotropic.Sigma(2, 0.1)\n",
"measurement_noise = noiseModel.Isotropic.Sigma(2, 0.25)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"X1 Predict: [1. 0.]\n",
"X1 Update: [1. 0.]\n",
"X2 Predict: [2. 0.]\n",
"X2 Update: [2. 0.]\n",
"X3 Predict: [3. 0.]\n",
"X3 Update: [3. 0.]\n",
"\n",
"Elaborate Final Covariance (after update):\n",
" [[0.0193 0. ]\n",
" [0. 0.0193]]\n"
]
}
],
"source": [
"# Current state and conditional\n",
"current_x = X(0)\n",
"current_conditional = None\n",
"current_result = None\n",
"\n",
"# Run three predict-update cycles\n",
"for step in range(1, 4):\n",
" # =====================================================================\n",
" # Prediction step\n",
" # =====================================================================\n",
" next_x = X(step)\n",
" \n",
" # Create new graph with prior from previous step if not the first step\n",
" if step > 1:\n",
" gfg = gtsam.GaussianFactorGraph()\n",
" gfg.add(\n",
" current_x,\n",
" current_conditional.R(),\n",
" current_conditional.d() - current_conditional.R() @ current_result.at(current_x),\n",
" current_conditional.get_model()\n",
" )\n",
" \n",
" # Add next state to ordering and create motion model\n",
" ordering = gtsam.Ordering()\n",
" ordering.push_back(current_x)\n",
" ordering.push_back(next_x)\n",
" \n",
" # Create motion factor and add to graph\n",
" motion_factor = gtsam.BetweenFactorPoint2(current_x, next_x, motion_delta, process_noise)\n",
" \n",
" # Add next state to linearization points if this is the first step\n",
" if step == 1:\n",
" linearization_points.insert(next_x, x_initial)\n",
" else:\n",
" linearization_points.insert(next_x, \n",
" linearization_points.atPoint2(current_x))\n",
" \n",
" # Add linearized factor to graph\n",
" gfg.push_back(motion_factor.linearize(linearization_points))\n",
" \n",
" # Solve for prediction\n",
" prediction_bayes_net = gfg.eliminateSequential(ordering)\n",
" next_conditional = prediction_bayes_net.back()\n",
" prediction_result = prediction_bayes_net.optimize()\n",
" \n",
" # Extract and store predicted state\n",
" next_predict = linearization_points.atPoint2(next_x) + Point2(prediction_result.at(next_x))\n",
" print(f\"X{step} Predict:\", next_predict)\n",
" linearization_points.update(next_x, next_predict)\n",
" \n",
" # =====================================================================\n",
" # Update step\n",
" # =====================================================================\n",
" # Create new graph with prior from prediction\n",
" gfg = gtsam.GaussianFactorGraph()\n",
" gfg.add(\n",
" next_x,\n",
" next_conditional.R(),\n",
" next_conditional.d() - next_conditional.R() @ prediction_result.at(next_x),\n",
" next_conditional.get_model()\n",
" )\n",
" \n",
" # Create ordering for update\n",
" ordering = gtsam.Ordering()\n",
" ordering.push_back(next_x)\n",
" \n",
" # Create measurement at correct position\n",
" measurement = Point2(float(step), 0.0)\n",
" meas_factor = gtsam.PriorFactorPoint2(next_x, measurement, measurement_noise)\n",
" \n",
" # Add measurement factor to graph\n",
" gfg.push_back(meas_factor.linearize(linearization_points))\n",
" \n",
" # Solve for update\n",
" update_bayes_net = gfg.eliminateSequential(ordering)\n",
" current_conditional = update_bayes_net.back()\n",
" current_result = update_bayes_net.optimize()\n",
" \n",
" # Extract and store updated state\n",
" next_update = linearization_points.atPoint2(next_x) + Point2(current_result.at(next_x))\n",
" print(f\"X{step} Update:\", next_update)\n",
" linearization_points.update(next_x, next_update)\n",
" \n",
" # Move to next state\n",
" current_x = next_x\n",
"\n",
"final_R = current_conditional.R()\n",
"final_information = final_R.transpose() @ final_R\n",
"final_covariance = np.linalg.inv(final_information)\n",
"print(\"\\nElaborate Final Covariance (after update):\\n\", final_covariance)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
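
One step worth spelling out: each GaussianConditional obtained from elimination is re-used as the prior for the next cycle. The conditional encodes 0.5 * ||R dx - d||^2 about the current linearization point, so once that point is moved by the solved delta the right-hand side must shift to d - R @ delta; this is what the gfg.add(key, R(), d() - R() @ delta, get_model()) calls above construct, and the final covariance is recovered from the square-root information matrix as (R^T R)^{-1}. A small NumPy sketch of that identity, with illustrative (hypothetical) values:

import numpy as np

# Hypothetical values standing in for the conditional produced by elimination.
R = np.array([[7.2, 0.0],
              [0.0, 7.2]])          # square-root information matrix R
d = np.array([0.5, 0.0])            # right-hand side
x_lin = np.array([1.0, 0.0])        # current linearization point

# The conditional encodes mean x_lin + R^{-1} d and covariance (R^T R)^{-1}.
delta = np.linalg.solve(R, d)
mean = x_lin + delta
cov = np.linalg.inv(R.T @ R)

# After moving the linearization point by delta, the same density has
# right-hand side d - R @ delta (zero here), which is what the notebook's
# gfg.add(key, R(), d() - R() @ delta, model) calls construct.
new_rhs = d - R @ delta
new_mean = (x_lin + delta) + np.linalg.solve(R, new_rhs)
assert np.allclose(new_mean, mean)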