Kapitel 17. Synchronisation der Threads

Bedingungsvariablen geben dem C++11-Entwickler das Werkzeug an die Hand, Aktionen von Threads zu synchronisieren. Im Standardfall agiert ein Thread als Sender, ein anderer Thread als Empfänger des Signals. Der Empfänger wartet blockierend auf das Signal, um mit seiner Aktion voranschreiten zu können. Die Beziehung zwischen Sender und Empfänger muss aber nicht 1:1 sein. Es ist durchaus möglich, dass mehrere Threads Sender bzw. Empfänger des Signals sind. Damit die Synchronisation der Threads auch koordiniert ausgeführt wird, setzt die Bedingungsvariable einen Lock voraus.

Aktionen zwischen Sender und Empfänger

Das Programm conditionVariableStructure.cpp in Listing 17.1 soll die Aktionen aufzeigen, die für die Synchronisation des Senders und Empfängers mithilfe der Bedingungsvariablen ausgeführt werden.

conditionVariableStructure.cpp

01 #include <condition_variable>
02 #include <thread>
03
04 std::mutex mutex_;
05 std::condition_variable condVar;
06
07 bool dataReady;
08
09
10 void waitingForWork(){
11
12   std::unique_lock<std::mutex> lck(mutex_);
13   condVar.wait(lck,[]{return dataReady;});
14
15 }
16
17 void setDataReady(){
18
19   std::lock_guard<std::mutex> lck(mutex_);
20   dataReady=true;
21   condVar.notify_one();
22
23 }
24
25 int main(){
26
27   std::thread t1(waitingForWork);
28   std::thread t2(setDataReady);
29
30   t1.join();
31   t2.join();
32
33 }
Listing 17.1 Struktur einer Anwendung, die Bedingungsvariablen verwendet

Die Bedingungsvariable condVar (Zeile 5) in Listing 17.1 muss sowohl dem Sender als auch dem Empfänger des Signals zur Verfügung stehen. Daher ist sie in dieser konkreten Anwendung eine globale Variable. Dies gilt sowohl für den Mutex mutex_ in Zeile 4, der in einen Lock verpackt an die Bedingungsvariable übergeben wird, als auch den Wahrheitswert dataReady in Zeile 7, der die Verfügbarkeit der Daten anzeigt. Im Hauptprogramm werden sowohl der Sender (Zeile 29) als auch der Empfänger (Zeile 28) des Signals in einem eigenen Thread gestartet.

Sender

Der Sender-Thread t2 lockt den Mutex in Zeile 19, setzt den Wahrheitswert auf true und benachrichtigt die assoziierte Bedingungsvariable mit dem Aufruf condVar.notify_one(). Der Aufruf dieser Benachrichtigung ist atomar.

Empfänger

Der Empfänger-Thread t1 lockt ebenfalls den Mutex in Zeile 22. Als Nächstes ruft der Empfänger wait die gleiche Bedingungsvariable condVar auf. wait erhält zwei Argumente: den Lock und ein Prädikat in Form einer Lambda-Funktion, die das Aufwachkriterium definiert. Dies ist notwendig, da ein Thread fälschlicherweise (spurious) aufgeweckt werden kann. wait bewirkt darüber hinaus, dass der Thread seinen Lock freigibt und blockierend wartet.

Benachrichtigung

Sendet der Sender an den Empfänger eine Benachrichtigung, lockt der Empfänger den Lock und prüft, ob die Bedingung erfüllt ist. Ist sie erfüllt, vollzieht er seine Arbeit. Ist die Bedingung nicht erfüllt, gibt er den Lock wieder frei und wartet weiter auf die nächste Benachrichtigung.

Verschiedene Locks

Dem aufmerksamen Leser wird aufgefallen sein, dass der Sender den Mutex mit einem einfachen Lock std::lock_guard lockt, während der Empfänger den deutlich mächtigeren Lock std::unique_lock benötigt. Der Unterschied ist schnell erklärt. Während der Sender in der Funktion setDataReady in Zeile 17 den Mutex genau einmal lockt und am Ende seiner Funktion wieder freigibt, muss der Lock in der Funktion waitingForWork in Zeile 10 öfter gelockt und wieder freigegeben werden. Dies setzt ein std::unique_lock voraus.

std::condition_variable_any

Neben der Bedingungsvariablen std::condition_variable besitzt C++11 die allgemeinere Bedingungsvariable std::condition_variable_any. Beide benötigen einen Mutex. Während es im Fall von std::condition_variable ein std::mutex sein muss, genügt std::condition_variable_any ein Mutex-Typ ohne try_lock-Funktionalität.

