Better plotting, true filter

release/4.3a0
Frank Dellaert 2025-04-26 09:43:05 -04:00
parent d1780601cd
commit b07b77c40d
4 changed files with 16313 additions and 257 deletions

View File

@ -669,17 +669,17 @@ virtual class NonlinearEquality2 : gtsam::NoiseModelFactor {
};
#include <gtsam/nonlinear/FixedLagSmoother.h>
// This class is not available in python, just use a dictionary
class FixedLagSmootherKeyTimestampMapValue {
FixedLagSmootherKeyTimestampMapValue(size_t key, double timestamp);
FixedLagSmootherKeyTimestampMapValue(const gtsam::FixedLagSmootherKeyTimestampMapValue& other);
};
// This class is not available in python, just use a dictionary
class FixedLagSmootherKeyTimestampMap {
FixedLagSmootherKeyTimestampMap();
FixedLagSmootherKeyTimestampMap(const gtsam::FixedLagSmootherKeyTimestampMap& other);
// Note: no print function
// common STL methods
size_t size() const;
bool empty() const;
@ -739,6 +739,7 @@ virtual class IncrementalFixedLagSmoother : gtsam::FixedLagSmoother {
void print(string s = "IncrementalFixedLagSmoother:\n") const;
gtsam::Matrix marginalCovariance(size_t key) const;
gtsam::ISAM2Params params() const;
gtsam::NonlinearFactorGraph getFactors() const;

View File

@ -1,4 +1,4 @@
# Symbolic Module
# Symbolic
The `symbolic` module in GTSAM deals with the *structure* of factor graphs and Bayesian networks, independent of the specific numerical types of factors (like Gaussian or discrete). It allows for analyzing graph connectivity, determining optimal variable elimination orders, and understanding the sparsity structure of the resulting inference objects.

File diff suppressed because one or more lines are too long

View File

@ -1,4 +1,4 @@
# gtsam_plotly_modular_v3.py
# gtsam_plotly.py
import base64
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple
@ -6,7 +6,7 @@ from typing import Any, Callable, Dict, List, Optional, Tuple
import graphviz
import numpy as np
import plotly.graph_objects as go
from tqdm.notebook import tqdm
from tqdm.notebook import tqdm # Optional progress bar
import gtsam
@ -17,9 +17,9 @@ class SlamFrameData:
"""Holds all data needed for a single frame of the SLAM animation."""
step_index: int
results: gtsam.Values
marginals: Optional[gtsam.Marginals]
graph_dot_string: Optional[str] = None # Store the Graphviz DOT string
results: gtsam.Values # Estimates for variables active at this step
marginals: Optional[gtsam.Marginals] # Marginals for variables active at this step
graph_dot_string: Optional[str] = None # Graphviz DOT string for visualization
# --- Core Ellipse Calculation & Path Generation ---
@ -28,14 +28,13 @@ class SlamFrameData:
def create_ellipse_path_from_cov(
cx: float, cy: float, cov_matrix: np.ndarray, scale: float = 2.0, N: int = 60
) -> str:
"""Generates SVG path string for an ellipse directly from 2x2 covariance."""
cov = cov_matrix[:2, :2] + np.eye(2) * 1e-9 # Ensure positive definite
"""Generates SVG path string for an ellipse from 2x2 covariance."""
cov = cov_matrix[:2, :2] + np.eye(2) * 1e-9 # Ensure positive definite 2x2
try:
eigvals, eigvecs = np.linalg.eigh(cov)
# eigh sorts eigenvalues in ascending order
eigvals = np.maximum(eigvals, 1e-9) # Ensure positive eigenvalues
minor_std, major_std = np.sqrt(eigvals)
v_minor, v_major = eigvecs[:, 0], eigvecs[:, 1] # Corresponding eigenvectors
minor_std, major_std = np.sqrt(eigvals) # eigh sorts ascending
v_minor, v_major = eigvecs[:, 0], eigvecs[:, 1]
except np.linalg.LinAlgError:
# Fallback to a small circle if decomposition fails
radius = 0.1 * scale
@ -43,8 +42,7 @@ def create_ellipse_path_from_cov(
x_p = cx + radius * np.cos(t)
y_p = cy + radius * np.sin(t)
else:
# Parametric equation: Center + scale * major_std * v_major * cos(t) + scale * minor_std * v_minor * sin(t)
# Note: We use scale*std which corresponds to half-axis lengths a,b used before
# Parametric equation using eigenvectors and eigenvalues
t = np.linspace(0, 2 * np.pi, N)
cos_t, sin_t = np.cos(t), np.sin(t)
x_p = cx + scale * (
@ -66,7 +64,9 @@ def create_ellipse_path_from_cov(
# --- Plotly Element Generators ---
def create_gt_landmarks_trace(landmarks_gt: np.ndarray) -> Optional[go.Scatter]:
def create_gt_landmarks_trace(
landmarks_gt: Optional[np.ndarray],
) -> Optional[go.Scatter]:
"""Creates scatter trace for ground truth landmarks."""
if landmarks_gt is None or landmarks_gt.size == 0:
return None
@ -79,7 +79,7 @@ def create_gt_landmarks_trace(landmarks_gt: np.ndarray) -> Optional[go.Scatter]:
)
def create_gt_path_trace(poses_gt: List[gtsam.Pose2]) -> Optional[go.Scatter]:
def create_gt_path_trace(poses_gt: Optional[List[gtsam.Pose2]]) -> Optional[go.Scatter]:
"""Creates line trace for ground truth path."""
if not poses_gt:
return None
@ -95,21 +95,36 @@ def create_gt_path_trace(poses_gt: List[gtsam.Pose2]) -> Optional[go.Scatter]:
def create_est_path_trace(
est_path_x: List[float], est_path_y: List[float]
) -> go.Scatter:
"""Creates trace for the estimated path."""
"""Creates trace for the estimated path (all poses up to current)."""
return go.Scatter(
x=est_path_x,
y=est_path_y,
mode="lines+markers",
line=dict(color="red", width=2),
marker=dict(size=4, color="red"),
line=dict(color="rgba(255, 0, 0, 0.3)", width=1), # Fainter line for history
marker=dict(size=4, color="red"), # Keep markers prominent
name="Path Est",
)
def create_current_pose_trace(
current_pose: Optional[gtsam.Pose2],
) -> Optional[go.Scatter]:
"""Creates a single marker trace for the current estimated pose."""
if current_pose is None:
return None
return go.Scatter(
x=[current_pose.x()],
y=[current_pose.y()],
mode="markers",
marker=dict(color="magenta", size=10, symbol="cross"),
name="Current Pose",
)
def create_est_landmarks_trace(
est_landmarks_x: List[float], est_landmarks_y: List[float]
) -> Optional[go.Scatter]:
"""Creates trace for estimated landmarks."""
"""Creates trace for currently estimated landmarks."""
if not est_landmarks_x:
return None
return go.Scatter(
@ -122,7 +137,7 @@ def create_est_landmarks_trace(
def _create_ellipse_shape_dict(
cx, cy, cov, scale, fillcolor, line_color
cx: float, cy: float, cov: np.ndarray, scale: float, fillcolor: str, line_color: str
) -> Dict[str, Any]:
"""Helper: Creates dict for a Plotly ellipse shape from covariance."""
path = create_ellipse_path_from_cov(cx, cy, cov, scale)
@ -133,6 +148,7 @@ def _create_ellipse_shape_dict(
yref="y",
fillcolor=fillcolor,
line_color=line_color,
opacity=0.7, # Make ellipses slightly transparent
)
@ -145,8 +161,8 @@ def create_pose_ellipse_shape(
cy=pose_mean_xy[1],
cov=pose_cov,
scale=scale,
fillcolor="rgba(255,0,255,0.2)",
line_color="rgba(255,0,255,0.5)",
fillcolor="rgba(255,0,255,0.2)", # Magenta fill
line_color="rgba(255,0,255,0.5)", # Magenta line
)
@ -159,8 +175,8 @@ def create_landmark_ellipse_shape(
cy=lm_mean_xy[1],
cov=lm_cov,
scale=scale,
fillcolor="rgba(0,0,255,0.1)",
line_color="rgba(0,0,255,0.3)",
fillcolor="rgba(0,0,255,0.1)", # Blue fill
line_color="rgba(0,0,255,0.3)", # Blue line
)
@ -171,18 +187,12 @@ def dot_string_to_base64_svg(
if not dot_string:
return None
try:
# Set DPI for potentially better default sizing, though less critical for SVG
# graph_attr = {'dpi': '150'} # You can experiment with this
source = graphviz.Source(dot_string, engine=engine) # , graph_attr=graph_attr)
# Use pipe() to get bytes directly without saving a file, format="svg"
source = graphviz.Source(dot_string, engine=engine)
svg_bytes = source.pipe(format="svg")
# Encode bytes to base64 string, decode result to UTF-8 string
encoded_string = base64.b64encode(svg_bytes).decode("utf-8")
# Return data URI for SVG
return f"data:image/svg+xml;base64,{encoded_string}"
except graphviz.backend.execute.CalledProcessError as e:
print(f"Graphviz rendering error ({engine}): {e}")
# Optionally return a placeholder or raise
return None
except Exception as e:
print(f"Unexpected error during Graphviz SVG generation: {e}")
@ -201,65 +211,77 @@ def generate_frame_content(
) -> Tuple[List[go.Scatter], List[Dict[str, Any]], Optional[Dict[str, Any]]]:
"""Generates dynamic traces, shapes, and layout image for a single frame."""
k = frame_data.step_index
# Use the results specific to this frame for current elements
step_results = frame_data.results
step_marginals = frame_data.marginals
frame_dynamic_traces: List[go.Scatter] = [] # Renamed for clarity
frame_dynamic_traces: List[go.Scatter] = []
frame_shapes: List[Dict[str, Any]] = []
layout_image: Optional[Dict[str, Any]] = None
# 1. Estimated Path
est_path_x = [
step_results.atPose2(X(i)).x()
for i in range(k + 1)
if step_results.exists(X(i))
]
est_path_y = [
step_results.atPose2(X(i)).y()
for i in range(k + 1)
if step_results.exists(X(i))
]
frame_dynamic_traces.append(create_est_path_trace(est_path_x, est_path_y))
# 1. Estimated Path (Full History or Partial)
est_path_x = []
est_path_y = []
current_pose_est = None
# 2. Estimated Landmarks
# Plot poses currently existing in the step_results (e.g., within lag)
for i in range(k + 1): # Check poses up to current step index
pose_key = X(i)
if step_results.exists(pose_key):
pose = step_results.atPose2(pose_key)
est_path_x.append(pose.x())
est_path_y.append(pose.y())
if i == k:
current_pose_est = pose
path_trace = create_est_path_trace(est_path_x, est_path_y)
if path_trace:
frame_dynamic_traces.append(path_trace)
# Add a distinct marker for the current pose estimate
current_pose_trace = create_current_pose_trace(current_pose_est)
if current_pose_trace:
frame_dynamic_traces.append(current_pose_trace)
# 2. Estimated Landmarks (Only those present in step_results)
est_landmarks_x, est_landmarks_y, landmark_keys = [], [], []
for j in range(max_landmark_index + 1):
lm_key = L(j)
# Check existence in the results for the *current frame*
if step_results.exists(lm_key):
lm_point = step_results.atPoint2(lm_key)
est_landmarks_x.append(lm_point[0])
est_landmarks_y.append(lm_point[1])
landmark_keys.append(lm_key)
landmark_keys.append(lm_key) # Store keys for covariance lookup
lm_trace = create_est_landmarks_trace(est_landmarks_x, est_landmarks_y)
if lm_trace:
frame_dynamic_traces.append(lm_trace)
# 3. Covariance Ellipses
# 3. Covariance Ellipses (Only for items in step_results AND step_marginals)
if step_marginals:
# Current Pose Ellipse
pose_key = X(k)
if step_results.exists(pose_key):
try:
# Retrieve cov from marginals specific to this frame
cov = step_marginals.marginalCovariance(pose_key)
# Ensure mean comes from the pose in current results
mean = step_results.atPose2(pose_key).translation()
frame_shapes.append(create_pose_ellipse_shape(mean, cov, ellipse_scale))
except RuntimeError as e: # More specific exception type
if verbose:
print(f"Warn: Pose {k} cov err @ step {k}: {e}")
except Exception as e:
if verbose:
print(f"Warn: Pose {k} cov OTHER err @ step {k}: {e}")
# Landmark Ellipses (Iterate over keys found in step_results)
for lm_key in landmark_keys:
try:
# Retrieve cov from marginals specific to this frame
cov = step_marginals.marginalCovariance(lm_key)
# Ensure mean comes from the landmark in current results
mean = step_results.atPoint2(lm_key)
frame_shapes.append(
create_landmark_ellipse_shape(mean, cov, ellipse_scale)
)
except RuntimeError as e: # More specific exception type
except RuntimeError: # Covariance might not be available
if verbose:
print(
f"Warn: LM {gtsam.Symbol(lm_key).index()} cov err @ step {k}: {e}"
f"Warn: LM {gtsam.Symbol(lm_key).index()} cov not in marginals @ step {k}"
)
except Exception as e:
if verbose:
@ -267,9 +289,8 @@ def generate_frame_content(
f"Warn: LM {gtsam.Symbol(lm_key).index()} cov OTHER err @ step {k}: {e}"
)
# 4. Graph Image for Layout (MODIFIED TO USE SVG)
# Use the SVG function with the dot string from frame_data
img_src = dot_string_to_base64_svg( # Changed function call
# 4. Graph Image for Layout
img_src = dot_string_to_base64_svg(
frame_data.graph_dot_string, engine=graphviz_engine
)
if img_src:
@ -284,10 +305,10 @@ def generate_frame_content(
xanchor="left",
yanchor="top",
layer="below",
sizing="contain", # Contain should work well with SVG
sizing="contain",
)
# Return only the things that CHANGE frame-to-frame
# Return dynamic elements for this frame
return frame_dynamic_traces, frame_shapes, layout_image
@ -336,7 +357,7 @@ def configure_figure_layout(
x=plot_domain[0],
xanchor="left",
y=0,
yanchor="top", # Position relative to plot area
yanchor="top",
buttons=[
dict(
label="Play",
@ -371,35 +392,35 @@ def configure_figure_layout(
title="Factor Graph SLAM Animation (Graph Left, Results Right)",
xaxis=dict(
range=[-world_size / 2 - 2, world_size / 2 + 2],
domain=plot_domain, # Confine axis to right pane
domain=plot_domain,
constrain="domain",
),
yaxis=dict(
range=[-world_size / 2 - 2, world_size / 2 + 2],
scaleanchor="x",
scaleratio=1,
domain=[0, 1], # Full height within its domain
domain=[0, 1],
),
width=1000,
height=600, # Adjust size for side-by-side
height=600,
hovermode="closest",
updatemenus=updatemenus,
sliders=sliders,
shapes=initial_shapes, # Initial shapes for the SLAM plot
images=(
[initial_image] if initial_image else []
), # Initial image for the left pane
showlegend=False, # Forego legend for space
# Autosize=True might help responsiveness but can interfere with domain
shapes=initial_shapes, # Initial shapes (frame 0)
images=([initial_image] if initial_image else []), # Initial image (frame 0)
showlegend=True, # Keep legend for clarity
legend=dict(
x=plot_domain[0],
y=1,
traceorder="normal", # Position legend
bgcolor="rgba(255,255,255,0.5)",
),
)
# --- Main Animation Orchestrator ---
# --- Main Animation Orchestrator (MODIFIED TO FIX GT DISPLAY) ---
def create_slam_animation(
history: List[SlamFrameData],
X: Callable[[int], int],
@ -447,8 +468,7 @@ def create_slam_animation(
verbose_cov_errors,
)
# 3. Add initial traces to the figure (GT first, then initial dynamic traces)
# These define the plot state BEFORE the animation starts or when slider is at 0
# 3. Add initial traces (GT + dynamic frame 0)
for trace in gt_traces:
fig.add_trace(trace)
for trace in initial_dynamic_traces:
@ -457,24 +477,12 @@ def create_slam_animation(
# 4. Generate frames for the animation (k=0 to num_steps)
frames = []
steps_iterable = range(num_steps + 1)
try:
steps_iterable = tqdm(steps_iterable, desc="Creating Frames")
except NameError: # Handle if tqdm is not available (e.g. standard Python script)
pass
except Exception as e: # Catch other potential tqdm issues
print(
f"Note: Could not initialize tqdm progress bar ({e}). Continuing without it."
)
for k in steps_iterable:
frame_data = next((item for item in history if item.step_index == k), None)
if frame_data is None:
print(f"Warning: Missing data for step {k} in history. Skipping frame.")
# Optionally create an empty frame or reuse previous frame's data
# For simplicity, we skip here, which might cause jumps in animation
continue
# Generate dynamic content for this specific frame
# Generate dynamic content specific to this frame
frame_dynamic_traces, frame_shapes, layout_image = generate_frame_content(
frame_data,
X,
@ -485,14 +493,18 @@ def create_slam_animation(
verbose_cov_errors,
)
# IMPORTANT: Combine static GT traces with the dynamic traces for THIS frame
# The 'data' in the frame replaces the figure's data for that step.
# Frame definition: includes static GT + dynamic traces for this step
# Layout updates only include shapes and images for this step
frames.append(
go.Frame(
data=gt_traces + frame_dynamic_traces, # Include GT in every frame
data=gt_traces
+ frame_dynamic_traces, # GT must be in each frame's data
name=str(k),
layout=go.Layout( # Only update shapes and images in layout per frame
shapes=frame_shapes, images=[layout_image] if layout_image else []
layout=go.Layout(
shapes=frame_shapes, # Replaces shapes list for this frame
images=(
[layout_image] if layout_image else []
), # Replaces image list
),
)
)
@ -501,7 +513,6 @@ def create_slam_animation(
fig.update(frames=frames)
# 6. Configure overall layout (sliders, buttons, axes, etc.)
# Pass the shapes and image corresponding to the INITIAL state (k=0)
configure_figure_layout(fig, num_steps, world_size, initial_shapes, initial_image)
print("Plotly animation generated.")