Implementing the Lazy Eyes app

Let's create a new folder for Lazy Eyes. In this folder, let's create copies of (or links to) the ResizeUtils.py and WxUtils.py files from any of our previous Python projects, such as The Living Headlights in Chapter 5, Equipping Your Car with a Rearview Camera and Hazard Detection. Alongside the copies or links, let's create a new file, LazyEyes.py. Edit it and enter the following import statements:

import collections
import numpy
import cv2
import threading
import timeit
import wx

import pyfftw.interfaces.cache
from pyfftw.interfaces.scipy_fftpack import fft
from pyfftw.interfaces.scipy_fftpack import ifft
from scipy.fftpack import fftfreq

import ResizeUtils
import WxUtils

Besides the modules that we have used in previous projects, we are now using the standard library's collections module for efficient collections and timeit module for precise timing. Also for the first time, we are using signal processing functionality from PyFFTW and SciPy.

Like our other Python applications, Lazy Eyes is implemented as a class that extends wx.Frame. Here are the declarations of the class and its initializer:

class LazyEyes(wx.Frame):

  def __init__(self, maxHistoryLength=360,
    minHz=5.0/6.0, maxHz=1.0,
    amplification=32.0, numPyramidLevels=2,
    useLaplacianPyramid=True,
    useGrayOverlay=True,
    numFFTThreads=4, numIFFTThreads=4,
    cameraDeviceID=0, imageSize=(480, 360),
    title='Lazy Eyes'):

The initializer's arguments affect the app's frame rate and the manner in which motion is amplified. These effects are discussed in detail in the section Configuring and testing the app for various motions, later in this chapter. For now, each argument is introduced briefly at the point where the initializer stores or uses it.

The initializer's implementation begins in the same way as our other Python apps. It sets flags to indicate that the app is running and (by default) should be mirrored. It creates the capture object and configures its resolution to match the requested width and height if possible. Failing that, the device's default capture resolution is used. The relevant code is as follows:

    self.mirrored = True

    self._running = True

    self._capture = cv2.VideoCapture(cameraDeviceID)
    size = ResizeUtils.cvResizeCapture(
        self._capture, imageSize)
    w, h = size
    self._imageWidth = w
    self._imageHeight = h

Next, we will determine the shape of the history of frames. It has at least 3 dimensions: a number of frames, and a height and width for each frame. The height and width are downsampled from the capture height and width based on the number of pyramid levels. If we are concerned about the color motion and not just the grayscale motion, the history also has a fourth dimension, consisting of 3 color channels. Here is the code to calculate the history's shape:

    self._useGrayOverlay = useGrayOverlay
    if useGrayOverlay:
      historyShape = (maxHistoryLength,
              h >> numPyramidLevels,
              w >> numPyramidLevels)
    else:
      historyShape = (maxHistoryLength,
              h >> numPyramidLevels,
              w >> numPyramidLevels, 3)

Note the use of >>, the right bit shift operator, to reduce the dimensions by a power of 2. The power is equal to the number of pyramid levels.
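
As a quick check of this arithmetic, here is a minimal sketch using the initializer's default image size and pyramid depth. It only illustrates that a right shift by n bits is the same as floor division by 2 to the power of n:

```python
# Illustrative values: the default capture size and pyramid depth.
w, h = 480, 360
numPyramidLevels = 2

# Shifting right by n bits is floor division by 2**n.
smallW = w >> numPyramidLevels
smallH = h >> numPyramidLevels

print(smallW, smallH)  # 120 90
```

Note that the shift rounds down, whereas cv2.pyrDown rounds odd dimensions up by default. The two agree only when the capture width and height are divisible by 2 to the power of the number of pyramid levels, as they are here.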

We will store the specified maximum history length. For the frames in the history, we will create a NumPy array of the shape that we just determined. For timestamps of the frames, we will create a deque (double-ended queue), a type of collection that allows us to cheaply add or remove elements from either end:

    self._maxHistoryLength = maxHistoryLength
    self._history = numpy.empty(historyShape,
                  numpy.float32)
    self._historyTimestamps = collections.deque()
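
To see the deque's behavior in isolation, consider the following small sketch. The timestamp values are made up for illustration:

```python
import collections

# A deque of timestamps; the values here are made up.
timestamps = collections.deque([0.00, 0.05, 0.10])

# Add the newest timestamp on the right...
timestamps.append(0.15)
# ...and discard the oldest from the left.
oldest = timestamps.popleft()

print(oldest)            # 0.0
print(list(timestamps))  # [0.05, 0.1, 0.15]
```

Both operations take constant time on a deque. By contrast, removing the first element of a plain list requires shifting all of the remaining elements.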

We will store the remaining arguments because later, in each frame, we will pass them to the pyramid functions and the signal processing functions:

    self._numPyramidLevels = numPyramidLevels
    self._useLaplacianPyramid = useLaplacianPyramid

    self._minHz = minHz
    self._maxHz = maxHz
    self._amplification = amplification

    self._numFFTThreads = numFFTThreads
    self._numIFFTThreads = numIFFTThreads

We call the following two functions to tell PyFFTW to cache its data structures (notably, its NumPy arrays) for a period of at least 1.0 second from their last use. (The default is 0.1 seconds.) Caching is a critical optimization for the PyFFTW interfaces that we are using, and we will choose a period that is more than long enough to keep the cache alive from frame to frame:

    pyfftw.interfaces.cache.enable()
    pyfftw.interfaces.cache.set_keepalive_time(1.0)

The initializer's implementation ends with code to set up a window, event bindings, a bitmap, layout, and background thread—all familiar tasks from our previous Python projects:

    style = wx.CLOSE_BOX | wx.MINIMIZE_BOX | \
        wx.CAPTION | wx.SYSTEM_MENU | \
        wx.CLIP_CHILDREN
    wx.Frame.__init__(self, None, title=title,
             style=style, size=size)

    self.Bind(wx.EVT_CLOSE, self._onCloseWindow)

    quitCommandID = wx.NewId()
    self.Bind(wx.EVT_MENU, self._onQuitCommand,
         id=quitCommandID)
    acceleratorTable = wx.AcceleratorTable([
      (wx.ACCEL_NORMAL, wx.WXK_ESCAPE,
       quitCommandID)
    ])
    self.SetAcceleratorTable(acceleratorTable)

    self._staticBitmap = wx.StaticBitmap(self,
                       size=size)
    self._showImage(None)

    rootSizer = wx.BoxSizer(wx.VERTICAL)
    rootSizer.Add(self._staticBitmap)
    self.SetSizerAndFit(rootSizer)

    self._captureThread = threading.Thread(
        target=self._runCaptureLoop)
    self._captureThread.start()

We must modify our usual _onCloseWindow callback to disable PyFFTW's cache. Disabling the cache ensures that resources are freed and that PyFFTW's threads terminate normally. The callback's implementation is given in the following code:

  def _onCloseWindow(self, event):
    self._running = False
    self._captureThread.join()
    pyfftw.interfaces.cache.disable()
    self.Destroy()
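
The shutdown order in this callback (clear the flag, wait for the thread, then release resources) can be tried without wxPython or PyFFTW. The following is a minimal sketch of the same pattern; the Worker class and its stop method are hypothetical names used only for this illustration:

```python
import threading
import time

class Worker(object):
    """A hypothetical stand-in for the app's capture thread."""

    def __init__(self):
        self._running = True
        self.iterations = 0
        self._thread = threading.Thread(target=self._runLoop)
        self._thread.start()

    def _runLoop(self):
        # Keep working until the flag is cleared.
        while self._running:
            self.iterations += 1
            time.sleep(0.01)

    def stop(self):
        # Clear the flag first, then wait for the loop to exit.
        self._running = False
        self._thread.join()
        # Only now is it safe to release shared resources.

worker = Worker()
time.sleep(0.05)
worker.stop()
```

Joining before cleanup matters because the background loop may still be in the middle of an iteration when the flag is cleared.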

The Esc key is bound to our usual _onQuitCommand callback, which just closes the app:

  def _onQuitCommand(self, event):
    self.Close()

The loop running on our background thread is similar to the one in our other Python apps. In each frame, it calls a helper function, _applyEulerianVideoMagnification. Here is the loop's implementation:

  def _runCaptureLoop(self):
    while self._running:
      success, image = self._capture.read()
      if image is not None:
        self._applyEulerianVideoMagnification(
            image)
        if self.mirrored:
          image[:] = numpy.fliplr(image)
      wx.CallAfter(self._showImage, image)

The _applyEulerianVideoMagnification helper function is quite long, so we will consider its implementation in several chunks. First, we will create a timestamp for the frame and copy the frame to a format that is more suitable for processing. Specifically, we will use a floating point array with either 1 gray channel or 3 color channels, depending on the configuration:

  def _applyEulerianVideoMagnification(self, image):

    timestamp = timeit.default_timer()

    if self._useGrayOverlay:
      smallImage = cv2.cvtColor(
          image, cv2.COLOR_BGR2GRAY).astype(
              numpy.float32)
    else:
      smallImage = image.astype(numpy.float32)

Using this copy, we will calculate the appropriate level in the Gaussian or Laplacian pyramid:

    # Downsample the image using a pyramid technique.
    i = 0
    while i < self._numPyramidLevels:
      smallImage = cv2.pyrDown(smallImage)
      i += 1
    if self._useLaplacianPyramid:
      smallImage[:] -= \
        cv2.pyrUp(cv2.pyrDown(smallImage))

For the purposes of the history and signal processing functions, we will refer to this pyramid level as "the image" or "the frame".

Next, we will check the number of history frames that have been filled so far. If the history has more than one unfilled frame (meaning the history will still not be full after adding this frame), we will append the new image and timestamp and then return early, such that no signal processing is done until a later frame:

    historyLength = len(self._historyTimestamps)

    if historyLength < self._maxHistoryLength - 1:

      # Append the new image and timestamp to the
      # history.
      self._history[historyLength] = smallImage
      self._historyTimestamps.append(timestamp)

      # The history is still not full, so wait.
      return

If the history is just one frame short of being full (meaning the history will be full after adding this frame), we will append the new image and timestamp, as follows:

    if historyLength == self._maxHistoryLength - 1:
      # Append the new image and timestamp to the
      # history.
      self._history[historyLength] = smallImage
      self._historyTimestamps.append(timestamp)

If the history is already full, we will drop the oldest image and timestamp and append the new image and timestamp, as follows:

    else:
      # Drop the oldest image and timestamp from the
      # history and append the new ones.
      self._history[:-1] = self._history[1:]
      self._historyTimestamps.popleft()
      self._history[-1] = smallImage
      self._historyTimestamps.append(timestamp)

    # The history is full, so process it.
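
The drop-and-append step on the NumPy array can be seen more easily on a tiny example. The following sketch uses a made-up history of 4 frames, each just 1 x 2 pixels:

```python
import numpy

# A tiny stand-in history: 4 frames of 1x2 pixels.
history = numpy.arange(8, dtype=numpy.float32).reshape(4, 1, 2)
newFrame = numpy.full((1, 2), 99.0, numpy.float32)

# Shift every frame one slot toward the start, dropping
# the oldest, then write the new frame into the last slot.
history[:-1] = history[1:]
history[-1] = newFrame

# Show the first pixel of each frame, oldest to newest.
print(history[:, 0, 0].tolist())  # [2.0, 4.0, 6.0, 99.0]
```

This approach copies the whole history on every frame, but it keeps the frames in chronological order along the first axis, which is the layout that the FFT step expects.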

Based on the timestamps, we will calculate the average time per frame in the history, as seen in the following code:

    # Find the average length of time per frame.
    startTime = self._historyTimestamps[0]
    endTime = self._historyTimestamps[-1]
    timeElapsed = endTime - startTime
    timePerFrame = \
        timeElapsed / self._maxHistoryLength
    # print('FPS:', 1.0 / timePerFrame)

We will proceed with a combination of signal processing functions, collectively called a temporal bandpass filter. This filter blocks (zeros out) some frequencies and allows others to pass (remain unchanged). Our first step in implementing this filter is to run the pyfftw.interfaces.scipy_fftpack.fft function using the history and a number of threads as arguments. Also, with the argument axis=0, we will specify that the history's first axis is its time axis:

    # Apply the temporal bandpass filter.
    fftResult = fft(self._history, axis=0,
            threads=self._numFFTThreads)

We will pass the history length (the number of samples along the time axis) and the time per frame to the scipy.fftpack.fftfreq function. This function returns an array of midpoint frequencies (Hz in our case) corresponding to the indices in the FFT result. (This array answers the question, "Which frequency is the midpoint of the bin of frequencies represented by index i in the FFT?".) We will find the indices whose midpoint frequencies lie closest (minimum absolute value difference) to our initializer's minHz and maxHz parameters. Then, we will modify the FFT result by setting the data to zero in all ranges that do not represent the frequencies of interest:

    frequencies = fftfreq(
        self._maxHistoryLength, d=timePerFrame)
    lowBound = (numpy.abs(
        frequencies - self._minHz)).argmin()
    highBound = (numpy.abs(
        frequencies - self._maxHz)).argmin()
    fftResult[:lowBound] = 0j
    fftResult[highBound:-highBound] = 0j
    fftResult[-lowBound:] = 0j
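
To build some intuition for this masking, here is a minimal sketch that applies the same steps to a synthetic one-dimensional signal rather than a history of frames. It uses numpy.fft as a stand-in for the PyFFTW and SciPy functions, and the sample count, frame rate, and test frequencies are assumptions chosen so that each frequency falls exactly on an FFT bin:

```python
import numpy

# Synthetic 1D "history": 400 samples at 20 samples per second,
# mixing a 0.9 Hz component (inside the default band) with a
# 3.0 Hz component (outside it). These values are illustrative.
numSamples = 400
timePerFrame = 1.0 / 20.0
t = numpy.arange(numSamples) * timePerFrame
inBand = numpy.sin(2 * numpy.pi * 0.9 * t)
outOfBand = numpy.sin(2 * numpy.pi * 3.0 * t)
signal = inBand + outOfBand

minHz, maxHz = 5.0 / 6.0, 1.0

fftResult = numpy.fft.fft(signal)
frequencies = numpy.fft.fftfreq(numSamples, d=timePerFrame)

# Find the bins whose midpoint frequencies are nearest the bounds.
lowBound = numpy.abs(frequencies - minHz).argmin()
highBound = numpy.abs(frequencies - maxHz).argmin()

# Zero everything outside the band, at positive and negative
# frequencies alike. This assumes 0 < lowBound < highBound.
fftResult[:lowBound] = 0j
fftResult[highBound:-highBound] = 0j
fftResult[-lowBound:] = 0j

# Only the 0.9 Hz component should survive the round trip.
filtered = numpy.real(numpy.fft.ifft(fftResult))
print(numpy.allclose(filtered, inBand))
```

One caveat about the slicing: if lowBound were 0, the slice [-lowBound:] would select the entire array, because -0 is the same as 0, and the whole spectrum would be zeroed. With the default minHz and a reasonably long history, lowBound is greater than 0, so this case does not arise.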

Having thus filtered out the frequencies that do not interest us, we will finish applying the temporal bandpass filter by passing the data to the pyfftw.interfaces.scipy_fftpack.ifft function:

    ifftResult = ifft(fftResult, axis=0,
            threads=self._numIFFTThreads)

From the IFFT result, we will take the most recent frame. It should somewhat resemble the current camera frame, but should be black in areas that do not exhibit recent motion matching our parameters. We will multiply this filtered frame so that the non-black areas become bright. Then, we will upsample it (using a pyramid technique) and add the result to the current camera frame so that areas of motion are lit up. Here is the relevant code, which concludes the _applyEulerianVideoMagnification method:

    # Amplify the result and overlay it on the
    # original image.
    overlay = numpy.real(ifftResult[-1]) * \
            self._amplification
    i = 0
    while i < self._numPyramidLevels:
      overlay = cv2.pyrUp(overlay)
      i += 1
    if self._useGrayOverlay:
      overlay = cv2.cvtColor(overlay,
                  cv2.COLOR_GRAY2BGR)
    cv2.convertScaleAbs(image + overlay, image)
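
The final conversion back to 8-bit values deserves a closer look, because the sum of the frame and the overlay can fall outside the range 0 to 255. The following sketch uses made-up pixel values and a NumPy expression as a stand-in for cv2.convertScaleAbs with its default scale and offset (absolute value, then rounding, then saturation):

```python
import numpy

# Made-up pixel values: a uint8 camera frame and a float32
# filtered frame (the overlay before amplification).
image = numpy.array([[10, 200, 250]], numpy.uint8)
filtered = numpy.array([[-0.5, 0.0, 1.0]], numpy.float32)
amplification = 32.0

overlay = filtered * amplification  # [[-16, 0, 32]]
combined = image + overlay          # float32: [[-6, 200, 282]]

# NumPy stand-in for cv2.convertScaleAbs: take the absolute
# value, round, and saturate to the range 0..255.
result = numpy.clip(numpy.rint(numpy.abs(combined)),
                    0, 255).astype(numpy.uint8)
print(result.tolist())  # [[6, 200, 255]]
```

Sums above 255 saturate to white rather than wrapping around, and negative sums are replaced by their absolute values rather than being clamped to 0.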

To finish the implementation of the LazyEyes class, we will display the image in the same manner as we have done in our other Python apps. Here is the relevant method:

  def _showImage(self, image):
    if image is None:
      # Provide a black bitmap.
      bitmap = wx.EmptyBitmap(self._imageWidth,
                  self._imageHeight)
    else:
      # Convert the image to bitmap format.
      bitmap = WxUtils.wxBitmapFromCvImage(image)
    # Show the bitmap.
    self._staticBitmap.SetBitmap(bitmap)

Our module's main function just instantiates and runs the app, as seen in the following code:

def main():
  app = wx.App()
  lazyEyes = LazyEyes()
  lazyEyes.Show()
  app.MainLoop()

if __name__ == '__main__':
  main()

That's all! Run the app and stay quite still while it builds up its history of frames. Until the history is full, the video feed will not show any special effect. At the history's default length of 360 frames, it fills in about 20 seconds on my machine. Once it is full, you should see ripples moving through the video feed in areas of recent motion—or perhaps all areas if the camera is moved or the lighting or exposure is changed. The ripples will gradually settle and disappear in areas of the scene that become still, while new ripples will appear in new areas of motion. Feel free to experiment on your own. Now, let's discuss a few recipes for configuring and testing the parameters of the LazyEyes class.