Two new modules

release/4.3a0
Frank Dellaert 2025-04-24 15:49:20 -04:00
parent 104bba2422
commit 423e447de0
3 changed files with 527 additions and 304 deletions

View File

@ -62,12 +62,17 @@
" * Optimize the final Bayes Net to get the updated state estimate (`Values`).\n",
"4. **Iteration:** Repeat prediction and update for each time step.\n",
"\n",
"**Enhancements in this version:**\n",
"* Simulation code is moved to `simulation.py`.\n",
"* Plotting code is moved to `gtsam_plotly.py`.\n",
"* The evolving factor graph is displayed at each step using `graphviz`.\n",
"\n",
"**Advantages:**\n",
"* Leverages GTSAM's robust factor graph machinery.\n",
"* Handles non-linearities through iterative linearization (like EKF, but potentially more robust within GTSAM's optimization context).\n",
"* Avoids explicit management of large, dense covariance matrices.\n",
"\n",
"**Output:** The process will be visualized using a Plotly animation showing the robot's estimated path, mapped landmarks, and uncertainty ellipses evolving over time."
"**Output:** The process will be visualized with the evolving factor graph, and finally using a Plotly animation showing the robot's estimated path, mapped landmarks, and uncertainty ellipses evolving over time."
]
},
{
@ -101,13 +106,22 @@
"outputs": [],
"source": [
"import numpy as np\n",
"import matplotlib.pyplot as plt # For initial plot if desired\n",
"import matplotlib.pyplot as plt # For initial plot if desired (optional)\n",
"import plotly.graph_objects as go\n",
"from tqdm.notebook import tqdm # Progress bar\n",
"import math\n",
"import time # To slow down graphviz display if needed\n",
"\n",
"import gtsam\n",
"from gtsam.symbol_shorthand import X, L # Symbols for poses and landmarks"
"from gtsam.symbol_shorthand import X, L # Symbols for poses and landmarks\n",
"\n",
"# Imports for new modules\n",
"import simulation\n",
"import gtsam_plotly \n",
"\n",
"# Imports for graph visualization\n",
"import graphviz\n",
"from IPython.display import display, clear_output"
]
},
{
@ -132,7 +146,7 @@
"# Robot parameters\n",
"ROBOT_RADIUS = 3.0\n",
"ROBOT_ANGULAR_VEL = np.deg2rad(20.0) # Radians per step\n",
"NUM_STEPS = 50 # Reduced steps for faster animation generation\n",
"NUM_STEPS = 20 # Reduced steps for faster animation generation\n",
"DT = 1.0 # Time step duration (simplified)\n",
"\n",
"# Noise parameters (GTSAM Noise Models)\n",
@ -143,13 +157,12 @@
"# Measurement noise (bearing, range)\n",
"MEASUREMENT_NOISE = gtsam.noiseModel.Diagonal.Sigmas(np.array([np.deg2rad(2.0), 0.2]))\n",
"\n",
"# Create samplers for noise models\n",
"prior_noise_sampler = gtsam.Sampler(PRIOR_NOISE, 42)\n",
"odometry_noise_sampler = gtsam.Sampler(ODOMETRY_NOISE, 42)\n",
"measurement_noise_sampler = gtsam.Sampler(MEASUREMENT_NOISE, 42)\n",
"\n",
"# Sensor parameters\n",
"MAX_SENSOR_RANGE = 5.0"
"MAX_SENSOR_RANGE = 5.0\n",
"\n",
"# Visualization parameters\n",
"SHOW_GRAPHVIZ_EACH_STEP = True # Set to False to only show final graph\n",
"GRAPHVIZ_PAUSE_SECONDS = 1 # Pause briefly to allow viewing the graph"
]
},
{
@ -159,79 +172,30 @@
"source": [
"## 3. Generate Ground Truth Data\n",
"\n",
"Create true landmark positions, robot path, and simulate noisy measurements."
"Use the `simulation` module to create true landmark positions, robot path, and simulate noisy measurements."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ground-truth-code-srif",
"id": "ground-truth-call-srif",
"metadata": {},
"outputs": [],
"source": [
"# 3.1 Ground Truth Landmarks\n",
"landmarks_gt = (np.random.rand(2, NUM_LANDMARKS) - 0.5) * WORLD_SIZE\n",
"landmark_ids = list(range(NUM_LANDMARKS))\n",
"landmarks_gt_dict = {L(i): gtsam.Point2(landmarks_gt[:, i]) for i in range(NUM_LANDMARKS)}\n",
"\n",
"# 3.2 Ground Truth Robot Path\n",
"poses_gt = []\n",
"current_pose_gt = gtsam.Pose2(ROBOT_RADIUS, 0, np.pi / 2) # Start on circle edge\n",
"poses_gt.append(current_pose_gt)\n",
"\n",
"for k in range(NUM_STEPS):\n",
" delta_theta = ROBOT_ANGULAR_VEL * DT\n",
" arc_length = ROBOT_ANGULAR_VEL * ROBOT_RADIUS * DT\n",
" motion_command = gtsam.Pose2(arc_length, 0, delta_theta)\n",
" current_pose_gt = current_pose_gt.compose(motion_command)\n",
" poses_gt.append(current_pose_gt)\n",
"\n",
"# 3.3 Simulate Noisy Odometry Measurements (as Pose2 objects)\n",
"odometry_measurements = []\n",
"for k in range(NUM_STEPS):\n",
" pose_k = poses_gt[k]\n",
" pose_k1 = poses_gt[k+1]\n",
" true_odom = pose_k.between(pose_k1)\n",
" \n",
" # Sample noise directly for Pose2 composition (approximate)\n",
" # A more rigorous approach samples from the tangent space\n",
" odom_noise_vec = odometry_noise_sampler.sample()\n",
" noisy_odom = true_odom.compose(gtsam.Pose2(odom_noise_vec[0], odom_noise_vec[1], odom_noise_vec[2]))\n",
" odometry_measurements.append(noisy_odom)\n",
"\n",
"# 3.4 Simulate Noisy Bearing-Range Measurements\n",
"# measurements_sim[k] = list of (L(lm_id), range, bearing) for measurements *at* pose k\n",
"measurements_sim = [[] for _ in range(NUM_STEPS + 1)]\n",
"for k in range(NUM_STEPS + 1):\n",
" robot_pose = poses_gt[k]\n",
" for lm_id in range(NUM_LANDMARKS):\n",
" lm_gt_pt = landmarks_gt_dict[L(lm_id)]\n",
" try:\n",
" # Use BearingRangeFactor2D to simulate the measurement model\n",
" measurement_factor = gtsam.BearingRangeFactor2D(X(k), L(lm_id), \n",
" robot_pose.bearing(lm_gt_pt),\n",
" robot_pose.range(lm_gt_pt), \n",
" MEASUREMENT_NOISE)\n",
" true_range = measurement_factor.measured().range()\n",
" true_bearing = measurement_factor.measured().bearing()\n",
"\n",
" if true_range <= MAX_SENSOR_RANGE and abs(true_bearing.theta())<np.pi/4:\n",
" # Sample noise\n",
" noise_vec = measurement_noise_sampler.sample()\n",
" noisy_bearing = true_bearing.retract(np.array([noise_vec[0]])) # Retract on SO(2)\n",
" noisy_range = true_range + noise_vec[1]\n",
" \n",
" if noisy_range > 0: # Ensure range is positive\n",
" measurements_sim[k].append((L(lm_id), noisy_bearing, noisy_range))\n",
" except Exception as e:\n",
" # Catch potential errors like point being too close to the pose\n",
" print(f\"Sim Warning at step {k}, landmark {lm_id}: {e}\")\n",
" pass\n",
"\n",
"print(f\"Generated {NUM_LANDMARKS} landmarks.\")\n",
"print(f\"Generated {NUM_STEPS} ground truth poses and odometry measurements.\")\n",
"num_meas_total = sum(len(m_list) for m_list in measurements_sim)\n",
"print(f\"Generated {num_meas_total} bearing-range measurements across all steps.\")"
"landmarks_gt_dict, poses_gt, odometry_measurements, measurements_sim, landmarks_gt_array = \\\n",
" simulation.generate_simulation_data(\n",
" num_landmarks=NUM_LANDMARKS,\n",
" world_size=WORLD_SIZE,\n",
" robot_radius=ROBOT_RADIUS,\n",
" robot_angular_vel=ROBOT_ANGULAR_VEL,\n",
" num_steps=NUM_STEPS,\n",
" dt=DT,\n",
" odometry_noise_model=ODOMETRY_NOISE,\n",
" measurement_noise_model=MEASUREMENT_NOISE,\n",
" max_sensor_range=MAX_SENSOR_RANGE,\n",
" X=X, # Pass symbol functions\n",
" L=L\n",
" )"
]
},
{
@ -241,7 +205,25 @@
"source": [
"## 4. Iterative Factor Graph SLAM Implementation\n",
"\n",
"Here we perform the step-by-step prediction and update using factor graphs."
"Here we perform the step-by-step prediction and update using factor graphs. The evolving graph will be displayed."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "90bf9320",
"metadata": {},
"outputs": [],
"source": [
"writer = gtsam.GraphvizFormatting()\n",
"writer.binaryEdges = True\n",
"\n",
"def maybe_show(graph, estimate, message):\n",
" if SHOW_GRAPHVIZ_EACH_STEP:\n",
" print(message)\n",
" graph_dot = graph.dot(estimate, writer=writer)\n",
" display(graphviz.Source(graph_dot, engine='neato'))\n",
" time.sleep(GRAPHVIZ_PAUSE_SECONDS)"
]
},
{
@ -258,11 +240,15 @@
"\n",
"# Add prior on the initial pose X(0)\n",
"initial_pose = poses_gt[0] # Start at ground truth (or add noise)\n",
"current_estimate.insert(current_pose_key, initial_pose)\n",
"# Add a bit of noise to the initial guess if desired:\n",
"# initial_pose_noisy = initial_pose.compose(gtsam.Pose2(0.1, 0.1, 0.01))\n",
"# current_estimate.insert(current_pose_key, initial_pose_noisy)\n",
"current_estimate.insert(current_pose_key, initial_pose) \n",
"\n",
"current_graph.add(gtsam.PriorFactorPose2(current_pose_key, initial_pose, PRIOR_NOISE))\n",
"\n",
"# Variables to store results for animation\n",
"results_history = [current_estimate] # Store Values object at each step\n",
"results_history = [gtsam.Values(current_estimate)] # Store Values object at each step\n",
"marginals_history = [] # Store Marginals object at each step\n",
"known_landmarks = set() # Set of landmark keys L(j) currently in the state\n",
"\n",
@ -273,7 +259,9 @@
" marginals_history.append(initial_marginals)\n",
"except Exception as e:\n",
" print(f\"Error computing initial marginals: {e}\")\n",
" marginals_history.append(None) # Append placeholder if fails\n"
" marginals_history.append(None) # Append placeholder if fails\n",
"\n",
"maybe_show(current_graph, current_estimate, \"--- Step 0 --- Initial Graph with Prior ---\")"
]
},
{
@ -300,18 +288,15 @@
" odom_k = odometry_measurements[k]\n",
" \n",
" # Predict next pose and add to Values (linearization point)\n",
" # Ensure previous pose exists before accessing\n",
" predicted_pose = current_estimate.atPose2(prev_pose_key).compose(odom_k)\n",
" current_estimate.insert(current_pose_key, predicted_pose)\n",
" \n",
" # Insert or update the current pose prediction\n",
" current_estimate.insert_or_assign(current_pose_key, predicted_pose)\n",
" \n",
" # Add odometry factor to the graph\n",
" current_graph.add(gtsam.BetweenFactorPose2(prev_pose_key, current_pose_key, odom_k, ODOMETRY_NOISE))\n",
"\n",
" # --- At this point, the full graph contains the history --- \n",
" # --- To mimic SRIF/EKF, we *could* eliminate prev_pose_key, but --- \n",
" # --- it's simpler here to keep the full graph and optimize it --- \n",
" # --- then extract the current belief for the next step's factors --- \n",
" # --- Let's proceed with adding measurements first ----\n",
"\n",
" # === Update Step ===\n",
" measurements_k1 = measurements_sim[k + 1] # Measurements taken AT pose k+1\n",
" landmarks_observed_this_step = set()\n",
@ -329,42 +314,29 @@
" lm_initial_guess = current_pose_val.transformFrom(gtsam.Point2(delta_x, delta_y))\n",
" current_estimate.insert(lm_key, lm_initial_guess)\n",
" known_landmarks.add(lm_key)\n",
" # print(f\"Step {k+1}: Initialized Landmark {lm_key.index()}\")\n",
"\n",
" # Add measurement factor\n",
" current_graph.add(gtsam.BearingRangeFactor2D(current_pose_key, lm_key, \n",
" measured_bearing, measured_range,\n",
" MEASUREMENT_NOISE))\n",
" measured_bearing, measured_range,\n",
" MEASUREMENT_NOISE))\n",
" \n",
" # --- Optimization (Incremental Smoothing) ---\n",
" # In a true filter, we'd eliminate previous states.\n",
" # Here, we re-optimize the growing graph. For efficiency in large problems, \n",
" # iSAM2 would be used, but Levenberg-Marquardt on the growing graph \n",
" # demonstrates the concept for this smaller example.\n",
" \n",
" optimizer = gtsam.LevenbergMarquardtOptimizer(current_graph, current_estimate)\n",
" try:\n",
" current_estimate = optimizer.optimize() # Update estimates by optimizing the whole graph so far\n",
" except Exception as e:\n",
" print(f\"Optimization failed at step {k+1}: {e}\")\n",
" # Potentially revert estimate update or handle error\n",
" # For simplicity, we continue with the potentially unoptimized estimate\n",
" pass \n",
" current_estimate = optimizer.optimize() # Update estimates by optimizing the whole graph so far\n",
" \n",
" # --- Display Factor Graph --- \n",
" maybe_show(current_graph, current_estimate, f\"--- Step {k+1} --- Factor Graph ---\")\n",
"\n",
" # Store results for animation\n",
" results_history.append(gtsam.Values(current_estimate)) # Store a copy\n",
" \n",
" # Calculate marginals for visualization (can be slow for large graphs)\n",
" try:\n",
" current_gaussian_graph = current_graph.linearize(current_estimate)\n",
" current_marginals = gtsam.Marginals(current_gaussian_graph, current_estimate)\n",
" marginals_history.append(current_marginals)\n",
" except Exception as e:\n",
" print(f\"Marginals calculation failed at step {k+1}: {e}\")\n",
" marginals_history.append(None) # Append placeholder\n",
" current_gaussian_graph = current_graph.linearize(current_estimate)\n",
" current_marginals = gtsam.Marginals(current_gaussian_graph, current_estimate)\n",
" marginals_history.append(current_marginals)\n",
"\n",
"print(\"Iterative Factor Graph SLAM finished.\")\n",
"print(f\"Final number of poses: {k+2}\")\n",
"print(\"\\nIterative Factor Graph SLAM finished.\")\n",
"print(f\"Final number of poses estimated: {len([key for key in current_estimate.keys() if gtsam.Symbol(key).chr() == ord('x')])}\")\n",
"print(f\"Number of landmarks mapped: {len(known_landmarks)}\")"
]
},
@ -373,208 +345,30 @@
"id": "plotly-animation",
"metadata": {},
"source": [
"## 5. Create Plotly Animation"
"## 5. Create Plotly Animation\n",
"\n",
"Use the `gtsam_plotly` module to visualize the results."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "plotly-animation-code",
"id": "plotly-animation-call-code",
"metadata": {
"scrolled": false,
"tags": [
"remove-input"
]
"tags": []
},
"outputs": [],
"source": [
"print(\"Generating Plotly animation...\")\n",
"\n",
"def ellipse_path(cx, cy, sizex, sizey, angle, N=60):\n",
" \"\"\"SVG path string for an ellipse centered at (cx, cy), rotated by `angle` in degrees.\"\"\"\n",
" angle_rad = np.radians(angle)\n",
" t = np.linspace(0, 2 * np.pi, N)\n",
" x = (sizex / 2) * np.cos(t)\n",
" y = (sizey / 2) * np.sin(t)\n",
"\n",
" x_rot = cx + x * np.cos(angle_rad) - y * np.sin(angle_rad)\n",
" y_rot = cy + x * np.sin(angle_rad) + y * np.cos(angle_rad)\n",
"\n",
" path = f\"M {x_rot[0]},{y_rot[0]} \" + \" \".join(\n",
" f\"L{x_},{y_}\" for x_, y_ in zip(x_rot[1:], y_rot[1:])\n",
" ) + \" Z\"\n",
" return path\n",
"\n",
"# Helper to convert GTSAM covariance to Plotly ellipse parameters\n",
"def gtsam_cov_to_plotly_ellipse(cov_matrix, scale=2.0):\n",
" \"\"\"Calculates ellipse parameters (angle, width, height) from a 2x2 covariance matrix.\"\"\"\n",
" # Ensure positive definite - add small epsilon if needed\n",
" cov = cov_matrix[:2, :2] + np.eye(2) * 1e-9 \n",
" try:\n",
" eigvals, eigvecs = np.linalg.eigh(cov)\n",
" # Ensure eigenvalues are positive for sqrt\n",
" eigvals = np.maximum(eigvals, 1e-9)\n",
" except np.linalg.LinAlgError:\n",
" print(\"Warning: Covariance matrix SVD failed, using default ellipse.\")\n",
" return 0, 0.1*scale, 0.1*scale # Default small ellipse\n",
" \n",
" # Width/Height are 2*scale*sqrt(eigenvalue)\n",
" width = 2 * scale * np.sqrt(eigvals[1])\n",
" height = 2 * scale * np.sqrt(eigvals[0])\n",
" \n",
" # Angle of the major axis (corresponding to largest eigenvalue)\n",
" angle_rad = np.arctan2(eigvecs[1, 1], eigvecs[0, 1])\n",
" angle_deg = np.degrees(angle_rad)\n",
" \n",
" return angle_deg, width, height\n",
"\n",
"# --- Create Plotly Figure --- \n",
"fig = go.Figure()\n",
"\n",
"# Add Ground Truth Landmarks (static)\n",
"fig.add_trace(go.Scatter(\n",
" x=landmarks_gt[0, :], \n",
" y=landmarks_gt[1, :], \n",
" mode='markers', \n",
" marker=dict(color='black', size=8, symbol='star'),\n",
" name='Landmarks GT'\n",
"))\n",
"\n",
"# Add Ground Truth Path (static)\n",
"gt_path_x = [p.x() for p in poses_gt]\n",
"gt_path_y = [p.y() for p in poses_gt]\n",
"fig.add_trace(go.Scatter(\n",
" x=gt_path_x,\n",
" y=gt_path_y,\n",
" mode='lines',\n",
" line=dict(color='gray', width=1, dash='dash'),\n",
" name='Path GT'\n",
"))\n",
"\n",
"# --- Animation Frames --- \n",
"frames = []\n",
"steps = list(range(NUM_STEPS + 1))\n",
"\n",
"for k in tqdm(steps):\n",
" frame_data = []\n",
" step_results = results_history[k]\n",
" step_marginals = marginals_history[k]\n",
" \n",
" # Estimated Path up to step k\n",
" est_path_x = [results_history[i].atPose2(X(i)).x() for i in range(k + 1)]\n",
" est_path_y = [results_history[i].atPose2(X(i)).y() for i in range(k + 1)]\n",
" frame_data.append(go.Scatter(\n",
" x=est_path_x,\n",
" y=est_path_y,\n",
" mode='lines+markers',\n",
" line=dict(color='red', width=2),\n",
" marker=dict(size=4, color='red'),\n",
" name='Path Est'\n",
" ))\n",
" \n",
" # Estimated Landmarks known at step k\n",
" est_landmarks_x = []\n",
" est_landmarks_y = []\n",
" landmark_keys_in_frame = []\n",
" for lm_key in step_results.keys():\n",
" if gtsam.symbolChr(lm_key)==108: # Check if it's a landmark key\n",
" lm_pose = step_results.atPoint2(lm_key)\n",
" est_landmarks_x.append(lm_pose[0])\n",
" est_landmarks_y.append(lm_pose[1])\n",
" landmark_keys_in_frame.append(lm_key)\n",
" \n",
" if est_landmarks_x:\n",
" frame_data.append(go.Scatter(\n",
" x=est_landmarks_x, \n",
" y=est_landmarks_y, \n",
" mode='markers',\n",
" marker=dict(color='blue', size=6, symbol='x'),\n",
" name='Landmarks Est'\n",
" ))\n",
"\n",
" # Current Pose Covariance Ellipse\n",
" shapes = [] # List to hold ellipse shapes for this frame\n",
" if step_marginals is not None:\n",
" try:\n",
" current_pose_key = X(k)\n",
" pose_cov = step_marginals.marginalCovariance(current_pose_key)\n",
" pose_mean = step_results.atPose2(current_pose_key).translation()\n",
" angle, width, height = gtsam_cov_to_plotly_ellipse(pose_cov)\n",
" cx, cy = pose_mean[0], pose_mean[1]\n",
" shapes.append(dict(\n",
" type=\"path\",\n",
" path=ellipse_path(cx=cx, cy=cy, sizex=width, sizey=height, angle=angle, N=60),\n",
" xref=\"x\", yref=\"y\",\n",
" fillcolor=\"rgba(255,0,255,0.2)\",\n",
" line_color=\"rgba(255,0,255,0.5)\",\n",
" name=f'Pose {k} Cov'\n",
" ))\n",
" except Exception as e:\n",
" print(f\"Warning: Failed getting pose cov ellipse at step {k}: {e}\")\n",
"\n",
" # Landmark Covariance Ellipses\n",
" for lm_key in landmark_keys_in_frame:\n",
" index = gtsam.symbolIndex(lm_key)\n",
" try:\n",
" lm_cov = step_marginals.marginalCovariance(lm_key)\n",
" lm_mean = step_results.atPoint2(lm_key)\n",
" angle, width, height = gtsam_cov_to_plotly_ellipse(lm_cov)\n",
" cx, cy = lm_mean[0], lm_mean[1]\n",
" shapes.append(dict(\n",
" type=\"path\",\n",
" path=ellipse_path(cx=cx, cy=cy, sizex=width, sizey=height, angle=angle, N=60),\n",
" xref=\"x\", yref=\"y\",\n",
" fillcolor=\"rgba(0,0,255,0.1)\",\n",
" line_color=\"rgba(0,0,255,0.3)\",\n",
" name=f'LM {index} Cov'\n",
" ))\n",
" except Exception as e:\n",
" print(f\"Warning: Failed getting landmark {index} cov ellipse at step {k}: {e}\")\n",
"\n",
" frames.append(go.Frame(data=frame_data, name=str(k), layout=go.Layout(shapes=shapes)))\n",
"\n",
"# --- Set Initial State and Layout --- \n",
"fig.update(frames=frames)\n",
"\n",
"# Set initial data to the first frame's data\n",
"if frames:\n",
" fig.add_traces(frames[0].data)\n",
" initial_shapes = frames[0].layout.shapes if frames[0].layout else []\n",
"else:\n",
" initial_shapes = []\n",
"\n",
"# Define slider\n",
"sliders = [dict(\n",
" active=0,\n",
" currentvalue={\"prefix\": \"Step: \"},\n",
" pad={\"t\": 50},\n",
" steps=[dict(label=str(k), \n",
" method='animate', \n",
" args=[[str(k)], \n",
" dict(mode='immediate', \n",
" frame=dict(duration=100, redraw=True), \n",
" transition=dict(duration=0))])\n",
" for k in steps]\n",
")]\n",
"\n",
"# Update layout\n",
"fig.update_layout(\n",
" title='Iterative Factor Graph SLAM Animation',\n",
" xaxis=dict(range=[-WORLD_SIZE/2 - 2, WORLD_SIZE/2 + 2], constrain='domain'),\n",
" yaxis=dict(range=[-WORLD_SIZE/2 - 2, WORLD_SIZE/2 + 2], scaleanchor='x', scaleratio=1),\n",
" width=800, height=800,\n",
" hovermode='closest',\n",
" updatemenus=[dict(type='buttons', \n",
" showactive=False,\n",
" buttons=[dict(label='Play',\n",
" method='animate', \n",
" args=[None, \n",
" dict(mode='immediate', \n",
" frame=dict(duration=100, redraw=True), \n",
" transition=dict(duration=0), \n",
" fromcurrent=True)])])],\n",
" sliders=sliders,\n",
" shapes=initial_shapes # Set initial shapes\n",
"fig = gtsam_plotly.create_slam_animation(\n",
" results_history=results_history,\n",
" marginals_history=marginals_history,\n",
" landmarks_gt_array=landmarks_gt_array,\n",
" poses_gt=poses_gt,\n",
" num_steps=NUM_STEPS,\n",
" world_size=WORLD_SIZE,\n",
" X=X, # Pass symbol functions\n",
" L=L\n",
")\n",
"\n",
"print(\"Displaying animation...\")\n",
@ -589,6 +383,8 @@
"## 6. Discussion\n",
"\n",
"* **Approach:** This notebook implemented SLAM iteratively using GTSAM factor graphs. At each step, new factors (odometry, measurements) were added, and the graph was re-optimized using Levenberg-Marquardt. This is more akin to **incremental smoothing** than a classic filter (which would explicitly marginalize past states).\n",
"* **Modularity:** Simulation and Plotly visualization code have been moved into separate `simulation.py` and `gtsam_plotly.py` files for better organization.\n",
"* **Graph Visualization:** The `graphviz` library was used to render the factor graph at each step (or only at the end, depending on `SHOW_GRAPHVIZ_EACH_STEP`). This helps visualize how the graph structure grows and connects poses and landmarks.\n",
"* **Efficiency:** Optimizing the entire graph at every step becomes computationally expensive for long trajectories. For real-time performance or large problems, **iSAM2 (Incremental Smoothing and Mapping)** is the preferred GTSAM algorithm. iSAM2 efficiently updates the solution by only re-linearizing and re-solving parts of the graph affected by new measurements.\n",
"* **Accuracy vs. EKF:** This factor graph approach generally handles non-linearities better than a standard EKF because it re-linearizes during optimization. It avoids some of the consistency pitfalls associated with the EKF's single linearization point per step.\n",
"* **Visualization:** The Plotly animation shows the evolution of the robot's path estimate, the map of landmarks, and their associated uncertainties (covariance ellipses). You can observe how measurements help refine the estimates and reduce uncertainty, especially when loops are closed (implicitly here through repeated observations of landmarks)."
@ -597,7 +393,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": "py312",
"language": "python",
"name": "python3"
},
@ -611,7 +407,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
"version": "3.12.6"
}
},
"nbformat": 4,

