# Simplified MPC
Checking the implementation from [Reuben Ferrante](
Here the system dynamics matrices are evaluated only once given the current state, input -> So no more need to keep track of **x_bar** and **u_bar**.
"This should give less precise results but the computation time should be gratly reduced and the overall code is slimmer, worth checking out!"
"import numpy as np\n",
"import cvxpy as opt\n",
"import time\n",
"from scipy.integrate import odeint\n",
"from scipy.interpolate import interp1d\n",
"import matplotlib.pyplot as plt\n",
"N = 4 #number of state variables\n",
"M = 2 #number of control variables\n",
"T = 10 #Prediction Horizon\n",
"DT = 0.2 #discretization step\n",
"L = 0.3 #vehicle wheelbase\n",
"MAX_SPEED = 1.5 #m/s\n",
"MAX_ACC = 1.0 #m/ss\n",
"MAX_D_ACC = 1.0 #m/sss\n",
"MAX_STEER = np.radians(30) #rad\n",
"MAX_D_STEER = np.radians(30) #rad/s\n",
"def get_linear_model_matrices(x_bar,u_bar):\n",
" \"\"\"\n",
" Computes the LTI approximated state space model x' = Ax + Bu + C\n",
" \"\"\"\n",
" \n",
" x = x_bar[0]\n",
" y = x_bar[1]\n",
" v = x_bar[2]\n",
" theta = x_bar[3]\n",
" \n",
" a = u_bar[0]\n",
" delta = u_bar[1]\n",
" \n",
" ct = np.cos(theta)\n",
" st = np.sin(theta)\n",
" cd = np.cos(delta)\n",
" td = np.tan(delta)\n",
" \n",
" A = np.zeros((N,N))\n",
" A[0,2] = ct\n",
" A[0,3] = -v*st\n",
" A[1,2] = st\n",
" A[1,3] = v*ct\n",
" A[3,2] = v*td/L\n",
" A_lin = np.eye(N)+DT*A\n",
" \n",
" B = np.zeros((N,M))\n",
" B[2,0]=1\n",
" B[3,1]=v/(L*cd**2)\n",
" B_lin=DT*B\n",
" \n",
" f_xu=np.array([v*ct, v*st, a, v*td/L]).reshape(N,1)\n",
" C_lin = DT*(f_xu -, x_bar.reshape(N,1)) -, u_bar.reshape(M,1)))#.flatten()\n",
" \n",
" #return np.round(A_lin,6), np.round(B_lin,6), np.round(C_lin,6)\n",
" return A_lin, B_lin, C_lin\n",
"class MPC():\n",
" \n",
" def __init__(self, N, M, Q, R):\n",
" \"\"\"\n",
" \"\"\"\n",
" self.state_len = N\n",
" self.action_len = M\n",
" self.state_cost = Q\n",
" self.action_cost = R\n",
" \n",
" def optimize_linearized_model(self, A, B, C, initial_state, target, time_horizon=10, Q=None, R=None, verbose=False):\n",
" \"\"\"\n",
" Optimisation problem defined for the linearised model,\n",
" :param A: \n",
" :param B:\n",
" :param C: \n",
" :param initial_state:\n",
" :param Q:\n",
" :param R:\n",
" :param target:\n",
" :param time_horizon:\n",
" :param verbose:\n",
" :return:\n",
" \"\"\"\n",
" \n",
" assert len(initial_state) == self.state_len\n",
" \n",
" if (Q == None or R==None):\n",
" Q = self.state_cost\n",
" R = self.action_cost\n",
" \n",
" # Create variables\n",
" x = opt.Variable((self.state_len, time_horizon + 1), name='states')\n",
" u = opt.Variable((self.action_len, time_horizon), name='actions')\n",
" # Loop through the entire time_horizon and append costs\n",
" cost_function = []\n",
" for t in range(time_horizon):\n",
" _cost = opt.quad_form(target[:, t + 1] - x[:, t + 1], Q) +\\\n",
2021-04-19 23:18:08 +08:00
" opt.quad_form(u[:, t], R)\n",
2021-04-15 21:46:12 +08:00
" \n",
" _constraints = [x[:, t + 1] == A @ x[:, t] + B @ u[:, t] + C,\n",
" u[0, t] >= -MAX_ACC, u[0, t] <= MAX_ACC,\n",
" u[1, t] >= -MAX_STEER, u[1, t] <= MAX_STEER]\n",
" #opt.norm(target[:, t + 1] - x[:, t + 1], 1) <= 0.1]\n",
" \n",
" # Actuation rate of change\n",
" if t < (time_horizon - 1):\n",
2021-04-19 23:18:08 +08:00
" _cost += opt.quad_form(u[:,t + 1] - u[:,t], R * 1)\n",
2021-04-15 21:46:12 +08:00
" _constraints += [opt.abs(u[0, t + 1] - u[0, t])/DT <= MAX_D_ACC]\n",
2021-04-19 23:18:08 +08:00
" _constraints += [opt.abs(u[1, t + 1] - u[1, t])/DT <= MAX_D_STEER]\n",
" \n",
" \n",
2021-04-15 21:46:12 +08:00
" if t == 0:\n",
" #_constraints += [opt.norm(target[:, time_horizon] - x[:, time_horizon], 1) <= 0.01,\n",
" # x[:, 0] == initial_state]\n",
" _constraints += [x[:, 0] == initial_state]\n",
" \n",
" cost_function.append(opt.Problem(opt.Minimize(_cost), constraints=_constraints))\n",
" # Add final cost\n",
" problem = sum(cost_function)\n",
" \n",
" # Minimize Problem\n",
" problem.solve(verbose=verbose, solver=opt.OSQP)\n",
" return x, u"
"the ODE is used to update the simulation given the mpc results\n",
"I use this insted of using the LTI twice\n",
"def kinematics_model(x,t,u):\n",
" \"\"\"\n",
" Returns the set of ODE of the vehicle model.\n",
" \"\"\"\n",
" \n",
" L=0.3 #vehicle wheelbase\n",
" dxdt = x[2]*np.cos(x[3])\n",
" dydt = x[2]*np.sin(x[3])\n",
" dvdt = u[0]\n",
" dthetadt = x[2]*np.tan(u[1])/L\n",
" dqdt = [dxdt,\n",
" dydt,\n",
" dvdt,\n",
" dthetadt]\n",
" return dqdt\n",
"def predict(x0,u):\n",
" \"\"\"\n",
" \"\"\"\n",
" \n",
" x_ = np.zeros((N,T+1))\n",
" \n",
" x_[:,0] = x0\n",
" \n",
" # solve ODE\n",
" for t in range(1,T+1):\n",
" tspan = [0,DT]\n",
" x_next = odeint(kinematics_model,\n",
" x0,\n",
" tspan,\n",
" args=(u[:,t-1],))\n",
" x0 = x_next[1]\n",
" x_[:,t]=x_next[1]\n",
" \n",
" return x_\n",
"def compute_path_from_wp(start_xp, start_yp, step = 0.1):\n",
" \"\"\"\n",
" Computes a reference path given a set of waypoints\n",
" \"\"\"\n",
" \n",
" final_xp=[]\n",
" final_yp=[]\n",
" delta = step #[m]\n",
" for idx in range(len(start_xp)-1):\n",
" section_len = np.sum(np.sqrt(np.power(np.diff(start_xp[idx:idx+2]),2)+np.power(np.diff(start_yp[idx:idx+2]),2)))\n",
" interp_range = np.linspace(0,1,np.floor(section_len/delta).astype(int))\n",
" \n",
" fx=interp1d(np.linspace(0,1,2),start_xp[idx:idx+2],kind=1)\n",
" fy=interp1d(np.linspace(0,1,2),start_yp[idx:idx+2],kind=1)\n",
" \n",
" # watch out to duplicate points!\n",
" final_xp=np.append(final_xp,fx(interp_range)[1:])\n",
" final_yp=np.append(final_yp,fy(interp_range)[1:])\n",
" \n",
" dx = np.append(0, np.diff(final_xp))\n",
" dy = np.append(0, np.diff(final_yp))\n",
" theta = np.arctan2(dy, dx)\n",
" return np.vstack((final_xp,final_yp,theta))\n",
"def get_nn_idx(state,path):\n",
" \"\"\"\n",
" Computes the index of the waypoint closest to vehicle\n",
" \"\"\"\n",
" dx = state[0]-path[0,:]\n",
" dy = state[1]-path[1,:]\n",
" dist = np.hypot(dx,dy)\n",
" nn_idx = np.argmin(dist)\n",
" try:\n",
" v = [path[0,nn_idx+1] - path[0,nn_idx],\n",
" path[1,nn_idx+1] - path[1,nn_idx]] \n",
" v /= np.linalg.norm(v)\n",
" d = [path[0,nn_idx] - state[0],\n",
" path[1,nn_idx] - state[1]]\n",
" if,v) > 0:\n",
" target_idx = nn_idx\n",
" else:\n",
" target_idx = nn_idx+1\n",
" except IndexError as e:\n",
" target_idx = nn_idx\n",
" return target_idx\n",
"def normalize_angle(angle):\n",
" \"\"\"\n",
" Normalize an angle to [-pi, pi]\n",
" \"\"\"\n",
" while angle > np.pi:\n",
" angle -= 2.0 * np.pi\n",
" while angle < -np.pi:\n",
" angle += 2.0 * np.pi\n",
" return angle\n",
"def get_ref_trajectory(state, path, target_v):\n",
" \"\"\"\n",
" For each step in the time horizon\n",
" modified reference in robot frame\n",
" \"\"\"\n",
" xref = np.zeros((N, T+1))\n",
2021-04-15 21:46:12 +08:00
" \n",
" #sp = np.ones((1,T +1))*target_v #speed profile\n",
" \n",
" ncourse = path.shape[1]\n",
" ind = get_nn_idx(state, path)\n",
" dx=path[0,ind] - state[0]\n",
" dy=path[1,ind] - state[1]\n",
" \n",
" \n",
" xref[0, 0] = dx * np.cos(-state[3]) - dy * np.sin(-state[3]) #X\n",
" xref[1, 0] = dy * np.cos(-state[3]) + dx * np.sin(-state[3]) #Y\n",
" xref[2, 0] = target_v #V\n",
" xref[3, 0] = normalize_angle(path[2,ind]- state[3]) #Theta\n",
" dref[0, 0] = 0.0 # steer operational point should be 0\n",
" \n",
" dl = 0.05 # Waypoints spacing [m]\n",
" travel = 0.0\n",
" \n",
" for i in range(1, T+1):\n",
" travel += abs(target_v) * DT #current V or target V?\n",
" dind = int(round(travel / dl))\n",
" \n",
" if (ind + dind) < ncourse:\n",
" dx=path[0,ind + dind] - state[0]\n",
" dy=path[1,ind + dind] - state[1]\n",
" \n",
" xref[0, i] = dx * np.cos(-state[3]) - dy * np.sin(-state[3])\n",
" xref[1, i] = dy * np.cos(-state[3]) + dx * np.sin(-state[3])\n",
" xref[2, i] = target_v #sp[ind + dind]\n",
" xref[3, i] = normalize_angle(path[2,ind + dind] - state[3])\n",
" dref[0, i] = 0.0\n",
" else:\n",
" dx=path[0,ncourse - 1] - state[0]\n",
" dy=path[1,ncourse - 1] - state[1]\n",
" \n",
" xref[0, i] = dx * np.cos(-state[3]) - dy * np.sin(-state[3])\n",
" xref[1, i] = dy * np.cos(-state[3]) + dx * np.sin(-state[3])\n",
" xref[2, i] = 0.0 #stop? #sp[ncourse - 1]\n",
" xref[3, i] = normalize_angle(path[2,ncourse - 1] - state[3])\n",
" dref[0, i] = 0.0\n",
" return xref, dref"
CVXPY Optimization Time: Avrg: 0.0908s Max: 0.1129s Min: 0.0846s
"track = compute_path_from_wp([0,3,4,6,10,12,14,6,1,0],\n",
" [0,0,2,4,3,3,-2,-6,-2,-2],0.05)\n",
"# track = compute_path_from_wp([0,10,10,0],\n",
"# [0,0,1,1],0.05)\n",
"sim_duration = 200 #time steps\n",
"x_sim = np.zeros((N,sim_duration))\n",
"u_sim = np.zeros((M,sim_duration-1))\n",
"# Starting Condition\n",
"x0 = np.zeros(N)\n",
"x0[0] = 0 #x\n",
"x0[1] = -0.25 #y\n",
"x0[2] = 0.0 #v\n",
"x0[3] = np.radians(-0) #yaw\n",
"x_sim[:,0] = x0 #simulation_starting conditions\n",
"#starting guess\n",
"action = np.zeros(M)\n",
"action[0] = MAX_ACC/2 #a\n",
"action[1] = 0.0 #delta\n",
"u_sim[:,0] = action\n",
2021-04-19 23:18:08 +08:00
"# Cost Matrices\n",
"Q = np.diag([20,20,10,20]) #state error cost\n",
"Qf = np.diag([30,30,30,30]) #state final error cost\n",
"R = np.diag([10,10]) #input cost\n",
"R_ = np.diag([10,10]) #input rate of change cost\n",
"mpc = MPC(N,M,Q,R)\n",
2021-04-15 21:46:12 +08:00
"REF_VEL = 1.0\n",
"for sim_time in range(sim_duration-1):\n",
" \n",
" iter_start = time.time()\n",
" \n",
" # dynamics starting state w.r.t. robot are always null except vel \n",
" start_state = np.array([0, 0, x_sim[2,sim_time], 0])\n",
2021-04-19 23:18:08 +08:00
" \n",
" # OPTIONAL: Add time delay to starting State- y\n",
" \n",
" current_action = np.array([u_sim[0,sim_time], u_sim[1,sim_time]])\n",
2021-04-15 21:46:12 +08:00
" \n",
" # State Matrices\n",
2021-04-19 23:18:08 +08:00
" A,B,C = get_linear_model_matrices(start_state, current_action)\n",
" \n",
" #Get Reference_traj -> inputs are in worldframe\n",
2021-04-15 21:46:12 +08:00
" target, _ = get_ref_trajectory(x_sim[:,sim_time] ,track, REF_VEL)\n",
" \n",
2021-04-19 23:18:08 +08:00
" x_mpc, u_mpc = mpc.optimize_linearized_model(A, B, C, start_state, target, time_horizon=T, verbose=False)\n",
2021-04-15 21:46:12 +08:00
" \n",
" #retrieved optimized U and assign to u_bar to linearize in next step\n",
" u_bar = np.vstack((np.array(u_mpc.value[0,:]).flatten(),\n",
" (np.array(u_mpc.value[1,:]).flatten())))\n",
" \n",
" u_sim[:,sim_time] = u_bar[:,0]\n",
" \n",
" # Measure elpased time to get results from cvxpy\n",
" opt_time.append(time.time()-iter_start)\n",
" \n",
" # move simulation to t+1\n",
" tspan = [0,DT]\n",
" x_sim[:,sim_time+1] = odeint(kinematics_model,\n",
" x_sim[:,sim_time],\n",
" tspan,\n",
" args=(u_bar[:,0],))[1]\n",
" \n",
"print(\"CVXPY Optimization Time: Avrg: {:.4f}s Max: {:.4f}s Min: {:.4f}s\".format(np.mean(opt_time),\n",
" np.max(opt_time),\n",
" np.min(opt_time))) "
"### RESUTS\n",
"SCS -> Optimization Time: Avrg: 0.2139s Max: 0.3517s Min: 0.1913s\n",
"OSQP -> Optimization Time: Avrg: 0.1035s Max: 0.1311s Min: 0.0959s\n",
"ECOS -> Avrg: 0.2024s Max: 0.2313s Min: 0.1904s\n",
"**Qualitative result** aka \"it drives?\" seems the same..."
"#plot trajectory\n",
"grid = plt.GridSpec(4, 5)\n",
"plt.subplot(grid[0:4, 0:4])\n",
"plt.subplot(grid[0, 4])\n",
"plt.ylabel('a(t) [m/ss]')\n",
"plt.subplot(grid[1, 4])\n",
"plt.ylabel('v(t) [m/s]')\n",
"plt.subplot(grid[2, 4])\n",
"plt.ylabel('delta(t) [rad]')\n",
"plt.subplot(grid[3, 4])\n",
"plt.ylabel('theta(t) [rad]')\n",
