Cron-based queues

In the previous section we wrote the sports_cron() implementation which, at each run, looks for teams that no longer have players and deletes them from the database. However, if we run the Drupal cron every hour, we keep running that query even if we are pretty certain that teams don't lose all their players so often. Moreover, we also go by the simple assumption (a functionality we have not written so far) that there is some code responsible for removing a player from a team. This would actually be the ideal place to check whether that team has lost all its players. The idea, then, is to check whether the team has been left empty and add it to a queue to be deleted later (whenever the cron runs).

We won't go into the code specific to player and team management, but instead focus on the part that adds the team that needs to be deleted to the queue.

The first thing we need to do is get our hands on the QueueFactory service:

/** @var \Drupal\Core\Queue\QueueFactory $queue_factory */ 
$queue_factory = \Drupal::service('queue');  

Then, we need to create an instance of the default QueueInterface (database) with the name of our future worker plugin ID:

/** @var \Drupal\Core\Queue\QueueInterface $queue */ 
$queue = $queue_factory->get('team_cleaner');  

This is obviously the static approach of loading services, and you should be injecting them instead whenever possible. But if you cannot, there is also the following shorthand which can achieve the same thing in one line:

$queue = \Drupal::queue('team_cleaner');  

$queue is an instance of DatabaseQueue with the name team_cleaner.

The next thing we need to do is add items to it (assuming that we've identified a team without players):

$item = new \stdClass(); 
$item->id = $team_id; 
$queue->createItem($item);  

It's standard practice to create a PHP object to wrap the data for the queue item. Inside, we can put anything we want that can serialize properly, and that's all. We can now turn to our TeamCleaner worker plugin, which naturally goes in the Plugin/QueueWorker namespace of our module:

namespace Drupal\sports\Plugin\QueueWorker; 
 
use Drupal\Core\Database\Connection; 
use Drupal\Core\Plugin\ContainerFactoryPluginInterface; 
use Drupal\Core\Queue\QueueWorkerBase; 
use Symfony\Component\DependencyInjection\ContainerInterface; 
 
/** 
 * A worker plugin that removes a team from the database. Normally used to clear 
 * teams that have run out of players. 
 * 
 * @QueueWorker( 
 *   id = "team_cleaner", 
 *   title = @Translation("Team Cleaner"), 
 *   cron = {"time" = 10} 
 * ) 
 */ 
class TeamCleaner extends QueueWorkerBase implements ContainerFactoryPluginInterface { 
 
  /** 
   * @var \Drupal\Core\Database\Connection 
   */ 
  protected $database; 
 
  /** 
   * Constructs a TeamCleaner worker. 
   * 
   * @param array $configuration 
   * @param string $plugin_id 
   * @param mixed $plugin_definition 
   * @param \Drupal\Core\Database\Connection $database 
   */ 
  public function __construct(array $configuration, $plugin_id, $plugin_definition, Connection $database) { 
    parent::__construct($configuration, $plugin_id, $plugin_definition); 
    $this->database = $database; 
  } 
 
  /** 
   * {@inheritdoc} 
   */ 
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) { 
    return new static( 
      $configuration, 
      $plugin_id, 
      $plugin_definition, 
      $container->get('database') 
    ); 
  } 
 
  /** 
   * {@inheritdoc} 
   */ 
  public function processItem($data) { 
    $id = isset($data->id) && $data->id ? $data->id : NULL; 
    if (!$id) { 
      throw new \Exception('Missing team ID'); 
      return; 
    } 
 
    $this->database->delete('teams') 
      ->condition('id', $id) 
      ->execute(); 
  } 
}  

As we're already used to it, our plugin extends the base plugin class of its type to inherit any potential base functionality. In our case, this is limited to the implementation of the QueueWorkerInterface which has one method whose name easily describes its responsibility: processItem($data). Also not new to us is the implementation of ContainerFactoryPluginInterface which allows us to inject the database service into our plugin. We use that to delete the queued team.

All the action in fact happens in the processItem() method where we simply look into the $data object and delete the team with the specified ID. We also throw a simple exception if something goes wrong. We'll talk about exceptions in queue processing shortly.

Somewhat more interesting for the Queue API, however, is the plugin annotation. Apart from the standard expected plugin definition, we also encounter the following:

cron = {"time" = 10}

This simply indicates that this plugin should be used by the cron system. In other words, when the Drupal cron runs, it loads all the worker plugin definitions, and whichever has this information gets processed. And the key here is the time information, which we have set to 10 seconds. This essentially means that when the cron runs, we are saying: go ahead and process as many queue items as you can within 10 seconds; once that time limit is up, stop and continue with the rest of the cron tasks. This is actually very powerful because we allocated an amount of time from the PHP request and dedicated it to our queue. This means that we don't have to guess how many items to allocate for a request (as we did with the batching). However, it also means that the rest of the time left needs to be enough for everything else. So, we need to adjust this carefully. As for the queue items that don't fit into those 10 seconds, they will simply be processed at the next cron run.

This approach is better than our previous one, in which we ourselves implemented hook_cron(), because we don't want to always keep checking teams for players, but can instead create queue items and defer the deletion until a later time, as needed.

Very similarly, we could refactor our JSON product importer. When calling the import() method, the products would get queued, and then a separate worker plugin would handle the product data creation/update whenever cron runs. This of course depends on whether we are okay with splitting the import functionality into two classes, which is not a big deal. We are actually fine with the way things are at the moment, so to illustrate the programmatic processing of the queue, we will use another example.