Build a Log

We will build our log from the bottom up, starting with the store and index files, then the segment, and finally the log. That way we can write and run tests as we build each piece. Since the word log can refer to at least three different things—a record, the file that stores records, and the abstract data type that ties segments together—to make things less confusing, throughout this chapter, I will consistently use the following terms to mean these things:

- Record: the data stored in our log.
- Store: the file we store records in.
- Index: the file we store index entries in.
- Segment: the abstraction that ties a store and an index together.
- Log: the abstraction that ties all the segments together.

Code the Store

To get started, create a directory at internal/log for our log package, then create a file called store.go in that directory that contains the following code:

WriteALogPackage/internal/log/store.go
package log

import (
    "bufio"
    "encoding/binary"
    "os"
    "sync"
)

var (
    enc = binary.BigEndian
)

const (
    lenWidth = 8
)

type store struct {
    *os.File
    mu   sync.Mutex
    buf  *bufio.Writer
    size uint64
}

func newStore(f *os.File) (*store, error) {
    fi, err := os.Stat(f.Name())
    if err != nil {
        return nil, err
    }
    size := uint64(fi.Size())
    return &store{
        File: f,
        size: size,
        buf:  bufio.NewWriter(f),
    }, nil
}

The store struct is a simple wrapper around a file with two APIs to append and read bytes to and from the file. The newStore(*os.File) function creates a store for the given file. The function calls os.Stat(name string) to get the file’s current size, in case we’re re-creating the store from a file that has existing data, which would happen if, for example, our service had restarted.

We refer to the enc variable and lenWidth constant repeatedly in the store, so we place them up top where they’re easy to find. enc defines the encoding that we persist record sizes and index entries in, and lenWidth defines the number of bytes used to store the record’s length.

Next, write the following Append method below newStore:

WriteALogPackage/internal/log/store.go
func (s *store) Append(p []byte) (n uint64, pos uint64, err error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    pos = s.size
    if err := binary.Write(s.buf, enc, uint64(len(p))); err != nil {
        return 0, 0, err
    }
    w, err := s.buf.Write(p)
    if err != nil {
        return 0, 0, err
    }
    w += lenWidth
    s.size += uint64(w)
    return uint64(w), pos, nil
}

Append([]byte) persists the given bytes to the store. We write the length of the record so that, when we read the record, we know how many bytes to read. We write to the buffered writer instead of directly to the file to reduce the number of system calls and improve performance. If a user wrote a lot of small records, this would help a lot. Then we return the number of bytes written, which similar Go APIs conventionally do, and the position where the store holds the record in its file. The segment will use this position when it creates an associated index entry for this record.
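
For a concrete sense of those return values, here’s a quick sketch (not part of the package) of two consecutive appends to a fresh store, assuming the Append code above. Each record occupies lenWidth plus the record’s own length, so "hello world" takes 8 + 11 = 19 bytes:

// s is a store created with newStore on an empty file.
n, pos, err := s.Append([]byte("hello world")) // n == 19, pos == 0
n, pos, err = s.Append([]byte("hello world"))  // n == 19, pos == 19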

Below Append, add the following Read method:

WriteALogPackage/internal/log/store.go
func (s *store) Read(pos uint64) ([]byte, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if err := s.buf.Flush(); err != nil {
        return nil, err
    }
    size := make([]byte, lenWidth)
    if _, err := s.File.ReadAt(size, int64(pos)); err != nil {
        return nil, err
    }
    b := make([]byte, enc.Uint64(size))
    if _, err := s.File.ReadAt(b, int64(pos+lenWidth)); err != nil {
        return nil, err
    }
    return b, nil
}

Read(pos uint64) returns the record stored at the given position. First it flushes the writer buffer, in case we’re about to try to read a record that the buffer hasn’t flushed to disk yet. We find out how many bytes we have to read to get the whole record, and then we fetch and return the record. The compiler allocates byte slices that don’t escape the functions they’re declared in on the stack. A value escapes when it lives beyond the lifetime of the function call—if you return the value, for example.

Put this ReadAt method under Read:

WriteALogPackage/internal/log/store.go
func (s *store) ReadAt(p []byte, off int64) (int, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if err := s.buf.Flush(); err != nil {
        return 0, err
    }
    return s.File.ReadAt(p, off)
}

ReadAt(p []byte, off int64) reads len(p) bytes into p beginning at the off offset in the store’s file. It implements io.ReaderAt on the store type.
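
Because the store satisfies io.ReaderAt, it composes with anything in the standard library that accepts that interface. As a sketch (assuming a store s and an "io" import):

// Wrap the store in a section reader covering its current contents.
sr := io.NewSectionReader(s, 0, int64(s.size))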

Last, add this Close method after ReadAt:

WriteALogPackage/internal/log/store.go
func (s *store) Close() error {
    s.mu.Lock()
    defer s.mu.Unlock()
    err := s.buf.Flush()
    if err != nil {
        return err
    }
    return s.File.Close()
}

Close persists any buffered data before closing the file.

Let’s test that our store works. Create a store_test.go file in the log directory with the following code:

WriteALogPackage/internal/log/store_test.go
package log

import (
    "io/ioutil"
    "os"
    "testing"

    "github.com/stretchr/testify/require"
)

var (
    write = []byte("hello world")
    width = uint64(len(write)) + lenWidth
)

func TestStoreAppendRead(t *testing.T) {
    f, err := ioutil.TempFile("", "store_append_read_test")
    require.NoError(t, err)
    defer os.Remove(f.Name())

    s, err := newStore(f)
    require.NoError(t, err)

    testAppend(t, s)
    testRead(t, s)
    testReadAt(t, s)

    s, err = newStore(f)
    require.NoError(t, err)
    testRead(t, s)
}

In this test, we create a store with a temporary file and call two test helpers to test appending and reading from the store. Then we create the store again and test reading from it again to verify that our service will recover its state after a restart.

After the TestStoreAppendRead function, add these test helpers:

WriteALogPackage/internal/log/store_test.go
func testAppend(t *testing.T, s *store) {
    t.Helper()
    for i := uint64(1); i < 4; i++ {
        n, pos, err := s.Append(write)
        require.NoError(t, err)
        require.Equal(t, pos+n, width*i)
    }
}

func testRead(t *testing.T, s *store) {
    t.Helper()
    var pos uint64
    for i := uint64(1); i < 4; i++ {
        read, err := s.Read(pos)
        require.NoError(t, err)
        require.Equal(t, write, read)
        pos += width
    }
}

func testReadAt(t *testing.T, s *store) {
    t.Helper()
    for i, off := uint64(1), int64(0); i < 4; i++ {
        b := make([]byte, lenWidth)
        n, err := s.ReadAt(b, off)
        require.NoError(t, err)
        require.Equal(t, lenWidth, n)
        off += int64(n)

        size := enc.Uint64(b)
        b = make([]byte, size)
        n, err = s.ReadAt(b, off)
        require.NoError(t, err)
        require.Equal(t, write, b)
        require.Equal(t, int(size), n)
        off += int64(n)
    }
}

Below testReadAt, add this snippet to test the Close method:

WriteALogPackage/internal/log/store_test.go
func TestStoreClose(t *testing.T) {
    f, err := ioutil.TempFile("", "store_close_test")
    require.NoError(t, err)
    defer os.Remove(f.Name())
    s, err := newStore(f)
    require.NoError(t, err)
    _, _, err = s.Append(write)
    require.NoError(t, err)

    f, beforeSize, err := openFile(f.Name())
    require.NoError(t, err)

    err = s.Close()
    require.NoError(t, err)

    _, afterSize, err := openFile(f.Name())
    require.NoError(t, err)
    require.True(t, afterSize > beforeSize)
}

func openFile(name string) (file *os.File, size int64, err error) {
    f, err := os.OpenFile(
        name,
        os.O_RDWR|os.O_CREATE|os.O_APPEND,
        0644,
    )
    if err != nil {
        return nil, 0, err
    }
    fi, err := f.Stat()
    if err != nil {
        return nil, 0, err
    }
    return f, fi.Size(), nil
}

Assuming these tests pass, you know that your log can append and read persisted records.

Write the Index

Next let’s code the index. Create an index.go file inside internal/log that contains the following code:

WriteALogPackage/internal/log/index.go
package log

import (
    "io"
    "os"

    "github.com/tysontate/gommap"
)

var (
    offWidth uint64 = 4
    posWidth uint64 = 8
    entWidth        = offWidth + posWidth
)

type index struct {
    file *os.File
    mmap gommap.MMap
    size uint64
}

We use the *Width constants throughout the index, so like with the store’s variables and constants, we put the constants at the top of the file to make them easy to find. The *Width constants define the number of bytes that make up each index entry.

Our index entries contain two fields: the record’s offset and its position in the store file. We store offsets as uint32s and positions as uint64s, so they take up 4 and 8 bytes of space, respectively. We use the entWidth to jump straight to the position of an entry given its offset since the position in the file is offset * entWidth.
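
To make that arithmetic concrete, here’s a hypothetical helper (it isn’t part of the package; the Read method below does the real work) that decodes entry k straight out of the memory-mapped file:

func entryAt(mmap []byte, k uint64) (off uint32, pos uint64) {
    // Entry k starts at byte k * entWidth: a 4-byte offset followed
    // by an 8-byte position.
    start := k * entWidth
    off = enc.Uint32(mmap[start : start+offWidth])
    pos = enc.Uint64(mmap[start+offWidth : start+entWidth])
    return off, pos
}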

index defines our index file, which comprises a persisted file and a memory-mapped file. The size tells us the size of the index and where to write the next entry appended to the index.

Now add the following newIndex function below the index struct:

WriteALogPackage/internal/log/index.go
func newIndex(f *os.File, c Config) (*index, error) {
    idx := &index{
        file: f,
    }
    fi, err := os.Stat(f.Name())
    if err != nil {
        return nil, err
    }
    idx.size = uint64(fi.Size())
    if err = os.Truncate(
        f.Name(), int64(c.Segment.MaxIndexBytes),
    ); err != nil {
        return nil, err
    }
    if idx.mmap, err = gommap.Map(
        idx.file.Fd(),
        gommap.PROT_READ|gommap.PROT_WRITE,
        gommap.MAP_SHARED,
    ); err != nil {
        return nil, err
    }
    return idx, nil
}

newIndex(*os.File, Config) creates an index for the given file. We create the index and save the current size of the file so we can track the amount of data in the index file as we add index entries. We grow the file to the max index size before memory-mapping the file and then return the created index to the caller.

Next, add the following Close method below newIndex:

WriteALogPackage/internal/log/index.go
func (i *index) Close() error {
    if err := i.mmap.Sync(gommap.MS_SYNC); err != nil {
        return err
    }
    if err := i.file.Sync(); err != nil {
        return err
    }
    if err := i.file.Truncate(int64(i.size)); err != nil {
        return err
    }
    return i.file.Close()
}

Close makes sure the memory-mapped file has synced its data to the persisted file and that the persisted file has flushed its contents to stable storage. Then it truncates the persisted file to the amount of data that’s actually in it and closes the file.

Now that we’ve seen the code for both opening and closing an index, we can discuss what this growing and truncating the file business is all about.

When we start our service, the service needs to know the offset to set on the next record appended to the log. The service learns the next record’s offset by looking at the last entry of the index, a simple process of reading the last 12 bytes of the file. However, we mess up this process when we grow the files so we can memory-map them. (The reason we resize them now is that, once they’re memory-mapped, we can’t resize them, so it’s now or never.) We grow the files by appending empty space at the end of them, so the last entry is no longer at the end of the file—instead, there’s some unknown amount of space between this entry and the file’s end. This space prevents the service from restarting properly. That’s why we shut down the service by truncating the index files to remove the empty space and put the last entry at the end of the file once again. This graceful shutdown returns the service to a state where it can restart properly and efficiently.
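
Here’s a rough sketch of that restart step, assuming data holds the contents of a cleanly truncated index file and baseOffset is the segment’s base offset (newSegment does the equivalent through index.Read(-1) later in this chapter):

// The last entWidth (12) bytes are the final entry: a 4-byte
// relative offset followed by an 8-byte store position.
last := data[uint64(len(data))-entWidth:]
relOff := enc.Uint32(last[:offWidth])
nextOffset := baseOffset + uint64(relOff) + 1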

Handling Ungraceful Shutdowns

A graceful shutdown occurs when a service finishes its ongoing tasks, performs its processes to ensure there’s no data loss, and prepares for a restart. If your service crashes or its hardware fails, you’ll experience an ungraceful shutdown. An example of an ungraceful shutdown for the service we’re building would be if it lost power before it finished truncating its index files. You handle ungraceful shutdowns by performing a sanity check when your service restarts to find corrupted data. If you have corrupted data, you can rebuild the data or replicate the data from an uncorrupted source. The log we’re building doesn’t handle ungraceful shutdowns because I wanted to keep the code simple.

And now back to our regularly scheduled programming.

Add the following Read method below Close:

WriteALogPackage/internal/log/index.go
func (i *index) Read(in int64) (out uint32, pos uint64, err error) {
    if i.size == 0 {
        return 0, 0, io.EOF
    }
    if in == -1 {
        out = uint32((i.size / entWidth) - 1)
    } else {
        out = uint32(in)
    }
    pos = uint64(out) * entWidth
    if i.size < pos+entWidth {
        return 0, 0, io.EOF
    }
    out = enc.Uint32(i.mmap[pos : pos+offWidth])
    pos = enc.Uint64(i.mmap[pos+offWidth : pos+entWidth])
    return out, pos, nil
}

Read(int64) takes in an offset and returns the associated record’s position in the store. The given offset is relative to the segment’s base offset; 0 is always the offset of the index’s first entry, 1 is the second entry, and so on. We use relative offsets to reduce the size of the indexes by storing offsets as uint32s. If we used absolute offsets, we’d have to store the offsets as uint64s and require four more bytes for each entry. Four bytes doesn’t sound like much until you multiply it by the number of records people often use distributed logs for; a company like LinkedIn handles trillions of records every day, and at that scale the extra 4 bytes per entry would cost roughly 4 TB of index space daily. Even relatively small companies can produce billions of records per day.

Now add the following Write method below Read:

WriteALogPackage/internal/log/index.go
func (i *index) Write(off uint32, pos uint64) error {
    if uint64(len(i.mmap)) < i.size+entWidth {
        return io.EOF
    }
    enc.PutUint32(i.mmap[i.size:i.size+offWidth], off)
    enc.PutUint64(i.mmap[i.size+offWidth:i.size+entWidth], pos)
    i.size += uint64(entWidth)
    return nil
}

Write(off uint32, pos uint64) appends the given offset and position to the index. First, we validate that we have space to write the entry. If there’s space, we then encode the offset and position and write them to the memory-mapped file. Then we advance the size, which is where the next entry will be written.

Add this Name method to return the index’s file path:

WriteALogPackage/internal/log/index.go
func (i *index) Name() string {
    return i.file.Name()
}

Let’s test our index. Create an index_test.go file in internal/log starting with the following code:

WriteALogPackage/internal/log/index_test.go
package log

import (
    "io"
    "io/ioutil"
    "os"
    "testing"

    "github.com/stretchr/testify/require"
)

func TestIndex(t *testing.T) {
    f, err := ioutil.TempFile(os.TempDir(), "index_test")
    require.NoError(t, err)
    defer os.Remove(f.Name())

    c := Config{}
    c.Segment.MaxIndexBytes = 1024
    idx, err := newIndex(f, c)
    require.NoError(t, err)
    _, _, err = idx.Read(-1)
    require.Error(t, err)
    require.Equal(t, f.Name(), idx.Name())

    entries := []struct {
        Off uint32
        Pos uint64
    }{
        {Off: 0, Pos: 0},
        {Off: 1, Pos: 10},
    }

This code sets up the test. We create an index file and make it big enough to contain our test entries by setting MaxIndexBytes in the config, which newIndex uses to grow the file (via os.Truncate) before memory-mapping it. We have to grow the file before we use it because we memory-map the file to a slice of bytes, and if we didn’t increase the size of the file before we wrote to it, we’d get an out-of-bounds error.

Finally, add the following code beneath the previous snippet to finish the test:

WriteALogPackage/internal/log/index_test.go
    for _, want := range entries {
        err = idx.Write(want.Off, want.Pos)
        require.NoError(t, err)

        _, pos, err := idx.Read(int64(want.Off))
        require.NoError(t, err)
        require.Equal(t, want.Pos, pos)
    }

    // index should error when reading past existing entries
    _, _, err = idx.Read(int64(len(entries)))
    require.Equal(t, io.EOF, err)
    _ = idx.Close()

    // index should build its state from the existing file
    f, _ = os.OpenFile(f.Name(), os.O_RDWR, 0600)
    idx, err = newIndex(f, c)
    require.NoError(t, err)
    off, pos, err := idx.Read(-1)
    require.NoError(t, err)
    require.Equal(t, uint32(1), off)
    require.Equal(t, entries[1].Pos, pos)
}

We iterate over each entry and write it to the index. We check that we can read the same entry back via the Read method. Then we verify that the index errors when we try to read beyond the number of entries stored in it. And we check that the index builds its state from the existing file, for when our service restarts with existing data.

We need to configure the max size of a segment’s store and index. Let’s add a config struct to centralize the log’s configuration, making it easy to configure the log and use the configs throughout the code. Create an internal/log/config.go file with the following code:

WriteALogPackage/internal/log/config.go
package log

type Config struct {
    Segment struct {
        MaxStoreBytes uint64
        MaxIndexBytes uint64
        InitialOffset uint64
    }
}

That wraps up the code for store and index types, which make up the lowest level of our log. Now let’s code the segment.

Create the Segment

The segment wraps the index and store types to coordinate operations across the two. For example, when the log appends a record to the active segment, the segment needs to write the data to its store and add a new entry in the index. Similarly for reads, the segment needs to look up the entry from the index and then fetch the data from the store.

To get started, create a file called segment.go in internal/log that starts with the following code:

WriteALogPackage/internal/log/segment.go
package log

import (
    "fmt"
    "os"
    "path"

    api "github.com/travisjeffery/proglog/api/v1"
    "google.golang.org/protobuf/proto"
)

type segment struct {
    store                  *store
    index                  *index
    baseOffset, nextOffset uint64
    config                 Config
}

Our segment needs to call its store and index files, so we keep pointers to those in the first two fields. We need the next and base offsets to know what offset to append new records under and to calculate the relative offsets for the index entries. And we put the config on the segment so we can compare the store file and index sizes to the configured limits, which lets us know when the segment is maxed out.

Below the previous snippet, add the following newSegment function:

WriteALogPackage/internal/log/segment.go
func newSegment(dir string, baseOffset uint64, c Config) (*segment, error) {
    s := &segment{
        baseOffset: baseOffset,
        config:     c,
    }
    var err error
    storeFile, err := os.OpenFile(
        path.Join(dir, fmt.Sprintf("%d%s", baseOffset, ".store")),
        os.O_RDWR|os.O_CREATE|os.O_APPEND,
        0644,
    )
    if err != nil {
        return nil, err
    }
    if s.store, err = newStore(storeFile); err != nil {
        return nil, err
    }
    indexFile, err := os.OpenFile(
        path.Join(dir, fmt.Sprintf("%d%s", baseOffset, ".index")),
        os.O_RDWR|os.O_CREATE,
        0644,
    )
    if err != nil {
        return nil, err
    }
    if s.index, err = newIndex(indexFile, c); err != nil {
        return nil, err
    }
    if off, _, err := s.index.Read(-1); err != nil {
        s.nextOffset = baseOffset
    } else {
        s.nextOffset = baseOffset + uint64(off) + 1
    }
    return s, nil
}

The log calls newSegment when it needs to add a new segment, such as when the current active segment hits its max size. We open the store and index files and pass the os.O_CREATE file mode flag as an argument to os.OpenFile to create the files if they don’t exist yet. When we create the store file, we pass the os.O_APPEND flag to make the operating system append to the file when writing. Then we create our index and store with these files. Finally, we set the segment’s next offset to prepare for the next appended record. If the index is empty, the next record appended to the segment would be the first record, and its offset would be the segment’s base offset. If the index has at least one entry, the next record’s offset is one past the last record in the segment, which we get by adding the last entry’s relative offset plus 1 to the base offset (for example, with a base offset of 16 and a last relative offset of 2, the next offset is 16 + 2 + 1 = 19). Our segment is ready to write to and read from the log—once we’ve written those methods!

Next, below newSegment put the following Append method:

WriteALogPackage/internal/log/segment.go
func (s *segment) Append(record *api.Record) (offset uint64, err error) {
    cur := s.nextOffset
    record.Offset = cur
    p, err := proto.Marshal(record)
    if err != nil {
        return 0, err
    }
    _, pos, err := s.store.Append(p)
    if err != nil {
        return 0, err
    }
    if err = s.index.Write(
        // index offsets are relative to base offset
        uint32(s.nextOffset-uint64(s.baseOffset)),
        pos,
    ); err != nil {
        return 0, err
    }
    s.nextOffset++
    return cur, nil
}

Append writes the record to the segment and returns the newly appended record’s offset. The log returns the offset to the API response. The segment appends a record in a two-step process: it appends the data to the store and then adds an index entry. Since index offsets are relative to the base offset, we subtract the segment’s base offset from its next offset (both are absolute offsets) to get the entry’s relative offset in the segment. We then increment the next offset to prep for a future append call.

Now add the following Read method below Append:

WriteALogPackage/internal/log/segment.go
func (s *segment) Read(off uint64) (*api.Record, error) {
    _, pos, err := s.index.Read(int64(off - s.baseOffset))
    if err != nil {
        return nil, err
    }
    p, err := s.store.Read(pos)
    if err != nil {
        return nil, err
    }
    record := &api.Record{}
    err = proto.Unmarshal(p, record)
    return record, err
}

Read(off uint64) returns the record for the given offset. Similar to writes, to read a record the segment must first translate the absolute offset into a relative offset and get the associated index entry; for example, with a base offset of 16, reading offset 18 looks up index entry 2. Once it has the index entry, the segment can go straight to the record’s position in the store and read the proper amount of data.

Next, put the following IsMaxed method below Read:

WriteALogPackage/internal/log/segment.go
func (s *segment) IsMaxed() bool {
    return s.store.size >= s.config.Segment.MaxStoreBytes ||
        s.index.size >= s.config.Segment.MaxIndexBytes
}

IsMaxed returns whether the segment has reached its max size, either by writing too much to the store or the index. If you wrote a small number of long records, you’d hit the store bytes limit; if you wrote a lot of small records, you’d hit the index bytes limit. The log uses this method to know when it needs to create a new segment.

Write this Remove method below IsMaxed:

WriteALogPackage/internal/log/segment.go
func (s *segment) Remove() error {
    if err := s.Close(); err != nil {
        return err
    }
    if err := os.Remove(s.index.Name()); err != nil {
        return err
    }
    if err := os.Remove(s.store.Name()); err != nil {
        return err
    }
    return nil
}

Remove closes the segment and removes the index and store files.

And put this Close method below Remove:

WriteALogPackage/internal/log/segment.go
func (s *segment) Close() error {
    if err := s.index.Close(); err != nil {
        return err
    }
    if err := s.store.Close(); err != nil {
        return err
    }
    return nil
}

Finally, add this last function at the end of the file:

WriteALogPackage/internal/log/segment.go
func nearestMultiple(j, k uint64) uint64 {
    if j >= 0 {
        return (j / k) * k
    }
    return ((j - k + 1) / k) * k
}

nearestMultiple(j uint64, k uint64) returns the nearest multiple of k that is less than or equal to j; for example, nearestMultiple(9, 4) == 8. We take the lesser multiple to make sure we stay under the user’s disk capacity.

That’s all the segment code, so now let’s test it. Create a segment_test.go file inside internal/log with the following test code:

WriteALogPackage/internal/log/segment_test.go
package log

import (
    "io"
    "io/ioutil"
    "os"
    "testing"

    "github.com/stretchr/testify/require"
    api "github.com/travisjeffery/proglog/api/v1"
)

func TestSegment(t *testing.T) {
    dir, _ := ioutil.TempDir("", "segment-test")
    defer os.RemoveAll(dir)

    want := &api.Record{Value: []byte("hello world")}

    c := Config{}
    c.Segment.MaxStoreBytes = 1024
    c.Segment.MaxIndexBytes = entWidth * 3

    s, err := newSegment(dir, 16, c)
    require.NoError(t, err)
    require.Equal(t, uint64(16), s.nextOffset, s.nextOffset)
    require.False(t, s.IsMaxed())

    for i := uint64(0); i < 3; i++ {
        off, err := s.Append(want)
        require.NoError(t, err)
        require.Equal(t, 16+i, off)

        got, err := s.Read(off)
        require.NoError(t, err)
        require.Equal(t, want.Value, got.Value)
    }

    _, err = s.Append(want)
    require.Equal(t, io.EOF, err)

    // maxed index
    require.True(t, s.IsMaxed())

    c.Segment.MaxStoreBytes = uint64(len(want.Value) * 3)
    c.Segment.MaxIndexBytes = 1024

    s, err = newSegment(dir, 16, c)
    require.NoError(t, err)
    // maxed store
    require.True(t, s.IsMaxed())

    err = s.Remove()
    require.NoError(t, err)
    s, err = newSegment(dir, 16, c)
    require.NoError(t, err)
    require.False(t, s.IsMaxed())
}

We test that we can append a record to a segment, read back the same record, and eventually hit the configured max size for both the store and index. Calling newSegment twice with the same base offset and dir also checks that the function loads a segment’s state from the persisted index and store files.

Now that we know that our segment works, we’re ready to create the log.

Code the Log

All right, one last piece to go and that’s the log, which manages the list of segments. Create a log.go file inside internal/log that starts with the following code:

WriteALogPackage/internal/log/log.go
package log

import (
    "fmt"
    "io"
    "io/ioutil"
    "os"
    "path"
    "sort"
    "strconv"
    "strings"
    "sync"

    api "github.com/travisjeffery/proglog/api/v1"
)

type Log struct {
    mu sync.RWMutex

    Dir    string
    Config Config

    activeSegment *segment
    segments      []*segment
}

The log consists of a list of segments and a pointer to the active segment to append writes to. The directory is where we store the segments.

Below the Log struct, write the following NewLog function:

WriteALogPackage/internal/log/log.go
func NewLog(dir string, c Config) (*Log, error) {
    if c.Segment.MaxStoreBytes == 0 {
        c.Segment.MaxStoreBytes = 1024
    }
    if c.Segment.MaxIndexBytes == 0 {
        c.Segment.MaxIndexBytes = 1024
    }
    l := &Log{
        Dir:    dir,
        Config: c,
    }

    return l, l.setup()
}

In NewLog(dir string, c Config), we first set defaults for the configs the caller didn’t specify, create a log instance, and set up that instance.

Next, add this setup method below NewLog:

WriteALogPackage/internal/log/log.go
func (l *Log) setup() error {
    files, err := ioutil.ReadDir(l.Dir)
    if err != nil {
        return err
    }
    var baseOffsets []uint64
    for _, file := range files {
        offStr := strings.TrimSuffix(
            file.Name(),
            path.Ext(file.Name()),
        )
        off, _ := strconv.ParseUint(offStr, 10, 0)
        baseOffsets = append(baseOffsets, off)
    }
    sort.Slice(baseOffsets, func(i, j int) bool {
        return baseOffsets[i] < baseOffsets[j]
    })
    for i := 0; i < len(baseOffsets); i++ {
        if err = l.newSegment(baseOffsets[i]); err != nil {
            return err
        }
        // baseOffsets contains a dup for index and store files,
        // so we skip the dup
        i++
    }
    if l.segments == nil {
        if err = l.newSegment(
            l.Config.Segment.InitialOffset,
        ); err != nil {
            return err
        }
    }
    return nil
}

When a log starts, it’s responsible for setting itself up for the segments that already exist on disk or, if the log is new and has no existing segments, for bootstrapping the initial segment. We fetch the list of the segments on disk, parse and sort the base offsets (because we want our slice of segments to be in order from oldest to newest), and then create the segments with the newSegment helper method, which creates a segment for the base offset you pass in.
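
For example, the directory for a log whose first segment starts at offset 16 contains both 16.store and 16.index. Both names parse to the same base offset, which is why the loop skips every other entry. This sketch shows the parsing on one of them:

// "16.store" and "16.index" both reduce to base offset 16.
offStr := strings.TrimSuffix("16.store", path.Ext("16.store")) // "16"
off, _ := strconv.ParseUint(offStr, 10, 0)                     // off == 16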

Now add the following Append function below setup:

WriteALogPackage/internal/log/log.go
func (l *Log) Append(record *api.Record) (uint64, error) {
    l.mu.Lock()
    defer l.mu.Unlock()
    off, err := l.activeSegment.Append(record)
    if err != nil {
        return 0, err
    }
    if l.activeSegment.IsMaxed() {
        err = l.newSegment(off + 1)
    }
    return off, err
}

Append(*api.Record) appends a record to the log. We append the record to the active segment. Afterward, if the segment is at its max size (per the max size configs), then we make a new active segment. Note that we’re wrapping this func (and subsequent funcs) with a mutex to coordinate access to this section of the code. We use a RWMutex to grant access to reads when there isn’t a write holding the lock. If you felt so inclined, you could optimize this further and make the locks per segment rather than across the whole log. (I haven’t done that here because I want to keep this code simple.)

Below Append, add this Read method:

WriteALogPackage/internal/log/log.go
func (l *Log) Read(off uint64) (*api.Record, error) {
    l.mu.RLock()
    defer l.mu.RUnlock()
    var s *segment
    for _, segment := range l.segments {
        if segment.baseOffset <= off && off < segment.nextOffset {
            s = segment
            break
        }
    }
    if s == nil || s.nextOffset <= off {
        return nil, fmt.Errorf("offset out of range: %d", off)
    }
    return s.Read(off)
}

Read(offset uint64) reads the record stored at the given offset. We first find the segment that contains the record. Since the segments are in order from oldest to newest and the segment’s base offset is the smallest offset in the segment, we iterate over the segments until we find the first segment whose range of offsets contains the one we’re looking for. Once we know the segment that contains the record, we get the index entry from the segment’s index, read the data out of the segment’s store file, and return the data to the caller.

Below Read, add this snippet to define the Close, Remove, and Reset methods:

WriteALogPackage/internal/log/log.go
func (l *Log) Close() error {
    l.mu.Lock()
    defer l.mu.Unlock()
    for _, segment := range l.segments {
        if err := segment.Close(); err != nil {
            return err
        }
    }
    return nil
}

func (l *Log) Remove() error {
    if err := l.Close(); err != nil {
        return err
    }
    return os.RemoveAll(l.Dir)
}

func (l *Log) Reset() error {
    if err := l.Remove(); err != nil {
        return err
    }
    return l.setup()
}

This snippet implements a few related methods: Close iterates over the segments and closes them; Remove closes the log and then removes its data; and Reset removes the log and then creates a new log to replace it.

After the previous snippet, add this snippet to implement the LowestOffset and HighestOffset methods:

WriteALogPackage/internal/log/log.go
func (l *Log) LowestOffset() (uint64, error) {
    l.mu.RLock()
    defer l.mu.RUnlock()
    return l.segments[0].baseOffset, nil
}

func (l *Log) HighestOffset() (uint64, error) {
    l.mu.RLock()
    defer l.mu.RUnlock()
    off := l.segments[len(l.segments)-1].nextOffset
    if off == 0 {
        return 0, nil
    }
    return off - 1, nil
}

These methods tell us the offset range stored in the log. In Chapter 8, Coordinate Your Services with Consensus, when we work on supporting a replicated, coordinated cluster, we’ll need this information to know what nodes have the oldest and newest data and what nodes are falling behind and need to replicate.

Below HighestOffset, add this Truncate method:

WriteALogPackage/internal/log/log.go
func (l *Log) Truncate(lowest uint64) error {
    l.mu.Lock()
    defer l.mu.Unlock()
    var segments []*segment
    for _, s := range l.segments {
        if s.nextOffset <= lowest+1 {
            if err := s.Remove(); err != nil {
                return err
            }
            continue
        }
        segments = append(segments, s)
    }
    l.segments = segments
    return nil
}

Truncate(lowest uint64) removes all segments whose highest offset is at or below lowest. (With segments holding offsets 0 to 2, 3 to 5, and 6 to 8, for example, Truncate(5) removes the first two.) Because we don’t have disks with infinite space, we’ll periodically call Truncate to remove old segments whose data we (hopefully) have processed by then and don’t need anymore.

After Truncate, add this snippet:

WriteALogPackage/internal/log/log.go
func (l *Log) Reader() io.Reader {
    l.mu.RLock()
    defer l.mu.RUnlock()
    readers := make([]io.Reader, len(l.segments))
    for i, segment := range l.segments {
        readers[i] = &originReader{segment.store, 0}
    }
    return io.MultiReader(readers...)
}

type originReader struct {
    *store
    off int64
}

func (o *originReader) Read(p []byte) (int, error) {
    n, err := o.ReadAt(p, o.off)
    o.off += int64(n)
    return n, err
}

Reader returns an io.Reader to read the whole log. We’ll need this capability when we implement consensus and need to support snapshots and restoring a log. Reader uses an io.MultiReader call to concatenate the segments’ stores. The segment stores are wrapped by the originReader type for two reasons. The first reason is to satisfy the io.Reader interface so we can pass it into the io.MultiReader call. The second is to ensure that we begin reading from the origin of the store and read its entire file.
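
As a usage sketch, assuming a *Log named l, snapshotting the log is as simple as draining the reader. The bytes come back raw, length prefixes included, as the testReader test below demonstrates:

// Read the entire log, across all segments, as one byte stream.
snapshot, err := ioutil.ReadAll(l.Reader())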

We’ve got one last method to add to our log, and that’s the function to create new segments. Copy the following newSegment method to the end of the file:

WriteALogPackage/internal/log/log.go
func (l *Log) newSegment(off uint64) error {
    s, err := newSegment(l.Dir, off, l.Config)
    if err != nil {
        return err
    }
    l.segments = append(l.segments, s)
    l.activeSegment = s
    return nil
}

newSegment(off uint64) creates a new segment, appends that segment to the log’s slice of segments, and makes the new segment the active segment so that subsequent append calls write to it.

You know the deal: it’s time to test our log. Create a log_test.go inside internal/log that starts with the following code:

WriteALogPackage/internal/log/log_test.go
package log

import (
    "io/ioutil"
    "os"
    "testing"

    "github.com/stretchr/testify/require"
    api "github.com/travisjeffery/proglog/api/v1"
    "google.golang.org/protobuf/proto"
)

func TestLog(t *testing.T) {
    for scenario, fn := range map[string]func(
        t *testing.T, log *Log,
    ){
        "append and read a record succeeds": testAppendRead,
        "offset out of range error":         testOutOfRangeErr,
        "init with existing segments":       testInitExisting,
        "reader":                            testReader,
        "truncate":                          testTruncate,
    } {
        t.Run(scenario, func(t *testing.T) {
            dir, err := ioutil.TempDir("", "store-test")
            require.NoError(t, err)
            defer os.RemoveAll(dir)

            c := Config{}
            c.Segment.MaxStoreBytes = 32
            log, err := NewLog(dir, c)
            require.NoError(t, err)

            fn(t, log)
        })
    }
}

TestLog(*testing.T) defines a table of tests to, well, test the log. I used a table to write the log tests so we don’t have to repeat the code that creates a new log for every test case.

Now, let’s define the test cases. Put the following test cases below the TestLog function:

WriteALogPackage/internal/log/log_test.go
func testAppendRead(t *testing.T, log *Log) {
    append := &api.Record{
        Value: []byte("hello world"),
    }
    off, err := log.Append(append)
    require.NoError(t, err)
    require.Equal(t, uint64(0), off)

    read, err := log.Read(off)
    require.NoError(t, err)
    require.Equal(t, append.Value, read.Value)
}

testAppendRead(*testing.T, *log.Log) tests that we can successfully append to and read from the log. When we append a record to the log, the log returns the offset it associated that record with. So, when we ask the log for the record at that offset, we expect to get the same record that we appended.

WriteALogPackage/internal/log/log_test.go
func testOutOfRangeErr(t *testing.T, log *Log) {
    read, err := log.Read(1)
    require.Nil(t, read)
    require.Error(t, err)
}

testOutOfRangeErr(*testing.T, *log.Log) tests that the log returns an error when we try to read an offset that’s outside of the range of offsets the log has stored.

WriteALogPackage/internal/log/log_test.go
func testInitExisting(t *testing.T, o *Log) {
    append := &api.Record{
        Value: []byte("hello world"),
    }
    for i := 0; i < 3; i++ {
        _, err := o.Append(append)
        require.NoError(t, err)
    }
    require.NoError(t, o.Close())

    off, err := o.LowestOffset()
    require.NoError(t, err)
    require.Equal(t, uint64(0), off)
    off, err = o.HighestOffset()
    require.NoError(t, err)
    require.Equal(t, uint64(2), off)

    n, err := NewLog(o.Dir, o.Config)
    require.NoError(t, err)

    off, err = n.LowestOffset()
    require.NoError(t, err)
    require.Equal(t, uint64(0), off)
    off, err = n.HighestOffset()
    require.NoError(t, err)
    require.Equal(t, uint64(2), off)
}

testInitExisting(*testing.T, *log.Log) tests that when we create a log, the log bootstraps itself from the data stored by prior log instances. We append three records to the original log before closing it. Then we create a new log configured with the same directory as the old log. Finally, we confirm that the new log set itself up from the data stored by the original log.

WriteALogPackage/internal/log/log_test.go
func testReader(t *testing.T, log *Log) {
    append := &api.Record{
        Value: []byte("hello world"),
    }
    off, err := log.Append(append)
    require.NoError(t, err)
    require.Equal(t, uint64(0), off)

    reader := log.Reader()
    b, err := ioutil.ReadAll(reader)
    require.NoError(t, err)

    read := &api.Record{}
    err = proto.Unmarshal(b[lenWidth:], read)
    require.NoError(t, err)
    require.Equal(t, append.Value, read.Value)
}

testReader(*testing.T, *log.Log) tests that we can read the full, raw log as it’s stored on disk so that we can snapshot and restore the logs in Finite-State Machine.

WriteALogPackage/internal/log/log_test.go
func testTruncate(t *testing.T, log *Log) {
    append := &api.Record{
        Value: []byte("hello world"),
    }
    for i := 0; i < 3; i++ {
        _, err := log.Append(append)
        require.NoError(t, err)
    }

    err := log.Truncate(1)
    require.NoError(t, err)

    _, err = log.Read(0)
    require.Error(t, err)
}

testTruncate(*testing.T, *log.Log) tests that we can truncate the log and remove old segments that we don’t need any more.

That wraps up our log code! We just wrote a log that’s not that watered down from the log that drives Kafka, and we didn’t even have to work too hard.