Supervising Multiple Processes

If we want to run several commands one after another, we can use the code we’ve already written, like this:

ForkWaitShell/nativeFork/nativeFork.scala
def runOneAtATime(commands: Seq[Seq[String]]) = {
  for (command <- commands) {
    doAndAwait { () =>
      runCommand(command)
    }
  }
}

That works! But what if we want to run the commands simultaneously? If we don’t have other work to do while we wait, we can do something like this:

ForkWaitShell/nativeFork/nativeFork.scala
def runSimultaneously(commands: Seq[Seq[String]]) = {
  val pids = for (command <- commands) yield {
    doFork { () =>
      runCommand(command)
    }
  }
  for (pid <- pids) {
    await(pid)
  }
}

In this case, the order in which we wait doesn’t matter, as long as every process exits eventually. But if we have work to do when a process exits, like launching a new process or some other task, this approach doesn’t work. If process 456 exits while we are blocked waiting for process 123, we have no way to handle 456’s completion until 123 is done. So if we want to inject custom logic into our loop, we’ll have to refactor our approach.

First, we can write a helper function that calls waitpid(-1, status, 0) to wait for any one of our child processes to exit, and then returns the set of processes still running:

ForkWaitShell/nativeFork/nativeFork.scala
def awaitAny(pids: Set[Int]): Set[Int] = {
  val status = stackalloc[Int]
  !status = 0
  // A pid of -1 makes waitpid() block until any child process exits.
  val finished = waitpid(-1, status, 0)
  if (pids.contains(finished)) {
    // !status is the raw wait status; it is 0 only for a clean exit.
    val statusCode = !status
    if (statusCode != 0) {
      throw new Exception(s"Child process returned error $statusCode")
    } else {
      pids - finished
    }
  } else {
    throw new Exception(
      s"error: reaped process $finished, expected one of $pids")
  }
}
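
One caveat about the status check in awaitAny(): the value that waitpid() writes into status is an encoded wait status, not the child's exit code itself. The following is a minimal sketch of decoding it, assuming the layout used by Linux and most Unix-like systems (exit code in bits 8 to 15, terminating signal in the low 7 bits) and a waitpid() call with options set to 0. The WaitStatus object and its method names are hypothetical and not part of nativeFork.scala; C code would normally use the WIFEXITED, WEXITSTATUS, and WTERMSIG macros rather than hard-coding the layout.

```scala
// Hypothetical helpers, not from nativeFork.scala. They assume the
// wait-status layout used by Linux and most Unix-like systems.
object WaitStatus {
  // Low 7 bits: the signal that terminated the child, or 0 for a normal exit.
  def termSignal(status: Int): Int = status & 0x7f

  // True when the child exited normally (mirrors C's WIFEXITED).
  def exitedNormally(status: Int): Boolean = termSignal(status) == 0

  // Bits 8 to 15: the exit code (mirrors C's WEXITSTATUS).
  def exitCode(status: Int): Int = (status >> 8) & 0xff

  // Human-readable summary of a raw wait status.
  def describe(status: Int): String =
    if (exitedNormally(status)) s"exited with code ${exitCode(status)}"
    else s"killed by signal ${termSignal(status)}"
}
```

Under these assumptions, a child that calls exit(1) is reported with a raw status of 256, which is why the "Child process returned error" message can show a number different from the exit code the child actually used.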

With that in place, we can write a top-level function that calls awaitAny() in a loop while maintaining the set of running processes:

ForkWaitShell/nativeFork/nativeFork.scala
def awaitAll(pids: Set[Int]): Unit = {
  var running = pids
  while (running.nonEmpty) {
    println(s"waiting for $running")
    running = awaitAny(running)
  }
  println("Done!")
}
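
To run custom logic each time a child exits, such as launching a replacement process, a loop of this kind can accept a callback. The sketch below shows one possible shape and is not the book's implementation: Supervisor, supervise, reapAny, and onExit are all hypothetical names, and reapAny stands in for a blocking waitpid(-1, status, 0) call so that the control flow can be exercised without forking real processes.

```scala
// Hypothetical sketch, not from nativeFork.scala.
object Supervisor {
  // reapAny: blocks until some child exits, returns (pid, raw wait status).
  // onExit:  custom logic for one exited child; returns the pids of any
  //          processes it launched, which are then tracked as well.
  def supervise(
      initial: Set[Int],
      reapAny: () => (Int, Int),
      onExit: (Int, Int) => Set[Int]
  ): Unit = {
    var running = initial
    while (running.nonEmpty) {
      val (pid, status) = reapAny()
      if (running.contains(pid)) {
        // Drop the finished pid, then add whatever the callback started.
        running = (running - pid) ++ onExit(pid, status)
      }
      // Pids we were not asked to track are ignored in this sketch.
    }
  }
}
```

Because the callback runs as soon as any child is reaped, process 456 can be handled even if process 123 is still running. Passing the reaping step in as a function is purely a design choice for this sketch; in a Scala Native program it would wrap the same waitpid() call that awaitAny() makes.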

With this technique, we have much more control over large numbers of child processes; however, we’re still blocked entirely while we wait for a child process to exit. It’s possible to avoid blocking altogether by invoking waitpid() with the WNOHANG option, but doing so usually requires us to find something else for our program to do while it waits, which is difficult to solve in a general way. We’ll explore nonblocking concurrent techniques in the later chapters of this book, starting with Chapter 7, Functions and Futures: Patterns for Distributed Services. For now, we’ll instead explore ways to coordinate and connect the processes we create.