CHAPTER 24
Multithreading, Part Two: Exploring the Task Parallel Library and PLINQ

Perhaps the most important new feature added to the .NET Framework by version 4.0 is the Task Parallel Library (TPL). This library enhances multithreaded programming in two important ways. First, it simplifies the creation and use of multiple threads. Second, it automatically makes use of multiple processors. In other words, by using the TPL you enable your applications to automatically scale to make use of the number of available processors. These two features make the TPL the recommended approach to multithreading in most cases.

Another parallel programming feature added by .NET 4.0 is PLINQ, which stands for Parallel Language Integrated Query. PLINQ enables you to write queries that automatically make use of multiple processors and parallelism when appropriate. As you will see, it is trivially easy to request parallel execution of a query. Thus, through the use of PLINQ, it is possible to add parallelism to a query with little effort.

The primary reason that TPL and PLINQ are such important advances is because of the growing importance of parallelism in modern programming. Today, multicore processors are becoming commonplace. Furthermore, the demand for better program performance is increasing. As a result, there has been a growing need for a mechanism that enables software to take advantage of multiple processors to increase performance. The trouble is that in the past, it was not always easy to do so in a clean, scalable manner. The TPL and PLINQ change this, making it easier (and safer) to best utilize system resources.

The TPL is defined in the System.Threading.Tasks namespace. However, when working with the TPL, you will also often need to include System.Threading because it provides support for synchronization and other multithreading features such as the Interlocked class.

This chapter explores both the TPL and PLINQ. Understand, however, that these are large topics, and it is not possible to cover them in detail. Instead, the fundamentals of each is described and several basic techniques are demonstrated. Thus, the information in this chapter will help you get started. If you will be focusing on parallel programming, then these are areas of the .NET framework that you will want to study in greater detail.


NOTE Although the use of the TPL and PLINQ is now recommended for most multithreading applications, threading based on the Thread class as described in Chapter 23 is still in widespread use. Furthermore, much of what is described in Chapter 23 applies to the TPL. Therefore, an understanding of the material in Chapter 23 is still required to fully master multithreading in C#.

Two Approaches to Parallel Programming

When using the TPL, there are two basic ways in which you can add parallelism to a program. The first is called data parallelism. With this approach, one operation on a collection of data is broken into two or more concurrent threads of execution, each operating on a portion of the data. For example, if a transformation is applied to each element in an array, then through the use of data parallelism, it is possible for two or more threads to be operating on different ranges of the array concurrently. As you can imagine, such parallel actions could result in substantial increases in speed over a strictly sequential approach. Although data parallelism has always been possible by using the Thread class, it was difficult and time-consuming to construct scalable solutions. The TPL changes this. With the TPL, scalable data parallelism is easy to add to your program.

The second way to add parallelism is through the use of task parallelism. This approach executes two or more operations concurrently. Thus, task parallelism is the type of parallelism that has been accomplished in the past via the Thread class. The advantages that the TPL adds are ease-of-use and the ability to automatically scale execution to multiple processors.

The Task Class

At the core of the TPL is the Task class. With the TPL, the basic unit of execution is encapsulated by Task, not Thread. Task differs from Thread in that Task is an abstraction that represents an asynchronous operation. Thread encapsulates a thread of execution. Of course, at the system level, a thread is still the basic unit of execution that can be scheduled by the operating system. However, the correspondence between a Task instance and a thread of execution is not necessarily one-to-one. Furthermore, task execution is managed by a task scheduler, which works with a thread pool. This means that several tasks might share the same thread, for example. The Task class (and all of the TPL) is defined in System.Threading.Tasks.

Creating a Task

There are various ways to create a new Task and start its execution. We will begin by first creating a Task using a constructor and then starting it by calling the Start( ) method. Task defines several constructors. Here is the one we will be using:

public Task(Action action)

Here, action is the entry point of the code that represents the task. Action is a delegate defined in System. It has several forms. Here is the form we will use now:

public delegate void Action( )

Thus, the entry point must be a method that takes no parameters and returns void. (As you will see later, it is possible to specify an argument to Action.)

Once a task has been created, you can start it by calling Start( ). One version is shown here:

public void Start( )

After a call to Start( ), the task scheduler schedules it for execution.

The following program puts the preceding discussion into action. It creates a separate task based on the MyTask( ) method. After Main( ) starts, the task is created and then started. Both MyTask( ) and Main( ) execute concurrently.

// Create and run a task.

using System;
using System.Threading;
using System.Threading.Tasks;

class DemoTask {

  // A method to be run as a task.
  static void MyTask() {
    Console.WriteLine("MyTask() starting");

    for(int count = 0; count < 10; count++) {
      Thread.Sleep(500);
      Console.WriteLine("In MyTask(), count is " + count);
    }

    Console.WriteLine("MyTask terminating");
  }

  static void Main() {

    Console.WriteLine("Main thread starting.");

     // Construct a task.
    Task tsk = new Task(MyTask);

    // Run the task.
    tsk.Start();

    // Keep Main() alive until MyTask() finishes.
    for(int i = 0; i < 60; i++) {
      Console.Write(".");
       Thread.Sleep(100);
    }

   Console.WriteLine("Main thread ending.");

  }

}

The output is shown here. (The precise output that you see may differ slightly based on task load, operating system, etc.)

Main thread starting.
.MyTask() starting
..... In MyTask(), count is 0
..... In MyTask(), count is 1
..... In MyTask(), count is 2
..... In MyTask(), count is 3
..... In MyTask(), count is 4
..... In MyTask(), count is 5
..... In MyTask(), count is 6
..... In MyTask(), count is 7
..... In MyTask(), count is 8
..... In MyTask(), count is 9
MyTask terminating
......... Main thread ending.

It is important to understand that, by default, a task executes in a background thread. Thus, when the creating thread ends, the task will end. This is why Thread.Sleep( ) was used to keep the main thread alive until MyTask( ) completed. As you would expect and will soon see, there are far better ways of waiting for a task to finish.

In the foregoing example, the task to be concurrently executed is specified by a static method. However, there is no requirement to this effect. For example, the following program reworks the previous one so that MyTask( ) is encapsulated within a class:

// Use an instance method as a task.

using System;
using System.Threading;
using System.Threading.Tasks;

class MyClass {

  // A method to be run as a task.
  public void MyTask() {
    Console.WriteLine("MyTask() starting");

    for(int count = 0; count < 10; count++) {
      Thread.Sleep(500);
      Console.WriteLine("In MyTask(), count is " + count);
    }

    Console.WriteLine("MyTask terminating");
  }
}

class DemoTask {

  static void Main() {

    Console.WriteLine("Main thread starting.");

    // Construct a MyClass object.
    MyClass mc = new MyClass();

    // Construct a task on mc.MyTask().
    Task tsk = new Task(mc.MyTask);
    // Run the task.
    tsk.Start();

    // Keep Main() alive until MyTask() finishes.
    for(int i = 0; i < 60; i++) {
      Console.Write(".");
      Thread.Sleep(100);
    }

    Console.WriteLine("Main thread ending.");
  }
}

The output is the same as before. The only difference is that MyTask( ) is now called on an instance of MyClass.

One other important point about tasks needs to be made now: once a task completes, it cannot be restarted. Thus, there is no way to rerun a task without re-creating it.

Use a Task ID

Unlike Thread, Task does not include a name property. It does, however, have an ID property called Id, which can be used to identify the task. Id is a readonly property of type int. It is shown here:

public int Id { get; }

