Merge branch 'LTV_MPC'

master
matssteinweg 2020-01-02 17:14:29 +01:00
commit 0c3b261615
9 changed files with 1444 additions and 974 deletions

2
.gitignore vendored
View File

@ -9,3 +9,5 @@ __pycache__/map.cpython-37.pyc
__pycache__/MPC.cpython-37.pyc
__pycache__/reference_path.cpython-37.pyc
__pycache__/spatial_bicycle_models.cpython-37.pyc
*.mp4
*.png

277
MPC.py
View File

@ -1,127 +1,162 @@
import numpy as np
import cvxpy as cp
import osqp
from scipy import sparse
import matplotlib.pyplot as plt
# Colors
PREDICTION = '#BA4A00'
##################
# MPC Controller #
##################
class MPC:
def __init__(self, model, T, Q, R, Qf, StateConstraints, InputConstraints,
Reference):
def __init__(self, model, N, Q, R, QN, StateConstraints, InputConstraints,
ay_max):
"""
Constructor for the Model Predictive Controller.
:param model: bicycle model object to be controlled
:param T: time horizon | int
:param N: time horizon | int
:param Q: state cost matrix
:param R: input cost matrix
:param Qf: final state cost matrix
:param QN: final state cost matrix
:param StateConstraints: dictionary of state constraints
:param InputConstraints: dictionary of input constraints
:param Reference: reference values for state variables
:param ay_max: maximum allowed lateral acceleration in curves
"""
# Parameters
self.T = T # horizon
self.N = N # horizon
self.Q = Q # weight matrix state vector
self.R = R # weight matrix input vector
self.Qf = Qf # weight matrix terminal
self.QN = QN # weight matrix terminal
# Model
self.model = model
# Dimensions
self.nx = self.model.n_states
self.nu = 2
# Constraints
self.state_constraints = StateConstraints
self.input_constraints = InputConstraints
# Reference
self.reference = Reference
# Maximum lateral acceleration
self.ay_max = ay_max
# Current control and prediction
self.current_control = None
self.current_prediction = None
# Counter for old control signals in case of infeasible problem
self.infeasibility_counter = 0
# Current control signals
self.current_control = np.zeros((self.nu*self.N))
# Initialize Optimization Problem
self.problem = self._init_problem()
self.optimizer = osqp.OSQP()
def _init_problem(self):
"""
Initialize parametrized optimization problem to be solved at each
time step.
Initialize optimization problem for current time step.
"""
# Instantiate optimization variables
self.x = cp.Variable((self.model.n_states+1, self.T + 1))
self.u = cp.Variable((2, self.T))
# Constraints
umin = self.input_constraints['umin']
umax = self.input_constraints['umax']
xmin = self.state_constraints['xmin']
xmax = self.state_constraints['xmax']
# Instantiate optimization parameters
self.kappa = cp.Parameter(self.T+1)
self.x_0 = cp.Parameter(self.model.n_states+1, 1)
self.A = cp.Parameter(self.model.A.shape)
self.B = cp.Parameter(self.model.B.shape)
# LTV System Matrices
A = np.zeros((self.nx * (self.N + 1), self.nx * (self.N + 1)))
B = np.zeros((self.nx * (self.N + 1), self.nu * (self.N)))
# Reference vector for state and input variables
ur = np.zeros(self.nu*self.N)
xr = np.zeros(self.nx*(self.N+1))
# Offset for equality constraint (due to B * (u - ur))
uq = np.zeros(self.N * self.nx)
# Dynamic state constraints
xmin_dyn = np.kron(np.ones(self.N + 1), xmin)
xmax_dyn = np.kron(np.ones(self.N + 1), xmax)
# Dynamic input constraints
umax_dyn = np.kron(np.ones(self.N), umax)
# Get curvature predictions from previous control signals
kappa_pred = np.tan(np.array(self.current_control[3::] +
self.current_control[-1:])) / self.model.length
# Initialize cost
cost = 0
# Iterate over horizon
for n in range(self.N):
# Initialize constraints
constraints = [self.x[:, 0] == self.x_0]
# Get information about current waypoint
current_waypoint = self.model.reference_path.get_waypoint(self.model.wp_id + n)
next_waypoint = self.model.reference_path.get_waypoint(self.model.wp_id + n + 1)
delta_s = next_waypoint - current_waypoint
kappa_ref = current_waypoint.kappa
v_ref = current_waypoint.v_ref
for t in range(self.T):
# Compute LTV matrices
f, A_lin, B_lin = self.model.linearize(v_ref, kappa_ref, delta_s)
A[(n+1) * self.nx: (n+2)*self.nx, n * self.nx:(n+1)*self.nx] = A_lin
B[(n+1) * self.nx: (n+2)*self.nx, n * self.nu:(n+1)*self.nu] = B_lin
# set dynamic constraints
constraints += [self.x[:-1, t + 1] == self.A[:-1, :]
@ self.x[:, t] + self.B[:-1, :] @ self.u[:, t],
self.x[-1, t + 1] == self.kappa[t+1]]
# Set reference for input signal
ur[n*self.nu:(n+1)*self.nu] = np.array([v_ref, kappa_ref])
# Compute equality constraint offset (B*ur)
uq[n * self.nx:(n+1)*self.nx] = B_lin.dot(np.array
([v_ref, kappa_ref])) - f
# set input constraints
inputs = ['D', 'delta']
for input_name, constraint in self.input_constraints.items():
input_id = inputs.index(input_name)
if constraint[0] is not None:
constraints.append(-self.u[input_id, t] <= -constraint[0])
if constraint[1] is not None:
constraints.append(self.u[input_id, t] <= constraint[1])
# Constrain maximum speed based on predicted car curvature
vmax_dyn = np.sqrt(self.ay_max / (np.abs(kappa_pred[n]) + 1e-12))
if vmax_dyn < umax_dyn[self.nu*n]:
umax_dyn[self.nu*n] = vmax_dyn
# Set state constraints
for state_name, constraint in self.state_constraints.items():
state_id = self.model.spatial_state.list_states(). \
index(state_name)
if constraint[0] is not None:
constraints.append(-self.x[state_id, t] <= -constraint[0])
if constraint[1] is not None:
constraints.append(self.x[state_id, t] <= constraint[1])
# Compute dynamic constraints on e_y
ub, lb, _ = self.model.reference_path.update_path_constraints(
self.model.wp_id+1, self.N, 2*self.model.safety_margin,
self.model.safety_margin)
xmin_dyn[0] = self.model.spatial_state.e_y
xmax_dyn[0] = self.model.spatial_state.e_y
xmin_dyn[self.nx::self.nx] = lb
xmax_dyn[self.nx::self.nx] = ub
# update cost function for states
for state_name, state_reference in self.reference.items():
state_id = self.model.spatial_state.list_states(). \
index(state_name)
cost += cp.norm(self.x[state_id, t] - state_reference, 2) * self.Q[
state_id, state_id]
# Set reference for state as center-line of drivable area
xr[self.nx::self.nx] = (lb + ub) / 2
# update cost function for inputs
cost += cp.norm(self.u[0, t], 2) * self.R[0, 0]
cost += cp.norm(self.u[1, t], 2) * self.R[1, 1]
# Get equality matrix
Ax = sparse.kron(sparse.eye(self.N + 1),
-sparse.eye(self.nx)) + sparse.csc_matrix(A)
Bu = sparse.csc_matrix(B)
Aeq = sparse.hstack([Ax, Bu])
# Get inequality matrix
Aineq = sparse.eye((self.N + 1) * self.nx + self.N * self.nu)
# Combine constraint matrices
A = sparse.vstack([Aeq, Aineq], format='csc')
# set state constraints
for state_name, constraint in self.state_constraints.items():
state_id = self.model.spatial_state.list_states(). \
index(state_name)
if constraint[0] is not None:
constraints.append(-self.x[state_id, self.T] <= -constraint[0])
if constraint[1] is not None:
constraints.append(self.x[state_id, self.T] <= constraint[1])
# Get upper and lower bound vectors for equality constraints
lineq = np.hstack([xmin_dyn,
np.kron(np.ones(self.N), umin)])
uineq = np.hstack([xmax_dyn, umax_dyn])
# Get upper and lower bound vectors for inequality constraints
x0 = np.array(self.model.spatial_state[:])
leq = np.hstack([-x0, uq])
ueq = leq
# Combine upper and lower bound vectors
l = np.hstack([leq, lineq])
u = np.hstack([ueq, uineq])
# update cost function for states
for state_name, state_reference in self.reference.items():
state_id = self.model.spatial_state.list_states(). \
index(state_name)
cost += cp.norm(self.x[state_id, self.T] - state_reference, 2) * \
self.Qf[state_id, state_id]
# Set cost matrices
P = sparse.block_diag([sparse.kron(sparse.eye(self.N), self.Q), self.QN,
sparse.kron(sparse.eye(self.N), self.R)], format='csc')
q = np.hstack(
[-np.tile(np.diag(self.Q.A), self.N) * xr[:-self.nx],
-self.QN.dot(xr[-self.nx:]),
-np.tile(np.diag(self.R.A), self.N) * ur])
# sums problem objectives and concatenates constraints.
problem = cp.Problem(cp.Minimize(cost), constraints)
return problem
# Initialize optimizer
self.optimizer = osqp.OSQP()
self.optimizer.setup(P=P, q=q, A=A, l=l, u=u, verbose=False)
def get_control(self):
"""
@ -129,32 +164,62 @@ class MPC:
finite time optimization problem based on the linearized car model.
"""
# get current waypoint curvature
kappa_ref = [wp.kappa for wp in self.model.reference_path.waypoints
[self.model.wp_id:self.model.wp_id+self.T+1]]
# Number of state variables
nx = self.model.n_states
nu = 2
# Instantiate optimization parameters
self.kappa.value = kappa_ref
self.x_0.value = np.array(self.model.spatial_state[:] + [kappa_ref[0]])
self.A.value = self.model.A
self.B.value = self.model.B
# Update current waypoint
self.model.get_current_waypoint()
# Update spatial state
self.model.spatial_state = self.model.t2s(reference_state=
self.model.temporal_state, reference_waypoint=
self.model.current_waypoint)
# Initialize optimization problem
self._init_problem()
# Solve optimization problem
self.problem.solve(solver=cp.ECOS, warm_start=True)
dec = self.optimizer.solve()
# Store computed control signals and associated prediction
try:
self.current_control = self.u.value
self.current_prediction = self.update_prediction(self.x.value)
except TypeError:
print('No solution found!')
# Get control signals
control_signals = np.array(dec.x[-self.N*nu:])
control_signals[1::2] = np.arctan(control_signals[1::2] *
self.model.length)
v = control_signals[0]
delta = control_signals[1]
# Update control signals
self.current_control = control_signals
# Get predicted spatial states
x = np.reshape(dec.x[:(self.N+1)*nx], (self.N+1, nx))
# Update predicted temporal states
self.current_prediction = self.update_prediction(x)
# Get current control signal
u = np.array([v, delta])
# if problem solved, reset infeasibility counter
self.infeasibility_counter = 0
except:
print('Infeasible problem. Previously predicted'
' control signal used!')
id = nu * (self.infeasibility_counter + 1)
u = np.array(self.current_control[id:id+2])
# increase infeasibility counter
self.infeasibility_counter += 1
if self.infeasibility_counter == (self.N - 1):
print('No control signal computed!')
exit(1)
# RCH - get first control signal
D_0 = self.u.value[0, 0]
delta_0 = self.u.value[1, 0]
return D_0, delta_0
return u
def update_prediction(self, spatial_state_prediction):
"""
@ -164,18 +229,30 @@ class MPC:
:return: lists of predicted x and y coordinates
"""
# containers for x and y coordinates of predicted states
# Containers for x and y coordinates of predicted states
x_pred, y_pred = [], []
# get current waypoint ID
wp_id_ = np.copy(self.model.wp_id)
for t in range(self.T):
associated_waypoint = self.model.reference_path.waypoints[wp_id_+t]
# Iterate over prediction horizon
for n in range(2, self.N):
# Get associated waypoint
associated_waypoint = self.model.reference_path.\
get_waypoint(self.model.wp_id+n)
# Transform predicted spatial state to temporal state
predicted_temporal_state = self.model.s2t(associated_waypoint,
spatial_state_prediction[:, t])
spatial_state_prediction[n, :])
# Save predicted coordinates in world coordinate frame
x_pred.append(predicted_temporal_state.x)
y_pred.append(predicted_temporal_state.y)
return x_pred, y_pred
def show_prediction(self):
"""
Display predicted car trajectory in current axis.
"""
if self.current_prediction is not None:
plt.scatter(self.current_prediction[0], self.current_prediction[1],
c=PREDICTION, s=30)

