2019-11-23 23:46:14 +08:00
|
|
|
import numpy as np
|
|
|
|
import cvxpy as cp
|
|
|
|
|
2019-11-25 16:07:02 +08:00
|
|
|
|
2019-11-23 23:46:14 +08:00
|
|
|
##################
|
|
|
|
# MPC Controller #
|
|
|
|
##################
|
|
|
|
|
|
|
|
class MPC:
|
2019-11-25 16:07:02 +08:00
|
|
|
def __init__(self, model, T, Q, R, Qf, StateConstraints, InputConstraints,
|
|
|
|
Reference):
|
|
|
|
"""
|
|
|
|
Constructor for the Model Predictive Controller.
|
|
|
|
:param model: bicycle model object to be controlled
|
|
|
|
:param T: time horizon | int
|
|
|
|
:param Q: state cost matrix
|
|
|
|
:param R: input cost matrix
|
|
|
|
:param Qf: final state cost matrix
|
|
|
|
:param StateConstraints: dictionary of state constraints
|
|
|
|
:param InputConstraints: dictionary of input constraints
|
|
|
|
:param Reference: reference values for state variables
|
|
|
|
"""
|
2019-11-23 23:46:14 +08:00
|
|
|
|
|
|
|
# Parameters
|
|
|
|
self.T = T # horizon
|
|
|
|
self.Q = Q # weight matrix state vector
|
|
|
|
self.R = R # weight matrix input vector
|
|
|
|
self.Qf = Qf # weight matrix terminal
|
|
|
|
|
|
|
|
# Model
|
|
|
|
self.model = model
|
|
|
|
|
|
|
|
# Constraints
|
|
|
|
self.state_constraints = StateConstraints
|
|
|
|
self.input_constraints = InputConstraints
|
|
|
|
|
|
|
|
# Reference
|
|
|
|
self.reference = Reference
|
|
|
|
|
|
|
|
# Current control and prediction
|
|
|
|
self.current_control = None
|
|
|
|
self.current_prediction = None
|
|
|
|
|
2019-11-25 16:07:02 +08:00
|
|
|
# Initialize Optimization Problem
|
|
|
|
self.problem = self._init_problem()
|
|
|
|
|
|
|
|
def _init_problem(self):
|
2019-11-23 23:46:14 +08:00
|
|
|
"""
|
2019-11-25 16:07:02 +08:00
|
|
|
Initialize parametrized optimization problem to be solved at each
|
|
|
|
time step.
|
2019-11-23 23:46:14 +08:00
|
|
|
"""
|
|
|
|
|
|
|
|
# Instantiate optimization variables
|
2019-11-25 16:07:02 +08:00
|
|
|
self.x = cp.Variable((self.model.n_states+1, self.T + 1))
|
|
|
|
self.u = cp.Variable((2, self.T))
|
2019-11-23 23:46:14 +08:00
|
|
|
|
|
|
|
# Instantiate optimization parameters
|
2019-11-25 16:07:02 +08:00
|
|
|
self.kappa = cp.Parameter(self.T+1)
|
|
|
|
self.x_0 = cp.Parameter(self.model.n_states+1, 1)
|
|
|
|
self.A = cp.Parameter(self.model.A.shape)
|
|
|
|
self.B = cp.Parameter(self.model.B.shape)
|
2019-11-23 23:46:14 +08:00
|
|
|
|
|
|
|
# Initialize cost
|
|
|
|
cost = 0
|
|
|
|
|
|
|
|
# Initialize constraints
|
2019-11-25 16:07:02 +08:00
|
|
|
constraints = [self.x[:, 0] == self.x_0]
|
2019-11-23 23:46:14 +08:00
|
|
|
|
|
|
|
for t in range(self.T):
|
|
|
|
|
|
|
|
# set dynamic constraints
|
2019-11-25 16:07:02 +08:00
|
|
|
constraints += [self.x[:-1, t + 1] == self.A[:-1, :]
|
|
|
|
@ self.x[:, t] + self.B[:-1, :] @ self.u[:, t],
|
|
|
|
self.x[-1, t + 1] == self.kappa[t+1]]
|
2019-11-23 23:46:14 +08:00
|
|
|
|
|
|
|
# set input constraints
|
|
|
|
inputs = ['D', 'delta']
|
|
|
|
for input_name, constraint in self.input_constraints.items():
|
|
|
|
input_id = inputs.index(input_name)
|
|
|
|
if constraint[0] is not None:
|
2019-11-25 16:07:02 +08:00
|
|
|
constraints.append(-self.u[input_id, t] <= -constraint[0])
|
2019-11-23 23:46:14 +08:00
|
|
|
if constraint[1] is not None:
|
2019-11-25 16:07:02 +08:00
|
|
|
constraints.append(self.u[input_id, t] <= constraint[1])
|
2019-11-23 23:46:14 +08:00
|
|
|
|
|
|
|
# Set state constraints
|
|
|
|
for state_name, constraint in self.state_constraints.items():
|
2019-11-25 16:07:02 +08:00
|
|
|
state_id = self.model.spatial_state.list_states(). \
|
2019-11-23 23:46:14 +08:00
|
|
|
index(state_name)
|
|
|
|
if constraint[0] is not None:
|
2019-11-25 16:07:02 +08:00
|
|
|
constraints.append(-self.x[state_id, t] <= -constraint[0])
|
2019-11-23 23:46:14 +08:00
|
|
|
if constraint[1] is not None:
|
2019-11-25 16:07:02 +08:00
|
|
|
constraints.append(self.x[state_id, t] <= constraint[1])
|
2019-11-23 23:46:14 +08:00
|
|
|
|
|
|
|
# update cost function for states
|
|
|
|
for state_name, state_reference in self.reference.items():
|
2019-11-25 16:07:02 +08:00
|
|
|
state_id = self.model.spatial_state.list_states(). \
|
2019-11-23 23:46:14 +08:00
|
|
|
index(state_name)
|
2019-11-25 16:07:02 +08:00
|
|
|
cost += cp.norm(self.x[state_id, t] - state_reference, 2) * self.Q[
|
|
|
|
state_id, state_id]
|
2019-11-23 23:46:14 +08:00
|
|
|
|
|
|
|
# update cost function for inputs
|
2019-11-25 16:07:02 +08:00
|
|
|
cost += cp.norm(self.u[0, t], 2) * self.R[0, 0]
|
|
|
|
cost += cp.norm(self.u[1, t], 2) * self.R[1, 1]
|
2019-11-23 23:46:14 +08:00
|
|
|
|
|
|
|
# set state constraints
|
|
|
|
for state_name, constraint in self.state_constraints.items():
|
|
|
|
state_id = self.model.spatial_state.list_states(). \
|
|
|
|
index(state_name)
|
|
|
|
if constraint[0] is not None:
|
2019-11-25 16:07:02 +08:00
|
|
|
constraints.append(-self.x[state_id, self.T] <= -constraint[0])
|
2019-11-23 23:46:14 +08:00
|
|
|
if constraint[1] is not None:
|
2019-11-25 16:07:02 +08:00
|
|
|
constraints.append(self.x[state_id, self.T] <= constraint[1])
|
2019-11-23 23:46:14 +08:00
|
|
|
|
|
|
|
# update cost function for states
|
|
|
|
for state_name, state_reference in self.reference.items():
|
|
|
|
state_id = self.model.spatial_state.list_states(). \
|
2019-11-25 16:07:02 +08:00
|
|
|
index(state_name)
|
|
|
|
cost += cp.norm(self.x[state_id, self.T] - state_reference, 2) * \
|
2019-11-23 23:46:14 +08:00
|
|
|
self.Qf[state_id, state_id]
|
|
|
|
|
|
|
|
# sums problem objectives and concatenates constraints.
|
|
|
|
problem = cp.Problem(cp.Minimize(cost), constraints)
|
2019-11-25 16:07:02 +08:00
|
|
|
|
|
|
|
return problem
|
|
|
|
|
|
|
|
def get_control(self):
|
|
|
|
"""
|
|
|
|
Get control signal given the current position of the car. Solves a
|
|
|
|
finite time optimization problem based on the linearized car model.
|
|
|
|
"""
|
|
|
|
|
|
|
|
# get current waypoint curvature
|
|
|
|
kappa_ref = [wp.kappa for wp in self.model.reference_path.waypoints
|
|
|
|
[self.model.wp_id:self.model.wp_id+self.T+1]]
|
|
|
|
|
|
|
|
# Instantiate optimization parameters
|
|
|
|
self.kappa.value = kappa_ref
|
|
|
|
self.x_0.value = np.array(self.model.spatial_state[:] + [kappa_ref[0]])
|
|
|
|
self.A.value = self.model.A
|
|
|
|
self.B.value = self.model.B
|
|
|
|
|
|
|
|
# Solve optimization problem
|
|
|
|
self.problem.solve(solver=cp.ECOS, warm_start=True)
|
2019-11-23 23:46:14 +08:00
|
|
|
|
|
|
|
# Store computed control signals and associated prediction
|
|
|
|
try:
|
2019-11-25 16:07:02 +08:00
|
|
|
self.current_control = self.u.value
|
|
|
|
self.current_prediction = self.update_prediction(self.x.value)
|
2019-11-23 23:46:14 +08:00
|
|
|
except TypeError:
|
|
|
|
print('No solution found!')
|
|
|
|
exit(1)
|
|
|
|
|
|
|
|
# RCH - get first control signal
|
2019-11-25 16:07:02 +08:00
|
|
|
D_0 = self.u.value[0, 0]
|
|
|
|
delta_0 = self.u.value[1, 0]
|
2019-11-23 23:46:14 +08:00
|
|
|
|
|
|
|
return D_0, delta_0
|
|
|
|
|
|
|
|
def update_prediction(self, spatial_state_prediction):
|
2019-11-25 16:07:02 +08:00
|
|
|
"""
|
|
|
|
Transform the predicted states to predicted x and y coordinates.
|
|
|
|
Mainly for visualization purposes.
|
|
|
|
:param spatial_state_prediction: list of predicted state variables
|
|
|
|
:return: lists of predicted x and y coordinates
|
|
|
|
"""
|
2019-11-23 23:46:14 +08:00
|
|
|
|
|
|
|
# containers for x and y coordinates of predicted states
|
|
|
|
x_pred, y_pred = [], []
|
|
|
|
|
|
|
|
# get current waypoint ID
|
|
|
|
wp_id_ = np.copy(self.model.wp_id)
|
|
|
|
|
|
|
|
for t in range(self.T):
|
|
|
|
associated_waypoint = self.model.reference_path.waypoints[wp_id_+t]
|
|
|
|
predicted_temporal_state = self.model.s2t(associated_waypoint,
|
2019-11-25 16:07:02 +08:00
|
|
|
spatial_state_prediction[:, t])
|
2019-11-23 23:46:14 +08:00
|
|
|
x_pred.append(predicted_temporal_state.x)
|
|
|
|
y_pred.append(predicted_temporal_state.y)
|
|
|
|
|
|
|
|
return x_pred, y_pred
|
|
|
|
|