Leaky bucket

In the previous chapters, we saw how to build a rate limiter with time.Ticker, by forcing a client to wait for its turn before being served. There is another take on rate limiting for services and libraries known as the leaky bucket. The name evokes the image of a bucket with a few holes in it: if you are filling it, you have to be careful not to pour in too much water, otherwise it's going to overflow. Before adding more water, you need to wait for the level to drop; how fast that happens depends on the size of the bucket and the number of holes it has. The analogy translates directly into a concurrency pattern.

The bucket will be defined by two attributes:

- A capacity, the maximum number of requests it can absorb. When requests arrive at a frequency higher than the specified rate, this capacity starts dropping, just like when you pour in too much water and the bucket starts to overflow.
- A rate, the frequency at which the capacity is restored. If the request frequency is zero or lower than the rate, the bucket slowly regains its capacity, just as the water slowly drains away.

The data structure of the leaky bucket will have a capacity and a counter of the requests still available. This counter will equal the capacity when the bucket is created, and will drop each time a request is executed. The rate specifies how often the status needs to be reset to the capacity:

type bucket struct {
    capacity uint64
    status   uint64
}

When creating a new bucket, we should also take care of resetting the status periodically. We can launch a goroutine for this and use a context to terminate it correctly: the goroutine creates a ticker from the rate and resets the status on each tick. We need the sync/atomic package to make the reset thread-safe:

func newBucket(ctx context.Context, cap uint64, rate time.Duration) *bucket {
    b := bucket{capacity: cap, status: cap}
    go func() {
        t := time.NewTicker(rate)
        for {
            select {
            case <-t.C:
                atomic.StoreUint64(&b.status, b.capacity)
            case <-ctx.Done():
                t.Stop()
                return
            }
        }
    }()
    return &b
}

When we're adding to the bucket, we can check the status and act accordingly:

func (b *bucket) Add(n uint64) uint64 {
    for {
        r := atomic.LoadUint64(&b.status)
        if r == 0 {
            return 0
        }
        if n > r {
            n = r
        }
        if !atomic.CompareAndSwapUint64(&b.status, r, r-n) {
            continue
        }
        return n
    }
}

We use a loop that retries the atomic compare-and-swap (CAS) until it succeeds. This guarantees that the value we read with the Load operation has not been changed by another goroutine between the load and the swap.

The bucket can be used in a client that will try to add a random amount to the bucket and will log its result:

type client struct {
    name  string
    max   int
    b     *bucket
    sleep time.Duration
}

func (c client) Run(ctx context.Context, start time.Time) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            n := 1 + rand.Intn(c.max-1)
            time.Sleep(c.sleep)
            e := time.Since(start).Seconds()
            a := c.b.Add(uint64(n))
            log.Printf("%s tries to take %d after %.02fs, takes %d", c.name, n, e, a)
        }
    }
}

We can run several clients concurrently to see how they compete for the bucket's capacity:

func main() {
    ctx, canc := context.WithTimeout(context.Background(), time.Second)
    defer canc()
    start := time.Now()
    b := newBucket(ctx, 10, time.Second/5)
    t := time.Second / 10
    for i := 0; i < 5; i++ {
        c := client{
            name:  fmt.Sprint(i),
            b:     b,
            sleep: t,
            max:   5,
        }
        go c.Run(ctx, start)
    }
    <-ctx.Done()
}