Mit den Templates std::async
, std::packaged_task
, std::future
und std::promise
bietet C++11 eine sehr komfortable Schnittstelle zum einfachen Starten einer Aufgabe in einem separaten Thread. Gänzlich ohne Locks oder auch Bedingungsvariablen lässt sich eine Aktion initiieren und das Ergebnis durch einen Funktionsaufruf in der Zukunft abholen. Diese asynchronen Aufgaben (tasks) sind auch unter dem Namen Futures bekannt.
Verwenden Sie wenn möglich async.
std::async
ist von den dargestellten asynchronen Funktionsaufrufen am einfachsten zu verwenden. Ein Thread muss weder explizit gestartet noch muss dessen Lebenszeit verwaltet werden. Die C++-Laufzeit sorgt sogar dafür, ob es sinnvoll ist, den asynchronen Funktionsaufruf in einem separaten Thread zu starten. Erst wenn die Funktionalität von std::async
nicht mehr ausreicht, sollte std::packaged_task
für zu definierende Arbeitspakete oder std::promise
verwendet werden.
std::async
lässt sich wie eine Funktion verwenden. Die Funktion erhält eine aufrufbare Einheit und deren Argumente und führt die aufrufbare Einheit aus. Das Variadic Template std::async
gibt das std::future
-Objekt zurück. Dieses Objekt kann später im Programm verwendet werden, um mit seiner Funktion get
das Ergebnis der Funktionsaufführung zu erhalten. Hat der std::async
-Funktionsaufruf das Ergebnis noch nicht berechnet, blockiert dessen get
-Aufruf.
Das einfache Listing 18.1 soll die Interaktionen darstellen, bevor die Details folgen.
async.cpp
01 #include <future> 02 #include <iostream> 03 04 int product(int a, int b){ 05 return a*b; 06 } 07 08 struct Div{ 09 int operator()(int a, int b){ 10 return a/b; 11 } 12 }; 13 14 15 int main(){ 16 17 int a= 20; 18 int b= 10; 19 20 std::cout << std::endl; 21 22 std::future<int> sum=std::async([=]{ return a+b;}); 23 std::future<int> prod= std::async(&product,a,b); 24 Div divide; 25 auto div=std::async(divide,a,b); 26 27 std::cout << "20+10= " << sum.get() << std::endl; 28 std::cout << "20*10= " << prod.get() << std::endl; 29 std::cout << "20/10= " << div.get() << std::endl; 30 31 std::cout << std::endl; 32 }
Als aufrufbare Einheit wurde in Listing 18.1 eine Lambda-Funktion (Zeile 22), ein Zeiger auf eine Funktion (Zeile 23) und ein Funktionsobjekt (Zeile 25) verwendet. Der std::async
startet die Aufgabe und stellt das Ergebnis über das Future bereit. Dabei muss der Rückgabewert des asynchronen Funktionsaufrufs explizit für das Future angegeben werden: std::future<int>
in den Zeilen 22 und 23. Wird auto
(Zeile 25) verwendet, ist das nicht notwendig. Durch die get
-Funktionsaufrufe des Future werden die Werte explizit angefordert.
Abbildung 18.1 zeigt die Ausgabe des Programms.
Das Klassen-Template std::future
bietet ein deutlich mächtigeres Interface als eine einfache Methode get
an. Dazu aber mehr im Kapitel „future und promise“.
Starten eines Threads
Es ist anzunehmen, dass das Ausführen der Berechnungen in Listing 18.1 im Main-Thread deutlich schneller ist, als für jede einzelne Aufgabe einen Thread zu erzeugen und seine Datenzugriffe zu koordinieren. Tatsächlich startet std::async
die Aufgabe nicht automatisch in einem neuen Thread. Die C++-Laufzeit nimmt dem Anwender diese Entscheidung ab. Entscheidungskriterien für die C++-Laufzeit können die tatsächlich vorhandene Anzahl der Prozessoren (std::thread::hardware_concurrency
) oder die Anzahl der aktiven Threads sein.
launch::async und launch::deferred
Der Anwender kann mit den Optionen std::launch::async
bzw. std::launch::deferred
explizit bestimmen, ob die Aufgabe in einem neuen bzw. dem gleichen Thread gestartet werden soll. Durch std::launch::deferred
wird der Wert erst berechnet, wenn dieser explizit angefordert wird. Diese aus der funktionalen Programmierung bewährte Strategie spart Zeit und Ressourcen und ist unter dem Namen Bedarfsauswertung (lazy evaluation) bekannt.
In Listing 18.2 ist zu sehen, wie eine Aufgabe verzögert std::launch::deferred
gestartet wird. Der GCC 4.6 (C++11 Support in GCC 4.7, 2012) kennt die Bitmaske std::launch::deferred
noch unter ihrem alten Namen std::launch::sync
(Zeile 14).
asyncLazy.cpp
01 #include <chrono> 02 #include <future> 03 #include <iostream> 04 05 int main(){ 06 07 std::cout << std::endl; 08 09 auto begin= std::chrono::system_clock::now(); 10 11 auto asyncLazy=std::async(std::launch::deferred, []{ return std::chrono::system_clock::now();}); 12 13 auto asyncEager=std::async( std::launch::async,[]{ return std::chrono::system_clock::now();}); 14 15 std::this_thread::sleep_for(std::chrono::seconds(1)); 16 17 auto lazyStart= asyncLazy.get() - begin; 18 auto eagerStart= asyncEager.get() - begin; 19 20 auto lazyDuration= std::chrono::duration<double>(lazyStart).count(); 21 auto eagerDuration= std::chrono::duration<double>(eagerStart).count(); 22 23 std::cout << "asyncLazy evaluated after : " << lazyDuration << " seconds." << std::endl; 24 std::cout << "asyncEager evaluated after: " << eagerDuration << " seconds." << std::endl; 25 26 27 std::cout << std::endl; 28 29 }
Trotz auto
sind die Bezeichner in Listing 18.2 sehr lang. In den asynchronen Aufgaben (Zeilen 11 und 13) wird die aktuelle Zeit bestimmt. Dies führt asyncLazy
verzögert und asyncEager
sofort aus. Um das verzögerte Verhalten auf den Punkt zu bringen, schläft der Main-Thread für 1 Sekunde (Zeile 17). Ein bisschen Zeitarithmetik mit der neuen Zeitbibliothek und die Zeit, die bis zum Ausführen der asynchronen Aufgabe vergangen ist, kann in den Zeilen 23 und 24 ausgegeben werden. Während die Bitmaske std::launch::deferred
bewirkt, dass die Funktion erst beim Aufruf der Funktion get
ausgeführt wird, bewirkt std::launch::async
, dass die Funktion sofort evaluiert wird.
Aufgabe 18-1
Beobachten Sie, wie viele Threads beim Ausführen des Programms in Listing 18.2 gestartet werden.
Modifizieren Sie das Programm, indem Sie die Threads länger schlafen lassen oder auch die Threads synchron oder asynchron starten. Ändert sich dadurch die Anzahl der verwendeten Threads?
Aufgabe 18-2
Implementieren Sie den Workflow in Listing 17.4 mit std::async
.
std::package_task
ist ein einfacher Wrapper für ein aufrufbare Einheit, um sie später in einem Thread zu verwenden. Durch std::packaged_task
wird ein Future mit einem Promise verbunden, sodass mit dem Future der Wert des Promise und somit der Wert der aufrufbaren Einheit ermittelt werden kann.
Listing 18.1 lässt sich direkt in std::packaged_task
in Listing 18.3 übersetzen. Die Syntax ist deutlich anspruchsvoller.
packagedTask.cpp
01 #include <future> 02 #include <iostream> 03 04 int product(int a, int b){ 05 return a*b; 06 } 07 08 struct Div{ 09 int operator()(int a, int b){ 10 return a/b; 11 } 12 }; 13 14 15 int main(){ 16 17 std::cout << std::endl; 18 19 // define the package tasks 20 std::packaged_task<int(int,int)> sumTask([] (int x, int y){return x+y;}); 21 std::packaged_task<int(int,int)> prodTask(&product); 22 Div divide; 23 std::packaged_task<int(int,int)> divTask(divide); 24 25 // get the futures 26 std::future<int> sumResult= sumTask.get_future(); 27 std::future<int> prodResult= prodTask.get_future(); 28 std::future<int> divResult= divTask.get_future(); 29 30 // calculate the result 31 sumTask(20,10); 32 prodTask(20,10); 33 divTask(20,10); 34 35 // get the result 36 std::cout << "20+10= " << sumResult.get() << std::endl; 37 std::cout << "20*10= " << prodResult.get() << std::endl; 38 std::cout << "20/10= " << divResult.get() << std::endl; 39 40 std::cout << std::endl; 41 42 }
Diese Berechnung der Werte in Listing 18.3 findet in vier Schritten statt:
Die Aufgaben werden verpackt (Zeile 19).
Die Promises werden mit den Futures verbunden (Zeile 25).
Die Argumente werden an die Promises übergeben, und die Ergebnisse werden berechnet (Zeile 30).
Die Futures holen die Ergebnisse ab (Zeile 35).
Nach diesem oberflächlichen Blick noch ein paar Details: Die eigenwillige Syntax int(int,int)
in std::packaged_type<int(int,int)>
beschreibt eine Funktion, die zwei int
-Argumente erwartet und einen int
-Rückgabewert liefert. Durch sumTask.get_future()
in Zeile 26 gibt der Promise den Future zurück. In den Zeilen 31 bis 33 werden die Argumente an den Promise übergeben, um dessen Berechnung anzustoßen. Da die Funktionen im Main-Thread gestartet wurden (Zeilen 31 bis 33), findet deren Berechnung auch in diesem statt. Die Ausgabe des Programms ist bereits bekannt.
Master-Worker-Threads
Ein typischer Anwendungsfall für std::packaged_task
ist es, die Arbeitspakete im Main-Thread zu schnüren und sie in einem Container zu speichern, um sie anschließend auf verschiedene Arbeiter-Threads zu verteilen und auszuführen. Zuletzt werden die Ergebnisse eingesammelt.
In Listing 18.4 wird diese Strategie angewandt, um die Zahlen von 0 bis 10000 in vier Threads aufzusummieren.
packagedTaskSum.cpp
01 #include <utility> 02 #include <future> 03 #include <iostream> 04 #include <thread> 05 #include <deque> 06 07 class SumUp{ 08 public: 09 SumUp(int b, int e): beg(b),end(e),sum(0){} 10 int operator()(){ 11 for (int i= beg; i < end; ++i ) sum += i; 12 return sum; 13 } 14 private: 15 int beg; 16 int end; 17 int sum; 18 }; 19 20 int main(){ 21 22 std::cout << std::endl; 23 24 SumUp sumUp1(0,2500); 25 SumUp sumUp2(2500,5000); 26 SumUp sumUp3(5000,7500); 27 SumUp sumUp4(7500,10001); 28 29 // define the tasks 30 std::packaged_task<int()> sumTask1(sumUp1); 31 std::packaged_task<int()> sumTask2(sumUp2); 32 std::packaged_task<int()> sumTask3(sumUp3); 33 std::packaged_task<int()> sumTask4(sumUp4); 34 35 // get the futures 36 std::future<int> sumResult1= sumTask1.get_future(); 37 std::future<int> sumResult2= sumTask2.get_future(); 38 std::future<int> sumResult3= sumTask3.get_future(); 39 std::future<int> sumResult4= sumTask4.get_future(); 40 41 // push the tasks on the container 42 std::deque< std::packaged_task<int()> > allTasks; 43 allTasks.push_back(std::move(sumTask1)); 44 allTasks.push_back(std::move(sumTask2)); 45 allTasks.push_back(std::move(sumTask3)); 46 allTasks.push_back(std::move(sumTask4)); 47 48 // execute each task in a separate thread 49 while ( not allTasks.empty() ){ 50 std::packaged_task<int()> myTask= std::move(allTasks.front()); 51 allTasks.pop_front(); 52 std::thread sumThread(std::move(myTask)); 53 sumThread.detach(); 54 } 55 56 // get the results 57 int sum= sumResult1.get() + sumResult2.get() + sumResult3.get() + sumResult4.get(); 58 59 std::cout << "sum of 0 .. 100000 = " << sum << std::endl; 60 61 std::cout << std::endl; 62 63 }
In den Zeilen 24 bis 27 in Listing 18.4 werden die Arbeitspakete definiert. Die Funktionsobjekte sumUp(begin,end)
addieren die natürlichen Zahlen von begin
ausschließlich end
zusammen. Diese teure Addition soll in separaten Threads vollzogen werden. Dazu werden die Arbeitspakete mit den Futures verbunden (Zeile 35) und auf den Container allTasks
geschoben (Zeile 41). Die eigentliche Arbeit findet in den Zeilen 49 bis 54 statt. Für jedes Arbeitspaket myTask
im Container allTasks
wird das Arbeitspaket aus dem Container transferiert (Zeile 50), das nun leere Arbeitspaket vom Container entfernt (Zeile 51) und myTask
in einen neuen Thread transferiert und im Hintergrund (Zeile 53) ausgeführt. Zuletzt werden die Ergebnisse der vier Threads eingesammelt, addiert und ausgegeben (Abbildung 18.4).
Weitere Funktionen
Neben get_future
bietet std::packaged_task
noch weitere Methoden an. Durch std::packaged_task::valid
lässt sich prüfen, ob der std::packaged_task
einen geteilten Zustand besitzt. std::packaged_task::reset
erlaubt es, diese zurückzusetzen, und std::packaged_task::swap
bietet die Möglichkeit an, den Zustand zweier std::packaged_task
s auszutauschen.
packagedTaskSumSolution.cpp
Aufgabe 18-3
Erweitern Sie Listing 18.4.
In Listing 18.4 sind vier Arbeiter-Threads aktiv. Viel schöner ist es, wenn die Zahl der Arbeiter der Zahl der CPUs entspricht. Parametrisieren Sie das Listing 18.4, sodass die Anzahl der Threads von der Anzahl der vorhandenen CPUs abhängt. Verwenden Sie dazu std::thread::hardware_concurrency()
. Falls der Wert 0 ergibt, gehen Sie von vier CPUs aus.
Volle Kontrolle über den Empfänger und den Sender der Nachricht stellt std::future
in Zusammenarbeit mit std::promise
zur Verfügung. Hier ist es in der Verantwortung des Programmierers, den Thread zu starten, den std::future
mit dem std::promise
zu verbinden, den Rückgabewert oder auch eine Ausnahme des std::promise
zu setzen und das Ergebnis des asynchronen Funktionsaufrufs abzuholen.
promise als Ersatz für async und packaged_task
Mit std::future
und std::promise
lassen sich Listing 18.1 und Listing 18.3 auch formulieren, wobei std::promise
, vereinfacht gesagt, die Aufgabe von std::async
bzw. std::packaged_task
übernehmen wird. In Listing 18.5 kommen als aufrufbare Entitäten lediglich ein Funktionszeiger und ein Funktionsobjekt zum Einsatz, da der Funktionskörper von std::promise
zu komplex für eine anonyme Funktion ist.
futurePromise.cpp
01 #include <future> 02 #include <iostream> 03 #include <thread> 04 #include <utility> 05 06 void product(std::promise<int>&& intPromise, int a, int b){ 07 intPromise.set_value(a*b); 08 } 09 10 struct Div{ 11 12 void operator() (std::promise<int>&& intPromise, int a, int b) const { 13 intPromise.set_value(a/b); 14 } 15 16 }; 17 18 int main(){ 19 20 int a= 20; 21 int b= 10; 22 23 std::cout << std::endl; 24 25 // define the promises 26 std::promise<int> prodPromise; 27 std::promise<int> divPromise; 28 29 // get the futures 30 std::future<int> prodResult= prodPromise.get_future(); 31 std::future<int> divResult= divPromise.get_future(); 32 33 // calculate the result in a separat thread 34 std::thread prodThread(product, std::move(prodPromise),a,b); 35 Div div; 36 std::thread divThread(div,std::move(divPromise),a,b); 37 38 // get the result 39 std::cout << "20*10= " << prodResult.get() << std::endl; 40 std::cout << "20/10= " << divResult.get() << std::endl; 41 42 prodThread.join(); 43 divThread.join(); 44 45 std::cout << std::endl; 46 47 }
Das Hauptprogramm in Listing 18.5 sollte vertraut wirken. Der Umgang mit std::promise
folgt der gleichen Struktur wie der Umgang mit std::packaged_task
in Listing 18.3. Im Funktionskörper der Funktion oder auch des Funktionsobjekts wird mit dem Aufruf set_value
(Zeilen 7 und 13) der Rückgabewert von std::promise
gesetzt. Der Aufruf des Promise product(prodPromise(a,b)
kann natürlich auch im Haupt-Thread erfolgen.
Das Programm in Listing 18.5 hat ein konzeptionelles Problem. Wird ein Objekt der Struktur Div
(Zeile 10) mit einem Nenner 0 instanziiert, führt dies zum sofortigen Programmabbruch. std::async
, std::packaged_task
und std::promise
erlauben es, nicht nur den Wert, sondern auch Ausnahmen an das Future zurückzugeben. Im Gegensatz zu std::async
und std::packaged_task
, die die Ausnahmen automatisch übertragen, muss bei std::promise
der Anwender die Funktionalität set_exception
implementieren.
In Listing 18.6 werden der Future und der Promise um eine Ausnahmebehandlung erweitert.
futurePromiseException.cpp
01 #include <exception> 02 #include <future> 03 #include <iostream> 04 #include <thread> 05 #include <utility> 06 07 struct Div{ 08 09 void operator()(std::promise<int>&& intPromise, int a, int b){ 10 try{ 11 if ( b==0 ) throw std::runtime_error( "illegal division by zero"); 12 intPromise.set_value(a/b); 13 } 14 catch ( ...){ 15 intPromise.set_exception(std::current_exception()); 16 } 17 } 18 19 }; 20 21 int main(){ 22 23 std::cout << std::endl; 24 25 // define the promises 26 std::promise<int> divPromise; 27 28 // get the futures 29 std::future<int> divResult= divPromise.get_future(); 30 31 // calculate the result in a separat thread 32 Div div; 33 std::thread divThread(div,std::move(divPromise),20,0); 34 35 // get the result 36 try{ 37 std::cout << "20/0= " << divResult.get() << std::endl; 38 } 39 catch (std::runtime_error& e){ 40 std::cout << e.what() << std::endl; 41 } 42 43 divThread.join(); 44 45 std::cout << std::endl; 46 47 }
Die Ausführung des Programms führt dazu, dass die Ausnahme std::runtime_error("illegal divion by zero")
in Zeile 11 geworfen wird. Im anschließenden catch
-Block wird die Ausnahme gefangen. Die aktuelle Ausnahme std::current_exception
(Zeile 15) wird zum Rückgabewert des Promise. Der get
-Aufruf des Future (Zeile 37) ist in einem try
-Block gekapselt, sodass die Nachricht der Ausnahme ausgegeben werden kann.
Funktionen des future
Tabelle 18.1 stellt die Funktionen von std::future
im Überblick dar. Dabei bezeichnet f
einen std::future
. abs_time
bezeichnet die absolute, rel_time
die relative Zeitangabe.
Funktion | Beschreibung |
| Gibt einen |
| Wartet, bis der gemeinsame Zustand verfügbar ist, und gibt diesen zurück. Kann einen Wert oder eine Ausnahme zurückgeben. |
| Prüft, ob ein gemeinsamer Zustand vorliegt. |
| Blockiert, bis der gemeinsame Zustand zur Verfügung steht. |
| Blockiert maximal für eine bestimmte Zeitspanne, bis der gemeinsame Zustand zur Verfügung steht. |
| Blockiert maximal bis zu einem bestimmten Zeitpunkt, bis der gemeinsame Zustand zur Verfügung steht. |
Während get
implizit wait
aufruft und so blockiert, bis der gemeinsame Wert mit std::async
, std::packed_task
oder auch mit std::promise
zur Verfügung steht, erlauben es die wait
-Funktionen von std::future
, den Future mit dem Promise zu synchronisieren. Damit sind zwei interessante Anwendungsfälle möglich:
Wird std::async
mit std::launch::deferred
gestartet, bewirkt der Aufruf von wait
, dass der Funktionskörper von std::async
erst zu diesem Augenblick ausgeführt wird.
std::async
-, std::packaged_task
- oder std::promise
-Aufrufe, die keinen Wert (void
) zurückgeben, ermöglichen die Synchronisation zweier Threads ähnlich wie Bedingungsvariablen.
std::shared_future
Soll der gemeinsame Zustand von mehr als einem Thread angefordert werden, ist der Aufruf der get
-Funktion undefiniert. Für dieses Szenario besitzt der std::future
die Methode share
, die einen std::shared_future
zurückgibt. Dessen Wert kann mehrfach angefordert werden. Der entscheidende Unterschied zwischen std::future
und std::shared_future
ist, dass der std::shared_future
neben der Move- auch die Copy-Semantik anbietet. Abgesehen von der Methode share
des std::future
besitzen beiden Future-Typen das gleiche Interface.
Ein std::shared_future
lässt sich direkt oder über einen std::future
erzeugen. So ist die Zeile 1 äquivalent zu den Zeilen 3 und 4 in Listing 18.7.
01 std::shared_future<int> divResult= divPromise.get_future(); 02 03 std::future<int> divResult1= divPromise.get_future(); 04 std::shared_future<int> divResult= divResult1.share();
In Listing 18.8 wird im Promise das Ergebnis von 20/10
berechnet. Das Ergebnis der Berechnung wird anschließend von fünf Futures angefordert.
sharedFuture.cpp
01 #include <exception> 02 #include <future> 03 #include <iostream> 04 #include <thread> 05 #include <utility> 06 07 std::mutex coutMutex; 08 09 struct Div{ 10 11 void operator()(std::promise<int>&& intPromise, int a, int b){ 12 try{ 13 if ( b==0 ) throw std::runtime_error( "illegal division by zero"); 14 intPromise.set_value(a/b); 15 } 16 catch ( ...){ 17 intPromise.set_exception(std::current_exception()); 18 } 19 } 20 21 }; 22 23 struct Requestor{ 24 25 void operator ()(std::shared_future<int> shaFut){ 26 27 // lock std::cout 28 std::lock_guard<std::mutex> coutGuard(coutMutex); 29 30 // get the thread id 31 std::cout << "threadId(" << std::this_thread::get_id() << "): " ; 32 33 // get the result 34 try{ 35 std::cout << "20/10= " << shaFut.get() << std::endl; 36 } 37 catch (std::runtime_error& e){ 38 std::cout << e.what() << std::endl; 39 } 40 } 41 42 }; 43 44 int main(){ 45 46 std::cout << std::endl; 47 48 // define the promises 49 std::promise<int> divPromise; 50 51 // get the futures 52 std::shared_future<int> divResult= divPromise.get_future(); 53 54 // calculate the result in a separat thread 55 Div div; 56 std::thread divThread(div,std::move(divPromise),20,10); 57 58 Requestor req; 59 std::thread sharedThread1(req,divResult); 60 std::thread sharedThread2(req,divResult); 61 std::thread sharedThread3(req,divResult); 62 std::thread sharedThread4(req,divResult); 63 std::thread sharedThread5(req,divResult); 64 65 divThread.join(); 66 67 sharedThread1.join(); 68 sharedThread2.join(); 69 sharedThread3.join(); 70 sharedThread4.join(); 71 sharedThread5.join(); 72 73 std::cout << std::endl; 74 75 }
Der wesentliche Unterschied von Listing 18.6 zu Listing 18.8 besteht darin, dass durch div.get_future
in Zeile 52 der std::shared_future divResult
instanziiert wird. divResult
als std::shared_future
ist kopierbar. Somit kann jeder der fünf Threads (Zeilen 59 bis 63) seine Identität und das Ergebnis der Berechnung ausgeben. std::cout
als gemeinsam genutzte Variable muss geschützt werden (Zeile 28).
Die Ausgabe des Programms in Abbildung 18.7 zeigt die fünf Threads in Aktion.
In Tabelle 18.2 folgen die weiteren Funktionen von std::promise
. p
und p1
. val
steht für den gemeinsamen Wert, ep
für den Ausnahmezeiger und alloc
für den Speicherbeschaffer.
synchroniseThreads.cpp
Aufgabe 18-4
Verwenden Sie einen std::shared_future
zur Synchronisation von Threads.
Benutzen Sie dazu einen Promise prom
im Haupt-Thread, der durch seinen Aufruf von prom.set_value()
signalisiert, dass alle Threads weiterarbeiten können. Die Signatur des Promise bzw. des Future ist in diesem Fall:
std::promise<void> std::shared_future<void>