Address PR comments - code cleanup

release/4.3a0
jenniferoum 2025-03-13 00:23:35 -07:00
parent 593beed1e1
commit 880bd0cf90
2 changed files with 393 additions and 329 deletions

View File

@ -16,7 +16,7 @@
},
{
"cell_type": "code",
"execution_count": 13,
"execution_count": 68,
"metadata": {},
"outputs": [
{
@ -33,16 +33,228 @@
},
{
"cell_type": "code",
"execution_count": 14,
"execution_count": 69,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import pandas as pd\n",
"import progressbar\n",
"import pandas as pd\n",
"import matplotlib.pyplot as plt\n",
"import math\n",
"\n",
"from gtsam import findExampleDataFile\n",
"import gtsam\n",
"from gtsam import findExampleDataFile, Rot3\n",
"\n",
"from EqF import formatCSV, blockDiag, repBlock, sim\n"
"from EqF import *\n"
]
},
{
"cell_type": "code",
"execution_count": 70,
"metadata": {},
"outputs": [],
"source": [
"def formatCSV(df): # pass the dataframe in to this function and get \"data_list\" as an output\n",
" \"\"\"Read data from csv file formatted as follows:\n",
"\n",
" | -------------------------------------------------------------------------------------------- |\n",
" | t: time |\n",
" | -------------------------------------------------------------------------------------------- |\n",
" | q_x, q_y, q_z, q_w: Quaternion representing the attitude |\n",
" | -------------------------------------------------------------------------------------------- |\n",
" | b_x, b_y, b_z: Gyro bias |\n",
" | -------------------------------------------------------------------------------------------- |\n",
" | cq_x_0, cq_yv, cq_z_0, cq_w_0: Quaternion representing the first calibration |\n",
" | ... |\n",
" | cq_x_n, cq_y_n, cq_z_n, cq_w_n: Quaternion representing the last calibration |\n",
" | -------------------------------------------------------------------------------------------- |\n",
" | w_x, w_y, w_z: Gyro measurements |\n",
" | -------------------------------------------------------------------------------------------- |\n",
" | std_w_x, std_w_x, std_w_z: Gyro measurements noise standard deviation |\n",
" | -------------------------------------------------------------------------------------------- |\n",
" | std_b_x, std_b_x, std_b_z: Gyro bias random walk standard deviation |\n",
" | -------------------------------------------------------------------------------------------- |\n",
" | y_x_0, y_y_0, y_z_0: Direction measurement in sensor frame associated to calibration state 0 |\n",
" | ... |\n",
" | y_x_n, y_y_n, y_z_n: Direction measurement in sensor frame associated to calibration state n |\n",
" | y_x_n+1, y_y_n+1, y_z_n+1: Direction measurement in sensor frame from calibrated sensor |\n",
" | ... |\n",
" | y_x_m, y_y_m, y_z_m: Direction measurement in sensor frame from calibrated sensor |\n",
" | -------------------------------------------------------------------------------------------- |\n",
" | std_y_x_0, std_y_y_0, std_y_z_0: Standard deviation of direction measurement y_0 |\n",
" | ... |\n",
" | std_y_x_m, std_y_y_m, std_y_z_m: Standard deviation of direction measurement y_m |\n",
" | -------------------------------------------------------------------------------------------- |\n",
" | d_x_0, d_y_0, d_z_0: Known direction in global frame associated to direction measurement 0 |\n",
" | ... |\n",
" | d_x_m, d_y_m, d_z_m: Known direction in global frame associated to direction measurement m |\n",
" | -------------------------------------------------------------------------------------------- |\n",
"\n",
" NaN cell means that value is not present at that time\n",
"\n",
" Max allowd n = 5\n",
" Max allowd m = 10\n",
"\n",
" :param pname: path name\n",
" \"\"\"\n",
"\n",
" # read .csv file into pandas dataframe\n",
" df = df.reset_index()\n",
"\n",
" # Define data_list as list\n",
" data_list = []\n",
" last_timestamp = df.t[0]\n",
"\n",
" # Check for existence of bias ground-truth into loaded data\n",
" bias_exist = False\n",
" if {'b_x', 'b_y', 'b_z'}.issubset(df.columns):\n",
" bias_exist = True\n",
"\n",
" # Check for existence of calibration ground-truth (yaw, pitch, roll angles) into loaded data\n",
" cal_exist = False\n",
" n_cal = 0\n",
" for i in range(6):\n",
" if {'cq_x_' + str(i), 'cq_y_' + str(i), 'cq_z_' + str(i), 'cq_w_' + str(i)}.issubset(df.columns):\n",
" cal_exist = True\n",
" n_cal = i+1\n",
"\n",
" # Check for existence of direction measurements\n",
" n_meas = 0\n",
" for i in range(11):\n",
" if {'y_x_' + str(i), 'y_y_' + str(i), 'y_z_' + str(i)}.issubset(df.columns):\n",
" n_meas = i + 1\n",
"\n",
" for index, row in df.iterrows():\n",
"\n",
" # Load timestamps and record dt\n",
" t = float(row['t'])\n",
" dt = t - last_timestamp\n",
"\n",
" # Skip data_list if dt is smaller than a micro second\n",
" if dt < 1e-6:\n",
" continue\n",
"\n",
" last_timestamp = t\n",
"\n",
" # Load groundtruth values\n",
" quat = np.array([float(row['q_x']), float(row['q_y']), float(row['q_z']), float(row['q_w'])])\n",
"\n",
" R = Rot3(gtsam.Rot3.Quaternion(float(row['q_w']), float(row['q_x']), float(row['q_y']), float(row['q_z'])).matrix())\n",
"\n",
" # Load Gyro biases\n",
" if bias_exist:\n",
" b = np.array([float(row['b_x']), float(row['b_y']), float(row['b_z'])]).reshape(3,)\n",
" else:\n",
" b = np.zeros(3)\n",
"\n",
" # Load GNSS calibration\n",
" S = []\n",
" if cal_exist:\n",
" for i in range(n_cal):\n",
" cal = np.array([float(row['cq_x_' + str(i)]), float(row['cq_y_' + str(i)]), float(row['cq_z_' + str(i)]), float(row['cq_w_' + str(i)])])\n",
" S.append(Rot3(gtsam.Rot3.Quaternion(float(row['cq_w_' + str(i)]), float(row['cq_x_' + str(i)]), float(row['cq_y_' + str(i)]), float(row['cq_z_' + str(i)])).matrix()))\n",
"\n",
"\n",
" # Load Gyro inputs\n",
" w = np.array([float(row['w_x']), float(row['w_y']), float(row['w_z'])]).reshape(3,)\n",
" std_w = np.array([float(row['std_w_x']), float(row['std_w_y']), float(row['std_w_z'])]).reshape(3,)\n",
" std_b = np.array([float(row['std_b_x']), float(row['std_b_y']), float(row['std_b_z'])]).reshape(3,)\n",
" Sigma_wb = blockDiag(np.eye(3) * (std_w ** 2), np.eye(3) * (std_b ** 2))\n",
"\n",
" # Load measurements\n",
" meas = []\n",
" for i in range(n_meas):\n",
" y = np.array([float(row['y_x_' + str(i)]), float(row['y_y_' + str(i)]), float(row['y_z_' + str(i)])]).reshape(3,)\n",
" d = np.array([float(row['d_x_' + str(i)]), float(row['d_y_' + str(i)]), float(row['d_z_' + str(i)])]).reshape(3,)\n",
" std_y = np.array([float(row['std_y_x_' + str(i)]), float(row['std_y_y_' + str(i)]), float(row['std_y_z_' + str(i)])]).reshape(3,)\n",
" if i < n_cal:\n",
" meas.append(Measurement(y, d, np.eye(3) * (std_y ** 2), i))\n",
" else:\n",
" meas.append(Measurement(y, d, np.eye(3) * (std_y ** 2), -1))\n",
"\n",
" # Append to data_list\n",
" d = Data(State(R, b, S), n_cal, Input(w, Sigma_wb), meas, n_meas, t, dt)\n",
" \n",
" data_list.append(d)\n",
"\n",
" return data_list"
]
},
{
"cell_type": "code",
"execution_count": 71,
"metadata": {},
"outputs": [],
"source": [
"def sim(filter_args, data):\n",
"\n",
" # Define progressbar\n",
" p = progressbar.ProgressBar()\n",
"\n",
" # EqF\n",
" filter = EqF(*filter_args)\n",
"\n",
" # Allocate variables\n",
" t = []\n",
" est = []\n",
"\n",
" # Filter loop\n",
" for d in p(data):\n",
"\n",
" t.append(d.t)\n",
"\n",
" # Run filter\n",
" try:\n",
" filter.propagation(d.u, d.dt)\n",
" except:\n",
" print('Filter.propagation Error\\n')\n",
" for y in d.y:\n",
" if not (np.isnan(np.linalg.norm(y.y.d.unitVector())) or np.isnan(np.linalg.norm(y.d.d.unitVector()))):\n",
" try:\n",
" filter.update(y)\n",
" except:\n",
" print('Filter.update Error\\n')\n",
" est.append(filter.stateEstimate())\n",
"\n",
" # Plot Attitude1\n",
" fig, (ax0, ax1, ax2) = plt.subplots(3, 1)\n",
" ax = [ax0, ax1, ax2]\n",
" for i in range(3):\n",
" ax[i].plot(t, [d.xi.R.rpy()[i] * 180 / math.pi for d in data], color=\"C0\")\n",
" ax[i].plot(t, [xi.R.rpy()[i] * 180 / math.pi for xi in est], color=\"C1\")\n",
" ax[i].set_xlabel(\"t\")\n",
" ax0.set_title(\"Attitude: Roll\")\n",
" ax1.set_title(\"Attitude: Pitch\")\n",
" ax2.set_title(\"Attitude: Yaw\")\n",
" plt.show()\n",
"\n",
" # Plot bias\n",
" fig, (ax0, ax1, ax2) = plt.subplots(3, 1)\n",
" ax = [ax0, ax1, ax2]\n",
" for i in range(3):\n",
" ax[i].plot(t, [d.xi.b[i] for d in data], color=\"C0\")\n",
" ax[i].plot(t, [xi.b[i] for xi in est], color=\"C1\")\n",
" ax[i].set_xlabel(\"t\")\n",
" ax0.set_title(\"Bias: x\")\n",
" ax1.set_title(\"Bias: y\")\n",
" ax2.set_title(\"Bias: z\")\n",
" plt.show()\n",
"\n",
"\n",
" # Plot calibration states\n",
" for j in range(data[0].n_cal):\n",
" fig, (ax0, ax1, ax2) = plt.subplots(3, 1)\n",
" ax = [ax0, ax1, ax2]\n",
" for i in range(3):\n",
" ax[i].plot(t, [d.xi.S[j].rpy()[i] * 180 / math.pi for d in data], color=\"C0\")\n",
" ax[i].plot(t, [xi.S[j].rpy()[i] * 180 / math.pi for xi in est], color=\"C1\")\n",
" ax[i].set_xlabel(\"t\")\n",
" ax0.set_title(\"Calibration: Roll\")\n",
" ax1.set_title(\"Calibration: Pitch\")\n",
" ax2.set_title(\"Calibration: Yaw\")\n",
" plt.show()"
]
},
{
@ -62,7 +274,7 @@
},
{
"cell_type": "code",
"execution_count": 15,
"execution_count": 72,
"metadata": {},
"outputs": [
{

View File

@ -12,48 +12,21 @@ Authors: Jennifer Oum & Darshan Rajasekaran
import numpy as np
import gtsam
from gtsam import Rot3, Unit3
import math
from dataclasses import dataclass
import progressbar
import pandas as pd
import matplotlib.pyplot as plt
from typing import List
coordinate = "EXPONENTIAL"
def Rot3LeftJacobian(arr: np.ndarray) -> np.ndarray:
"""Return the SO(3) Left Jacobian
:param arr: A numpy array with size 3
:return: The left Jacobian of SO(3)
"""
if not (isinstance(arr, np.ndarray) and arr.size == 3):
raise ValueError("A numpy array with size 3 has to be provided")
angle = np.linalg.norm(arr)
# Near |phi|==0, use first order Taylor expansion
if np.isclose(angle, 0.):
return np.eye(3) + 0.5 * Rot3.Hat(arr)
axis = arr / angle
s = np.sin(angle)
c = np.cos(angle)
return (s / angle) * np.eye(3) + \
(1 - (s / angle)) * np.outer(axis, axis) + \
((1 - c) / angle) * Rot3.Hat(axis)
def checkNorm(x: np.ndarray, tol: float = 1e-3):
"""Check norm of a vector being 1 or nan
"""Check norm of a vector being 1 or nan
:param x: A numpy array
:param tol: tollerance, default 1e-3
:return: Boolean true if norm is 1 or nan
"""
return abs(np.linalg.norm(x) - 1) < tol or np.isnan(np.linalg.norm(x))
:param x: A numpy array
:param tol: tollerance, default 1e-3
:return: Boolean true if norm is 1 or nan
"""
return abs(np.linalg.norm(x) - 1) < tol or np.isnan(np.linalg.norm(x))
class State:
"""Define the state of the Biased Attitude System
@ -76,9 +49,14 @@ class State:
b: np.ndarray
# Sensor calibrations S
S: List[Rot3]
S: List[Rot3]
def __init__(self, R : Rot3 = Rot3.Identity(), b : np.ndarray = np.zeros(3), S: List[Rot3] = None):
def __init__(
self,
R: Rot3 = Rot3.Identity(),
b: np.ndarray = np.zeros(3),
S: List[Rot3] = None,
):
"""Initialize State
:param R: A SO3 element representing the attitude of the system as a rotation matrix
@ -89,11 +67,16 @@ class State:
if not isinstance(R, gtsam.Rot3):
raise TypeError("the attitude rotation matrix R has to be of type SO3 but type is", type(R))
raise TypeError(
"the attitude rotation matrix R has to be of type SO3 but type is",
type(R),
)
self.R = R
if not (isinstance(b, np.ndarray) and b.size == 3):
raise TypeError("The gyroscope bias has to be probvided as numpy array with size 3")
raise TypeError(
"The gyroscope bias has to be probvided as numpy array with size 3"
)
self.b = b
if S is None:
@ -103,7 +86,9 @@ class State:
raise TypeError("Calibration states has to be provided as a list")
for calibration in S:
if not isinstance(calibration, Rot3):
raise TypeError("Elements of the list of calibration states have to be of type SO3")
raise TypeError(
"Elements of the list of calibration states have to be of type SO3"
)
self.S = S
@staticmethod
@ -114,9 +99,9 @@ class State:
:return: The identity element of the State
"""
return State(Rot3.Identity(),
np.zeros(3),
[Rot3.Identity() for _ in range(n)])
return State(Rot3.Identity(), np.zeros(3), [Rot3.Identity() for _ in range(n)])
class Input:
"""Define the input space of the Biased Attitude System
----------
@ -139,9 +124,15 @@ class Input:
"""
if not (isinstance(w, np.ndarray) and w.size == 3):
raise TypeError("Angular velocity has to be provided as a numpy array with size 3")
if not (isinstance(Sigma, np.ndarray) and Sigma.shape[0] == Sigma.shape[1] == 6):
raise TypeError("Input measurement noise covariance has to be provided as a numpy array with shape (6. 6)")
raise TypeError(
"Angular velocity has to be provided as a numpy array with size 3"
)
if not (
isinstance(Sigma, np.ndarray) and Sigma.shape[0] == Sigma.shape[1] == 6
):
raise TypeError(
"Input measurement noise covariance has to be provided as a numpy array with shape (6. 6)"
)
if not np.all(np.linalg.eigvals(Sigma) >= 0):
raise TypeError("Covariance matrix has to be semi-positive definite")
@ -149,7 +140,7 @@ class Input:
self.Sigma = Sigma
@staticmethod
def random() -> 'Input':
def random() -> "Input":
"""Return a random angular velocity
:return: A random angular velocity as a Input element
@ -165,6 +156,7 @@ class Input:
return Rot3.Hat(self.w)
class G:
"""Symmetry group (SO(3) |x so(3)) x SO(3) x ... x SO(3)
----------
@ -181,7 +173,12 @@ class G:
a: np.ndarray
B: List[Rot3]
def __init__(self, A: Rot3 = Rot3.Identity(), a: np.ndarray = np.zeros((3, 3)), B: List[Rot3] = None):
def __init__(
self,
A: Rot3 = Rot3.Identity(),
a: np.ndarray = np.zeros((3, 3)),
B: List[Rot3] = None,
):
"""Initialize the symmetry group G
:param A: SO3 element
@ -203,21 +200,23 @@ class G:
raise TypeError("Elements of B have to be of type SO3")
self.B = B
def __mul__(self, other: 'G') -> 'G':
def __mul__(self, other: "G") -> "G":
"""Define the group operation
:param other: G
:return: A element of the group G given by the "multiplication" of self and other
"""
assert (isinstance(other, G))
assert (len(self.B) == len(other.B))
return G(self.A * other.A,
self.a + Rot3.Hat(self.A.matrix() @ Rot3.Vee(other.a)),
[self.B[i]*other.B[i] for i in range(len(self.B))])
assert isinstance(other, G)
assert len(self.B) == len(other.B)
return G(
self.A * other.A,
self.a + Rot3.Hat(self.A.matrix() @ Rot3.Vee(other.a)),
[self.B[i] * other.B[i] for i in range(len(self.B))],
)
@staticmethod
def identity(n : int):
def identity(n: int):
"""Return the identity of the symmetry group with n elements of SO3 related to sensor calibration states
:param n: number of elements in list B associated with calibration states
@ -226,7 +225,28 @@ class G:
return G(Rot3.Identity(), np.zeros((3, 3)), [Rot3.Identity() for _ in range(n)])
def exp(x: np.ndarray) -> 'G':
@staticmethod
def Rot3LeftJacobian(arr: np.ndarray) -> np.ndarray:
"""Return the SO(3) Left Jacobian
:param arr: A numpy array with size 3
:return: The left Jacobian of SO(3)
"""
if not (isinstance(arr, np.ndarray) and arr.size == 3):
raise ValueError("A numpy array with size 3 has to be provided")
angle = np.linalg.norm(arr)
# Near |phi|==0, use first order Taylor expansion
if np.isclose(angle, 0.0):
return np.eye(3) + 0.5 * Rot3.Hat(arr)
axis = arr / angle
s = np.sin(angle)
c = np.cos(angle)
return (
(s / angle) * np.eye(3)
+ (1 - (s / angle)) * np.outer(axis, axis)
+ ((1 - c) / angle) * Rot3.Hat(axis)
)
def exp(x: np.ndarray) -> "G":
"""Return a group element X given by X = exp(x) where x is a numpy array
:param x: A numpy array
@ -234,24 +254,33 @@ class G:
"""
if not (isinstance(x, np.ndarray) and x.size >= 6):
raise ValueError("Wrong shape, a numpy array with size 3n has to be provided")
raise ValueError(
"Wrong shape, a numpy array with size 3n has to be provided"
)
if (x.size % 3) != 0:
raise ValueError("Wrong size, a numpy array with size multiple of 3 has to be provided")
raise ValueError(
"Wrong size, a numpy array with size multiple of 3 has to be provided"
)
n = int((x.size - 6) / 3)
A = Rot3.Expmap(x[0:3])
a = Rot3.Hat(Rot3LeftJacobian(x[0:3]) @ x[3:6])
B = [Rot3.Expmap(x[(6 + 3 * i):(9 + 3 * i)]) for i in range(n)]
a = Rot3.Hat(G.Rot3LeftJacobian(x[0:3]) @ x[3:6])
B = [Rot3.Expmap(x[(6 + 3 * i) : (9 + 3 * i)]) for i in range(n)]
return G(A, a, B)
def inv(self) -> 'G':
def inv(self) -> "G":
"""Return the inverse element of the symmetry group
:return: A element of the group G given by the inverse of self
"""
return G(self.A.inverse(), -Rot3.Hat(self.A.inverse().matrix() @ Rot3.Vee(self.a)), [B.inverse() for B in self.B])
return G(
self.A.inverse(),
-Rot3.Hat(self.A.inverse().matrix() @ Rot3.Vee(self.a)),
[B.inverse() for B in self.B],
)
class Direction:
"""Define a direction as a S2 element"""
@ -269,7 +298,8 @@ class Direction:
raise TypeError("Direction has to be provided as a 3 vector")
self.d = Unit3(d)
def blockDiag(A : np.ndarray, B : np.ndarray) -> np.ndarray:
def blockDiag(A: np.ndarray, B: np.ndarray) -> np.ndarray:
"""Create a lock diagonal matrix from blocks A and B
:param A: numpy array
@ -282,9 +312,15 @@ def blockDiag(A : np.ndarray, B : np.ndarray) -> np.ndarray:
elif B is None:
return A
else:
return np.block([[A, np.zeros((A.shape[0], B.shape[1]))],[np.zeros((B.shape[0], A.shape[1])), B]])
return np.block(
[
[A, np.zeros((A.shape[0], B.shape[1]))],
[np.zeros((B.shape[0], A.shape[1])), B],
]
)
def repBlock(A : np.ndarray, n: int) -> np.ndarray:
def repBlock(A: np.ndarray, n: int) -> np.ndarray:
"""Create a block diagonal matrix repeating the A block n times
:param A: numpy array representing the block A
@ -298,30 +334,6 @@ def repBlock(A : np.ndarray, n: int) -> np.ndarray:
return res
def Rot3LeftJacobian(arr: np.ndarray) -> np.ndarray:
"""Return the SO(3) Left Jacobian
:param arr: A numpy array with size 3
:return: The left Jacobian of SO(3)
"""
if not (isinstance(arr, np.ndarray) and arr.size == 3):
raise ValueError("A numpy array with size 3 has to be provided")
angle = np.linalg.norm(arr)
# Near |phi|==0, use first order Taylor expansion
if np.isclose(angle, 0.):
return np.eye(3) + 0.5 * Rot3.Hat(arr)
axis = arr / angle
s = np.sin(angle)
c = np.cos(angle)
return (s / angle) * np.eye(3) + \
(1 - (s / angle)) * np.outer(axis, axis) + \
((1 - c) / angle) * Rot3.Hat(axis)
def numericalDifferential(f, x) -> np.ndarray:
"""Compute the numerical derivative via central difference"""
@ -335,9 +347,10 @@ def numericalDifferential(f, x) -> np.ndarray:
for j in range(m):
ej = np.zeros(m)
ej[j] = 1.0
Df[:, j:j+1] = (f(x + h * ej) - f(x - h * ej)).reshape(m, 1) / (2*h)
Df[:, j : j + 1] = (f(x + h * ej) - f(x - h * ej)).reshape(m, 1) / (2 * h)
return Df
def lift(xi: State, u: Input) -> np.ndarray:
"""The Lift of the system (Theorem 3.8, Equation 7)
@ -348,21 +361,23 @@ def lift(xi: State, u: Input) -> np.ndarray:
n = len(xi.S)
L = np.zeros(6 + 3 * n)
L[0:3] = (u.w - xi.b)
L[0:3] = u.w - xi.b
L[3:6] = -u.W() @ xi.b
for i in range(n):
L[(6 + 3 * i):(9 + 3 * i)] = xi.S[i].inverse().matrix() @ L[0:3]
L[(6 + 3 * i) : (9 + 3 * i)] = xi.S[i].inverse().matrix() @ L[0:3]
return L
def checkNorm(x: np.ndarray, tol: float = 1e-3):
"""Check norm of a vector being 1 or nan
:param x: A numpy array
:param tol: tollerance, default 1e-3
:return: Boolean true if norm is 1 or nan
"""
return abs(np.linalg.norm(x) - 1) < tol or np.isnan(np.linalg.norm(x))
def checkNorm(x: np.ndarray, tol: float = 1e-3):
"""Check norm of a vector being 1 or nan
:param x: A numpy array
:param tol: tollerance, default 1e-3
:return: Boolean true if norm is 1 or nan
"""
return abs(np.linalg.norm(x) - 1) < tol or np.isnan(np.linalg.norm(x))
def stateAction(X: G, xi: State) -> State:
"""Action of the symmetry group on the state space, return phi(X, xi) (Equation 4)
@ -373,11 +388,15 @@ def stateAction(X: G, xi: State) -> State:
"""
if len(xi.S) != len(X.B):
raise ValueError("the number of calibration states and B elements of the symmetry group has to match")
raise ValueError(
"the number of calibration states and B elements of the symmetry group has to match"
)
return State(xi.R * X.A,
X.A.inverse().matrix() @ (xi.b - Rot3.Vee(X.a)),
[(X.A.inverse() * xi.S[i] * X.B[i]) for i in range(len(X.B))])
return State(
xi.R * X.A,
X.A.inverse().matrix() @ (xi.b - Rot3.Vee(X.a)),
[(X.A.inverse() * xi.S[i] * X.B[i]) for i in range(len(X.B))],
)
def velocityAction(X: G, u: Input) -> Input:
@ -388,7 +407,7 @@ def velocityAction(X: G, u: Input) -> Input:
:return: A new element of the Input given by the action of psi of G in the Input space
"""
return Input(X.A.inverse().matrix() @ (u.w - Rot3.Vee(X.a)), u.Sigma)
return Input(X.A.inverse().matrix() @ (u.w - Rot3.Vee(X.a)), u.Sigma)
def outputAction(X: G, y: Direction, idx: int = -1) -> np.ndarray:
@ -415,7 +434,15 @@ def local_coords(e: State) -> np.ndarray:
if coordinate == "EXPONENTIAL":
tmp = [Rot3.Logmap(S) for S in e.S]
eps = np.concatenate((Rot3.Logmap(e.R), e.b, np.asarray(tmp).reshape(3 * len(tmp),)))
eps = np.concatenate(
(
Rot3.Logmap(e.R),
e.b,
np.asarray(tmp).reshape(
3 * len(tmp),
),
)
)
elif coordinate == "NORMAL":
raise ValueError("Normal coordinate representation is not implemented yet")
# X = G(e.R, -SO3.Rot3.Hat(e.R @ e.b), e.S)
@ -433,9 +460,9 @@ def local_coords_inv(eps: np.ndarray) -> "State":
:return: Local coordinates inverse assuming __xi_0 = identity
"""
X = G.exp(eps) #G
X = G.exp(eps) # G
if coordinate == "EXPONENTIAL":
e = State(X.A, eps[3:6, :], X.B) #State
e = State(X.A, eps[3:6, :], X.B) # State
elif coordinate == "NORMAL":
raise ValueError("Normal coordinate representation is not implemented yet")
# stateAction(X, State(SO3.identity(), np.zeros(3), [SO3.identity() for _ in range(len(X.B))]))
@ -455,6 +482,7 @@ def stateActionDiff(xi: State) -> np.ndarray:
differential = numericalDifferential(coordsAction, np.zeros(6 + 3 * len(xi.S)))
return differential
class Measurement:
"""Define a measurement
----------
@ -491,8 +519,12 @@ class Measurement:
raise TypeError("Measurement has to be provided as a (3, 1) vector")
if not (isinstance(d, np.ndarray) and d.size == 3 and checkNorm(d)):
raise TypeError("Direction has to be provided as a (3, 1) vector")
if not (isinstance(Sigma, np.ndarray) and Sigma.shape[0] == Sigma.shape[1] == 3):
raise TypeError("Direction measurement noise covariance has to be provided as a numpy array with shape (3. 3)")
if not (
isinstance(Sigma, np.ndarray) and Sigma.shape[0] == Sigma.shape[1] == 3
):
raise TypeError(
"Direction measurement noise covariance has to be provided as a numpy array with shape (3. 3)"
)
if not np.all(np.linalg.eigvals(Sigma) >= 0):
raise TypeError("Covariance matrix has to be semi-positive definite")
if not (isinstance(i, int) or i == -1 or i > 0):
@ -503,6 +535,7 @@ class Measurement:
self.Sigma = Sigma
self.cal_idx = i
@dataclass
class Data:
"""Define ground-truth, input and output data"""
@ -522,8 +555,8 @@ class Data:
t: float
dt: float
class EqF:
class EqF:
def __init__(self, Sigma: np.ndarray, n: int, m: int):
"""Initialize EqF
@ -536,8 +569,13 @@ class EqF:
self.__n_cal = n
self.__n_sensor = m
if not (isinstance(Sigma, np.ndarray) and (Sigma.shape[0] == Sigma.shape[1] == self.__dof)):
raise TypeError(f"Initial covariance has to be provided as a numpy array with shape ({self.__dof}, {self.__dof})")
if not (
isinstance(Sigma, np.ndarray)
and (Sigma.shape[0] == Sigma.shape[1] == self.__dof)
):
raise TypeError(
f"Initial covariance has to be provided as a numpy array with shape ({self.__dof}, {self.__dof})"
)
if not np.all(np.linalg.eigvals(Sigma) >= 0):
raise TypeError("Covariance matrix has to be semi-positive definite")
if not (isinstance(n, int) and n >= 0):
@ -548,8 +586,8 @@ class EqF:
self.__X_hat = G.identity(n)
self.__Sigma = Sigma
self.__xi_0 = State.identity(n)
self.__Dphi0 = stateActionDiff(self.__xi_0) # Within equation 23
self.__InnovationLift = np.linalg.pinv(self.__Dphi0) # Within equation 23
self.__Dphi0 = stateActionDiff(self.__xi_0) # Within equation 23
self.__InnovationLift = np.linalg.pinv(self.__Dphi0) # Within equation 23
def stateEstimate(self) -> State:
"""Return estimated state
@ -566,20 +604,24 @@ class EqF:
"""
if not isinstance(u, Input):
raise TypeError("angular velocity measurement has to be provided as a Input element")
raise TypeError(
"angular velocity measurement has to be provided as a Input element"
)
L = lift(self.stateEstimate(), u) # Equation 7
L = lift(self.stateEstimate(), u) # Equation 7
Phi_DT = self.__stateTransitionMatrix(u, dt) # Equation 17
Phi_DT = self.__stateTransitionMatrix(u, dt) # Equation 17
# Equivalent
# A0t = self.__stateMatrixA(u) # Equation 14a
# Phi_DT = expm(A0t * dt)
Bt = self.__inputMatrixBt() # Equation 27
M_DT = (Bt @ blockDiag(u.Sigma, repBlock(1e-9 * np.eye(3), self.__n_cal)) @ Bt.T) * dt
Bt = self.__inputMatrixBt() # Equation 27
M_DT = (
Bt @ blockDiag(u.Sigma, repBlock(1e-9 * np.eye(3), self.__n_cal)) @ Bt.T
) * dt
self.__X_hat = self.__X_hat * G.exp(L * dt) # Equation 18
self.__Sigma = Phi_DT @ self.__Sigma @ Phi_DT.T + M_DT # Equation 19
self.__X_hat = self.__X_hat * G.exp(L * dt) # Equation 18
self.__Sigma = Phi_DT @ self.__Sigma @ Phi_DT.T + M_DT # Equation 19
def update(self, y: Measurement):
"""Update the filter state
@ -588,16 +630,18 @@ class EqF:
"""
# Cross-check calibration
assert (y.cal_idx <= self.__n_cal)
assert y.cal_idx <= self.__n_cal
Ct = self.__measurementMatrixC(y.d, y.cal_idx) # Equation 14b
delta = Rot3.Hat(y.d.d.unitVector()) @ outputAction(self.__X_hat.inv(), y.y, y.cal_idx)
Ct = self.__measurementMatrixC(y.d, y.cal_idx) # Equation 14b
delta = Rot3.Hat(y.d.d.unitVector()) @ outputAction(
self.__X_hat.inv(), y.y, y.cal_idx
)
Dt = self.__outputMatrixDt(y.cal_idx)
S = Ct @ self.__Sigma @ Ct.T + Dt @ y.Sigma @ Dt.T # Equation 21
K = self.__Sigma @ Ct.T @ np.linalg.inv(S) # Equation 22
Delta = self.__InnovationLift @ K @ delta # Equation 23
self.__X_hat = G.exp(Delta) * self.__X_hat # Equation 24
self.__Sigma = (np.eye(self.__dof) - K @ Ct) @ self.__Sigma # Equation 25
S = Ct @ self.__Sigma @ Ct.T + Dt @ y.Sigma @ Dt.T # Equation 21
K = self.__Sigma @ Ct.T @ np.linalg.inv(S) # Equation 22
Delta = self.__InnovationLift @ K @ delta # Equation 23
self.__X_hat = G.exp(Delta) * self.__X_hat # Equation 24
self.__Sigma = (np.eye(self.__dof) - K @ Ct) @ self.__Sigma # Equation 25
def __stateMatrixA(self, u: Input) -> np.ndarray:
"""Return the state matrix A0t (Equation 14a)
@ -675,9 +719,11 @@ class EqF:
# If the measurement is related to a sensor that has a calibration state
if idx >= 0:
Cc[(3 * idx):(3 + 3 * idx), :] = Rot3.Hat(d.d.unitVector())
Cc[(3 * idx) : (3 + 3 * idx), :] = Rot3.Hat(d.d.unitVector())
return Rot3.Hat(d.d.unitVector()) @ np.hstack((Rot3.Hat(d.d.unitVector()), np.zeros((3, 3)), Cc))
return Rot3.Hat(d.d.unitVector()) @ np.hstack(
(Rot3.Hat(d.d.unitVector()), np.zeros((3, 3)), Cc)
)
def __outputMatrixDt(self, idx: int) -> np.ndarray:
"""Return the measurement output matrix Dt
@ -691,197 +737,3 @@ class EqF:
return self.__X_hat.B[idx].matrix()
else:
return self.__X_hat.A.matrix()
def formatCSV(df): # pass the dataframe in to this function and get "data_list" as an output
"""Read data from csv file formatted as follows:
| -------------------------------------------------------------------------------------------- |
| t: time |
| -------------------------------------------------------------------------------------------- |
| q_x, q_y, q_z, q_w: Quaternion representing the attitude |
| -------------------------------------------------------------------------------------------- |
| b_x, b_y, b_z: Gyro bias |
| -------------------------------------------------------------------------------------------- |
| cq_x_0, cq_yv, cq_z_0, cq_w_0: Quaternion representing the first calibration |
| ... |
| cq_x_n, cq_y_n, cq_z_n, cq_w_n: Quaternion representing the last calibration |
| -------------------------------------------------------------------------------------------- |
| w_x, w_y, w_z: Gyro measurements |
| -------------------------------------------------------------------------------------------- |
| std_w_x, std_w_x, std_w_z: Gyro measurements noise standard deviation |
| -------------------------------------------------------------------------------------------- |
| std_b_x, std_b_x, std_b_z: Gyro bias random walk standard deviation |
| -------------------------------------------------------------------------------------------- |
| y_x_0, y_y_0, y_z_0: Direction measurement in sensor frame associated to calibration state 0 |
| ... |
| y_x_n, y_y_n, y_z_n: Direction measurement in sensor frame associated to calibration state n |
| y_x_n+1, y_y_n+1, y_z_n+1: Direction measurement in sensor frame from calibrated sensor |
| ... |
| y_x_m, y_y_m, y_z_m: Direction measurement in sensor frame from calibrated sensor |
| -------------------------------------------------------------------------------------------- |
| std_y_x_0, std_y_y_0, std_y_z_0: Standard deviation of direction measurement y_0 |
| ... |
| std_y_x_m, std_y_y_m, std_y_z_m: Standard deviation of direction measurement y_m |
| -------------------------------------------------------------------------------------------- |
| d_x_0, d_y_0, d_z_0: Known direction in global frame associated to direction measurement 0 |
| ... |
| d_x_m, d_y_m, d_z_m: Known direction in global frame associated to direction measurement m |
| -------------------------------------------------------------------------------------------- |
NaN cell means that value is not present at that time
Max allowd n = 5
Max allowd m = 10
:param pname: path name
"""
# read .csv file into pandas dataframe
df = df.reset_index()
# Define data_list as list
data_list = []
last_timestamp = df.t[0]
# Check for existence of bias ground-truth into loaded data
bias_exist = False
if {'b_x', 'b_y', 'b_z'}.issubset(df.columns):
bias_exist = True
# Check for existence of calibration ground-truth (yaw, pitch, roll angles) into loaded data
cal_exist = False
n_cal = 0
for i in range(6):
if {'cq_x_' + str(i), 'cq_y_' + str(i), 'cq_z_' + str(i), 'cq_w_' + str(i)}.issubset(df.columns):
cal_exist = True
n_cal = i+1
# Check for existence of direction measurements
n_meas = 0
for i in range(11):
if {'y_x_' + str(i), 'y_y_' + str(i), 'y_z_' + str(i)}.issubset(df.columns):
n_meas = i + 1
for index, row in df.iterrows():
# Load timestamps and record dt
t = float(row['t'])
dt = t - last_timestamp
# Skip data_list if dt is smaller than a micro second
if dt < 1e-6:
continue
last_timestamp = t
# Load groundtruth values
quat = np.array([float(row['q_x']), float(row['q_y']), float(row['q_z']), float(row['q_w'])])
R = Rot3(gtsam.Rot3.Quaternion(float(row['q_w']), float(row['q_x']), float(row['q_y']), float(row['q_z'])).matrix())
# Load Gyro biases
if bias_exist:
b = np.array([float(row['b_x']), float(row['b_y']), float(row['b_z'])]).reshape(3,)
else:
b = np.zeros(3)
# Load GNSS calibration
S = []
if cal_exist:
for i in range(n_cal):
cal = np.array([float(row['cq_x_' + str(i)]), float(row['cq_y_' + str(i)]), float(row['cq_z_' + str(i)]), float(row['cq_w_' + str(i)])])
S.append(Rot3(gtsam.Rot3.Quaternion(float(row['cq_w_' + str(i)]), float(row['cq_x_' + str(i)]), float(row['cq_y_' + str(i)]), float(row['cq_z_' + str(i)])).matrix()))
# Load Gyro inputs
w = np.array([float(row['w_x']), float(row['w_y']), float(row['w_z'])]).reshape(3,)
std_w = np.array([float(row['std_w_x']), float(row['std_w_y']), float(row['std_w_z'])]).reshape(3,)
std_b = np.array([float(row['std_b_x']), float(row['std_b_y']), float(row['std_b_z'])]).reshape(3,)
Sigma_wb = blockDiag(np.eye(3) * (std_w ** 2), np.eye(3) * (std_b ** 2))
# Load measurements
meas = []
for i in range(n_meas):
y = np.array([float(row['y_x_' + str(i)]), float(row['y_y_' + str(i)]), float(row['y_z_' + str(i)])]).reshape(3,)
d = np.array([float(row['d_x_' + str(i)]), float(row['d_y_' + str(i)]), float(row['d_z_' + str(i)])]).reshape(3,)
std_y = np.array([float(row['std_y_x_' + str(i)]), float(row['std_y_y_' + str(i)]), float(row['std_y_z_' + str(i)])]).reshape(3,)
if i < n_cal:
meas.append(Measurement(y, d, np.eye(3) * (std_y ** 2), i))
else:
meas.append(Measurement(y, d, np.eye(3) * (std_y ** 2), -1))
# Append to data_list
d = Data(State(R, b, S), n_cal, Input(w, Sigma_wb), meas, n_meas, t, dt)
data_list.append(d)
return data_list
def sim(filter_args, data):
# Define progressbar
p = progressbar.ProgressBar()
# EqF
filter = EqF(*filter_args)
# Allocate variables
t = []
est = []
# Filter loop
for d in p(data):
t.append(d.t)
# Run filter
try:
filter.propagation(d.u, d.dt)
except:
print('Filter.propagation Error\n')
for y in d.y:
if not (np.isnan(np.linalg.norm(y.y.d.unitVector())) or np.isnan(np.linalg.norm(y.d.d.unitVector()))):
try:
filter.update(y)
except:
print('Filter.update Error\n')
est.append(filter.stateEstimate())
# Plot Attitude1
fig, (ax0, ax1, ax2) = plt.subplots(3, 1)
ax = [ax0, ax1, ax2]
for i in range(3):
ax[i].plot(t, [d.xi.R.rpy()[i] * 180 / math.pi for d in data], color="C0")
ax[i].plot(t, [xi.R.rpy()[i] * 180 / math.pi for xi in est], color="C1")
ax[i].set_xlabel("t")
ax0.set_title("Attitude: Roll")
ax1.set_title("Attitude: Pitch")
ax2.set_title("Attitude: Yaw")
plt.show()
# Plot bias
fig, (ax0, ax1, ax2) = plt.subplots(3, 1)
ax = [ax0, ax1, ax2]
for i in range(3):
ax[i].plot(t, [d.xi.b[i] for d in data], color="C0")
ax[i].plot(t, [xi.b[i] for xi in est], color="C1")
ax[i].set_xlabel("t")
ax0.set_title("Bias: x")
ax1.set_title("Bias: y")
ax2.set_title("Bias: z")
plt.show()
# Plot calibration states
for j in range(data[0].n_cal):
fig, (ax0, ax1, ax2) = plt.subplots(3, 1)
ax = [ax0, ax1, ax2]
for i in range(3):
ax[i].plot(t, [d.xi.S[j].rpy()[i] * 180 / math.pi for d in data], color="C0")
ax[i].plot(t, [xi.S[j].rpy()[i] * 180 / math.pi for xi in est], color="C1")
ax[i].set_xlabel("t")
ax0.set_title("Calibration: Roll")
ax1.set_title("Calibration: Pitch")
ax2.set_title("Calibration: Yaw")
plt.show()