Kapitel 18. Asynchrone Aufgaben

In diesem Kapitel:

Mit den Templates std::async, std::packaged_task, std::future und std::promise bietet C++11 eine sehr komfortable Schnittstelle zum einfachen Starten einer Aufgabe in einem separaten Thread. Gänzlich ohne Locks oder auch Bedingungsvariablen lässt sich eine Aktion initiieren und das Ergebnis durch einen Funktionsaufruf in der Zukunft abholen. Diese asynchronen Aufgaben (tasks) sind auch unter dem Namen Futures bekannt.

Praxistipp

Verwenden Sie wenn möglich async.

std::async ist von den dargestellten asynchronen Funktionsaufrufen am einfachsten zu verwenden. Ein Thread muss weder explizit gestartet noch muss dessen Lebenszeit verwaltet werden. Die C++-Laufzeit sorgt sogar dafür, ob es sinnvoll ist, den asynchronen Funktionsaufruf in einem separaten Thread zu starten. Erst wenn die Funktionalität von std::async nicht mehr ausreicht, sollte std::packaged_task für zu definierende Arbeitspakete oder std::promise verwendet werden.

std::async lässt sich wie eine Funktion verwenden. Die Funktion erhält eine aufrufbare Einheit und deren Argumente und führt die aufrufbare Einheit aus. Das Variadic Template std::async gibt das std::future-Objekt zurück. Dieses Objekt kann später im Programm verwendet werden, um mit seiner Funktion get das Ergebnis der Funktionsaufführung zu erhalten. Hat der std::async-Funktionsaufruf das Ergebnis noch nicht berechnet, blockiert dessen get-Aufruf.

Das einfache Listing 18.1 soll die Interaktionen darstellen, bevor die Details folgen.

async.cpp

Als aufrufbare Einheit wurde in Listing 18.1 eine Lambda-Funktion (Zeile 22), ein Zeiger auf eine Funktion (Zeile 23) und ein Funktionsobjekt (Zeile 25) verwendet. Der std::async startet die Aufgabe und stellt das Ergebnis über das Future bereit. Dabei muss der Rückgabewert des asynchronen Funktionsaufrufs explizit für das Future angegeben werden: std::future<int> in den Zeilen 22 und 23. Wird auto (Zeile 25) verwendet, ist das nicht notwendig. Durch die get-Funktionsaufrufe des Future werden die Werte explizit angefordert.

Abbildung 18.1 zeigt die Ausgabe des Programms.

Das Klassen-Template std::future bietet ein deutlich mächtigeres Interface als eine einfache Methode get an. Dazu aber mehr im Kapitel „future und promise“.

Starten eines Threads

Es ist anzunehmen, dass das Ausführen der Berechnungen in Listing 18.1 im Main-Thread deutlich schneller ist, als für jede einzelne Aufgabe einen Thread zu erzeugen und seine Datenzugriffe zu koordinieren. Tatsächlich startet std::async die Aufgabe nicht automatisch in einem neuen Thread. Die C++-Laufzeit nimmt dem Anwender diese Entscheidung ab. Entscheidungskriterien für die C++-Laufzeit können die tatsächlich vorhandene Anzahl der Prozessoren (std::thread::hardware_concurrency) oder die Anzahl der aktiven Threads sein.

launch::async und launch::deferred

Der Anwender kann mit den Optionen std::launch::async bzw. std::launch::deferred explizit bestimmen, ob die Aufgabe in einem neuen bzw. dem gleichen Thread gestartet werden soll. Durch std::launch::deferred wird der Wert erst berechnet, wenn dieser explizit angefordert wird. Diese aus der funktionalen Programmierung bewährte Strategie spart Zeit und Ressourcen und ist unter dem Namen Bedarfsauswertung (lazy evaluation) bekannt.

In Listing 18.2 ist zu sehen, wie eine Aufgabe verzögert std::launch::deferred gestartet wird. Der GCC 4.6 (C++11 Support in GCC 4.7, 2012) kennt die Bitmaske std::launch::deferred noch unter ihrem alten Namen std::launch::sync (Zeile 14).

asyncLazy.cpp

Trotz auto sind die Bezeichner in Listing 18.2 sehr lang. In den asynchronen Aufgaben (Zeilen 11 und 13) wird die aktuelle Zeit bestimmt. Dies führt asyncLazy verzögert und asyncEager sofort aus. Um das verzögerte Verhalten auf den Punkt zu bringen, schläft der Main-Thread für 1 Sekunde (Zeile 17). Ein bisschen Zeitarithmetik mit der neuen Zeitbibliothek und die Zeit, die bis zum Ausführen der asynchronen Aufgabe vergangen ist, kann in den Zeilen 23 und 24 ausgegeben werden. Während die Bitmaske std::launch::deferred bewirkt, dass die Funktion erst beim Aufruf der Funktion get ausgeführt wird, bewirkt std::launch::async, dass die Funktion sofort evaluiert wird.