148
lidar_model.py Normal file
View File

@ -0,0 +1,148 @@
from map import Map
import matplotlib.pyplot as plt
import numpy as np
import math
import time
SCAN = '#5DADE2'
class LidarModel:
"""
Lidar Model
"""
def __init__(self, FoV, range, resolution):
"""
Constructor for Lidar object.
:param FoV: Sensor's Field of View in °
:param range: range in m
:param resolution: resolution in °
"""
# set sensor parameters
self.FoV = FoV
self.range = range
self.resolution = resolution
# number of measurements
self.n_measurements = int(self.FoV/self.resolution + 1)
# construct measurement container
angles = np.linspace(-math.pi / 360 * self.FoV,
math.pi / 360 * self.FoV,
self.n_measurements)
ranges = np.ones(self.n_measurements) * self.range
self.measurements = np.stack((angles, ranges), axis=0)
def scan(self, car, map):
"""
Get a Lidar Scan estimate
:param car: state containing x and y coordinates of the sensor
:param map: map object
:return: self with updated self.measurements
"""
start = time.time()
# reset measurements
self.measurements[1, :] = np.ones(self.n_measurements) * self.range
# flip map upside-down to allow for normal indexing of y axis
#map.data = np.flipud(map.data)
# get sensor's map pose
x, y = map.w2m(car.x, car.y)
# get center of mass
xc = x + 0.5
yc = y + 0.5
# get sensor range in px values
range_px = int(self.range / map.resolution)
# iterate over area within sensor's range
for i in range(x - range_px, x + range_px + 1):
if 0 <= i < map.width:
for j in range(y - range_px, y + range_px + 1):
if 0 <= j < map.height:
# if obstacle detected
if map.data[j, i] == 0:
# get center of mass of cell
xc_target = i + 0.5
yc_target = j + 0.5
# check all corner's of cell
cell_angles = []
for k in range(-1, 2):
for l in range(-1, 2):
dy = yc_target + l/2 - yc
dx = xc_target + k/2 - xc
cell_angle = np.arctan2(dy, dx) - car.psi
if cell_angle < - math.pi:
cell_angle = -np.mod(math.pi+cell_angle, 2*math.pi) + math.pi
else:
cell_angle = np.mod(math.pi+cell_angle, 2*math.pi) - math.pi
cell_angles.append(cell_angle)
# get min and max angle hitting respective cell
min_angle = np.min(cell_angles)
max_angle = np.max(cell_angles)
# get distance to mass center of cell
cell_distance = np.sqrt(
(xc - xc_target)**2 + (yc - yc_target)**2)
# get IDs of all laser beams hitting cell
valid_beam_ids = []
if min_angle < -math.pi/2 and max_angle > math.pi/2:
for beam_id in range(self.n_measurements):
if max_angle <= self.measurements[0, beam_id] <= min_angle:
valid_beam_ids.append(beam_id)
else:
for beam_id in range(self.n_measurements):
if min_angle <= self.measurements[0, beam_id] <= max_angle:
valid_beam_ids.append(beam_id)
# update distance for all valid laser beams
for beam_id in valid_beam_ids:
if cell_distance < self.measurements[1, beam_id] / map.resolution:
self.measurements[1, beam_id] = cell_distance * map.resolution
#map.data = np.flipud(map.data)
end = time.time()
print('Time elapsed: ', end - start)
def plot_scan(self, car):
"""
Display current sensor measurements.
:param car: state containing x and y coordinate of sensor
"""
start = time.time()
# get beam endpoints
beam_end_x = self.measurements[1, :] * np.cos(self.measurements[0, :] + car.psi)
beam_end_y = self.measurements[1, :] * np.sin(self.measurements[0, :] + car.psi)
# plot all laser beams
for i in range(self.n_measurements):
plt.plot((car.x, car.x+beam_end_x[i]), (car.y, car.y+beam_end_y[i]), c=SCAN)
end = time.time()
print('Time elapsed: ', end - start)
if __name__ == '__main__':
# Create Map
map = Map('real_map.png')
plt.imshow(map.data, cmap='gray',
extent=[map.origin[0], map.origin[0] +
map.width * map.resolution,
map.origin[1], map.origin[1] +
map.height * map.resolution])
car = BicycleModel(x=-4.9, y=-5.0, yaw=0.9)
sensor = LidarModel(FoV=180, range=5, resolution=1)
sensor.scan(car, map)
sensor.plot_scan(car)
plt.axis('equal')
plt.show()

