Introducing the snow Package

Luke Tierney’s snow (Simple Network of Workstations) package, available from the CRAN R code repository, is arguably the simplest, easiest-to-use form of parallel R and one of the most popular.

Note

The CRAN Task View page on parallel R, http://cran.r-project.org/web/views/HighPerformanceComputing.html, has a fairly up-to-date list of available parallel R packages.

To see how snow works, here’s code for the mutual outlinks problem described in the previous section:

1    # snow version of mutual links problem
2
3    mtl <- function(ichunk,m) {
4      n <- ncol(m)
5      matches <- 0
6      for (i in ichunk) {
7          if (i < n) {
8             rowi <- m[i,]
9             matches <- matches +
10               sum(m[(i+1):n,] %*% rowi)
11         }
12      }
13      matches
14    }
15
16    mutlinks <- function(cls,m) {
17       n <- nrow(m)
18       nc <- length(cls)
19       # determine which worker gets which chunk of i
20       options(warn=-1)
21       ichunks <- split(1:n,1:nc)
22       options(warn=0)
23       counts <- clusterApply(cls,ichunks,mtl,m)
24       do.call(sum,counts) / (n*(n-1)/2)
25    }

Suppose we have this code in the file SnowMutLinks.R. Let’s first discuss how to run it.

Running the above snow code involves the following steps:

Assuming we are running on a dual-core machine, we issue the following commands to R:

> source("SnowMutLinks.R")
> library(snow)
> cl <- makeCluster(type="SOCK",c("localhost","localhost"))
> testm <- matrix(sample(0:1,16,replace=T),nrow=4)
> mutlinks(cl,testm)
[1] 0.6666667

Here, we are instructing snow to start two new R processes on our machine (localhost is a standard network name for the local machine), which I will refer to here as workers. I’ll refer to the original R process—the one in which we type the preceding commands—as the manager. So, at this point, three instances of R will be running on the machine (visible by running the ps command if you are in a Linux environment, for example).

The workers form a cluster in snow parlance, which we have named cl. The snow package uses what is known in the parallel-processing world as a scatter/gather paradigm, which works as follows:

We have specified that communication between the manager and workers will be via network sockets (covered in Chapter 10).

Here’s a test matrix to check the code:

> testm
  [,1] [,2] [,3] [,4]
[1,]    1    0    0    1
[2,]    0    0    0    0
[3,]    1    0    1    1
[4,]    0    1    0    1

Row 1 has zero outlinks in common with row 2, two in common with row 3, and one in common with row 4. Row 2 has zero outlinks in common with the rest, but row 3 has one in common with row 4. That is a total of four mutual outlinks out of 4 × 3/2 = 6 pairs—hence, the mean value of 4/6 = 0.6666667, as you saw earlier.

You can make clusters of any size, as long as you have the machines. In my department, for instance, I have machines whose network names are pc28, pc29, and pc30. Each machine is dual core, so I could create a six-worker cluster as follows:

> cl6 <- makeCluster(type="SOCK",c("pc28","pc28","pc29","pc29","pc30","pc30"))

Now let’s see how the mutlinks() function works. First, we sense how many rows the matrix m has, in line 17, and the number of workers in our cluster, in line 18.

Next, we need to determine which worker will handle which values of i in the for i loop in our outline code shown earlier in Section 16.1. R’s split() function is well suited for this. For instance, in the case of a 4-row matrix and a 2-worker cluster, that call produces the following:

> split(1:4,1:2)
$`1`
[1] 1 3

$`2`
[1] 2 4

An R list is returned whose first element is the vector (1,3) and the second is (2,4). This will set up having one R process work on the odd values of i and the other work on the even values, as we discussed earlier. We ward off the warnings that split() would give us (“data length is not a multiple of split variable”) by calling options().

The real work is done in line 23, where we call the snow function clusterApply(). This function initiates a call to the same specified function (mtl() here), with some arguments specific to each worker and some optional arguments common to all. So, here’s what the call in line 23 does:

At this point, we merely need to sum all the elements of counts. Well, I shouldn’t say “merely,” because there is a little wrinkle to iron out in line 24.

R’s sum() function is capable of acting on several vector arguments, like this:

> sum(1:2,c(4,10))
[1] 17

But here, counts is an R list, not a (numeric) vector. So we rely on do.call() to extract the vectors from counts, and then we call sum() on them.

Note lines 9 and 10. As you know, in R, we try to vectorize our computation wherever possible for better performance. By casting things in matrix-times-vector terms, we replace the for j and for k loops in the outline in Section 16.1 by a single vector-based expression.

I tried this code on a 1000-by-1000 matrix m1000. I first ran it on a 4-worker cluster and then on a 12-worker cluster. In principle, I should have had speedups of 4 and 12, respectively. But the actual elapsed times were 6.2 seconds and 5.0 seconds. Compare these figures to the 16.9 seconds runtime in nonparallel form. (The latter consisted of the call mtl(1:1000,m1000).) So, I attained a speedup of about 2.7 instead of a theoretical 4.0 for a 4-worker cluster and 3.4 rather than 12.0 on the 12-node system. (Note that some timing variation occurs from run to run.) What went wrong?

In almost any parallel-processing application, you encounter overhead, or “wasted” time spent on noncomputational activity. In our example, there is overhead in the form of the time needed to send our matrix from the manager to the workers. We also encountered a bit of overhead in sending the function mtl() itself to the workers. And when the workers finish their tasks, returning their results to the manager causes some overhead, too. We’ll discuss this in detail when we talk about general performance considerations in in Section 16.4.1.

