working implementation

release/4.3a0
Varun Agrawal 2021-08-20 13:34:26 -04:00
parent a4a58cf803
commit 23858f31e9
1 changed files with 152 additions and 43 deletions

View File

@ -1,67 +1,173 @@
"""
Example of application of ISAM2 for GPS-aided navigation on the KITTI VISION BENCHMARK SUITE
Author: Varun Agrawal
"""
import argparse
from typing import List
import numpy as np
import gtsam
import numpy as np
from gtsam import Pose3, Rot3, noiseModel
from gtsam import Pose3, noiseModel
from gtsam.symbol_shorthand import B, V, X
class KittiCalibration:
def __init__(self, bodyTimu: gtsam.Pose3):
self.bodyTimu = bodyTimu
"""Class to hold KITTI calibration info."""
def __init__(self, body_ptx: float, body_pty: float, body_ptz: float,
body_prx: float, body_pry: float, body_prz: float,
accelerometer_sigma: float, gyroscope_sigma: float,
integration_sigma: float, accelerometer_bias_sigma: float,
gyroscope_bias_sigma: float, average_delta_t: float):
self.bodyTimu = Pose3(gtsam.Rot3.RzRyRx(body_prx, body_pry, body_prz),
gtsam.Point3(body_ptx, body_pty, body_ptz))
self.accelerometer_sigma = accelerometer_sigma
self.gyroscope_sigma = gyroscope_sigma
self.integration_sigma = integration_sigma
self.accelerometer_bias_sigma = accelerometer_bias_sigma
self.gyroscope_bias_sigma = gyroscope_bias_sigma
self.average_delta_t = average_delta_t
class ImuMeasurement:
"""An instance of an IMU measurement."""
def __init__(self, time, dt, accelerometer, gyroscope):
pass
self.time = time
self.dt = dt
self.accelerometer = accelerometer
self.gyroscope = gyroscope
class GpsMeasurement:
"""An instance of a GPS measurement."""
def __init__(self, time, position: gtsam.Point3):
self.time = time
self.position = position
def lodKittiData():
pass
def loadKittiData(imu_data_file="KittiEquivBiasedImu.txt",
gps_data_file="KittiGps_converted.txt",
imu_metadata_file="KittiEquivBiasedImu_metadata.txt"):
"""
Load the KITTI Dataset.
"""
# Read IMU metadata and compute relative sensor pose transforms
# BodyPtx BodyPty BodyPtz BodyPrx BodyPry BodyPrz AccelerometerSigma
# GyroscopeSigma IntegrationSigma AccelerometerBiasSigma GyroscopeBiasSigma
# AverageDeltaT
imu_metadata_file = gtsam.findExampleDataFile(imu_metadata_file)
with open(imu_metadata_file) as imu_metadata:
print("-- Reading sensor metadata")
line = imu_metadata.readline() # Ignore the first line
line = imu_metadata.readline().strip()
data = list(map(float, line.split(' ')))
kitti_calibration = KittiCalibration(*data)
print("IMU metadata:", data)
# Read IMU data
# Time dt accelX accelY accelZ omegaX omegaY omegaZ
imu_data_file = gtsam.findExampleDataFile(imu_data_file)
imu_measurements = []
def parse_args():
parser = argparse.ArgumentParser()
return parser.parse_args()
print("-- Reading IMU measurements from file")
with open(imu_data_file) as imu_data:
data = imu_data.readlines()
for i in range(1, len(data)): # ignore the first line
time, dt, acc_x, acc_y, acc_z, gyro_x, gyro_y, gyro_z = map(
float, data[i].split(' '))
imu_measurement = ImuMeasurement(
time, dt, np.asarray([acc_x, acc_y, acc_z]),
np.asarray([gyro_x, gyro_y, gyro_z]))
imu_measurements.append(imu_measurement)
# Read GPS data
# Time,X,Y,Z
gps_data_file = gtsam.findExampleDataFile(gps_data_file)
gps_measurements = []
print("-- Reading GPS measurements from file")
with open(gps_data_file) as gps_data:
data = gps_data.readlines()
for i in range(1, len(data)):
time, x, y, z = map(float, data[i].split(','))
gps_measurement = GpsMeasurement(time, np.asarray([x, y, z]))
gps_measurements.append(gps_measurement)
return kitti_calibration, imu_measurements, gps_measurements
def getImuParams(kitti_calibration):
"""Get the IMU parameters from the KITTI calibration data."""
GRAVITY = 9.8
w_coriolis = np.zeros(3)
# Set IMU preintegration parameters
measured_acc_cov = np.eye(3) * np.power(
kitti_calibration.accelerometer_sigma, 2)
measured_omega_cov = np.eye(3) * np.powwe(
measured_omega_cov = np.eye(3) * np.power(
kitti_calibration.gyroscope_sigma, 2)
# error committed in integrating position from velocities
integration_error_cov = np.eye(3) * np.power(
kitti_calibration.integration_sigma, 2)
imu_params = gtsam.PreintegrationParams.MakeSharedU(GRAVITY)
imu_params.accelerometerCovariance = measured_acc_cov # acc white noise in continuous
imu_params.integrationCovariance = integration_error_cov # integration uncertainty continuous
imu_params.gyroscopeCovariance = measured_omega_cov # gyro white noise in continuous
imu_params.omegaCoriolis = w_coriolis
# acc white noise in continuous
imu_params.setAccelerometerCovariance(measured_acc_cov)
# integration uncertainty continuous
imu_params.setIntegrationCovariance(integration_error_cov)
# gyro white noise in continuous
imu_params.setGyroscopeCovariance(measured_omega_cov)
imu_params.setOmegaCoriolis(w_coriolis)
return imu_params
def main():
args = parse_args()
kitti_calibration, imu_measurements, gps_measurements = lodKittiData()
def save_results(isam, output_filename, first_gps_pose, gps_measurements):
"""Write the results from `isam` to `output_filename`."""
# Save results to file
print("Writing results to file...")
with open(output_filename, 'w') as fp_out:
fp_out.write(
"#time(s),x(m),y(m),z(m),qx,qy,qz,qw,gt_x(m),gt_y(m),gt_z(m)\n")
if kitti_calibration.bodyTimu != gtsam.Pose3:
result = isam.calculateEstimate()
for i in range(first_gps_pose, len(gps_measurements)):
pose_key = X(i)
vel_key = V(i)
bias_key = B(i)
pose = result.atPose3(pose_key)
velocity = result.atVector(vel_key)
bias = result.atConstantBias(bias_key)
pose_quat = pose.rotation().toQuaternion()
gps = gps_measurements[i].position
print("State at #{}".format(i))
print("Pose:\n", pose)
print("Velocity:\n", velocity)
print("Bias:\n", bias)
fp_out.write("{},{},{},{},{},{},{},{},{},{},{}\n".format(
gps_measurements[i].time, pose.x(), pose.y(), pose.z(),
pose_quat.x(), pose_quat.y(), pose_quat.z(), pose_quat.w(),
gps[0], gps[1], gps[2]))
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser()
parser.add_argument("--output_filename",
default="IMUKittiExampleGPSResults.csv")
return parser.parse_args()
def main():
"""Main runner."""
args = parse_args()
kitti_calibration, imu_measurements, gps_measurements = loadKittiData()
if not kitti_calibration.bodyTimu.equals(Pose3(), 1e-8):
raise ValueError(
"Currently only support IMUinBody is identity, i.e. IMU and body frame are the same"
)
@ -72,12 +178,13 @@ def main():
# Configure noise models
noise_model_gps = noiseModel.Diagonal.Precisions(
np.asarray([0, 0, 0, 1.0 / 0.07, 1.0 / 0.07, 1.0 / 0.07]))
np.asarray([0, 0, 0] + [1.0 / 0.07] * 3))
# Set initial conditions for the estimated trajectory
# initial pose is the reference frame (navigation frame)
current_pose_global = Pose3(Rot3(),
current_pose_global = Pose3(gtsam.Rot3(),
gps_measurements[first_gps_pose].position)
# the vehicle is stationary at the beginning at position 0,0,0
current_velocity_global = np.zeros(3)
current_bias = gtsam.imuBias.ConstantBias() # init with zero bias
@ -86,9 +193,9 @@ def main():
np.asarray([0, 0, 0, 1, 1, 1]))
sigma_init_v = noiseModel.Diagonal.Sigmas(np.ones(3) * 1000.0)
sigma_init_b = noiseModel.Diagonal.Sigmas(
np.asarray([0.1, 0.1, 0.1, 5.00e-05, 5.00e-05, 5.00e-05]))
np.asarray([0.1] * 3 + [5.00e-05] * 3))
imu_params = getImuParams()
imu_params = getImuParams(kitti_calibration)
# Set ISAM2 parameters and create ISAM2 solver object
isam_params = gtsam.ISAM2Params()
@ -100,18 +207,17 @@ def main():
# Create the factor graph and values object that will store new factors and
# values to add to the incremental graph
new_factors = gtsam.NonlinearFactorGraph()
new_values = gtsam.Values(
) # values storing the initial estimates of new nodes in the factor graph
# values storing the initial estimates of new nodes in the factor graph
new_values = gtsam.Values()
# Main loop:
# (1) we read the measurements
# (2) we create the corresponding factors in the graph
# (3) we solve the graph to obtain and optimal estimate of robot trajectory
print(
"-- Starting main loop: inference is performed at each time step, but we plot trajectory every 10 steps\n"
)
print("-- Starting main loop: inference is performed at each time step, "
"but we plot trajectory every 10 steps")
j = 0
for i in range(first_gps_pose, len(gps_measurements) - 1):
for i in range(first_gps_pose, len(gps_measurements)):
# At each non=IMU measurement we initialize a new node in the graph
current_pose_key = X(i)
current_vel_key = V(i)
@ -138,7 +244,7 @@ def main():
imu_params, current_bias)
included_imu_measurement_count = 0
while (j < imu_measurements.size()
while (j < len(imu_measurements)
and imu_measurements[j].time <= t):
if imu_measurements[j].time >= t_previous:
current_summarized_measurement.integrateMeasurement(
@ -153,13 +259,13 @@ def main():
previous_bias_key = B(i - 1)
new_factors.push_back(
gtsam.ImuFactor > (previous_pose_key, previous_vel_key,
current_pose_key, current_vel_key,
previous_bias_key,
current_summarized_measurement))
gtsam.ImuFactor(previous_pose_key, previous_vel_key,
current_pose_key, current_vel_key,
previous_bias_key,
current_summarized_measurement))
# Bias evolution as given in the IMU metadata
sigma_between_b = noiseModel.Diagonal.Sigmas(
sigma_between_b = gtsam.noiseModel.Diagonal.Sigmas(
np.asarray([
np.sqrt(included_imu_measurement_count) *
kitti_calibration.accelerometer_bias_sigma
@ -183,8 +289,8 @@ def main():
new_values.insert(current_pose_key, gps_pose)
print(
"################ POSE INCLUDED AT TIME %lf ################\n",
t)
"################ POSE INCLUDED AT TIME {} ################"
.format(t))
print(gps_pose.translation(), "\n")
else:
new_values.insert(current_pose_key, current_pose_global)
@ -200,8 +306,8 @@ def main():
# first so that the heading becomes observable.
if i > (first_gps_pose + 2 * gps_skip):
print(
"################ NEW FACTORS AT TIME %lf ################\n",
t)
"################ NEW FACTORS AT TIME {:.6f} ################"
.format(t))
new_factors.print()
isam.update(new_factors, new_values)
@ -217,10 +323,13 @@ def main():
current_velocity_global = result.atVector(current_vel_key)
current_bias = result.atConstantBias(current_bias_key)
print("\n################ POSE AT TIME %lf ################\n",
t)
print(
"################ POSE AT TIME {} ################".format(
t))
current_pose_global.print()
print("\n\n")
print("\n")
save_results(isam, args.output_filename, first_gps_pose, gps_measurements)
if __name__ == "__main__":