A task is given an ID when it is created. The ID values are unique, but unordered. Therefore, the ID of a task begun before another might not be lower in value.

You can find the ID of the currently executing task by using the CurrentId property. This is a readonly static property, which is declared like this:

public static Nullable<int> CurrentID { get; }

It returns the ID of the currently executing task or null if the invoking code is not a task.

The following program creates two tasks and shows which task is executing:

// Demonstrate the Id and CurrentId properties.

using System;
using System.Threading;
using System.Threading.Tasks;

class DemoTask {

  // A method to be run as a task.
  static void MyTask() {
    Console.WriteLine("MyTask() #" + Task.CurrentId + " starting");

    for(int count = 0; count < 10; count++) {
      Thread.Sleep(500);
      Console.WriteLine("In MyTask() #" + Task.CurrentId +
                        ", count is " + count);
    }

    Console.WriteLine("MyTask #" + Task.CurrentId + " terminating");
  }

  static void Main() {

    Console.WriteLine("Main thread starting.");

    // Construct two tasks.
    Task tsk = new Task(MyTask);
     Task tsk2 = new Task(MyTask);

    // Run the tasks.
    tsk.Start();
    tsk2.Start();

     Console.WriteLine("Task ID for tsk is " + tsk.Id);
    Console.WriteLine("Task ID for tsk2 is " + tsk2.Id);

     // Keep Main() alive until the other tasks finish.
     for(int i = 0; i < 60; i++) {
      Console.Write(".");
      Thread.Sleep(100);
    }

    Console.WriteLine("Main thread ending.");
  }
}

The output is shown here:

Main thread starting.
Task ID for tsk is 1
Task ID for tsk2 is 2
.MyTask() #1 starting
MyTask() #2 starting
..... In MyTask() #1, count is 0
In MyTask() #2, count is 0
..... In MyTask() #2, count is 1
In MyTask() #1, count is 1
..... In MyTask() #1, count is 2
In MyTask() #2, count is 2
..... In MyTask() #2, count is 3
In MyTask() #1, count is 3
..... In MyTask() #1, count is 4
In MyTask() #2, count is 4
.... In MyTask() #1, count is 5
In MyTask() #2, count is 5
..... In MyTask() #2, count is 6
.In MyTask() #1, count is 6
.... In MyTask() #2, count is 7
.In MyTask() #1, count is 7
.... In MyTask() #1, count is 8
In MyTask() #2, count is 8
.....In MyTask() #1, count is 9
MyTask #1 terminating
In MyTask() #2, count is 9
MyTask #2 terminating
..........Main thread ending.

Using Wait Methods

In the preceding examples, the Main( ) thread ended last because the calls to Thread.Sleep( ) ensured this outcome, but this is not a satisfactory approach. The best way to wait for a task to end is to use one of the wait methods that Task provides. The simplest one is called Wait( ), and it pauses execution of the calling thread until the invoking task completes. Here is its most straightforward form:

public void Wait( )

This method can throw two exceptions. The first is ObjectDisposedException. It is thrown if the task has been released via a call to Dispose( ). The second is AggregateException. It is thrown when a task throws an exception or is cancelled. In general, you will want to watch for and handle this exception. Because a task might produce more than one exception (if it has child tasks, for example), they are aggregated into a single exception of type AggregateException. You can then examine the inner exception(s) associated with this exception to determine what happened. For now, the following examples will simply let any task-based exceptions be handled by the runtime.

The following reworked version of the preceding program shows Wait( ) in action. It is used inside Main( ) to suspend execution until both tsk and tsk2 finish.

// Use Wait().

using System;
using System.Threading;
using System.Threading.Tasks;

class DemoTask {

  // A method to be run as a task.
  static void MyTask() {
    Console.WriteLine("MyTask() #" + Task.CurrentId + " starting");

    for(int count = 0; count < 10; count++) {
      Thread.Sleep(500);
      Console.WriteLine("In MyTask() #" + Task.CurrentId +
                        ", count is " + count);
    }

    Console.WriteLine("MyTask #" + Task.CurrentId + " terminating");
  }

  static void Main() {

    Console.WriteLine("Main thread starting.");
    // Construct two tasks.
    Task tsk = new Task(MyTask);
    Task tsk2 = new Task(MyTask);

    // Run the tasks.
     tsk.Start();
    tsk2.Start();

    Console.WriteLine("Task ID for tsk is " + tsk.Id);
    Console.WriteLine("Task ID for tsk2 is " + tsk2.Id);

    // Suspend Main() until both tsk and tsk2 finish.
    tsk.Wait();
     tsk2.Wait();

    Console.WriteLine("Main thread ending.");
  }
}

Here is the output:

Main thread starting.
Task ID for tsk is 1
Task ID for tsk2 is 2
MyTask() #1 starting
MyTask() #2 starting
In MyTask() #1, count is 0
In MyTask() #2, count is 0
In MyTask() #1, count is 1
In MyTask() #2, count is 1
In MyTask() #1, count is 2
In MyTask() #2, count is 2
In MyTask() #1, count is 3
In MyTask() #2, count is 3
In MyTask() #1, count is 4
In MyTask() #2, count is 4
In MyTask() #1, count is 5
In MyTask() #2, count is 5
In MyTask() #1, count is 6
In MyTask() #2, count is 6
In MyTask() #1, count is 7
In MyTask() #2, count is 7
In MyTask() #1, count is 8
In MyTask() #2, count is 8
In MyTask() #1, count is 9
MyTask #1 terminating
In MyTask() #2, count is 9
MyTask #2 terminating
Main thread ending.

As the output shows, Main( ) suspends execution until both tsk and tsk2 terminate. It is important to understand that in this program, the sequence in which tsk and tsk2 finish is not important relative to the calls to Wait( ). For example, if tsk2 completed first, the call to tsk.Wait( ) would still wait until tsk finished. Then, the call to tsk2.Wait( ) would execute and return immediately, since tsk2 was already done.

Although using two separate calls to Wait( ) works in this case, there is a simpler way: use WaitAll( ). This method waits on a group of tasks. It will not return until all have finished. Here is its simplest form:

public static void WaitAll(params Task[ ] tasks)

The tasks that you want to wait for are passed via tasks. Because this is a params parameter, you can pass an array of Task objects or list of tasks separately. Various exceptions are possible, including AggregateException.

To see WaitAll( ) in action, in the preceding program try replacing this sequence

tsk.Wait();
tsk2.Wait();

with

Task.WaitAll(tsk, tsk2);

The program will work the same, but the logic is cleaner and more compact.

When waiting for multiple tasks, you need to be careful about deadlocks. If two tasks are waiting on each other, then a call to WaitAll( ) will never return. Of course, deadlock conditions are errors that you must avoid. Therefore, if a call to WaitAll( ) does not return, consider the possibility that two or more of the tasks could be deadlocking. (A call to Wait( ) that doesn’t return could also be the result of deadlock.)

Sometimes you will want to wait until any one of a group of tasks completes. To do this, use the WaitAny( ) method. Here is its simplest form:

public static int WaitAny(params Task[ ] tasks)

The tasks that you want to wait for are passed via tasks. The tasks can be passed either as an array of Task objects or separately as a list of Task arguments. It returns the index of the task that completes first. Various exceptions are possible.

You can try WaitAny( ) in the previous program by substituting this call:

Task.WaitAny(tsk, tsk2);

Now, as soon as one task finishes, Main( ) resumes and the program ends.

