moved pybullet demo to latest MPC version, now runs ~10Hz
parent
1d6c686eaf
commit
1bf971c096
mpc_pybullet_demo
|
@ -3,17 +3,17 @@ np.seterr(divide='ignore', invalid='ignore')
|
|||
|
||||
from scipy.integrate import odeint
|
||||
from scipy.interpolate import interp1d
|
||||
import cvxpy as cp
|
||||
import cvxpy as opt
|
||||
|
||||
from utils import *
|
||||
|
||||
from mpc_config import Params
|
||||
P=Params()
|
||||
|
||||
def get_linear_model(x_bar,u_bar):
|
||||
def get_linear_model_matrices(x_bar,u_bar):
|
||||
"""
|
||||
Computes the LTI approximated state space model x' = Ax + Bu + C
|
||||
"""
|
||||
L=0.3
|
||||
|
||||
x = x_bar[0]
|
||||
y = x_bar[1]
|
||||
|
@ -23,104 +23,95 @@ def get_linear_model(x_bar,u_bar):
|
|||
a = u_bar[0]
|
||||
delta = u_bar[1]
|
||||
|
||||
ct = np.cos(theta)
|
||||
st = np.sin(theta)
|
||||
cd = np.cos(delta)
|
||||
td = np.tan(delta)
|
||||
|
||||
A = np.zeros((P.N,P.N))
|
||||
A[0,2]=np.cos(theta)
|
||||
A[0,3]=-v*np.sin(theta)
|
||||
A[1,2]=np.sin(theta)
|
||||
A[1,3]=v*np.cos(theta)
|
||||
A[3,2]=v*np.tan(delta)/L
|
||||
A_lin=np.eye(P.N)+P.dt*A
|
||||
A[0,2] = ct
|
||||
A[0,3] = -v*st
|
||||
A[1,2] = st
|
||||
A[1,3] = v*ct
|
||||
A[3,2] = v*td/P.L
|
||||
A_lin = np.eye(P.N)+P.dt*A
|
||||
|
||||
B = np.zeros((P.N,P.M))
|
||||
B[2,0]=1
|
||||
B[3,1]=v/(L*np.cos(delta)**2)
|
||||
B[3,1]=v/(P.L*cd**2)
|
||||
B_lin=P.dt*B
|
||||
|
||||
f_xu=np.array([v*np.cos(theta), v*np.sin(theta), a,v*np.tan(delta)/L]).reshape(P.N,1)
|
||||
C_lin = P.dt*(f_xu - np.dot(A,x_bar.reshape(P.N,1)) - np.dot(B,u_bar.reshape(P.M,1)))
|
||||
f_xu=np.array([v*ct, v*st, a, v*td/P.L]).reshape(P.N,1)
|
||||
C_lin = P.dt*(f_xu - np.dot(A, x_bar.reshape(P.N,1)) - np.dot(B, u_bar.reshape(P.M,1))).flatten()
|
||||
|
||||
return np.round(A_lin,4), np.round(B_lin,4), np.round(C_lin,4)
|
||||
#return np.round(A_lin,6), np.round(B_lin,6), np.round(C_lin,6)
|
||||
return A_lin, B_lin, C_lin
|
||||
|
||||
class MPC():
|
||||
|
||||
def optimize(state,u_bar,track,ref_vel=1.):
|
||||
'''
|
||||
:param state:
|
||||
:param u_bar:
|
||||
:param track:
|
||||
:returns:
|
||||
'''
|
||||
def __init__(self, N, M, Q, R):
|
||||
"""
|
||||
"""
|
||||
self.state_len = N
|
||||
self.action_len = M
|
||||
self.state_cost = Q
|
||||
self.action_cost = R
|
||||
|
||||
MAX_SPEED = 1.5 #m/s
|
||||
MAX_ACC = 1.0 #m/ss
|
||||
MAX_D_ACC = 1.0 #m/sss
|
||||
MAX_STEER = np.radians(30) #rad
|
||||
MAX_D_STEER = np.radians(30) #rad/s
|
||||
def optimize_linearized_model(self, A, B, C, initial_state, target, time_horizon=10, Q=None, R=None, verbose=False):
|
||||
"""
|
||||
Optimisation problem defined for the linearised model,
|
||||
:param A:
|
||||
:param B:
|
||||
:param C:
|
||||
:param initial_state:
|
||||
:param Q:
|
||||
:param R:
|
||||
:param target:
|
||||
:param time_horizon:
|
||||
:param verbose:
|
||||
:return:
|
||||
"""
|
||||
|
||||
REF_VEL = ref_vel #m/s
|
||||
assert len(initial_state) == self.state_len
|
||||
|
||||
# dynamics starting state
|
||||
x_bar = np.zeros((P.N,P.T+1))
|
||||
x_bar[:,0] = state
|
||||
if (Q == None or R==None):
|
||||
Q = self.state_cost
|
||||
R = self.action_cost
|
||||
|
||||
#prediction for linearization of costrains
|
||||
for t in range (1,P.T+1):
|
||||
xt=x_bar[:,t-1].reshape(P.N,1)
|
||||
ut=u_bar[:,t-1].reshape(P.M,1)
|
||||
A,B,C=get_linear_model(xt,ut)
|
||||
xt_plus_one = np.squeeze(np.dot(A,xt)+np.dot(B,ut)+C)
|
||||
x_bar[:,t]= xt_plus_one
|
||||
# Create variables
|
||||
x = opt.Variable((self.state_len, time_horizon + 1), name='states')
|
||||
u = opt.Variable((self.action_len, time_horizon), name='actions')
|
||||
|
||||
#CVXPY Linear MPC problem statement
|
||||
cost = 0
|
||||
constr = []
|
||||
x = cp.Variable((P.N, P.T+1))
|
||||
u = cp.Variable((P.M, P.T))
|
||||
# Loop through the entire time_horizon and append costs
|
||||
cost_function = []
|
||||
|
||||
# Cost Matrices
|
||||
Q = np.diag([20,20,10,0]) #state error cost
|
||||
Qf = np.diag([10,10,10,10]) #state final error cost
|
||||
R = np.diag([10,10]) #input cost
|
||||
R_ = np.diag([10,10]) #input rate of change cost
|
||||
for t in range(time_horizon):
|
||||
|
||||
#Get Reference_traj
|
||||
x_ref, d_ref = get_ref_trajectory(x_bar[:,0] ,track, REF_VEL)
|
||||
_cost = opt.quad_form(target[:, t + 1] - x[:, t + 1], Q) +\
|
||||
opt.quad_form(u[:, t], R)
|
||||
|
||||
#Prediction Horizon
|
||||
for t in range(P.T):
|
||||
|
||||
# Tracking Error
|
||||
cost += cp.quad_form(x[:,t] - x_ref[:,t], Q)
|
||||
|
||||
# Actuation effort
|
||||
cost += cp.quad_form(u[:,t], R)
|
||||
_constraints = [x[:, t + 1] == A @ x[:, t] + B @ u[:, t] + C,
|
||||
u[0, t] >= -P.MAX_ACC, u[0, t] <= P.MAX_ACC,
|
||||
u[1, t] >= -P.MAX_STEER, u[1, t] <= P.MAX_STEER]
|
||||
#opt.norm(target[:, t + 1] - x[:, t + 1], 1) <= 0.1]
|
||||
|
||||
# Actuation rate of change
|
||||
if t < (P.T - 1):
|
||||
cost += cp.quad_form(u[:,t+1] - u[:,t], R_)
|
||||
constr+= [cp.abs(u[0, t + 1] - u[0, t])/P.dt <= MAX_D_ACC] #max acc rate of change
|
||||
constr += [cp.abs(u[1, t + 1] - u[1, t])/P.dt <= MAX_D_STEER] #max steer rate of change
|
||||
if t < (time_horizon - 1):
|
||||
_cost += opt.quad_form(u[:,t + 1] - u[:,t], R * 1)
|
||||
_constraints += [opt.abs(u[0, t + 1] - u[0, t])/P.dt <= P.MAX_D_ACC]
|
||||
_constraints += [opt.abs(u[1, t + 1] - u[1, t])/P.dt <= P.MAX_D_STEER]
|
||||
|
||||
# Kinrmatics Constrains (Linearized model)
|
||||
A,B,C = get_linear_model(x_bar[:,t], u_bar[:,t])
|
||||
constr += [x[:,t+1] == A@x[:,t] + B@u[:,t] + C.flatten()]
|
||||
|
||||
# sums problem objectives and concatenates constraints.
|
||||
constr += [x[:,0] == x_bar[:,0]] #starting condition
|
||||
constr += [x[2,:] <= MAX_SPEED] #max speed
|
||||
constr += [x[2,:] >= 0.0] #min_speed (not really needed)
|
||||
constr += [cp.abs(u[0,:]) <= MAX_ACC] #max acc
|
||||
constr += [cp.abs(u[1,:]) <= MAX_STEER] #max steer
|
||||
if t == 0:
|
||||
#_constraints += [opt.norm(target[:, time_horizon] - x[:, time_horizon], 1) <= 0.01,
|
||||
# x[:, 0] == initial_state]
|
||||
_constraints += [x[:, 0] == initial_state]
|
||||
|
||||
# Solve
|
||||
prob = cp.Problem(cp.Minimize(cost), constr)
|
||||
prob.solve(solver=cp.OSQP, verbose=False)
|
||||
cost_function.append(opt.Problem(opt.Minimize(_cost), constraints=_constraints))
|
||||
|
||||
if "optimal" not in prob.status:
|
||||
print("WARN: No optimal solution")
|
||||
return u_bar
|
||||
# Add final cost
|
||||
problem = sum(cost_function)
|
||||
|
||||
#retrieved optimized U and assign to u_bar to linearize in next step
|
||||
u_opt=np.vstack((np.array(u.value[0, :]).flatten(),
|
||||
(np.array(u.value[1, :]).flatten())))
|
||||
|
||||
return u_opt
|
||||
# Minimize Problem
|
||||
problem.solve(verbose=verbose, solver=opt.OSQP)
|
||||
return x, u
|
||||
|
|
|
@ -1,7 +1,15 @@
|
|||
import numpy as np
|
||||
|
||||
class Params():
|
||||
def __init__(self):
|
||||
self.N = 4 #number of state variables
|
||||
self.M = 2 #number of control variables
|
||||
self.T = 10 #Prediction Horizon
|
||||
self.dt = 0.25 #discretization step
|
||||
self.dt = 0.2 #discretization step
|
||||
self.path_tick = 0.05
|
||||
self.L = 0.3 #vehicle wheelbase
|
||||
self.MAX_SPEED = 1.5 #m/s
|
||||
self.MAX_ACC = 1.0 #m/ss
|
||||
self.MAX_D_ACC = 1.0 #m/sss
|
||||
self.MAX_STEER = np.radians(30) #rad
|
||||
self.MAX_D_STEER = np.radians(30) #rad/s
|
||||
|
|
|
@ -3,7 +3,7 @@ import matplotlib.pyplot as plt
|
|||
from matplotlib import animation
|
||||
|
||||
from utils import compute_path_from_wp
|
||||
from cvxpy_mpc import optimize
|
||||
from cvxpy_mpc import *
|
||||
|
||||
from mpc_config import Params
|
||||
P=Params()
|
||||
|
@ -20,7 +20,10 @@ def get_state(robotId):
|
|||
robPos, robOrn = p.getBasePositionAndOrientation(robotId)
|
||||
linVel,angVel = p.getBaseVelocity(robotId)
|
||||
|
||||
return[robPos[0], robPos[1], np.sqrt(linVel[0]**2 + linVel[1]**2), p.getEulerFromQuaternion(robOrn)[2]]
|
||||
return np.array([robPos[0],
|
||||
robPos[1],
|
||||
np.sqrt(linVel[0]**2 + linVel[1]**2),
|
||||
p.getEulerFromQuaternion(robOrn)[2]])
|
||||
|
||||
def set_ctrl(robotId,currVel,acceleration,steeringAngle):
|
||||
|
||||
|
@ -34,10 +37,17 @@ def set_ctrl(robotId,currVel,acceleration,steeringAngle):
|
|||
#print(targetVelocity)
|
||||
|
||||
for wheel in wheels:
|
||||
p.setJointMotorControl2(robotId,wheel,p.VELOCITY_CONTROL,targetVelocity=targetVelocity/gearRatio,force=maxForce)
|
||||
p.setJointMotorControl2(robotId,
|
||||
wheel,
|
||||
p.VELOCITY_CONTROL,
|
||||
targetVelocity=targetVelocity/gearRatio,
|
||||
force=maxForce)
|
||||
|
||||
for steer in steering:
|
||||
p.setJointMotorControl2(robotId,steer,p.POSITION_CONTROL,targetPosition=steeringAngle)
|
||||
p.setJointMotorControl2(robotId,
|
||||
steer,
|
||||
p.POSITION_CONTROL,
|
||||
targetPosition=steeringAngle)
|
||||
|
||||
def plot_results(path,x_history,y_history):
|
||||
"""
|
||||
|
@ -56,7 +66,10 @@ def run_sim():
|
|||
"""
|
||||
"""
|
||||
p.connect(p.GUI)
|
||||
p.resetDebugVisualizerCamera(cameraDistance=1.0, cameraYaw=-90, cameraPitch=-45, cameraTargetPosition=[-0.1,-0.0,0.65])
|
||||
p.resetDebugVisualizerCamera(cameraDistance=1.0,
|
||||
cameraYaw=-90,
|
||||
cameraPitch=-45,
|
||||
cameraTargetPosition=[-0.1,-0.0,0.65])
|
||||
|
||||
p.resetSimulation()
|
||||
|
||||
|
@ -71,7 +84,7 @@ def run_sim():
|
|||
|
||||
car = p.loadURDF("racecar/f10_racecar/racecar_differential.urdf", [0,0.3,.3])
|
||||
for wheel in range(p.getNumJoints(car)):
|
||||
print("joint[",wheel,"]=", p.getJointInfo(car,wheel))
|
||||
#print("joint[",wheel,"]=", p.getJointInfo(car,wheel))
|
||||
p.setJointMotorControl2(car,wheel,p.VELOCITY_CONTROL,targetVelocity=0,force=0)
|
||||
p.getJointInfo(car,wheel)
|
||||
|
||||
|
@ -99,10 +112,6 @@ def run_sim():
|
|||
c = p.createConstraint(car,3,car,19,jointType=p.JOINT_GEAR,jointAxis =[0,1,0],parentFramePosition=[0,0,0],childFramePosition=[0,0,0])
|
||||
p.changeConstraint(c,gearRatio=-1, gearAuxLink = 15,maxForce=10000)
|
||||
|
||||
opt_u = np.zeros((P.M,P.T))
|
||||
opt_u[0,:] = 1 #m/ss
|
||||
opt_u[1,:] = np.radians(0) #rad/
|
||||
|
||||
# Interpolated Path to follow given waypoints
|
||||
path = compute_path_from_wp([0,3,4,6,10,11,12,6,1,0],
|
||||
[0,0,2,4,3,3,-1,-6,-2,-2],P.path_tick)
|
||||
|
@ -110,27 +119,34 @@ def run_sim():
|
|||
for x_,y_ in zip(path[0,:],path[1,:]):
|
||||
p.addUserDebugLine([x_,y_,0],[x_,y_,0.33],[0,0,1])
|
||||
|
||||
#starting guess
|
||||
action = np.zeros(P.M)
|
||||
action[0] = P.MAX_ACC/2 #a
|
||||
action[1] = 0.0 #delta
|
||||
|
||||
# Cost Matrices
|
||||
Q = np.diag([20,20,10,20]) #state error cost
|
||||
Qf = np.diag([30,30,30,30]) #state final error cost
|
||||
R = np.diag([10,10]) #input cost
|
||||
R_ = np.diag([10,10]) #input rate of change cost
|
||||
|
||||
mpc = MPC(P.N,P.M,Q,R)
|
||||
x_history=[]
|
||||
y_history=[]
|
||||
|
||||
time.sleep(0.5)
|
||||
input("Press Enter to continue...")
|
||||
|
||||
while (1):
|
||||
|
||||
state = get_state(car)
|
||||
x_history.append(state[0])
|
||||
y_history.append(state[1])
|
||||
|
||||
#add 1 timestep delay to input
|
||||
state[0]=state[0]+state[2]*np.cos(state[3])*P.dt
|
||||
state[1]=state[1]+state[2]*np.sin(state[3])*P.dt
|
||||
state[2]=state[2]+opt_u[0,0]*P.dt
|
||||
state[3]=state[3]+opt_u[0,0]*np.tan(opt_u[1,0])/0.3*P.dt
|
||||
|
||||
#track path in bullet
|
||||
p.addUserDebugLine([state[0],state[1],0],[state[0],state[1],0.5],[1,0,0])
|
||||
|
||||
if np.sqrt((state[0]-path[0,-1])**2+(state[1]-path[1,-1])**2)<0.1:
|
||||
if np.sqrt((state[0]-path[0,-1])**2+(state[1]-path[1,-1])**2)<0.2:
|
||||
print("Success! Goal Reached")
|
||||
set_ctrl(car,0,0,0)
|
||||
plot_results(path,x_history,y_history)
|
||||
|
@ -138,13 +154,38 @@ def run_sim():
|
|||
p.disconnect()
|
||||
return
|
||||
|
||||
#for MPC car ref frame is used
|
||||
state[0:2] = 0.0
|
||||
state[3] = 0.0
|
||||
|
||||
#add 1 timestep delay to input
|
||||
state[0]=state[0]+state[2]*np.cos(state[3])*P.dt
|
||||
state[1]=state[1]+state[2]*np.sin(state[3])*P.dt
|
||||
state[2]=state[2]+action[0]*P.dt
|
||||
state[3]=state[3]+action[0]*np.tan(action[1])/P.L*P.dt
|
||||
|
||||
|
||||
#optimization loop
|
||||
start=time.time()
|
||||
opt_u = optimize(state,opt_u,path,ref_vel=1.0)
|
||||
|
||||
# State Matrices
|
||||
A,B,C = get_linear_model_matrices(state, action)
|
||||
|
||||
#Get Reference_traj -> inputs are in worldframe
|
||||
target, _ = get_ref_trajectory(get_state(car),
|
||||
path, 1.0)
|
||||
|
||||
x_mpc, u_mpc = mpc.optimize_linearized_model(A, B, C, state, target, time_horizon=P.T, verbose=False)
|
||||
|
||||
#action = np.vstack((np.array(u_mpc.value[0,:]).flatten(),
|
||||
# (np.array(u_mpc.value[1,:]).flatten())))
|
||||
|
||||
action[:] = [u_mpc.value[0,1],u_mpc.value[1,1]]
|
||||
|
||||
elapsed=time.time()-start
|
||||
print("CVXPY Optimization Time: {:.4f}s".format(elapsed))
|
||||
|
||||
set_ctrl(car,state[2],opt_u[0,1],opt_u[1,1])
|
||||
set_ctrl(car,state[2],action[0],action[1])
|
||||
|
||||
if P.dt-elapsed>0:
|
||||
time.sleep(P.dt-elapsed)
|
||||
|
|
|
@ -21,8 +21,9 @@ def compute_path_from_wp(start_xp, start_yp, step = 0.1):
|
|||
fx=interp1d(np.linspace(0,1,2),start_xp[idx:idx+2],kind=1)
|
||||
fy=interp1d(np.linspace(0,1,2),start_yp[idx:idx+2],kind=1)
|
||||
|
||||
final_xp=np.append(final_xp,fx(interp_range))
|
||||
final_yp=np.append(final_yp,fy(interp_range))
|
||||
# watch out to duplicate points!
|
||||
final_xp=np.append(final_xp,fx(interp_range)[1:])
|
||||
final_yp=np.append(final_yp,fy(interp_range)[1:])
|
||||
|
||||
dx = np.append(0, np.diff(final_xp))
|
||||
dy = np.append(0, np.diff(final_yp))
|
||||
|
@ -59,42 +60,65 @@ def get_nn_idx(state,path):
|
|||
|
||||
return target_idx
|
||||
|
||||
def normalize_angle(angle):
|
||||
"""
|
||||
Normalize an angle to [-pi, pi]
|
||||
"""
|
||||
while angle > np.pi:
|
||||
angle -= 2.0 * np.pi
|
||||
|
||||
while angle < -np.pi:
|
||||
angle += 2.0 * np.pi
|
||||
|
||||
return angle
|
||||
|
||||
def get_ref_trajectory(state, path, target_v):
|
||||
"""
|
||||
For each step in the time horizon
|
||||
modified reference in robot frame
|
||||
"""
|
||||
xref = np.zeros((P.N, P.T + 1))
|
||||
dref = np.zeros((1, P.T + 1))
|
||||
xref = np.zeros((P.N, P.T+1))
|
||||
dref = np.zeros((1, P.T+1))
|
||||
|
||||
#sp = np.ones((1,T +1))*target_v #speed profile
|
||||
|
||||
ncourse = path.shape[1]
|
||||
|
||||
ind = get_nn_idx(state, path)
|
||||
dx=path[0,ind] - state[0]
|
||||
dy=path[1,ind] - state[1]
|
||||
|
||||
xref[0, 0] = path[0,ind] #X
|
||||
xref[1, 0] = path[1,ind] #Y
|
||||
xref[2, 0] = target_v #sp[ind] #V
|
||||
xref[3, 0] = path[2,ind] #Theta
|
||||
|
||||
xref[0, 0] = dx * np.cos(-state[3]) - dy * np.sin(-state[3]) #X
|
||||
xref[1, 0] = dy * np.cos(-state[3]) + dx * np.sin(-state[3]) #Y
|
||||
xref[2, 0] = target_v #V
|
||||
xref[3, 0] = normalize_angle(path[2,ind]- state[3]) #Theta
|
||||
dref[0, 0] = 0.0 # steer operational point should be 0
|
||||
|
||||
dl = P.path_tick # Waypoints spacing [m]
|
||||
dl = 0.05 # Waypoints spacing [m]
|
||||
travel = 0.0
|
||||
|
||||
for i in range(P.T + 1):
|
||||
for i in range(1, P.T+1):
|
||||
travel += abs(target_v) * P.dt #current V or target V?
|
||||
dind = int(round(travel / dl))
|
||||
|
||||
if (ind + dind) < ncourse:
|
||||
xref[0, i] = path[0,ind + dind]
|
||||
xref[1, i] = path[1,ind + dind]
|
||||
dx=path[0,ind + dind] - state[0]
|
||||
dy=path[1,ind + dind] - state[1]
|
||||
|
||||
xref[0, i] = dx * np.cos(-state[3]) - dy * np.sin(-state[3])
|
||||
xref[1, i] = dy * np.cos(-state[3]) + dx * np.sin(-state[3])
|
||||
xref[2, i] = target_v #sp[ind + dind]
|
||||
xref[3, i] = path[2,ind + dind]
|
||||
xref[3, i] = normalize_angle(path[2,ind + dind] - state[3])
|
||||
dref[0, i] = 0.0
|
||||
else:
|
||||
xref[0, i] = path[0,ncourse - 1]
|
||||
xref[1, i] = path[1,ncourse - 1]
|
||||
dx=path[0,ncourse - 1] - state[0]
|
||||
dy=path[1,ncourse - 1] - state[1]
|
||||
|
||||
xref[0, i] = dx * np.cos(-state[3]) - dy * np.sin(-state[3])
|
||||
xref[1, i] = dy * np.cos(-state[3]) + dx * np.sin(-state[3])
|
||||
xref[2, i] = 0.0 #stop? #sp[ncourse - 1]
|
||||
xref[3, i] = path[2,ncourse - 1]
|
||||
xref[3, i] = normalize_angle(path[2,ncourse - 1] - state[3])
|
||||
dref[0, i] = 0.0
|
||||
|
||||
return xref, dref
|
Loading…
Reference in New Issue