154
map.py
View File

@ -1,36 +1,79 @@
#!/usr/bin/env python
import sys
import os
import math
import numpy as np
import matplotlib.pyplot as plt
from skimage.morphology import remove_small_holes
from PIL import Image
dirname = os.path.dirname(__file__)
svea = os.path.join(dirname, '../../')
sys.path.append(os.path.abspath(svea))
from skimage.draw import line_aa
import matplotlib.patches as plt_patches
# Colors
OBSTACLE = '#2E4053'
############
# Obstacle #
############
class Obstacle:
def __init__(self, cx, cy, radius):
"""
Constructor for a circular obstacle to be placed on a map.
:param cx: x coordinate of center of obstacle in world coordinates
:param cy: y coordinate of center of obstacle in world coordinates
:param radius: radius of circular obstacle in m
"""
self.cx = cx
self.cy = cy
self.radius = radius
def show(self):
"""
Display obstacle on current axis.
"""
# Draw circle
circle = plt_patches.Circle(xy=(self.cx, self.cy), radius=
self.radius, color=OBSTACLE, zorder=20)
ax = plt.gca()
ax.add_patch(circle)
#######
# Map #
#######
class Map:
"""
Handle for map message. Contains a subscriber to the map topic and
processes the map. Numpy array version of the
map available as member variable.
"""
def __init__(self, file_path, value_unknown=50, threshold_occupied=90, origin=[-30.0, -24.0], resolution=0.059999):
def __init__(self, file_path, origin, resolution, threshold_occupied=100):
"""
Constructor for map object. Map contains occupancy grid map data of
environment as well as meta information.
:param file_path: path to image of map
:param threshold_occupied: threshold value for binarization of map
image
:param origin: x and y coordinates of map origin in world coordinates
[m]
:param resolution: resolution in m/px
"""
self.value_unknown = value_unknown
# Set binarization threshold
self.threshold_occupied = threshold_occupied
# instantiate member variables
self.data = np.array(Image.open(file_path))[:, :, 0] # numpy array containing map data
#self.process_map()
# Numpy array containing map data
self.data = np.array(Image.open(file_path))[:, :, 0]
# Process raw map image
self.process_map()
# Store meta information
self.height = self.data.shape[0] # height of the map in px
self.width = self.data.shape[1] # width of the map in px
self.resolution = resolution # resolution of the map in m/px
self.origin = origin # x and y coordinates of map origin
# (bottom-left corner) in m
# Containers for user-specified additional obstacles and boundaries
self.obstacles = list()
self.boundaries = list()
def w2m(self, x, y):
"""
World2Map. Transform coordinates from global coordinate system to
@ -39,30 +82,81 @@ class Map:
:param y: y coordinate in global coordinate system
:return: discrete x and y coordinates in px
"""
d_x = np.floor((x - self.origin[0]) / self.resolution)
d_y = np.floor((y - self.origin[1]) / self.resolution)
dx = int(np.floor((x - self.origin[0]) / self.resolution))
dy = int(np.floor((y - self.origin[1]) / self.resolution))
return int(d_x), int(d_y)
return dx, dy
def m2w(self, dx, dy):
"""
World2Map. Transform coordinates from global coordinate system to
map coordinates.
:param x: x coordinate in global coordinate system
:param y: y coordinate in global coordinate system
:return: discrete x and y coordinates in px
Map2World. Transform coordinates from map coordinate system to
global coordinates.
:param dx: x coordinate in map coordinate system
:param dy: y coordinate in map coordinate system
:return: x and y coordinates of cell center in global coordinate system
"""
x = dx * self.resolution + self.origin[0]
y = dy * self.resolution + self.origin[1]
x = (dx + 0.5) * self.resolution + self.origin[0]
y = (dy + 0.5) * self.resolution + self.origin[1]
return x, y
def process_map(self):
"""
Process raw map image. Binarization and removal of small holes in map.
"""
# Binarization using specified threshold
# 1 corresponds to free, 0 to occupied
self.data = np.where(self.data >= self.threshold_occupied, 1, 0)
# Remove small holes in map corresponding to spurious measurements
self.data = remove_small_holes(self.data, area_threshold=5,
connectivity=8).astype(np.int8)
def add_obstacles(self, obstacles):
"""
Add obstacles to the map.
:param obstacles: list of obstacle objects
"""
# Extend list of obstacles
self.obstacles.extend(obstacles)
# Iterate over list of new obstacles
for obstacle in obstacles:
# Compute radius of circular object in pixels
radius_px = int(np.ceil(obstacle.radius / self.resolution))
# Get center coordinates of obstacle in map coordinates
cx_px, cy_px = self.w2m(obstacle.cx, obstacle.cy)
# Add circular object to map
y, x = np.ogrid[-radius_px: radius_px, -radius_px: radius_px]
index = x ** 2 + y ** 2 <= radius_px ** 2
self.data[cy_px-radius_px:cy_px+radius_px, cx_px-radius_px:
cx_px+radius_px][index] = 0
def add_boundary(self, boundaries):
"""
Add boundaries to the map.
:param boundaries: list of tuples containing coordinates of boundaries'
start and end points
"""
# Extend list of boundaries
self.boundaries.extend(boundaries)
# Iterate over list of boundaries
for boundary in boundaries:
sx = self.w2m(boundary[0][0], boundary[0][1])
gx = self.w2m(boundary[1][0], boundary[1][1])
path_x, path_y, _ = line_aa(sx[0], sx[1], gx[0], gx[1])
for x, y in zip(path_x, path_y):
self.data[y, x] = 0
if __name__ == '__main__':
map = Map('map_floor2.png')
plt.imshow(map.data, cmap='gray')
map = Map('real_map.png')
# map = Map('sim_map.png')
plt.imshow(np.flipud(map.data), cmap='gray')
plt.show()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 19 KiB

View File