In addition to the forms of Wait( ), WaitAll( ), and WaitAny( ) shown here, there are versions that let you specify a timeout period or watch for a cancellation token. (Task cancellation is described later in this chapter.)

Calling Dispose( )

The Task class implements the IDisposable interface, which specifies the Dispose( ) method. It has this form:

public void Dispose( )

As implemented by Task, Dispose( ) releases the resources used by the Task. In general, the resources associated with a Task are automatically released when the Task is subjected to garbage collection (or when the program terminates). However, to release those resources before then, call Dispose( ). This is especially important in a program in which large numbers of tasks are created and then abandoned.

It is important to understand that Dispose( ) can be called on a task only after it has completed. Thus, you will need to use some mechanism, such as Wait( ), to determine that a task has completed before calling Dispose( ). This is why it was necessary to describe the Wait( ) method prior to discussing Dispose( ). It you do try to call Dispose( ) on a still active task, an InvalidOperationException will be generated.

Because all of the examples in this chapter create few tasks, are quite short, and end immediately, calls to Dispose( ) are of essentially no benefit. (This is why it was not necessary to call Dispose( ) in the preceding programs; they all end as soon as the tasks end, thus resulting in the disposal of the tasks.) However, so as to demonstrate its use and to avoid confusion in this regard, all subsequent examples will call Dispose( ) explicitly when working directly with Task instances. However, don’t be surprised if you see example code from other sources that do not. Again, if a program will be ending as soon as a task ends, then there is essentially no point is calling Dispose( )—aside from demonstrating its use.

Using TaskFactory to Start a Task

The preceding examples are written a bit less efficiently than they need to be because it is possible to create a task and start its execution in a single step by calling the StartNew( ) method defined by TaskFactory. TaskFactory is a class that provides various methods that streamline the creation and management of tasks. The default TaskFactory can be obtained from the readonly Factory property provided by Task. Using this property, you can call any of the TaskFactory methods.

There are many forms of StartNew( ). The simplest version is shown here:

public Task StartNew(Action action)

Here, action is the entry point to the task to be executed. StartNew( ) automatically creates a Task instance for action and then starts the task by scheduling it for execution. Thus, there is no need to call Start( ).

For example, assuming the preceding programs, the following call creates and starts tsk in one step:

Task tsk = Task.Factory.StartNew(MyTask);

After this statement executes, MyTask will begin executing.

Since StartNew( ) is more efficient when a task is going to be created and then immediately started, subsequent examples will use this approach.

Use a Lambda Expression as a Task

Although there is nothing wrong with using a normal method as a task, there is a second option that is more streamlined. You can simply specify a lambda expression as the task. Recall that a lambda expression is a form of anonymous function. Thus, it can be run as a separate task. The lambda expression is especially useful when the only purpose of a method is to be a single-use task. The lambda can either constitute the entire task, or it can invoke other methods. Either way, the lambda expression approach offers a pleasing alternative to using a named method.

The following program demonstrates the use of a lambda expression as a task. It converts the MyTask( ) code in preceding programs into a lambda expression.

// Use a lambda expression as a task.

using System;
using System.Threading;
using System.Threading.Tasks;

class DemoLambdaTask {

  static void Main() {

     Console.WriteLine("Main thread starting.");

    // The following uses a lambda expression to define a task.
    Task tsk = Task.Factory.StartNew( () => {
      Console.WriteLine("Task starting");

       for(int count = 0; count < 10; count++) {
        Thread.Sleep(500);
        Console.WriteLine("Task count is " + count);
      }

      Console.WriteLine("Task terminating");
    } );

    // Wait until tsk finishes.
    tsk.Wait();

    // Dispose of tsk.
    tsk.Dispose();

     Console.WriteLine("Main thread ending.");
  }
}

The output is shown here:

Main thread starting.
Task starting
Task count is 0
Task count is 1
Task count is 2
Task count is 3
Task count is 4
Task count is 5
Task count is 6
Task count is 7
Task count is 8
Task count is 9
Task terminating
Main thread ending.

In addition to the use of a lambda expression to describe a task, notice that tsk.Dispose( ) is not called until after tsk.Wait( ) returns. As explained in the previous section, Dispose( ) can be called only on a completed task. To prove this, try putting the call to tsk.Dispose( ) before the call to tsk.Wait( ). As you will see, an exception is generated.

Create a Task Continuation

One innovative, and very convenient, feature of the TPL is its ability to create a task continuation. A continuation is a task that automatically begins when another task finishes. One way to create a continuation is to use the ContinueWith( ) method defined by Task. Its simplest form is shown here:

public Task ContinueWith(Action<Task> continuationAction)

Here, continuationAction specifies the task that will be run after the invoking task completes. This delegate has one parameter of type Task. Thus, this is the version of the Action delegate used by the method:

public delegate void Action<in T>(T obj)

In this case, T is Task.

The following program demonstrates a task continuation.

// Demonstrate a continuation.

using System;
using System.Threading;
using System.Threading.Tasks;

class ContinuationDemo {

  // A method to be run as a task.
  static void MyTask() {
    Console.WriteLine("MyTask() starting");

     for(int count = 0; count < 5; count++) {
      Thread.Sleep(500);
      Console.WriteLine("In MyTask() count is " + count);
     }

     Console.WriteLine("MyTask terminating");
  }

  // A method to be run as a continuation.
  static void ContTask(Task t) {
    Console.WriteLine("Continuation starting");

     for(int count = 0; count < 5; count++) {
      Thread.Sleep(500);
      Console.WriteLine("Continuation count is " + count);
     }
    Console.WriteLine("Continuation terminating");
  }

  static void Main() {

     Console.WriteLine("Main thread starting.");

     // Construct the first task.
     Task tsk = new Task(MyTask);

    // Now, create the continuation.
     Task taskCont = tsk.ContinueWith(ContTask);

    // Begin the task sequence.
    tsk.Start();

    // Just wait on the continuation.
    taskCont.Wait();

    tsk.Dispose();
    taskCont.Dispose();

    Console.WriteLine("Main thread ending.");
  }
}

The output is shown here:

Main thread starting.
MyTask() starting
In MyTask() count is 0
In MyTask() count is 1
In MyTask() count is 2
In MyTask() count is 3
In MyTask() count is 4
MyTask terminating
Continuation starting
Continuation count is 0
Continuation count is 1
Continuation count is 2
Continuation count is 3
Continuation count is 4
Continuation terminating
Main thread ending.

As the output shows, the second task did not begin until the first task completed. Also notice that it was necessary for Main( ) to wait only on the continuation task. This is because MyTask( ) will be finished before ContTask begins. Thus, there is no need to wait for MyTask( ), although it would not be wrong to do so.

As a point of interest, it is not uncommon to use a lambda expression as a continuation task. For example, here is another way to write the continuation used in the preceding program:

// Here, a lambda expression is used as the continuation.
Task taskCont = tsk.ContinueWith((first) =>
        {
          Console.WriteLine("Continuation starting");
          for(int count = 0; count < 5; count++) {
            Thread.Sleep(500);
            Console.WriteLine("Continuation count is " + count);
          }
          Console.WriteLine("Continuation terminating");
        }
);

Here, the parameter first receives the antecedent task (which is tsk in this case).

In addition to ContinueWith( ) provided by Task, there are other methods that support task continuation provided by TaskFactory. These include various forms of ContinueWhenAny( ) and ContinueWhenAll( ), which continue a task when any or all of the specified tasks complete, respectively.

Returning a Value from a Task

