mpc_python_learn/notebooks/MPC_cte_scipy.ipynb

732 lines
94 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"plt.style.use(\"ggplot\")\n",
"import time\n",
"from scipy.interpolate import interp1d\n",
"from scipy.integrate import odeint\n",
"from scipy.optimize import minimize"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"differential drive kinematics model equations"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Define process model\n",
"def kinematics_model(q,t,u):\n",
" # arguments\n",
" # y = outputs\n",
" # t = time\n",
" # u = input value\n",
" # K = process gain\n",
" # tau = process time constant\n",
" x = q[0]\n",
" y = q[1]\n",
" theta = q[2]\n",
"\n",
" v = u[0]\n",
" w = u[1]\n",
"\n",
" # calculate derivative\n",
" dxdt = v*np.cos(theta)\n",
" dydt = v*np.sin(theta)\n",
" dthetadt = w\n",
"\n",
" dqdt = [dxdt,dydt,dthetadt]\n",
"\n",
" return dqdt"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"simulate"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# time points\n",
"t = np.arange(0, 40, 0.1)\n",
"\n",
"#fake inputs\n",
"# u = [v(t),\n",
"# w(t)]\n",
"u = np.zeros((2,len(t)))\n",
"u[0,100:]=0.4\n",
"u[1,200:]=0.15\n",
"u[1,300:]=-0.15"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# initial conditions\n",
"q0 = [0,0,0]\n",
"\n",
"# store solution\n",
"x = np.empty_like(t)\n",
"y = np.empty_like(t)\n",
"theta = np.empty_like(t)\n",
"\n",
"# record initial conditions\n",
"x[0] = q0[0]\n",
"y[0] = q0[1]\n",
"theta[0] = q0[2]\n",
"\n",
"# solve ODE\n",
"for i in range(1,len(t)):\n",
" # span for next time step\n",
" tspan = [t[i-1],t[i]]\n",
" # solve for next step\n",
" q_t = odeint(kinematics_model,q0,tspan,args=(u[:,i],))\n",
" # store solution for plotting\n",
" x[i] = q_t[1][0]\n",
" y[i] = q_t[1][1]\n",
" theta[i] = q_t[1][2]\n",
" # next initial condition\n",
" q0 = q_t[1]"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 4 Axes>"
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"\n",
"# plot results\n",
"# plt.plot(t,u,'g:',label='u(t)')\n",
"# plt.plot(t,x,'b-',label='x(t)')\n",
"# plt.plot(t,y,'r--',label='y(t)')\n",
"\n",
"#plot trajectory\n",
"plt.subplot(2, 2, 1)\n",
"plt.plot(x,y)\n",
"plt.ylabel('y')\n",
"plt.xlabel('x')\n",
"\n",
"#plot x(t)\n",
"plt.subplot(2, 2, 2)\n",
"plt.plot(t,x)\n",
"plt.ylabel('x(t)')\n",
"#plt.xlabel('time')\n",
"\n",
"\n",
"#plot y(t)\n",
"plt.subplot(2, 2, 3)\n",
"plt.plot(t,y)\n",
"plt.ylabel('y(t)')\n",
"#plt.xlabel('time')\n",
"\n",
"#plot theta(t)\n",
"plt.subplot(2, 2, 4)\n",
"plt.plot(t,theta)\n",
"plt.ylabel('theta(t)')\n",
"#plt.xlabel('time')\n",
"#plt.legend(loc='best')\n",
"\n",
"plt.tight_layout()\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"preliminaries:\n",
"* path - > waypoints\n",
"* error computation -> cte"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [],
"source": [
"def calc_target_index(state,path):\n",
" \"\"\"\n",
" Finds the index of the closest waypoint.\n",
"\n",
" :param state: array_like, state of the vehicle [x_pos, y_pos, theta]\n",
" :param path: array_like, reference path ((x1, x2, ...), (y1, y2, ...)]\n",
" :returns: (int,float), nearest_index and cross track error\n",
" \"\"\"\n",
"\n",
" if np.shape(state)[0] is not 3 or np.shape(path)[0] is not 2:\n",
" raise ValueError(\"Input has wrong shape!\")\n",
"\n",
" # Search nearest point index by finding the point closest to the vehicle\n",
" # dx = [state[0] - icx for icx in path[0,:]]\n",
" # dy = [state[1] - icy for icy in path[1,:]]\n",
" # dist = [np.sqrt(idx ** 2 + idy ** 2) for (idx, idy) in zip(dx, dy)]\n",
" dx = state[0]-path[0,:]\n",
" dy = state[1]-path[1,:]\n",
" dist = np.sqrt(dx**2 + dy**2)\n",
" #nn_idx = dist.index(min(dist))\n",
" nn_idx = np.argmin(dist)\n",
"\n",
" try:\n",
"\n",
" # versor v from nearest wp -> next wp\n",
" v = [path[0,nn_idx+1] - path[0,nn_idx],\n",
" path[1,nn_idx+1] - path[1,nn_idx]] \n",
" v /= np.linalg.norm(v)\n",
"\n",
" # vector d from car position -> nearest wp\n",
" d = [path[0,nn_idx] - state[0],\n",
" path[1,nn_idx] - state[1]]\n",
"\n",
" # Get the scalar projection of d on v to compare direction\n",
" if np.dot(d,v) > 0:\n",
" target_idx = nn_idx\n",
" else:\n",
" target_idx = nn_idx+1\n",
"\n",
" except IndexError as e:\n",
" target_idx = nn_idx\n",
"\n",
" front_axle_vect = [np.cos(state[2] - np.pi / 2),\n",
" np.sin(state[2] - np.pi / 2)]\n",
"\n",
" # the cross-track error is given by the scalar projection of the car->wp vector onto the faxle versor\n",
" error_front_axle = np.dot([dx[target_idx], dy[target_idx]], front_axle_vect)\n",
"\n",
" return target_idx, error_front_axle"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"def compute_path_from_wp(start_xp, start_yp, step = 0.25):\n",
" '''\n",
" Interpolation range is computed to assure one point every fixed distance step [m].\n",
" \n",
" :param start_xp: array_like, list of starting x coordinates\n",
" :param start_yp: array_like, list of starting y coordinates\n",
" :param step: float, interpolation distance [m] between consecutive waypoints\n",
" :returns: array_like, of shape (2,N)\n",
" '''\n",
"\n",
" final_xp=[]\n",
" final_yp=[]\n",
" delta = step #[m]\n",
"\n",
" for idx in range(len(start_xp)-1):\n",
" section_len = np.sum(np.sqrt(np.power(np.diff(start_xp[idx:idx+2]),2)+np.power(np.diff(start_yp[idx:idx+2]),2)))\n",
"\n",
" interp_range = np.linspace(0,1,section_len/delta)\n",
" \n",
" fx=interp1d(np.linspace(0,1,2),start_xp[idx:idx+2],kind=1)\n",
" fy=interp1d(np.linspace(0,1,2),start_yp[idx:idx+2],kind=1)\n",
" \n",
" final_xp=np.append(final_xp,fx(interp_range))\n",
" final_yp=np.append(final_yp,fy(interp_range))\n",
" \n",
" #dx = np.append(0, np.diff(final_xp))\n",
" #dy = np.append(0, np.diff(final_yp))\n",
" #theta = np.arctan2(dy, dx)\n",
"\n",
" return np.vstack((final_xp,final_yp))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"test path"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"(2, 91)\n"
]
},
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"start_x=[0,5,7.5,8,10,15]\n",
"start_y=[0,0,5,10,10,12]\n",
"path = compute_path_from_wp(start_x,start_y)\n",
"\n",
"print(np.shape(path))\n",
"\n",
"plt.plot(path[0,:],path[1,:])\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"test cte"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"(1, -1.642756358806414)"
]
},
"execution_count": 26,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"calc_target_index([0, 2, 0.75],path)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"mpc"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [],
"source": [
"# Define Objective function\n",
"def objective(u_hat,*args):\n",
" \"\"\"\n",
" Computes objective function\n",
" \n",
" :param u_hat: array_like, input [v,w\n",
" v,w\n",
" ...]\n",
" \"\"\"\n",
" \n",
" #undo input flattening\n",
" u_hat = u_hat.reshape(2, -1).T\n",
" se = np.zeros(PRED_HZN) #squared_errors\n",
" \n",
" # Prediction\n",
" for k in range(PRED_HZN):\n",
" \n",
" # Initialize state for prediction\n",
" if k==0:\n",
" q_hat0 = args[0]\n",
" \n",
" # Clamp control horizon\n",
" elif k>CTRL_HZN:\n",
" u_hat[k,:] = u_hat[CTRL_HZN,:]\n",
"\n",
" ts_hat = [delta_t_hat*(k),delta_t_hat*(k+1)]\n",
" \n",
" #DEBUG\n",
"# print(\"k : {}\".format(k))\n",
"# print(\"q_hat0 : {}\".format(q_hat0))\n",
"# print(\"ts : {}\".format(ts_hat))\n",
"# print(\"u_hat : {}\".format(u_hat[k,:]))\n",
" \n",
" q_hat = odeint(kinematics_model,q_hat0,ts_hat,args=(u_hat[k,:],))\n",
" \n",
"# print(q_hat)\n",
" \n",
" q_hat0 = q_hat[-1,:]\n",
"\n",
" # Squared Error calculation\n",
" _,cte = calc_target_index(q_hat[-1,:],path)\n",
"# print(cte)\n",
" if k >0:\n",
" delta_u_hat = np.sum(u_hat[k,:]-u_hat[k-1,:])\n",
" else:\n",
" delta_u_hat = 0\n",
" #delta_u_hat=np.sum(np.subtract([u_hat[2*k],u_hat[2*k+1]],[u_hat[2*(k-1)],u_hat[2*(k-1)+1]]))\n",
" se[k] = 20 * (cte)**2 + 15 * (delta_u_hat)**2\n",
"\n",
" # Sum of Squared Error calculation\n",
" obj = np.sum(se[1:])\n",
" return obj"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"TODO: add heading error"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"test optimization"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [],
"source": [
"#PARAMS\n",
"v=1.5\n",
"vb=0.5\n",
"wb=0.5\n",
"\n",
"# Define horizons\n",
"PRED_HZN = 20 # Prediction Horizon\n",
"CTRL_HZN = 10 # Control Horizon\n",
"\n",
"delta_t_hat = 0.25 #time step"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/home/marcello/.local/lib/python3.5/site-packages/ipykernel_launcher.py:28: RuntimeWarning: invalid value encountered in true_divide\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
" fun: 36.02837407165175\n",
" jac: array([-2.19157696e+00, -3.22256088e-02, 1.18650579e+00, -4.04749918e+00,\n",
" 1.49658680e-01, 6.93027973e-01, -8.36541653e-01, -2.46588659e+00,\n",
" 1.18037081e+00, 1.26046467e+00, -4.76961613e-01, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" -2.14655476e+01, -2.45364900e+01, -2.71573195e+01, -3.44248009e+01,\n",
" -3.15746927e+01, -3.04034195e+01, -3.05026488e+01, -2.93093610e+01,\n",
" -2.17690802e+01, -1.74863310e+01, -8.52382469e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,\n",
" 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00])\n",
" message: 'Optimization terminated successfully.'\n",
" nfev: 1034\n",
" nit: 22\n",
" njev: 22\n",
" status: 0\n",
" success: True\n",
" x: array([ 1.36094105, 1.44124498, 1.49189215, 1.4944542 , 1.48896953,\n",
" 1.49211915, 1.43846883, 1.36823453, 1.34829146, 1.37191554,\n",
" 0.99609573, 0.99609573, 0.99609573, 0.99609573, 0.99609573,\n",
" 0.99609573, 0.99609573, 0.99609573, 0.99609573, 0.99609573,\n",
" -0.29866483, -0.28173159, -0.24925358, -0.24879537, -0.15900868,\n",
" -0.13969979, -0.13969797, -0.13969797, -0.13969797, -0.23856165,\n",
" 0.02904087, 0.02904087, 0.02904087, 0.02904087, 0.02904087,\n",
" 0.02904087, 0.02904087, 0.02904087, 0.02904087, 0.02904087])\n",
"time elapsed: 3.381047248840332\n"
]
}
],
"source": [
"q=np.array([[0,0.5],[0,0.4],[0,0.2]])\n",
"\n",
"u_hat0 = np.zeros((PRED_HZN,2))\n",
"u_hat0[:,0]=v\n",
"u_hat0[:,1]=0.1\n",
"u_hat0=u_hat0.flatten(\"F\")\n",
"\n",
"bnds=((v-2*vb,v+vb),)\n",
"for i in range (PRED_HZN-1):\n",
" bnds=bnds+((v-2*vb,v+vb),)\n",
" \n",
"bnds=bnds+((-wb,wb),)\n",
"for i in range (PRED_HZN-1):\n",
" bnds=bnds+((-wb,wb),)\n",
"\n",
"# print(u_hat0)\n",
"# print(u_hat0.reshape(2, -1).T)\n",
"\n",
"start = time.time()\n",
"\n",
"solution = minimize(objective,u_hat0,args=(q[:,-1],),method='SLSQP',bounds=bnds,options={'maxiter':50})\n",
"\n",
"end = time.time()\n",
"\n",
"print(solution)\n",
"print(\"time elapsed: {}\".format(end-start))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"check what the optimizer returns"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"u=solution.x.reshape(2,-1).T\n",
"x=[]\n",
"y=[]\n",
"theta=[]\n",
"q0=[0.5,0.4,0.2]\n",
"\n",
"#simulate q for optimized u_hat\n",
"def step(q0,u,delta):\n",
" \"\"\"\n",
" steps the simulated vehicle\n",
" \"\"\"\n",
" \n",
" # span for next time step\n",
" tspan = [delta*0,delta*1]\n",
" # solve for next step\n",
" q_t = odeint(kinematics_model,q0,tspan,args=(u,))\n",
" \n",
" return q_t[1]\n",
"\n",
"for i in range(1,np.shape(u)[0]):\n",
" \n",
" # step simulation of t\n",
" q_t = step(q0,u[i,:],delta_t_hat)\n",
" \n",
" # store solution for plotting\n",
" x.append(q_t[0])\n",
" y.append(q_t[1])\n",
" theta.append(q_t[2])\n",
" \n",
" # next initial condition\n",
" q0 = q_t\n",
" \n",
"plt.plot(x,y)\n",
"plt.plot(path[0,:],path[1,:])\n",
"plt.ylabel('y')\n",
"plt.xlabel('x')\n",
"\n",
"plt.tight_layout()\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"compute MPC on the whole path"
]
},
{
"cell_type": "code",
"execution_count": 185,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/home/marcello/.local/lib/python3.5/site-packages/ipykernel_launcher.py:28: RuntimeWarning: invalid value encountered in true_divide\n"
]
}
],
"source": [
"# Initialize state\n",
"# q=[x1, x2, ..., xn\n",
"# y1, y2, ..., yn\n",
"# theta1, theta2, ..., thetan]\n",
"\n",
"q=np.array([0,0.5,0.15]).reshape(3,-1)\n",
"\n",
"# Initialize input for prediction horizon\n",
"# u=[v1,v2,...,vp\n",
"# w1,w2,...,wp]\n",
"\n",
"# Initial guess \n",
"# u_hat0=[v1,v2,..vp,w1,w2,...,wp]\n",
"# uhat0 -> u_hat=optimize(obj,u_hat0)\n",
"\n",
"u_hat0 = np.zeros((PRED_HZN,2))\n",
"u_hat0[:,0]=v\n",
"u_hat0[:,1]=0.01\n",
"u_hat0=u_hat0.flatten(\"F\")\n",
"\n",
"# Optimization Bounds\n",
"# bnds=((v1_min,v1_max),...,(vp_min,vp_max),(w1_min,w1_max),...,(wp_min,wp_max))\n",
"\n",
"bnds=((v-2*vb,v+vb),)\n",
"for i in range (1,PRED_HZN):\n",
" bnds=bnds+((v-2*vb,v+vb),)\n",
" \n",
"bnds=bnds+((-wb,wb),)\n",
"for i in range (1,PRED_HZN):\n",
" bnds=bnds+((-wb,wb),)\n",
"\n",
"#while np.sum(np.abs(q[-1,0:2]-path[-1,0:2]))>0.1:\n",
"for i in range(110): \n",
" \n",
" start = time.time()\n",
" \n",
" #MPC LOOP\n",
" u_hat = minimize(objective,\n",
" u_hat0,\n",
" args=(q[:,-1],),\n",
" method='SLSQP',\n",
" bounds=bnds,\n",
" options={'maxiter':100}\n",
" ).x\n",
" \n",
" end = time.time()\n",
" #print(\"iter:1 time elapsed: {}\".format(end-start))\n",
" \n",
"# print(\"u_hat0 {}\".format(np.shape(u_hat0)))\n",
"# print(\"u_hat {}\".format(np.shape(u_hat)))\n",
"# print(\"bounds {}\".format(np.shape(bnds)))\n",
" \n",
" # \"move\" vehicle\n",
" q_next = odeint(kinematics_model,\n",
" q[:,-1], # q(t)\n",
" [0,delta_t_hat], # \n",
" args=([u_hat[0],u_hat[PRED_HZN]],) # v,w\n",
" )\n",
" \n",
" q = np.hstack((q,q_next[-1,:].reshape(3,-1)))\n",
" \n",
" # next initial condition\n",
" u_hat0=u_hat\n",
"\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 186,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 1080x720 with 1 Axes>"
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"plt.figure(figsize=(15,10))\n",
"\n",
"plt.plot(q[0,:],q[1,:])\n",
"plt.plot(path[0,:],path[1,:])\n",
"plt.ylabel('y')\n",
"plt.xlabel('x')\n",
"\n",
"plt.tight_layout()\n",
"\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.9"
}
},
"nbformat": 4,
"nbformat_minor": 4
}