futuresWorkflow.cpp

Aufgabe 18-1

Beobachten Sie, wie viele Threads beim Ausführen des Programms in Listing 18.2 gestartet werden.

Modifizieren Sie das Programm, indem Sie die Threads länger schlafen lassen oder auch die Threads synchron oder asynchron starten. Ändert sich dadurch die Anzahl der verwendeten Threads?

Aufgabe 18-2

Implementieren Sie den Workflow in Listing 17.4 mit std::async.

packaged_task

std::package_task ist ein einfacher Wrapper für ein aufrufbare Einheit, um sie später in einem Thread zu verwenden. Durch std::packaged_task wird ein Future mit einem Promise verbunden, sodass mit dem Future der Wert des Promise und somit der Wert der aufrufbaren Einheit ermittelt werden kann.

Listing 18.1 lässt sich direkt in std::packaged_task in Listing 18.3 übersetzen. Die Syntax ist deutlich anspruchsvoller.

packagedTask.cpp

Diese Berechnung der Werte in Listing 18.3 findet in vier Schritten statt:

  1. Die Aufgaben werden verpackt (Zeile 19).

  2. Die Promises werden mit den Futures verbunden (Zeile 25).

  3. Die Argumente werden an die Promises übergeben, und die Ergebnisse werden berechnet (Zeile 30).

  4. Die Futures holen die Ergebnisse ab (Zeile 35).

Nach diesem oberflächlichen Blick noch ein paar Details: Die eigenwillige Syntax int(int,int) in std::packaged_type<int(int,int)> beschreibt eine Funktion, die zwei int-Argumente erwartet und einen int-Rückgabewert liefert. Durch sumTask.get_future() in Zeile 26 gibt der Promise den Future zurück. In den Zeilen 31 bis 33 werden die Argumente an den Promise übergeben, um dessen Berechnung anzustoßen. Da die Funktionen im Main-Thread gestartet wurden (Zeilen 31 bis 33), findet deren Berechnung auch in diesem statt. Die Ausgabe des Programms ist bereits bekannt.

Master-Worker-Threads

Ein typischer Anwendungsfall für std::packaged_task ist es, die Arbeitspakete im Main-Thread zu schnüren und sie in einem Container zu speichern, um sie anschließend auf verschiedene Arbeiter-Threads zu verteilen und auszuführen. Zuletzt werden die Ergebnisse eingesammelt.

In Listing 18.4 wird diese Strategie angewandt, um die Zahlen von 0 bis 10000 in vier Threads aufzusummieren.

packagedTaskSum.cpp

01 #include <utility>
02 #include <future>
03 #include <iostream>
04 #include <thread>
05 #include <deque>
06
07 class SumUp{
08   public:
09     SumUp(int b, int e): beg(b),end(e),sum(0){}
10     int operator()(){
11       for (int i= beg; i < end; ++i ) sum += i;
12       return sum;
13     }
14   private:
15     int beg;
16     int end;
17     int sum;
18 };
19
20 int main(){
21
22   std::cout << std::endl;
23
24   SumUp sumUp1(0,2500);
25   SumUp sumUp2(2500,5000);
26   SumUp sumUp3(5000,7500);
27   SumUp sumUp4(7500,10001);
28
29   // define the tasks
30   std::packaged_task<int()> sumTask1(sumUp1);
31   std::packaged_task<int()> sumTask2(sumUp2);
32   std::packaged_task<int()> sumTask3(sumUp3);
33   std::packaged_task<int()> sumTask4(sumUp4);
34
35   // get the futures
36   std::future<int> sumResult1= sumTask1.get_future();
37   std::future<int> sumResult2= sumTask2.get_future();
38   std::future<int> sumResult3= sumTask3.get_future();
39   std::future<int> sumResult4= sumTask4.get_future();
40
41   // push the tasks on the container
42   std::deque< std::packaged_task<int()> > allTasks;
43   allTasks.push_back(std::move(sumTask1));
44   allTasks.push_back(std::move(sumTask2));
45   allTasks.push_back(std::move(sumTask3));
46   allTasks.push_back(std::move(sumTask4));
47
48   // execute each task in a separate thread
49   while ( not allTasks.empty() ){
50     std::packaged_task<int()>
          myTask= std::move(allTasks.front());
51     allTasks.pop_front();
52     std::thread sumThread(std::move(myTask));
53     sumThread.detach();
54   }
55
56   // get the results
57   int sum= sumResult1.get() + sumResult2.get()
            + sumResult3.get() + sumResult4.get();
58
59   std::cout << "sum of 0 .. 100000 = " << sum << std::endl;
60
61   std::cout << std::endl;
62
63 }
Listing 18.4 Summation von natürlichen Zahlen in vier Threads