A task can return a value. This is a very useful feature for two reasons. First, it means that you can use a task to compute some result. This supports parallel computation. Second, the calling process will block until the result is ready. This means that you don’t need to do any special synchronization to wait for the result.

To return a result, you will create a task by using the generic form of Task, which is Task<TResult>. Here are two of its constructors:

public Task(Func<TResult> function)
public Task(Func<Object, TResult> function, Object state)

Here, function is the delegate to be run. Notice that it is of type Func rather than Action. Func is used when a task returns a result. The first form creates a task that takes no arguments. The second form creates a task that takes an argument of type Object passed in state. Other constructors are also available.

As you might expect, there are also versions of StartNew( ) provided by TaskFactory<TResult> that support returning a result from a task. Here are the ones that parallel the Task constructors just shown:

public Task<TResult> StartNew(Func<TResult> function)
public Task<TResult> StartNew(Func<Object, TResult> function, Object state)

In all cases, the value returned by the task is obtained from Task’s Result property, which is defined like this:

public TResult Result { get; internal set; }

Because the set accessor is internal, this property is effectively readonly relative to external code. The get accessor won’t return until the result is ready. Thus, retrieving the result blocks the calling code until the result has been computed.

The following program demonstrates task return values. It creates two methods. The first is MyTask( ), which takes no parameters. It simply returns the bool value true. The second is SumIt( ), which has a single parameter (which is cast to int) and returns the summation of the value passed to that parameter.

// Return a value from a task.

using System;
using System.Threading;
using System.Threading.Tasks;

class DemoTask {

  // A trivial method that returns a result and takes no arguments.
  static bool MyTask() {
    return true;
  }

  // This method returns the summation of a positive integer
  // which is passed to it.
  static int SumIt(object v) {
    int x = (int) v;
    int sum = 0;

    for(; x >0; x--)
       sum += x;

     return sum;
  }

  static void Main() {

    Console.WriteLine("Main thread starting.");

    // Construct the first task.
    Task<bool> tsk = Task<bool>.Factory.StartNew(MyTask);

    Console.WriteLine("After running MyTask. The result is " +
                      tsk.Result);

    // Construct the second task.
    Task<int> tsk2 = Task<int>.Factory.StartNew(SumIt, 3);
  
    Console.WriteLine("After running SumIt. The result is " +
                      tsk2.Result);

    tsk.Dispose();
    tsk2.Dispose();

    Console.WriteLine("Main thread ending.");
  }
}

The output is shown here:

Main thread starting.
After running MyTask. The result is True
After running SumIt. The result is 6
Main thread ending.

In addition to the forms of Task<TResult> and StartNew<TResult> used here, there are other forms available for use that let you specify other options.

Cancelling a Task and Using AggregateException

The .NET Framework 4.0 adds a new subsystem that provides a structured, yet highly flexible way to cancel tasks. This new mechanism is based on the cancellation token. Cancellation tokens are supported by the Task class, and through the StartNew( ) factory method (among others).


NOTE The new cancellation subsystem can also be used to cancel threads, which were described in the previous chapter. However, it is fully integrated into the TPL and PLINQ. For this reason, it is described here.

In general, here is how task cancellation works. A cancellation token is obtained from a cancellation token source. This token is then passed to the task. The task must then monitor that token for a cancellation request. (This request can come only from the cancellation token source.) If a cancellation request is received, the task must end. Sometimes it is sufficient for the task to simply stop, taking no further action. Other times, the task should call ThrowIfCancellationRequested( ) on the cancellation token. This lets the canceling code know that the task was cancelled. Now, we will look at the cancellation process in detail.

A cancellation token is an instance of CancellationToken, which is a structure defined in System.Threading. It defines several properties and methods. We will use two of them. The first is the readonly property IsCancellationRequested. It is shown here:

public bool IsCancellationRequested { get; }

It returns true if cancellation has been requested on the invoking token and false otherwise. The second member that we will use is the ThrowIfCancellationRequested( ) method. It is shown here:

public void ThrowIfCancellationRequested( )

If the cancellation token on which it is called has received a cancellation request, then this method will throw an OperationCanceledException. Otherwise, it takes no action. The cancelling code can watch for this exception to confirm that cancellation did, indeed, occur. This is normally done by catching AggregateException and then examining the inner exception, via the InnerException or InnerExceptions properties. (InnerExceptions is a collection of exceptions. Collections are described in Chapter 25.)

A cancellation token is obtained from a cancellation source. This is an object of CancellationTokenSource, which is defined in System.Threading. To obtain a token, first create a CancellationTokenSource instance. (You can use its default constructor for this purpose.) The cancellation token associated with that source is available through the read-only Token property, which is shown here:

public CancellationToken Token { get; }

This is the token that must be passed to the task that you want to be able to cancel.

To use cancellation, the task must receive a copy of the cancellation token and then monitor that token, watching for cancellation. There are three ways to watch for cancellation: polling, using a callback method, and using a wait handle. The easiest is polling, and that is the approach used here. To use polling, the task will check the IsCancellationRequested property of the cancellation token, described earlier. If this property is true, cancellation has been requested and the task should terminate. Polling can be quite efficient if it is done appropriately. For example, if a task contains nested loops, then checking IsCancellationRequested in the outer loop would often be better than checking it with each iteration of the inner loop.

To create a task that calls ThrowIfCancellationRequested( ) when cancelled, you will often want to pass the cancellation token to both the task and the Task constructor, whether directly or indirectly through the StartNew( ) method. Passing the cancellation token to the task enables a cancellation request by outside code to change the state of the task to be cancelled. Here, we will use this version of StartNew( ):

public Task StartNew(Action<Object> action, Object state, CancellationToken cancellationToken)

In this use, the cancellation token will be passed to both state and cancellationToken. This means that the cancellation token will be passed to both the delegate that implements the task and to the Task instance, itself. The form of Action that supports this is shown here:

public delegate void Action<in T>(T obj)

In this case, T is Object. Because of this, inside the task, obj must be cast to CancellationToken.

One other point: when you are done with the token source, you should release its resources by calling Dispose( ).

There are various ways to determine if a task has been cancelled. The approach used here is to test the value of IsCanceled on the Task instance. If it is true, the task was cancelled.

The following program demonstrates cancellation. It uses polling to monitor the state of the cancellation token. Notice that ThrowIfCancellationRequested( ) is called on entry into MyTask( ). This enables the task to be terminated if it was cancelled before it was started. Inside the loop, IsCancellationRequested is checked. When this property is true (which it will be after Cancel( ) is called on the token source), a message indicating cancellation is displayed and ThrowIfCancellationRequested( ) is called to cancel the task.

// A simple example of cancellation that uses polling.

using System;
using System.Threading;
using System.Threading.Tasks;

class DemoCancelTask {

  // A method to be run as a task.
  static void MyTask(Object ct) {
    CancellationToken cancelTok = (CancellationToken) ct;

    // Check if cancelled prior to starting.
     cancelTok.ThrowIfCancellationRequested();

    Console.WriteLine("MyTask() starting");

    for(int count = 0; count < 10; count++) {
      // This example uses polling to watch for cancellation.
       if(cancelTok.IsCancellationRequested) {
        Console.WriteLine("Cancellation request received.");
        cancelTok.ThrowIfCancellationRequested();
      }

       Thread.Sleep(500);
       Console.WriteLine("In MyTask(), count is " + count);
     }

    Console.WriteLine("MyTask terminating");
  }

