307 lines
11 KiB
Python
307 lines
11 KiB
Python
import numpy as np
|
|
import cvxpy as cp
|
|
import osqp
|
|
import scipy as sp
|
|
from scipy import sparse
|
|
|
|
##################
|
|
# MPC Controller #
|
|
##################
|
|
|
|
class MPC:
|
|
def __init__(self, model, N, Q, R, QN, StateConstraints, InputConstraints):
|
|
"""
|
|
Constructor for the Model Predictive Controller.
|
|
:param model: bicycle model object to be controlled
|
|
:param T: time horizon | int
|
|
:param Q: state cost matrix
|
|
:param R: input cost matrix
|
|
:param QN: final state cost matrix
|
|
:param StateConstraints: dictionary of state constraints
|
|
:param InputConstraints: dictionary of input constraints
|
|
:param Reference: reference values for state variables
|
|
"""
|
|
|
|
# Parameters
|
|
self.N = N # horizon
|
|
self.Q = Q # weight matrix state vector
|
|
self.R = R # weight matrix input vector
|
|
self.QN = QN # weight matrix terminal
|
|
|
|
# Model
|
|
self.model = model
|
|
|
|
# Constraints
|
|
self.state_constraints = StateConstraints
|
|
self.input_constraints = InputConstraints
|
|
|
|
# Current control and prediction
|
|
self.current_prediction = None
|
|
|
|
# Initialize Optimization Problem
|
|
self.problem = self._init_problem()
|
|
|
|
def _init_problem(self):
|
|
"""
|
|
Initialize parametrized optimization problem to be solved at each
|
|
time step.
|
|
"""
|
|
|
|
# number of input and state variables
|
|
nx = self.model.n_states
|
|
nu = 1
|
|
|
|
# system matrices
|
|
self.A = cp.Parameter(shape=(nx, nx*self.N))
|
|
self.B = cp.Parameter(shape=(nx, nu*self.N))
|
|
self.A.value = np.zeros(self.A.shape)
|
|
self.B.value = np.zeros(self.B.shape)
|
|
|
|
# reference values
|
|
xr = np.array([0., 0., -1.0])
|
|
self.ur = cp.Parameter((nu, self.N))
|
|
self.ur.value = np.zeros(self.ur.shape)
|
|
|
|
# constraints
|
|
umin = self.input_constraints['umin']
|
|
umax = self.input_constraints['umax']
|
|
xmin = self.state_constraints['xmin']
|
|
xmax = self.state_constraints['xmax']
|
|
|
|
# initial state
|
|
self.x_init = cp.Parameter(self.model.n_states)
|
|
|
|
# Define problem
|
|
self.u = cp.Variable((nu, self.N))
|
|
self.x = cp.Variable((nx, self.N + 1))
|
|
objective = 0
|
|
constraints = [self.x[:, 0] == self.x_init]
|
|
for n in range(self.N):
|
|
objective += cp.quad_form(self.x[:, n] - xr, self.Q) + cp.quad_form(self.u[:, n] - self.ur[:, n], self.R)
|
|
constraints += [self.x[:, n + 1] == self.A[:, n*nx:n*nx+nx] * self.x[:, n]
|
|
+ self.B[:, n*nu] * (self.u[:, n] - self.ur[:, n])]
|
|
constraints += [umin <= self.u[:, n], self.u[:, n] <= umax]
|
|
objective += cp.quad_form(self.x[:, self.N] - xr, self.QN)
|
|
constraints += [xmin <= self.x[:, self.N], self.x[:, self.N] <= xmax]
|
|
problem = cp.Problem(cp.Minimize(objective), constraints)
|
|
|
|
return problem
|
|
|
|
def get_control(self, v):
|
|
"""
|
|
Get control signal given the current position of the car. Solves a
|
|
finite time optimization problem based on the linearized car model.
|
|
"""
|
|
|
|
nx = self.model.n_states
|
|
nu = 1
|
|
|
|
for n in range(self.N):
|
|
current_waypoint = self.model.reference_path.waypoints[self.model.wp_id+n]
|
|
next_waypoint = self.model.reference_path.waypoints[
|
|
self.model.wp_id + n + 1]
|
|
delta_s = next_waypoint - current_waypoint
|
|
kappa_r = current_waypoint.kappa
|
|
self.A.value[:, n*nx:n*nx+nx], self.B.value[:, n*nu:n*nu+nu] = self.model.linearize(v, kappa_r, delta_s)
|
|
self.ur.value[:, n] = kappa_r
|
|
|
|
self.x_init.value = np.array(self.model.spatial_state[:])
|
|
self.problem.solve(solver=cp.OSQP, verbose=True)
|
|
|
|
self.current_prediction = self.update_prediction(self.x.value)
|
|
delta = np.arctan(self.u.value[0, 0] * self.model.l)
|
|
|
|
return delta
|
|
|
|
def update_prediction(self, spatial_state_prediction):
|
|
"""
|
|
Transform the predicted states to predicted x and y coordinates.
|
|
Mainly for visualization purposes.
|
|
:param spatial_state_prediction: list of predicted state variables
|
|
:return: lists of predicted x and y coordinates
|
|
"""
|
|
|
|
# containers for x and y coordinates of predicted states
|
|
x_pred, y_pred = [], []
|
|
|
|
# get current waypoint ID
|
|
print('#########################')
|
|
|
|
for n in range(self.N):
|
|
associated_waypoint = self.model.reference_path.waypoints[self.model.wp_id+n]
|
|
predicted_temporal_state = self.model.s2t(associated_waypoint,
|
|
spatial_state_prediction[:, n])
|
|
print('delta: ', np.arctan(self.u.value[0, n] * self.model.l))
|
|
print('e_y: ', spatial_state_prediction[0, n])
|
|
print('e_psi: ', spatial_state_prediction[1, n])
|
|
print('t: ', spatial_state_prediction[2, n])
|
|
print('+++++++++++++++++++++++')
|
|
|
|
x_pred.append(predicted_temporal_state.x)
|
|
y_pred.append(predicted_temporal_state.y)
|
|
|
|
return x_pred, y_pred
|
|
|
|
|
|
class MPC_OSQP:
|
|
def __init__(self, model, N, Q, R, QN, StateConstraints, InputConstraints):
|
|
"""
|
|
Constructor for the Model Predictive Controller.
|
|
:param model: bicycle model object to be controlled
|
|
:param T: time horizon | int
|
|
:param Q: state cost matrix
|
|
:param R: input cost matrix
|
|
:param QN: final state cost matrix
|
|
:param StateConstraints: dictionary of state constraints
|
|
:param InputConstraints: dictionary of input constraints
|
|
:param Reference: reference values for state variables
|
|
"""
|
|
|
|
# Parameters
|
|
self.N = N # horizon
|
|
self.Q = Q # weight matrix state vector
|
|
self.R = R # weight matrix input vector
|
|
self.QN = QN # weight matrix terminal
|
|
|
|
# Model
|
|
self.model = model
|
|
|
|
# Constraints
|
|
self.state_constraints = StateConstraints
|
|
self.input_constraints = InputConstraints
|
|
|
|
# Current control and prediction
|
|
self.current_prediction = None
|
|
|
|
# Initialize Optimization Problem
|
|
self.optimizer = osqp.OSQP()
|
|
|
|
def _init_problem(self, v):
|
|
"""
|
|
Initialize optimization problem for current time step.
|
|
"""
|
|
|
|
# Number of state and input variables
|
|
nx = self.model.n_states
|
|
nu = 1
|
|
|
|
# Constraints
|
|
umin = self.input_constraints['umin']
|
|
umax = self.input_constraints['umax']
|
|
xmin = self.state_constraints['xmin']
|
|
xmax = self.state_constraints['xmax']
|
|
|
|
# LTV System Matrices
|
|
A = np.zeros((nx * (self.N + 1), nx * (self.N + 1)))
|
|
B = np.zeros((nx * (self.N + 1), nu * (self.N)))
|
|
# Reference vector for state and input variables
|
|
ur = np.zeros(self.N)
|
|
xr = np.array([0.0, 0.0, -1.0])
|
|
# Offset for equality constraint (due to B * (u - ur))
|
|
uq = np.zeros(self.N * nx)
|
|
|
|
# Iterate over horizon
|
|
for n in range(self.N):
|
|
|
|
# Get information about current waypoint
|
|
current_waypoint = self.model.reference_path.waypoints[
|
|
self.model.wp_id + n]
|
|
next_waypoint = self.model.reference_path.waypoints[
|
|
self.model.wp_id + n + 1]
|
|
delta_s = next_waypoint - current_waypoint
|
|
kappa_r = current_waypoint.kappa
|
|
|
|
# Compute LTV matrices
|
|
A_lin, B_lin = self.model.linearize(v, kappa_r, delta_s)
|
|
A[nx + n * nx:n * nx + 2 * nx, n * nx:n * nx + nx] = A_lin
|
|
B[nx + n * nx:n * nx + 2 * nx, n * nu:n * nu + nu] = B_lin
|
|
|
|
# Set kappa_r to reference for input signal
|
|
ur[n] = kappa_r
|
|
# Compute equality constraint offset (B*ur)
|
|
uq[n * nx:n * nx + nx] = B_lin[:, 0] * kappa_r
|
|
|
|
# Get equality matrix
|
|
Ax = sparse.kron(sparse.eye(self.N + 1),
|
|
-sparse.eye(nx)) + sparse.csc_matrix(A)
|
|
Bu = sparse.csc_matrix(B)
|
|
Aeq = sparse.hstack([Ax, Bu])
|
|
# Get inequality matrix
|
|
Aineq = sparse.eye((self.N + 1) * nx + self.N * nu)
|
|
# Combine constraint matrices
|
|
A = sparse.vstack([Aeq, Aineq], format='csc')
|
|
|
|
# Get upper and lower bound vectors for equality constraints
|
|
lineq = np.hstack([np.kron(np.ones(self.N + 1), xmin),
|
|
np.kron(np.ones(self.N), umin)])
|
|
uineq = np.hstack([np.kron(np.ones(self.N + 1), xmax),
|
|
np.kron(np.ones(self.N), umax)])
|
|
# Get upper and lower bound vectors for inequality constraints
|
|
x0 = np.array(self.model.spatial_state[:])
|
|
leq = np.hstack([-x0, uq])
|
|
ueq = leq
|
|
# Combine upper and lower bound vectors
|
|
l = np.hstack([leq, lineq])
|
|
u = np.hstack([ueq, uineq])
|
|
|
|
# Set cost matrices
|
|
P = sparse.block_diag([sparse.kron(sparse.eye(self.N), self.Q), self.QN,
|
|
sparse.kron(sparse.eye(self.N), self.R)], format='csc')
|
|
q = np.hstack(
|
|
[np.kron(np.ones(self.N), -self.Q.dot(xr)), -self.QN.dot(xr),
|
|
-self.R.A[0, 0] * ur])
|
|
|
|
# Initialize optimizer
|
|
self.optimizer = osqp.OSQP()
|
|
self.optimizer.setup(P=P, q=q, A=A, l=l, u=u, verbose=False)
|
|
|
|
def get_control(self, v):
|
|
"""
|
|
Get control signal given the current position of the car. Solves a
|
|
finite time optimization problem based on the linearized car model.
|
|
"""
|
|
|
|
# Number of state variables
|
|
nx = self.model.n_states
|
|
|
|
# Initialize optimization problem
|
|
self._init_problem(v)
|
|
|
|
# Solve optimization problem
|
|
dec = self.optimizer.solve()
|
|
x = np.reshape(dec.x[:(self.N+1)*nx], (self.N+1, nx))
|
|
u = np.arctan(dec.x[-self.N] * self.model.l)
|
|
self.current_prediction = self.update_prediction(u, x)
|
|
|
|
return u
|
|
|
|
def update_prediction(self, u, spatial_state_prediction):
|
|
"""
|
|
Transform the predicted states to predicted x and y coordinates.
|
|
Mainly for visualization purposes.
|
|
:param spatial_state_prediction: list of predicted state variables
|
|
:return: lists of predicted x and y coordinates
|
|
"""
|
|
|
|
# containers for x and y coordinates of predicted states
|
|
x_pred, y_pred = [], []
|
|
|
|
# get current waypoint ID
|
|
print('#########################')
|
|
|
|
for n in range(self.N):
|
|
associated_waypoint = self.model.reference_path.waypoints[self.model.wp_id+n]
|
|
predicted_temporal_state = self.model.s2t(associated_waypoint,
|
|
spatial_state_prediction[n, :])
|
|
print('delta: ', u)
|
|
print('e_y: ', spatial_state_prediction[n, 0])
|
|
print('e_psi: ', spatial_state_prediction[n, 1])
|
|
print('t: ', spatial_state_prediction[n, 2])
|
|
print('+++++++++++++++++++++++')
|
|
|
|
x_pred.append(predicted_temporal_state.x)
|
|
y_pred.append(predicted_temporal_state.y)
|
|
|
|
return x_pred, y_pred
|
|
|