In Tabelle 17.1 sind die Funktionen der beiden Bedingungsvariablen cv zusammengestellt. lk bezeichnet den Lock und pre das Prädikat. abs_time steht für die absolute, rel_time für die relative Zeitangabe.

Tabelle 17.1 Funktionen der Bedingungsvariablen

Funktion

Beschreibung

cv.notify_one()

Wecke einen wartenden Thread auf.

cv.notify_all()

Wecke alle wartenden Threads auf.

cv.wait(lk)

cv.wait(lk,pre)

Das blockierende Warten auf eine Benachrichtigung (optional ein Prädikat).

cv.wait_until(lk,abs_time)

cv.wait_until(lk,abs_time,pred)

Das blockierende Warten auf eine Benachrichtigung mit absoluter Zeitangabe (optional ein Prädikat).

cv.wait_for(lk,rel_time)

cv.wait_for(lk,rel_time,pred)

Das blockierende Warten auf eine Benachrichtigung mit relativer Zeitangabe (optional ein Prädikat).

cv.native_handle()

Der Verweis auf die Implementierung der Bedingungsvariablen.

std::notify_all_at_thread_exit(cv,lk)

Eine freie Funktion, die den Lock freigibt und die wartenden Threads aufweckt, wenn der aktuelle Thread beendet wird.

notify_one

Warten mehrere Threads auf ihre Benachrichtigung und ruft der Sender notify_one auf der Bedingungsvariablen auf, wird ein beliebiger Thread aufgeweckt und kann seine Aktion vollziehen, während die übrigen Threads weiter warten. Eine kleine Modifikation von Listing 17.1 zeigt dies anschaulich in Listing 17.2.

conditionVariableNotifyOne.cpp

01 #include <chrono>
02 #include <iostream>
03 #include <condition_variable>
04 #include <thread>
05
06 std::mutex mutex_;
07 std::condition_variable condVar;
08
09 bool dataReady;
10
11 void waitingForWork(){
12
13   std::unique_lock<std::mutex> lck(mutex_);
14   //condVar.wait(lck,[]{return dataReady;});
15   condVar.wait_for(lck,
     std::chrono::milliseconds(10000),[]{return dataReady;});
16   std::cout << "Hello from thread: "
               << std::this_thread::get_id() << std::endl;
17
18 }
19
20 void setDataReady(){
21
22   std::lock_guard<std::mutex> lck(mutex_);
23   dataReady=true;
24   condVar.notify_one();
25
26 }
27
28 int main(){
29
30   std::cout << std::endl;
31
32   std::thread w1(waitingForWork);
33   std::thread w2(waitingForWork);
34   std::thread w3(waitingForWork);
35   std::thread w4(waitingForWork);
36   std::thread w5(waitingForWork);
37
38   std::thread t(setDataReady);
39
40   t.join();
41
42   w1.join();
43   w2.join();
44   w3.join();
45   w4.join();
46   w5.join();
47
48   std::cout << std::endl;
49
50 }
Listing 17.2 Fünf Threads, die auf ihre Benachrichtigung warten

Die Ausführung von Listing 17.2 führt dazu, dass vier der fünf Threads auf ihre Benachrichtigung warten, die sie nie erhalten, denn es wird lediglich eine Benachrichtigung vom Sender verschickt. Hier hilft nur noch eine Unterbrechung des Programmlaufs mit (Strg+C) (Abbildung 17.1).

Erzwungener Programmabbruch wegen vergeblich wartender Threads

Wird die Zeile 14 in Listing 17.2 durch die Zeile 15 ersetzt, beendet sich das Programm regulär, denn die vergessenen Threads warten nur für 10 Sekunden (Abbildung 17.2).

Programmausführung mit zeitlich bedingtem Warten

Das gleiche Verhalten lässt sich natürlich viel direkter erzeugen, wenn statt condVar.notify_one condVar.notify_all in Listing 17.2 verwendet wird. Dies gilt unabhängig vom zeitlich bedingten oder unbedingten Warten. Listing 17.3 zeigt die Variationen.

conditionVariableNotifyAll.cpp

01 #include <chrono>
02 #include <iostream>
03 #include <condition_variable>
04 #include <mutex>
05 #include <thread>
06
07 std::mutex mutex_;
08 std::condition_variable condVar;
09
10 bool dataReady;
11
12 void waitingForWork(){
13
14   std::unique_lock<std::mutex> lck(mutex_);
15   condVar.wait(lck,[]{return dataReady;});
16   std::cout << "Hello from thread: "
               << std::this_thread::get_id() << std::endl;
17
18 }
19
20 void setDataReady(){
21
22   std::lock_guard<std::mutex> lck(mutex_);
23   dataReady=true;
24   condVar.notify_all();
25
26 }
27
28 int main(){
29
30   std::cout << std::endl;
31
32   std::thread w1(waitingForWork);
33   std::thread w2(waitingForWork);
34   std::thread w3(waitingForWork);
35   std::thread w4(waitingForWork);
36   std::thread w5(waitingForWork);
37
38   std::thread t(setDataReady);
39
40   t.join();
41
42   w1.join();
43   w2.join();
44   w3.join();
45   w4.join();
46   w5.join();
47
48   std::cout << std::endl;
49
50 }
Listing 17.3 Benachrichtige alle wartenden Threads gleichzeitig