  static void Main() {

    Console.WriteLine("Main thread starting.");

    // Create a cancellation token source.
    CancellationTokenSource cancelTokSrc = new CancellationTokenSource();

    // Start a task, passing the cancellation token to both
    // the delegate and the task.
    Task tsk = Task.Factory.StartNew(MyTask, cancelTokSrc.Token,
                                     cancelTokSrc.Token);

    // Let tsk run until cancelled.
    Thread.Sleep(2000);

    try {
      // Cancel the task.
      cancelTokSrc.Cancel();

       // Suspend Main() until tsk terminates.
      tsk.Wait();
    } catch (AggregateException exc) {
       if(tsk.IsCanceled)
         Console.WriteLine("\ntsk Cancelled\n");

      // To see the exception, un-comment this line:
      // Console.WriteLine(exc);
    } finally {
      tsk.Dispose();
      cancelTokSrc.Dispose();
     }

     Console.WriteLine("Main thread ending.");
  }
}

The output is shown here. Notice that the task is cancelled after 2 seconds.

Main thread starting.
MyTask() starting
In MyTask(), count is 0
In MyTask(), count is 1
In MyTask(), count is 2
In MyTask(), count is 3
Cancellation request received.
tsk Cancelled

Main thread ending.

As the output shows, MyTask( ) was cancelled by Main( ) after a delay of 2 seconds. Thus, MyTask( ) executes four loop iterations. When an AggregateException is caught, the status of the task is checked. If it is cancelled (which it will be in this example), the cancellation of tsk is reported. It is important to understand that when AggregateException is thrown in response to a cancellation, it does not indicate an error. It simply means that the task was cancelled.

Although the preceding discussion introduces the fundamental concepts behind task cancellation and AggregateException, there is much more to these topics. These are areas that you will need to study in-depth if you want to create high-performance, scalable code.

Some Other Task Features

The preceding sections have described several of the concepts and fundamental techniques involved with tasks. However, there are other features that you may find useful. For example, you can create nested tasks, which are tasks created by a task, and child tasks, which are nested tasks that are closely tied to the creating task.

Although the AggregateException was briefly discussed in the preceding section, it has some other features that you may find useful. One is the Flatten( ) method. It is used to convert any inner exceptions of type AggregateException into a single AggregateException. Another is the Handle( ) method, which is used to handle an exception contained within an AggregateException.

When you create a task, it is possible to specify various options that affect the task’s execution characteristics. This is done by specifying an instance of TaskCreationOptions in either the Task constructor or the StartNew( ) factory method. Also, TaskFactory supports the FromAsync( ) family of methods that support the Asynchronous Programming Model.

As mentioned early on in this chapter, tasks are scheduled by an instance of TaskScheduler. Normally, the default scheduler provided by the .NET Framework is used, but it is possible to tailor aspects of the scheduler to best fit your needs. Custom schedulers are also possible.

The Parallel Class

So far, the preceding examples have show situations in which the TPL has been used in much the same way in which Thread would be used, but this is just its most basic application. Significantly more sophisticated features are available. One of the most important is the Parallel class. It facilitates the execution of concurrent code and provides methods that streamline both task and data parallelism.

Parallel is a static class that defines the For( ), ForEach( ), and Invoke( ) methods. Each has various forms. The For( ) method executes a parallelized for loop, and the ForEach( ) method executes a parallelized foreach loop. Both of these support data parallelism. Invoke( ) supports the concurrent execution of two or more methods. Thus, it supports task parallelism. As you will see, these methods offer the advantage of providing easy ways to utilize common parallel programming techniques without the need to manage tasks or threads explicitly. The following sections examine each of these methods.

Parallelizing Tasks via Invoke( )

The Invoke( ) method defined by Parallel lets you execute one or more methods by simply specifying them as arguments. If possible, it scales to utilize the available processors. Its simplest version is defined like this:

public static void Invoke(params Action[ ] actions)

The methods to be executed must be compatible with the Action delegate that was described earlier. Recall that Action is declared like this:

public delegate void Action( )

Therefore, each method passed to Invoke( ) must have no parameters and must return void. Because actions is a params parameter, you can specify a variable-length argument list of methods to execute. You can also use an array of Action, but often the argument list is easier.

Invoke( ) will initiate execution of all of the methods that it is passed. It will then wait until all of the methods have finished. Thus, there is no need (nor ability) to call Wait( ), for example. Invoke( ) handles all the details. Although there is no guarantee that the methods will execute in parallel, this is the expectation if the system supports more than one processor. Also, the order of execution, including which method starts or finishes first, cannot be specified, and may not be the same as the order as the argument list.

The following program demonstrates Invoke( ). It runs two methods, called MyMeth( ) and MyMeth2( ), via a call to Invoke( ). Notice the simplicity of the process.

// Use Parallel.Invoke() to execute methods concurrently.

using System;
using System.Threading;
using System.Threading.Tasks;

class DemoParallel {

  // A method to be run as a task.
  static void MyMeth() {
    Console.WriteLine("MyMeth starting");

     for(int count = 0; count < 5; count++) {
      Thread.Sleep(500);
      Console.WriteLine("In MyMeth, count is " + count);
     }

     Console.WriteLine("MyMeth terminating");
  }

  // A method to be run as a task.
  static void MyMeth2() {
    Console.WriteLine("MyMeth2 starting");

     for(int count = 0; count < 5; count++) {
      Thread.Sleep(500);
      Console.WriteLine("In MyMeth2, count is " + count);
     }
     Console.WriteLine("MyMeth2 terminating");
  }

  static void Main() {

    Console.WriteLine("Main thread starting.");

    // Run two named methods.
    Parallel.Invoke(MyMeth, MyMeth2);

     Console.WriteLine("Main thread ending.");
  }
}

The output is shown here:

Main thread starting.
MyMeth starting
MyMeth2 starting
In MyMeth, count is 0
In MyMeth2, count is 0
In MyMeth, count is 1
In MyMeth2, count is 1
In MyMeth, count is 2
In MyMeth2, count is 2
In MyMeth, count is 3
In MyMeth2, count is 3
In MyMeth, count is 4
MyMeth terminating
In MyMeth2, count is 4
MyMeth2 terminating
Main thread ending.

There is something very important to notice in this example: Main( ) suspends until Invoke( ) returns. Therefore, even though MyMeth( ) and MyMeth2( ) are executing concurrently, Main( ) is not. If you want the calling thread to continue execution, you can’t use Invoke( ) as shown here.

Although the previous example used named methods, this is not required when calling Invoke( ). Here is the same program reworked to use lambda expressions as arguments to Invoke( ):

// Use Parallel.Invoke( ) to execute methods concurrently.
// This version uses lambda expressions.

using System;
using System.Threading;
using System.Threading.Tasks;

class DemoParallel {

  static void Main() {

   Console.WriteLine("Main thread starting.");

// Run two anonymous methods specified via lambda expressions.
     Parallel.Invoke( () => {
        Console.WriteLine("Expression #1 starting");

        for(int count = 0; count < 5; count++) {
           Thread.Sleep(500);
          Console.WriteLine("Expression #1 count is " + count);
         }

        Console.WriteLine("Expression #1 terminating");
      },
      
() => {
    Console.WriteLine("Expression #2 starting");

    for(int count = 0; count <5; count++) {
       Thread.Sleep(500);
       Console.WriteLine("Expression #2 count is " + count);
      }

       Console.WriteLine("Expression #2 terminating");
      }
    );

    Console.WriteLine("Main thread ending.");
  }
}

The output is similar to that of the previous version.

