mpc_python_learn/notebook/MPC_scipy_v2.ipynb

617 lines
90 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"plt.style.use(\"ggplot\")\n",
"import time\n",
"from scipy.interpolate import interp1d\n",
"from scipy.integrate import odeint\n",
"from scipy.optimize import minimize"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"differential drive kinematics model equations"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"#Preliminaries\n",
"def compute_path_from_wp(start_xp, start_yp, step = 0.1):\n",
" final_xp=[]\n",
" final_yp=[]\n",
" delta = step #[m]\n",
"\n",
" for idx in range(len(start_xp)-1):\n",
" section_len = np.sum(np.sqrt(np.power(np.diff(start_xp[idx:idx+2]),2)+np.power(np.diff(start_yp[idx:idx+2]),2)))\n",
"\n",
" interp_range = np.linspace(0,1,np.floor(section_len/delta).astype(int))\n",
" \n",
" fx=interp1d(np.linspace(0,1,2),start_xp[idx:idx+2],kind=1)\n",
" fy=interp1d(np.linspace(0,1,2),start_yp[idx:idx+2],kind=1)\n",
" \n",
" final_xp=np.append(final_xp,fx(interp_range))\n",
" final_yp=np.append(final_yp,fy(interp_range))\n",
"\n",
" return np.vstack((final_xp,final_yp))\n",
"\n",
"def get_nn_idx(state,path):\n",
"\n",
" dx = state[0]-path[0,:]\n",
" dy = state[1]-path[1,:]\n",
" dist = np.sqrt(dx**2 + dy**2)\n",
" nn_idx = np.argmin(dist)\n",
"\n",
" try:\n",
" v = [path[0,nn_idx+1] - path[0,nn_idx],\n",
" path[1,nn_idx+1] - path[1,nn_idx]] \n",
" v /= np.linalg.norm(v)\n",
"\n",
" d = [path[0,nn_idx] - state[0],\n",
" path[1,nn_idx] - state[1]]\n",
"\n",
" if np.dot(d,v) > 0:\n",
" target_idx = nn_idx\n",
" else:\n",
" target_idx = nn_idx+1\n",
"\n",
" except IndexError as e:\n",
" target_idx = nn_idx\n",
"\n",
" return target_idx\n",
"\n",
"def road_curve(state,track):\n",
" \n",
" #given vehicle pos find lookahead waypoints\n",
" nn_idx=get_nn_idx(state,track)\n",
" LOOKAHED=6\n",
" lk_wp=track[:,nn_idx:nn_idx+LOOKAHED]\n",
"\n",
" #trasform lookahead waypoints to vehicle ref frame\n",
" dx = lk_wp[0,:] - state[0]\n",
" dy = lk_wp[1,:] - state[1]\n",
"\n",
" wp_vehicle_frame = np.vstack(( dx * np.cos(-state[2]) - dy * np.sin(-state[2]),\n",
" dy * np.cos(-state[2]) + dx * np.sin(-state[2]) ))\n",
"\n",
" #fit poly\n",
" return np.polyfit(wp_vehicle_frame[0,:], wp_vehicle_frame[1,:], 3, rcond=None, full=False, w=None, cov=False)\n",
"\n",
"def f(x,coeff):\n",
" return round(coeff[0]*x**3 + coeff[1]*x**2 + coeff[2]*x**1 + coeff[3]*x**0,6)\n",
"\n",
"def df(x,coeff):\n",
" return round(3*coeff[0]*x**2 + 2*coeff[1]*x**1 + coeff[2]*x**0,6)"
]
},
{
"cell_type": "code",
"execution_count": 58,
"metadata": {},
"outputs": [],
"source": [
"from math import cos\n",
"# Define process model\n",
"def kinematics_model(xt,ut,K,dt=0.1):\n",
"\n",
" #current state\n",
" xp = xt[0]\n",
" yp = xt[1]\n",
" theta = xt[2]\n",
" etheta = xt[4]\n",
"\n",
" vt = ut[0]\n",
" wt = ut[1]\n",
"\n",
" #next state\n",
" xtp = xp + vt*cos(theta)*dt\n",
" ytp = yp + vt*np.sin(theta)*dt\n",
" thetatp = theta + wt*dt\n",
" ctetp = f(xp,K) - yp + vt*np.sin(etheta)*dt\n",
" ethetatp = theta - np.arctan(df(xp,K)) + wt*dt\n",
" \n",
" dqdt = [xtp,ytp,thetatp,ctetp,ethetatp]\n",
" return dqdt"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"simulate"
]
},
{
"cell_type": "code",
"execution_count": 63,
"metadata": {},
"outputs": [],
"source": [
"# time points\n",
"t = np.arange(0, 40, 0.1)\n",
"\n",
"#fake inputs\n",
"# u = [v(t),\n",
"# w(t)]\n",
"u = np.zeros((2,len(t)))\n",
"u[0,100:]=0.4\n",
"u[1,200:]=0.1\n",
"u[1,300:]=-0.0"
]
},
{
"cell_type": "code",
"execution_count": 64,
"metadata": {},
"outputs": [],
"source": [
"track = compute_path_from_wp([0,5,10],[0,0,2])\n",
"\n",
"# initial conditions\n",
"q0 = np.array([0,0,0,0,0])\n",
"K = road_curve(q0,track)\n",
"q0[3] = f(q0[0],K)\n",
"q0[4] = - np.arctan(df(q0[0],K))\n",
"\n",
"# store solution\n",
"x = np.empty_like(t)\n",
"y = np.empty_like(t)\n",
"theta = np.empty_like(t)\n",
"cte = np.empty_like(t)\n",
"psi = np.empty_like(t)\n",
"\n",
"# record initial conditions\n",
"x[0] = q0[0]\n",
"y[0] = q0[1]\n",
"theta[0] = q0[2]\n",
"cte[0] = q0[3]\n",
"psi[0] = q0[4]\n",
"\n",
"# solve ODE\n",
"for i in range(1,len(t)):\n",
" # span for next time step\n",
" q_t = kinematics_model(q0,u[:,i-1],K,dt=0.1)\n",
"\n",
" # store solution for plotting\n",
" x[i] = q_t[0]\n",
" y[i] = q_t[1]\n",
" theta[i] = q_t[2]\n",
" cte[i] = q_t[3]\n",
" psi[i] = q_t[4]\n",
" # next initial condition\n",
" q0 = q_t"
]
},
{
"cell_type": "code",
"execution_count": 65,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 4 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"#plot trajectory\n",
"plt.subplot(2, 2, 1)\n",
"plt.plot(x,y)\n",
"plt.scatter(track[0,:],track[1,:])\n",
"plt.ylabel('y')\n",
"plt.xlabel('x')\n",
"\n",
"#plot x(t)\n",
"plt.subplot(2, 2, 2)\n",
"plt.plot(t,theta)\n",
"plt.ylabel('theta(t)')\n",
"#plt.xlabel('time')\n",
"\n",
"\n",
"#plot y(t)\n",
"plt.subplot(2, 2, 3)\n",
"plt.plot(t,cte)\n",
"plt.ylabel('cte(t)')\n",
"#plt.xlabel('time')\n",
"\n",
"#plot theta(t)\n",
"plt.subplot(2, 2, 4)\n",
"plt.plot(t,psi)\n",
"plt.ylabel('psi(t)')\n",
"#plt.xlabel('time')\n",
"#plt.legend(loc='best')\n",
"\n",
"plt.tight_layout()\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"mpc"
]
},
{
"cell_type": "code",
"execution_count": 75,
"metadata": {},
"outputs": [],
"source": [
"# Define Objective function\n",
"def objective(u_hat,*args):\n",
" \"\"\"\n",
" Computes objective function\n",
" \n",
" :param u_hat: array_like, input [v,w\n",
" v,w\n",
" ...]\n",
" \"\"\"\n",
" \n",
" #undo input flattening\n",
" u_hat = u_hat.reshape(2, -1).T\n",
" se = np.zeros(PRED_HZN) #squared_errors\n",
" \n",
" # Prediction\n",
" for k in range(PRED_HZN):\n",
" \n",
" # Initialize state for prediction\n",
" if k==0:\n",
" q_hat0 = args[0]\n",
" K=args[1] #road curve\n",
" \n",
" # Clamp control horizon\n",
" elif k>CTRL_HZN:\n",
" u_hat[k,:] = u_hat[CTRL_HZN,:]\n",
" \n",
" #DEBUG\n",
"# print(\"k : {}\".format(k))\n",
"# print(\"q_hat0 : {}\".format(q_hat0))\n",
"# print(\"ts : {}\".format(ts_hat))\n",
"# print(\"u_hat : {}\".format(u_hat[k,:]))\n",
" \n",
" q_hat = kinematics_model(q_hat0,u_hat[k,:],K,dt=0.1)\n",
" cte=q_hat[3]\n",
" psi=q_hat[4]\n",
" \n",
" q_hat0 = q_hat[:]\n",
"\n",
" if k >0:\n",
" delta_u_hat = np.sum(u_hat[k,:]-u_hat[k-1,:])\n",
" else:\n",
" delta_u_hat = 0\n",
"\n",
" se[k] = 100 * (cte)**2 + 15 *(psi)**2 + 15 * (delta_u_hat)**2\n",
"\n",
" # Sum of Squared Error calculation\n",
" obj = np.sum(se[1:])\n",
" return obj"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"TODO: add heading error"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"test optimization"
]
},
{
"cell_type": "code",
"execution_count": 76,
"metadata": {},
"outputs": [],
"source": [
"#PARAMS\n",
"v=1.5\n",
"vb=0.5\n",
"wb=0.5\n",
"\n",
"# Define horizons\n",
"PRED_HZN = 20 # Prediction Horizon\n",
"CTRL_HZN = 15 # Control Horizon\n",
"\n",
"delta_t_hat = 0.1 #time step"
]
},
{
"cell_type": "code",
"execution_count": 77,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" fun: 84.46152413856359\n",
" jac: array([-1.65424156, -5.46248531, -4.66883278, -3.61091423, -7.25656319,\n",
" -9.97307396, -7.68648052, -5.46970558, -3.82968616, 0.67185783,\n",
" -1.15515804, -1.96808815, 1.28362656, -2.83778381, -0.42825508,\n",
" -0.31342316, 0. , 0. , 0. , 0. ,\n",
" 38.85329056, 29.67042637, 24.17370987, 19.24147987, 10.30806732,\n",
" 3.07283115, 1.61050224, 0.68769073, -0.09304619, 2.58471298,\n",
" -0.30147457, -1.62229347, 1.39114475, -2.71041679, -0.22286892,\n",
" 1.22416592, 0. , 0. , 0. , 0. ])\n",
" message: 'Iteration limit exceeded'\n",
" nfev: 5103\n",
" nit: 101\n",
" njev: 101\n",
" status: 9\n",
" success: False\n",
" x: array([ 1.64242978, 1.69757121, 1.81361202, 1.90182763, 1.90182763,\n",
" 1.90182763, 1.90182763, 1.90182763, 1.83511965, 1.84792843,\n",
" 1.68088567, 1.43951262, 1.44726228, 1.25067572, 1.25506942,\n",
" 1.37310177, 1.37310177, 1.37310177, 1.37310177, 1.37310177,\n",
" -0.5 , -0.5 , -0.49058099, -0.47680101, -0.47680101,\n",
" -0.47680101, -0.38569852, -0.26192364, -0.07803742, 0.00765328,\n",
" 0.14826226, 0.33421738, 0.29494089, 0.39434771, 0.37513876,\n",
" 0.25069961, 0.25069961, 0.25069961, 0.25069961, 0.25069961])\n",
"time elapsed: 3.1107125282287598\n"
]
}
],
"source": [
"starting_pos=np.array([0,0.25,0.15])\n",
"track = compute_path_from_wp([0,5,10],[0,0,2])\n",
"K = road_curve(starting_pos,track)\n",
"\n",
"# initial conditions\n",
"q0 = np.array([0,0,0,0,0])\n",
"q0[3] = f(q0[0],K)\n",
"q0[4] = - np.arctan(df(q0[0],K))\n",
"\n",
"#starting guess\n",
"u_hat0 = np.zeros((PRED_HZN,2))\n",
"u_hat0[:,0]=v\n",
"u_hat0[:,1]=0.1\n",
"u_hat0=u_hat0.flatten(\"F\")\n",
"\n",
"bnds=((v-2*vb,v+vb),)\n",
"for i in range (PRED_HZN-1):\n",
" bnds=bnds+((v-2*vb,v+vb),)\n",
" \n",
"bnds=bnds+((-wb,wb),)\n",
"for i in range (PRED_HZN-1):\n",
" bnds=bnds+((-wb,wb),)\n",
"\n",
"# print(u_hat0)\n",
"# print(u_hat0.reshape(2, -1).T)\n",
"\n",
"start = time.time()\n",
"\n",
"solution = minimize(objective,u_hat0,args=(q0[:],K,),method='SLSQP',bounds=bnds,options={'maxiter':100})\n",
"\n",
"end = time.time()\n",
"\n",
"print(solution)\n",
"print(\"time elapsed: {}\".format(end-start))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"check what the optimizer returns"
]
},
{
"cell_type": "code",
"execution_count": 78,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"u=solution.x.reshape(2,-1).T\n",
"x=[]\n",
"y=[]\n",
"theta=[]\n",
"x.append(starting_pos[0])\n",
"y.append(starting_pos[1])\n",
"theta.append(starting_pos[2])\n",
"\n",
"for i in range(1,np.shape(u)[0]):\n",
" \n",
" # store solution for plotting\n",
" x.append(x[-1]+u[i,0]*np.cos(theta[-1])*0.1)\n",
" y.append(y[-1]+u[i,0]*np.sin(theta[-1])*0.1)\n",
" theta.append(theta[-1]+u[i,1]*0.1)\n",
" \n",
"plt.plot(x,y)\n",
"plt.plot(track[0,:],track[1,:])\n",
"plt.ylabel('y')\n",
"plt.xlabel('x')\n",
"\n",
"plt.tight_layout()\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"compute MPC on the whole path"
]
},
{
"cell_type": "code",
"execution_count": 83,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"SCIPY Optimization Time: Avrg: 0.2300s Max: 3.1533s Min: 0.0260s\n"
]
}
],
"source": [
"curr_pos=np.array([0,0.25,0.15])\n",
"start_wpx=[0,5,7.5,8,10,15]\n",
"start_wpy=[0,0,5,10,10,12]\n",
"track = compute_path_from_wp(start_wpx,start_wpy)\n",
"x=[]\n",
"y=[]\n",
"theta=[]\n",
"x.append(curr_pos[0])\n",
"y.append(curr_pos[1])\n",
"theta.append(curr_pos[2])\n",
"\n",
"opt_time=[]\n",
"\n",
"# Initialize input for prediction horizon\n",
"# u=[v1,v2,...,vp\n",
"# w1,w2,...,wp]\n",
"\n",
"# Initial guess \n",
"# u_hat0=[v1,v2,..vp,w1,w2,...,wp]\n",
"# uhat0 -> u_hat=optimize(obj,u_hat0)\n",
"\n",
"u_hat0 = np.zeros((PRED_HZN,2))\n",
"u_hat0[:,0]=v\n",
"u_hat0[:,1]=0.01\n",
"u_hat0=u_hat0.flatten(\"F\")\n",
"\n",
"# Optimization Bounds\n",
"# bnds=((v1_min,v1_max),...,(vp_min,vp_max),(w1_min,w1_max),...,(wp_min,wp_max))\n",
"\n",
"bnds=((v-2*vb,v+vb),)\n",
"for i in range (1,PRED_HZN):\n",
" bnds=bnds+((v-2*vb,v+vb),)\n",
" \n",
"bnds=bnds+((-wb,wb),)\n",
"for i in range (1,PRED_HZN):\n",
" bnds=bnds+((-wb,wb),)\n",
"\n",
"#while np.sum(np.abs(q[-1,0:2]-path[-1,0:2]))>0.1:\n",
"for i in range(50): \n",
" \n",
" start = time.time()\n",
" K = road_curve(curr_pos,track)\n",
" \n",
" # initial conditions for opt.\n",
" q0 = np.array([0,0,0,0,0])\n",
" q0[3] = f(q0[0],K)\n",
" q0[4] = - np.arctan(df(q0[0],K))\n",
"\n",
" #MPC LOOP\n",
" u_hat = minimize(objective,\n",
" u_hat0,\n",
" args=(q0,K,),\n",
" method='SLSQP',\n",
" bounds=bnds,\n",
" options={'maxiter':100}\n",
" ).x\n",
" \n",
" end = time.time()\n",
" opt_time.append(end-start)\n",
" \n",
" # store solution for plotting\n",
" x.append(x[-1]+u[0,0]*np.cos(theta[-1])*0.1)\n",
" y.append(y[-1]+u[0,0]*np.sin(theta[-1])*0.1)\n",
" theta.append(theta[-1]+u[0,1]*0.1)\n",
" \n",
" next_pos = [x[-1],y[-1],theta[-1]]\n",
" \n",
" # next initial condition\n",
" curr_pos=next_pos\n",
" u_hat0=u_hat\n",
"\n",
"print(\"SCIPY Optimization Time: Avrg: {:.4f}s Max: {:.4f}s Min: {:.4f}s\".format(np.mean(opt_time),\n",
" np.max(opt_time),\n",
" np.min(opt_time))) "
]
},
{
"cell_type": "code",
"execution_count": 84,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 1080x720 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"plt.figure(figsize=(15,10))\n",
"\n",
"plt.plot(x[:],y[:])\n",
"plt.plot(track[0,:],track[1,:])\n",
"plt.ylabel('y')\n",
"plt.xlabel('x')\n",
"\n",
"plt.tight_layout()\n",
"\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.9"
}
},
"nbformat": 4,
"nbformat_minor": 4
}