Luke Tierney’s snow
(Simple Network of Workstations) package, available from the CRAN R code repository, is arguably the simplest, easiest-to-use form of parallel R and one of the most popular.
The CRAN Task View page on parallel R, http://cran.r-project.org/web/views/HighPerformanceComputing.html, has a fairly up-to-date list of available parallel R packages.
To see how snow
works, here’s code for the mutual outlinks problem described in the previous section:
1 # snow version of mutual links problem 2 3 mtl <- function(ichunk,m) { 4 n <- ncol(m) 5 matches <- 0 6 for (i in ichunk) { 7 if (i < n) { 8 rowi <- m[i,] 9 matches <- matches + 10 sum(m[(i+1):n,] %*% rowi) 11 } 12 } 13 matches 14 } 15 16 mutlinks <- function(cls,m) { 17 n <- nrow(m) 18 nc <- length(cls) 19 # determine which worker gets which chunk of i 20 options(warn=-1) 21 ichunks <- split(1:n,1:nc) 22 options(warn=0) 23 counts <- clusterApply(cls,ichunks,mtl,m) 24 do.call(sum,counts) / (n*(n-1)/2) 25 }
Suppose we have this code in the file SnowMutLinks.R. Let’s first discuss how to run it.
Running the above snow
code involves the following steps:
Load the code.
Load the snow
library.
Form a snow
cluster.
Set up the adjacency matrix of interest.
Run your code on that matrix on the cluster you formed.
Assuming we are running on a dual-core machine, we issue the following commands to R:
> source("SnowMutLinks.R") > library(snow) > cl <- makeCluster(type="SOCK",c("localhost","localhost")) > testm <- matrix(sample(0:1,16,replace=T),nrow=4) > mutlinks(cl,testm) [1] 0.6666667
Here, we are instructing snow
to start two new R processes on our machine (localhost
is a standard network name for the local machine), which I will refer to here as workers. I’ll refer to the original R process—the one in which we type the preceding commands—as the manager. So, at this point, three instances of R will be running on the machine (visible by running the ps
command if you are in a Linux environment, for example).
The workers form a cluster in snow
parlance, which we have named cl
. The snow
package uses what is known in the parallel-processing world as a scatter/gather paradigm, which works as follows:
We have specified that communication between the manager and workers will be via network sockets (covered in Chapter 10).
Here’s a test matrix to check the code:
> testm [,1] [,2] [,3] [,4] [1,] 1 0 0 1 [2,] 0 0 0 0 [3,] 1 0 1 1 [4,] 0 1 0 1
Row 1 has zero outlinks in common with row 2, two in common with row 3, and one in common with row 4. Row 2 has zero outlinks in common with the rest, but row 3 has one in common with row 4. That is a total of four mutual outlinks out of 4 × 3/2 = 6 pairs—hence, the mean value of 4/6 = 0.6666667, as you saw earlier.
You can make clusters of any size, as long as you have the machines. In my department, for instance, I have machines whose network names are pc28
, pc29
, and pc30
. Each machine is dual core, so I could create a six-worker cluster as follows:
> cl6 <- makeCluster(type="SOCK",c("pc28","pc28","pc29","pc29","pc30","pc30"))
Now let’s see how the mutlinks()
function works. First, we sense how many rows the matrix m
has, in line 17, and the number of workers in our cluster, in line 18.
Next, we need to determine which worker will handle which values of i
in the for i
loop in our outline code shown earlier in Section 16.1. R’s split()
function is well suited for this. For instance, in the case of a 4-row matrix and a 2-worker cluster, that call produces the following:
> split(1:4,1:2) $`1` [1] 1 3 $`2` [1] 2 4
An R list is returned whose first element is the vector (1,3) and the second is (2,4). This will set up having one R process work on the odd values of i
and the other work on the even values, as we discussed earlier. We ward off the warnings that split()
would give us (“data length is not a multiple of split variable”) by calling options()
.
The real work is done in line 23, where we call the snow
function clusterApply()
. This function initiates a call to the same specified function (mtl()
here), with some arguments specific to each worker and some optional arguments common to all. So, here’s what the call in line 23 does:
Worker 1 will be directed to call the function mtl()
with the arguments ichunks[[1]]
and m
.
Worker 2 will call mtl()
with the arguments ichunks[[2]]
and m
, and so on for all workers.
Each worker will perform its assigned task and then return the result to the manager.
The manager will collect all such results into an R list, which we have assigned here to counts
.
At this point, we merely need to sum all the elements of counts
. Well, I shouldn’t say “merely,” because there is a little wrinkle to iron out in line 24.
R’s sum()
function is capable of acting on several vector arguments, like this:
> sum(1:2,c(4,10)) [1] 17
But here, counts
is an R list, not a (numeric) vector. So we rely on do.call()
to extract the vectors from counts
, and then we call sum()
on them.
Note lines 9 and 10. As you know, in R, we try to vectorize our computation wherever possible for better performance. By casting things in matrix-times-vector terms, we replace the for j
and for k
loops in the outline in Section 16.1 by a single vector-based expression.
I tried this code on a 1000-by-1000 matrix m1000
. I first ran it on a 4-worker cluster and then on a 12-worker cluster. In principle, I should have had speedups of 4 and 12, respectively. But the actual elapsed times were 6.2 seconds and 5.0 seconds. Compare these figures to the 16.9 seconds runtime in nonparallel form. (The latter consisted of the call mtl(1:1000,m1000)
.) So, I attained a speedup of about 2.7 instead of a theoretical 4.0 for a 4-worker cluster and 3.4 rather than 12.0 on the 12-node system. (Note that some timing variation occurs from run to run.) What went wrong?
In almost any parallel-processing application, you encounter overhead, or “wasted” time spent on noncomputational activity. In our example, there is overhead in the form of the time needed to send our matrix from the manager to the workers. We also encountered a bit of overhead in sending the function mtl()
itself to the workers. And when the workers finish their tasks, returning their results to the manager causes some overhead, too. We’ll discuss this in detail when we talk about general performance considerations in in Section 16.4.1.
To learn more about the capabilities of snow
, we’ll look at another example, this one involving k-means clustering (KMC).
KMC is a technique for exporatory data analysis. In looking at scatter plots of your data, you may have the perception that the observations tend to cluster into groups, and KMC is a method for finding such groups. The output consists of the centroids of the groups.
The following is an outline of the algorithm:
1 for iter = 1,2,...,niters 2 set vector and count totals to 0 3 for i = 1,...,nrow(m) 4 set j = index of the closest group center to m[i,] 5 add m[i,] to the vector total for group j, v[j] 6 add 1 to the count total for group j, c[j] 7 for j = 1,...,ngrps 8 set new center of group j = v[j] / c[j]
Here, we specify niters
iterations, with initcenters
as our initial guesses for the centers of the groups. Our data is in the matrix m
, and there are ngrps
groups.
The following is the snow
code to compute KMC in parallel:
1 # snow version of k-means clustering problem 2 3 library(snow) 4 5 # returns distances from x to each vector in y; 6 # here x is a single vector and y is a bunch of them; 7 # define distance between 2 points to be the sum of the absolute values 8 # of their componentwise differences; e.g., distance between (5,4.2) and 9 # (3,5.6) is 2 + 1.4 = 3.4 10 dst <- function(x,y) { 11 tmpmat <- matrix(abs(x-y),byrow=T,ncol=length(x)) # note recycling 12 rowSums(tmpmat) 13 } 14 15 # will check this worker's mchunk matrix against currctrs, the current 16 # centers of the groups, returning a matrix; row j of the matrix will 17 # consist of the vector sum of the points in mchunk closest to jth 18 # current center, and the count of such points 19 findnewgrps <- function(currctrs) { 20 ngrps <- nrow(currctrs) 21 spacedim <- ncol(currctrs) # what dimension space are we in? 22 # set up the return matrix 23 sumcounts <- matrix(rep(0,ngrps*(spacedim+1)),nrow=ngrps) 24 for (i in 1:nrow(mchunk)) { 25 dsts <- dst(mchunk[i,],t(currctrs)) 26 j <- which.min(dsts) 27 sumcounts[j,] <- sumcounts[j,] + c(mchunk[i,],1) 28 } 29 sumcounts 30 } 31 32 parkm <- function(cls,m,niters,initcenters) { 33 n <- nrow(m) 34 spacedim <- ncol(m) # what dimension space are we in? 35 # determine which worker gets which chunk of rows of m 36 options(warn=-1) 37 ichunks <- split(1:n,1:length(cls)) 38 options(warn=0) 39 # form row chunks 40 mchunks <- lapply(ichunks,function(ichunk) m[ichunk,]) 41 mcf <- function(mchunk) mchunk <<- mchunk 42 # send row chunks to workers; each chunk will be a global variable at 43 # the worker, named mchunk 44 invisible(clusterApply(cls,mchunks,mcf)) 45 # send dst() to workers 46 clusterExport(cls,"dst") 47 # start iterations 48 centers <- initcenters 49 for (i in 1:niters) { 50 sumcounts <- clusterCall(cls,findnewgrps,centers) 51 tmp <- Reduce("+",sumcounts) 52 centers <- tmp[,1:spacedim] / tmp[,spacedim+1] 53 # if a group is empty, let's set its center to 0s 54 centers[is.nan(centers)] <- 0 55 } 56 centers 57 }
The code here is largely similar to our earlier mutual outlinks example. However, there are a couple of new snow
calls and a different kind of usage of an old call.
Let’s start with lines 39 through 44. Since our matrix m
does not change from one iteration to the next, we definitely do not want to resend it to the workers repeatedly, exacerbating the overhead problem. Thus, first we need to send each worker its assigned chunk of m
, just once. This is done in line 44 via snow
’s clusterApply()
function, which we used earlier but need to get creative with here. In line 41, we define the function mcf()
, which will, running on a worker, accept the worker’s chunk from the manager and then keep it as a global variable mchunk
on the worker.
Line 46 makes use of a new snow
function, clusterExport()
, whose job it is to make copies of the manager’s global variables at the workers. The variable in question here is actually a function, dst()
. Here is why we need to send it separately: The call in line 50 will send the function findnewgrps()
to the workers, but although that function calls dst()
, snow
will not know to send the latter as well. Therefore we send it ourselves.
Line 50 itself uses another new snow
call, clusterCall()
. This instructs each worker to call findnewgrps()
, with centers
as argument.
Recall that each worker has a different matrix chunk, so this call will work on different data for each worker. This once again brings up the controversy regarding the use of global variables, discussed in Section 7.8.4. Some software developers may be troubled by the use of a hidden argument in findnewgrps()
. On the other hand, as mentioned earlier, using mchunk
as an argument would mean sending it to the workers repeatedly, compromising performance.
Finally, take a look at line 51. The snow
function clusterApply()
always returns an R list. In this case, the return value is in sumcounts
, each element of which is a matrix. We need to sum the matrices, producing a totals matrix. Using R’s sum()
function wouldn’t work, as it would total all the elements of the matrices into a single number. Matrix addition is what we need.
Calling R’s Reduce()
function will do the matrix addition. Recall that any arithmetic operation in R is implemented as a function; in this case, it is implemented as the function "+"
. The recall to Reduce()
then successively applies "+"
to the elements of the list sumcounts
. Of course, we could just write a loop to do this, but using Reduce()
may give us a small performance boost.