Using the For( ) Method

One way that the TPL supports data parallelism is through the For( ) method defined by Parallel. The are several forms of For( ). We will start with the simplest version, which is shown here:

public static ParallelLoopResult

For(int fromInclusive, int toExclusive, Action<int> body)

Here, fromInclusive specifies the starting value of what corresponds to the loop control variable (also called the iteration value or index value), and toExclusive specifies one greater than the ending value. Each time through the loop, the loop control variable will increase by one. Thus, the loop will iterate from fromInclusive to toExclusive –1. The code that will be iterated is specified by the method passed to body. This method must be compatible with the Action<int> delegate, which is shown here:

public delegate void Action<in T>(T obj)

Of course, in the case of For( ), T must be int. The value passed to obj will be the next value of the loop control value. The method passed to body can be a named or anonymous method. For( ) returns a ParallelLoopResult instance that describes the completion status of the loop. For simple loops, this value can be ignored. (We will look at this value closely a bit later.)

The key point about For( ) is that it can (when feasible) parallelize the loop code. This can, in turn, lead to performance improvement. For example, in a loop that applies a transformation to an array, the process can be broken into pieces to allow different portions of the array to be transformed simultaneously. Understand, however, that no performance boost is guaranteed because of differences in the number of available processors in different execution environments, and because parallelizing small loops may create more overhead than the time that is saved.

The following shows a simple example of For( ). It begins by creating an array called data that contains 1,000,000,000 integers. It then calls For( ), passing as the loop “body” a method called MyTransform( ). This method contains a number of statements that perform arbitrary transformations on the data array. Its purpose is to simulate an actual operation. As explained in greater detail in a moment, for data parallelism to be effective, the operation being performed must usually be non-trivial. If it isn’t, then a sequential loop can be faster.

// Use Parallel.For() to create a data-parallel loop.

using System;
using System.Threading.Tasks;

class DemoParallelFor {
  static int[] data;
  // A method to be run as the body of a parallel loop.
  // The statements in this loop are designed to simply
  // consume some CPU time for the purposes of demonstration.
  static void MyTransform(int i) {
    data[i] = data[i] / 10;

    if(data[i] < 10000) data[i] = 0;
    if(data[i] > 10000 & data[i] < 20000) data[i] = 100;
    if(data[i] > 20000 & data[i] <30000) data[i] = 200;
    if(data[i] > 30000) data[i] = 300;
  }

  static void Main() {

    Console.WriteLine("Main thread starting.");

    data = new int[100000000];

    // Initialize the data using a regular for loop.
    for(int i=0; i < data.Length; i++) data[i] = i;

     // A parallel For loop.
    Parallel.For(0, data.Length, MyTransform);

    Console.WriteLine("Main thread ending.");
  }
}

The program contains two loops. The first is a standard for loop that initializes data. The second is a parallel For( ) loop that applies a transformation to each element in data. As stated, in this case, the transformation is arbitrary (being used simply for demonstration). The For( ) automatically breaks up the calls to MyTransform( ) so they can be run on separate portions of data in parallel. Therefore, if you run this program on a computer that has two or more available processors, the For( ) loop can run in parallel.

It is important to understand that not all loops will be more efficient when parallelized. In general, small loops or loops that perform very simple operations are often faster as sequential rather than parallel loops. This is why the for loop that initializes data is not a parallel For( ). The reason that small or very simple loops might not be efficient when parallelized is because the time needed to set up the parallel tasks and the time needed to context-switch exceeds the time saved by parallelization. To prove this point, the following program creates both parallel and sequential versions of both for loops in the program and times each one for comparison purposes:

// Show timing differences between sequential and parallel for loops.

using System;
using System.Threading.Tasks;
using System.Diagnostics;

class DemoParallelFor {
  static int[] data;

  // A method to be run as the body of a parallel loop.
  // The statements in this loop are designed to simply
  // consume some CPU time for the purposes of demonstration.
  static void MyTransform(int i) {
    data[i] = data[i] / 10;

     if(data[i] < 1000) data[i] = 0;
     if(data[i] > 1000 & data[i] < 2000) data[i] = 100;
     if(data[i] > 2000 & data[i] < 3000) data[i] = 200;
     if(data[i] > 3000) data[i] = 300;
  }

static void Main() {

     Console.WriteLine("Main thread starting.");

    // Create a Stopwatch instance to time loops.
     Stopwatch sw = new Stopwatch();

    data = new int[100000000];

    // Initialize data.
    sw.Start();

    // Parallel version of initialization loop.
    Parallel.For(0, data.Length, (i) => data[i] = i);

    sw.Stop();

    Console.WriteLine("Parallel initialization loop:   {0} secs",
                       sw.Elapsed.TotalSeconds);
     sw.Reset();

     sw.Start();

     // Sequential version of initialization loop.
    for(int i=0; i < data.Length; i++) data[i] = i;
   
sw.Stop();

    Console.WriteLine("Sequential initialization loop: {0} secs",
                        sw.Elapsed.TotalSeconds);

    Console.WriteLine();

    // Perform transforms.

     sw.Start();

    // Parallel version of transformation loop.
    Parallel.For(0, data.Length, MyTransform);

     sw.Stop();

    Console.WriteLine("Parallel transform loop:    {0} secs",
                        sw.Elapsed.TotalSeconds);

     sw.Reset();

     sw.Start();

     // Sequential version of transformation loop.
    for(int i=0; i < data.Length; i++) MyTransform(i);
 
  sw.Stop();

     Console.WriteLine("Sequential transform loop: {0} secs",
                        sw.Elapsed.TotalSeconds);

    Console.WriteLine("Main thread ending.");
  }
}

The following output was produced using a dual-core processor:

Main thread starting.
Parallel initialization loop:   1.0537757 secs
Sequential initialization loop: 0.3457628 secs
Parallel transform loop:   4.2246675 secs
Sequential transform loop: 5.3849959 secs
Main thread ending.

First, notice that the parallel version of the initialization loop ran about three times slower than the sequential version. This is because (in this case) assignment takes so little time that the overhead added by parallelism exceeds the gains. Now, notice that the parallel transform loop ran faster than its sequential equivalent. In this case, the gains of parallelization more than offset the overhead added by parallelization.


NOTE In general, you should consult Microsoft’s current guidelines in regards to what types of loops make the best use of parallelization. You will also want to confirm that you are actually getting performance gains before using a parallel loop in released application code.

There are a couple of other things to mention about the preceding program. First, notice that the parallel initialization loop uses a lambda expression to initialize data. It is shown here:

Parallel.For(0, data.Length, (i) => data[i] = i);

Here, the “body” of the loop is specified by a lambda expression. (Again, recall that a lambda expression creates an anonymous method.) Thus, there is no requirement that For( ) be used with a named method.

The second point of interest is the use of the Stopwatch class to handle the loop timing. This class is in System.Diagnostics. To use Stopwatch, create an instance and then call Start( ) to begin timing and Stop( ) to end timing. Use Reset( ) to reset the stopwatch. There are various ways to obtain the duration. The approach used by the program is the Elapsed property, which returns a TimeSpan object. Using the TimeSpan object, the seconds (including fractional seconds) are displayed by use of the TotalSeconds property. As this program shows, Stopwatch is very useful when developing parallel code.

As mentioned, the For( ) method returns an instance of ParallelLoopResult. This is a structure that defines the following two properties:

public bool IsCompleted { get; }
public Nullable<long> LowestBreakIteration { get; }