View File

@ -0,0 +1,290 @@
# gtsam_plotly.py
import numpy as np
import plotly.graph_objects as go
from tqdm.notebook import tqdm # Progress bar
import gtsam
def ellipse_path(cx, cy, sizex, sizey, angle, N=60):
"""SVG path string for an ellipse centered at (cx, cy), rotated by `angle` in degrees."""
angle_rad = np.radians(angle)
t = np.linspace(0, 2 * np.pi, N)
x = (sizex / 2) * np.cos(t)
y = (sizey / 2) * np.sin(t)
x_rot = cx + x * np.cos(angle_rad) - y * np.sin(angle_rad)
y_rot = cy + x * np.sin(angle_rad) + y * np.cos(angle_rad)
path = (
f"M {x_rot[0]},{y_rot[0]} "
+ " ".join(f"L{x_},{y_}" for x_, y_ in zip(x_rot[1:], y_rot[1:]))
+ " Z"
)
return path
# Helper to convert GTSAM covariance to Plotly ellipse parameters
def gtsam_cov_to_plotly_ellipse(cov_matrix, scale=2.0):
"""Calculates ellipse parameters (angle, width, height) from a 2x2 covariance matrix."""
# Ensure positive definite - add small epsilon if needed
cov = cov_matrix[:2, :2] + np.eye(2) * 1e-9
try:
eigvals, eigvecs = np.linalg.eigh(cov)
# Ensure eigenvalues are positive for sqrt
eigvals = np.maximum(eigvals, 1e-9)
except np.linalg.LinAlgError:
# print("Warning: Covariance matrix SVD failed, using default ellipse.") # Can be verbose
return 0, 0.1 * scale, 0.1 * scale # Default small ellipse
# Width/Height are 2*scale*sqrt(eigenvalue)
width = 2 * scale * np.sqrt(eigvals[1])
height = 2 * scale * np.sqrt(eigvals[0])
# Angle of the major axis (corresponding to largest eigenvalue)
angle_rad = np.arctan2(eigvecs[1, 1], eigvecs[0, 1])
angle_deg = np.degrees(angle_rad)
return angle_deg, width, height
def create_slam_animation(
results_history, # List of gtsam.Values
marginals_history, # List of gtsam.Marginals or None
landmarks_gt_array, # Nx2 numpy array
poses_gt, # List of gtsam.Pose2
num_steps,
world_size,
X, # Symbol func
L, # Symbol func
):
"""Creates a Plotly animation of the SLAM results."""
print("Generating Plotly animation...")
fig = go.Figure()
# Add Ground Truth Landmarks (static)
fig.add_trace(
go.Scatter(
x=landmarks_gt_array[0, :],
y=landmarks_gt_array[1, :],
mode="markers",
marker=dict(color="black", size=8, symbol="star"),
name="Landmarks GT",
)
)
# Add Ground Truth Path (static)
gt_path_x = [p.x() for p in poses_gt]
gt_path_y = [p.y() for p in poses_gt]
fig.add_trace(
go.Scatter(
x=gt_path_x,
y=gt_path_y,
mode="lines",
line=dict(color="gray", width=1, dash="dash"),
name="Path GT",
)
)
# --- Animation Frames ---
frames = []
steps = list(range(num_steps + 1))
for k in tqdm(steps):
frame_data = []
step_results = results_history[k]
step_marginals = marginals_history[k]
# Estimated Path up to step k
est_path_x = []
est_path_y = []
for i in range(k + 1):
pose_key = X(i)
if step_results.exists(
pose_key
): # Check if pose exists in results for this step
pose = step_results.atPose2(pose_key)
est_path_x.append(pose.x())
est_path_y.append(pose.y())
frame_data.append(
go.Scatter(
x=est_path_x,
y=est_path_y,
mode="lines+markers",
line=dict(color="red", width=2),
marker=dict(size=4, color="red"),
name="Path Est",
)
)
# Estimated Landmarks known at step k
est_landmarks_x = []
est_landmarks_y = []
landmark_keys_in_frame = []
# Iterate through all possible landmark keys seen so far, check if present in current step's results
all_keys = step_results.keys()
for lm_key_val in all_keys: # Use keys() for integer keys
if gtsam.Symbol(lm_key_val).chr() == ord("l"): # Check if symbol chr is 'l'
lm_key = lm_key_val # Use the integer key directly
if step_results.exists(lm_key):
lm_pose = step_results.atPoint2(lm_key)
est_landmarks_x.append(lm_pose[0])
est_landmarks_y.append(lm_pose[1])
landmark_keys_in_frame.append(lm_key)
if est_landmarks_x:
frame_data.append(
go.Scatter(
x=est_landmarks_x,
y=est_landmarks_y,
mode="markers",
marker=dict(color="blue", size=6, symbol="x"),
name="Landmarks Est",
)
)
# Covariance Ellipses
shapes = [] # List to hold ellipse shapes for this frame
if step_marginals is not None:
# Current Pose Covariance Ellipse
try:
current_pose_key = X(k)
if step_results.exists(current_pose_key):
pose_cov = step_marginals.marginalCovariance(current_pose_key)
pose_mean = step_results.atPose2(current_pose_key).translation()
angle, width, height = gtsam_cov_to_plotly_ellipse(pose_cov)
cx, cy = pose_mean[0], pose_mean[1]
shapes.append(
dict(
type="path",
path=ellipse_path(
cx=cx,
cy=cy,
sizex=width,
sizey=height,
angle=angle,
N=60,
),
xref="x",
yref="y",
fillcolor="rgba(255,0,255,0.2)",
line_color="rgba(255,0,255,0.5)",
name=f"Pose {k} Cov",
)
)
except Exception as e:
print(
f"Warning: Failed getting pose {k} cov ellipse at step {k}: {e}"
) # Can be verbose
pass
# Landmark Covariance Ellipses
for lm_key in landmark_keys_in_frame:
try:
lm_cov = step_marginals.marginalCovariance(lm_key)
lm_mean = step_results.atPoint2(lm_key)
angle, width, height = gtsam_cov_to_plotly_ellipse(lm_cov)
cx, cy = lm_mean[0], lm_mean[1]
index = gtsam.Symbol(lm_key).index()
shapes.append(
dict(
type="path",
path=ellipse_path(
cx=cx,
cy=cy,
sizex=width,
sizey=height,
angle=angle,
N=60,
),
xref="x",
yref="y",
fillcolor="rgba(0,0,255,0.1)",
line_color="rgba(0,0,255,0.3)",
name=f"LM {index} Cov",
)
)
except Exception as e:
index = gtsam.Symbol(lm_key).index()
print(
f"Warning: Failed getting landmark {index} cov ellipse at step {k}: {e}"
) # Can be verbose
frames.append(
go.Frame(data=frame_data, name=str(k), layout=go.Layout(shapes=shapes))
)
# --- Set Initial State and Layout ---
fig.update(frames=frames)
# Set initial data to the first frame's data
if frames:
fig.add_traces(frames[0].data)
initial_shapes = frames[0].layout.shapes if frames[0].layout else []
else:
initial_shapes = []
# Define slider
sliders = [
dict(
active=0,
currentvalue={"prefix": "Step: "},
pad={"t": 50},
steps=[
dict(
label=str(k),
method="animate",
args=[
[str(k)],
dict(
mode="immediate",
frame=dict(duration=100, redraw=True),
transition=dict(duration=0),
),
],
)
for k in steps
],
)
]
# Update layout
fig.update_layout(
title="Iterative Factor Graph SLAM Animation",
xaxis=dict(range=[-world_size / 2 - 2, world_size / 2 + 2], constrain="domain"),
yaxis=dict(
range=[-world_size / 2 - 2, world_size / 2 + 2],
scaleanchor="x",
scaleratio=1,
),
width=800,
height=800,
hovermode="closest",
updatemenus=[
dict(
type="buttons",
showactive=False,
buttons=[
dict(
label="Play",
method="animate",
args=[
None,
dict(
mode="immediate",
frame=dict(duration=100, redraw=True),
transition=dict(duration=0),
fromcurrent=True,
),
],
)
],
)
],
sliders=sliders,
shapes=initial_shapes, # Set initial shapes
)
print("Plotly animation generated.")
return fig

