SVG graphs

release/4.3a0
Frank Dellaert 2025-04-25 09:50:41 -04:00
parent e4278687b4
commit d1780601cd
3 changed files with 99 additions and 54 deletions

View File

@ -108,17 +108,12 @@
"import numpy as np\n", "import numpy as np\n",
"from tqdm.notebook import tqdm # Progress bar\n", "from tqdm.notebook import tqdm # Progress bar\n",
"import math\n", "import math\n",
"import time # To slow down graphviz display if needed\n",
"\n", "\n",
"import gtsam\n", "import gtsam\n",
"from gtsam.symbol_shorthand import X, L # Symbols for poses and landmarks\n", "from gtsam.symbol_shorthand import X, L # Symbols for poses and landmarks\n",
"\n", "\n",
"# Imports for new modules\n",
"import simulation\n", "import simulation\n",
"from gtsam_plotly import SlamFrameData, create_slam_animation\n", "from gtsam_plotly import SlamFrameData, create_slam_animation"
"\n",
"# Imports for graph visualization\n",
"import graphviz"
] ]
}, },
{ {
@ -143,7 +138,7 @@
"# Robot parameters\n", "# Robot parameters\n",
"ROBOT_RADIUS = 3.0\n", "ROBOT_RADIUS = 3.0\n",
"ROBOT_ANGULAR_VEL = np.deg2rad(20.0) # Radians per step\n", "ROBOT_ANGULAR_VEL = np.deg2rad(20.0) # Radians per step\n",
"NUM_STEPS = 20 # Reduced steps for faster animation generation\n", "NUM_STEPS = 50 # Reduced steps for faster animation generation\n",
"DT = 1.0 # Time step duration (simplified)\n", "DT = 1.0 # Time step duration (simplified)\n",
"\n", "\n",
"# Noise parameters (GTSAM Noise Models)\n", "# Noise parameters (GTSAM Noise Models)\n",

View File

@ -164,17 +164,30 @@ def create_landmark_ellipse_shape(
) )
def dot_string_to_base64_png( def dot_string_to_base64_svg(
dot_string: Optional[str], engine: str = "neato" dot_string: Optional[str], engine: str = "neato"
) -> Optional[str]: ) -> Optional[str]:
"""Renders a DOT string to a base64 encoded PNG using graphviz.""" """Renders a DOT string to a base64 encoded SVG using graphviz."""
if not dot_string: if not dot_string:
return None return None
source = graphviz.Source(dot_string, engine="neato") try:
# Use pipe() to get bytes directly without saving a file # Set DPI for potentially better default sizing, though less critical for SVG
png_bytes = source.pipe(format="png") # graph_attr = {'dpi': '150'} # You can experiment with this
encoded_string = base64.b64encode(png_bytes).decode() source = graphviz.Source(dot_string, engine=engine) # , graph_attr=graph_attr)
return f"data:image/png;base64,{encoded_string}" # Use pipe() to get bytes directly without saving a file, format="svg"
svg_bytes = source.pipe(format="svg")
# Encode bytes to base64 string, decode result to UTF-8 string
encoded_string = base64.b64encode(svg_bytes).decode("utf-8")
# Return data URI for SVG
return f"data:image/svg+xml;base64,{encoded_string}"
except graphviz.backend.execute.CalledProcessError as e:
print(f"Graphviz rendering error ({engine}): {e}")
# Optionally return a placeholder or raise
return None
except Exception as e:
print(f"Unexpected error during Graphviz SVG generation: {e}")
return None
# --- Frame Content Generation --- # --- Frame Content Generation ---
def generate_frame_content( def generate_frame_content(
@ -183,7 +196,7 @@ def generate_frame_content(
L: Callable[[int], int], L: Callable[[int], int],
max_landmark_index: int, max_landmark_index: int,
ellipse_scale: float = 2.0, ellipse_scale: float = 2.0,
graphviz_engine: str = "neato", # Added engine parameter graphviz_engine: str = "neato",
verbose: bool = False, verbose: bool = False,
) -> Tuple[List[go.Scatter], List[Dict[str, Any]], Optional[Dict[str, Any]]]: ) -> Tuple[List[go.Scatter], List[Dict[str, Any]], Optional[Dict[str, Any]]]:
"""Generates dynamic traces, shapes, and layout image for a single frame.""" """Generates dynamic traces, shapes, and layout image for a single frame."""
@ -191,11 +204,11 @@ def generate_frame_content(
step_results = frame_data.results step_results = frame_data.results
step_marginals = frame_data.marginals step_marginals = frame_data.marginals
frame_traces: List[go.Scatter] = [] frame_dynamic_traces: List[go.Scatter] = [] # Renamed for clarity
frame_shapes: List[Dict[str, Any]] = [] frame_shapes: List[Dict[str, Any]] = []
layout_image: Optional[Dict[str, Any]] = None layout_image: Optional[Dict[str, Any]] = None
# 1. Estimated Path (Unchanged) # 1. Estimated Path
est_path_x = [ est_path_x = [
step_results.atPose2(X(i)).x() step_results.atPose2(X(i)).x()
for i in range(k + 1) for i in range(k + 1)
@ -206,9 +219,9 @@ def generate_frame_content(
for i in range(k + 1) for i in range(k + 1)
if step_results.exists(X(i)) if step_results.exists(X(i))
] ]
frame_traces.append(create_est_path_trace(est_path_x, est_path_y)) frame_dynamic_traces.append(create_est_path_trace(est_path_x, est_path_y))
# 2. Estimated Landmarks (Unchanged) # 2. Estimated Landmarks
est_landmarks_x, est_landmarks_y, landmark_keys = [], [], [] est_landmarks_x, est_landmarks_y, landmark_keys = [], [], []
for j in range(max_landmark_index + 1): for j in range(max_landmark_index + 1):
lm_key = L(j) lm_key = L(j)
@ -219,9 +232,9 @@ def generate_frame_content(
landmark_keys.append(lm_key) landmark_keys.append(lm_key)
lm_trace = create_est_landmarks_trace(est_landmarks_x, est_landmarks_y) lm_trace = create_est_landmarks_trace(est_landmarks_x, est_landmarks_y)
if lm_trace: if lm_trace:
frame_traces.append(lm_trace) frame_dynamic_traces.append(lm_trace)
# 3. Covariance Ellipses (Unchanged) # 3. Covariance Ellipses
if step_marginals: if step_marginals:
pose_key = X(k) pose_key = X(k)
if step_results.exists(pose_key): if step_results.exists(pose_key):
@ -229,9 +242,13 @@ def generate_frame_content(
cov = step_marginals.marginalCovariance(pose_key) cov = step_marginals.marginalCovariance(pose_key)
mean = step_results.atPose2(pose_key).translation() mean = step_results.atPose2(pose_key).translation()
frame_shapes.append(create_pose_ellipse_shape(mean, cov, ellipse_scale)) frame_shapes.append(create_pose_ellipse_shape(mean, cov, ellipse_scale))
except Exception as e: except RuntimeError as e: # More specific exception type
if verbose: if verbose:
print(f"Warn: Pose {k} cov err @ step {k}: {e}") print(f"Warn: Pose {k} cov err @ step {k}: {e}")
except Exception as e:
if verbose:
print(f"Warn: Pose {k} cov OTHER err @ step {k}: {e}")
for lm_key in landmark_keys: for lm_key in landmark_keys:
try: try:
cov = step_marginals.marginalCovariance(lm_key) cov = step_marginals.marginalCovariance(lm_key)
@ -239,15 +256,20 @@ def generate_frame_content(
frame_shapes.append( frame_shapes.append(
create_landmark_ellipse_shape(mean, cov, ellipse_scale) create_landmark_ellipse_shape(mean, cov, ellipse_scale)
) )
except Exception as e: except RuntimeError as e: # More specific exception type
if verbose: if verbose:
print( print(
f"Warn: LM {gtsam.Symbol(lm_key).index()} cov err @ step {k}: {e}" f"Warn: LM {gtsam.Symbol(lm_key).index()} cov err @ step {k}: {e}"
) )
except Exception as e:
if verbose:
print(
f"Warn: LM {gtsam.Symbol(lm_key).index()} cov OTHER err @ step {k}: {e}"
)
# 4. Graph Image for Layout (MODIFIED) # 4. Graph Image for Layout (MODIFIED TO USE SVG)
# Use the new function with the dot string from frame_data # Use the SVG function with the dot string from frame_data
img_src = dot_string_to_base64_png( img_src = dot_string_to_base64_svg( # Changed function call
frame_data.graph_dot_string, engine=graphviz_engine frame_data.graph_dot_string, engine=graphviz_engine
) )
if img_src: if img_src:
@ -262,10 +284,11 @@ def generate_frame_content(
xanchor="left", xanchor="left",
yanchor="top", yanchor="top",
layer="below", layer="below",
sizing="contain", sizing="contain", # Contain should work well with SVG
) )
return frame_traces, frame_shapes, layout_image # Return only the things that CHANGE frame-to-frame
return frame_dynamic_traces, frame_shapes, layout_image
# --- Figure Configuration --- # --- Figure Configuration ---
@ -374,6 +397,9 @@ def configure_figure_layout(
# --- Main Animation Orchestrator --- # --- Main Animation Orchestrator ---
# --- Main Animation Orchestrator (MODIFIED TO FIX GT DISPLAY) ---
def create_slam_animation( def create_slam_animation(
history: List[SlamFrameData], history: List[SlamFrameData],
X: Callable[[int], int], X: Callable[[int], int],
@ -383,7 +409,7 @@ def create_slam_animation(
poses_gt: Optional[List[gtsam.Pose2]] = None, poses_gt: Optional[List[gtsam.Pose2]] = None,
world_size: float = 20.0, world_size: float = 20.0,
ellipse_scale: float = 2.0, ellipse_scale: float = 2.0,
graphviz_engine: str = "neato", # Pass engine choice graphviz_engine: str = "neato",
verbose_cov_errors: bool = False, verbose_cov_errors: bool = False,
) -> go.Figure: ) -> go.Figure:
"""Creates a side-by-side Plotly SLAM animation using a history of dataclasses.""" """Creates a side-by-side Plotly SLAM animation using a history of dataclasses."""
@ -393,31 +419,63 @@ def create_slam_animation(
num_steps = history[-1].step_index num_steps = history[-1].step_index
fig = go.Figure() fig = go.Figure()
# 1. Add static GT traces (Unchanged) # 1. Create static GT traces ONCE
gt_traces = []
gt_lm_trace = create_gt_landmarks_trace(landmarks_gt_array) gt_lm_trace = create_gt_landmarks_trace(landmarks_gt_array)
if gt_lm_trace: if gt_lm_trace:
fig.add_trace(gt_lm_trace) gt_traces.append(gt_lm_trace)
gt_path_trace = create_gt_path_trace(poses_gt) gt_path_trace = create_gt_path_trace(poses_gt)
if gt_path_trace: if gt_path_trace:
fig.add_trace(gt_path_trace) gt_traces.append(gt_path_trace)
# 2. Generate frames (MODIFIED: pass graphviz_engine) # 2. Generate content for the initial frame (k=0) to set up the figure
initial_frame_data = next((item for item in history if item.step_index == 0), None)
if initial_frame_data is None:
raise ValueError("History must contain data for step 0.")
(
initial_dynamic_traces,
initial_shapes,
initial_image,
) = generate_frame_content(
initial_frame_data,
X,
L,
max_landmark_index,
ellipse_scale,
graphviz_engine,
verbose_cov_errors,
)
# 3. Add initial traces to the figure (GT first, then initial dynamic traces)
# These define the plot state BEFORE the animation starts or when slider is at 0
for trace in gt_traces:
fig.add_trace(trace)
for trace in initial_dynamic_traces:
fig.add_trace(trace)
# 4. Generate frames for the animation (k=0 to num_steps)
frames = [] frames = []
initial_dynamic_traces, initial_shapes, initial_image = [], [], None
steps_iterable = range(num_steps + 1) steps_iterable = range(num_steps + 1)
try: try:
steps_iterable = tqdm(steps_iterable, desc="Creating Frames") steps_iterable = tqdm(steps_iterable, desc="Creating Frames")
except NameError: except NameError: # Handle if tqdm is not available (e.g. standard Python script)
pass pass
except Exception as e: # Catch other potential tqdm issues
print(
f"Note: Could not initialize tqdm progress bar ({e}). Continuing without it."
)
for k in steps_iterable: for k in steps_iterable:
frame_data = next((item for item in history if item.step_index == k), None) frame_data = next((item for item in history if item.step_index == k), None)
if frame_data is None: if frame_data is None:
print(f"Warning: Missing data for step {k} in history.") print(f"Warning: Missing data for step {k} in history. Skipping frame.")
# Optionally create an empty frame or reuse previous frame's data
# For simplicity, we skip here, which might cause jumps in animation
continue continue
# Pass the engine choice to the content generator # Generate dynamic content for this specific frame
frame_traces, frame_shapes, layout_image = generate_frame_content( frame_dynamic_traces, frame_shapes, layout_image = generate_frame_content(
frame_data, frame_data,
X, X,
L, L,
@ -427,31 +485,23 @@ def create_slam_animation(
verbose_cov_errors, verbose_cov_errors,
) )
if k == 0: # IMPORTANT: Combine static GT traces with the dynamic traces for THIS frame
initial_dynamic_traces, initial_shapes, initial_image = ( # The 'data' in the frame replaces the figure's data for that step.
frame_traces,
frame_shapes,
layout_image,
)
frames.append( frames.append(
go.Frame( go.Frame(
data=frame_traces, data=gt_traces + frame_dynamic_traces, # Include GT in every frame
name=str(k), name=str(k),
layout=go.Layout( layout=go.Layout( # Only update shapes and images in layout per frame
shapes=frame_shapes, images=[layout_image] if layout_image else [] shapes=frame_shapes, images=[layout_image] if layout_image else []
), ),
) )
) )
# 3. Add initial dynamic traces (Unchanged) # 5. Assign frames to the figure
for trace in initial_dynamic_traces:
fig.add_trace(trace)
# 4. Assign frames (Unchanged)
fig.update(frames=frames) fig.update(frames=frames)
# 5. Configure layout (Unchanged) # 6. Configure overall layout (sliders, buttons, axes, etc.)
# Pass the shapes and image corresponding to the INITIAL state (k=0)
configure_figure_layout(fig, num_steps, world_size, initial_shapes, initial_image) configure_figure_layout(fig, num_steps, world_size, initial_shapes, initial_image)
print("Plotly animation generated.") print("Plotly animation generated.")

View File

@ -103,7 +103,7 @@ def generate_simulation_data(
# Check sensor limits (range and Field of View - e.g. +/- 45 degrees) # Check sensor limits (range and Field of View - e.g. +/- 45 degrees)
if ( if (
true_range <= max_sensor_range true_range <= max_sensor_range
and abs(true_bearing.theta()) < np.pi / 4 and abs(true_bearing.theta()) < np.pi / 2
): ):
# Sample noise # Sample noise
noise_vec = measurement_noise_sampler.sample() noise_vec = measurement_noise_sampler.sample()