Matryoshka 3: Persisting Data to Disk

We left off in the last post having written a few extra storage combinators, including a CachingStore that lets us check multiple stores (one as a cache store, one as a main source-of-truth store). But all our stores are in-memory only, which means we’ll lose all our data when the store processes finish.

Let’s fix that by writing some stores that persist their data to disk. We’ll start with a naive implementation, FilesystemStore, before building the much more sophisticated LogStore.

FilesystemStore

In the spirit of “make it work, then make it better”, we’ll start with a super naive disk store, FilesystemStore. Let’s begin with a struct and constructor function:

/lib/matryoshka/impl/filesystem_store.ex

 1defmodule Matryoshka.Impl.FilesystemStore do
 2  alias Matryoshka.Reference
 3  alias Matryoshka.Storage
 4
 5  @enforce_keys [:root_dir]
 6  defstruct @enforce_keys
 7
 8  @type t :: %__MODULE__{
 9          root_dir: Path.t()
10        }
11
12  @spec filesystem_store(Path.t()) :: t()
13  def filesystem_store(root_dir) do
14    %__MODULE__{root_dir: root_dir}
15  end
16  ...

Since the provided refs are going to be considered as paths relative to the root directory of the filesystem store, let’s build a function to turn these paths into absolute paths:

/lib/matryoshka/impl/filesystem_store.ex

16  ...
17  @spec absolute_path(t(), Reference.t()) :: Path.t()
18  def absolute_path(store, ref) do
19    path_segments = [store.root_dir | Reference.path_segments(ref)]
20    Path.join(path_segments)
21  end
22  ...

Then we can implement the Storage protocol by reading from files. We first find the absolute path of the file that backs the value by joining the store’s root directory with the reference, then read the file and return the value.

/lib/matryoshka/impl/filesystem_store.ex

22  alias __MODULE__
23
24  defimpl Storage do
25    def fetch(store, ref) do
26      path = FilesystemStore.absolute_path(store, ref)
27
28      with {:ok, value} <- File.read(path) do
29        {store, {:ok, value}}
30      else
31        {:error, _reason} -> {store, {:error, {:no_ref, ref}}}
32      end
33    end
34
35    def get(store, ref) do
36      path = FilesystemStore.absolute_path(store, ref)
37
38      with {:ok, value} <- File.read(path) do
39        {store, value}
40      else
41        {:error, _reason} -> {store, nil}
42      end
43    end
44    ...

To update the values (i.e. by putting a new value in or deleting the current value), we do the same absolute filepath operation, then either write to the file (making sure to create the parent directories) or delete the file.

/lib/matryoshka/impl/filesystem_store.ex

44    ...
45    def put(store, ref, value) when is_binary(value) do
46      path = FilesystemStore.absolute_path(store, ref)
47      parent_dir = Path.dirname(path)
48      File.mkdir_p(parent_dir)
49      File.write(path, value)
50
51      store
52    end
53
54    def delete(store, ref) do
55      path = FilesystemStore.absolute_path(store, ref)
56      _result = File.rm(path)
57      store
58    end
59  end
60end
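
Before we talk about its limitations, here’s a quick sketch of what using FilesystemStore might look like in iex—assuming the string-based references from the earlier posts (path segments separated by /) and a writable /tmp directory:

iex> alias Matryoshka.Impl.FilesystemStore
iex> alias Matryoshka.Storage
iex> store = FilesystemStore.filesystem_store("/tmp/fs_store_demo")
iex> store = Storage.put(store, "greetings/english", "hello")
iex> {_store, value} = Storage.fetch(store, "greetings/english")
iex> value
{:ok, "hello"}

Each put creates a file (here, /tmp/fs_store_demo/greetings/english) containing the raw value.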

However, there are a few issues with FilesystemStore. For a start, it only works with stringified keys and values. If we want to use it to store maps, lists, or even integers, we’d need to compose it with a MappingStore—something like a JsonStore or an XmlStore that maps Elixir terms into JSON strings or XML documents.

Also, it uses a new file for every value—there are definitely use-cases for that, since we may want to store quite large values, but it’s wasteful for small values.

Oh, and there’s no history of the puts and deletes in the store—once a value is overwritten, it’s gone forever. It would be nice to be able to step through the history of a value for things like auditing and bug-hunting.

We’ll build LogStore with these limitations in mind.

LogStore

The code in this implementation is heavily inspired by the approach in Build a Simple Persistent Key-Value Store in Elixir, using Logs – Part 2. I’ve made some upgrades to deal with arbitrary terms for keys and values, along with deleting values, but this article helped me immensely in understanding how append-only log-backed key-value stores work, along with giving me ideas on how to structure my log entries.

The secret sauce to LogStore is :erlang.term_to_binary/1 and :erlang.binary_to_term/1, which encode and decode Erlang (and Elixir) terms to and from binary. This lets us serialize any kind of reference and value we want to the log file, which fixes the first issue with FilesystemStore (only being able to read and write string references and string values).
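
For example, any term survives a round trip through these two functions:

iex> binary = :erlang.term_to_binary(%{layers: 3, name: "doll"})
iex> is_binary(binary)
true
iex> :erlang.binary_to_term(binary)
%{layers: 3, name: "doll"}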

Whenever we update data (via put/3 or delete/2 on LogStore), we’ll append a log entry, in binary encoding, to the log file. Since we’re not overwriting the data that stores the current state, we’ll have a timeline of all the values that have been recorded in LogStore. Whenever we retrieve a value, we use the most up-to-date version of that value, but we could theoretically parse through the log file and show the history of a value for a given reference—and all the changes are timestamped.

After appending the entry to the log file, we’ll then store the position and size (both in bytes) of the value in an index map (with shape reference -> {position, size}) which will allow us to read exactly size bytes from the file every time we want to retrieve (get or fetch) a value.

LogStore is a lot more complicated than all the other stores we’ve written, so I broke it up into four modules:

Module       Functionality
Encoding     Define how entries in the log file are stored.
Deserialize  Read data from the log file.
Serialize    Write data to the log file.
LogStore     Implement the Storage protocol by translating storage calls (put, get, fetch, delete) into reads and writes on the log file.

Encoding

The Encoding module is mainly for bookkeeping of things like how we format our log entries. We’ll have two kinds of log entries: writes (for when we put data into the LogStore), and deletes (for when we delete data from the LogStore).

Both kinds of log entries will start with a timestamp, then a one-letter atom which represents whether the rest of the log entry is a write entry (:w) or a delete entry (:d).

Then we need some more metadata to indicate the size of the rest of the entry—if the log entry is a write, we’ll need to store both the size of the key and the size of the value, but if the log entry is a delete, we’ll only need to store the size of the key.

Finally, we have the actual data: the key (and, for write entries, the value), each serialized to binary with :erlang.term_to_binary/1.

All integers will be stored as big-endian, unsigned integers (since neither Unix timestamps nor sizes can be negative).

So the entries look like this:

Write entry:
| Timestamp (8 bytes) | Entry type atom :w (4 bytes) | Key size (16-bit uint) | Value size (32-bit uint) | Key (serialized binary term) | Value (serialized binary term) |

Delete entry:
| Timestamp (8 bytes) | Entry type atom :d (4 bytes) | Key size (16-bit uint) | Key (serialized binary term) |

Timestamps are stored in a 64-bit int because that’s enough space to hold a millisecond-precision Unix timestamp.

/lib/matryoshka/impl/log_store/encoding.ex

1defmodule Matryoshka.Impl.LogStore.Encoding do
2  @timestamp_bitsize 64
3  def timestamp_bitsize, do: @timestamp_bitsize
4  
5  @time_unit :millisecond
6  def time_unit, do: @time_unit
7  ...

Oh, and we’re also providing functions in Encoding that let the Deserialize and Serialize modules access the module attributes that we’re defining—normally, module attributes are only accessible inside the module that defines them, but since we’re using Encoding to store all our magic numbers (with names!) we need to be able to use those values in other modules.

We’ll define the maximum key and value lengths to be 2^16 bytes (~66 kB) and 2^32 bytes (~4.3 GB) respectively, so we’ll store the key and value lengths in a 16-bit int and a 32-bit int.

/lib/matryoshka/impl/log_store/encoding.ex

 7  ...
 8  @key_bitsize 16
 9  def key_bitsize, do: @key_bitsize
10
11  @value_bitsize 32
12  def value_bitsize, do: @value_bitsize
13  ...

Since we’re using single-letter atoms to represent write vs. delete entries in the log file, we need 4 bytes to store them (on recent OTP releases, single-letter atoms have a byte size of 4 after converting them to binary with :erlang.term_to_binary/1).

/lib/matryoshka/impl/log_store/encoding.ex

13  ...
14  @atom_bytesize 4
15  def atom_bytesize, do: @atom_bytesize
16
17  @atom_write :w
18  def atom_write, do: @atom_write
19  def atom_write_binary, do: :erlang.term_to_binary(@atom_write)
20
21  @atom_delete :d
22  def atom_delete, do: @atom_delete
23  def atom_delete_binary, do: :erlang.term_to_binary(@atom_delete)
24  ...

We also need a helper function to convert a size in bits into a size in bytes, which will be useful for calculating entry sizes:

/lib/matryoshka/impl/log_store/encoding.ex

24  ...
25  def bits_to_bytes(bits), do: div(bits, 8)
26  ...

We want to minimise how much data the LogStore has to read from disk whenever we retrieve a value. To do so, we’ll be storing an in-memory index, which maps references to a combination of file position (where the value starts in the file) and the value size. Whenever we want to retrieve the value for a reference, we’ll go to the file position for that reference, then read the number of bytes the index tells us.
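
As an illustration (with made-up offsets), the index might look something like this after a couple of writes:

%{
  "users/alice" => {35, 42},  # value starts at byte 35 and is 42 bytes long
  "users/bob" => {112, 27}    # value starts at byte 112 and is 27 bytes long
}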

By adding together the position of the start of the write entry with the size of the write entry (without the value), we’ll get the absolute position of the value, which we can save in our index, so that we know what position to start reading the file from in order to retrieve the value.

position of write entry                             position of value (saved in index)
v                                                   v
| Timestamp | :w | Key size | Value size | Key     | Value           | rest of file...
|<-------- size of write entry pre-value --------->|<- value size -->|

As such, whenever we’re appending entries to the log file, or reading it on a cold start, we’ll need to be able to calculate three sizes: the size of a delete entry, the size of a write entry, and the size of a write entry up until the value starts.

/lib/matryoshka/impl/log_store/encoding.ex

26  ...
27  def delete_entry_size(key_size) do
28    Enum.sum([
29      bits_to_bytes(@timestamp_bitsize),
30      atom_bytesize(),
31      bits_to_bytes(@key_bitsize),
32      key_size
33    ])
34  end
35
36  def write_entry_size(key_size, value_size) do
37    Enum.sum([
38      bits_to_bytes(@timestamp_bitsize),
39      atom_bytesize(),
40      bits_to_bytes(@key_bitsize),
41      bits_to_bytes(@value_bitsize),
42      key_size,
43      value_size
44    ])
45  end
46
47  def write_entry_pre_value_size(key_size) do
48    Enum.sum([
49      bits_to_bytes(@timestamp_bitsize),
50      atom_bytesize(),
51      bits_to_bytes(@key_bitsize),
52      bits_to_bytes(@value_bitsize),
53      key_size
54    ])
55  end
56end
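
As a quick sanity check of the arithmetic, a key that serializes to 10 bytes and a value that serializes to 50 bytes should give us:

iex> alias Matryoshka.Impl.LogStore.Encoding
iex> Encoding.write_entry_size(10, 50)
78
iex> Encoding.write_entry_pre_value_size(10)
28
iex> Encoding.delete_entry_size(10)
24

That’s 8 (timestamp) + 4 (atom) + 2 (key size) + 4 (value size) + 10 (key) + 50 (value) for the full write entry.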

Serialize

Now that we’ve defined our log entry formats, let’s work on serializing them to disk. We’ll start with a function to retrieve the system time, then serialize it to binary:

/lib/matryoshka/impl/log_store/serialize.ex

 1defmodule Matryoshka.Impl.LogStore.Serialize do
 2  alias Matryoshka.Impl.LogStore.Encoding
 3  import :erlang, only: [term_to_binary: 1]
 4
 5  # ------------------------ Timestamp -----------------------
 6
 7  def binary_timestamp() do
 8    timestamp = System.system_time(Encoding.time_unit())
 9    <<timestamp::big-unsigned-integer-size(Encoding.timestamp_bitsize())>>
10  end
11  ...

Then we’d also like functions for formatting the log entries in binary. These functions will return a tuple in the shape {entry, entry_size, value_size} where:

  1. entry is the complete log entry in binary, with the timestamp prepended
  2. entry_size is the offset from the start of the entry to the start of the value (for delete entries, which have no value, this is the size of the whole entry)
  3. value_size is the byte size of the serialized value (nil for delete entries)

/lib/matryoshka/impl/log_store/serialize.ex

11  ...
12  # ----------------------- Formatting -----------------------
13
14  def format_write_log_entry(key, value) do
15    {key_size, key_size_data, key} = pack_key(key)
16    {value_size, value_size_data, value} = pack_value(value)
17
18    entry =
19      Enum.join([
20        Encoding.atom_write_binary(),
21        key_size_data,
22        value_size_data,
23        key,
24        value
25      ])
26
27    entry_size = Encoding.write_entry_pre_value_size(key_size)
28
29    {prepend_timestamp(entry), entry_size, value_size}
30  end
31
32  def format_delete_log_entry(key) do
33    {key_size, key_size_data, key} = pack_key(key)
34
35    entry =
36      Enum.join([
37        Encoding.atom_delete_binary(),
38        key_size_data,
39        key
40      ])
41
42    entry_size = Encoding.delete_entry_size(key_size)
43
44    {prepend_timestamp(entry), entry_size, nil}
45  end
46  ...

These entry formatting functions will need some helper functions. We want to prepend the timestamp to all log entries, which will be done with prepend_timestamp/1:

/lib/matryoshka/impl/log_store/serialize.ex

46  ...
47  def prepend_timestamp(data) when is_binary(data) do
48    timestamp = binary_timestamp()
49    timestamp <> data
50  end
51  ...

We also want some helper functions to pack keys and values: each term is serialized to binary, and its byte size is encoded as an unsigned int of a given bit width (16 bits for keys, 32 bits for values):

/lib/matryoshka/impl/log_store/serialize.ex

51  ...
52  def pack_term(term, int_size) do
53    binary = term |> term_to_binary()
54    size = byte_size(binary)
55    size_data = <<size::big-unsigned-integer-size(int_size)>>
56    {size, size_data, binary}
57  end
58
59  def pack_key(key), do: pack_term(key, Encoding.key_bitsize())
60
61  def pack_value(value), do: pack_term(value, Encoding.value_bitsize())
62  ...
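
For instance, packing a key produces its byte size, a 2-byte size header, and the serialized binary (values work the same way, just with a 4-byte header):

iex> alias Matryoshka.Impl.LogStore.Serialize
iex> {size, size_data, binary} = Serialize.pack_key("users/alice")
iex> size
17
iex> size_data
<<0, 17>>
iex> byte_size(binary)
17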

Now that we can format log entries, we just need to append entries to the log file:

/lib/matryoshka/impl/log_store/serialize.ex

62  ...
63  # ------------------- Writing to Log File ------------------
64
65  def append_write_log_entry(fd, key, value) do
66    {entry, relative_offset, value_size} = format_write_log_entry(key, value)
67    IO.binwrite(fd, entry)
68    {relative_offset, value_size}
69  end
70
71  def append_delete_log_entry(fd, key) do
72    {entry, relative_offset, value_size} = format_delete_log_entry(key)
73    IO.binwrite(fd, entry)
74    {relative_offset, value_size}
75  end
76end

We’ve finished defining how to write log entries, now it’s time to define how to read them.

Deserialize

We’ll start with some helper functions for IO:

/lib/matryoshka/impl/log_store/deserialize.ex

 1defmodule Matryoshka.Impl.LogStore.Deserialize do
 2  alias Matryoshka.Impl.LogStore.Encoding
 3  import :erlang, only: [binary_to_term: 1]
 4
 5  # __________________ Reading from Log File _________________
 6
 7  # ----------------------- IO Helpers -----------------------
 8
 9  def handle_io_result(:eof, _fun), do: :eof
10  def handle_io_result({:error, reason}, _fun), do: {:error, reason}
11  def handle_io_result(bytes, fun), do: {:ok, fun.(bytes)}
12
13  def binread_then_map(fd, number_bytes, fun) do
14    bytes = IO.binread(fd, number_bytes)
15    handle_io_result(bytes, fun)
16  end
17  ...
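
Once the module is compiled, handle_io_result/2 passes :eof and errors through untouched and only applies the mapping function when we actually got bytes back:

iex> alias Matryoshka.Impl.LogStore.Deserialize
iex> Deserialize.handle_io_result(:eof, &byte_size/1)
:eof
iex> Deserialize.handle_io_result("abc", &byte_size/1)
{:ok, 3}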

Next, we’ll add some functions to read and parse data from the log file. These parse the binary into simple types—integers, atoms, timestamps:

/lib/matryoshka/impl/log_store/deserialize.ex

17  ...
18  # ------------------ Reading Elixir Types ------------------
19
20  def read_big_unsigned_integer(fd, int_size) do
21    number_bytes = Encoding.bits_to_bytes(int_size)
22
23    binread_then_map(fd, number_bytes, fn bytes ->
24      <<int::big-unsigned-integer-size(int_size)>> = bytes
25      int
26    end)
27  end
28
29  def read_atom(fd) do
30    atom_bytesize = Encoding.atom_bytesize()
31
32    binread_then_map(
33      fd,
34      atom_bytesize,
35      fn bytes ->
36        <<binary_atom::binary-size(atom_bytesize)>> = bytes
37        atom = binary_to_term(binary_atom)
38        atom
39      end
40    )
41  end
42
43  def read_timestamp(fd) do
44    timestamp_bitsize = Encoding.timestamp_bitsize()
45
46    with {:ok, timestamp_int} <- read_big_unsigned_integer(fd, timestamp_bitsize) do
47      DateTime.from_unix(timestamp_int, Encoding.time_unit())
48    else
49      other -> other
50    end
51  end
52  ...

But the most important part is reading the log entries. When we read a log entry, we consume the timestamp (which, currently, we do nothing with) and the entry kind atom, which lets us dispatch the rest of the entry reading to either read_write_entry/1 (if the atom is :w) or read_delete_entry/1 (if the atom is :d). If a different atom is encoded, we’ll return an error value.

/lib/matryoshka/impl/log_store/deserialize.ex

52  ...
53  # ------------------- Reading Log Entries ------------------
54
55  # .................... Read Entire Entry ....................
56
57  def read_log_entry(fd) do
58    _timestamp = read_timestamp(fd)
59    entry_kind = read_atom(fd)
60    atom_write = Encoding.atom_write()
61    atom_delete = Encoding.atom_delete()
62
63    case entry_kind do
64      {:ok, ^atom_write} -> read_write_entry(fd)
65      {:ok, ^atom_delete} -> read_delete_entry(fd)
66      {:ok, atom} -> {:error, {:no_entry_kind, atom}}
67      other -> other
68    end
69  end
70  ...

For write entries, we need to read:

  1. The key size (16-bit int)
  2. The value size (32-bit int)
  3. The key, which we parse from binary into a term using :erlang.binary_to_term/1
  4. The value, which we also parse from binary into a term

Then we return the parsed entry in the form {:w, key, value}.

/lib/matryoshka/impl/log_store/deserialize.ex

70  ...
71  def read_write_entry(fd) do
72    with {:ok, key_size} <- 
73           read_big_unsigned_integer(fd, Encoding.key_bitsize()),
74         {:ok, value_size} <- 
75           read_big_unsigned_integer(fd, Encoding.value_bitsize()),
76         {:ok, key} <-
77           binread_then_map(fd, key_size, &binary_to_term/1),
78         {:ok, value} <-
79           binread_then_map(fd, value_size, &binary_to_term/1) do
80      {:ok, {Encoding.atom_write(), key, value}}
81    else
82      error -> error
83    end
84  end
85  ...

For delete entries, we only need to read the key size and the key, parse the key into a term, then return the parsed entry in the form {:d, key}.

/lib/matryoshka/impl/log_store/deserialize.ex

85  ...
86  def read_delete_entry(fd) do
87    with {:ok, key_size} <- 
88           read_big_unsigned_integer(fd, Encoding.key_bitsize()),
89         {:ok, key} <-
90           binread_then_map(fd, key_size, &binary_to_term/1) do
91      {:ok, {Encoding.atom_delete(), key}}
92    else
93      error -> error
94    end
95  end
96  ...

Now that we can read one entry at a time, we need the ability to read through the entire log file, keeping track of:

  1. The offsets map, which maps each reference to the position and size of its most recent value
  2. The current offset, i.e. our position in the file as we move from entry to entry

The offsets map will eventually become the index in LogStore, a map of type reference -> {value_position, value_size}.

We start with a helper function load_offsets/1 which takes the file descriptor of the log file. load_offsets/1 starts off loading the offsets with an empty index map and a position of 0 (the beginning of the file):

/lib/matryoshka/impl/log_store/deserialize.ex

 95  ...
 96  # ............... Load Offsets and Value Size ..............
 97
 98  def load_offsets(fd) do
 99    load_offsets(fd, Map.new(), 0)
100  end
101  ...

OK, so now, how do we read the log file?

Firstly, let’s set our position in the file to the current offset that was passed in. We’ve already read up to this position, so we should continue reading the file from it.

/lib/matryoshka/impl/log_store/deserialize.ex

101  ...
102  def load_offsets(fd, offsets, current_offset) do
103    :file.position(fd, current_offset)
104    ...

Afterwards, we’ll read the timestamp and the entry-type atom, then use the atom to read the key term, key size, and value size from the rest of the entry (more on this later):

/lib/matryoshka/impl/log_store/deserialize.ex

104    ...
105    with {:ok, _timestamp} <- read_timestamp(fd),
106         {:ok, entry_kind} <- read_atom(fd),
107         {:ok, {key, key_size, value_size}} <-
108           load_offsets_entry(fd, entry_kind) do
109      ...

Now, it’s time to calculate the various sizes we need. We’ll need two sizes:

  1. relative_offset_to_value, which is calculated as the size of the log entry from the beginning of the timestamp to the beginning of the value
    • Delete entries have no value, so this is calculated as the whole delete entry size
  2. relative_offset_to_end, which is calculated as the size of the whole log entry (including the value for write entries)

/lib/matryoshka/impl/log_store/deserialize.ex

109      ...
110      relative_offset_to_value =
111        case value_size do
112          nil ->
113            Encoding.delete_entry_size(key_size)
114
115          _nonzero ->
116            Encoding.write_entry_pre_value_size(key_size)
117        end
118
119      relative_offset_to_end =
120        case value_size do
121          nil -> Encoding.delete_entry_size(key_size)
122          value_size -> Encoding.write_entry_size(key_size, value_size)
123        end
124      ...

Now we need to calculate the value_offset, which is the absolute position of the value in the log file. This is equal to the current offset (the position where the entry’s timestamp starts) plus the relative offset to the value. We then put this information into the offsets map, which we’ll later use as the index in LogStore:

/lib/matryoshka/impl/log_store/deserialize.ex

124      ...
125      value_offset =
126        current_offset + relative_offset_to_value
127
128      offsets = Map.put(offsets, key, {value_offset, value_size})
129      ...

And then we need to calculate the position of the end of the entry, so that we can continue the recursion with the current offset set to the start of the next entry. We continue until we hit an :eof, which tells us we’ve traversed the entire log file and can return the offsets:

/lib/matryoshka/impl/log_store/deserialize.ex

129      ...
130      absolute_offset =
131        current_offset + relative_offset_to_end
132
133      load_offsets(fd, offsets, absolute_offset)
134    else
135      :eof -> offsets
136    end
137  end
138  ...
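
To make the offset arithmetic concrete, here’s a hypothetical trace over a log file containing a write entry followed by a delete entry for the same key (a key that serializes to 17 bytes, a value that serializes to 9 bytes):

entry 1 (write) starts at offset 0:
  value offset = 0 + (8 + 4 + 2 + 4 + 17) = 35    -> offsets: %{key => {35, 9}}
  next offset  = 0 + (8 + 4 + 2 + 4 + 17 + 9) = 44
entry 2 (delete) starts at offset 44:
  value offset = 44 + (8 + 4 + 2 + 17) = 75       -> offsets: %{key => {75, nil}}
  next offset  = 44 + 31 = 75
reading at offset 75 hits :eof, so load_offsets/3 returns the offsets map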

OK so that’s all well and good, but we still need to fill in the loading of the offset entries. load_offsets_entry/2 delegates the offset loading to either load_offsets_write_entry/1 or load_offsets_delete_entry/1 depending on the entry-type atom:

/lib/matryoshka/impl/log_store/deserialize.ex

138  ...
139  def load_offsets_entry(fd, entry_kind) do
140    atom_write = Encoding.atom_write()
141    atom_delete = Encoding.atom_delete()
142
143    case entry_kind do
144      ^atom_write -> load_offsets_write_entry(fd)
145      ^atom_delete -> load_offsets_delete_entry(fd)
146      atom when is_atom(atom) -> {:error, {:no_entry_kind, atom}}
147      other -> other
148    end
149  end
150  ...

To load the offsets for a write entry, we read:

  1. The key size
  2. The value size
  3. The key, which we parse from binary into a term

Then return this info in a tuple {key, key_size, value_size}:

/lib/matryoshka/impl/log_store/deserialize.ex

150  ...
151  def load_offsets_write_entry(fd) do
152    with {:ok, key_size} <- 
153           read_big_unsigned_integer(fd, Encoding.key_bitsize()),
154         {:ok, value_size} <- 
155           read_big_unsigned_integer(fd, Encoding.value_bitsize()) do
156      binread_then_map(fd, key_size, fn key_bin ->
157        key = binary_to_term(key_bin)
158        {key, key_size, value_size}
159      end)
160    end
161  end
162  ...

To load the offsets for a delete entry, we read:

  1. The key size
  2. The key, which we parse from binary into a term

And we also set the value size to nil since there’s no value in a delete entry. We’ll take advantage of this when retrieving values in LogStore, as we’ll know that we can return an error for fetch/2 or nil for get/2.

load_offsets_delete_entry/1 then returns this info in a tuple {key, key_size, nil}:

/lib/matryoshka/impl/log_store/deserialize.ex

160  ...
161  def load_offsets_delete_entry(fd) do
162    with {:ok, key_size} <-
163           read_big_unsigned_integer(fd, Encoding.key_bitsize()) do
164      binread_then_map(fd, key_size, fn key_bin ->
165        {binary_to_term(key_bin), key_size, nil}
166      end)
167    end
168  end
169  ...

Now we’re done with indexing the log file. We just need to add a function to use the value offset and value size data to read values from the log file, which we do in get_value/3:

  1. We read size bytes from the position offset using the Erlang function :file.pread/3
  2. Then, if the bytes are successfully read, we convert the bytes back into a term and return it
  3. Otherwise, it returns the error (:eof or {:error, reason})

/lib/matryoshka/impl/log_store/deserialize.ex

169  ...
170  # ----------------- Read Value at Position -----------------
171
172  def get_value(fd, offset, size) when not is_nil(size) do
173    with {:ok, bin} <- :file.pread(fd, offset, size) do
174      {:ok, binary_to_term(bin)}
175    else
176      other -> other
177    end
178  end
179end

Combining into LogStore

Finally it’s time to wrap up all this functionality into the LogStore.

The LogStore will need 3 components:

  1. A reader, which will read values from the log file on fetch/2 and get/2
  2. A writer, which will append new entries to the log file on put/3 and delete/2
  3. An index, which will store the mapping reference -> {value position, value size} we receive from Deserialize.load_offsets/1.

We start, as always, with a struct and a constructor function.

We do want to be careful with log_store/1, however. If the log file already exists, we want to load the offsets into the LogStore index beforehand, and open the writer in append mode (so that all writes go to the end of the file). If the log file doesn’t already exist, we’ll open the writer in write mode (which creates an empty log file) before creating the reader and initializing an empty index map.

/lib/matryoshka/impl/log_store/log_store.ex

 1defmodule Matryoshka.Impl.LogStore do
 2  alias Matryoshka.Impl.LogStore.Deserialize
 3  alias Matryoshka.Impl.LogStore.Serialize
 4  alias Matryoshka.Storage
 5
 6  @enforce_keys [:reader, :writer, :index]
 7  defstruct @enforce_keys
 8
 9  @type t :: %__MODULE__{
10          reader: File.io_device(),
11          writer: File.io_device(),
12          index: map()
13        }
14
15  def log_store(log_filepath) do
16    {reader, writer, index} =
17      case File.open(log_filepath, [:binary, :read]) do
18        {:ok, reader} ->
19          index = Deserialize.load_offsets(reader)
20          {:ok, writer} = File.open(log_filepath, [:binary, :append])
21          {reader, writer, index}
22
23        {:error, _reason} ->
24          {:ok, writer} = File.open(log_filepath, [:binary, :write])
25          {:ok, reader} = File.open(log_filepath, [:binary, :read])
26          index = Map.new()
27          {reader, writer, index}
28      end
29
30    %__MODULE__{reader: reader, writer: writer, index: index}
31  end
32  ...

Now, let’s implement the Storage protocol. fetch/2 is the most complicated, as we want to return a reason when there’s an error retrieving the value. We tag both steps in the with expression with an atom (:index or :store) so that we can pattern match on the individual steps and reshape any errors into our {:error, reason} format. On line 48 you can see that if the index map returns a nil size, we know that the value has been deleted, so we return a “no reference” error without needing to check the log file.

/lib/matryoshka/impl/log_store/log_store.ex

32  ...
33  defimpl Storage do
34    def fetch(store, ref) do
35      value =
36        with {:index, {:ok, {offset, size}}} when not is_nil(size) <-
37               {:index, Map.fetch(store.index, ref)},
38             {:store, {:ok, value}} <-
39               {:store,
40                Deserialize.get_value(
41                  store.reader,
42                  offset,
43                  size
44                )} do
45          {:ok, value}
46        else
47          {:index, :error} -> {:error, {:no_ref, ref}}
48          {:index, {:ok, {_position, nil}}} -> {:error, {:no_ref, ref}}
49          {:store, {:error, reason}} -> {:error, reason}
50          {:store, :eof} -> {:error, :eof}
51        end
52
53      {store, value}
54    end
55    ...

get/2 on the other hand is much easier, as we simply return nil on any error:

/lib/matryoshka/impl/log_store/log_store.ex

55    ...
56    def get(store, ref) do
57      value =
58        with {:ok, {offset, size}} when not is_nil(size) <-
59               Map.fetch(store.index, ref),
60             {:ok, value} <-
61               Deserialize.get_value(store.reader, offset, size) do
62          value
63        else
64          _ -> nil
65        end
66
67      {store, value}
68    end
69    ...

put/3 and delete/2 are simpler: we append an entry (a write entry for puts, a delete entry for deletes) to the log file, update the index, then return the updated LogStore. The one thing to watch out for is that the append functions return the value’s offset relative to the start of the entry, so we ask the writer for the end-of-file position (which is where an append will land) and add the relative offset to get the absolute position we store in the index:

/lib/matryoshka/impl/log_store/log_store.ex

69    ...
70    def put(store, ref, value) do
71      # In append mode, the new entry starts at the current end of the file.
72      {:ok, entry_start} = :file.position(store.writer, :eof)
73      {value_offset, size} = Serialize.append_write_log_entry(store.writer, ref, value)
74      index = Map.put(store.index, ref, {entry_start + value_offset, size})
75      %{store | index: index}
76    end
77
78    def delete(store, ref) do
79      {:ok, entry_start} = :file.position(store.writer, :eof)
80      {entry_size, size} = Serialize.append_delete_log_entry(store.writer, ref)
81      index = Map.put(store.index, ref, {entry_start + entry_size, size})
82      %{store | index: index}
83    end
84  end
85end

And with that, we have finally finished writing the append-only-log-backed store LogStore.
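
As a sanity check, here’s a sketch of a round trip in iex (assuming /tmp is writable). Values survive “restarting” the store because the index is rebuilt from the log file on open:

iex> alias Matryoshka.Impl.LogStore
iex> alias Matryoshka.Storage
iex> store = LogStore.log_store("/tmp/matryoshka_demo.log")
iex> store = Storage.put(store, :inventory, %{apples: 3})
iex> reopened = LogStore.log_store("/tmp/matryoshka_demo.log")
iex> {_store, value} = Storage.fetch(reopened, :inventory)
iex> value
{:ok, %{apples: 3}}

Note that the reference here is an atom, not a string: LogStore happily serializes arbitrary terms as keys.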

Packaging into PersistentStore

We’ll cap off this post with an example of a new store we construct with LogStore: a persistent store that uses LogStore as a disk-based source of truth, with caching functionality backed by an in-memory MapStore.

/lib/matryoshka/impl/persistent_store.ex

1defmodule Matryoshka.Impl.PersistentStore do
2  alias Matryoshka.Impl.CachingStore
3  alias Matryoshka.Impl.LogStore
4
5  def persistent_store(log_filepath) do
6    LogStore.log_store(log_filepath)
7    |> CachingStore.caching_store()
8  end
9end

It’s as easy as that. Whenever we want to compose stores and store combinators together, we can just pass them into each other using good old-fashioned function calls and the Elixir pipeline operator.
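
For example, nothing stops us from stacking more combinators from the earlier posts on top, such as wrapping the whole thing in a LoggingStore (a hypothetical composition, just to show the shape, assuming the relevant modules are aliased as in PersistentStore above):

"/tmp/matryoshka_demo.log"
|> LogStore.log_store()
|> CachingStore.caching_store()
|> LoggingStore.logging_store()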

Exposing to Matryoshka consumers

Now that we’ve written the business logic for FilesystemStore, LogStore, and PersistentStore, it’s time to expose them in the Matryoshka module so users of our library can consume them:

/lib/matryoshka.ex

17  ...
18  # Business logic
19  defdelegate backup_store(source_store, target_stores), to: BackupStore
20  defdelegate caching_store(main_store), to: CachingStore
21  defdelegate caching_store(main_store, cache_store), to: CachingStore
22  defdelegate filesystem_store(root_dir), to: FilesystemStore
23  defdelegate logging_store(store), to: LoggingStore
24  defdelegate log_store(log_filepath), to: LogStore
25  defdelegate map_store(), to: MapStore
26  defdelegate map_store(map), to: MapStore
27  defdelegate mapping_store(store, opts), to: MappingStore
28  defdelegate pass_through(store), to: PassThrough
29  defdelegate persistent_store(log_filepath), to: PersistentStore
30  defdelegate switching_store(path_store_map), to: SwitchingStore
31end

Next steps

OK, we’ve got some disk stores now that will let us persist data, keeping it safe between restarts. Why not go the complete opposite direction and build some remote stores? That is, let’s build some stores which compute their storage call results by sending network requests to other servers.

We’ll be doing that in the next post in this series.

You can see the latest version of Matryoshka at my GitHub.