View File

@ -0,0 +1,137 @@
# simulation.py
import numpy as np
import gtsam
def generate_simulation_data(
num_landmarks,
world_size,
robot_radius,
robot_angular_vel,
num_steps,
dt,
odometry_noise_model,
measurement_noise_model,
max_sensor_range,
X, # Symbol generator function
L, # Symbol generator function
odom_seed=42,
meas_seed=42,
landmark_seed=42,
):
"""Generates ground truth and simulated measurements for SLAM.
Args:
num_landmarks: Number of landmarks to generate.
world_size: Size of the square world environment.
robot_radius: Radius of the robot's circular path.
robot_angular_vel: Angular velocity of the robot (rad/step).
num_steps: Number of simulation steps.
dt: Time step duration.
odometry_noise_model: GTSAM noise model for odometry.
measurement_noise_model: GTSAM noise model for bearing-range.
max_sensor_range: Maximum range of the bearing-range sensor.
X: GTSAM symbol shorthand function for poses.
L: GTSAM symbol shorthand function for landmarks.
odom_seed: Random seed for odometry noise.
meas_seed: Random seed for measurement noise.
landmark_seed: Random seed for landmark placement.
Returns:
tuple: Contains:
- landmarks_gt_dict (dict): L(i) -> gtsam.Point2 ground truth.
- poses_gt (list): List of gtsam.Pose2 ground truth poses.
- odometry_measurements (list): List of noisy gtsam.Pose2 odometry.
- measurements_sim (list): List of lists, measurements_sim[k] contains
tuples (L(lm_id), bearing, range) for step k.
- landmarks_gt_array (np.array): 2xN numpy array of landmark positions.
"""
np.random.seed(landmark_seed)
odometry_noise_sampler = gtsam.Sampler(odometry_noise_model, odom_seed)
measurement_noise_sampler = gtsam.Sampler(measurement_noise_model, meas_seed)
# 1. Ground Truth Landmarks
landmarks_gt_array = (np.random.rand(2, num_landmarks) - 0.5) * world_size
landmarks_gt_dict = {
L(i): gtsam.Point2(landmarks_gt_array[:, i]) for i in range(num_landmarks)
}
# 2. Ground Truth Robot Path
poses_gt = []
current_pose_gt = gtsam.Pose2(robot_radius, 0, np.pi / 2) # Start on circle edge
poses_gt.append(current_pose_gt)
for _ in range(num_steps):
delta_theta = robot_angular_vel * dt
arc_length = robot_angular_vel * robot_radius * dt
motion_command = gtsam.Pose2(arc_length, 0, delta_theta)
current_pose_gt = current_pose_gt.compose(motion_command)
poses_gt.append(current_pose_gt)
# 3. Simulate Noisy Odometry Measurements
odometry_measurements = []
for k in range(num_steps):
pose_k = poses_gt[k]
pose_k1 = poses_gt[k + 1]
true_odom = pose_k.between(pose_k1)
# Sample noise directly for Pose2 composition (approximate)
odom_noise_vec = odometry_noise_sampler.sample()
noisy_odom = true_odom.compose(
gtsam.Pose2(odom_noise_vec[0], odom_noise_vec[1], odom_noise_vec[2])
)
odometry_measurements.append(noisy_odom)
# 4. Simulate Noisy Bearing-Range Measurements
measurements_sim = [[] for _ in range(num_steps + 1)]
for k in range(num_steps + 1):
robot_pose = poses_gt[k]
for lm_id in range(num_landmarks):
lm_gt_pt = landmarks_gt_dict[L(lm_id)]
try:
measurement_factor = gtsam.BearingRangeFactor2D(
X(k),
L(lm_id),
robot_pose.bearing(lm_gt_pt),
robot_pose.range(lm_gt_pt),
measurement_noise_model,
)
true_range = measurement_factor.measured().range()
true_bearing = measurement_factor.measured().bearing()
# Check sensor limits (range and Field of View - e.g. +/- 45 degrees)
if (
true_range <= max_sensor_range
and abs(true_bearing.theta()) < np.pi / 4
):
# Sample noise
noise_vec = measurement_noise_sampler.sample()
noisy_bearing = true_bearing.retract(
np.array([noise_vec[0]])
) # Retract on SO(2)
noisy_range = true_range + noise_vec[1]
if noisy_range > 0: # Ensure range is positive
measurements_sim[k].append(
(L(lm_id), noisy_bearing, noisy_range)
)
except Exception as e:
# Catch potential errors like point being too close to the pose
# print(f"Sim Warning at step {k}, landmark {lm_id}: {e}") # Can be verbose
pass
print(f"Simulation Generated: {num_landmarks} landmarks.")
print(
f"Simulation Generated: {num_steps + 1} ground truth poses and {num_steps} odometry measurements."
)
num_meas_total = sum(len(m_list) for m_list in measurements_sim)
print(f"Simulation Generated: {num_meas_total} bearing-range measurements.")
return (
landmarks_gt_dict,
poses_gt,
odometry_measurements,
measurements_sim,
landmarks_gt_array,
)