IsCompleted will be true if the loop completed all requested iterations. In other words, it is true if the loop ran normally. It will be false if the loop was terminated early. LowestBreakIteration contains the lowest value of the loop control variable if the loop was terminated early via a call to ParallelLoopState.Break( ).

To have access to a ParallelLoopState object, you must use a form of For( ) whose delegate takes a second parameter that receives the current loop state. Here is the simplest one:

public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body)

In this version, the Action delegate that describes the body of the loop is defined like this:

public delegate void Action<in T1, in T2>(T arg1, T2 arg2)

For use with the For( ), T1 must be int and T2 must be ParallelLoopState. Each time this delegate is called, the current loop state is passed to arg2.

To stop a loop early, call Break( ) on the ParallelLoopState instance inside body. Break( ) is defined as shown here:

public void Break( )

A call to Break( ) requests that the parallel loop stop as soon as possible, which might be a few iterations beyond the one in which Break( ) is called. However, all iterations prior to the one in which Break( ) is called will still execute. Also, remember that portions of the loop might be running in parallel, so if 10 iterations have taken place, it does not necessarily mean that those 10 iterations represent the first 10 values of the loop control variable.

Breaking from a parallel For( ) loop is often useful when data is being searched. If the desired value is found, there is no need to further execute the loop. It might also be used if invalid data is encountered during an operation.

The following program demonstrates the use of Break( ) with a For( ) loop. It reworks the previous example so that MyTransform( ) now has a ParallelLoopState parameter and the Break( ) method is called if a negative value is found in data. Inside Main( ), a negative value is put into the data array (which will cause the loop to break). The completion status of the transform loop is checked. Since the negative value in data will cause it to terminate early, the IsCompleted property will be false, and the iteration count at which the loop was terminated is displayed. (The program removes the redundant loops used by the previous version, keeping only the most efficient of each, which is the sequential initialization loop and the parallel transform loop.)

// Use ParallelLoopResult, ParallelLoopState, and Break()
// with a parallel For loop.

using System;
using System.Threading.Tasks;

class DemoParallelForWithLoopResult {
  static int[] data;

  // A method to be run as the body of a parallel loop.
  // The statements in this loop are designed to simply
  // consume some CPU time for the purposes of demonstration.
  static void MyTransform(int i, ParallelLoopState pls) {

     // Break out of loop if a negative value is found.
    if(data[i] < 0) pls.Break();
 
 data[i] = data[i] / 10;
    if(data[i] < 1000) data[i] = 0;
    if(data[i] > 1000 & data[i] < 2000) data[i] = 100;
    if(data[i] > 2000 & data[i] < 3000) data[i] = 200;
    if(data[i] > 3000) data[i] = 300;
  }

  static void Main() {

    Console.WriteLine("Main thread starting.");

    data = new int[100000000];

    // Initialize data.
    for(int i=0; i < data.Length; i++) data[i] = i;
 
    // Put a negative value into data.
    data[1000] = -10;

    // Parallel transform loop.
    ParallelLoopResult loopResult =
                Parallel.For(0, data.Length, MyTransform);

    // See if the loop ran to completion.
     if(!loopResult.IsCompleted)
      Console.WriteLine("\nLoop Terminated early because a " +
                        "negative value was encountered\n" +
                        "in iteration number " +
                         loopResult.LowestBreakIteration + ".\n");

    Console.WriteLine("Main thread ending.");
  }
}

Sample output is shown here:

Main thread starting.

Loop Terminated early because a negative value was encountered
in iteration number 1000.

Main thread ending.

As the output shows, the transform loop stops after 1000 iterations. This is because Break( ) is called inside the MyTransform( ) method when a negative value is encountered.

In addition to the two described here, there are several additional forms of For( ). Some let you specify various options. Others use long rather than int as the type of iteration parameters. There are also forms of For( ) that provide added flexibility, such as being able to specify a method that is invoked when each loop thread ends.

One other point: if you want to stop a For( ) loop and don’t care if any more iterations whatsoever are performed, use the Stop( ) method, rather than Break( ).

Using the ForEach( ) Method

You can create a parallelized version of the foreach loop by using the ForEach( ) method. It has several forms. Here is its simplest form:

public static ParallelLoopResult

ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)

Here, source specifies the collection of data over which the loop will iterate and body specifies the method that will be executed with each iteration. As explained earlier in this book, all arrays and collections (described in Chapter 25), as well as several other sources, support IEnumerable<T>. The method that you pass to the body receives the value of or reference to (not the index of) each element being iterated as an argument. Information about the status of the loop is returned.

Like For( ), you can stop a ForEach( ) loop early by calling Break( ) on the ParallelLoopState object passed to body if you use this version of ForEach( ):

public static ParallelLoopResult

ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)

The following program demonstrates the ForEach( ) loop. Like the previous examples, it creates a large array of integers. It differs from the previous examples in that the method that is executed with each iteration simply displays the values of the array on the console. Normally you would not use WriteLine( ) inside a parallelized loop because console I/O is so slow that the loop will simply be I/O bound. However, it used here to illustrate ForEach( ). When a negative value is encountered, the loop is stopped via a call to Break( ). Depending upon the precise conditions of the execution environment, you might notice that even though Break( ) is called in one task, another task may still continue to execute a few iterations prior to the stopping point.

// Use ParallelLoopResult, ParallelLoopState, and Break()
// with a parallel ForEach() loop.

using System;
using System.Threading.Tasks;

class DemoParallelForWithLoopResult {
  static int[] data;

  // A method to be run as the body of a parallel loop.
  // In this version, notice that the value of an element of
  // of data is passed to v, not an index.
  static void DisplayData(int v, ParallelLoopState pls) {

    // Break out of loop if a negative value is found.
    if(v < 0) pls.Break();
  
    Console.WriteLine("Value: " + v);
  }

  static void Main() {

    Console.WriteLine("Main thread starting.");

    data = new int[100000000];

    // Initialize data.
    for(int i=0; i < data.Length; i++) data[i] = i;
 
   // Put a negative value into data.
     data[100000] = -10;

    // Use a parallel ForEach() loop to display the data.
    ParallelLoopResult loopResult =
                Parallel.ForEach(data, DisplayData);
    // See if the loop ran to completion.
    if(!loopResult.IsCompleted)
      Console.WriteLine("\nLoop Terminated early because a " +
                        "negative value was encountered\n" +
                        "in iteration number " +
                         loopResult.LowestBreakIteration + ".\n");

    Console.WriteLine("Main thread ending.");
  }
}

Although the preceding code used a named method as the delegate that represented the “body” of the loop, sometimes it is more convenient to use an anonymous method. For example, here the “body” of the ForEach( ) loop is implemented as a lambda expression:

// Use a parallel ForEach() loop to display the data.
ParallelLoopResult loopResult =
            Parallel.ForEach(data, (v, pls) => {
              Console.WriteLine("Value: " + v);
              if(v < 0) pls.Break();
            });

Exploring PLINQ

PLINQ is the parallel version of LINQ, and it is closely related to the TPL. A primary use of PLINQ is to achieve data parallelism within a query. As you will see, this is very easy to do. Like the TPL, PLINQ is a large topic with many facets. This chapter introduces the basic concepts.

ParallelEnumerable

