streamline

< stocks | back to Gallery | quiver >

                  from __future__ import division
                  
                  import numpy as np
                  
                  from bokeh.plotting import figure, show, output_file, vplot
                  
                  def streamlines(x, y, u, v, density=1):
                      '''Returns streamlines of a vector flow.
                  
                      * x and y are 1d arrays defining an *evenly spaced* grid.
                      * u and v are 2d arrays (shape [y,x]) giving velocities.
                      * density controls the closeness of the streamlines. For different
                        densities in each direction, use a tuple or list [densityx, densityy].
                  
                       '''
                  
                      ## Set up some constants - size of the grid used.
                      NGX = len(x)
                      NGY = len(y)
                  
                      ## Constants used to convert between grid index coords and user coords.
                      DX = x[1]-x[0]
                      DY = y[1]-y[0]
                      XOFF = x[0]
                      YOFF = y[0]
                  
                      ## Now rescale velocity onto axes-coordinates
                      u = u / (x[-1]-x[0])
                      v = v / (y[-1]-y[0])
                      speed = np.sqrt(u*u+v*v)
                      ## s (path length) will now be in axes-coordinates, but we must
                      ## rescale u for integrations.
                      u *= NGX
                      v *= NGY
                      ## Now u and v in grid-coordinates.
                  
                      NBX = int(30*density)
                      NBY = int(30*density)
                      blank = np.zeros((NBY,NBX))
                  
                      bx_spacing = NGX/float(NBX-1)
                      by_spacing = NGY/float(NBY-1)
                  
                      def blank_pos(xi, yi):
                          return int((xi / bx_spacing) + 0.5), \
                                 int((yi / by_spacing) + 0.5)
                  
                      def value_at(a, xi, yi):
                          if type(xi) == np.ndarray:
                              x = xi.astype(np.int)
                              y = yi.astype(np.int)
                          else:
                              x = np.int(xi)
                              y = np.int(yi)
                          a00 = a[y,x]
                          a01 = a[y,x+1]
                          a10 = a[y+1,x]
                          a11 = a[y+1,x+1]
                          xt = xi - x
                          yt = yi - y
                          a0 = a00*(1-xt) + a01*xt
                          a1 = a10*(1-xt) + a11*xt
                          return a0*(1-yt) + a1*yt
                  
                      def rk4_integrate(x0, y0):
                          ## This function does RK4 forward and back trajectories from
                          ## the initial conditions, with the odd 'blank array'
                          ## termination conditions. TODO tidy the integration loops.
                  
                          def f(xi, yi):
                              dt_ds = 1./value_at(speed, xi, yi)
                              ui = value_at(u, xi, yi)
                              vi = value_at(v, xi, yi)
                              return ui*dt_ds, vi*dt_ds
                  
                          def g(xi, yi):
                              dt_ds = 1./value_at(speed, xi, yi)
                              ui = value_at(u, xi, yi)
                              vi = value_at(v, xi, yi)
                              return -ui*dt_ds, -vi*dt_ds
                  
                          check = lambda xi, yi: xi>=0 and xi<NGX-1 and yi>=0 and yi<NGY-1
                  
                          bx_changes = []
                          by_changes = []
                  
                          ## Integrator function
                          def rk4(x0, y0, f):
                              ds = 0.01 #min(1./NGX, 1./NGY, 0.01)
                              stotal = 0
                              xi = x0
                              yi = y0
                              xb, yb = blank_pos(xi, yi)
                              xf_traj = []
                              yf_traj = []
                              while check(xi, yi):
                                  # Time step. First save the point.
                                  xf_traj.append(xi)
                                  yf_traj.append(yi)
                                  # Next, advance one using RK4
                                  try:
                                      k1x, k1y = f(xi, yi)
                                      k2x, k2y = f(xi + .5*ds*k1x, yi + .5*ds*k1y)
                                      k3x, k3y = f(xi + .5*ds*k2x, yi + .5*ds*k2y)
                                      k4x, k4y = f(xi + ds*k3x, yi + ds*k3y)
                                  except IndexError:
                                      # Out of the domain on one of the intermediate steps
                                      break
                                  xi += ds*(k1x+2*k2x+2*k3x+k4x) / 6.
                                  yi += ds*(k1y+2*k2y+2*k3y+k4y) / 6.
                                  # Final position might be out of the domain
                                  if not check(xi, yi): break
                                  stotal += ds
                                  # Next, if s gets to thres, check blank.
                                  new_xb, new_yb = blank_pos(xi, yi)
                                  if new_xb != xb or new_yb != yb:
                                      # New square, so check and colour. Quit if required.
                                      if blank[new_yb,new_xb] == 0:
                                          blank[new_yb,new_xb] = 1
                                          bx_changes.append(new_xb)
                                          by_changes.append(new_yb)
                                          xb = new_xb
                                          yb = new_yb
                                      else:
                                          break
                                  if stotal > 2:
                                      break
                              return stotal, xf_traj, yf_traj
                  
                          integrator = rk4
                  
                          sf, xf_traj, yf_traj = integrator(x0, y0, f)
                          sb, xb_traj, yb_traj = integrator(x0, y0, g)
                          stotal = sf + sb
                          x_traj = xb_traj[::-1] + xf_traj[1:]
                          y_traj = yb_traj[::-1] + yf_traj[1:]
                  
                          ## Tests to check length of traj. Remember, s in units of axes.
                          if len(x_traj) < 1: return None
                          if stotal > .2:
                              initxb, inityb = blank_pos(x0, y0)
                              blank[inityb, initxb] = 1
                              return x_traj, y_traj
                          else:
                              for xb, yb in zip(bx_changes, by_changes):
                                  blank[yb, xb] = 0
                              return None
                  
                      ## A quick function for integrating trajectories if blank==0.
                      trajectories = []
                      def traj(xb, yb):
                          if xb < 0 or xb >= NBX or yb < 0 or yb >= NBY:
                              return
                          if blank[yb, xb] == 0:
                              t = rk4_integrate(xb*bx_spacing, yb*by_spacing)
                              if t != None:
                                  trajectories.append(t)
                  
                      ## Now we build up the trajectory set. I've found it best to look
                      ## for blank==0 along the edges first, and work inwards.
                      for indent in range((max(NBX,NBY))//2):
                          for xi in range(max(NBX,NBY)-2*indent):
                              traj(xi+indent, indent)
                              traj(xi+indent, NBY-1-indent)
                              traj(indent, xi+indent)
                              traj(NBX-1-indent, xi+indent)
                  
                      xs = [np.array(t[0])*DX+XOFF for t in trajectories]
                      ys = [np.array(t[1])*DY+YOFF for t in trajectories]
                  
                      return xs, ys
                  
                  xx = np.linspace(-3, 3, 100)
                  yy = np.linspace(-3, 3, 100)
                  
                  Y, X = np.meshgrid(xx, yy)
                  U = -1 - X**2 + Y
                  V = 1 + X - Y**2
                  speed = np.sqrt(U*U + V*V)
                  theta = np.arctan(V/U)
                  
                  x0 = X[::2, ::2].flatten()
                  y0 = Y[::2, ::2].flatten()
                  length = speed[::2, ::2].flatten()/40
                  angle = theta[::2, ::2].flatten()
                  x1 = x0 + length * np.cos(angle)
                  y1 = y0 + length * np.sin(angle)
                  
                  xs, ys = streamlines(xx, yy, U.T, V.T, density=2)
                  
                  cm = np.array(["#C7E9B4", "#7FCDBB", "#41B6C4", "#1D91C0", "#225EA8", "#0C2C84"])
                  ix = ((length-length.min())/(length.max()-length.min())*5).astype('int')
                  colors = cm[ix]
                  
                  output_file("vector.html", title="vector.py example")
                  
                  p1 = figure()
                  p1.segment(x0, y0, x1, y1, color=colors, line_width=2)
                  
                  p2 = figure()
                  p2.multi_line(xs, ys, color="#ee6666", line_width=2, line_alpha=0.8)
                  
                  show(vplot(p1,p2))  # open a browser