Implement Equivariant Filter Bias example in Python
parent
6a765f2452
commit
e2a123fcd1
File diff suppressed because one or more lines are too long
|
@ -0,0 +1,926 @@
|
|||
"""
|
||||
Implementation of Attitude-Bias-Calibration EqF form:
|
||||
"Overcoming Bias: Equivariant Filter Design for Biased Attitude Estimation with Online Calibration"
|
||||
https://ieeexplore.ieee.org/document/9905914
|
||||
|
||||
This module is Alessandro Fornasier's equivariant filter code (https://github.com/aau-cns/ABC-EqF)
|
||||
converted to use GTSAM's libraries.
|
||||
|
||||
Authors: Jennifer Oum & Darshan Rajasekaran
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import gtsam
|
||||
from gtsam import Rot3, Unit3
|
||||
|
||||
import math
|
||||
from dataclasses import dataclass
|
||||
import progressbar
|
||||
import pandas as pd
|
||||
import matplotlib.pyplot as plt
|
||||
from typing import List
|
||||
|
||||
coordinate = "EXPONENTIAL"
|
||||
|
||||
def wedge(vec : np.ndarray) -> np.ndarray:
|
||||
if not isinstance(vec, np.ndarray):
|
||||
raise TypeError
|
||||
if not vec.size == 3:
|
||||
raise ValueError
|
||||
mat = np.array([[ 0.0, -vec.item(2), vec.item(1)],
|
||||
[ vec.item(2), 0.0, -vec.item(0)],
|
||||
[-vec.item(1), vec.item(0), 0.0]])
|
||||
return mat
|
||||
|
||||
def vee(mat : np.ndarray) -> np.ndarray:
|
||||
if not isinstance(mat, np.ndarray):
|
||||
raise TypeError
|
||||
if not mat.shape == (3,3):
|
||||
raise ValueError
|
||||
vec = np.array([mat[2,1],
|
||||
mat[0,2],
|
||||
mat[1,0]])
|
||||
return vec
|
||||
|
||||
def Rot3LeftJacobian(arr: np.ndarray) -> np.ndarray:
|
||||
"""Return the SO(3) Left Jacobian
|
||||
|
||||
:param arr: A numpy array with size 3
|
||||
:return: The left Jacobian of SO(3)
|
||||
"""
|
||||
|
||||
if not (isinstance(arr, np.ndarray) and arr.size == 3):
|
||||
raise ValueError("A numpy array with size 3 has to be provided")
|
||||
|
||||
angle = np.linalg.norm(arr)
|
||||
|
||||
# Near |phi|==0, use first order Taylor expansion
|
||||
if np.isclose(angle, 0.):
|
||||
return np.eye(3) + 0.5 * wedge(arr)
|
||||
|
||||
axis = arr / angle
|
||||
s = np.sin(angle)
|
||||
c = np.cos(angle)
|
||||
|
||||
return (s / angle) * np.eye(3) + \
|
||||
(1 - (s / angle)) * np.outer(axis, axis) + \
|
||||
((1 - c) / angle) * wedge(axis)
|
||||
|
||||
def checkNorm(x: np.ndarray, tol: float = 1e-3):
|
||||
"""Check norm of a vector being 1 or nan
|
||||
|
||||
:param x: A numpy array
|
||||
:param tol: tollerance, default 1e-3
|
||||
:return: Boolean true if norm is 1 or nan
|
||||
"""
|
||||
return abs(np.linalg.norm(x) - 1) < tol or np.isnan(np.linalg.norm(x))
|
||||
|
||||
class State:
|
||||
"""Define the state of the Biased Attitude System
|
||||
----------
|
||||
R is a rotation matrix representing the attitude of the body
|
||||
b is a 3-vector representing the gyroscope bias
|
||||
S is a list of rotation matrix, each representing the calibration of the corresponding direction sensor
|
||||
----------
|
||||
Let's assume we want to use three known direction a, b, and c, where only the sensor that measure b is
|
||||
uncalibrated (we'd like to estimate the calibration states). Therefore, the system's d list looks like
|
||||
d = [b, a, c], and the S list should look like S = [Sb]. The association between d and S is done via indeces.
|
||||
In general S[i] correspond to the calibration state of the sensor that measure the direcion d[i]
|
||||
----------
|
||||
"""
|
||||
|
||||
# Attitude rotation matrix R
|
||||
R: Rot3
|
||||
|
||||
# Gyroscope bias b
|
||||
b: np.ndarray
|
||||
|
||||
# Sensor calibrations S
|
||||
S: List[Rot3]
|
||||
|
||||
def __init__(self, R : Rot3 = Rot3.Identity(), b : np.ndarray = np.zeros(3), S: List[Rot3] = None):
|
||||
"""Initialize State
|
||||
|
||||
:param R: A SO3 element representing the attitude of the system as a rotation matrix
|
||||
:param b: A numpy array with size 3 representing the gyroscope bias
|
||||
:param S: A list of SO3 elements representing the calibration states for "uncalibrated" sensors,
|
||||
if no sensor require a calibration state, than S will be initialized as an empty list
|
||||
"""
|
||||
|
||||
if not isinstance(R, gtsam.Rot3):
|
||||
|
||||
raise TypeError("the attitude rotation matrix R has to be of type SO3 but type is", type(R))
|
||||
self.R = R
|
||||
|
||||
if not (isinstance(b, np.ndarray) and b.size == 3):
|
||||
raise TypeError("The gyroscope bias has to be probvided as numpy array with size 3")
|
||||
self.b = b
|
||||
|
||||
if S is None:
|
||||
self.S = []
|
||||
else:
|
||||
if not isinstance(S, list):
|
||||
raise TypeError("Calibration states has to be provided as a list")
|
||||
for calibration in S:
|
||||
if not isinstance(calibration, Rot3):
|
||||
raise TypeError("Elements of the list of calibration states have to be of type SO3")
|
||||
self.S = S
|
||||
|
||||
@staticmethod
|
||||
def identity(n: int):
|
||||
"""Return a identity state with n calibration states
|
||||
|
||||
:param n: number of elements in list B associated with calibration states
|
||||
:return: The identity element of the State
|
||||
"""
|
||||
|
||||
return State(Rot3.Identity(),
|
||||
np.zeros(3),
|
||||
[Rot3.Identity() for _ in range(n)])
|
||||
class Input:
|
||||
"""Define the input space of the Biased Attitude System
|
||||
----------
|
||||
w is a 3-vector representing the angular velocity measured by a gyroscope
|
||||
----------
|
||||
"""
|
||||
|
||||
# Angular velocity
|
||||
w: np.ndarray
|
||||
|
||||
# Noise covariance of angular velocity
|
||||
Sigma: np.ndarray
|
||||
|
||||
def __init__(self, w: np.ndarray, Sigma: np.ndarray):
|
||||
"""Initialize Input
|
||||
|
||||
:param w: A numpy array with size 3 representing the angular velocity measurement from a gyroscope
|
||||
:param Sigma: A numpy array with shape (6, 6) representing the noise covariance of the
|
||||
angular velocity measurement and gyro bias random walk
|
||||
"""
|
||||
|
||||
if not (isinstance(w, np.ndarray) and w.size == 3):
|
||||
raise TypeError("Angular velocity has to be provided as a numpy array with size 3")
|
||||
if not (isinstance(Sigma, np.ndarray) and Sigma.shape[0] == Sigma.shape[1] == 6):
|
||||
raise TypeError("Input measurement noise covariance has to be provided as a numpy array with shape (6. 6)")
|
||||
if not np.all(np.linalg.eigvals(Sigma) >= 0):
|
||||
raise TypeError("Covariance matrix has to be semi-positive definite")
|
||||
|
||||
self.w = w
|
||||
self.Sigma = Sigma
|
||||
|
||||
@staticmethod
|
||||
def random() -> 'Input':
|
||||
"""Return a random angular velocity
|
||||
|
||||
:return: A random angular velocity as a Input element
|
||||
"""
|
||||
|
||||
return Input(np.random.randn(3), np.eye(6))
|
||||
|
||||
def W(self) -> np.ndarray:
|
||||
"""Return the Input as a skew-symmetric matrix
|
||||
|
||||
:return: self.w as a skew-symmetric matrix
|
||||
"""
|
||||
|
||||
return wedge(self.w)
|
||||
|
||||
class G:
|
||||
"""Symmetry group (SO(3) |x so(3)) x SO(3) x ... x SO(3)
|
||||
----------
|
||||
Each element of the B list is associated with a calibration states in State's S list where the association is done
|
||||
via corresponding index. In general B[i] is the SO(3) element of the symmetry group that correspond to the
|
||||
state's calibration state S[i]. For example, let's assume we want to use three known direction a, b, and c, where
|
||||
only the sensor that measure b is uncalibrated (we'd like to estimate the calibration states). Therefore,
|
||||
the system's d list is defined as d = [b, a, c], and the state's S list is defined as S = [Sb]. The symmetry group
|
||||
B list should be defined as B = [Bb] where Ba is the SO(3) element of the symmetry group that is related to Sb
|
||||
----------
|
||||
"""
|
||||
|
||||
A: Rot3
|
||||
a: np.ndarray
|
||||
B: List[Rot3]
|
||||
|
||||
def __init__(self, A: Rot3 = Rot3.Identity(), a: np.ndarray = np.zeros((3, 3)), B: List[Rot3] = None):
|
||||
"""Initialize the symmetry group G
|
||||
|
||||
:param A: SO3 element
|
||||
:param a: np.ndarray with shape (3, 3) corresponding to a skew symmetric matrix
|
||||
:param B: list of SO3 elements
|
||||
"""
|
||||
|
||||
if not isinstance(A, Rot3):
|
||||
raise TypeError("A has to be of type SO3")
|
||||
self.A = A
|
||||
if not (isinstance(a, np.ndarray) and a.shape == (3, 3)):
|
||||
raise TypeError("a has to be a numpy array with shape (3, 3)")
|
||||
self.a = a
|
||||
if B is None:
|
||||
self.B = []
|
||||
else:
|
||||
for b in B:
|
||||
if not isinstance(b, Rot3):
|
||||
raise TypeError("Elements of B have to be of type SO3")
|
||||
self.B = B
|
||||
|
||||
def __mul__(self, other: 'G') -> 'G':
|
||||
"""Define the group operation
|
||||
|
||||
:param other: G
|
||||
:return: A element of the group G given by the "multiplication" of self and other
|
||||
"""
|
||||
|
||||
assert (isinstance(other, G))
|
||||
assert (len(self.B) == len(other.B))
|
||||
return G(self.A * other.A,
|
||||
self.a + wedge(self.A.matrix() @ vee(other.a)),
|
||||
[self.B[i]*other.B[i] for i in range(len(self.B))])
|
||||
|
||||
@staticmethod
|
||||
def identity(n : int):
|
||||
"""Return the identity of the symmetry group with n elements of SO3 related to sensor calibration states
|
||||
|
||||
:param n: number of elements in list B associated with calibration states
|
||||
:return: The identity of the group G
|
||||
"""
|
||||
|
||||
return G(Rot3.Identity(), np.zeros((3, 3)), [Rot3.Identity() for _ in range(n)])
|
||||
|
||||
def exp(x: np.ndarray) -> 'G':
|
||||
"""Return a group element X given by X = exp(x) where x is a numpy array
|
||||
|
||||
:param x: A numpy array
|
||||
:return: A element of the group G given by the exponential of x
|
||||
"""
|
||||
|
||||
if not (isinstance(x, np.ndarray) and x.size >= 6):
|
||||
raise ValueError("Wrong shape, a numpy array with size 3n has to be provided")
|
||||
if (x.size % 3) != 0:
|
||||
raise ValueError("Wrong size, a numpy array with size multiple of 3 has to be provided")
|
||||
|
||||
n = int((x.size - 6) / 3)
|
||||
A = Rot3.Expmap(x[0:3])
|
||||
a = wedge(Rot3LeftJacobian(x[0:3]) @ x[3:6])
|
||||
B = [Rot3.Expmap(x[(6 + 3 * i):(9 + 3 * i)]) for i in range(n)]
|
||||
|
||||
return G(A, a, B)
|
||||
|
||||
def inv(self) -> 'G':
|
||||
"""Return the inverse element of the symmetry group
|
||||
|
||||
:return: A element of the group G given by the inverse of self
|
||||
"""
|
||||
|
||||
return G(self.A.inverse(), -wedge(self.A.inverse().matrix() @ vee(self.a)), [B.inverse() for B in self.B])
|
||||
|
||||
class Direction:
|
||||
"""Define a direction as a S2 element"""
|
||||
|
||||
# Direction
|
||||
d: Unit3
|
||||
|
||||
def __init__(self, d: np.ndarray):
|
||||
"""Initialize direction
|
||||
|
||||
:param d: A numpy array with size 3 and norm 1 representing the direction
|
||||
"""
|
||||
|
||||
if not (isinstance(d, np.ndarray) and d.size == 3 and checkNorm(d)):
|
||||
raise TypeError("Direction has to be provided as a 3 vector")
|
||||
self.d = Unit3(d)
|
||||
|
||||
def blockDiag(A : np.ndarray, B : np.ndarray) -> np.ndarray:
|
||||
"""Create a lock diagonal matrix from blocks A and B
|
||||
|
||||
:param A: numpy array
|
||||
:param B: numpy array
|
||||
:return: numpy array representing a block diagonal matrix composed of blocks A and B
|
||||
"""
|
||||
|
||||
if A is None:
|
||||
return B
|
||||
elif B is None:
|
||||
return A
|
||||
else:
|
||||
return np.block([[A, np.zeros((A.shape[0], B.shape[1]))],[np.zeros((B.shape[0], A.shape[1])), B]])
|
||||
|
||||
def repBlock(A : np.ndarray, n: int) -> np.ndarray:
|
||||
"""Create a block diagonal matrix repeating the A block n times
|
||||
|
||||
:param A: numpy array representing the block A
|
||||
:param n: number of times to repeat A
|
||||
:return: numpy array representing a block diagonal matrix composed of n-times the blocks A
|
||||
"""
|
||||
|
||||
res = None
|
||||
for _ in range(n):
|
||||
res = blockDiag(res, A)
|
||||
return res
|
||||
|
||||
def wedge(vec : np.ndarray) -> np.ndarray:
|
||||
if not isinstance(vec, np.ndarray):
|
||||
raise TypeError
|
||||
if not vec.size == 3:
|
||||
raise ValueError
|
||||
mat = np.array([[ 0.0, -vec.item(2), vec.item(1)],
|
||||
[ vec.item(2), 0.0, -vec.item(0)],
|
||||
[-vec.item(1), vec.item(0), 0.0]])
|
||||
return mat
|
||||
|
||||
def vee(mat : np.ndarray) -> np.ndarray:
|
||||
if not isinstance(mat, np.ndarray):
|
||||
raise TypeError
|
||||
if not mat.shape == (3,3):
|
||||
raise ValueError
|
||||
vec = np.array([mat[2,1],
|
||||
mat[0,2],
|
||||
mat[1,0]])
|
||||
return vec
|
||||
|
||||
def Rot3LeftJacobian(arr: np.ndarray) -> np.ndarray:
|
||||
"""Return the SO(3) Left Jacobian
|
||||
|
||||
:param arr: A numpy array with size 3
|
||||
:return: The left Jacobian of SO(3)
|
||||
"""
|
||||
|
||||
if not (isinstance(arr, np.ndarray) and arr.size == 3):
|
||||
raise ValueError("A numpy array with size 3 has to be provided")
|
||||
|
||||
angle = np.linalg.norm(arr)
|
||||
|
||||
# Near |phi|==0, use first order Taylor expansion
|
||||
if np.isclose(angle, 0.):
|
||||
return np.eye(3) + 0.5 * wedge(arr)
|
||||
|
||||
axis = arr / angle
|
||||
s = np.sin(angle)
|
||||
c = np.cos(angle)
|
||||
|
||||
return (s / angle) * np.eye(3) + \
|
||||
(1 - (s / angle)) * np.outer(axis, axis) + \
|
||||
((1 - c) / angle) * wedge(axis)
|
||||
|
||||
def numericalDifferential(f, x) -> np.ndarray:
|
||||
"""Compute the numerical derivative via central difference"""
|
||||
|
||||
if isinstance(x, float):
|
||||
x = np.reshape([x], (1, 1))
|
||||
h = 1e-6
|
||||
fx = f(x)
|
||||
n = fx.shape[0]
|
||||
m = x.shape[0]
|
||||
Df = np.zeros((n, m))
|
||||
for j in range(m):
|
||||
ej = np.zeros(m)
|
||||
ej[j] = 1.0
|
||||
Df[:, j:j+1] = (f(x + h * ej) - f(x - h * ej)).reshape(m, 1) / (2*h)
|
||||
return Df
|
||||
|
||||
def lift(xi: State, u: Input) -> np.ndarray:
|
||||
"""The Lift of the system (Theorem 3.8, Equation 7)
|
||||
|
||||
:param xi: A element of the State
|
||||
:param u: A element of the Input space
|
||||
:return: A numpy array representing the Lift
|
||||
"""
|
||||
|
||||
n = len(xi.S)
|
||||
L = np.zeros(6 + 3 * n)
|
||||
L[0:3] = (u.w - xi.b)
|
||||
L[3:6] = -u.W() @ xi.b
|
||||
for i in range(n):
|
||||
L[(6 + 3 * i):(9 + 3 * i)] = xi.S[i].inverse().matrix() @ L[0:3]
|
||||
|
||||
return L
|
||||
|
||||
def checkNorm(x: np.ndarray, tol: float = 1e-3):
|
||||
"""Check norm of a vector being 1 or nan
|
||||
|
||||
:param x: A numpy array
|
||||
:param tol: tollerance, default 1e-3
|
||||
:return: Boolean true if norm is 1 or nan
|
||||
"""
|
||||
return abs(np.linalg.norm(x) - 1) < tol or np.isnan(np.linalg.norm(x))
|
||||
|
||||
def stateAction(X: G, xi: State) -> State:
|
||||
"""Action of the symmetry group on the state space, return phi(X, xi) (Equation 4)
|
||||
|
||||
:param X: A element of the group G
|
||||
:param xi: A element of the State
|
||||
:return: A new element of the state given by the action of phi of G in the State space
|
||||
"""
|
||||
|
||||
if len(xi.S) != len(X.B):
|
||||
raise ValueError("the number of calibration states and B elements of the symmetry group has to match")
|
||||
|
||||
return State(xi.R * X.A,
|
||||
X.A.inverse().matrix() @ (xi.b - vee(X.a)),
|
||||
[(X.A.inverse() * xi.S[i] * X.B[i]) for i in range(len(X.B))])
|
||||
|
||||
|
||||
def velocityAction(X: G, u: Input) -> Input:
|
||||
"""Action of the symmetry group on the input space, return psi(X, u) (Equation 5)
|
||||
|
||||
:param X: A element of the group G
|
||||
:param u: A element of the Input
|
||||
:return: A new element of the Input given by the action of psi of G in the Input space
|
||||
"""
|
||||
|
||||
return Input(X.A.inverse().matrix() @ (u.w - vee(X.a)), u.Sigma)
|
||||
|
||||
|
||||
def outputAction(X: G, y: Direction, idx: int = -1) -> np.ndarray:
|
||||
"""Action of the symmetry group on the output space, return rho(X, y) (Equation 6)
|
||||
|
||||
:param X: A element of the group G
|
||||
:param y: A direction measurement
|
||||
:param idx: indicate the index of the B element in the list, -1 in case no B element exist
|
||||
:return: A numpy array given by the action of rho of G in the Output space
|
||||
"""
|
||||
|
||||
if idx == -1:
|
||||
return X.A.inverse().matrix() @ y.d.unitVector()
|
||||
else:
|
||||
return X.B[idx].inverse().matrix() @ y.d.unitVector()
|
||||
|
||||
|
||||
def local_coords(e: State) -> np.ndarray:
|
||||
"""Local coordinates assuming __xi_0 = identity (Equation 9)
|
||||
|
||||
:param e: A element of the State representing the equivariant error
|
||||
:return: Local coordinates assuming __xi_0 = identity
|
||||
"""
|
||||
|
||||
if coordinate == "EXPONENTIAL":
|
||||
tmp = [Rot3.Logmap(S) for S in e.S]
|
||||
eps = np.concatenate((Rot3.Logmap(e.R), e.b, np.asarray(tmp).reshape(3 * len(tmp),)))
|
||||
elif coordinate == "NORMAL":
|
||||
raise ValueError("Normal coordinate representation is not implemented yet")
|
||||
# X = G(e.R, -SO3.wedge(e.R @ e.b), e.S)
|
||||
# eps = G.log(X)
|
||||
else:
|
||||
raise ValueError("Invalid coordinate representation")
|
||||
|
||||
return eps
|
||||
|
||||
|
||||
def local_coords_inv(eps: np.ndarray) -> "State":
|
||||
"""Local coordinates inverse assuming __xi_0 = identity
|
||||
|
||||
:param eps: A numpy array representing the equivariant error in local coordinates
|
||||
:return: Local coordinates inverse assuming __xi_0 = identity
|
||||
"""
|
||||
|
||||
X = G.exp(eps) #G
|
||||
if coordinate == "EXPONENTIAL":
|
||||
e = State(X.A, eps[3:6, :], X.B) #State
|
||||
elif coordinate == "NORMAL":
|
||||
raise ValueError("Normal coordinate representation is not implemented yet")
|
||||
# stateAction(X, State(SO3.identity(), np.zeros(3), [SO3.identity() for _ in range(len(X.B))]))
|
||||
else:
|
||||
raise ValueError("Invalid coordinate representation")
|
||||
|
||||
return e
|
||||
|
||||
|
||||
def stateActionDiff(xi: State) -> np.ndarray:
|
||||
"""Differential of the phi action phi(xi, E) at E = Id in local coordinates (can be found within equation 23)
|
||||
|
||||
:param xi: A element of the State
|
||||
:return: (Dtheta) * (Dphi(xi, E) at E = Id)
|
||||
"""
|
||||
coordsAction = lambda U: local_coords(stateAction(G.exp(U), xi))
|
||||
differential = numericalDifferential(coordsAction, np.zeros(6 + 3 * len(xi.S)))
|
||||
return differential
|
||||
|
||||
class Measurement:
|
||||
"""Define a measurement
|
||||
----------
|
||||
cal_idx is a index corresponding to the cal_idx-th calibration related to the measurement. Let's consider the case
|
||||
of 2 uncalibrated sensor with two associated calibration state in State.S = [S0, S1], and a single calibrated sensor.
|
||||
cal_idx = 0 indicates a measurement coming from the sensor that has calibration S0, cal_idx = 1 indicates a
|
||||
measurement coming from the sensor that has calibration S1. cal_idx = -1 indicates that the measurement is coming
|
||||
from a calibrated sensor
|
||||
----------
|
||||
"""
|
||||
|
||||
# measurement
|
||||
y: Direction
|
||||
|
||||
# Known direction in global frame
|
||||
d: Direction
|
||||
|
||||
# Covariance matrix of the measurement
|
||||
Sigma: np.ndarray
|
||||
|
||||
# Calibration index
|
||||
cal_idx: int = -1
|
||||
|
||||
def __init__(self, y: np.ndarray, d: np.ndarray, Sigma: np.ndarray, i: int = -1):
|
||||
"""Initialize measurement
|
||||
|
||||
:param y: A numpy array with size 3 and norm 1 representing the direction measurement in the sensor frame
|
||||
:param d: A numpy array with size 3 and norm 1 representing the direction in the global frame
|
||||
:param Sigma: A numpy array with shape (3, 3) representing the noise covariance of the direction measurement
|
||||
:param i: index of the corresponding calibration state
|
||||
"""
|
||||
|
||||
if not (isinstance(y, np.ndarray) and y.size == 3 and checkNorm(y)):
|
||||
raise TypeError("Measurement has to be provided as a (3, 1) vector")
|
||||
if not (isinstance(d, np.ndarray) and d.size == 3 and checkNorm(d)):
|
||||
raise TypeError("Direction has to be provided as a (3, 1) vector")
|
||||
if not (isinstance(Sigma, np.ndarray) and Sigma.shape[0] == Sigma.shape[1] == 3):
|
||||
raise TypeError("Direction measurement noise covariance has to be provided as a numpy array with shape (3. 3)")
|
||||
if not np.all(np.linalg.eigvals(Sigma) >= 0):
|
||||
raise TypeError("Covariance matrix has to be semi-positive definite")
|
||||
if not (isinstance(i, int) or i == -1 or i > 0):
|
||||
raise TypeError("calibration index is a positive integer or -1")
|
||||
|
||||
self.y = Direction(y)
|
||||
self.d = Direction(d)
|
||||
self.Sigma = Sigma
|
||||
self.cal_idx = i
|
||||
|
||||
@dataclass
|
||||
class Data:
|
||||
"""Define ground-truth, input and output data"""
|
||||
|
||||
# Ground-truth state
|
||||
xi: State
|
||||
n_cal: int
|
||||
|
||||
# Input measurements
|
||||
u: Input
|
||||
|
||||
# Output measurements as a list of Measurement
|
||||
y: list
|
||||
n_meas: int
|
||||
|
||||
# Time
|
||||
t: float
|
||||
dt: float
|
||||
|
||||
class EqF:
|
||||
|
||||
def __init__(self, Sigma: np.ndarray, n: int, m: int):
|
||||
"""Initialize EqF
|
||||
|
||||
:param Sigma: Initial covariance
|
||||
:param n: Number of calibration states
|
||||
:param m: Total number of available sensor
|
||||
"""
|
||||
|
||||
self.__dof = 6 + 3 * n
|
||||
self.__n_cal = n
|
||||
self.__n_sensor = m
|
||||
|
||||
if not (isinstance(Sigma, np.ndarray) and (Sigma.shape[0] == Sigma.shape[1] == self.__dof)):
|
||||
raise TypeError(f"Initial covariance has to be provided as a numpy array with shape ({self.__dof}, {self.__dof})")
|
||||
if not np.all(np.linalg.eigvals(Sigma) >= 0):
|
||||
raise TypeError("Covariance matrix has to be semi-positive definite")
|
||||
if not (isinstance(n, int) and n >= 0):
|
||||
raise TypeError("Number of calibration state has to be unsigned")
|
||||
if not (isinstance(m, int) and m > 1):
|
||||
raise TypeError("Number of direction sensor has to be grater-equal than 2")
|
||||
|
||||
self.__X_hat = G.identity(n)
|
||||
self.__Sigma = Sigma
|
||||
self.__xi_0 = State.identity(n)
|
||||
self.__Dphi0 = stateActionDiff(self.__xi_0) # Within equation 23
|
||||
self.__InnovationLift = np.linalg.pinv(self.__Dphi0) # Within equation 23
|
||||
|
||||
def stateEstimate(self) -> State:
|
||||
"""Return estimated state
|
||||
|
||||
:return: Estimated state
|
||||
"""
|
||||
return stateAction(self.__X_hat, self.__xi_0)
|
||||
|
||||
def propagation(self, u: Input, dt: float):
|
||||
"""Propagate the filter state
|
||||
|
||||
:param u: Angular velocity measurement from IMU
|
||||
:param dt: delta time between timestamp of last propagation/update and timestamp of angular velocity measurement
|
||||
"""
|
||||
|
||||
if not isinstance(u, Input):
|
||||
raise TypeError("angular velocity measurement has to be provided as a Input element")
|
||||
|
||||
L = lift(self.stateEstimate(), u) # Equation 7
|
||||
|
||||
Phi_DT = self.__stateTransitionMatrix(u, dt) # Equation 17
|
||||
# Equivalent
|
||||
# A0t = self.__stateMatrixA(u) # Equation 14a
|
||||
# Phi_DT = expm(A0t * dt)
|
||||
|
||||
Bt = self.__inputMatrixBt() # Equation 27
|
||||
M_DT = (Bt @ blockDiag(u.Sigma, repBlock(1e-9 * np.eye(3), self.__n_cal)) @ Bt.T) * dt
|
||||
|
||||
self.__X_hat = self.__X_hat * G.exp(L * dt) # Equation 18
|
||||
self.__Sigma = Phi_DT @ self.__Sigma @ Phi_DT.T + M_DT # Equation 19
|
||||
|
||||
def update(self, y: Measurement):
|
||||
"""Update the filter state
|
||||
|
||||
:param y: A measurement
|
||||
"""
|
||||
|
||||
# Cross-check calibration
|
||||
assert (y.cal_idx <= self.__n_cal)
|
||||
|
||||
Ct = self.__measurementMatrixC(y.d, y.cal_idx) # Equation 14b
|
||||
delta = wedge(y.d.d.unitVector()) @ outputAction(self.__X_hat.inv(), y.y, y.cal_idx)
|
||||
Dt = self.__outputMatrixDt(y.cal_idx)
|
||||
S = Ct @ self.__Sigma @ Ct.T + Dt @ y.Sigma @ Dt.T # Equation 21
|
||||
K = self.__Sigma @ Ct.T @ np.linalg.inv(S) # Equation 22
|
||||
Delta = self.__InnovationLift @ K @ delta # Equation 23
|
||||
self.__X_hat = G.exp(Delta) * self.__X_hat # Equation 24
|
||||
self.__Sigma = (np.eye(self.__dof) - K @ Ct) @ self.__Sigma # Equation 25
|
||||
|
||||
def __stateMatrixA(self, u: Input) -> np.ndarray:
|
||||
"""Return the state matrix A0t (Equation 14a)
|
||||
|
||||
:param u: Input
|
||||
:return: numpy array representing the state matrix A0t
|
||||
"""
|
||||
|
||||
W0 = velocityAction(self.__X_hat.inv(), u).W()
|
||||
A1 = np.zeros((6, 6))
|
||||
|
||||
if coordinate == "EXPONENTIAL":
|
||||
A1[0:3, 3:6] = -np.eye(3)
|
||||
A1[3:6, 3:6] = W0
|
||||
A2 = repBlock(W0, self.__n_cal)
|
||||
elif coordinate == "NORMAL":
|
||||
raise ValueError("Normal coordinate representation is not implemented yet")
|
||||
else:
|
||||
raise ValueError("Invalid coordinate representation")
|
||||
|
||||
return blockDiag(A1, A2)
|
||||
|
||||
def __stateTransitionMatrix(self, u: Input, dt: float) -> np.ndarray:
|
||||
"""Return the state transition matrix Phi (Equation 17)
|
||||
|
||||
:param u: Input
|
||||
:param dt: Delta time
|
||||
:return: numpy array representing the state transition matrix Phi
|
||||
"""
|
||||
|
||||
W0 = velocityAction(self.__X_hat.inv(), u).W()
|
||||
Phi1 = np.zeros((6, 6))
|
||||
Phi12 = -dt * (np.eye(3) + (dt / 2) * W0 + ((dt**2) / 6) * W0 * W0)
|
||||
Phi22 = np.eye(3) + dt * W0 + ((dt**2) / 2) * W0 * W0
|
||||
|
||||
if coordinate == "EXPONENTIAL":
|
||||
Phi1[0:3, 0:3] = np.eye(3)
|
||||
Phi1[0:3, 3:6] = Phi12
|
||||
Phi1[3:6, 3:6] = Phi22
|
||||
Phi2 = repBlock(Phi22, self.__n_cal)
|
||||
elif coordinate == "NORMAL":
|
||||
raise ValueError("Normal coordinate representation is not implemented yet")
|
||||
else:
|
||||
raise ValueError("Invalid coordinate representation")
|
||||
|
||||
return blockDiag(Phi1, Phi2)
|
||||
|
||||
def __inputMatrixBt(self) -> np.ndarray:
|
||||
"""Return the Input matrix Bt
|
||||
|
||||
:return: numpy array representing the state matrix Bt
|
||||
"""
|
||||
|
||||
if coordinate == "EXPONENTIAL":
|
||||
B1 = blockDiag(self.__X_hat.A.matrix(), self.__X_hat.A.matrix())
|
||||
B2 = None
|
||||
for B in self.__X_hat.B:
|
||||
B2 = blockDiag(B2, B.matrix())
|
||||
elif coordinate == "NORMAL":
|
||||
raise ValueError("Normal coordinate representation is not implemented yet")
|
||||
else:
|
||||
raise ValueError("Invalid coordinate representation")
|
||||
|
||||
return blockDiag(B1, B2)
|
||||
|
||||
def __measurementMatrixC(self, d: Direction, idx: int) -> np.ndarray:
|
||||
"""Return the measurement matrix C0 (Equation 14b)
|
||||
|
||||
:param d: Known direction
|
||||
:param idx: index of the related calibration state
|
||||
:return: numpy array representing the measurement matrix C0
|
||||
"""
|
||||
|
||||
Cc = np.zeros((3, 3 * self.__n_cal))
|
||||
|
||||
# If the measurement is related to a sensor that has a calibration state
|
||||
if idx >= 0:
|
||||
Cc[(3 * idx):(3 + 3 * idx), :] = wedge(d.d.unitVector())
|
||||
|
||||
return wedge(d.d.unitVector()) @ np.hstack((wedge(d.d.unitVector()), np.zeros((3, 3)), Cc))
|
||||
|
||||
def __outputMatrixDt(self, idx: int) -> np.ndarray:
|
||||
"""Return the measurement output matrix Dt
|
||||
|
||||
:param idx: index of the related calibration state
|
||||
:return: numpy array representing the output matrix Dt
|
||||
"""
|
||||
|
||||
# If the measurement is related to a sensor that has a calibration state
|
||||
if idx >= 0:
|
||||
return self.__X_hat.B[idx].matrix()
|
||||
else:
|
||||
return self.__X_hat.A.matrix()
|
||||
|
||||
def formatCSV(df): # pass the dataframe in to this function and get "data_list" as an output
|
||||
"""Read data from csv file formatted as follows:
|
||||
|
||||
| -------------------------------------------------------------------------------------------- |
|
||||
| t: time |
|
||||
| -------------------------------------------------------------------------------------------- |
|
||||
| q_x, q_y, q_z, q_w: Quaternion representing the attitude |
|
||||
| -------------------------------------------------------------------------------------------- |
|
||||
| b_x, b_y, b_z: Gyro bias |
|
||||
| -------------------------------------------------------------------------------------------- |
|
||||
| cq_x_0, cq_yv, cq_z_0, cq_w_0: Quaternion representing the first calibration |
|
||||
| ... |
|
||||
| cq_x_n, cq_y_n, cq_z_n, cq_w_n: Quaternion representing the last calibration |
|
||||
| -------------------------------------------------------------------------------------------- |
|
||||
| w_x, w_y, w_z: Gyro measurements |
|
||||
| -------------------------------------------------------------------------------------------- |
|
||||
| std_w_x, std_w_x, std_w_z: Gyro measurements noise standard deviation |
|
||||
| -------------------------------------------------------------------------------------------- |
|
||||
| std_b_x, std_b_x, std_b_z: Gyro bias random walk standard deviation |
|
||||
| -------------------------------------------------------------------------------------------- |
|
||||
| y_x_0, y_y_0, y_z_0: Direction measurement in sensor frame associated to calibration state 0 |
|
||||
| ... |
|
||||
| y_x_n, y_y_n, y_z_n: Direction measurement in sensor frame associated to calibration state n |
|
||||
| y_x_n+1, y_y_n+1, y_z_n+1: Direction measurement in sensor frame from calibrated sensor |
|
||||
| ... |
|
||||
| y_x_m, y_y_m, y_z_m: Direction measurement in sensor frame from calibrated sensor |
|
||||
| -------------------------------------------------------------------------------------------- |
|
||||
| std_y_x_0, std_y_y_0, std_y_z_0: Standard deviation of direction measurement y_0 |
|
||||
| ... |
|
||||
| std_y_x_m, std_y_y_m, std_y_z_m: Standard deviation of direction measurement y_m |
|
||||
| -------------------------------------------------------------------------------------------- |
|
||||
| d_x_0, d_y_0, d_z_0: Known direction in global frame associated to direction measurement 0 |
|
||||
| ... |
|
||||
| d_x_m, d_y_m, d_z_m: Known direction in global frame associated to direction measurement m |
|
||||
| -------------------------------------------------------------------------------------------- |
|
||||
|
||||
NaN cell means that value is not present at that time
|
||||
|
||||
Max allowd n = 5
|
||||
Max allowd m = 10
|
||||
|
||||
:param pname: path name
|
||||
"""
|
||||
|
||||
# read .csv file into pandas dataframe
|
||||
df = df.reset_index()
|
||||
|
||||
# Define data_list as list
|
||||
data_list = []
|
||||
last_timestamp = df.t[0]
|
||||
|
||||
# Check for existence of bias ground-truth into loaded data
|
||||
bias_exist = False
|
||||
if {'b_x', 'b_y', 'b_z'}.issubset(df.columns):
|
||||
bias_exist = True
|
||||
|
||||
# Check for existence of calibration ground-truth (yaw, pitch, roll angles) into loaded data
|
||||
cal_exist = False
|
||||
n_cal = 0
|
||||
for i in range(6):
|
||||
if {'cq_x_' + str(i), 'cq_y_' + str(i), 'cq_z_' + str(i), 'cq_w_' + str(i)}.issubset(df.columns):
|
||||
cal_exist = True
|
||||
n_cal = i+1
|
||||
|
||||
# Check for existence of direction measurements
|
||||
n_meas = 0
|
||||
for i in range(11):
|
||||
if {'y_x_' + str(i), 'y_y_' + str(i), 'y_z_' + str(i)}.issubset(df.columns):
|
||||
n_meas = i + 1
|
||||
|
||||
for index, row in df.iterrows():
|
||||
|
||||
# Load timestamps and record dt
|
||||
t = float(row['t'])
|
||||
dt = t - last_timestamp
|
||||
|
||||
# Skip data_list if dt is smaller than a micro second
|
||||
if dt < 1e-6:
|
||||
continue
|
||||
|
||||
last_timestamp = t
|
||||
|
||||
# Load groundtruth values
|
||||
quat = np.array([float(row['q_x']), float(row['q_y']), float(row['q_z']), float(row['q_w'])])
|
||||
|
||||
R = Rot3(gtsam.Rot3.Quaternion(float(row['q_w']), float(row['q_x']), float(row['q_y']), float(row['q_z'])).matrix())
|
||||
|
||||
# Load Gyro biases
|
||||
if bias_exist:
|
||||
b = np.array([float(row['b_x']), float(row['b_y']), float(row['b_z'])]).reshape(3,)
|
||||
else:
|
||||
b = np.zeros(3)
|
||||
|
||||
# Load GNSS calibration
|
||||
S = []
|
||||
if cal_exist:
|
||||
for i in range(n_cal):
|
||||
cal = np.array([float(row['cq_x_' + str(i)]), float(row['cq_y_' + str(i)]), float(row['cq_z_' + str(i)]), float(row['cq_w_' + str(i)])])
|
||||
S.append(Rot3(gtsam.Rot3.Quaternion(float(row['cq_w_' + str(i)]), float(row['cq_x_' + str(i)]), float(row['cq_y_' + str(i)]), float(row['cq_z_' + str(i)])).matrix()))
|
||||
|
||||
|
||||
# Load Gyro inputs
|
||||
w = np.array([float(row['w_x']), float(row['w_y']), float(row['w_z'])]).reshape(3,)
|
||||
std_w = np.array([float(row['std_w_x']), float(row['std_w_y']), float(row['std_w_z'])]).reshape(3,)
|
||||
std_b = np.array([float(row['std_b_x']), float(row['std_b_y']), float(row['std_b_z'])]).reshape(3,)
|
||||
Sigma_wb = blockDiag(np.eye(3) * (std_w ** 2), np.eye(3) * (std_b ** 2))
|
||||
|
||||
# Load measurements
|
||||
meas = []
|
||||
for i in range(n_meas):
|
||||
y = np.array([float(row['y_x_' + str(i)]), float(row['y_y_' + str(i)]), float(row['y_z_' + str(i)])]).reshape(3,)
|
||||
d = np.array([float(row['d_x_' + str(i)]), float(row['d_y_' + str(i)]), float(row['d_z_' + str(i)])]).reshape(3,)
|
||||
std_y = np.array([float(row['std_y_x_' + str(i)]), float(row['std_y_y_' + str(i)]), float(row['std_y_z_' + str(i)])]).reshape(3,)
|
||||
if i < n_cal:
|
||||
meas.append(Measurement(y, d, np.eye(3) * (std_y ** 2), i))
|
||||
else:
|
||||
meas.append(Measurement(y, d, np.eye(3) * (std_y ** 2), -1))
|
||||
|
||||
# Append to data_list
|
||||
d = Data(State(R, b, S), n_cal, Input(w, Sigma_wb), meas, n_meas, t, dt)
|
||||
|
||||
data_list.append(d)
|
||||
|
||||
return data_list
|
||||
|
||||
|
||||
def sim(filter_args, data):
|
||||
|
||||
# Define progressbar
|
||||
p = progressbar.ProgressBar()
|
||||
|
||||
# EqF
|
||||
filter = EqF(*filter_args)
|
||||
|
||||
# Allocate variables
|
||||
t = []
|
||||
est = []
|
||||
|
||||
# Filter loop
|
||||
for d in p(data):
|
||||
|
||||
t.append(d.t)
|
||||
|
||||
# Run filter
|
||||
try:
|
||||
filter.propagation(d.u, d.dt)
|
||||
except:
|
||||
print('Filter.propagation Error\n')
|
||||
for y in d.y:
|
||||
if not (np.isnan(np.linalg.norm(y.y.d.unitVector())) or np.isnan(np.linalg.norm(y.d.d.unitVector()))):
|
||||
try:
|
||||
filter.update(y)
|
||||
except:
|
||||
print('Filter.update Error\n')
|
||||
est.append(filter.stateEstimate())
|
||||
|
||||
# Plot Attitude1
|
||||
fig, (ax0, ax1, ax2) = plt.subplots(3, 1)
|
||||
ax = [ax0, ax1, ax2]
|
||||
for i in range(3):
|
||||
ax[i].plot(t, [d.xi.R.rpy()[i] * 180 / math.pi for d in data], color="C0")
|
||||
ax[i].plot(t, [xi.R.rpy()[i] * 180 / math.pi for xi in est], color="C1")
|
||||
ax[i].set_xlabel("t")
|
||||
ax0.set_title("Attitude: Roll")
|
||||
ax1.set_title("Attitude: Pitch")
|
||||
ax2.set_title("Attitude: Yaw")
|
||||
plt.show()
|
||||
|
||||
# Plot bias
|
||||
fig, (ax0, ax1, ax2) = plt.subplots(3, 1)
|
||||
ax = [ax0, ax1, ax2]
|
||||
for i in range(3):
|
||||
ax[i].plot(t, [d.xi.b[i] for d in data], color="C0")
|
||||
ax[i].plot(t, [xi.b[i] for xi in est], color="C1")
|
||||
ax[i].set_xlabel("t")
|
||||
ax0.set_title("Bias: x")
|
||||
ax1.set_title("Bias: y")
|
||||
ax2.set_title("Bias: z")
|
||||
plt.show()
|
||||
|
||||
|
||||
# Plot calibration states
|
||||
for j in range(data[0].n_cal):
|
||||
fig, (ax0, ax1, ax2) = plt.subplots(3, 1)
|
||||
ax = [ax0, ax1, ax2]
|
||||
for i in range(3):
|
||||
ax[i].plot(t, [d.xi.S[j].rpy()[i] * 180 / math.pi for d in data], color="C0")
|
||||
ax[i].plot(t, [xi.S[j].rpy()[i] * 180 / math.pi for xi in est], color="C1")
|
||||
ax[i].set_xlabel("t")
|
||||
ax0.set_title("Calibration: Roll")
|
||||
ax1.set_title("Calibration: Pitch")
|
||||
ax2.set_title("Calibration: Yaw")
|
||||
plt.show()
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue