Working with Pipes

What we’ve built so far is already sufficient for a lot of automation tasks. For example, with our HTTP client from Chapter 3, Writing a Simple HTTP Client, and some JSON parsing, we could easily write a small, safe program that authenticates to an HTTP service and queries another endpoint for service discovery before launching another executable. An equivalent bash program could certainly be written, using tools like curl[25] and jq,[26] but such scripts rapidly grow in complexity and can be painful to maintain. However, there’s one more essential shell pattern we’ve yet to tackle: pipelines.

A pipeline is a series of cooperating processes, executing simultaneously, that communicate over in-memory channels called pipes. For example, if we want to list all the files in a directory and then sort them alphabetically, we can do this in bash with ls | sort, which prints the filenames in sorted order.

Pipes work by replacing the standard input and/or standard output of a program. For example, if we run sort on its own, it gets its own stdin and stdout, as in the following diagram:

images/single_process_io.png

But if we pipe them together, the shell can create a pipe to chain them together without an intermediate file as shown here:

images/2_process_pipe_io.png

In this case, ls gets the terminal input as STDIN, and sort’s output goes to the terminal output. In between, ls’s output goes to the write-end of a new, anonymous pipe, whereas sort’s input is attached to the read-end of the same pipe. Best of all, if we want to create a larger pipeline, we can chain these pipes one after another.

The semantics of reading and writing from a pipe can be subtle, but we don’t have to worry about them for now; programs written in idiomatic UNIX STDIN/STDOUT patterns should “just work,” but in Chapter 8, Streaming with Pipes and Files, we’ll learn how to work with pipes on a more granular level. All we need to do for now is create a pipe and attach it to both of the new processes we create with fork(). To do this, we’ll use two new syscalls: pipe() and dup2().

pipe

Here’s the signature for pipe():

 def pipe(fds:Ptr[Int]):Int

pipe takes one argument: a pointer to an array with space for exactly two Ints. If pipe succeeds, the array will be filled with two file descriptors. The first will contain the read end of the pipe, which is suitable for attaching to a process's standard input, and the second will contain the write end, which is likewise suitable for standard output.
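
For example, a minimal call might look like the following sketch, assuming the util bindings and the stackalloc import used in the code later in this chapter (the descriptor numbers printed will vary):

 // a minimal sketch: create a pipe and name its two ends
 val fds = stackalloc[Int](2)
 if (util.pipe(fds) != 0) {
   println("pipe() failed")
 } else {
   val read_end = fds(0)   // attach to a downstream process's stdin
   val write_end = fds(1)  // attach to an upstream process's stdout
   println(s"read end: ${read_end}, write end: ${write_end}")
 }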

Here’s the riddle though: how do we pass this pipe on so a new process can read or write from it without passing in custom arguments or modified code?

Part of the answer is in a subtlety of the exec functions. Although a successful exec call obliterates the state of the calling program, the new program inherits all open file descriptors. If we want to “trick” a new program into treating the new pipe as though it were the terminal input and output, we can use dup2() to replace its stdin file descriptor with the read end of the pipe, or its stdout with the write end.

dup2

dup2() is a more user-friendly variant of another function called dup(), and dup2() has the following signature:

 def dup2(old_fd:Int, new_fd:Int):Int

dup2() makes new_fd refer to the same open file as old_fd, silently closing new_fd first if it was already open. Its signature is straightforward, but it has some semantic quirks, especially in concert with fork(). Most significantly, when fork, dup2, and pipe are used together, it’s common to create situations where several processes hold descriptors for the same pipe. There are, in fact, good reasons for multiple processes to keep a file descriptor open at once, but here it will only cause problems. In particular, a program like sort reads its input until end-of-file, and a pipe only signals end-of-file once every copy of its write end has been closed, so a single forgotten descriptor can leave the whole pipeline hanging.
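
To make the redirection concrete, a sketch like the following sends a process’s standard output into a pipe; output_pipe here is just a placeholder for a write-end descriptor returned by pipe(), and the bindings are the same util and unistd used in the code below:

 // sketch: make stdout refer to the pipe's write end, then drop the extra copy
 util.dup2(output_pipe, unistd.STDOUT_FILENO) // fd 1 now points at the pipe
 unistd.close(output_pipe)                    // keep only one copy of the write end open
 // anything this process (or a program it execs) writes to stdout now goes into the pipe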

Because the file descriptors created by pipe() are anonymous, pipe() is almost always used in concert with fork(). The general order of system calls is like so:

  1. Call pipe.
  2. Call fork.
  3. In the child, call dup2.
  4. In the child, call execve (or do other work).
  5. In the parent, call close on the pipe.
  6. In the parent, call wait or waitpid.

Implemented in Scala code, we can do this for two processes, like this:

ForkWaitShell/nativePipeTwo/nativePipeTwo.scala
 def runTwoAndPipe(input: Int, output: Int, proc1: Seq[String],
     proc2: Seq[String]): Int = {
   val pipe_array = stackalloc[Int](2)
   val pipe_ret = util.pipe(pipe_array)
   println(s"pipe() returned ${pipe_ret}")
   val output_pipe = pipe_array(1) // write end
   val input_pipe = pipe_array(0)  // read end

   val proc1_pid = doFork { () =>
     // first child: stdin from `input`, stdout into the pipe's write end
     if (input != 0) {
       println(s"proc ${unistd.getpid()}: about to dup ${input} to stdin")
       util.dup2(input, 0)
     }
     println(s"proc 1 about to dup ${output_pipe} to stdout")
     util.dup2(output_pipe, 1)
     stdio.printf(c"process %d about to runCommand\n", unistd.getpid())
     runCommand(proc1)
   }

   val proc2_pid = doFork { () =>
     // second child: stdin from the pipe's read end, stdout to `output`
     println(s"proc ${unistd.getpid()}: about to dup")
     util.dup2(input_pipe, 0)
     if (output != 1) {
       util.dup2(output, 1)
     }
     unistd.close(output_pipe)
     stdio.printf(c"process %d about to runCommand\n", unistd.getpid())
     runCommand(proc2)
   }

   // close the parent's copies of the descriptors; the children have their own
   unistd.close(input)
   unistd.close(output_pipe)
   unistd.close(input_pipe)
   val waiting_for = Seq(proc1_pid, proc2_pid)
   println(s"waiting for procs: ${waiting_for}")
   val r1 = waitpid(-1, null, 0)
   println(s"proc $r1 returned")
   val r2 = waitpid(-1, null, 0)
   println(s"proc $r2 returned")
   r2
 }
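
For instance, assuming the doFork and runCommand helpers from earlier in the chapter are in scope, we could wire the terminal straight through (descriptors 0 and 1) and run the equivalent of ls . | sort -r like this; the binary paths are just illustrative:

 val status = runTwoAndPipe(0, 1,
   Seq("/bin/ls", "."),
   Seq("/usr/bin/sort", "-r"))
 println(s"runTwoAndPipe returned ${status}")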

But in a real shell, we can string together any number of processes into a single pipeline. Generalizing the previous code to take a Seq of commands to run, instead of a pair, actually simplifies the code somewhat and reduces duplication:

ForkWaitShell/nativePipe/nativePipe.scala
 def pipeMany(input: Int, output: Int, procs: Seq[Seq[String]]): Int = {
   val pipe_array = stackalloc[Int](2 * (procs.size - 1))
   var input_fds = mutable.ArrayBuffer[Int](input)
   var output_fds = mutable.ArrayBuffer[Int]()
   // create our array of pipes
   for (i <- 0 until (procs.size - 1)) {
     val array_offset = i * 2
     val pipe_ret = util.pipe(pipe_array + array_offset)
     output_fds += pipe_array(array_offset + 1)
     input_fds += pipe_array(array_offset)
   }
   output_fds += output

   val procsWithFds = (procs, input_fds, output_fds).zipped
   val pids = for ((proc, input_fd, output_fd) <- procsWithFds) yield {
     doFork { () =>
       // close all pipes that this process won't be using.
       for (p <- 0 until (2 * (procs.size - 1))) {
         if (pipe_array(p) != input_fd && pipe_array(p) != output_fd) {
           unistd.close(pipe_array(p))
         }
       }
       // reassign STDIN if we aren't at the front of the pipeline
       if (input_fd != input) {
         unistd.close(unistd.STDIN_FILENO)
         util.dup2(input_fd, unistd.STDIN_FILENO)
       }
       // reassign STDOUT if we aren't at the end of the pipeline
       if (output_fd != output) {
         unistd.close(unistd.STDOUT_FILENO)
         util.dup2(output_fd, unistd.STDOUT_FILENO)
       }
       runCommand(proc)
     }
   }

   // the parent closes every pipe descriptor so the children see end-of-file
   for (i <- 0 until (2 * (procs.size - 1))) {
     unistd.close(pipe_array(i))
   }
   unistd.close(input)
   var waiting_for = pids.toSet
   while (!waiting_for.isEmpty) {
     val wait_result = waitpid(-1, null, 0)
     println(s"- waitpid returned ${wait_result}")
     waiting_for = waiting_for - wait_result
   }
   return 0
 }

Now we’re really close to our goal! Let’s write a quick main function that strings a few commands together, the equivalent of running this:

 $ ls . | sort -r

If we wanted to, we could write a small parser for the traditional bash pipe syntax—it’s a great, compact notation for interactive shell usage. But as we noted at the start of the chapter, interactive use is just the tip of the iceberg. For contemporary automation tasks, a lean Scala DSL like the one we’ve started to sketch out has the potential to be more useful; and in particular, Scala Native’s small binaries and fast startup time really lend themselves to this sort of use case.
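
For example, a pipe-like operator could be layered on top of pipeMany in just a few lines; the Pipeline class and proc helper here are purely illustrative sketches, not something we build in this chapter:

 // illustrative only: a tiny DSL wrapper around pipeMany
 case class Pipeline(commands: Seq[Seq[String]]) {
   def |(next: Seq[String]): Pipeline = Pipeline(commands :+ next)
   def run(): Int = pipeMany(0, 1, commands)
 }
 def proc(args: String*): Pipeline = Pipeline(Seq(args.toSeq))

 // the Scala analogue of `ls . | sort -r`
 (proc("/bin/ls", ".") | Seq("/usr/bin/sort", "-r")).run()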

So, for this exercise we’ll just bake the command to run into our main function:

ForkWaitShell/nativePipe/nativePipe.scala
 def main(args: Array[String]): Unit = {
   val status = pipeMany(0, 1, Seq(
     Seq("/bin/ls", "."),
     Seq("/usr/bin/sort", "-r")
   ))
   println(s"- wait returned ${status}")
 }

And when we run it, we’ll see this:

 $ ./target/scala-2.11/nativepipe-out
 - proc 1510: running command /usr/bin/sort with args List(/usr/bin/sort, -r)
 - waitpid returned 1509
 target
 project
 nativePipe.scala
 build.sbt
 - proc 1509: running command /bin/ls with args List(/bin/ls, .)
 - waitpid returned 1510
 - wait returned 0

But you may see slightly different results each time you run this program: because we have three processes running in parallel and sharing a single terminal, their output messages can appear interleaved in an unusual order, as they do here. This is the power and complexity of concurrent programming in a nutshell, and we should take a moment to recognize what we’ve achieved in this chapter. Orchestrating a nontrivial pipeline of processes isn’t easy, and we’ve done it in a remarkably concise bit of Scala code.