Implementing The Living Headlights app

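The Living Headlights app will use the following files: ColorUtils.py, GeomUtils.py, LivingHeadlights.py, PyInstallerUtils.py, ResizeUtils.py, and WxUtils.py.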

Let's start by creating ColorUtils.py. We need functions that calculate a color's hue and saturation according to the formulae given in the Detecting lights as blobs section. The module is implemented as follows:

import math

def hueFromBGR(color):
  b, g, r = color
  # Note: sqrt(3) = 1.7320508075688772
  hue = math.degrees(math.atan2(
    1.7320508075688772 * (g - b), 2 * r - g - b))
  if hue < 0.0:
    hue += 360.0
  return hue

def saturationFromBGR(color):
  return max(color) - min(color)
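
As a quick sanity check of these two functions, the following sketch evaluates them on a few BGR colors. The functions are repeated here only so that the snippet runs on its own, and the sample colors are arbitrary choices for illustration.

```python
import math

def hueFromBGR(color):
  b, g, r = color
  hue = math.degrees(math.atan2(
    math.sqrt(3.0) * (g - b), 2 * r - g - b))
  if hue < 0.0:
    hue += 360.0
  return hue

def saturationFromBGR(color):
  return max(color) - min(color)

# Sample colors in BGR order (illustrative choices).
pureRed = (0, 0, 255)
pureGreen = (0, 255, 0)
pureBlue = (255, 0, 0)
paleYellow = (223, 247, 255)

redHue = hueFromBGR(pureRed)      # about 0 degrees
greenHue = hueFromBGR(pureGreen)  # about 120 degrees
blueHue = hueFromBGR(pureBlue)    # about 240 degrees

# The saturation is the spread of the channels: 255 - 223.
paleSaturation = saturationFromBGR(paleYellow)
```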

Note

If we want to convert an entire image (every pixel) to hue, saturation, and either luminosity or value, we can use OpenCV's cvtColor function with a conversion code such as cv2.COLOR_BGR2HLS or cv2.COLOR_BGR2HSV.

For the definitions of saturation, luminosity, and value in the HSV and HSL color models, refer to the Wikipedia article at https://en.wikipedia.org/wiki/HSL_and_HSV. Our definition of saturation is called chroma in the Wikipedia article, and it differs from HSL saturation, which differs again from HSV saturation. Moreover, for 8-bit images, OpenCV represents hue in units of 2 degrees (in a range of 0 to 179) so that the hue channel fits inside a byte.

We will write our own conversion functions because, for our purposes, converting an entire image is unnecessary. We just need to convert a sample from each blob. Also, we prefer a more accurate floating-point representation instead of the byte-sized integer representation that OpenCV imposes.
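
To make the difference between these definitions of saturation concrete, here is a small sketch that uses Python's standard colorsys module (not OpenCV) on one arbitrary sample color. Because colorsys works with RGB components in the range 0.0 to 1.0, chroma is normalized to the same range for comparison.

```python
import colorsys

# An arbitrary dark red, as RGB components from 0.0 to 1.0.
r, g, b = 128 / 255.0, 64 / 255.0, 64 / 255.0

# Chroma: the definition of saturation used in ColorUtils.py.
chroma = max(r, g, b) - min(r, g, b)

# colorsys returns (hue, saturation, value) for HSV and
# (hue, lightness, saturation) for HLS.
hsvSaturation = colorsys.rgb_to_hsv(r, g, b)[1]
hlsSaturation = colorsys.rgb_to_hls(r, g, b)[2]

print('chroma = %.3f' % chroma)
print('HSV saturation = %.3f' % hsvSaturation)
print('HLS saturation = %.3f' % hlsSaturation)
```

For this color, the three values are roughly 0.251, 0.500, and 0.333, respectively, so the choice of definition clearly matters when picking a saturation threshold.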

We also need to modify GeomUtils.py by adding a function to calculate the Euclidean distance between two 2D points, such as the pixel coordinates of two headlights in an image. At the top of the file, let's add an import statement and implement the function as follows:

import math

def dist2D(p0, p1):
  deltaX = p1[0] - p0[0]
  deltaY = p1[1] - p0[1]
  return math.sqrt(deltaX * deltaX +
    deltaY * deltaY)
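
As a quick check, the following sketch applies dist2D to a 3-4-5 right triangle. The pixel coordinates are made up for illustration. The standard library's math.hypot computes the same result and is shown as an alternative.

```python
import math

def dist2D(p0, p1):
  deltaX = p1[0] - p0[0]
  deltaY = p1[1] - p0[1]
  return math.sqrt(deltaX * deltaX +
    deltaY * deltaY)

# Hypothetical pixel coordinates of two headlights.
leftLight = (100.0, 240.0)
rightLight = (103.0, 244.0)

distance = dist2D(leftLight, rightLight)

# An equivalent calculation using math.hypot.
alternative = math.hypot(rightLight[0] - leftLight[0],
  rightLight[1] - leftLight[1])
```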

The preceding code contains all the new utility functions. Now, let's create a file, LivingHeadlights.py, for the app's main class, LivingHeadlights. Like InteractiveRecognizer in Chapter 3, Training a Smart Alarm to Recognize the Villain and His Cat, LivingHeadlights is a class for a wxPython app that captures and processes images on a background thread (to avoid blocking the GUI on the main thread), allows a user to enter reference data, serializes its reference data when exiting, and deserializes its reference data when starting up again. This time, serialization/deserialization is accomplished using Python's cPickle module or, if cPickle is unavailable for any reason, the less optimized pickle module. Let's add the following import statements to the start of LivingHeadlights.py:

import numpy
import cv2
import os
import threading
import wx

try:
  import cPickle as pickle
except ImportError:
  import pickle

import ColorUtils
import GeomUtils
import PyInstallerUtils
import ResizeUtils
import WxUtils

Let's also define some BGR color values and names at the start of the module. We will classify each blob as one of these colors, depending on the hue and saturation:

COLOR_Red         = ((  0,   0, 255), 'red')
COLOR_YellowWhite = ((223, 247, 255), 'yellowish white')
COLOR_AmberYellow = ((  0, 191, 255), 'amber or yellow')
COLOR_Green       = ((128, 255, 128), 'green')
COLOR_BlueWhite   = ((255, 231, 223), 'bluish white')
COLOR_BluePurple  = ((255,  64,   0), 'blue or purple')
COLOR_Pink        = ((240, 128, 255), 'pink')

Now, let's begin implementing the class. The initializer takes several arguments relating to the configuration of the blob detector and the camera. Refer to the preceding section, Detecting lights as blobs, for explanations of the blob detection parameters supported by OpenCV's SimpleBlobDetector and SimpleBlobDetector_Params classes. The following is the class declaration and the declaration of the initializer:

class LivingHeadlights(wx.Frame):

  def __init__(self, configPath, thresholdStep=8.0,
    minThreshold=191.0, maxThreshold=255.0,
    minRepeatability=2,
    minDistBetweenBlobsProportional=0.02,
    minBlobAreaProportional=0.005,
    maxBlobAreaProportional=0.1,
    minBlobCircularity=0.7, cameraDeviceID=0,
    imageSize=(640, 480),
    title='The Living Headlights'):

We will start the initializer's implementation by setting a public Boolean variable indicating that the app should currently display a mirrored image, and a protected Boolean variable indicating that the app should currently be running (not preparing to quit):

    self.mirrored = True

    self._running = True

If there is any configuration file saved from a previous run of the app, we will deserialize the reference measurements (pixel distance between lights and real distance in meters between lights and camera) as well as the user's preferred unit of measurement (meters or feet):

    self._configPath = configPath
    self._pixelDistBetweenLights = None
    if os.path.isfile(configPath):
      # Pickled data should be read in binary mode.
      with open(self._configPath, 'rb') as file:
        self._referencePixelDistBetweenLights = \
            pickle.load(file)
        self._referenceMetersToCamera = \
            pickle.load(file)
        self._convertMetersToFeet = pickle.load(file)
    else:
      self._referencePixelDistBetweenLights = None
      self._referenceMetersToCamera = None
      self._convertMetersToFeet = False
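
The loading code relies on the fact that several objects can be pickled one after another into the same file and then read back in the same order. Here is a minimal, self-contained sketch of that behavior. The temporary file and the sample values are assumptions for illustration only, not the app's real configuration.

```python
import os
import pickle
import tempfile

# Hypothetical reference data: pixel distance, meters, unit flag.
savedValues = (152.5, 10.0, False)

# Create a temporary file path to stand in for the config file.
fd, path = tempfile.mkstemp()
os.close(fd)

# Write the three objects one after another, in binary mode.
with open(path, 'wb') as configFile:
  for value in savedValues:
    pickle.dump(value, configFile)

# Read them back in the same order in which they were written.
with open(path, 'rb') as configFile:
  referencePixelDist = pickle.load(configFile)
  referenceMeters = pickle.load(configFile)
  convertMetersToFeet = pickle.load(configFile)

os.remove(path)
```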

We will initialize a VideoCapture object and try to configure the size of the captured images. If the requested size is unsupported, we will fall back to the default size:

    self._capture = cv2.VideoCapture(cameraDeviceID)
    size = ResizeUtils.cvResizeCapture(
        self._capture, imageSize)
    w, h = size
    self._imageWidth, self._imageHeight = w, h

We will create a SimpleBlobDetector_Params object and a SimpleBlobDetector object based on the arguments passed to the app's initializer:

    minDistBetweenBlobs = \
        min(w, h) * \
        minDistBetweenBlobsProportional

    area = w * h
    minBlobArea = area * minBlobAreaProportional
    maxBlobArea = area * maxBlobAreaProportional

    detectorParams = cv2.SimpleBlobDetector_Params()

    detectorParams.minDistBetweenBlobs = \
        minDistBetweenBlobs

    detectorParams.thresholdStep = thresholdStep
    detectorParams.minThreshold = minThreshold
    detectorParams.maxThreshold = maxThreshold

    detectorParams.minRepeatability = minRepeatability

    detectorParams.filterByArea = True
    detectorParams.minArea = minBlobArea
    detectorParams.maxArea = maxBlobArea

    detectorParams.filterByColor = True
    detectorParams.blobColor = 255

    detectorParams.filterByCircularity = True
    detectorParams.minCircularity = minBlobCircularity

    detectorParams.filterByInertia = False

    detectorParams.filterByConvexity = False

    self._detector = cv2.SimpleBlobDetector(
        detectorParams)
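
To get a feel for the numbers, the following sketch converts the initializer's default proportional arguments into pixel units for the default 640 x 480 image size. It uses plain arithmetic only and does not touch OpenCV.

```python
# Default arguments from the initializer's declaration.
w, h = 640, 480
minDistBetweenBlobsProportional = 0.02
minBlobAreaProportional = 0.005
maxBlobAreaProportional = 0.1

# The minimum distance scales with the image's shorter side.
minDistBetweenBlobs = \
    min(w, h) * minDistBetweenBlobsProportional

# The area limits scale with the image's total pixel count.
area = w * h
minBlobArea = area * minBlobAreaProportional
maxBlobArea = area * maxBlobAreaProportional

print('min distance: %.1f pixels' % minDistBetweenBlobs)
print('area range: %.0f to %.0f pixels' %
      (minBlobArea, maxBlobArea))
```

With these defaults, blobs must be at least 9.6 pixels apart and must cover between 1,536 and 30,720 pixels.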

We will specify the style of the app's window and will initialize the base class, wx.Frame:

    style = wx.CLOSE_BOX | wx.MINIMIZE_BOX | \
        wx.CAPTION | wx.SYSTEM_MENU | \
        wx.CLIP_CHILDREN
    wx.Frame.__init__(self, None, title=title,
             style=style, size=size)
    self.SetBackgroundColour(wx.Colour(232, 232, 232))

We will bind the window's close event to a callback that shuts down the app, and we will bind the Esc key to a command that closes the window:

    self.Bind(wx.EVT_CLOSE, self._onCloseWindow)

    quitCommandID = wx.NewId()
    self.Bind(wx.EVT_MENU, self._onQuitCommand,
         id=quitCommandID)
    acceleratorTable = wx.AcceleratorTable([
      (wx.ACCEL_NORMAL, wx.WXK_ESCAPE,
       quitCommandID)
    ])
    self.SetAcceleratorTable(acceleratorTable)

We will create the GUI elements, including the bitmap, the text field for the reference distance, radio buttons for the unit (meters or feet), and the Calibrate button. We will also bind callbacks for various input events:

    self._staticBitmap = wx.StaticBitmap(self,
            size=size)
    self._showImage(None)

    self._calibrationTextCtrl = wx.TextCtrl(
        self, style=wx.TE_PROCESS_ENTER)
    self._calibrationTextCtrl.Bind(
        wx.EVT_KEY_UP,
        self._onCalibrationTextCtrlKeyUp)

    self._distanceStaticText = wx.StaticText(self)
    if self._referencePixelDistBetweenLights is None:
      self._showInstructions()
    else:
      self._clearMessage()

    self._calibrationButton = wx.Button(
            self, label='Calibrate')
    self._calibrationButton.Bind(
        wx.EVT_BUTTON, self._calibrate)
    self._calibrationButton.Disable()

    border = 12

    metersButton = wx.RadioButton(self,
            label='Meters')
    metersButton.Bind(wx.EVT_RADIOBUTTON,
            self._onSelectMeters)

    feetButton = wx.RadioButton(self, label='Feet')
    feetButton.Bind(wx.EVT_RADIOBUTTON,
            self._onSelectFeet)

We will ensure that the proper radio buttons start in a selected state, depending on the configuration data that we deserialized earlier:

    if self._convertMetersToFeet:
      feetButton.SetValue(True)
    else:
      metersButton.SetValue(True)

We will stack the radio buttons vertically using BoxSizer:

    unitButtonsSizer = wx.BoxSizer(wx.VERTICAL)
    unitButtonsSizer.Add(metersButton)
    unitButtonsSizer.Add(feetButton)

We will line up all our controls horizontally, again using BoxSizer:

    controlsSizer = wx.BoxSizer(wx.HORIZONTAL)
    style = wx.ALIGN_CENTER_VERTICAL | wx.RIGHT
    controlsSizer.Add(self._calibrationTextCtrl, 0,
             style, border)
    controlsSizer.Add(unitButtonsSizer, 0, style,
             border)
    controlsSizer.Add(self._calibrationButton, 0,
             style, border)
    controlsSizer.Add(self._distanceStaticText, 0,
             wx.ALIGN_CENTER_VERTICAL)

To finish our layout, we will place the controls below the image:

    rootSizer = wx.BoxSizer(wx.VERTICAL)
    rootSizer.Add(self._staticBitmap)
    rootSizer.Add(controlsSizer, 0,
           wx.EXPAND | wx.ALL, border)
    self.SetSizerAndFit(rootSizer)

The last thing we will do in the initializer is start a background thread to capture and process images from the camera:

    self._captureThread = threading.Thread(
        target=self._runCaptureLoop)
    self._captureThread.start()

When closing the app, we will first ensure that the capture thread terminates itself, just as we did in InteractiveRecognizer in Chapter 3, Training a Smart Alarm to Recognize the Villain and His Cat. We will also use pickle or cPickle to serialize the reference measurements and preferred unit (meters or feet) to a file. Here is the implementation of the relevant callback:

  def _onCloseWindow(self, event):
    self._running = False
    self._captureThread.join()
    configDir = os.path.dirname(self._configPath)
    if not os.path.isdir(configDir):
      os.makedirs(configDir)
    # Pickled data should be written in binary mode.
    with open(self._configPath, 'wb') as file:
      pickle.dump(self._referencePixelDistBetweenLights,
            file)
      pickle.dump(self._referenceMetersToCamera, file)
      pickle.dump(self._convertMetersToFeet, file)
    self.Destroy()
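
The shutdown sequence (clear a flag, then join the thread) can be illustrated without wxPython or a camera. The following is a minimal sketch under that assumption. The Worker class and its frame counter are hypothetical stand-ins for the app and its image processing.

```python
import threading
import time

class Worker(object):

  def __init__(self):
    self._running = True
    self.frameCount = 0
    self._thread = threading.Thread(
        target=self._runLoop)
    self._thread.start()

  def _runLoop(self):
    # Keep working until another thread clears the flag.
    while self._running:
      self.frameCount += 1  # Stand-in for capturing a frame.
      time.sleep(0.001)

  def stop(self):
    # Ask the loop to finish, then wait until it has done so.
    self._running = False
    self._thread.join()

  def isAlive(self):
    return self._thread.is_alive()

worker = Worker()
time.sleep(0.05)
worker.stop()
stoppedCleanly = not worker.isAlive()
```

Because join blocks until the loop has observed the cleared flag, no further frames are processed once stop returns, which is why it is safe to release resources afterwards.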

The callback associated with the Esc key just closes the app, as follows:

  def _onQuitCommand(self, event):
    self.Close()

When either of the radio buttons is selected, we will record the newly selected unit of measurement, as seen in these two callback methods:

  def _onSelectMeters(self, event):
    self._convertMetersToFeet = False

  def _onSelectFeet(self, event):
    self._convertMetersToFeet = True

Whenever a new character is entered in the text field, we will call a helper method to validate this text as potential input:

  def _onCalibrationTextCtrlKeyUp(self, event):
    self._enableOrDisableCalibrationButton()

When the Calibrate button is clicked, we will parse the measurement from the text field, clear the text field, convert the measurement to meters if necessary, and store the value. The button's callback is implemented as follows:

  def _calibrate(self, event):
    self._referencePixelDistBetweenLights = \
        self._pixelDistBetweenLights
    s = self._calibrationTextCtrl.GetValue()
    self._calibrationTextCtrl.SetValue('')
    self._referenceMetersToCamera = float(s)
    if self._convertMetersToFeet:
      self._referenceMetersToCamera *= 0.3048

Just like the Interactive Recognizer app in Chapter 3, Training a Smart Alarm to Recognize the Villain and His Cat, the background thread runs a loop that includes capturing an image, calling a helper method to process the image, and calling another helper method to display the image. Optionally, the image might be mirrored (flipped horizontally) before being displayed. Here is the loop's implementation:

  def _runCaptureLoop(self):
    while self._running:
      success, image = self._capture.read()
      if image is not None:
        self._detectAndEstimateDistance(image)
        if self.mirrored:
          image[:] = numpy.fliplr(image)
      wx.CallAfter(self._showImage, image)
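
The in-place mirroring step may be easier to see on a tiny array. The following sketch assumes only NumPy (a reasonably recent version) and uses a made-up 2 x 3 single-channel image. Assigning to image[:] overwrites the existing buffer rather than rebinding the name, so any other reference to the same array sees the mirrored pixels too.

```python
import numpy

# A made-up 2 x 3 single-channel image.
image = numpy.array([[1, 2, 3],
                     [4, 5, 6]], numpy.uint8)
alias = image  # A second reference to the same buffer.

# Mirror horizontally, writing the result into the same buffer.
image[:] = numpy.fliplr(image)

print(alias)
```

After the assignment, both image and alias hold the rows [3, 2, 1] and [6, 5, 4].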

The helper method used for processing the image is quite long. Let's look at it in several chunks. First, we will detect blobs in a gray version of the image and will initialize a dictionary that we will use to sort blobs by color:

  def _detectAndEstimateDistance(self, image):

    grayImage = cv2.cvtColor(
        image, cv2.COLOR_BGR2GRAY)
    blobs = self._detector.detect(grayImage)
    blobsForColors = {}

For each blob, we will crop out a square region that is likely to include a white circle of light, plus some more saturated pixels around the edge:

    for blob in blobs:

      centerXAsInt, centerYAsInt = \
          (int(n) for n in blob.pt)
      radiusAsInt = int(blob.size)

      minX = max(0, centerXAsInt - radiusAsInt)
      maxX = min(self._imageWidth,
           centerXAsInt + radiusAsInt)
      minY = max(0, centerYAsInt - radiusAsInt)
      maxY = min(self._imageHeight,
           centerYAsInt + radiusAsInt)

      region = image[minY:maxY, minX:maxX]
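
The clamping logic can be tried out in isolation. In the following sketch, regionBounds is a hypothetical helper (not part of the app) that repeats the same min/max arithmetic for a made-up blob near the bottom-left corner of a 640 x 480 image.

```python
def regionBounds(center, radius, imageWidth, imageHeight):
  # Clamp a square around the center to the image's bounds.
  centerX, centerY = (int(n) for n in center)
  radius = int(radius)
  minX = max(0, centerX - radius)
  maxX = min(imageWidth, centerX + radius)
  minY = max(0, centerY - radius)
  maxY = min(imageHeight, centerY + radius)
  return minX, maxX, minY, maxY

# A made-up blob that overlaps the left and bottom edges.
bounds = regionBounds((5.7, 470.2), 10.0, 640, 480)
minX, maxX, minY, maxY = bounds

# The cropped region is smaller than the full 20 x 20 square.
regionWidth = maxX - minX
regionHeight = maxY - minY
```

Here, the region is clipped to 15 x 20 pixels, which is why the app reads the region's actual dimensions from its shape rather than assuming a full square.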

We will find the average hue and saturation of the region and, using these values, we will classify the blob as one of the colors that we defined at the top of this module:

      # Get the region's dimensions, which may
      # differ from the blob's diameter if the blob
      # extends past the edge of the image.
      h, w = region.shape[:2]

      meanColor = region.reshape(w * h, 3).mean(0)
      meanHue = ColorUtils.hueFromBGR(meanColor)
      meanSaturation = ColorUtils.saturationFromBGR(
          meanColor)

      if meanHue < 22.5 or meanHue > 337.5:
        color = COLOR_Red
      elif meanHue < 67.5:
        if meanSaturation < 25.0:
          color = COLOR_YellowWhite
        else:
          color = COLOR_AmberYellow
      elif meanHue < 172.5:
        color = COLOR_Green
      elif meanHue < 277.5:
        if meanSaturation < 25.0:
          color = COLOR_BlueWhite
        else:
          color = COLOR_BluePurple
      else:
        color = COLOR_Pink

      if color in blobsForColors:
        blobsForColors[color] += [blob]
      else:
        blobsForColors[color] = [blob]
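
The hue and saturation thresholds can be exercised without a camera. The following sketch wraps the same comparisons in a hypothetical function, classifyColorName, which returns only the color's name. It then groups some made-up (hue, saturation) samples in a dictionary, much as blobsForColors groups blobs.

```python
def classifyColorName(meanHue, meanSaturation):
  # Same thresholds as in _detectAndEstimateDistance.
  if meanHue < 22.5 or meanHue > 337.5:
    return 'red'
  elif meanHue < 67.5:
    if meanSaturation < 25.0:
      return 'yellowish white'
    return 'amber or yellow'
  elif meanHue < 172.5:
    return 'green'
  elif meanHue < 277.5:
    if meanSaturation < 25.0:
      return 'bluish white'
    return 'blue or purple'
  return 'pink'

# Made-up (hue in degrees, saturation) samples.
samples = [(350.0, 120.0), (45.0, 10.0), (45.0, 90.0),
           (230.0, 12.0), (10.0, 200.0)]

samplesForColors = {}
for sample in samples:
  name = classifyColorName(*sample)
  # setdefault creates the list on first use.
  samplesForColors.setdefault(name, []).append(sample)
```

Note that red wraps around the hue circle, so both 350 degrees and 10 degrees land in the same group.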

Finally, after classifying all blobs, we will call a helper method that handles the classification results and a helper method that can enable or disable the Calibrate button:

    self._processBlobsForColors(image, blobsForColors)
    self._enableOrDisableCalibrationButton()

Based on the color classification results, we want to highlight the blobs in certain colors, draw lines that connect pairs of like-colored blobs (if any), and display a message about the estimated distance to the first such pair of blobs. We will use the BGR color values and human-readable color names that we defined at the top of this module. The relevant code is as follows:

  def _processBlobsForColors(self, image,
               blobsForColors):

    self._pixelDistBetweenLights = None

    for color in blobsForColors:

      prevBlob = None

      for blob in blobsForColors[color]:

        colorBGR, colorName = color

        centerAsInts = \
            tuple(int(n) for n in blob.pt)
        radiusAsInt = int(blob.size)

        # Fill the circle with the selected color.
        cv2.circle(image, centerAsInts,
             radiusAsInt, colorBGR,
             cv2.cv.CV_FILLED, cv2.CV_AA)
        # Outline the circle in black.
        cv2.circle(image, centerAsInts,
             radiusAsInt, (0, 0, 0), 1,
             cv2.CV_AA)

        if prevBlob is not None:

          if self._pixelDistBetweenLights is None:
            self._pixelDistBetweenLights = \
                GeomUtils.dist2D(blob.pt,
                       prevBlob.pt)
            wx.CallAfter(self._showDistance,
                   colorName)

          prevCenterAsInts = \
            tuple(int(n) for n in prevBlob.pt)

          # Connect the current and previous
          # circle with a black line.
          cv2.line(image, prevCenterAsInts,
               centerAsInts, (0, 0, 0), 1,
               cv2.CV_AA)

        # Remember this blob so that the next blob of
        # the same color can be paired with it.
        prevBlob = blob
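
The pairing logic (connect each blob to the previous blob of the same color, and measure only the first pair found) can be sketched on plain coordinate tuples. The points below are made up, and pairConsecutive is a hypothetical helper, not part of the app.

```python
import math

def pairConsecutive(points):
  # Pair each point with the one before it, if any.
  pairs = []
  prevPoint = None
  for point in points:
    if prevPoint is not None:
      pairs.append((prevPoint, point))
    # Update on every iteration, not only after a pair.
    prevPoint = point
  return pairs

# Made-up blob centers that share one color.
centers = [(100.0, 200.0), (160.0, 280.0), (400.0, 280.0)]
pairs = pairConsecutive(centers)

# Only the first pair contributes a distance measurement.
firstA, firstB = pairs[0]
pixelDist = math.hypot(firstB[0] - firstA[0],
                       firstB[1] - firstA[1])
```

Three centers yield two connecting lines, while the distance estimate uses only the first pair.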

Next, let's look at the helper method that enables or disables the Calibrate button. The button should be enabled only when a pixel distance between two lights is being measured and when a number (the real distance between the lights and camera) is in the text field. Here are the tests for these conditions:

  def _enableOrDisableCalibrationButton(self):
    s = self._calibrationTextCtrl.GetValue()
    if len(s) < 1 or \
        self._pixelDistBetweenLights is None:
      self._calibrationButton.Disable()
    else:
      # Validate that the input is a number.
      try:
        float(s)
        self._calibrationButton.Enable()
      except ValueError:
        self._calibrationButton.Disable()
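
The same try/except test can be factored into a small predicate. In this sketch, isNumber and isUsableDistance are hypothetical helpers, not part of the app. Note that float() also accepts strings such as 'inf' and 'nan', so a stricter app might additionally check that the value is finite and positive.

```python
import math

def isNumber(s):
  # Return True if float() can parse the string.
  try:
    float(s)
    return True
  except ValueError:
    return False

def isUsableDistance(s):
  # A stricter, optional check: finite and greater than zero.
  if not isNumber(s):
    return False
  value = float(s)
  return value > 0.0 and not (
      math.isinf(value) or math.isnan(value))
```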

The helper method for displaying an image is the same as in our previous wxPython projects:

  def _showImage(self, image):
    if image is None:
      # Provide a black bitmap.
      bitmap = wx.EmptyBitmap(self._imageWidth,
                  self._imageHeight)
    else:
      # Convert the image to bitmap format.
      bitmap = WxUtils.wxBitmapFromCvImage(image)
    # Show the bitmap.
    self._staticBitmap.SetBitmap(bitmap)

Here is the helper method that shows the instructional message:

  def _showInstructions(self):
    self._showMessage(
        'When a pair of lights is highlighted, '
        'enter the\ndistance and click '
        '"Calibrate".')

And here is the helper method that shows the estimated distance in either meters or feet:

  def _showDistance(self, colorName):
    if self._referenceMetersToCamera is None:
      return
    value = self._referenceMetersToCamera * \
        self._referencePixelDistBetweenLights / \
        self._pixelDistBetweenLights
    if self._convertMetersToFeet:
      value /= 0.3048
      unit = 'feet'
    else:
      unit = 'meters'
    self._showMessage(
        'A pair of %s lights was spotted\nat '
        '%.2f %s.' % \
        (colorName, value, unit))
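
The estimate rests on the assumption that the pixel distance between the two lights is inversely proportional to their real distance from the camera. A worked example with made-up calibration numbers may help. Here, estimateMeters is a hypothetical helper that repeats the formula from _showDistance.

```python
METERS_PER_FOOT = 0.3048

def estimateMeters(referenceMeters, referencePixelDist,
                   pixelDist):
  # Half the pixel distance implies twice the real distance.
  return referenceMeters * referencePixelDist / pixelDist

# Made-up calibration: lights 100 pixels apart at 10 meters.
referenceMeters = 10.0
referencePixelDist = 100.0

# Later, the same lights appear only 50 pixels apart.
meters = estimateMeters(referenceMeters,
                        referencePixelDist, 50.0)
feet = meters / METERS_PER_FOOT

message = 'A pair of %s lights was spotted\nat %.2f %s.' % \
    ('red', feet, 'feet')
```

In this example, the lights are estimated to be 20 meters away, which the message reports as 65.62 feet.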

When the message is cleared, we will leave a newline character so that the label still has the same height as when it was populated:

  def _clearMessage(self):
    # Insert a newline for consistent spacing.
    self._showMessage('\n')

Showing a message simply entails changing the text of the StaticText object, as seen in the following helper method:

  def _showMessage(self, message):
    self._distanceStaticText.SetLabel(message)

The class is complete. Now, we just need the following main function (similar to our main functions for previous wxPython apps) to specify a file path for serialization/deserialization and to launch the app:

def main():
  app = wx.App()
  configPath = PyInstallerUtils.pyInstallerResourcePath(
    'config.dat')
  livingHeadlights = LivingHeadlights(configPath)
  livingHeadlights.Show()
  app.MainLoop()

if __name__ == '__main__':
  main()

That is the whole implementation of The Living Headlights app! This project's code is short, but we have some unusual requirements for setup and testing. Let's turn to these tasks now.