@ -1,8 +1,16 @@
import numpy as np
import math
from map import Map
from bresenham import bresenham
from map import Map, Obstacle
from skimage.draw import line_aa
import matplotlib.pyplot as plt
from scipy import sparse
import osqp
# Colors
DRIVABLE_AREA = '#BDC3C7'
WAYPOINTS = '#D0D3D4'
PATH_CONSTRAINTS = '#F5B041'
OBSTACLE = '#2E4053'
############
@ -13,7 +21,9 @@ class Waypoint:
def __init__(self, x, y, psi, kappa):
"""
Waypoint object containing x, y location in global coordinate system,
orientation of waypoint psi and local curvature kappa
orientation of waypoint psi and local curvature kappa. Waypoint further
contains an associated reference velocity computed by the speed profile
and a path width specified by upper and lower bounds.
:param x: x position in global coordinate system | [m]
:param y: y position in global coordinate system | [m]
:param psi: orientation of waypoint | [rad]
@ -24,7 +34,26 @@ class Waypoint:
self.psi = psi
self.kappa = kappa
# Reference velocity at this waypoint according to speed profile
self.v_ref = None
# Information about drivable area at waypoint
# upper and lower bound of drivable area orthogonal to
# waypoint orientation.
# Upper bound: free drivable area to the left of center-line in m
# Lower bound: free drivable area to the right of center-line in m
self.lb = None
self.ub = None
self.static_border_cells = None
self.dynamic_border_cells = None
def __sub__(self, other):
"""
Overload subtract operator. Difference of two waypoints is equal to
their euclidean distance.
:param other: subtrahend
:return: euclidean distance between two waypoints
"""
return ((self.x - other.x)**2 + (self.y - other.y)**2)**0.5
@ -34,35 +63,57 @@ class Waypoint:
class ReferencePath:
def __init__(self, map, wp_x, wp_y, resolution, smoothing_distance, width_info=False):
def __init__(self, map, wp_x, wp_y, resolution, smoothing_distance,
max_width, circular):
"""
Reference Path object. Create a reference trajectory from specified
corner points with given resolution. Smoothing around corners can be
applied.
applied. Waypoints represent center-line of the path with specified
maximum width to both sides.
:param map: map object on which path will be placed
:param wp_x: x coordinates of corner points in global coordinates
:param wp_y: y coordinates of corner points in global coordinates
:param resolution: resolution of the path in m/wp
:param smoothing_distance: number of waypoints used for smoothing the
path by averaging neighborhood of waypoints
:param max_width: maximum width of path to both sides in m
:param circular: True if path circular
"""
# precision
# Precision
self.eps = 1e-12
# map
# Map
self.map = map
# resolution of the path
# Resolution of the path
self.resolution = resolution
# look ahead distance for path averaging
# Look ahead distance for path averaging
self.smoothing_distance = smoothing_distance
# waypoints with x, y, psi, k
self.waypoints = self.construct_path(wp_x, wp_y)
# Circular flag
self.circular = circular
# path width
self.get_width_info = width_info
if self.get_width_info:
self.width_info = self.compute_width()
self.min_width = (np.min(self.width_info[0, :]),
np.min(self.width_info[3, :]))
# List of waypoint objects
self.waypoints = self._construct_path(wp_x, wp_y)
def construct_path(self, wp_x, wp_y):
# Number of waypoints
self.n_waypoints = len(self.waypoints)
# Length of path
self.length, self.segment_lengths = self._compute_length()
# Compute path width (attribute of each waypoint)
self._compute_width(max_width=max_width)
def _construct_path(self, wp_x, wp_y):
"""
Construct path from given waypoints.
:param wp_x: x coordinates of waypoints in global coordinates
:param wp_y: y coordinates of waypoints in global coordinates
:return: list of waypoint objects
"""
# Number of waypoints
n_wp = [int(np.sqrt((wp_x[i + 1] - wp_x[i]) ** 2 +
@ -78,7 +129,7 @@ class ReferencePath:
tolist() for i in range(len(wp_y) - 1)]
wp_y = [wp for segment in wp_y for wp in segment] + [gp_y]
# smooth path
# Smooth path
wp_xs = []
wp_ys = []
for wp_id in range(self.smoothing_distance, len(wp_x) -
@ -88,147 +139,602 @@ class ReferencePath:
wp_ys.append(np.mean(wp_y[wp_id - self.smoothing_distance:wp_id
+ self.smoothing_distance + 1]))
# Construct list of waypoint objects
waypoints = list(zip(wp_xs, wp_ys))
waypoints = self.spatial_reformulation(waypoints)
waypoints = self._construct_waypoints(waypoints)
return waypoints
def spatial_reformulation(self, waypoints):
def _construct_waypoints(self, waypoint_coordinates):
"""
Reformulate conventional waypoints (x, y) coordinates into waypoint
objects containing (x, y, psi, kappa)
objects containing (x, y, psi, kappa, ub, lb)
:param waypoint_coordinates: list of (x, y) coordinates of waypoints in
global coordinates
:return: list of waypoint objects for entire reference path
"""
waypoints_spatial = []
for wp_id in range(len(waypoints) - 1):
# List containing waypoint objects
waypoints = []
# get start and goal waypoints
current_wp = np.array(waypoints[wp_id])
next_wp = np.array(waypoints[wp_id + 1])
# Iterate over all waypoints
for wp_id in range(len(waypoint_coordinates) - 1):
# difference vector
# Get start and goal waypoints
current_wp = np.array(waypoint_coordinates[wp_id])
next_wp = np.array(waypoint_coordinates[wp_id + 1])
# Difference vector
dif_ahead = next_wp - current_wp
# angle ahead
# Angle ahead
psi = np.arctan2(dif_ahead[1], dif_ahead[0])
# distance to next waypoint
# Distance to next waypoint
dist_ahead = np.linalg.norm(dif_ahead, 2)
# get x and y coordinates of current waypoint
x = current_wp[0]
y = current_wp[1]
# Get x and y coordinates of current waypoint
x, y = current_wp[0], current_wp[1]
# Compute local curvature at waypoint
# first waypoint
if wp_id == 0:
kappa = 0
else:
prev_wp = np.array(waypoints[wp_id - 1])
prev_wp = np.array(waypoint_coordinates[wp_id - 1])
dif_behind = current_wp - prev_wp
angle_behind = np.arctan2(dif_behind[1], dif_behind[0])
angle_dif = np.mod(psi - angle_behind + math.pi, 2 * math.pi) \
- math.pi
kappa = np.abs(angle_dif / (dist_ahead + self.eps))
kappa = angle_dif / (dist_ahead + self.eps)
waypoints_spatial.append(Waypoint(x, y, psi, kappa))
waypoints.append(Waypoint(x, y, psi, kappa))
return waypoints_spatial
return waypoints
def compute_width(self, max_dist=2.0):
max_dist = max_dist # m
width_info = np.zeros((6, len(self.waypoints)))
def _compute_length(self):
"""
Compute length of center-line path as sum of euclidean distance between
waypoints.
:return: length of center-line path in m
"""
segment_lengths = [0.0] + [self.waypoints[wp_id+1] - self.waypoints
[wp_id] for wp_id in range(len(self.waypoints)-1)]
s = sum(segment_lengths)
return s, segment_lengths
def _compute_width(self, max_width):
"""
Compute the width of the path by checking the maximum free space to
the left and right of the center-line.
:param max_width: maximum width of the path.
"""
# Iterate over all waypoints
for wp_id, wp in enumerate(self.waypoints):
# List containing information for current waypoint
width_info = []
# Check width left and right of the center-line
for i, dir in enumerate(['left', 'right']):
# get pixel coordinates of waypoint
wp_x, wp_y = self.map.w2m(wp.x, wp.y)
# get angle orthogonal to path in current direction
# Get angle orthogonal to path in current direction
if dir == 'left':
angle = np.mod(wp.psi + math.pi / 2 + math.pi,
2 * math.pi) - math.pi
else:
angle = np.mod(wp.psi - math.pi / 2 + math.pi,
2 * math.pi) - math.pi
# get closest cell to orthogonal vector
t_x, t_y = self.map.w2m(wp.x + max_dist * np.cos(angle), wp.y + max_dist * np.sin(angle))
# compute path between cells
width_info[3*i:3*(i+1), wp_id] = self.get_min_dist(wp_x, wp_y, t_x, t_y, max_dist)
return width_info
# Get closest cell to orthogonal vector
t_x, t_y = self.map.w2m(wp.x + max_width * np.cos(angle), wp.y
+ max_width * np.sin(angle))
# Compute distance to orthogonal cell on path border
b_value, b_cell = self._get_min_width(wp, t_x, t_y, max_width)
# Add information to list for current waypoint
width_info.append(b_value)
width_info.append(b_cell)
def get_min_dist(self, wp_x, wp_y, t_x, t_y, max_dist):
# get neighboring cells (account for discretization)
neighbors_x, neighbors_y = [], []
# Set waypoint attributes with width to the left and right
wp.ub = width_info[0]
wp.lb = -1 * width_info[2] # minus can be assumed as waypoints
# represent center-line of the path
# Set border cells of waypoint
wp.static_border_cells = (width_info[1], width_info[3])
wp.dynamic_border_cells = (width_info[1], width_info[3])
def _get_min_width(self, wp, t_x, t_y, max_width):
"""
Compute the minimum distance between the current waypoint and the
orthogonal cell on the border of the path
:param wp: current waypoint
:param t_x: x coordinate of border cell in map coordinates
:param t_y: y coordinate of border cell in map coordinates
:param max_width: maximum path width in m
:return: min_width to border and corresponding cell
"""
# Get neighboring cells of orthogonal cell (account for
# discretization inaccuracy)
tn_x, tn_y = [], []
for i in range(-1, 2, 1):
for j in range(-1, 2, 1):
neighbors_x.append(t_x + i)
neighbors_y.append(t_y + j)
tn_x.append(t_x+i)
tn_y.append(t_y+j)
# get bresenham paths to all neighboring cells
# Get pixel coordinates of waypoint
wp_x, wp_y = self.map.w2m(wp.x, wp.y)
# Get Bresenham paths to all possible cells
paths = []
for t_x, t_y in zip(neighbors_x, neighbors_y):
path = list(bresenham(wp_x, wp_y, t_x, t_y))
paths.append(path)
for t_x, t_y in zip(tn_x, tn_y):
x_list, y_list, _ = line_aa(wp_x, wp_y, t_x, t_y)
paths.append(zip(x_list, y_list))
min_dist = max_dist
# Compute minimum distance to border cell
min_width = max_width
# map inspected cell to world coordinates
min_cell = self.map.m2w(t_x, t_y)
for path in paths:
for cell in path:
t_x = cell[0]
t_y = cell[1]
# if path goes through occupied cell
t_x, t_y = cell[0], cell[1]
# If path goes through occupied cell
if self.map.data[t_y, t_x] == 0:
# get world coordinates
x, y = self.map.m2w(wp_x, wp_y)
# Get world coordinates
c_x, c_y = self.map.m2w(t_x, t_y)
cell_dist = np.sqrt((x - c_x) ** 2 + (y - c_y) ** 2)
if cell_dist < min_dist:
min_dist = cell_dist
cell_dist = np.sqrt((wp.x - c_x) ** 2 + (wp.y - c_y) ** 2)
if cell_dist < min_width:
min_width = cell_dist
min_cell = (c_x, c_y)
dist_info = np.array([min_dist, min_cell[0], min_cell[1]])
return dist_info
def show(self):
return min_width, min_cell
# plot map
def compute_speed_profile(self, Constraints):
"""
Compute a speed profile for the path. Assign a reference velocity
to each waypoint based on its curvature.
:param Constraints: constraints on acceleration and velocity
curvature of the path
"""
# Set optimization horizon
N = self.n_waypoints - 1
# Constraints
a_min = np.ones(N-1) * Constraints['a_min']
a_max = np.ones(N-1) * Constraints['a_max']
v_min = np.ones(N) * Constraints['v_min']
v_max = np.ones(N) * Constraints['v_max']
# Maximum lateral acceleration
ay_max = Constraints['ay_max']
# Inequality Matrix
D1 = np.zeros((N-1, N))
# Iterate over horizon
for i in range(N):
# Get information about current waypoint
current_waypoint = self.get_waypoint(i)
next_waypoint = self.get_waypoint(i+1)
# distance between waypoints
li = next_waypoint - current_waypoint
# curvature of waypoint
ki = current_waypoint.kappa
# Fill operator matrix
# dynamics of acceleration
if i < N-1:
D1[i, i:i+2] = np.array([-1/(2*li), 1/(2*li)])
# Compute dynamic constraint on velocity
v_max_dyn = np.sqrt(ay_max / (np.abs(ki) + self.eps))
if v_max_dyn < v_max[i]:
v_max[i] = v_max_dyn
# Construct inequality matrix
D1 = sparse.csc_matrix(D1)
D2 = sparse.eye(N)
D = sparse.vstack([D1, D2], format='csc')
# Get upper and lower bound vectors for inequality constraints
l = np.hstack([a_min, v_min])
u = np.hstack([a_max, v_max])
# Set cost matrices
P = sparse.eye(N, format='csc')
q = -1 * v_max
# Solve optimization problem
problem = osqp.OSQP()
problem.setup(P=P, q=q, A=D, l=l, u=u, verbose=False)
speed_profile = problem.solve().x
# Assign reference velocity to every waypoint
for i, wp in enumerate(self.waypoints[:-1]):
wp.v_ref = speed_profile[i]
self.waypoints[-1].v_ref = self.waypoints[-2].v_ref
def get_waypoint(self, wp_id):
"""
Get waypoint corresponding to wp_id. Circular indexing supported.
:param wp_id: unique waypoint ID
:return: waypoint object
"""
# Allow circular indexing if circular path
if wp_id >= self.n_waypoints and self.circular:
wp_id = np.mod(wp_id, self.n_waypoints)
# Terminate execution if end of path reached
elif wp_id >= self.n_waypoints and not self.circular:
print('Reached end of path!')
exit(1)
return self.waypoints[wp_id]
def show(self, display_drivable_area=True):
"""
Display path object on current figure.
:param display_drivable_area: If True, display arrows indicating width
of drivable area
"""
# Clear figure
plt.clf()
plt.imshow(np.flipud(self.map.data),cmap='gray',
# Disabled ticks
plt.xticks([])
plt.yticks([])
# Plot map in gray-scale and set extent to match world coordinates
canvas = np.ones(self.map.data.shape)
# canvas = np.flipud(self.map.data)
plt.imshow(canvas, cmap='gray',
extent=[self.map.origin[0], self.map.origin[0] +
self.map.width * self.map.resolution,
self.map.origin[1], self.map.origin[1] +
self.map.height * self.map.resolution])
# plot reference path
self.map.height * self.map.resolution], vmin=0.0,
vmax=1.0)
# Get x and y coordinates for all waypoints
wp_x = np.array([wp.x for wp in self.waypoints])
wp_y = np.array([wp.y for wp in self.waypoints])
plt.scatter(wp_x, wp_y, color='k', s=5)
if self.get_width_info:
print('Min Width Left: {:f} m'.format(self.min_width[0]))
print('Min Width Right: {:f} m'.format(self.min_width[1]))
plt.quiver(wp_x, wp_y, self.width_info[1, :] - wp_x,
self.width_info[2, :] - wp_y, scale=1, units='xy',
width=0.05, color='#D4AC0D')
plt.quiver(wp_x, wp_y, self.width_info[4, :] - wp_x,
self.width_info[5, :] - wp_y, scale=1, units='xy',
width=0.05, color='#BA4A00')
# Get x and y locations of border cells for upper and lower bound
wp_ub_x = np.array([wp.static_border_cells[0][0] for wp in self.waypoints])
wp_ub_y = np.array([wp.static_border_cells[0][1] for wp in self.waypoints])
wp_lb_x = np.array([wp.static_border_cells[1][0] for wp in self.waypoints])
wp_lb_y = np.array([wp.static_border_cells[1][1] for wp in self.waypoints])
# Plot waypoints
# colors = [wp.v_ref for wp in self.waypoints]
plt.scatter(wp_x, wp_y, c=WAYPOINTS, s=10)
# Plot arrows indicating drivable area
if display_drivable_area:
plt.quiver(wp_x, wp_y, wp_ub_x - wp_x, wp_ub_y - wp_y, scale=1,
units='xy', width=0.2*self.resolution, color=DRIVABLE_AREA,
headwidth=1, headlength=0)
plt.quiver(wp_x, wp_y, wp_lb_x - wp_x, wp_lb_y - wp_y, scale=1,
units='xy', width=0.2*self.resolution, color=DRIVABLE_AREA,
headwidth=1, headlength=0)
# Plot border of path
bl_x = np.array([wp.static_border_cells[0][0] for wp in
self.waypoints] +
[self.waypoints[0].static_border_cells[0][0]])
bl_y = np.array([wp.static_border_cells[0][1] for wp in
self.waypoints] +
[self.waypoints[0].static_border_cells[0][1]])
br_x = np.array([wp.static_border_cells[1][0] for wp in
self.waypoints] +
[self.waypoints[0].static_border_cells[1][0]])
br_y = np.array([wp.static_border_cells[1][1] for wp in
self.waypoints] +
[self.waypoints[0].static_border_cells[1][1]])
# If circular path, connect start and end point
if self.circular:
plt.plot(bl_x, bl_y, color='#5E5E5E')
plt.plot(br_x, br_y, color='#5E5E5E')
# If not circular, close path at start and end
else:
plt.plot(bl_x[:-1], bl_y[:-1], color=OBSTACLE)
plt.plot(br_x[:-1], br_y[:-1], color=OBSTACLE)
plt.plot((bl_x[-2], br_x[-2]), (bl_y[-2], br_y[-2]), color=OBSTACLE)
plt.plot((bl_x[0], br_x[0]), (bl_y[0], br_y[0]), color=OBSTACLE)
# Plot dynamic path constraints
# Get x and y locations of border cells for upper and lower bound
wp_ub_x = np.array(
[wp.dynamic_border_cells[0][0] for wp in self.waypoints]+
[self.waypoints[0].static_border_cells[0][0]])
wp_ub_y = np.array(
[wp.dynamic_border_cells[0][1] for wp in self.waypoints]+
[self.waypoints[0].static_border_cells[0][1]])
wp_lb_x = np.array(
[wp.dynamic_border_cells[1][0] for wp in self.waypoints]+
[self.waypoints[0].static_border_cells[1][0]])
wp_lb_y = np.array(
[wp.dynamic_border_cells[1][1] for wp in self.waypoints]+
[self.waypoints[0].static_border_cells[1][1]])
plt.plot(wp_ub_x, wp_ub_y, c=PATH_CONSTRAINTS)
plt.plot(wp_lb_x, wp_lb_y, c=PATH_CONSTRAINTS)
# Plot obstacles
for obstacle in self.map.obstacles:
obstacle.show()
def _compute_free_segments(self, wp, min_width):
"""
Compute free path segments.
:param wp: waypoint object
:param min_width: minimum width of valid segment
:return: segment candidates as list of tuples (ub_cell, lb_cell)
"""
# Candidate segments
free_segments = []
# Get waypoint's border cells in map coordinates
ub_p = self.map.w2m(wp.static_border_cells[0][0],
wp.static_border_cells[0][1])
lb_p = self.map.w2m(wp.static_border_cells[1][0],
wp.static_border_cells[1][1])
# Compute path from left border cell to right border cell
x_list, y_list, _ = line_aa(ub_p[0], ub_p[1], lb_p[0], lb_p[1])
# Initialize upper and lower bound of drivable area to
# upper bound of path
ub_o, lb_o = ub_p, ub_p
# Assume occupied path
free_cells = False
# Iterate over path from left border to right border
for x, y in zip(x_list[1:], y_list[1:]):
# If cell is free, update lower bound
if self.map.data[y, x] == 1:
# Free cell detected
free_cells = True
lb_o = (x, y)
# If cell is occupied or end of path, end segment. Add segment
# to list of candidates. Then, reset upper and lower bound to
# current cell.
if (self.map.data[y, x] == 0 or (x, y) == lb_p) and free_cells:
# Set lower bound to border cell of segment
lb_o = (x, y)
# Transform upper and lower bound cells to world coordinates
ub_o = self.map.m2w(ub_o[0], ub_o[1])
lb_o = self.map.m2w(lb_o[0], lb_o[1])
# If segment larger than threshold, add to candidates
if np.sqrt((ub_o[0]-lb_o[0])**2 + (ub_o[1]-lb_o[1])**2) > \
min_width:
free_segments.append((ub_o, lb_o))
# Start new segment
ub_o = (x, y)
free_cells = False
elif self.map.data[y, x] == 0 and not free_cells:
ub_o = (x, y)
lb_o = (x, y)
return free_segments
def update_path_constraints(self, wp_id, N, min_width, safety_margin):
"""
Compute upper and lower bounds of the drivable area orthogonal to
the given waypoint.
"""
# container for constraints and border cells
ub_hor = []
lb_hor = []
border_cells_hor = []
border_cells_hor_sm = []
# Iterate over horizon
for n in range(N):
# get corresponding waypoint
wp = self.get_waypoint(wp_id+n)
# Get list of free segments
free_segments = self._compute_free_segments(wp, min_width)
# First waypoint in horizon uses largest segment
if n == 0:
segment_lengths = [np.sqrt((seg[0][0]-seg[1][0])**2 +
(seg[0][1]-seg[1][1])**2) for seg in free_segments]
ls_id = segment_lengths.index(max(segment_lengths))
ub_ls, lb_ls = free_segments[ls_id]
else:
# Get border cells of selected segment at previous waypoint
ub_pw, lb_pw = border_cells_hor[n-1]
ub_pw, lb_pw = list(ub_pw), list(lb_pw)
# Project border cells onto new waypoint in path direction
wp_prev = self.get_waypoint(wp_id+n-1)
delta_s = wp_prev - wp
ub_pw[0] += delta_s * np.cos(wp_prev.psi)
ub_pw[1] += delta_s * np.cos(wp_prev.psi)
lb_pw[0] += delta_s * np.sin(wp_prev.psi)
lb_pw[1] += delta_s * np.sin(wp_prev.psi)
# Iterate over free segments for current waypoint
if len(free_segments) >= 2:
# container for overlap of segments with projection
segment_offsets = []
for free_segment in free_segments:
# Get border cells of segment
ub_fs, lb_fs = free_segment
# distance between upper bounds and lower bounds
d_ub = np.sqrt((ub_fs[0]-ub_pw[0])**2 + (ub_fs[1]-ub_pw[1])**2)
d_lb = np.sqrt((lb_fs[0]-lb_pw[0])**2 + (lb_fs[1]-lb_pw[1])**2)
mean_dist = (d_ub + d_lb) / 2
# Append offset to projected previous segment
segment_offsets.append(mean_dist)
# Select segment with minimum offset to projected previous
# segment
ls_id = segment_offsets.index(min(segment_offsets))
ub_ls, lb_ls = free_segments[ls_id]
# Select free segment in case of only one candidate
elif len(free_segments) == 1:
ub_ls, lb_ls = free_segments[0]
# Set waypoint coordinates as bound cells if no feasible
# segment available
else:
ub_ls, lb_ls = (wp.x, wp.y), (wp.x, wp.y)
# Check sign of upper and lower bound
angle_ub = np.mod(np.arctan2(ub_ls[1] - wp.y, ub_ls[0] - wp.x)
- wp.psi + math.pi, 2 * math.pi) - math.pi
angle_lb = np.mod(np.arctan2(lb_ls[1] - wp.y, lb_ls[0] - wp.x)
- wp.psi + math.pi, 2 * math.pi) - math.pi
sign_ub = np.sign(angle_ub)
sign_lb = np.sign(angle_lb)
# Compute upper and lower bound of largest drivable area
ub = sign_ub * np.sqrt(
(ub_ls[0] - wp.x) ** 2 + (ub_ls[1] - wp.y) ** 2)
lb = sign_lb * np.sqrt(
(lb_ls[0] - wp.x) ** 2 + (lb_ls[1] - wp.y) ** 2)
# Subtract safety margin
ub -= safety_margin
lb += safety_margin
# Check feasibility of the path
if ub < lb:
# Upper and lower bound of 0 indicate an infeasible path
# given the specified safety margin
ub, lb = 0.0, 0.0
# Compute absolute angle of bound cell
angle_ub = np.mod(math.pi / 2 + wp.psi + math.pi,
2 * math.pi) - math.pi
angle_lb = np.mod(-math.pi / 2 + wp.psi + math.pi,
2 * math.pi) - math.pi
# Compute cell on bound for computed distance ub and lb
ub_ls = wp.x + ub * np.cos(angle_ub), wp.y + ub * np.sin(
angle_ub)
lb_ls = wp.x - lb * np.cos(angle_lb), wp.y - lb * np.sin(
angle_lb)
bound_cells_sm = (ub_ls, lb_ls)
# Compute cell on bound for computed distance ub and lb
ub_ls = wp.x + (ub + safety_margin) * np.cos(angle_ub), wp.y + (ub + safety_margin) * np.sin(
angle_ub)
lb_ls = wp.x - (lb - safety_margin) * np.cos(angle_lb), wp.y - (lb - safety_margin) * np.sin(
angle_lb)
bound_cells = (ub_ls, lb_ls)
# Append results
ub_hor.append(ub)
lb_hor.append(lb)
border_cells_hor.append(list(bound_cells))
border_cells_hor_sm.append(list(bound_cells_sm))
# Assign dynamic border cells to waypoints
wp.dynamic_border_cells = bound_cells_sm
return np.array(ub_hor), np.array(lb_hor), border_cells_hor_sm
if __name__ == '__main__':
# Create Map
map = Map(file_path='map_race.png', origin=[-1, -2], resolution=0.005)
# Select Track | 'Real_Track' or 'Sim_Track'
path = 'Sim_Track'
# Specify waypoints
# Floor 2
# wp_x = [-9.169, -2.7, 11.9, 7.3, -6.95]
# wp_y = [-15.678, -7.12, 10.9, 14.5, -3.31]
# Race Track
wp_x = [-0.75, -0.25, -0.25, 0.25, 0.25, 1.25, 1.25, 0.75, 0.75, 1.25, 1.25, -0.75, -0.75, -0.25]
wp_y = [-1.5, -1.5, -0.5, -0.5, -1.5, -1.5, -1, -1, -0.5, -0.5, 0, 0, -1.5, -1.5]
# Specify path resolution
path_resolution = 0.05 # m / wp
if path == 'Sim_Track':
# Smooth Path
reference_path = ReferencePath(map, wp_x, wp_y, path_resolution,
smoothing_distance=5)
# Load map file
map = Map(file_path='sim_map.png', origin=[-1, -2], resolution=0.005)
# Specify waypoints
wp_x = [-0.75, -0.25, -0.25, 0.25, 0.25, 1.25, 1.25, 0.75, 0.75, 1.25,
1.25, -0.75, -0.75, -0.25]
wp_y = [-1.5, -1.5, -0.5, -0.5, -1.5, -1.5, -1, -1, -0.5, -0.5, 0, 0,
-1.5, -1.5]
# Specify path resolution
path_resolution = 0.05 # m / wp
# Create reference path
reference_path = ReferencePath(map, wp_x, wp_y, path_resolution,
smoothing_distance=5, max_width=0.15,
circular=True)
# Add obstacles
obs1 = Obstacle(cx=0.0, cy=0.0, radius=0.05)
obs2 = Obstacle(cx=-0.8, cy=-0.5, radius=0.08)
obs3 = Obstacle(cx=-0.7, cy=-1.5, radius=0.05)
obs4 = Obstacle(cx=-0.3, cy=-1.0, radius=0.08)
obs5 = Obstacle(cx=0.3, cy=-1.0, radius=0.05)
obs6 = Obstacle(cx=0.75, cy=-1.5, radius=0.05)
obs7 = Obstacle(cx=0.7, cy=-0.9, radius=0.07)
obs8 = Obstacle(cx=1.2, cy=0.0, radius=0.08)
reference_path.map.add_obstacles([obs1, obs2, obs3, obs4, obs5, obs6, obs7,
obs8])
elif path == 'Real_Track':
# Load map file
map = Map(file_path='real_map.png', origin=(-30.0, -24.0),
resolution=0.06)
# Specify waypoints
wp_x = [-1.62, -6.04, -6.6, -5.36, -2.0, 5.9,
11.9, 7.3, 0.0, -1.62]
wp_y = [3.24, -1.4, -3.0, -5.36, -6.65, 3.5,
10.9, 14.5, 5.2, 3.24]
# Specify path resolution
path_resolution = 0.2 # m / wp
# Create reference path
reference_path = ReferencePath(map, wp_x, wp_y, path_resolution,
smoothing_distance=5, max_width=2.0,
circular=True)
# Add obstacles and bounds to map
cone1 = Obstacle(-5.9, -2.9, 0.2)
cone2 = Obstacle(-2.3, -5.9, 0.2)
cone3 = Obstacle(10.9, 10.7, 0.2)
cone4 = Obstacle(7.4, 13.5, 0.2)
table1 = Obstacle(-0.30, -1.75, 0.2)
table2 = Obstacle(1.55, 1.00, 0.2)
table3 = Obstacle(4.30, 3.22, 0.2)
obstacle_list = [cone1, cone2, cone3, cone4, table1, table2, table3]
map.add_obstacles(obstacle_list)
bound1 = ((-0.02, -2.72), (1.5, 1.0))
bound2 = ((4.43, 3.07), (1.5, 1.0))
bound3 = ((4.43, 3.07), (7.5, 6.93))
bound4 = ((7.28, 13.37), (-3.32, -0.12))
boundary_list = [bound1, bound2, bound3, bound4]
map.add_boundary(boundary_list)
else:
reference_path = None
print('Invalid path!')
exit(1)
ub, lb, border_cells = reference_path.update_path_constraints(0,
reference_path.n_waypoints, 0.1, 0.01)
SpeedProfileConstraints = {'a_min': -0.1, 'a_max': 0.5,
'v_min': 0, 'v_max': 1.0, 'ay_max': 4.0}
reference_path.compute_speed_profile(SpeedProfileConstraints)
# Get x and y locations of border cells for upper and lower bound
for wp_id in range(reference_path.n_waypoints):
reference_path.waypoints[wp_id].dynamic_border_cells = border_cells[wp_id]
reference_path.show()
plt.show()

View File

@ -1,139 +1,162 @@
from map import Map
from map import Map, Obstacle
import numpy as np
from reference_path import ReferencePath
from spatial_bicycle_models import SimpleBicycleModel, ExtendedBicycleModel
from spatial_bicycle_models import BicycleModel
import matplotlib.pyplot as plt
from MPC import MPC
from time import time
from scipy import sparse
if __name__ == '__main__':
# Select Simulation Mode | 'Race' or 'Q'
sim_mode = 'Race'
# Select Model Type | 'Simple' or 'Extended'
model_type = 'Simple'
# Select Simulation Mode | 'Sim_Track' or 'Real_Track'
sim_mode = 'Sim_Track'
# Simulation Environment. Mini-Car on track specifically designed to show-
# case time-optimal driving.
if sim_mode == 'Sim_Track':
# Load map file
map = Map(file_path='sim_map.png', origin=[-1, -2], resolution=0.005)
# Create Map
if sim_mode == 'Race':
map = Map(file_path='map_race.png', origin=[-1, -2], resolution=0.005)
# Specify waypoints
wp_x = [-0.75, -0.25, -0.25, 0.25, 0.25, 1.25, 1.25, 0.75, 0.75, 1.25,
1.25, -0.75, -0.75, -0.25]
wp_y = [-1.5, -1.5, -0.5, -0.5, -1.5, -1.5, -1, -1, -0.5, -0.5, 0, 0,
-1.5, -1.5]
# Specify path resolution
path_resolution = 0.05 # m / wp
elif sim_mode == 'Q':
map = Map(file_path='map_floor2.png')
# Create smoothed reference path
reference_path = ReferencePath(map, wp_x, wp_y, path_resolution,
smoothing_distance=5, max_width=0.23,
circular=True)
# Add obstacles
use_obstacles = False
if use_obstacles:
obs1 = Obstacle(cx=0.0, cy=0.0, radius=0.05)
obs2 = Obstacle(cx=-0.8, cy=-0.5, radius=0.08)
obs3 = Obstacle(cx=-0.7, cy=-1.5, radius=0.05)
obs4 = Obstacle(cx=-0.3, cy=-1.0, radius=0.08)
obs5 = Obstacle(cx=0.27, cy=-1.0, radius=0.05)
obs6 = Obstacle(cx=0.78, cy=-1.47, radius=0.05)
obs7 = Obstacle(cx=0.73, cy=-0.9, radius=0.07)
obs8 = Obstacle(cx=1.2, cy=0.0, radius=0.08)
obs9 = Obstacle(cx=0.67, cy=-0.05, radius=0.06)
map.add_obstacles([obs1, obs2, obs3, obs4, obs5, obs6, obs7,
obs8, obs9])
# Instantiate motion model
car = BicycleModel(length=0.12, width=0.06,
reference_path=reference_path, Ts=0.05)
# Real-World Environment. Track used for testing the algorithm on a 1:12
# RC car.
elif sim_mode == 'Real_Track':
# Load map file
map = Map(file_path='real_map.png', origin=(-30.0, -24.0),
resolution=0.06)
# Specify waypoints
wp_x = [-9.169, 11.9, 7.3, -6.95]
wp_y = [-15.678, 10.9, 14.5, -3.31]
# Specify path resolution
path_resolution = 0.20 # m / wp
# Create smoothed reference path
reference_path = ReferencePath(map, wp_x, wp_y, path_resolution,
smoothing_distance=5, max_width=1.50,
circular=False)
# Add obstacles
add_obstacles = False
if add_obstacles:
obs1 = Obstacle(cx=-6.3, cy=-11.1, radius=0.20)
obs2 = Obstacle(cx=-2.2, cy=-6.8, radius=0.25)
obs4 = Obstacle(cx=2.0, cy=-0.2, radius=0.25)
obs8 = Obstacle(cx=6.0, cy=5.0, radius=0.3)
obs9 = Obstacle(cx=7.42, cy=4.97, radius=0.3)
map.add_obstacles([obs1, obs2, obs4, obs8, obs9])
# Instantiate motion model
car = BicycleModel(length=0.30, width=0.20,
reference_path=reference_path, Ts=0.05)
else:
print('Invalid Simulation Mode!')
map, wp_x, wp_y, path_resolution = None, None, None, None
exit(1)
# Create smoothed reference path
reference_path = ReferencePath(map, wp_x, wp_y, path_resolution,
smoothing_distance=5)
################
# Motion Model #
################
# Initial state
e_y_0 = 0.0
e_psi_0 = 0.0
v_x_0 = 0.1
v_y_0 = 0
omega_0 = 0
t_0 = 0
if model_type == 'Extended':
car = ExtendedBicycleModel(reference_path=reference_path,
e_y=e_y_0, e_psi=e_psi_0, v_x=v_x_0, v_y=v_y_0,
omega=omega_0, t=t_0)
elif model_type == 'Simple':
car = SimpleBicycleModel(reference_path=reference_path,
e_y=e_y_0, e_psi=e_psi_0, v=v_x_0)
else:
car = None
print('Invalid Model Type!')
map, wp_x, wp_y, path_resolution, reference_path, car \
= None, None, None, None, None, None
exit(1)
##############
# Controller #
##############
if model_type == 'Extended':
Q = np.diag([1, 0, 0, 0, 0, 0])
Qf = Q
R = np.diag([0, 0])
Reference = {'e_y': 0, 'e_psi': 0, 'v_x': 1.0, 'v_y': 0, 'omega': 0, 't':0}
elif model_type == 'Simple':
Reference = {'e_y': 0, 'e_psi': 0, 'v': 4.0}
Q = np.diag([0.0005, 0.05, 0.5])
Qf = Q
R = np.diag([0, 0])
else:
Q, Qf, R, Reference = None, None, None, None
print('Invalid Model Type!')
exit(1)
N = 30
Q = sparse.diags([1.0, 0.0, 0.0])
R = sparse.diags([0.5, 0.0])
QN = sparse.diags([1.0, 0.0, 0.0])
T = 5
StateConstraints = {'e_y': (-0.2, 0.2), 'v': (0, 4)}
InputConstraints = {'D': (-1, 1), 'delta': (-0.44, 0.44)}
mpc = MPC(car, T, Q, R, Qf, StateConstraints, InputConstraints, Reference)
v_max = 1.0 # m/s
delta_max = 0.66 # rad
ay_max = 4.0 # m/s^2
InputConstraints = {'umin': np.array([0.0, -np.tan(delta_max)/car.length]),
'umax': np.array([v_max, np.tan(delta_max)/car.length])}
StateConstraints = {'xmin': np.array([-np.inf, -np.inf, -np.inf]),
'xmax': np.array([np.inf, np.inf, np.inf])}
mpc = MPC(car, N, Q, R, QN, StateConstraints, InputConstraints, ay_max)
# Compute speed profile
a_min = -0.1 # m/s^2
a_max = 0.5 # m/s^2
SpeedProfileConstraints = {'a_min': a_min, 'a_max': a_max,
'v_min': 0.0, 'v_max': v_max, 'ay_max': ay_max}
car.reference_path.compute_speed_profile(SpeedProfileConstraints)
##############
# Simulation #
##############
# logging containers
# Set simulation time to zero
t = 0.0
# Logging containers
x_log = [car.temporal_state.x]
y_log = [car.temporal_state.y]
psi_log = [car.temporal_state.psi]
v_log = [car.temporal_state.v_x]
D_log = []
delta_log = []
v_log = [0.0]
# iterate over waypoints
for wp_id in range(len(car.reference_path.waypoints)-T-1):
# Until arrival at end of path
while car.s < reference_path.length:
# get control signals
D, delta = mpc.get_control()
u = mpc.get_control()
# drive car
car.drive(D, delta)
car.drive(u)
# log current state
# log
x_log.append(car.temporal_state.x)
y_log.append(car.temporal_state.y)
v_log.append(car.temporal_state.v_x)
D_log.append(D)
delta_log.append(delta)
v_log.append(u[0])
###################
# Plot Simulation #
###################
# plot path
car.reference_path.show()
# Increase simulation time
t += car.Ts
# plot car trajectory and velocity
plt.scatter(x_log, y_log, c='g', s=15)
# Plot path and drivable area
reference_path.show()
# plot mpc prediction
if mpc.current_prediction is not None:
x_pred = mpc.current_prediction[0]
y_pred = mpc.current_prediction[1]
plt.scatter(x_pred, y_pred, c='b', s=10)
# Plot car
car.show()
plt.title('MPC Simulation: Position: {:.2f} m, {:.2f} m, Velocity: '
'{:.2f} m/s'.format(car.temporal_state.x,
car.temporal_state.y, car.temporal_state.v_x))
plt.xticks([])
plt.yticks([])
plt.pause(0.0000001)
# Plot MPC prediction
mpc.show_prediction()
plt.close()
# Set figure title
plt.title('MPC Simulation: v(t): {:.2f}, delta(t): {:.2f}, Duration: '
'{:.2f} s'.format(u[0], u[1], t))
plt.axis('off')
plt.pause(0.001)

File diff suppressed because it is too large Load Diff