At the foundation of PLINQ is the ParallelEnumerable class, which is defined in System.Linq. This is a static class that defines many extension methods that support parallel operations. It is, essentially, the parallel version of the standard LINQ class Enumerable. Many of the methods extend ParallelQuery. Others return ParallelQuery. ParallelQuery encapsulates a sequence that supports parallel operations. Both generic and non-generic versions are supported. We won’t be working with ParallelQuery directly, but we will be making use of several ParallelEnumerable methods. The most important of these is AsParallel( ), described in the following section.

Parallelizing a Query with AsParallel( )

Perhaps the single most convenient feature of PLINQ is how easy it is to create a parallel query. To do this, you simply call AsParallel( ) on the data source. AsParallel( ) is defined by ParallelEnumerable, and it returns the data source encapsulated within a ParallelQuery instance. This enables it to support parallel query extension methods. Once this is done, the query will partition the data source and operate on each partition in parallel if possible, and if the query is likely to benefit from parallelization. (If parallelization is not possible or reasonable, the query is simply executed sequentially.) Therefore, with the addition of a single call to AsParallel( ), a sequential LINQ query is transformed into a parallel PLINQ query, and for simple queries, this is the only step necessary.

There are both generic and non-generic versions of AsParallel( ). The non-generic version and the simplest generic version are shown here:

public static ParallelQuery AsParallel(this IEnumerable source)

public static ParallelQuery<TSource>

AsParallel<TSource>(this IEnumerable<TSource> source)

Here, TSource stands for the type of the elements in the sequence source.

Here is an example that demonstrates a simple PLINQ query:

// A Simple PLINQ Query.

using System;
using System.Linq;

class PLINQDemo {

  static void Main() {

    int[] data = new int[10000000];

    // Initialize the data to positive values.
    for(int i=0; i < data.Length; i++) data[i] = i;
  
    // Now, insert some negative values.
    data[1000] = -1;
    data[14000] = -2;
    data[15000] = -3;
    data[676000] = -4;
    data[8024540] = -5;
    data[9908000] = -6;

    // Use a PLINQ query to find the negative values.
    var negatives = from val in data.AsParallel()
                    where val < 0
                     select val;

     foreach(var v in negatives)
     Console.Write(v + " ");
  
Console.WriteLine();
  }
}

The program begins by creating a large array of integers called data that contains positive values. Next, a few negative values are inserted. Then, a PLINQ query is used toreturn a sequence of the negative values. This query is shown here:

var negatives = from val in data.AsParallel()
                where val < 0
                select val;

In this query, AsParallel( ) is called on data. This enables parallel operations on data, which is the data source of the query, letting multiple threads search data in parallel, looking for negative values. As those values are found, they are added to the output sequence. This means that the order of the output sequence may not reflect the order of the negative values within data. For example, here is a sample run produced on a dual-core system:

-5 -6 -1 -2 -3 -4

As you can see, the thread that searched the higher partition found –5 and –6 before the thread that searched the lower partition found –1. It is important to understand that you might see a different outcome because of differences in task load, number of available processors, and so on. The key point is that the resulting sequence will not necessarily reflect the order of the original sequence.

Using AsOrdered( )

As pointed out in the previous section, by default, the order of the resulting sequence produced by a parallel query does not necessarily reflect the order of the source sequence. Furthermore, for all practical purposes, the resulting sequence should be considered unordered. If you need to have the result reflect the order of the source, you must specifically request it by using the AsOrdered( ) method, which is defined by ParallelEnumerable. Both generic and non-generic forms are defined, as shown here:

public static ParallelQuery AsOrdered(this ParallelQuery source)

public static ParallelQuery<TSource>

                    AsOrdered<TSource>(this ParallelQuery<TSource> source)

Here, TSource stands for the type of the elements in source. AsOrdered( ) can only be called on a ParallelQuery object because it is a ParallelQuery extension method.

To see the effects of using AsOrdered( ), substitute the following query into the program in the preceding section:

// Use AsOrdered() to retain the order of the result.
var negatives = from val in data.AsParallel().AsOrdered()
                where val < 0
                select val;

When you run the program, the order of the elements in the resulting sequence will now reflect the order of the elements in the source sequence.

Cancelling a Parallel Query

Cancelling a parallel query is similar to cancelling a task, as described earlier. Both rely on the CancellationToken that is obtained from a CancellationTokenSource. This token is passed to the query by way of the WithCancellation( ) method. To cancel the query, call Cancel( ) on the token source. There is one important difference, however, between cancelling a parallel query and cancelling a task. When a parallel query is cancelled, it throws an OperationCanceledException, rather than an AggregateException. However, in cases where more than one exception can be generated by the query, an OperationCanceledException might be combined into an AggregateException. Therefore, it is often best to watch for both.

The WithCancellation( ) method is shown here:

public static ParallelQuery<TSource>

WithCancellation<TSource>(

this ParallelQuery<TSource> source,

CancellationToken cancellationToken)

Here, source specifies the invoking query, and cancellationToken specifies the cancellation token. It returns a query that supports the specified cancellation token.

The following example shows how to cancel the query used in the preceding program. It sets up a separate task that sleeps for 100 milliseconds and then cancels the query. A separate task is needed because the foreach loop that executes the query blocks the Main( ) method until the loop completes.

// Cancel a parallel query.

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class PLINQCancelDemo {

  static void Main() {
    CancellationTokenSource cancelTokSrc = new CancellationTokenSource();
    int[] data = new int[10000000];

    // Initialize the data to positive values.
     for(int i=0; i < data.Length; i++) data[i] = i;
 
 // Now, insert some negative values.
    data[1000] = -1;
     data[14000] = -2;
     data[15000] = -3;
    data[676000] = -4;
    data[8024540] = -5;
    data[9908000] = -6;

    // Use a PLINQ query to find the negative values.
    var negatives = from val in data.AsParallel().
                                WithCancellation(cancelTokSrc.Token)
                     where val < 0
                     select val;

   // Create a task that cancels the query after 100 milliseconds.
     Task cancelTsk = Task.Factory.StartNew( () => {
                       Thread.Sleep(100);
                       cancelTokSrc.Cancel();
                     });

    try {
       foreach(var v in negatives)
         Console.Write(v + " ");

    } catch(OperationCanceledException exc) {
       Console.WriteLine(exc.Message);
     } catch(AggregateException exc) {
       Console.WriteLine(exc);
    } finally {
      cancelTsk.Wait();
       cancelTokSrc.Dispose();
      cancelTsk.Dispose();
    }

    Console.WriteLine();

  }
}

The output is shown here. Because the query is cancelled prior to completion, only the exception message is displayed.

The query has been canceled via the token supplied to WithCancellation.

Other PLINQ Features

As mentioned, PLINQ is a large subsystem. Part of its size is due to the flexibility that it provides. PLINQ offers many other features that help you tailor or manage a parallel query so it best fits the demands of your situation. Here are a few examples. You can specify the maximum number of processors that will be allocated to a query by calling WithDegreeOfParallelism( ). You can request that a portion of a parallel query be executed sequentially by calling AsSequential( ). If you don’t want to block the calling thread waiting for results from a foreach loop, you can use the ForAll( ) method. All of these methods are defined by ParallelEnumerable. To override cases in which PLINQ would default to sequential execution, you can use the WithExecutionMode( ) method, passing in ParallelExecutionMode.ForceParallelism.

PLINQ Efficiency Concerns

Not every query will run faster simply because it is parallelized. As explained earlier in regards to the TPL, there is overhead associated with creating and managing concurrent threads of execution. In general, if the data source is quite small and if the processing required is quite short, then adding parallelism may not increase the speed of the query. For the latest information and guidelines in this regard, consult Microsoft’s current recommendations.