In den Zeilen 24 bis 27 in Listing 18.4 werden die Arbeitspakete definiert. Die Funktionsobjekte sumUp(begin,end) addieren die natürlichen Zahlen von begin ausschließlich end zusammen. Diese teure Addition soll in separaten Threads vollzogen werden. Dazu werden die Arbeitspakete mit den Futures verbunden (Zeile 35) und auf den Container allTasks geschoben (Zeile 41). Die eigentliche Arbeit findet in den Zeilen 49 bis 54 statt. Für jedes Arbeitspaket myTask im Container allTasks wird das Arbeitspaket aus dem Container transferiert (Zeile 50), das nun leere Arbeitspaket vom Container entfernt (Zeile 51) und myTask in einen neuen Thread transferiert und im Hintergrund (Zeile 53) ausgeführt. Zuletzt werden die Ergebnisse der vier Threads eingesammelt, addiert und ausgegeben (Abbildung 18.4).

Weitere Funktionen

Neben get_future bietet std::packaged_task noch weitere Methoden an. Durch std::packaged_task::valid lässt sich prüfen, ob der std::packaged_task einen geteilten Zustand besitzt. std::packaged_task::reset erlaubt es, diese zurückzusetzen, und std::packaged_task::swap bietet die Möglichkeit an, den Zustand zweier std::packaged_tasks auszutauschen.

packagedTaskSumSolution.cpp

Aufgabe 18-3

Erweitern Sie Listing 18.4.

In Listing 18.4 sind vier Arbeiter-Threads aktiv. Viel schöner ist es, wenn die Zahl der Arbeiter der Zahl der CPUs entspricht. Parametrisieren Sie das Listing 18.4, sodass die Anzahl der Threads von der Anzahl der vorhandenen CPUs abhängt. Verwenden Sie dazu std::thread::hardware_concurrency(). Falls der Wert 0 ergibt, gehen Sie von vier CPUs aus.

future und promise

Volle Kontrolle über den Empfänger und den Sender der Nachricht stellt std::future in Zusammenarbeit mit std::promise zur Verfügung. Hier ist es in der Verantwortung des Programmierers, den Thread zu starten, den std::future mit dem std::promise zu verbinden, den Rückgabewert oder auch eine Ausnahme des std::promise zu setzen und das Ergebnis des asynchronen Funktionsaufrufs abzuholen.

promise als Ersatz für async und packaged_task

Mit std::future und std::promise lassen sich Listing 18.1 und Listing 18.3 auch formulieren, wobei std::promise, vereinfacht gesagt, die Aufgabe von std::async bzw. std::packaged_task übernehmen wird. In Listing 18.5 kommen als aufrufbare Entitäten lediglich ein Funktionszeiger und ein Funktionsobjekt zum Einsatz, da der Funktionskörper von std::promise zu komplex für eine anonyme Funktion ist.

futurePromise.cpp

