If we want to run several commands one after another, we can use the code we’ve already written, like this:
| def runOneAtATime(commands:Seq[Seq[String]]) = { |
| for (command <- commands) { |
| doAndAwait { () => |
| runCommand(command) |
| } |
| } |
| } |
That works! But what if we want to run the commands simultaneously? If we don’t have other work to do while we wait, we can do something like this:
| def runSimultaneously(commands:Seq[Seq[String]]) = { |
| val pids = for (command <- commands) yield { |
| doFork { () => |
| runCommand(command) |
| } |
| } |
| for (pid <- pids) { |
| await(pid) |
| } |
| } |
In this case, it doesn’t matter in which order we wait for the processes, as long as they all exit eventually. But if we have work to do when a process exits, such as launching a new process or some other task, this approach doesn’t work. If process 456 exits while we are blocked waiting for process 123, we’ll have no way to handle 456’s completion until 123 is done. So if we want to inject custom logic into our loop, we’ll have to refactor our approach.
First, we can write a helper function that calls waitpid(-1, status, 0) to wait for any one of our child processes to exit, and then returns the set of processes still running:
| def awaitAny(pids:Set[Int]):Set[Int] = { |
| val status = stackalloc[Int] |
| !status = 0 |
| // a pid of -1 asks waitpid to block until any child process exits |
| val finished = waitpid(-1, status, 0) |
| if (pids.contains(finished)) { |
| val statusCode = !status |
| if (statusCode != 0) { |
| throw new Exception(s"Child process returned error $statusCode") |
| } else { |
| pids - finished |
| } |
| } else { |
| // also reached when waitpid itself fails and returns -1 |
| throw new Exception(s"""error: reaped process ${finished}, |
| expected one of $pids""") |
| } |
| } |
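One detail worth noting about awaitAny(): the value that waitpid() writes into status is a packed status word rather than the exit code itself, so a child that exits with code 1 typically shows up as 256. In C, the WIFEXITED and WEXITSTATUS macros unpack it. The following is a minimal sketch of the same decoding in plain Scala, assuming the conventional Linux layout (low 7 bits hold the terminating signal, the next 8 bits hold the exit code). The names WaitStatusSketch, WaitStatus, Exited, Signaled, and decodeStatus are hypothetical and are not part of the code above or of Scala Native. Stopped and continued states are ignored, since waitpid() with options set to 0 does not report them.

```scala
// Hypothetical helper, not part of the chapter's code: decodes the raw
// status word written by waitpid(), ASSUMING the conventional Linux layout.
object WaitStatusSketch {
  sealed trait WaitStatus
  case class Exited(code: Int) extends WaitStatus     // normal exit with this exit code
  case class Signaled(signal: Int) extends WaitStatus // terminated by this signal

  def decodeStatus(raw: Int): WaitStatus = {
    val signal = raw & 0x7f // low 7 bits: terminating signal, 0 if none
    if (signal == 0) Exited((raw >> 8) & 0xff) // next 8 bits: exit code
    else Signaled(signal)
  }
}
```

With a helper like this, the error message in awaitAny() could report the decoded exit code or signal instead of the raw status word.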
With that in place, we can write a top-level function that invokes awaitAny() in a loop while maintaining the set of running processes:
| def awaitAll(pids:Set[Int]):Unit = { |
| var running = pids |
| while (running.nonEmpty) { |
| println(s"waiting for $running") |
| running = awaitAny(running) |
| } |
| println("Done!") |
| } |
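To illustrate the kind of custom logic this loop makes room for, here is a rough sketch that keeps at most a fixed number of children running and starts the next command as soon as any child exits. It is written against two function parameters so that it stands alone: launch is a hypothetical stand-in for forking a command and returning its pid, and awaitAnyOf is a stand-in for awaitAny(). The names LimitedRunner, runWithLimit, launch, and awaitAnyOf are assumptions made for this example only.

```scala
// Hypothetical sketch: run commands with at most `limit` children alive at once.
object LimitedRunner {
  def runWithLimit(
      commands: Seq[Seq[String]],
      limit: Int,
      launch: Seq[String] => Int,      // assumed: starts a child, returns its pid
      awaitAnyOf: Set[Int] => Set[Int] // assumed: blocks, returns pids still running
  ): Unit = {
    require(limit > 0, "limit must be positive")
    var pending = commands
    var running = Set.empty[Int]
    while (pending.nonEmpty || running.nonEmpty) {
      // Top up: start children until we reach the limit or run out of commands
      while (pending.nonEmpty && running.size < limit) {
        running += launch(pending.head)
        pending = pending.tail
      }
      // Block until any one child exits, then loop around to refill its slot
      running = awaitAnyOf(running)
    }
  }
}
```

In this chapter's setting, one would presumably pass something like `command => doFork(() => runCommand(command))` as launch and awaitAny itself as awaitAnyOf.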
With this technique, we have much more control over large numbers of child processes; however, we’re still blocked entirely while we wait for a child process to exit. It’s possible to avoid blocking at all by invoking waitpid() with the WNOHANG option; but doing so usually requires us to think of something else that our program can do while it waits, which is difficult to solve in a general way. We’ll explore nonblocking concurrent techniques in the later chapters of this book, starting with Chapter 7, Functions and Futures: Patterns for Distributed Services. For now, we’ll instead explore ways to coordinate and connect the processes we create.
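As a brief aside on the WNOHANG option mentioned above, the shape of a polling loop can be sketched as follows. With WNOHANG, waitpid() returns 0 when no child has exited yet, the pid of a child when one has been reaped, and -1 on error. To keep the sketch self-contained, poll is a hypothetical stand-in for a call such as waitpid(-1, status, WNOHANG), and otherWork stands in for whatever the program can usefully do in the meantime; PollingSketch and awaitAllPolling are likewise names invented for this example.

```scala
// Hypothetical sketch of a nonblocking wait loop.
object PollingSketch {
  def awaitAllPolling(
      pids: Set[Int],
      poll: () => Int,      // assumed: behaves like waitpid(-1, status, WNOHANG)
      otherWork: () => Unit // assumed: useful work to do between polls
  ): Unit = {
    var running = pids
    while (running.nonEmpty) {
      poll() match {
        case 0   => otherWork()    // no child has exited yet: do something else
        case -1  => throw new Exception("waitpid failed")
        case pid => running -= pid // a child exited: stop tracking it
      }
    }
  }
}
```

If otherWork has nothing to do, a loop like this spins and burns CPU, which is part of why the approach is hard to generalize.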