Let's create a new folder for Lazy Eyes and, in this folder, create copies of or links to the ResizeUtils.py and WxUtils.py files from any of our previous Python projects, such as The Living Headlights project in Chapter 5, Equipping Your Car with a Rearview Camera and Hazard Detection. Alongside the copies or links, let's create a new file, LazyEyes.py. Edit it and enter the following import statements:
import collections
import numpy
import cv2
import threading
import timeit
import wx

import pyfftw.interfaces.cache
from pyfftw.interfaces.scipy_fftpack import fft
from pyfftw.interfaces.scipy_fftpack import ifft
from scipy.fftpack import fftfreq

import ResizeUtils
import WxUtils
Besides the modules that we have used in previous projects, we are now using the standard library's collections module (for efficient collections) and timeit module (for precise timing). Also, for the first time, we are using signal processing functionality from PyFFTW and SciPy.
Like our other Python applications, Lazy Eyes is implemented as a class that extends wx.Frame. Here are the declarations of the class and its initializer:
class LazyEyes(wx.Frame):

    def __init__(self, maxHistoryLength=360,
                 minHz=5.0/6.0, maxHz=1.0,
                 amplification=32.0, numPyramidLevels=2,
                 useLaplacianPyramid=True,
                 useGrayOverlay=True,
                 numFFTThreads=4, numIFFTThreads=4,
                 cameraDeviceID=0, imageSize=(480, 360),
                 title='Lazy Eyes'):
The initializer's arguments affect the app's frame rate and the manner in which motion is amplified. These effects are discussed in detail in the section Configuring and testing the app for various motions, later in this chapter. The following is just a brief description of the arguments:
- maxHistoryLength is the number of frames (including the current frame and preceding frames) that are analyzed for motion.
- minHz and maxHz, respectively, define the slowest and fastest motions that are amplified.
- amplification is the scale of the visual effect. A higher value means that motion is highlighted more brightly.
- numPyramidLevels is the number of pyramid levels by which frames are downsampled before signal processing is done. Remember that each level corresponds to downsampling by a factor of 2. Our implementation assumes numPyramidLevels > 0.
- If useLaplacianPyramid is True, frames are downsampled using a Laplacian pyramid before the signal processing is done. The implication is that only edge motion is highlighted. Alternatively, if useLaplacianPyramid is False, a Gaussian pyramid is used, and the motion in all areas is highlighted.
- If useGrayOverlay is True, frames are converted to grayscale before the signal processing is done. The implication is that motion is highlighted only in areas of grayscale contrast. Alternatively, if useGrayOverlay is False, motion is highlighted in areas that have contrast in any color channel.
- numFFTThreads and numIFFTThreads, respectively, are the numbers of threads used in FFT and IFFT computations.
- cameraDeviceID and imageSize are our usual capture parameters.
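All of these arguments are optional, so a script only needs to pass the ones it wants to change. As a small illustration (the following values are arbitrary, not recommendations; the configuration recipes later in this chapter discuss real choices), we could construct the app like this:

lazyEyes = LazyEyes(maxHistoryLength=180,
                    minHz=0.5, maxHz=2.0,
                    amplification=16.0,
                    useLaplacianPyramid=False)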
are our usual capture parameters.The initializer's implementation begins in the same way as our other Python apps. It sets flags to indicate that the app is running and (by default) should be mirrored. It creates the capture object and configures its resolution to match the requested width and height if possible. Failing that, the device's default capture resolution is used. The relevant code is as follows:
        self.mirrored = True

        self._running = True

        self._capture = cv2.VideoCapture(cameraDeviceID)
        size = ResizeUtils.cvResizeCapture(
                self._capture, imageSize)
        w, h = size
        self._imageWidth = w
        self._imageHeight = h
Next, we will determine the shape of the history of frames. It has at least three dimensions: the number of frames, plus a height and width for each frame. The height and width are downsampled from the capture height and width based on the number of pyramid levels. If we are concerned about color motion, and not just grayscale motion, the history also has a fourth dimension, consisting of 3 color channels. Here is the code to calculate the history's shape:
        self._useGrayOverlay = useGrayOverlay
        if useGrayOverlay:
            historyShape = (maxHistoryLength,
                            h >> numPyramidLevels,
                            w >> numPyramidLevels)
        else:
            historyShape = (maxHistoryLength,
                            h >> numPyramidLevels,
                            w >> numPyramidLevels, 3)
Note the use of >>, the right bit shift operator, to reduce the dimensions by a power of 2. The power is equal to the number of pyramid levels.
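For instance, with the default two pyramid levels, each 480 x 360 capture frame contributes a 120 x 90 frame to the history:

print(480 >> 2)  # 120, that is, the width divided by 2 ** 2
print(360 >> 2)  # 90, that is, the height divided by 2 ** 2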
We will store the specified maximum history length. For the frames in the history, we will create a NumPy array of the shape that we just determined. For timestamps of the frames, we will create a deque (double-ended queue), a type of collection that allows us to cheaply add or remove elements from either end:
        self._maxHistoryLength = maxHistoryLength
        self._history = numpy.empty(historyShape,
                                    numpy.float32)
        self._historyTimestamps = collections.deque()
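In case deque is unfamiliar, here is a minimal standalone demonstration of its cheap operations at either end (the values are arbitrary):

import collections

d = collections.deque([1.1, 2.2, 3.3])
d.append(4.4)         # Cheaply add at the right end.
oldest = d.popleft()  # Cheaply remove from the left end.
print(oldest)         # 1.1
print(d)              # deque([2.2, 3.3, 4.4])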
We will store the remaining arguments because later, in each frame, we will pass them to the pyramid functions and the signal processing functions:
        self._numPyramidLevels = numPyramidLevels
        self._useLaplacianPyramid = useLaplacianPyramid

        self._minHz = minHz
        self._maxHz = maxHz
        self._amplification = amplification

        self._numFFTThreads = numFFTThreads
        self._numIFFTThreads = numIFFTThreads
We call the following two functions to tell PyFFTW to cache its data structures (notably, its NumPy arrays) for a period of at least 1.0 second from their last use. (The default is 0.1 seconds.) Caching is a critical optimization for the PyFFTW interfaces that we are using, and we will choose a period that is more than long enough to keep the cache alive from frame to frame:
        pyfftw.interfaces.cache.enable()
        pyfftw.interfaces.cache.set_keepalive_time(1.0)
The initializer's implementation ends with code to set up a window, event bindings, a bitmap, layout, and background thread—all familiar tasks from our previous Python projects:
        style = wx.CLOSE_BOX | wx.MINIMIZE_BOX | \
                wx.CAPTION | wx.SYSTEM_MENU | \
                wx.CLIP_CHILDREN
        wx.Frame.__init__(self, None, title=title,
                          style=style, size=size)

        self.Bind(wx.EVT_CLOSE, self._onCloseWindow)

        quitCommandID = wx.NewId()
        self.Bind(wx.EVT_MENU, self._onQuitCommand,
                  id=quitCommandID)
        acceleratorTable = wx.AcceleratorTable([
            (wx.ACCEL_NORMAL, wx.WXK_ESCAPE,
             quitCommandID)
        ])
        self.SetAcceleratorTable(acceleratorTable)

        self._staticBitmap = wx.StaticBitmap(self,
                                             size=size)
        self._showImage(None)

        rootSizer = wx.BoxSizer(wx.VERTICAL)
        rootSizer.Add(self._staticBitmap)
        self.SetSizerAndFit(rootSizer)

        self._captureThread = threading.Thread(
                target=self._runCaptureLoop)
        self._captureThread.start()
We must modify our usual _onCloseWindow callback to disable PyFFTW's cache. Disabling the cache ensures that resources are freed and that PyFFTW's threads terminate normally. The callback's implementation is given in the following code:
    def _onCloseWindow(self, event):
        self._running = False
        self._captureThread.join()
        pyfftw.interfaces.cache.disable()
        self.Destroy()
The Esc key is bound to our usual _onQuitCommand callback, which just closes the app:
    def _onQuitCommand(self, event):
        self.Close()
The loop running on our background thread is similar to the one in our other Python apps. In each frame, it calls a helper function, _applyEulerianVideoMagnification. Here is the loop's implementation:
    def _runCaptureLoop(self):
        while self._running:
            success, image = self._capture.read()
            if image is not None:
                self._applyEulerianVideoMagnification(
                        image)
                if self.mirrored:
                    image[:] = numpy.fliplr(image)
            wx.CallAfter(self._showImage, image)
The _applyEulerianVideoMagnification helper function is quite long, so we will consider its implementation in several chunks. First, we will create a timestamp for the frame and copy the frame to a format that is more suitable for processing. Specifically, we will use a floating-point array with either one gray channel or 3 color channels, depending on the configuration:
    def _applyEulerianVideoMagnification(self, image):

        timestamp = timeit.default_timer()

        if self._useGrayOverlay:
            smallImage = cv2.cvtColor(
                    image, cv2.COLOR_BGR2GRAY).astype(
                            numpy.float32)
        else:
            smallImage = image.astype(numpy.float32)
Using this copy, we will calculate the appropriate level in the Gaussian or Laplacian pyramid. Each cv2.pyrDown call blurs the image and halves its dimensions, producing the next Gaussian level. A Laplacian level is then obtained by subtracting an upsampled version of the next Gaussian level from the current one, which leaves only fine detail such as edges:
        # Downsample the image using a pyramid technique.
        i = 0
        while i < self._numPyramidLevels:
            smallImage = cv2.pyrDown(smallImage)
            i += 1
        if self._useLaplacianPyramid:
            smallImage[:] -= \
                cv2.pyrUp(cv2.pyrDown(smallImage))
For the purposes of the history and signal processing functions, we will refer to this pyramid level as "the image" or "the frame".
Next, we will check the number of history frames that have been filled so far. If the history has more than one unfilled frame (meaning the history will still not be full after adding this frame), we will append the new image and timestamp and then return early, such that no signal processing is done until a later frame:
        historyLength = len(self._historyTimestamps)
        if historyLength < self._maxHistoryLength - 1:

            # Append the new image and timestamp to the
            # history.
            self._history[historyLength] = smallImage
            self._historyTimestamps.append(timestamp)

            # The history is still not full, so wait.
            return
If the history is just one frame short of being full (meaning the history will be full after adding this frame), we will append the new image and timestamp using the code given as follows:
        if historyLength == self._maxHistoryLength - 1:
            # Append the new image and timestamp to the
            # history.
            self._history[historyLength] = smallImage
            self._historyTimestamps.append(timestamp)
If the history is already full, we will drop the oldest image and timestamp and append the new image and timestamp using the code given as follows:
        else:
            # Drop the oldest image and timestamp from the
            # history and append the new ones.
            self._history[:-1] = self._history[1:]
            self._historyTimestamps.popleft()
            self._history[-1] = smallImage
            self._historyTimestamps.append(timestamp)

        # The history is full, so process it.
The history of image data is a NumPy array and, as such, we are using the terms "append" and "drop" loosely. A NumPy array has a fixed size; it cannot grow or shrink. We do not recreate the array either, because it is large and reallocating it every frame would be expensive. We just overwrite data within the array by moving the old data leftward and copying the new data in.
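Here is a minimal standalone sketch of this shifting technique, using a small one-dimensional array of made-up values:

import numpy

a = numpy.array([10, 20, 30, 40])
a[:-1] = a[1:]  # Shift the old data leftward.
a[-1] = 50      # Copy the new data into the last slot.
print(a)        # [20 30 40 50]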
Based on the timestamps, we will calculate the average time per frame in the history, as seen in the following code:
        # Find the average length of time per frame.
        startTime = self._historyTimestamps[0]
        endTime = self._historyTimestamps[-1]
        timeElapsed = endTime - startTime
        timePerFrame = \
                timeElapsed / self._maxHistoryLength
        #print 'FPS:', 1.0 / timePerFrame
We will proceed with a combination of signal processing functions, collectively called a temporal bandpass filter. This filter blocks (zeros out) some frequencies and allows others to pass (remain unchanged). Our first step in implementing this filter is to run the pyfftw.interfaces.scipy_fftpack.fft function using the history and a number of threads as arguments. Also, with the argument axis=0, we will specify that the history's first axis is its time axis:
        # Apply the temporal bandpass filter.
        fftResult = fft(self._history, axis=0,
                        threads=self._numFFTThreads)
We will pass the FFT result's length (the history length) and the time per frame to the scipy.fftpack.fftfreq function. This function returns an array of midpoint frequencies (Hz, in our case) corresponding to the indices in the FFT result. (This array answers the question, "Which frequency is the midpoint of the bin of frequencies represented by index i in the FFT?") We will find the indices whose midpoint frequencies lie closest (by minimum absolute difference) to our initializer's minHz and maxHz parameters. Then, we will modify the FFT result by setting the data to zero in all ranges that do not represent the frequencies of interest:
        frequencies = fftfreq(
                self._maxHistoryLength, d=timePerFrame)
        lowBound = (numpy.abs(
                frequencies - self._minHz)).argmin()
        highBound = (numpy.abs(
                frequencies - self._maxHz)).argmin()
        fftResult[:lowBound] = 0j
        fftResult[highBound:-highBound] = 0j
        fftResult[-lowBound:] = 0j
The FFT result is symmetrical: fftResult[i] and fftResult[-i] pertain to the same bin of frequencies. Thus, we modify the FFT result symmetrically.
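For example, for a history of eight frames captured at 10 frames per second (0.1 seconds per frame; these numbers are just for illustration), fftfreq gives the following bin midpoints. Note that index i and index -i have the same magnitude:

from scipy.fftpack import fftfreq

print(fftfreq(8, d=0.1))
# Output: [ 0.    1.25  2.5   3.75 -5.   -3.75 -2.5  -1.25]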
Remember, the Fourier transform maps a frequency to a complex number that encodes an amplitude and a phase. Thus, while the indices of the FFT result correspond to frequencies, the values contained at those indices are complex numbers. Zero, as a complex number, is written in Python as 0+0j or 0j.
Having thus filtered out the frequencies that do not interest us, we will finish applying the temporal bandpass filter by passing the data to the pyfftw.interfaces.scipy_fftpack.ifft function:
        ifftResult = ifft(fftResult, axis=0,
                          threads=self._numIFFTThreads)
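As a quick sanity check (a standalone sketch, not part of the app), an inverse FFT of an unmodified FFT recovers the original signal, up to floating-point error:

import numpy
from pyfftw.interfaces.scipy_fftpack import fft, ifft

x = numpy.random.rand(8).astype(numpy.float32)
roundTrip = numpy.real(ifft(fft(x)))
print(numpy.allclose(x, roundTrip, atol=1e-5))  # True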
From the IFFT result, we will take the most recent frame. It should somewhat resemble the current camera frame, but it should be black in areas that do not exhibit recent motion matching our parameters. We will multiply this filtered frame so that the non-black areas become bright. Then, we will upsample it (using a pyramid technique) and add the result to the current camera frame so that areas of motion are lit up. At the end, cv2.convertScaleAbs takes absolute values, saturates them to the 8-bit range, and writes the result back into image. Here is the relevant code, which concludes the _applyEulerianVideoMagnification method:
        # Amplify the result and overlay it on the
        # original image.
        overlay = numpy.real(ifftResult[-1]) * \
                self._amplification
        i = 0
        while i < self._numPyramidLevels:
            overlay = cv2.pyrUp(overlay)
            i += 1
        if self._useGrayOverlay:
            overlay = cv2.cvtColor(overlay,
                                   cv2.COLOR_GRAY2BGR)
        cv2.convertScaleAbs(image + overlay, image)
To finish the implementation of the LazyEyes class, we will display the image in the same manner as we have done in our other Python apps. Here is the relevant method:
    def _showImage(self, image):
        if image is None:
            # Provide a black bitmap.
            bitmap = wx.EmptyBitmap(self._imageWidth,
                                    self._imageHeight)
        else:
            # Convert the image to bitmap format.
            bitmap = WxUtils.wxBitmapFromCvImage(image)
        # Show the bitmap.
        self._staticBitmap.SetBitmap(bitmap)
Our module's main function just instantiates and runs the app, as seen in the following code:
def main():
    app = wx.App()
    lazyEyes = LazyEyes()
    lazyEyes.Show()
    app.MainLoop()

if __name__ == '__main__':
    main()
That's all! Run the app and stay quite still while it builds up its history of frames. Until the history is full, the video feed will not show any special effect. At the history's default length of 360 frames, it fills in about 20 seconds on my machine. Once it is full, you should see ripples moving through the video feed in areas of recent motion—or perhaps in all areas, if the camera is moved or the lighting or exposure is changed. The ripples will gradually settle and disappear in areas of the scene that become still, while new ripples will appear in new areas of motion. Feel free to experiment on your own. Now, let's discuss a few recipes for configuring and testing the parameters of the LazyEyes class.