To learn more about the capabilities of snow, we’ll look at another example, this one involving k-means clustering (KMC).

KMC is a technique for exporatory data analysis. In looking at scatter plots of your data, you may have the perception that the observations tend to cluster into groups, and KMC is a method for finding such groups. The output consists of the centroids of the groups.

The following is an outline of the algorithm:

1    for iter = 1,2,...,niters
2       set vector and count totals to 0
3       for i = 1,...,nrow(m)
4          set j = index of the closest group center to m[i,]
5          add m[i,] to the vector total for group j, v[j]
6          add 1 to the count total for group j, c[j]
7       for j = 1,...,ngrps
8          set new center of group j = v[j] / c[j]

Here, we specify niters iterations, with initcenters as our initial guesses for the centers of the groups. Our data is in the matrix m, and there are ngrps groups.

The following is the snow code to compute KMC in parallel:

1    # snow version of k-means clustering problem
2
3    library(snow)
4
5    # returns distances from x to each vector in y;
6    # here x is a single vector and y is a bunch of them;
7    # define distance between 2 points to be the sum of the absolute values
8    # of their componentwise differences; e.g., distance between (5,4.2) and
9    # (3,5.6) is 2 + 1.4 = 3.4
10    dst <- function(x,y) {
11      tmpmat <- matrix(abs(x-y),byrow=T,ncol=length(x))  # note recycling
12      rowSums(tmpmat)
13    }
14
15    # will check this worker's mchunk matrix against currctrs, the current
16    # centers of the groups, returning a matrix; row j of the matrix will
17    # consist of the vector sum of the points in mchunk closest to jth
18    # current center, and the count of such points
19    findnewgrps <- function(currctrs) {
20      ngrps <- nrow(currctrs)
21      spacedim <- ncol(currctrs)  # what dimension space are we in?
22      # set up the return matrix
23      sumcounts <- matrix(rep(0,ngrps*(spacedim+1)),nrow=ngrps)
24      for (i in 1:nrow(mchunk)) {
25         dsts <- dst(mchunk[i,],t(currctrs))
26         j <- which.min(dsts)
27         sumcounts[j,] <- sumcounts[j,] + c(mchunk[i,],1)
28      }
29      sumcounts
30    }
31
32    parkm <- function(cls,m,niters,initcenters) {
33      n <- nrow(m)
34      spacedim <- ncol(m)  # what dimension space are we in?
35      # determine which worker gets which chunk of rows of m
36      options(warn=-1)
37      ichunks <- split(1:n,1:length(cls))
38      options(warn=0)
39      # form row chunks
40      mchunks <- lapply(ichunks,function(ichunk) m[ichunk,])
41      mcf <- function(mchunk) mchunk <<- mchunk
42      # send row chunks to workers; each chunk will be a global variable at
43      # the worker, named mchunk
44      invisible(clusterApply(cls,mchunks,mcf))
45      # send dst() to workers
46      clusterExport(cls,"dst")
47      # start iterations
48      centers <- initcenters
49      for (i in 1:niters) {
50         sumcounts <- clusterCall(cls,findnewgrps,centers)
51         tmp <- Reduce("+",sumcounts)
52         centers <- tmp[,1:spacedim] / tmp[,spacedim+1]
53         # if a group is empty, let's set its center to 0s
54         centers[is.nan(centers)] <- 0
55      }
56      centers
57    }

The code here is largely similar to our earlier mutual outlinks example. However, there are a couple of new snow calls and a different kind of usage of an old call.

Let’s start with lines 39 through 44. Since our matrix m does not change from one iteration to the next, we definitely do not want to resend it to the workers repeatedly, exacerbating the overhead problem. Thus, first we need to send each worker its assigned chunk of m, just once. This is done in line 44 via snow’s clusterApply() function, which we used earlier but need to get creative with here. In line 41, we define the function mcf(), which will, running on a worker, accept the worker’s chunk from the manager and then keep it as a global variable mchunk on the worker.

Line 46 makes use of a new snow function, clusterExport(), whose job it is to make copies of the manager’s global variables at the workers. The variable in question here is actually a function, dst(). Here is why we need to send it separately: The call in line 50 will send the function findnewgrps() to the workers, but although that function calls dst(), snow will not know to send the latter as well. Therefore we send it ourselves.

Line 50 itself uses another new snow call, clusterCall(). This instructs each worker to call findnewgrps(), with centers as argument.

Recall that each worker has a different matrix chunk, so this call will work on different data for each worker. This once again brings up the controversy regarding the use of global variables, discussed in Section 7.8.4. Some software developers may be troubled by the use of a hidden argument in findnewgrps(). On the other hand, as mentioned earlier, using mchunk as an argument would mean sending it to the workers repeatedly, compromising performance.

Finally, take a look at line 51. The snow function clusterApply() always returns an R list. In this case, the return value is in sumcounts, each element of which is a matrix. We need to sum the matrices, producing a totals matrix. Using R’s sum() function wouldn’t work, as it would total all the elements of the matrices into a single number. Matrix addition is what we need.

Calling R’s Reduce() function will do the matrix addition. Recall that any arithmetic operation in R is implemented as a function; in this case, it is implemented as the function "+". The recall to Reduce() then successively applies "+" to the elements of the list sumcounts. Of course, we could just write a loop to do this, but using Reduce() may give us a small performance boost.