In this section, we’ll build a concurrent non-blocking cache, an abstraction that solves a problem that arises often in real-world concurrent programs but is not well addressed by existing libraries. This is the problem of memoizing a function, that is, caching the result of a function so that it need be computed only once. Our solution will be concurrency-safe and will avoid the contention associated with designs based on a single lock for the whole cache.
We’ll use the httpGetBody function below as an example of the type of function we might want to memoize. It makes an HTTP GET request and reads the response body. Calls to this function are relatively expensive, so we’d like to avoid repeating them unnecessarily.
func httpGetBody(url string) (interface{}, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return ioutil.ReadAll(resp.Body)
}
The final line hides a minor subtlety. ReadAll returns two results, a []byte and an error, but since these are assignable to the declared result types of httpGetBody—interface{} and error, respectively—we can return the result of the call without further ado. We chose this return type for httpGetBody so that it conforms to the type of functions that our cache is designed to memoize.
Here’s the first draft of the cache:
// Package memo provides a concurrency-unsafe
// memoization of a function of type Func.
package memo

// A Memo caches the results of calling a Func.
type Memo struct {
    f     Func
    cache map[string]result
}

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

type result struct {
    value interface{}
    err   error
}

func New(f Func) *Memo {
    return &Memo{f: f, cache: make(map[string]result)}
}

// NOTE: not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    return res.value, res.err
}
A Memo instance holds the function f to memoize, of type Func, and the cache, which is a mapping from strings to results. Each result is simply the pair of results returned by a call to f—a value and an error. We’ll show several variations of Memo as the design progresses, but all will share these basic aspects.

An example of how to use Memo appears below. For each element in a stream of incoming URLs, we call Get, logging the latency of the call and the amount of data it returns:
m := memo.New(httpGetBody)
for url := range incomingURLs() {
    start := time.Now()
    value, err := m.Get(url)
    if err != nil {
        log.Print(err)
    }
    fmt.Printf("%s, %s, %d bytes\n",
        url, time.Since(start), len(value.([]byte)))
}
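The incomingURLs helper used in this loop is not shown; it simply delivers a stream of URLs, containing duplicates, and closes the channel when the stream is exhausted. A minimal sketch of such a helper, with a URL list chosen to match the test output shown later (the real helper may differ):

// incomingURLs returns a channel that yields each URL twice,
// so the second lookup of each URL can be served from the cache.
func incomingURLs() <-chan string {
    ch := make(chan string)
    go func() {
        for _, url := range []string{
            "https://golang.org",
            "https://godoc.org",
            "https://play.golang.org",
            "http://gopl.io",
            "https://golang.org",
            "https://godoc.org",
            "https://play.golang.org",
            "http://gopl.io",
        } {
            ch <- url
        }
        close(ch)
    }()
    return ch
}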
We can use the testing package (the topic of Chapter 11) to systematically investigate the effect of memoization.
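A sketch of what such a test might look like, with the loop shown above wrapped in a test function (assuming httpGetBody and incomingURLs are available to the test package; the actual test for the memo package may be organized differently):

func Test(t *testing.T) {
    m := memo.New(httpGetBody)
    for url := range incomingURLs() {
        start := time.Now()
        value, err := m.Get(url)
        if err != nil {
            log.Print(err)
        }
        fmt.Printf("%s, %s, %d bytes\n",
            url, time.Since(start), len(value.([]byte)))
    }
}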
From the test output below, we see that the URL stream contains duplicates, and that although the first call to (*Memo).Get for each URL takes hundreds of milliseconds, the second request returns the same amount of data in under a millisecond.
$ go test -v gopl.io/ch9/memo1
=== RUN   Test
https://golang.org, 175.026418ms, 7537 bytes
https://godoc.org, 172.686825ms, 6878 bytes
https://play.golang.org, 115.762377ms, 5767 bytes
http://gopl.io, 749.887242ms, 2856 bytes
https://golang.org, 721ns, 7537 bytes
https://godoc.org, 152ns, 6878 bytes
https://play.golang.org, 205ns, 5767 bytes
http://gopl.io, 326ns, 2856 bytes
--- PASS: Test (1.21s)
PASS
ok      gopl.io/ch9/memo1   1.257s
This test executes all calls to Get sequentially. Since HTTP requests are a great opportunity for parallelism, let’s change the test so that it makes all requests concurrently. The test uses a sync.WaitGroup to wait until the last request is complete before returning.
m := memo.New(httpGetBody)
var n sync.WaitGroup
for url := range incomingURLs() {
    n.Add(1)
    go func(url string) {
        start := time.Now()
        value, err := m.Get(url)
        if err != nil {
            log.Print(err)
        }
        fmt.Printf("%s, %s, %d bytes\n",
            url, time.Since(start), len(value.([]byte)))
        n.Done()
    }(url)
}
n.Wait()
The test runs much faster, but unfortunately it is unlikely to work correctly all the time. We may notice unexpected cache misses, or cache hits that return incorrect values, or even crashes. Worse, it is likely to work correctly some of the time, so we may not even notice that it has a problem. But if we run it with the -race flag, the race detector (§9.6) often prints a report such as this one:
$ go test -run=TestConcurrent -race -v gopl.io/ch9/memo1
=== RUN   TestConcurrent
...
WARNING: DATA RACE
Write by goroutine 36:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
  ...
Previous write by goroutine 35:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
...
Found 1 data race(s)
FAIL    gopl.io/ch9/memo1   2.393s
The reference to memo.go:32 tells us that two goroutines have updated the cache map without any intervening synchronization. Get is not concurrency-safe: it has a data race.
28  func (memo *Memo) Get(key string) (interface{}, error) {
29      res, ok := memo.cache[key]
30      if !ok {
31          res.value, res.err = memo.f(key)
32          memo.cache[key] = res
33      }
34      return res.value, res.err
35  }
The simplest way to make the cache concurrency-safe is to use monitor-based synchronization. All we need to do is add a mutex to the Memo, acquire the mutex lock at the start of Get, and release it before Get returns, so that the two cache operations occur within the critical section:
type Memo struct {
    f     Func
    mu    sync.Mutex // guards cache
    cache map[string]result
}

// Get is concurrency-safe.
func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    memo.mu.Unlock()
    return res.value, res.err
}
Now the race detector is silent, even when running the tests concurrently. Unfortunately this change to Memo reverses our earlier performance gains. By holding the lock for the duration of each call to f, Get serializes all the I/O operations we intended to parallelize. What we need is a non-blocking cache, one that does not serialize calls to the function it memoizes.
In the next implementation of Get, below, the calling goroutine acquires the lock twice: once for the lookup, and then a second time for the update if the lookup returned nothing. In between, other goroutines are free to use the cache.
func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    memo.mu.Unlock()
    if !ok {
        res.value, res.err = memo.f(key)

        // Between the two critical sections, several goroutines
        // may race to compute f(key) and update the map.
        memo.mu.Lock()
        memo.cache[key] = res
        memo.mu.Unlock()
    }
    return res.value, res.err
}
The performance improves again, but now we notice that some URLs are being fetched twice. This happens when two or more goroutines call Get for the same URL at about the same time. Both consult the cache, find no value there, and then call the slow function f. Then both of them update the map with the result they obtained. One of the results is overwritten by the other.

Ideally we’d like to avoid this redundant work. This feature is sometimes called duplicate suppression.
In the version of Memo below, each map element is a pointer to an entry struct. Each entry contains the memoized result of a call to the function f, as before, but it additionally contains a channel called ready. Just after the entry’s result has been set, this channel will be closed, to broadcast (§8.9) to any other goroutines that it is now safe for them to read the result from the entry.
type entry struct {
    res   result
    ready chan struct{} // closed when res is ready
}

func New(f Func) *Memo {
    return &Memo{f: f, cache: make(map[string]*entry)}
}

type Memo struct {
    f     Func
    mu    sync.Mutex // guards cache
    cache map[string]*entry
}

func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    e := memo.cache[key]
    if e == nil {
        // This is the first request for this key.
        // This goroutine becomes responsible for computing
        // the value and broadcasting the ready condition.
        e = &entry{ready: make(chan struct{})}
        memo.cache[key] = e
        memo.mu.Unlock()

        e.res.value, e.res.err = memo.f(key)

        close(e.ready) // broadcast ready condition
    } else {
        // This is a repeat request for this key.
        memo.mu.Unlock()

        <-e.ready // wait for ready condition
    }
    return e.res.value, e.res.err
}
A call to Get now involves acquiring the mutex lock that guards the cache map, looking in the map for a pointer to an existing entry, allocating and inserting a new entry if none was found, then releasing the lock. If there was an existing entry, its value is not necessarily ready yet—another goroutine could still be calling the slow function f—so the calling goroutine must wait for the entry’s “ready” condition before it reads the entry’s result. It does this by reading a value from the ready channel, since this operation blocks until the channel is closed.

If there was no existing entry, then by inserting a new “not ready” entry into the map, the current goroutine becomes responsible for invoking the slow function, updating the entry, and broadcasting the readiness of the new entry to any other goroutines that might (by then) be waiting for it.
Notice that the variables e.res.value and e.res.err in the entry are shared among multiple goroutines. The goroutine that creates the entry sets their values, and other goroutines read their values once the “ready” condition has been broadcast. Despite being accessed by multiple goroutines, no mutex lock is necessary. The closing of the ready channel happens before any other goroutine receives the broadcast event, so the write to those variables in the first goroutine happens before they are read by subsequent goroutines. There is no data race.
Our concurrent, duplicate-suppressing, non-blocking cache is complete.
The implementation of Memo above uses a mutex to guard a map variable that is shared by each goroutine that calls Get. It’s interesting to contrast this design with an alternative one in which the map variable is confined to a monitor goroutine to which callers of Get must send a message. The declarations of Func, result, and entry remain as before:
// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

// A result is the result of calling a Func.
type result struct {
    value interface{}
    err   error
}

type entry struct {
    res   result
    ready chan struct{} // closed when res is ready
}
However, the Memo type now consists of a channel, requests, through which the caller of Get communicates with the monitor goroutine. The element type of the channel is a request. Using this structure, the caller of Get sends the monitor goroutine both the key, that is, the argument to the memoized function, and another channel, response, over which the result should be sent back when it becomes available. This channel will carry only a single value.
// A request is a message requesting that the Func be applied to key.
type request struct {
    key      string
    response chan<- result // the client wants a single result
}

type Memo struct{ requests chan request }

// New returns a memoization of f.  Clients must subsequently call Close.
func New(f Func) *Memo {
    memo := &Memo{requests: make(chan request)}
    go memo.server(f)
    return memo
}

func (memo *Memo) Get(key string) (interface{}, error) {
    response := make(chan result)
    memo.requests <- request{key, response}
    res := <-response
    return res.value, res.err
}

func (memo *Memo) Close() { close(memo.requests) }
The Get method, above, creates a response channel, puts it in the request, sends it to the monitor goroutine, then immediately receives from it.

The cache variable is confined to the monitor goroutine (*Memo).server, shown below. The monitor reads requests in a loop until the request channel is closed by the Close method. For each request, it consults the cache, creating and inserting a new entry if none was found.
func (memo *Memo) server(f Func) {
    cache := make(map[string]*entry)
    for req := range memo.requests {
        e := cache[req.key]
        if e == nil {
            // This is the first request for this key.
            e = &entry{ready: make(chan struct{})}
            cache[req.key] = e
            go e.call(f, req.key) // call f(key)
        }
        go e.deliver(req.response)
    }
}

func (e *entry) call(f Func, key string) {
    // Evaluate the function.
    e.res.value, e.res.err = f(key)
    // Broadcast the ready condition.
    close(e.ready)
}

func (e *entry) deliver(response chan<- result) {
    // Wait for the ready condition.
    <-e.ready
    // Send the result to the client.
    response <- e.res
}
In a similar manner to the mutex-based version, the first request for a given key becomes responsible for calling the function f on that key, storing the result in the entry, and broadcasting the readiness of the entry by closing the ready channel. This is done by (*entry).call.

A subsequent request for the same key finds the existing entry in the map, waits for the result to become ready, and sends the result through the response channel to the client goroutine that called Get. This is done by (*entry).deliver. The call and deliver methods must be called in their own goroutines to ensure that the monitor goroutine does not stop processing new requests.
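Because this version of Memo has the same Get signature as before, the earlier concurrent client code can drive it unchanged; the only new obligation is to call Close when we are finished, so the monitor goroutine can exit. A minimal usage sketch, again assuming the incomingURLs helper:

m := memo.New(httpGetBody)
defer m.Close() // stop the monitor goroutine when we're done

var n sync.WaitGroup
for url := range incomingURLs() {
    n.Add(1)
    go func(url string) {
        defer n.Done()
        start := time.Now()
        value, err := m.Get(url)
        if err != nil {
            log.Print(err)
            return
        }
        fmt.Printf("%s, %s, %d bytes\n",
            url, time.Since(start), len(value.([]byte)))
    }(url)
}
n.Wait()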
This example shows that it’s possible to build many concurrent structures using either of the two approaches—shared variables and locks, or communicating sequential processes—without excessive complexity.
It’s not always obvious which approach is preferable in a given situation, but it’s worth knowing how they correspond. Sometimes switching from one approach to the other can make your code simpler.
Exercise 9.3: Extend the Func type and the (*Memo).Get method so that callers may provide an optional done channel through which they can cancel the operation (§8.9). The results of a cancelled Func call should not be cached.