Zum Abschluss stellt noch Listing 17.4 vor, wie mit Bedingungsvariablen Arbeitsabläufe definiert werden können, bei denen jeder Schritt von der Erfüllung des vorherigen abhängt.

conditionVariableWorkflow.cpp

01 #include <iostream>
02 #include <condition_variable>
03 #include <thread>
04
05 std::mutex mutex1, mutex2, mutex3;
06
07 std::condition_variable condVar1, condVar2, condVar3;
08
09 bool dataReady1, dataReady2, dataReady3;
10
11
12 void waitingForWork1(){
13
14   std::unique_lock<std::mutex> lck(mutex1);
15   condVar1.wait(lck,[]{return dataReady1;});
16   std::cout << "--- Worker 1 done" << std::endl;
17   std::lock_guard<std::mutex> lckGuard(mutex2);
18   dataReady2=true;
19   condVar2.notify_one();
20
21 }
22
23 void waitingForWork2(){
24
25   std::unique_lock<std::mutex> lck(mutex2);
26   condVar2.wait(lck,[]{return dataReady2;});
27   std::cout << "--- Worker 2 done" << std::endl;
28   std::lock_guard<std::mutex> lckGuard(mutex3);
29   dataReady3=true;
30   condVar3.notify_one();
31
32 }
33
34 void waitingForWork3(){
35
36   std::unique_lock<std::mutex> lck(mutex3);
37   condVar3.wait(lck,[]{return dataReady3;});
38   std::cout << "--- Worker 3 done" << std::endl;
39
40 }
41
42 void setDataReady(){
43
44   std::lock_guard<std::mutex> lck(mutex1);
45   std::cout << "Starting Workflow"  << std::endl;
46   dataReady1=true;
47   condVar1.notify_one();
48
49 }
50
51 int main(){
52
53   std::cout << std::endl;
54
55   std::thread w1(waitingForWork1);
56   std::thread w2(waitingForWork2);
57   std::thread w3(waitingForWork3);
58
59   std::thread t(setDataReady);
60
61   t.join();
62   w1.join();
63   w2.join();
64   w3.join();
65
66   std::cout << "Work done" << std::endl;
67
68   std::cout << std::endl;
69
70 }
Listing 17.4 Arbeitsablauf mit Bedingungsvariablen

Der Arbeitsablauf in Listing 17.4 wird durch den Thread t in Zeile 59 gestartet. Jeder Arbeiter wartet in dem Funktionskörper waitingForWork (Zeilen 12, 23 und 34) zuerst auf die Benachrichtigung, um darauf den nächsten Arbeiter durch notify_one (Zeilen 19 und 30) aufzuwecken. Schön ist an dem Beispiel zu sehen, dass jede Bedingungsvariable einen eigenen Mutex und ein eigenes Prädikat benötigt.

Die Ausgabe des Programms in Listing 17.4 zeigt die Schritte des Arbeitsablaufs.

Arbeitsablauf mit mehreren Arbeitern

waitForTwo.cpp

waitForOneOfTwo.cpp

Aufgabe 17-1

Variieren Sie mit der Anwendung in Listing 17.1

In Listing 17.1 besteht eine 1:1-Beziehung zwischen Sender und Empfänger. Nun soll der Empfänger von zwei Sendern abhängen.

Der Empfänger waitForWork soll nur weiter fortfahren können, wenn

Durch das Setzen von kurzen Schlafperioden std::this_thread::sleep_for(std::chrono::milliseconds(1000)) lässt sich das richtige Verhalten testen.

pingPongConditionVariable.cpp

Aufgabe 17-2

Schreiben Sie ein kleines Pingpongspiel.

Zwei Threads sollen abwechselnd einen Wahrheitswert auf true bzw. false setzen. Dabei setzt der erste Thread den Wahrheitswert auf true, gibt den Wert des Wahrheitswerts aus und signalisiert dem zweiten Thread über eine Bedingungsvariable, dass er jetzt an der Reihe ist. Der zweite Thread setzt den Wahrheitswert auf false, gibt ihn aus und benachrichtigt den ersten Thread.