Even more modular

release/4.3a0
Frank Dellaert 2025-04-24 15:58:33 -04:00
parent 8148b78af6
commit 1bf76be62b
1 changed files with 202 additions and 256 deletions

View File

@ -1,39 +1,25 @@
# gtsam_plotly_modular.py # gtsam_plotly_modular_v2.py
from typing import Any, Callable, Dict, List, Optional, Tuple
import numpy as np import numpy as np
import plotly.graph_objects as go import plotly.graph_objects as go
from tqdm.notebook import tqdm # Progress bar from tqdm.notebook import tqdm
from typing import List, Optional, Tuple, Dict, Any
import gtsam import gtsam
# --- Ellipse Calculation Helpers (Mostly unchanged) --- # --- Core Ellipse Calculations ---
def ellipse_path( def ellipse_path(
cx: float, cy: float, sizex: float, sizey: float, angle: float, N: int = 60 cx: float, cy: float, sizex: float, sizey: float, angle: float, N: int = 60
) -> str: ) -> str:
""" """Generates SVG path string for a rotated ellipse."""
Generates an SVG path string for an ellipse.
Args:
cx: Center x-coordinate.
cy: Center y-coordinate.
sizex: Full width of the ellipse along its major axis.
sizey: Full height of the ellipse along its minor axis.
angle: Rotation angle in degrees.
N: Number of points to approximate the ellipse.
Returns:
SVG path string.
"""
angle_rad = np.radians(angle) angle_rad = np.radians(angle)
t = np.linspace(0, 2 * np.pi, N) t = np.linspace(0, 2 * np.pi, N)
x = (sizex / 2) * np.cos(t) x_unit = (sizex / 2) * np.cos(t)
y = (sizey / 2) * np.sin(t) y_unit = (sizey / 2) * np.sin(t)
x_rot = cx + x_unit * np.cos(angle_rad) - y_unit * np.sin(angle_rad)
x_rot = cx + x * np.cos(angle_rad) - y * np.sin(angle_rad) y_rot = cy + x_unit * np.sin(angle_rad) + y_unit * np.cos(angle_rad)
y_rot = cy + x * np.sin(angle_rad) + y * np.cos(angle_rad)
path = ( path = (
f"M {x_rot[0]},{y_rot[0]} " f"M {x_rot[0]},{y_rot[0]} "
+ " ".join(f"L{x_},{y_}" for x_, y_ in zip(x_rot[1:], y_rot[1:])) + " ".join(f"L{x_},{y_}" for x_, y_ in zip(x_rot[1:], y_rot[1:]))
@ -45,125 +31,150 @@ def ellipse_path(
def gtsam_cov_to_plotly_ellipse( def gtsam_cov_to_plotly_ellipse(
cov_matrix: np.ndarray, scale: float = 2.0 cov_matrix: np.ndarray, scale: float = 2.0
) -> Tuple[float, float, float]: ) -> Tuple[float, float, float]:
""" """Calculates ellipse angle (deg), width, height from 2x2 covariance."""
Calculates ellipse parameters (angle, width, height) from a 2x2 covariance matrix. cov = cov_matrix[:2, :2] + np.eye(2) * 1e-9 # Ensure positive definite
Args:
cov_matrix: The 2x2 covariance matrix (or larger, only top-left 2x2 used).
scale: Scaling factor for the ellipse size (e.g., 2.0 for 2-sigma).
Returns:
Tuple containing (angle_degrees, width, height).
"""
# Ensure positive definite - add small epsilon if needed
cov = cov_matrix[:2, :2] + np.eye(2) * 1e-9
try: try:
eigvals, eigvecs = np.linalg.eigh(cov) eigvals, eigvecs = np.linalg.eigh(cov)
# Ensure eigenvalues are positive for sqrt eigvals = np.maximum(eigvals, 1e-9) # Ensure positive eigenvalues
eigvals = np.maximum(eigvals, 1e-9)
except np.linalg.LinAlgError: except np.linalg.LinAlgError:
# print("Warning: Covariance matrix SVD failed, using default ellipse.") # Optional warning return 0, 0.1 * scale, 0.1 * scale # Default on failure
return 0, 0.1 * scale, 0.1 * scale # Default small ellipse
# Width/Height are 2*scale*sqrt(eigenvalue) (using full width/height) width = 2 * scale * np.sqrt(eigvals[1]) # Major axis (largest eigenvalue)
width = ( height = 2 * scale * np.sqrt(eigvals[0]) # Minor axis (smallest eigenvalue)
2 * scale * np.sqrt(eigvals[1]) angle_rad = np.arctan2(
) # Major axis corresponds to largest eigenvalue eigvecs[1, 1], eigvecs[0, 1]
height = ( ) # Angle of major axis eigenvector
2 * scale * np.sqrt(eigvals[0])
) # Minor axis corresponds to smallest eigenvalue
# Angle of the major axis (eigenvector corresponding to largest eigenvalue eigvals[1])
angle_rad = np.arctan2(eigvecs[1, 1], eigvecs[0, 1])
angle_deg = np.degrees(angle_rad) angle_deg = np.degrees(angle_rad)
return angle_deg, width, height return angle_deg, width, height
# --- Plotting Element Creation Helpers --- # --- Plotly Element Generators ---
def _add_ground_truth_traces( def create_gt_landmarks_trace(landmarks_gt_array: np.ndarray) -> Optional[go.Scatter]:
fig: go.Figure, landmarks_gt_array: np.ndarray, poses_gt: List[gtsam.Pose2] """Creates scatter trace for ground truth landmarks."""
) -> None: if landmarks_gt_array is None or landmarks_gt_array.size == 0:
"""Adds static ground truth landmark and path traces to the figure.""" return None
# Ground Truth Landmarks return go.Scatter(
if landmarks_gt_array is not None and landmarks_gt_array.size > 0: x=landmarks_gt_array[0, :],
fig.add_trace( y=landmarks_gt_array[1, :],
go.Scatter( mode="markers",
x=landmarks_gt_array[0, :], marker=dict(color="black", size=8, symbol="star"),
y=landmarks_gt_array[1, :], name="Landmarks GT",
mode="markers", )
marker=dict(color="black", size=8, symbol="star"),
name="Landmarks GT",
)
)
# Ground Truth Path
if poses_gt: def create_gt_path_trace(poses_gt: List[gtsam.Pose2]) -> Optional[go.Scatter]:
gt_path_x = [p.x() for p in poses_gt] """Creates line trace for ground truth path."""
gt_path_y = [p.y() for p in poses_gt] if not poses_gt:
fig.add_trace( return None
go.Scatter( gt_path_x = [p.x() for p in poses_gt]
x=gt_path_x, gt_path_y = [p.y() for p in poses_gt]
y=gt_path_y, return go.Scatter(
mode="lines", x=gt_path_x,
line=dict(color="gray", width=1, dash="dash"), y=gt_path_y,
name="Path GT", mode="lines",
) line=dict(color="gray", width=1, dash="dash"),
) name="Path GT",
)
def create_est_path_trace(
est_path_x: List[float], est_path_y: List[float]
) -> go.Scatter:
"""Creates scatter/line trace for the estimated path up to current step."""
return go.Scatter(
x=est_path_x,
y=est_path_y,
mode="lines+markers",
line=dict(color="red", width=2),
marker=dict(size=4, color="red"),
name="Path Est", # This name applies to the trace in the specific frame
)
def create_est_landmarks_trace(
est_landmarks_x: List[float], est_landmarks_y: List[float]
) -> Optional[go.Scatter]:
"""Creates scatter trace for currently estimated landmarks."""
if not est_landmarks_x:
return None
return go.Scatter(
x=est_landmarks_x,
y=est_landmarks_y,
mode="markers",
marker=dict(color="blue", size=6, symbol="x"),
name="Landmarks Est", # Applies to landmarks in the specific frame
)
def _create_ellipse_shape_dict( def _create_ellipse_shape_dict(
cx: float, cx, cy, angle, width, height, fillcolor, line_color, name_suffix
cy: float,
angle: float,
width: float,
height: float,
fillcolor: str,
line_color: str,
name: str,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Creates the dictionary required for a Plotly ellipse shape.""" """Helper to create the dictionary for a Plotly ellipse shape."""
return dict( return dict(
type="path", type="path",
path=ellipse_path(cx=cx, cy=cy, sizex=width, sizey=height, angle=angle, N=60), path=ellipse_path(cx=cx, cy=cy, sizex=width, sizey=height, angle=angle),
xref="x", xref="x",
yref="y", yref="y",
fillcolor=fillcolor, fillcolor=fillcolor,
line_color=line_color, line_color=line_color,
name=name, # Note: name isn't directly displayed for shapes, but good for metadata # name=f"{name_suffix} Cov", # Name isn't really used by Plotly for shapes
) )
def _create_single_frame_data( def create_pose_ellipse_shape(
pose_mean_xy: np.ndarray, pose_cov: np.ndarray, k: int, scale: float
) -> Dict[str, Any]:
"""Creates shape dictionary for a pose covariance ellipse."""
angle, width, height = gtsam_cov_to_plotly_ellipse(pose_cov, scale)
return _create_ellipse_shape_dict(
cx=pose_mean_xy[0],
cy=pose_mean_xy[1],
angle=angle,
width=width,
height=height,
fillcolor="rgba(255,0,255,0.2)",
line_color="rgba(255,0,255,0.5)",
name_suffix=f"Pose {k}",
)
def create_landmark_ellipse_shape(
lm_mean_xy: np.ndarray, lm_cov: np.ndarray, lm_index: int, scale: float
) -> Dict[str, Any]:
"""Creates shape dictionary for a landmark covariance ellipse."""
angle, width, height = gtsam_cov_to_plotly_ellipse(lm_cov, scale)
return _create_ellipse_shape_dict(
cx=lm_mean_xy[0],
cy=lm_mean_xy[1],
angle=angle,
width=width,
height=height,
fillcolor="rgba(0,0,255,0.1)",
line_color="rgba(0,0,255,0.3)",
name_suffix=f"LM {lm_index}",
)
# --- Frame Content Generation ---
def generate_frame_content(
k: int, k: int,
step_results: gtsam.Values, step_results: gtsam.Values,
step_marginals: Optional[gtsam.Marginals], step_marginals: Optional[gtsam.Marginals],
X: callable, X: Callable[[int], int],
L: callable, L: Callable[[int], int],
max_landmark_index: int, # Need to know the potential range of landmarks
ellipse_scale: float = 2.0, ellipse_scale: float = 2.0,
verbose: bool = False, verbose: bool = False,
) -> Tuple[List[go.Scatter], List[Dict[str, Any]]]: ) -> Tuple[List[go.Scatter], List[Dict[str, Any]]]:
""" """Generates all dynamic traces and shapes for a single animation frame `k`."""
Creates the traces and shapes for a single animation frame. frame_traces: List[go.Scatter] = []
frame_shapes: List[Dict[str, Any]] = []
Args: # 1. Gather Estimated Path Data
k: The current step index.
step_results: gtsam.Values for this step.
step_marginals: gtsam.Marginals for this step (or None).
X: Symbol function for poses.
L: Symbol function for landmarks.
ellipse_scale: Scaling factor for covariance ellipses.
verbose: If True, print warnings for covariance errors.
Returns:
A tuple containing (list_of_traces, list_of_shapes).
"""
traces = []
shapes = []
# 1. Estimated Path up to step k
est_path_x = [] est_path_x = []
est_path_y = [] est_path_y = []
for i in range(k + 1): for i in range(k + 1):
@ -172,115 +183,70 @@ def _create_single_frame_data(
pose = step_results.atPose2(pose_key) pose = step_results.atPose2(pose_key)
est_path_x.append(pose.x()) est_path_x.append(pose.x())
est_path_y.append(pose.y()) est_path_y.append(pose.y())
frame_traces.append(create_est_path_trace(est_path_x, est_path_y))
traces.append( # 2. Gather Estimated Landmark Data
go.Scatter(
x=est_path_x,
y=est_path_y,
mode="lines+markers",
line=dict(color="red", width=2),
marker=dict(size=4, color="red"),
name="Path Est", # Legend entry for the whole path
)
)
# 2. Estimated Landmarks known at step k
est_landmarks_x = [] est_landmarks_x = []
est_landmarks_y = [] est_landmarks_y = []
landmark_keys_in_frame = [] landmark_keys_in_frame = []
all_keys = step_results.keys() # Check all potential landmark keys up to max_landmark_index
for key_val in all_keys: for j in range(max_landmark_index + 1):
symbol = gtsam.Symbol(key_val) lm_key = L(j)
if symbol.chr() == ord("l"): # Check if it's a landmark symbol if step_results.exists(lm_key):
# Check existence again (though keys() implies existence) lm_point = step_results.atPoint2(lm_key)
if step_results.exists(key_val): est_landmarks_x.append(lm_point[0])
lm_point = step_results.atPoint2(key_val) est_landmarks_y.append(lm_point[1])
est_landmarks_x.append(lm_point[0]) landmark_keys_in_frame.append(lm_key)
est_landmarks_y.append(lm_point[1])
landmark_keys_in_frame.append(key_val)
if est_landmarks_x: lm_trace = create_est_landmarks_trace(est_landmarks_x, est_landmarks_y)
traces.append( if lm_trace:
go.Scatter( frame_traces.append(lm_trace)
x=est_landmarks_x,
y=est_landmarks_y,
mode="markers",
marker=dict(color="blue", size=6, symbol="x"),
name="Landmarks Est", # Legend entry for all estimated landmarks
)
)
# 3. Covariance Ellipses (if marginals available) # 3. Generate Covariance Ellipses (if marginals available)
if step_marginals is not None: if step_marginals is not None:
# Current Pose Covariance Ellipse # Pose ellipse
current_pose_key = X(k) current_pose_key = X(k)
if step_results.exists(current_pose_key): if step_results.exists(current_pose_key):
try: try:
pose_cov = step_marginals.marginalCovariance(current_pose_key) pose_cov = step_marginals.marginalCovariance(current_pose_key)
pose_mean = step_results.atPose2(current_pose_key).translation() pose_mean = step_results.atPose2(current_pose_key).translation()
angle, width, height = gtsam_cov_to_plotly_ellipse( frame_shapes.append(
pose_cov, scale=ellipse_scale create_pose_ellipse_shape(pose_mean, pose_cov, k, ellipse_scale)
)
shapes.append(
_create_ellipse_shape_dict(
cx=pose_mean[0],
cy=pose_mean[1],
angle=angle,
width=width,
height=height,
fillcolor="rgba(255,0,255,0.2)",
line_color="rgba(255,0,255,0.5)",
name=f"Pose {k} Cov",
)
) )
except Exception as e: except Exception as e:
if verbose: if verbose:
print( print(f"Warn: Pose {k} cov err @ step {k}: {e}")
f"Warning: Failed getting pose {k} cov ellipse at step {k}: {e}"
)
# Landmark Covariance Ellipses # Landmark ellipses
for lm_key in landmark_keys_in_frame: for lm_key in landmark_keys_in_frame:
try: try:
lm_cov = step_marginals.marginalCovariance(lm_key) lm_cov = step_marginals.marginalCovariance(lm_key)
lm_mean = step_results.atPoint2(lm_key) lm_mean = step_results.atPoint2(lm_key)
angle, width, height = gtsam_cov_to_plotly_ellipse( lm_index = gtsam.Symbol(lm_key).index()
lm_cov, scale=ellipse_scale frame_shapes.append(
) create_landmark_ellipse_shape(
symbol = gtsam.Symbol(lm_key) lm_mean, lm_cov, lm_index, ellipse_scale
shapes.append(
_create_ellipse_shape_dict(
cx=lm_mean[0],
cy=lm_mean[1],
angle=angle,
width=width,
height=height,
fillcolor="rgba(0,0,255,0.1)",
line_color="rgba(0,0,255,0.3)",
name=f"LM {symbol.index()} Cov",
) )
) )
except Exception as e: except Exception as e:
symbol = gtsam.Symbol(lm_key) lm_index = gtsam.Symbol(lm_key).index()
if verbose: if verbose:
print( print(f"Warn: LM {lm_index} cov err @ step {k}: {e}")
f"Warning: Failed getting landmark {symbol.index()} cov ellipse at step {k}: {e}"
)
return traces, shapes return frame_traces, frame_shapes
def _configure_figure_layout( # --- Figure Configuration ---
def configure_figure_layout(
fig: go.Figure, fig: go.Figure,
num_steps: int, num_steps: int,
world_size: float, world_size: float,
initial_shapes: List[Dict[str, Any]], initial_shapes: List[Dict[str, Any]],
) -> None: ) -> None:
"""Configures the Plotly figure's layout, axes, slider, and buttons.""" """Configures Plotly figure layout, axes, slider, buttons."""
steps = list(range(num_steps + 1)) steps = list(range(num_steps + 1))
# Slider
sliders = [ sliders = [
dict( dict(
active=0, active=0,
@ -291,12 +257,10 @@ def _configure_figure_layout(
label=str(k), label=str(k),
method="animate", method="animate",
args=[ args=[
[str(k)], # Frame name [str(k)],
dict( dict(
mode="immediate", mode="immediate",
frame=dict( frame=dict(duration=100, redraw=True),
duration=100, redraw=True
), # Redraw needed for shapes
transition=dict(duration=0), transition=dict(duration=0),
), ),
], ],
@ -305,18 +269,22 @@ def _configure_figure_layout(
], ],
) )
] ]
# Buttons
updatemenus = [ updatemenus = [
dict( dict(
type="buttons", type="buttons",
showactive=False, showactive=False,
direction="left",
pad={"r": 10, "t": 87},
x=0.1,
xanchor="right",
y=0,
yanchor="top",
buttons=[ buttons=[
dict( dict(
label="Play", label="Play",
method="animate", method="animate",
args=[ args=[
None, # Animate all frames None,
dict( dict(
mode="immediate", mode="immediate",
frame=dict(duration=100, redraw=True), frame=dict(duration=100, redraw=True),
@ -329,7 +297,7 @@ def _configure_figure_layout(
label="Pause", label="Pause",
method="animate", method="animate",
args=[ args=[
[None], # Stop animation [None],
dict( dict(
mode="immediate", mode="immediate",
frame=dict(duration=0, redraw=False), frame=dict(duration=0, redraw=False),
@ -338,25 +306,15 @@ def _configure_figure_layout(
], ],
), ),
], ],
direction="left",
pad={"r": 10, "t": 87},
x=0.1,
xanchor="right",
y=0,
yanchor="top",
) )
] ]
# Layout settings
fig.update_layout( fig.update_layout(
title="Iterative Factor Graph SLAM Animation", title="Iterative Factor Graph SLAM Animation",
xaxis=dict( xaxis=dict(range=[-world_size / 2 - 2, world_size / 2 + 2], constrain="domain"),
range=[-world_size / 2 - 2, world_size / 2 + 2],
constrain="domain", # Keep aspect ratio when zooming
),
yaxis=dict( yaxis=dict(
range=[-world_size / 2 - 2, world_size / 2 + 2], range=[-world_size / 2 - 2, world_size / 2 + 2],
scaleanchor="x", # Ensure square aspect ratio scaleanchor="x",
scaleratio=1, scaleratio=1,
), ),
width=800, width=800,
@ -364,8 +322,7 @@ def _configure_figure_layout(
hovermode="closest", hovermode="closest",
updatemenus=updatemenus, updatemenus=updatemenus,
sliders=sliders, sliders=sliders,
shapes=initial_shapes, # Set initial shapes from frame 0 shapes=initial_shapes,
# Add legend if desired
legend=dict( legend=dict(
traceorder="reversed", traceorder="reversed",
title_text="Legend", title_text="Legend",
@ -378,90 +335,79 @@ def _configure_figure_layout(
) )
# --- Main Animation Function (Orchestrator) --- # --- Main Animation Orchestrator ---
def create_slam_animation( def create_slam_animation(
results_history: List[gtsam.Values], results_history: List[gtsam.Values],
marginals_history: List[Optional[gtsam.Marginals]], marginals_history: List[Optional[gtsam.Marginals]],
num_steps: int, num_steps: int,
X: callable, X: Callable[[int], int],
L: callable, L: Callable[[int], int],
max_landmark_index: int, # Required to iterate potential landmarks
landmarks_gt_array: Optional[np.ndarray] = None, landmarks_gt_array: Optional[np.ndarray] = None,
poses_gt: Optional[List[gtsam.Pose2]] = None, poses_gt: Optional[List[gtsam.Pose2]] = None,
world_size: float = 20.0, world_size: float = 20.0,
ellipse_scale: float = 2.0, ellipse_scale: float = 2.0,
verbose_cov_errors: bool = False, verbose_cov_errors: bool = False,
) -> go.Figure: ) -> go.Figure:
""" """Creates a modular Plotly SLAM animation."""
Creates a Plotly animation of the SLAM results in a modular way.
Args:
results_history: List of gtsam.Values, one per step.
marginals_history: List of gtsam.Marginals or None, one per step.
num_steps: The total number of steps (results_history should have length num_steps + 1).
X: Symbol function for poses (e.g., lambda i: gtsam.symbol('x', i)).
L: Symbol function for landmarks (e.g., lambda j: gtsam.symbol('l', j)).
landmarks_gt_array: Optional Nx2 numpy array of ground truth landmark positions.
poses_gt: Optional list of gtsam.Pose2 ground truth poses.
world_size: Approximate size of the world for axis scaling.
ellipse_scale: Scaling factor for covariance ellipses (e.g., 2.0 for 2-sigma).
verbose_cov_errors: If True, print warnings for covariance calculation errors.
Returns:
A plotly.graph_objects.Figure containing the animation.
"""
print("Generating Plotly animation...") print("Generating Plotly animation...")
fig = go.Figure() fig = go.Figure()
# 1. Add static ground truth elements # 1. Add static ground truth traces to the base figure (visible always)
_add_ground_truth_traces(fig, landmarks_gt_array, poses_gt) gt_lm_trace = create_gt_landmarks_trace(landmarks_gt_array)
if gt_lm_trace:
fig.add_trace(gt_lm_trace)
gt_path_trace = create_gt_path_trace(poses_gt)
if gt_path_trace:
fig.add_trace(gt_path_trace)
# 2. Create frames for animation # 2. Generate frames with dynamic content
frames = [] frames = []
steps_iterable = range(num_steps + 1) steps_iterable = range(num_steps + 1)
# Use tqdm for progress bar if available
try: try:
steps_iterable = tqdm(steps_iterable, desc="Creating Frames") steps_iterable = tqdm(steps_iterable, desc="Creating Frames")
except NameError: except NameError:
pass # tqdm not installed or not in notebook env pass # tqdm optional
for k in steps_iterable: for k in steps_iterable:
step_results = results_history[k] step_results = results_history[k]
step_marginals = marginals_history[k] if marginals_history else None step_marginals = marginals_history[k] if marginals_history else None
# Create traces and shapes for this specific frame frame_traces, frame_shapes = generate_frame_content(
frame_traces, frame_shapes = _create_single_frame_data( k,
k, step_results, step_marginals, X, L, ellipse_scale, verbose_cov_errors step_results,
step_marginals,
X,
L,
max_landmark_index,
ellipse_scale,
verbose_cov_errors,
) )
# Create the Plotly frame object
frames.append( frames.append(
go.Frame( go.Frame(
data=frame_traces, data=frame_traces, name=str(k), layout=go.Layout(shapes=frame_shapes)
name=str(k), # Name used by slider/buttons
layout=go.Layout(
shapes=frame_shapes
), # Shapes are part of layout per frame
) )
) )
# 3. Set initial figure state (using data from frame 0) # 3. Set initial dynamic data (from frame 0) onto the base figure
initial_dynamic_traces = []
initial_shapes = []
if frames: if frames:
# Add traces from the first frame as the initial state # Important: Add *copies* or ensure traces are regenerated if needed,
for trace in frames[0].data: # though Plotly usually handles this ok with frame data.
fig.add_trace(trace) initial_dynamic_traces = frames[0].data
initial_shapes = frames[0].layout.shapes if frames[0].layout else [] initial_shapes = frames[0].layout.shapes if frames[0].layout else []
else: for trace in initial_dynamic_traces:
initial_shapes = [] fig.add_trace(trace) # Add Est Path[0], Est Landmarks[0] traces
# 4. Assign frames to the figure # 4. Assign frames to the figure
fig.update(frames=frames) fig.update(frames=frames)
# 5. Configure overall layout, slider, buttons # 5. Configure layout, axes, controls
_configure_figure_layout(fig, num_steps, world_size, initial_shapes) # Pass initial_shapes for the layout's starting state
configure_figure_layout(fig, num_steps, world_size, initial_shapes)
print("Plotly animation generated.") print("Plotly animation generated.")
return fig return fig