From 9efd9fc7b1f396526b6757f74c90c8561be3c82c Mon Sep 17 00:00:00 2001 From: Varun Agrawal Date: Sat, 8 Feb 2025 22:57:46 -0500 Subject: [PATCH] finish up script --- python/gtsam/examples/HybridCity10000.py | 224 +++++++++++++++++++++++ 1 file changed, 224 insertions(+) diff --git a/python/gtsam/examples/HybridCity10000.py b/python/gtsam/examples/HybridCity10000.py index 3f70e3c49..8333b3b8d 100644 --- a/python/gtsam/examples/HybridCity10000.py +++ b/python/gtsam/examples/HybridCity10000.py @@ -86,10 +86,234 @@ class City10000Dataset: yield None, None +class Experiment: + """Experiment Class""" + + def __init__(self, + filename: str, + marginal_threshold: float = 0.9999, + max_loop_count: int = 8000, + update_frequency: int = 3, + max_num_hypotheses: int = 10, + relinearization_frequency: int = 10): + self.dataset_ = City10000Dataset(filename) + self.max_loop_count = max_loop_count + self.update_frequency = update_frequency + self.max_num_hypotheses = max_num_hypotheses + self.relinearization_frequency = relinearization_frequency + + self.smoother_ = HybridSmoother(marginal_threshold) + self.new_factors_ = HybridNonlinearFactorGraph() + self.all_factors_ = HybridNonlinearFactorGraph() + self.initial_ = Values() + + def hybrid_loop_closure_factor(self, loop_counter, key_s, key_t, + measurement: Pose2): + """ + Create a hybrid loop closure factor where + 0 - loose noise model and 1 - loop noise model. + """ + l = (L(loop_counter), 2) + f0 = BetweenFactorPose2(X(key_s), X(key_t), measurement, + open_loop_model) + f1 = BetweenFactorPose2(X(key_s), X(key_t), measurement, + pose_noise_model) + factors = [(f0, open_loop_constant), (f1, pose_noise_constant)] + mixture_factor = HybridNonlinearFactor(l, factors) + return mixture_factor + + def hybrid_odometry_factor(self, key_s, key_t, m, + pose_array) -> HybridNonlinearFactor: + """Create hybrid odometry factor with discrete measurement choices.""" + f0 = BetweenFactorPose2(X(key_s), X(key_t), pose_array[0], + pose_noise_model) + f1 = BetweenFactorPose2(X(key_s), X(key_t), pose_array[1], + pose_noise_model) + + factors = [(f0, pose_noise_constant), (f1, pose_noise_constant)] + mixture_factor = HybridNonlinearFactor(m, factors) + + return mixture_factor + + def smoother_update(self, max_num_hypotheses) -> float: + """Perform smoother update and optimize the graph.""" + print(f"Smoother update: {self.new_factors_.size()}") + before_update = time.time() + linearized = self.new_factors_.linearize(self.initial_) + self.smoother_.update(linearized, max_num_hypotheses) + self.all_factors_.push_back(self.new_factors_) + self.new_factors_.resize(0) + after_update = time.time() + return after_update - before_update + + def reInitialize(self) -> float: + """Re-linearize, solve ALL, and re-initialize smoother.""" + print(f"================= Re-Initialize: {self.all_factors_.size()}") + before_update = time.time() + self.all_factors_ = self.all_factors_.restrict( + self.smoother_.fixedValues()) + linearized = self.all_factors_.linearize(self.initial_) + bayesNet = linearized.eliminateSequential() + delta: HybridValues = bayesNet.optimize() + self.initial_ = self.initial_.retract(delta.continuous()) + self.smoother_.reInitialize(bayesNet) + after_update = time.time() + print(f"Took {after_update - before_update} seconds.") + return after_update - before_update + + def run(self): + """Run the main experiment with a given max_loop_count.""" + # Initialize local variables + discrete_count = 0 + index = 0 + loop_count = 0 + update_count = 0 + + time_list = [] #list[(int, float)] + + # Set up initial prior + priorPose = Pose2(0, 0, 0) + self.self.initial_.insert(X(0), priorPose) + self.new_factors_.push_back( + PriorFactorPose2(X(0), priorPose, prior_noise_model)) + + # Initial update + update_time = self.smoother_update(self.max_num_hypotheses) + smoother_update_times = [] # list[(int, float)] + smoother_update_times.append((index, update_time)) + + # Flag to decide whether to run smoother update + number_of_hybrid_factors = 0 + + # Start main loop + result = Values() + start_time = time.time() + + while index < self.max_loop_count: + pose_array, keys = self.dataset_.next() + if pose_array is None: + break + key_s = keys[0] + key_t = keys[1] + + num_measurements = len(pose_array) + + # Take the first one as the initial estimate + odom_pose = pose_array[0] + if key_s == key_t - 1: + # Odometry factor + if num_measurements > 1: + # Add hybrid factor + m = (M(discrete_count), num_measurements) + mixture_factor = self.hybrid_odometry_factor( + key_s, key_t, m, pose_array) + self.new_factors_.append(mixture_factor) + + discrete_count += 1 + number_of_hybrid_factors += 1 + print(f"mixture_factor: {key_s} {key_t}") + else: + self.new_factors_.add( + BetweenFactorPose2(X(key_s), X(key_t), odom_pose, + pose_noise_model)) + + # Insert next pose initial guess + self.initial_.insert( + X(key_t), + self.initial_.atPose2(X(key_s)) * odom_pose) + else: + # Loop closure + loop_factor = self.hybrid_loop_closure_factor( + loop_count, key_s, key_t, odom_pose) + + # print loop closure event keys: + print(f"Loop closure: {key_s} {key_t}") + self.new_factors_.add(loop_factor) + number_of_hybrid_factors += 1 + loop_count += 1 + + if number_of_hybrid_factors >= self.update_frequency: + update_time = self.smoother_update(self.max_num_hypotheses) + smoother_update_times.append((index, update_time)) + number_of_hybrid_factors = 0 + update_count += 1 + + if update_count % self.relinearization_frequency == 0: + self.reInitialize() + + # Record timing for odometry edges only + if key_s == key_t - 1: + cur_time = time.time() + time_list.append(cur_time - start_time) + + # Print some status every 100 steps + if index % 100 == 0: + print(f"Index: {index}") + + if len(time_list) != 0: + print(f"Accumulate time: {time_list[-1]} seconds") + + index += 1 + + # Final update + update_time = self.smoother_update(self.max_num_hypotheses) + smoother_update_times.append((index, update_time)) + + # Final optimize + delta = self.smoother_.optimize() + + result.insert_or_assign(self.initial_.retract(delta.continuous())) + + print(f"Final error: {self.smoother_.hybridBayesNet().error(delta)}") + + end_time = time.time() + total_time = end_time - start_time + print(f"Total time: {total_time} seconds") + + # Write results to file + self.write_result(result, key_t + 1, "Hybrid_City10000.txt") + + # Write timing info to file + self.write_timing_info(time_list=time_list) + + def write_result(self, result, num_poses, filename="Hybrid_city10000.txt"): + """ + Write the result of optimization to file. + + Args: + result (Values): he Values object with the final result. + num_poses (int): The number of poses to write to the file. + filename (str): The file name to save the result to. + """ + with open(filename, 'w') as outfile: + + for i in range(num_poses): + out_pose = result.atPose2(X(i)) + outfile.write( + f"{out_pose.x()} {out_pose.y()} {out_pose.theta()}\n") + + print(f"Output written to {filename}") + + def write_timing_info(self, + time_list, + time_filename="Hybrid_City10000_time.txt"): + """Log all the timing information to a file""" + + with open(time_filename, 'w') as out_file_time: + + for acc_time in time_list: + out_file_time.write(f"{acc_time}\n") + + print(f"Output {time_filename} file.") + + def main(): """Main runner""" args = parse_arguments() + experiment = Experiment(gtsam.findExampleDataFile(args.data_file)) + experiment.run() + if __name__ == "__main__": main()