Das Hauptprogramm in Listing 18.5 sollte vertraut wirken. Der Umgang mit std::promise folgt der gleichen Struktur wie der Umgang mit std::packaged_task in Listing 18.3. Im Funktionskörper der Funktion oder auch des Funktionsobjekts wird mit dem Aufruf set_value (Zeilen 7 und 13) der Rückgabewert von std::promise gesetzt. Der Aufruf des Promise product(prodPromise(a,b) kann natürlich auch im Haupt-Thread erfolgen.

Das Programm in Listing 18.5 hat ein konzeptionelles Problem. Wird ein Objekt der Struktur Div (Zeile 10) mit einem Nenner 0 instanziiert, führt dies zum sofortigen Programmabbruch. std::async, std::packaged_task und std::promise erlauben es, nicht nur den Wert, sondern auch Ausnahmen an das Future zurückzugeben. Im Gegensatz zu std::async und std::packaged_task, die die Ausnahmen automatisch übertragen, muss bei std::promise der Anwender die Funktionalität set_exception implementieren.

In Listing 18.6 werden der Future und der Promise um eine Ausnahmebehandlung erweitert.

futurePromiseException.cpp

Die Ausführung des Programms führt dazu, dass die Ausnahme std::runtime_error("illegal divion by zero") in Zeile 11 geworfen wird. Im anschließenden catch-Block wird die Ausnahme gefangen. Die aktuelle Ausnahme std::current_exception (Zeile 15) wird zum Rückgabewert des Promise. Der get-Aufruf des Future (Zeile 37) ist in einem try-Block gekapselt, sodass die Nachricht der Ausnahme ausgegeben werden kann.

Funktionen des future

Tabelle 18.1 stellt die Funktionen von std::future im Überblick dar. Dabei bezeichnet f einen std::future. abs_time bezeichnet die absolute, rel_time die relative Zeitangabe.

Während get implizit wait aufruft und so blockiert, bis der gemeinsame Wert mit std::async, std::packed_task oder auch mit std::promise zur Verfügung steht, erlauben es die wait-Funktionen von std::future, den Future mit dem Promise zu synchronisieren. Damit sind zwei interessante Anwendungsfälle möglich:

std::shared_future

Soll der gemeinsame Zustand von mehr als einem Thread angefordert werden, ist der Aufruf der get-Funktion undefiniert. Für dieses Szenario besitzt der std::future die Methode share, die einen std::shared_future zurückgibt. Dessen Wert kann mehrfach angefordert werden. Der entscheidende Unterschied zwischen std::future und std::shared_future ist, dass der std::shared_future neben der Move- auch die Copy-Semantik anbietet. Abgesehen von der Methode share des std::future besitzen beiden Future-Typen das gleiche Interface.

Ein std::shared_future lässt sich direkt oder über einen std::future erzeugen. So ist die Zeile 1 äquivalent zu den Zeilen 3 und 4 in Listing 18.7.

In Listing 18.8 wird im Promise das Ergebnis von 20/10 berechnet. Das Ergebnis der Berechnung wird anschließend von fünf Futures angefordert.

sharedFuture.cpp

01 #include <exception>
02 #include <future>
03 #include <iostream>
04 #include <thread>
05 #include <utility>
06
07 std::mutex coutMutex;
08
09 struct Div{
10
11   void operator()(std::promise<int>&& intPromise,
                     int a, int b){
12     try{
13       if ( b==0 ) throw std::runtime_error(
                     "illegal division by zero");
14       intPromise.set_value(a/b);
15     }
16     catch ( ...){
17       intPromise.set_exception(std::current_exception());
18     }
19   }
20
21 };
22
23 struct Requestor{
24
25   void operator ()(std::shared_future<int> shaFut){
26
27     // lock std::cout
28     std::lock_guard<std::mutex> coutGuard(coutMutex);
29
30     // get the thread id
31     std::cout << "threadId("
                 << std::this_thread::get_id() << "): " ;
32
33     // get the result
34     try{
35       std::cout << "20/10= " << shaFut.get() << std::endl;
36     }
37     catch (std::runtime_error& e){
38       std::cout << e.what() << std::endl;
39     }
40   }
41
42 };
43
44 int main(){
45
46   std::cout << std::endl;
47
48   // define the promises
49   std::promise<int> divPromise;
50
51   // get the futures
52   std::shared_future<int> divResult=
                             divPromise.get_future();
53
54   // calculate the result in a separat thread
55   Div div;
56   std::thread divThread(div,std::move(divPromise),20,10);
57
58   Requestor req;
59   std::thread sharedThread1(req,divResult);
60   std::thread sharedThread2(req,divResult);
61   std::thread sharedThread3(req,divResult);
62   std::thread sharedThread4(req,divResult);
63   std::thread sharedThread5(req,divResult);
64
65   divThread.join();
66
67   sharedThread1.join();
68   sharedThread2.join();
69   sharedThread3.join();
70   sharedThread4.join();
71   sharedThread5.join();
72
73   std::cout << std::endl;
74
75 }
Listing 18.8 Mehrfache Abfrage einer Berechnung mit std::shared_future

Der wesentliche Unterschied von Listing 18.6 zu Listing 18.8 besteht darin, dass durch div.get_future in Zeile 52 der std::shared_future divResult instanziiert wird. divResult als std::shared_future ist kopierbar. Somit kann jeder der fünf Threads (Zeilen 59 bis 63) seine Identität und das Ergebnis der Berechnung ausgeben. std::cout als gemeinsam genutzte Variable muss geschützt werden (Zeile 28).

Die Ausgabe des Programms in Abbildung 18.7 zeigt die fünf Threads in Aktion.

In Tabelle 18.2 folgen die weiteren Funktionen von std::promise. p und p1. val steht für den gemeinsamen Wert, ep für den Ausnahmezeiger und alloc für den Speicherbeschaffer.

synchroniseThreads.cpp

Aufgabe 18-4

Verwenden Sie einen std::shared_future zur Synchronisation von Threads.

Benutzen Sie dazu einen Promise prom im Haupt-Thread, der durch seinen Aufruf von prom.set_value() signalisiert, dass alle Threads weiterarbeiten können. Die Signatur des Promise bzw. des Future ist in diesem Fall:

std::promise<void>
std::shared_future<void>