diff --git a/python/gtsam/examples/EqF.ipynb b/python/gtsam/examples/EqF.ipynb index 79593e3fd..33d7f74da 100644 --- a/python/gtsam/examples/EqF.ipynb +++ b/python/gtsam/examples/EqF.ipynb @@ -16,7 +16,7 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 68, "metadata": {}, "outputs": [ { @@ -33,16 +33,228 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 69, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import pandas as pd\n", + "import progressbar\n", + "import pandas as pd\n", + "import matplotlib.pyplot as plt\n", + "import math\n", "\n", - "from gtsam import findExampleDataFile\n", + "import gtsam\n", + "from gtsam import findExampleDataFile, Rot3\n", "\n", - "from EqF import formatCSV, blockDiag, repBlock, sim\n" + "from EqF import *\n" + ] + }, + { + "cell_type": "code", + "execution_count": 70, + "metadata": {}, + "outputs": [], + "source": [ + "def formatCSV(df): # pass the dataframe in to this function and get \"data_list\" as an output\n", + " \"\"\"Read data from csv file formatted as follows:\n", + "\n", + " | -------------------------------------------------------------------------------------------- |\n", + " | t: time |\n", + " | -------------------------------------------------------------------------------------------- |\n", + " | q_x, q_y, q_z, q_w: Quaternion representing the attitude |\n", + " | -------------------------------------------------------------------------------------------- |\n", + " | b_x, b_y, b_z: Gyro bias |\n", + " | -------------------------------------------------------------------------------------------- |\n", + " | cq_x_0, cq_yv, cq_z_0, cq_w_0: Quaternion representing the first calibration |\n", + " | ... |\n", + " | cq_x_n, cq_y_n, cq_z_n, cq_w_n: Quaternion representing the last calibration |\n", + " | -------------------------------------------------------------------------------------------- |\n", + " | w_x, w_y, w_z: Gyro measurements |\n", + " | -------------------------------------------------------------------------------------------- |\n", + " | std_w_x, std_w_x, std_w_z: Gyro measurements noise standard deviation |\n", + " | -------------------------------------------------------------------------------------------- |\n", + " | std_b_x, std_b_x, std_b_z: Gyro bias random walk standard deviation |\n", + " | -------------------------------------------------------------------------------------------- |\n", + " | y_x_0, y_y_0, y_z_0: Direction measurement in sensor frame associated to calibration state 0 |\n", + " | ... |\n", + " | y_x_n, y_y_n, y_z_n: Direction measurement in sensor frame associated to calibration state n |\n", + " | y_x_n+1, y_y_n+1, y_z_n+1: Direction measurement in sensor frame from calibrated sensor |\n", + " | ... |\n", + " | y_x_m, y_y_m, y_z_m: Direction measurement in sensor frame from calibrated sensor |\n", + " | -------------------------------------------------------------------------------------------- |\n", + " | std_y_x_0, std_y_y_0, std_y_z_0: Standard deviation of direction measurement y_0 |\n", + " | ... |\n", + " | std_y_x_m, std_y_y_m, std_y_z_m: Standard deviation of direction measurement y_m |\n", + " | -------------------------------------------------------------------------------------------- |\n", + " | d_x_0, d_y_0, d_z_0: Known direction in global frame associated to direction measurement 0 |\n", + " | ... |\n", + " | d_x_m, d_y_m, d_z_m: Known direction in global frame associated to direction measurement m |\n", + " | -------------------------------------------------------------------------------------------- |\n", + "\n", + " NaN cell means that value is not present at that time\n", + "\n", + " Max allowd n = 5\n", + " Max allowd m = 10\n", + "\n", + " :param pname: path name\n", + " \"\"\"\n", + "\n", + " # read .csv file into pandas dataframe\n", + " df = df.reset_index()\n", + "\n", + " # Define data_list as list\n", + " data_list = []\n", + " last_timestamp = df.t[0]\n", + "\n", + " # Check for existence of bias ground-truth into loaded data\n", + " bias_exist = False\n", + " if {'b_x', 'b_y', 'b_z'}.issubset(df.columns):\n", + " bias_exist = True\n", + "\n", + " # Check for existence of calibration ground-truth (yaw, pitch, roll angles) into loaded data\n", + " cal_exist = False\n", + " n_cal = 0\n", + " for i in range(6):\n", + " if {'cq_x_' + str(i), 'cq_y_' + str(i), 'cq_z_' + str(i), 'cq_w_' + str(i)}.issubset(df.columns):\n", + " cal_exist = True\n", + " n_cal = i+1\n", + "\n", + " # Check for existence of direction measurements\n", + " n_meas = 0\n", + " for i in range(11):\n", + " if {'y_x_' + str(i), 'y_y_' + str(i), 'y_z_' + str(i)}.issubset(df.columns):\n", + " n_meas = i + 1\n", + "\n", + " for index, row in df.iterrows():\n", + "\n", + " # Load timestamps and record dt\n", + " t = float(row['t'])\n", + " dt = t - last_timestamp\n", + "\n", + " # Skip data_list if dt is smaller than a micro second\n", + " if dt < 1e-6:\n", + " continue\n", + "\n", + " last_timestamp = t\n", + "\n", + " # Load groundtruth values\n", + " quat = np.array([float(row['q_x']), float(row['q_y']), float(row['q_z']), float(row['q_w'])])\n", + "\n", + " R = Rot3(gtsam.Rot3.Quaternion(float(row['q_w']), float(row['q_x']), float(row['q_y']), float(row['q_z'])).matrix())\n", + "\n", + " # Load Gyro biases\n", + " if bias_exist:\n", + " b = np.array([float(row['b_x']), float(row['b_y']), float(row['b_z'])]).reshape(3,)\n", + " else:\n", + " b = np.zeros(3)\n", + "\n", + " # Load GNSS calibration\n", + " S = []\n", + " if cal_exist:\n", + " for i in range(n_cal):\n", + " cal = np.array([float(row['cq_x_' + str(i)]), float(row['cq_y_' + str(i)]), float(row['cq_z_' + str(i)]), float(row['cq_w_' + str(i)])])\n", + " S.append(Rot3(gtsam.Rot3.Quaternion(float(row['cq_w_' + str(i)]), float(row['cq_x_' + str(i)]), float(row['cq_y_' + str(i)]), float(row['cq_z_' + str(i)])).matrix()))\n", + "\n", + "\n", + " # Load Gyro inputs\n", + " w = np.array([float(row['w_x']), float(row['w_y']), float(row['w_z'])]).reshape(3,)\n", + " std_w = np.array([float(row['std_w_x']), float(row['std_w_y']), float(row['std_w_z'])]).reshape(3,)\n", + " std_b = np.array([float(row['std_b_x']), float(row['std_b_y']), float(row['std_b_z'])]).reshape(3,)\n", + " Sigma_wb = blockDiag(np.eye(3) * (std_w ** 2), np.eye(3) * (std_b ** 2))\n", + "\n", + " # Load measurements\n", + " meas = []\n", + " for i in range(n_meas):\n", + " y = np.array([float(row['y_x_' + str(i)]), float(row['y_y_' + str(i)]), float(row['y_z_' + str(i)])]).reshape(3,)\n", + " d = np.array([float(row['d_x_' + str(i)]), float(row['d_y_' + str(i)]), float(row['d_z_' + str(i)])]).reshape(3,)\n", + " std_y = np.array([float(row['std_y_x_' + str(i)]), float(row['std_y_y_' + str(i)]), float(row['std_y_z_' + str(i)])]).reshape(3,)\n", + " if i < n_cal:\n", + " meas.append(Measurement(y, d, np.eye(3) * (std_y ** 2), i))\n", + " else:\n", + " meas.append(Measurement(y, d, np.eye(3) * (std_y ** 2), -1))\n", + "\n", + " # Append to data_list\n", + " d = Data(State(R, b, S), n_cal, Input(w, Sigma_wb), meas, n_meas, t, dt)\n", + " \n", + " data_list.append(d)\n", + "\n", + " return data_list" + ] + }, + { + "cell_type": "code", + "execution_count": 71, + "metadata": {}, + "outputs": [], + "source": [ + "def sim(filter_args, data):\n", + "\n", + " # Define progressbar\n", + " p = progressbar.ProgressBar()\n", + "\n", + " # EqF\n", + " filter = EqF(*filter_args)\n", + "\n", + " # Allocate variables\n", + " t = []\n", + " est = []\n", + "\n", + " # Filter loop\n", + " for d in p(data):\n", + "\n", + " t.append(d.t)\n", + "\n", + " # Run filter\n", + " try:\n", + " filter.propagation(d.u, d.dt)\n", + " except:\n", + " print('Filter.propagation Error\\n')\n", + " for y in d.y:\n", + " if not (np.isnan(np.linalg.norm(y.y.d.unitVector())) or np.isnan(np.linalg.norm(y.d.d.unitVector()))):\n", + " try:\n", + " filter.update(y)\n", + " except:\n", + " print('Filter.update Error\\n')\n", + " est.append(filter.stateEstimate())\n", + "\n", + " # Plot Attitude1\n", + " fig, (ax0, ax1, ax2) = plt.subplots(3, 1)\n", + " ax = [ax0, ax1, ax2]\n", + " for i in range(3):\n", + " ax[i].plot(t, [d.xi.R.rpy()[i] * 180 / math.pi for d in data], color=\"C0\")\n", + " ax[i].plot(t, [xi.R.rpy()[i] * 180 / math.pi for xi in est], color=\"C1\")\n", + " ax[i].set_xlabel(\"t\")\n", + " ax0.set_title(\"Attitude: Roll\")\n", + " ax1.set_title(\"Attitude: Pitch\")\n", + " ax2.set_title(\"Attitude: Yaw\")\n", + " plt.show()\n", + "\n", + " # Plot bias\n", + " fig, (ax0, ax1, ax2) = plt.subplots(3, 1)\n", + " ax = [ax0, ax1, ax2]\n", + " for i in range(3):\n", + " ax[i].plot(t, [d.xi.b[i] for d in data], color=\"C0\")\n", + " ax[i].plot(t, [xi.b[i] for xi in est], color=\"C1\")\n", + " ax[i].set_xlabel(\"t\")\n", + " ax0.set_title(\"Bias: x\")\n", + " ax1.set_title(\"Bias: y\")\n", + " ax2.set_title(\"Bias: z\")\n", + " plt.show()\n", + "\n", + "\n", + " # Plot calibration states\n", + " for j in range(data[0].n_cal):\n", + " fig, (ax0, ax1, ax2) = plt.subplots(3, 1)\n", + " ax = [ax0, ax1, ax2]\n", + " for i in range(3):\n", + " ax[i].plot(t, [d.xi.S[j].rpy()[i] * 180 / math.pi for d in data], color=\"C0\")\n", + " ax[i].plot(t, [xi.S[j].rpy()[i] * 180 / math.pi for xi in est], color=\"C1\")\n", + " ax[i].set_xlabel(\"t\")\n", + " ax0.set_title(\"Calibration: Roll\")\n", + " ax1.set_title(\"Calibration: Pitch\")\n", + " ax2.set_title(\"Calibration: Yaw\")\n", + " plt.show()" ] }, { @@ -62,7 +274,7 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": 72, "metadata": {}, "outputs": [ { diff --git a/python/gtsam/examples/EqF.py b/python/gtsam/examples/EqF.py index 2fdaf2199..4084f41f5 100644 --- a/python/gtsam/examples/EqF.py +++ b/python/gtsam/examples/EqF.py @@ -12,48 +12,21 @@ Authors: Jennifer Oum & Darshan Rajasekaran import numpy as np import gtsam from gtsam import Rot3, Unit3 - -import math from dataclasses import dataclass -import progressbar -import pandas as pd -import matplotlib.pyplot as plt from typing import List coordinate = "EXPONENTIAL" -def Rot3LeftJacobian(arr: np.ndarray) -> np.ndarray: - """Return the SO(3) Left Jacobian - - :param arr: A numpy array with size 3 - :return: The left Jacobian of SO(3) - """ - - if not (isinstance(arr, np.ndarray) and arr.size == 3): - raise ValueError("A numpy array with size 3 has to be provided") - - angle = np.linalg.norm(arr) - - # Near |phi|==0, use first order Taylor expansion - if np.isclose(angle, 0.): - return np.eye(3) + 0.5 * Rot3.Hat(arr) - - axis = arr / angle - s = np.sin(angle) - c = np.cos(angle) - - return (s / angle) * np.eye(3) + \ - (1 - (s / angle)) * np.outer(axis, axis) + \ - ((1 - c) / angle) * Rot3.Hat(axis) def checkNorm(x: np.ndarray, tol: float = 1e-3): - """Check norm of a vector being 1 or nan + """Check norm of a vector being 1 or nan + + :param x: A numpy array + :param tol: tollerance, default 1e-3 + :return: Boolean true if norm is 1 or nan + """ + return abs(np.linalg.norm(x) - 1) < tol or np.isnan(np.linalg.norm(x)) - :param x: A numpy array - :param tol: tollerance, default 1e-3 - :return: Boolean true if norm is 1 or nan - """ - return abs(np.linalg.norm(x) - 1) < tol or np.isnan(np.linalg.norm(x)) class State: """Define the state of the Biased Attitude System @@ -76,9 +49,14 @@ class State: b: np.ndarray # Sensor calibrations S - S: List[Rot3] + S: List[Rot3] - def __init__(self, R : Rot3 = Rot3.Identity(), b : np.ndarray = np.zeros(3), S: List[Rot3] = None): + def __init__( + self, + R: Rot3 = Rot3.Identity(), + b: np.ndarray = np.zeros(3), + S: List[Rot3] = None, + ): """Initialize State :param R: A SO3 element representing the attitude of the system as a rotation matrix @@ -89,11 +67,16 @@ class State: if not isinstance(R, gtsam.Rot3): - raise TypeError("the attitude rotation matrix R has to be of type SO3 but type is", type(R)) + raise TypeError( + "the attitude rotation matrix R has to be of type SO3 but type is", + type(R), + ) self.R = R if not (isinstance(b, np.ndarray) and b.size == 3): - raise TypeError("The gyroscope bias has to be probvided as numpy array with size 3") + raise TypeError( + "The gyroscope bias has to be probvided as numpy array with size 3" + ) self.b = b if S is None: @@ -103,7 +86,9 @@ class State: raise TypeError("Calibration states has to be provided as a list") for calibration in S: if not isinstance(calibration, Rot3): - raise TypeError("Elements of the list of calibration states have to be of type SO3") + raise TypeError( + "Elements of the list of calibration states have to be of type SO3" + ) self.S = S @staticmethod @@ -114,9 +99,9 @@ class State: :return: The identity element of the State """ - return State(Rot3.Identity(), - np.zeros(3), - [Rot3.Identity() for _ in range(n)]) + return State(Rot3.Identity(), np.zeros(3), [Rot3.Identity() for _ in range(n)]) + + class Input: """Define the input space of the Biased Attitude System ---------- @@ -139,9 +124,15 @@ class Input: """ if not (isinstance(w, np.ndarray) and w.size == 3): - raise TypeError("Angular velocity has to be provided as a numpy array with size 3") - if not (isinstance(Sigma, np.ndarray) and Sigma.shape[0] == Sigma.shape[1] == 6): - raise TypeError("Input measurement noise covariance has to be provided as a numpy array with shape (6. 6)") + raise TypeError( + "Angular velocity has to be provided as a numpy array with size 3" + ) + if not ( + isinstance(Sigma, np.ndarray) and Sigma.shape[0] == Sigma.shape[1] == 6 + ): + raise TypeError( + "Input measurement noise covariance has to be provided as a numpy array with shape (6. 6)" + ) if not np.all(np.linalg.eigvals(Sigma) >= 0): raise TypeError("Covariance matrix has to be semi-positive definite") @@ -149,7 +140,7 @@ class Input: self.Sigma = Sigma @staticmethod - def random() -> 'Input': + def random() -> "Input": """Return a random angular velocity :return: A random angular velocity as a Input element @@ -165,6 +156,7 @@ class Input: return Rot3.Hat(self.w) + class G: """Symmetry group (SO(3) |x so(3)) x SO(3) x ... x SO(3) ---------- @@ -181,7 +173,12 @@ class G: a: np.ndarray B: List[Rot3] - def __init__(self, A: Rot3 = Rot3.Identity(), a: np.ndarray = np.zeros((3, 3)), B: List[Rot3] = None): + def __init__( + self, + A: Rot3 = Rot3.Identity(), + a: np.ndarray = np.zeros((3, 3)), + B: List[Rot3] = None, + ): """Initialize the symmetry group G :param A: SO3 element @@ -203,21 +200,23 @@ class G: raise TypeError("Elements of B have to be of type SO3") self.B = B - def __mul__(self, other: 'G') -> 'G': + def __mul__(self, other: "G") -> "G": """Define the group operation :param other: G :return: A element of the group G given by the "multiplication" of self and other """ - assert (isinstance(other, G)) - assert (len(self.B) == len(other.B)) - return G(self.A * other.A, - self.a + Rot3.Hat(self.A.matrix() @ Rot3.Vee(other.a)), - [self.B[i]*other.B[i] for i in range(len(self.B))]) + assert isinstance(other, G) + assert len(self.B) == len(other.B) + return G( + self.A * other.A, + self.a + Rot3.Hat(self.A.matrix() @ Rot3.Vee(other.a)), + [self.B[i] * other.B[i] for i in range(len(self.B))], + ) @staticmethod - def identity(n : int): + def identity(n: int): """Return the identity of the symmetry group with n elements of SO3 related to sensor calibration states :param n: number of elements in list B associated with calibration states @@ -226,7 +225,28 @@ class G: return G(Rot3.Identity(), np.zeros((3, 3)), [Rot3.Identity() for _ in range(n)]) - def exp(x: np.ndarray) -> 'G': + @staticmethod + def Rot3LeftJacobian(arr: np.ndarray) -> np.ndarray: + """Return the SO(3) Left Jacobian + :param arr: A numpy array with size 3 + :return: The left Jacobian of SO(3) + """ + if not (isinstance(arr, np.ndarray) and arr.size == 3): + raise ValueError("A numpy array with size 3 has to be provided") + angle = np.linalg.norm(arr) + # Near |phi|==0, use first order Taylor expansion + if np.isclose(angle, 0.0): + return np.eye(3) + 0.5 * Rot3.Hat(arr) + axis = arr / angle + s = np.sin(angle) + c = np.cos(angle) + return ( + (s / angle) * np.eye(3) + + (1 - (s / angle)) * np.outer(axis, axis) + + ((1 - c) / angle) * Rot3.Hat(axis) + ) + + def exp(x: np.ndarray) -> "G": """Return a group element X given by X = exp(x) where x is a numpy array :param x: A numpy array @@ -234,24 +254,33 @@ class G: """ if not (isinstance(x, np.ndarray) and x.size >= 6): - raise ValueError("Wrong shape, a numpy array with size 3n has to be provided") + raise ValueError( + "Wrong shape, a numpy array with size 3n has to be provided" + ) if (x.size % 3) != 0: - raise ValueError("Wrong size, a numpy array with size multiple of 3 has to be provided") + raise ValueError( + "Wrong size, a numpy array with size multiple of 3 has to be provided" + ) n = int((x.size - 6) / 3) A = Rot3.Expmap(x[0:3]) - a = Rot3.Hat(Rot3LeftJacobian(x[0:3]) @ x[3:6]) - B = [Rot3.Expmap(x[(6 + 3 * i):(9 + 3 * i)]) for i in range(n)] + a = Rot3.Hat(G.Rot3LeftJacobian(x[0:3]) @ x[3:6]) + B = [Rot3.Expmap(x[(6 + 3 * i) : (9 + 3 * i)]) for i in range(n)] return G(A, a, B) - def inv(self) -> 'G': + def inv(self) -> "G": """Return the inverse element of the symmetry group :return: A element of the group G given by the inverse of self """ - return G(self.A.inverse(), -Rot3.Hat(self.A.inverse().matrix() @ Rot3.Vee(self.a)), [B.inverse() for B in self.B]) + return G( + self.A.inverse(), + -Rot3.Hat(self.A.inverse().matrix() @ Rot3.Vee(self.a)), + [B.inverse() for B in self.B], + ) + class Direction: """Define a direction as a S2 element""" @@ -269,7 +298,8 @@ class Direction: raise TypeError("Direction has to be provided as a 3 vector") self.d = Unit3(d) -def blockDiag(A : np.ndarray, B : np.ndarray) -> np.ndarray: + +def blockDiag(A: np.ndarray, B: np.ndarray) -> np.ndarray: """Create a lock diagonal matrix from blocks A and B :param A: numpy array @@ -282,9 +312,15 @@ def blockDiag(A : np.ndarray, B : np.ndarray) -> np.ndarray: elif B is None: return A else: - return np.block([[A, np.zeros((A.shape[0], B.shape[1]))],[np.zeros((B.shape[0], A.shape[1])), B]]) + return np.block( + [ + [A, np.zeros((A.shape[0], B.shape[1]))], + [np.zeros((B.shape[0], A.shape[1])), B], + ] + ) -def repBlock(A : np.ndarray, n: int) -> np.ndarray: + +def repBlock(A: np.ndarray, n: int) -> np.ndarray: """Create a block diagonal matrix repeating the A block n times :param A: numpy array representing the block A @@ -298,30 +334,6 @@ def repBlock(A : np.ndarray, n: int) -> np.ndarray: return res -def Rot3LeftJacobian(arr: np.ndarray) -> np.ndarray: - """Return the SO(3) Left Jacobian - - :param arr: A numpy array with size 3 - :return: The left Jacobian of SO(3) - """ - - if not (isinstance(arr, np.ndarray) and arr.size == 3): - raise ValueError("A numpy array with size 3 has to be provided") - - angle = np.linalg.norm(arr) - - # Near |phi|==0, use first order Taylor expansion - if np.isclose(angle, 0.): - return np.eye(3) + 0.5 * Rot3.Hat(arr) - - axis = arr / angle - s = np.sin(angle) - c = np.cos(angle) - - return (s / angle) * np.eye(3) + \ - (1 - (s / angle)) * np.outer(axis, axis) + \ - ((1 - c) / angle) * Rot3.Hat(axis) - def numericalDifferential(f, x) -> np.ndarray: """Compute the numerical derivative via central difference""" @@ -335,9 +347,10 @@ def numericalDifferential(f, x) -> np.ndarray: for j in range(m): ej = np.zeros(m) ej[j] = 1.0 - Df[:, j:j+1] = (f(x + h * ej) - f(x - h * ej)).reshape(m, 1) / (2*h) + Df[:, j : j + 1] = (f(x + h * ej) - f(x - h * ej)).reshape(m, 1) / (2 * h) return Df + def lift(xi: State, u: Input) -> np.ndarray: """The Lift of the system (Theorem 3.8, Equation 7) @@ -348,21 +361,23 @@ def lift(xi: State, u: Input) -> np.ndarray: n = len(xi.S) L = np.zeros(6 + 3 * n) - L[0:3] = (u.w - xi.b) + L[0:3] = u.w - xi.b L[3:6] = -u.W() @ xi.b for i in range(n): - L[(6 + 3 * i):(9 + 3 * i)] = xi.S[i].inverse().matrix() @ L[0:3] + L[(6 + 3 * i) : (9 + 3 * i)] = xi.S[i].inverse().matrix() @ L[0:3] return L -def checkNorm(x: np.ndarray, tol: float = 1e-3): - """Check norm of a vector being 1 or nan - :param x: A numpy array - :param tol: tollerance, default 1e-3 - :return: Boolean true if norm is 1 or nan - """ - return abs(np.linalg.norm(x) - 1) < tol or np.isnan(np.linalg.norm(x)) +def checkNorm(x: np.ndarray, tol: float = 1e-3): + """Check norm of a vector being 1 or nan + + :param x: A numpy array + :param tol: tollerance, default 1e-3 + :return: Boolean true if norm is 1 or nan + """ + return abs(np.linalg.norm(x) - 1) < tol or np.isnan(np.linalg.norm(x)) + def stateAction(X: G, xi: State) -> State: """Action of the symmetry group on the state space, return phi(X, xi) (Equation 4) @@ -373,11 +388,15 @@ def stateAction(X: G, xi: State) -> State: """ if len(xi.S) != len(X.B): - raise ValueError("the number of calibration states and B elements of the symmetry group has to match") + raise ValueError( + "the number of calibration states and B elements of the symmetry group has to match" + ) - return State(xi.R * X.A, - X.A.inverse().matrix() @ (xi.b - Rot3.Vee(X.a)), - [(X.A.inverse() * xi.S[i] * X.B[i]) for i in range(len(X.B))]) + return State( + xi.R * X.A, + X.A.inverse().matrix() @ (xi.b - Rot3.Vee(X.a)), + [(X.A.inverse() * xi.S[i] * X.B[i]) for i in range(len(X.B))], + ) def velocityAction(X: G, u: Input) -> Input: @@ -388,7 +407,7 @@ def velocityAction(X: G, u: Input) -> Input: :return: A new element of the Input given by the action of psi of G in the Input space """ - return Input(X.A.inverse().matrix() @ (u.w - Rot3.Vee(X.a)), u.Sigma) + return Input(X.A.inverse().matrix() @ (u.w - Rot3.Vee(X.a)), u.Sigma) def outputAction(X: G, y: Direction, idx: int = -1) -> np.ndarray: @@ -415,7 +434,15 @@ def local_coords(e: State) -> np.ndarray: if coordinate == "EXPONENTIAL": tmp = [Rot3.Logmap(S) for S in e.S] - eps = np.concatenate((Rot3.Logmap(e.R), e.b, np.asarray(tmp).reshape(3 * len(tmp),))) + eps = np.concatenate( + ( + Rot3.Logmap(e.R), + e.b, + np.asarray(tmp).reshape( + 3 * len(tmp), + ), + ) + ) elif coordinate == "NORMAL": raise ValueError("Normal coordinate representation is not implemented yet") # X = G(e.R, -SO3.Rot3.Hat(e.R @ e.b), e.S) @@ -433,9 +460,9 @@ def local_coords_inv(eps: np.ndarray) -> "State": :return: Local coordinates inverse assuming __xi_0 = identity """ - X = G.exp(eps) #G + X = G.exp(eps) # G if coordinate == "EXPONENTIAL": - e = State(X.A, eps[3:6, :], X.B) #State + e = State(X.A, eps[3:6, :], X.B) # State elif coordinate == "NORMAL": raise ValueError("Normal coordinate representation is not implemented yet") # stateAction(X, State(SO3.identity(), np.zeros(3), [SO3.identity() for _ in range(len(X.B))])) @@ -455,6 +482,7 @@ def stateActionDiff(xi: State) -> np.ndarray: differential = numericalDifferential(coordsAction, np.zeros(6 + 3 * len(xi.S))) return differential + class Measurement: """Define a measurement ---------- @@ -491,8 +519,12 @@ class Measurement: raise TypeError("Measurement has to be provided as a (3, 1) vector") if not (isinstance(d, np.ndarray) and d.size == 3 and checkNorm(d)): raise TypeError("Direction has to be provided as a (3, 1) vector") - if not (isinstance(Sigma, np.ndarray) and Sigma.shape[0] == Sigma.shape[1] == 3): - raise TypeError("Direction measurement noise covariance has to be provided as a numpy array with shape (3. 3)") + if not ( + isinstance(Sigma, np.ndarray) and Sigma.shape[0] == Sigma.shape[1] == 3 + ): + raise TypeError( + "Direction measurement noise covariance has to be provided as a numpy array with shape (3. 3)" + ) if not np.all(np.linalg.eigvals(Sigma) >= 0): raise TypeError("Covariance matrix has to be semi-positive definite") if not (isinstance(i, int) or i == -1 or i > 0): @@ -503,6 +535,7 @@ class Measurement: self.Sigma = Sigma self.cal_idx = i + @dataclass class Data: """Define ground-truth, input and output data""" @@ -522,8 +555,8 @@ class Data: t: float dt: float -class EqF: +class EqF: def __init__(self, Sigma: np.ndarray, n: int, m: int): """Initialize EqF @@ -536,8 +569,13 @@ class EqF: self.__n_cal = n self.__n_sensor = m - if not (isinstance(Sigma, np.ndarray) and (Sigma.shape[0] == Sigma.shape[1] == self.__dof)): - raise TypeError(f"Initial covariance has to be provided as a numpy array with shape ({self.__dof}, {self.__dof})") + if not ( + isinstance(Sigma, np.ndarray) + and (Sigma.shape[0] == Sigma.shape[1] == self.__dof) + ): + raise TypeError( + f"Initial covariance has to be provided as a numpy array with shape ({self.__dof}, {self.__dof})" + ) if not np.all(np.linalg.eigvals(Sigma) >= 0): raise TypeError("Covariance matrix has to be semi-positive definite") if not (isinstance(n, int) and n >= 0): @@ -548,8 +586,8 @@ class EqF: self.__X_hat = G.identity(n) self.__Sigma = Sigma self.__xi_0 = State.identity(n) - self.__Dphi0 = stateActionDiff(self.__xi_0) # Within equation 23 - self.__InnovationLift = np.linalg.pinv(self.__Dphi0) # Within equation 23 + self.__Dphi0 = stateActionDiff(self.__xi_0) # Within equation 23 + self.__InnovationLift = np.linalg.pinv(self.__Dphi0) # Within equation 23 def stateEstimate(self) -> State: """Return estimated state @@ -566,20 +604,24 @@ class EqF: """ if not isinstance(u, Input): - raise TypeError("angular velocity measurement has to be provided as a Input element") + raise TypeError( + "angular velocity measurement has to be provided as a Input element" + ) - L = lift(self.stateEstimate(), u) # Equation 7 + L = lift(self.stateEstimate(), u) # Equation 7 - Phi_DT = self.__stateTransitionMatrix(u, dt) # Equation 17 + Phi_DT = self.__stateTransitionMatrix(u, dt) # Equation 17 # Equivalent # A0t = self.__stateMatrixA(u) # Equation 14a # Phi_DT = expm(A0t * dt) - Bt = self.__inputMatrixBt() # Equation 27 - M_DT = (Bt @ blockDiag(u.Sigma, repBlock(1e-9 * np.eye(3), self.__n_cal)) @ Bt.T) * dt + Bt = self.__inputMatrixBt() # Equation 27 + M_DT = ( + Bt @ blockDiag(u.Sigma, repBlock(1e-9 * np.eye(3), self.__n_cal)) @ Bt.T + ) * dt - self.__X_hat = self.__X_hat * G.exp(L * dt) # Equation 18 - self.__Sigma = Phi_DT @ self.__Sigma @ Phi_DT.T + M_DT # Equation 19 + self.__X_hat = self.__X_hat * G.exp(L * dt) # Equation 18 + self.__Sigma = Phi_DT @ self.__Sigma @ Phi_DT.T + M_DT # Equation 19 def update(self, y: Measurement): """Update the filter state @@ -588,16 +630,18 @@ class EqF: """ # Cross-check calibration - assert (y.cal_idx <= self.__n_cal) + assert y.cal_idx <= self.__n_cal - Ct = self.__measurementMatrixC(y.d, y.cal_idx) # Equation 14b - delta = Rot3.Hat(y.d.d.unitVector()) @ outputAction(self.__X_hat.inv(), y.y, y.cal_idx) + Ct = self.__measurementMatrixC(y.d, y.cal_idx) # Equation 14b + delta = Rot3.Hat(y.d.d.unitVector()) @ outputAction( + self.__X_hat.inv(), y.y, y.cal_idx + ) Dt = self.__outputMatrixDt(y.cal_idx) - S = Ct @ self.__Sigma @ Ct.T + Dt @ y.Sigma @ Dt.T # Equation 21 - K = self.__Sigma @ Ct.T @ np.linalg.inv(S) # Equation 22 - Delta = self.__InnovationLift @ K @ delta # Equation 23 - self.__X_hat = G.exp(Delta) * self.__X_hat # Equation 24 - self.__Sigma = (np.eye(self.__dof) - K @ Ct) @ self.__Sigma # Equation 25 + S = Ct @ self.__Sigma @ Ct.T + Dt @ y.Sigma @ Dt.T # Equation 21 + K = self.__Sigma @ Ct.T @ np.linalg.inv(S) # Equation 22 + Delta = self.__InnovationLift @ K @ delta # Equation 23 + self.__X_hat = G.exp(Delta) * self.__X_hat # Equation 24 + self.__Sigma = (np.eye(self.__dof) - K @ Ct) @ self.__Sigma # Equation 25 def __stateMatrixA(self, u: Input) -> np.ndarray: """Return the state matrix A0t (Equation 14a) @@ -675,9 +719,11 @@ class EqF: # If the measurement is related to a sensor that has a calibration state if idx >= 0: - Cc[(3 * idx):(3 + 3 * idx), :] = Rot3.Hat(d.d.unitVector()) + Cc[(3 * idx) : (3 + 3 * idx), :] = Rot3.Hat(d.d.unitVector()) - return Rot3.Hat(d.d.unitVector()) @ np.hstack((Rot3.Hat(d.d.unitVector()), np.zeros((3, 3)), Cc)) + return Rot3.Hat(d.d.unitVector()) @ np.hstack( + (Rot3.Hat(d.d.unitVector()), np.zeros((3, 3)), Cc) + ) def __outputMatrixDt(self, idx: int) -> np.ndarray: """Return the measurement output matrix Dt @@ -691,197 +737,3 @@ class EqF: return self.__X_hat.B[idx].matrix() else: return self.__X_hat.A.matrix() - -def formatCSV(df): # pass the dataframe in to this function and get "data_list" as an output - """Read data from csv file formatted as follows: - - | -------------------------------------------------------------------------------------------- | - | t: time | - | -------------------------------------------------------------------------------------------- | - | q_x, q_y, q_z, q_w: Quaternion representing the attitude | - | -------------------------------------------------------------------------------------------- | - | b_x, b_y, b_z: Gyro bias | - | -------------------------------------------------------------------------------------------- | - | cq_x_0, cq_yv, cq_z_0, cq_w_0: Quaternion representing the first calibration | - | ... | - | cq_x_n, cq_y_n, cq_z_n, cq_w_n: Quaternion representing the last calibration | - | -------------------------------------------------------------------------------------------- | - | w_x, w_y, w_z: Gyro measurements | - | -------------------------------------------------------------------------------------------- | - | std_w_x, std_w_x, std_w_z: Gyro measurements noise standard deviation | - | -------------------------------------------------------------------------------------------- | - | std_b_x, std_b_x, std_b_z: Gyro bias random walk standard deviation | - | -------------------------------------------------------------------------------------------- | - | y_x_0, y_y_0, y_z_0: Direction measurement in sensor frame associated to calibration state 0 | - | ... | - | y_x_n, y_y_n, y_z_n: Direction measurement in sensor frame associated to calibration state n | - | y_x_n+1, y_y_n+1, y_z_n+1: Direction measurement in sensor frame from calibrated sensor | - | ... | - | y_x_m, y_y_m, y_z_m: Direction measurement in sensor frame from calibrated sensor | - | -------------------------------------------------------------------------------------------- | - | std_y_x_0, std_y_y_0, std_y_z_0: Standard deviation of direction measurement y_0 | - | ... | - | std_y_x_m, std_y_y_m, std_y_z_m: Standard deviation of direction measurement y_m | - | -------------------------------------------------------------------------------------------- | - | d_x_0, d_y_0, d_z_0: Known direction in global frame associated to direction measurement 0 | - | ... | - | d_x_m, d_y_m, d_z_m: Known direction in global frame associated to direction measurement m | - | -------------------------------------------------------------------------------------------- | - - NaN cell means that value is not present at that time - - Max allowd n = 5 - Max allowd m = 10 - - :param pname: path name - """ - - # read .csv file into pandas dataframe - df = df.reset_index() - - # Define data_list as list - data_list = [] - last_timestamp = df.t[0] - - # Check for existence of bias ground-truth into loaded data - bias_exist = False - if {'b_x', 'b_y', 'b_z'}.issubset(df.columns): - bias_exist = True - - # Check for existence of calibration ground-truth (yaw, pitch, roll angles) into loaded data - cal_exist = False - n_cal = 0 - for i in range(6): - if {'cq_x_' + str(i), 'cq_y_' + str(i), 'cq_z_' + str(i), 'cq_w_' + str(i)}.issubset(df.columns): - cal_exist = True - n_cal = i+1 - - # Check for existence of direction measurements - n_meas = 0 - for i in range(11): - if {'y_x_' + str(i), 'y_y_' + str(i), 'y_z_' + str(i)}.issubset(df.columns): - n_meas = i + 1 - - for index, row in df.iterrows(): - - # Load timestamps and record dt - t = float(row['t']) - dt = t - last_timestamp - - # Skip data_list if dt is smaller than a micro second - if dt < 1e-6: - continue - - last_timestamp = t - - # Load groundtruth values - quat = np.array([float(row['q_x']), float(row['q_y']), float(row['q_z']), float(row['q_w'])]) - - R = Rot3(gtsam.Rot3.Quaternion(float(row['q_w']), float(row['q_x']), float(row['q_y']), float(row['q_z'])).matrix()) - - # Load Gyro biases - if bias_exist: - b = np.array([float(row['b_x']), float(row['b_y']), float(row['b_z'])]).reshape(3,) - else: - b = np.zeros(3) - - # Load GNSS calibration - S = [] - if cal_exist: - for i in range(n_cal): - cal = np.array([float(row['cq_x_' + str(i)]), float(row['cq_y_' + str(i)]), float(row['cq_z_' + str(i)]), float(row['cq_w_' + str(i)])]) - S.append(Rot3(gtsam.Rot3.Quaternion(float(row['cq_w_' + str(i)]), float(row['cq_x_' + str(i)]), float(row['cq_y_' + str(i)]), float(row['cq_z_' + str(i)])).matrix())) - - - # Load Gyro inputs - w = np.array([float(row['w_x']), float(row['w_y']), float(row['w_z'])]).reshape(3,) - std_w = np.array([float(row['std_w_x']), float(row['std_w_y']), float(row['std_w_z'])]).reshape(3,) - std_b = np.array([float(row['std_b_x']), float(row['std_b_y']), float(row['std_b_z'])]).reshape(3,) - Sigma_wb = blockDiag(np.eye(3) * (std_w ** 2), np.eye(3) * (std_b ** 2)) - - # Load measurements - meas = [] - for i in range(n_meas): - y = np.array([float(row['y_x_' + str(i)]), float(row['y_y_' + str(i)]), float(row['y_z_' + str(i)])]).reshape(3,) - d = np.array([float(row['d_x_' + str(i)]), float(row['d_y_' + str(i)]), float(row['d_z_' + str(i)])]).reshape(3,) - std_y = np.array([float(row['std_y_x_' + str(i)]), float(row['std_y_y_' + str(i)]), float(row['std_y_z_' + str(i)])]).reshape(3,) - if i < n_cal: - meas.append(Measurement(y, d, np.eye(3) * (std_y ** 2), i)) - else: - meas.append(Measurement(y, d, np.eye(3) * (std_y ** 2), -1)) - - # Append to data_list - d = Data(State(R, b, S), n_cal, Input(w, Sigma_wb), meas, n_meas, t, dt) - - data_list.append(d) - - return data_list - - -def sim(filter_args, data): - - # Define progressbar - p = progressbar.ProgressBar() - - # EqF - filter = EqF(*filter_args) - - # Allocate variables - t = [] - est = [] - - # Filter loop - for d in p(data): - - t.append(d.t) - - # Run filter - try: - filter.propagation(d.u, d.dt) - except: - print('Filter.propagation Error\n') - for y in d.y: - if not (np.isnan(np.linalg.norm(y.y.d.unitVector())) or np.isnan(np.linalg.norm(y.d.d.unitVector()))): - try: - filter.update(y) - except: - print('Filter.update Error\n') - est.append(filter.stateEstimate()) - - # Plot Attitude1 - fig, (ax0, ax1, ax2) = plt.subplots(3, 1) - ax = [ax0, ax1, ax2] - for i in range(3): - ax[i].plot(t, [d.xi.R.rpy()[i] * 180 / math.pi for d in data], color="C0") - ax[i].plot(t, [xi.R.rpy()[i] * 180 / math.pi for xi in est], color="C1") - ax[i].set_xlabel("t") - ax0.set_title("Attitude: Roll") - ax1.set_title("Attitude: Pitch") - ax2.set_title("Attitude: Yaw") - plt.show() - - # Plot bias - fig, (ax0, ax1, ax2) = plt.subplots(3, 1) - ax = [ax0, ax1, ax2] - for i in range(3): - ax[i].plot(t, [d.xi.b[i] for d in data], color="C0") - ax[i].plot(t, [xi.b[i] for xi in est], color="C1") - ax[i].set_xlabel("t") - ax0.set_title("Bias: x") - ax1.set_title("Bias: y") - ax2.set_title("Bias: z") - plt.show() - - - # Plot calibration states - for j in range(data[0].n_cal): - fig, (ax0, ax1, ax2) = plt.subplots(3, 1) - ax = [ax0, ax1, ax2] - for i in range(3): - ax[i].plot(t, [d.xi.S[j].rpy()[i] * 180 / math.pi for d in data], color="C0") - ax[i].plot(t, [xi.S[j].rpy()[i] * 180 / math.pi for xi in est], color="C1") - ax[i].set_xlabel("t") - ax0.set_title("Calibration: Roll") - ax1.set_title("Calibration: Pitch") - ax2.set_title("Calibration: Yaw") - plt.show()