We left off in the last post having written a few extra storage combinators, including a CachingStore that lets us check multiple stores (one as a cache store, one as a main source-of-truth store). But all our stores are in-memory only, which means we’ll lose all our data when the store processes finish.
Let’s fix that by writing some stores that persist their data to disk. We’ll start with a naive implementation, FilesystemStore, before building the much more sophisticated LogStore.
In the spirit of “make it work, then make it better”, we’ll write a super naive disk store with FilesystemStore:
Let’s start with a struct and constructor function:
/lib/matryoshka/impl/filesystem_store.ex
defmodule Matryoshka.Impl.FilesystemStore do
  alias Matryoshka.Reference
  alias Matryoshka.Storage

  @enforce_keys [:root_dir]
  defstruct @enforce_keys

  @type t :: %__MODULE__{
          root_dir: Path.t()
        }

  @spec filesystem_store(Path.t()) :: t()
  def filesystem_store(root_dir) do
    %__MODULE__{root_dir: root_dir}
  end
  ...
Since the provided refs will be treated as paths relative to the root directory of the filesystem store, let’s build a function to turn a ref into an absolute path:
/lib/matryoshka/impl/filesystem_store.ex
  ...
  @spec absolute_path(t(), Reference.t()) :: Path.t()
  def absolute_path(store, ref) do
    path_segments = [store.root_dir | Reference.path_segments(ref)]
    Path.join(path_segments)
  end
  ...
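For example, assuming (as in earlier posts in this series) that a string ref like "users/alice" splits into the path segments ["users", "alice"], the resolution looks like this:

store = FilesystemStore.filesystem_store("/tmp/fs_store")
FilesystemStore.absolute_path(store, "users/alice")
# => "/tmp/fs_store/users/alice"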
Then we can implement the Storage methods by reading from files. We first find the absolute path to the file that stores the value by combining the root directory of the store with the reference, then read the file and return the value.
/lib/matryoshka/impl/filesystem_store.ex
  alias __MODULE__

  defimpl Storage do
    def fetch(store, ref) do
      path = FilesystemStore.absolute_path(store, ref)

      with {:ok, value} <- File.read(path) do
        {store, {:ok, value}}
      else
        {:error, _reason} -> {store, {:error, {:no_ref, ref}}}
      end
    end

    def get(store, ref) do
      path = FilesystemStore.absolute_path(store, ref)

      with {:ok, value} <- File.read(path) do
        {store, value}
      else
        {:error, _reason} -> {store, nil}
      end
    end
    ...
To update the values (i.e. by putting a new value in or deleting the current value), we do the same absolute filepath operation, then either write to the file (making sure to create the parent directories) or delete the file.
/lib/matryoshka/impl/filesystem_store.ex
    ...
    def put(store, ref, value) when is_binary(value) do
      path = FilesystemStore.absolute_path(store, ref)
      parent_dir = Path.dirname(path)
      File.mkdir_p(parent_dir)
      File.write(path, value)

      store
    end

    def delete(store, ref) do
      path = FilesystemStore.absolute_path(store, ref)
      _result = File.rm(path)
      store
    end
  end
end
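Here’s a quick sketch of FilesystemStore in action, again assuming string refs that split on slashes:

alias Matryoshka.Impl.FilesystemStore
alias Matryoshka.Storage

store = FilesystemStore.filesystem_store("/tmp/fs_store")

# Writes the file /tmp/fs_store/users/alice
store = Storage.put(store, "users/alice", "alice's data")

{_store, {:ok, "alice's data"}} = Storage.fetch(store, "users/alice")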
However, there are a few issues with FilesystemStore. For a start, it only works with stringified keys and values. If we want to use it to store maps, lists, or even integers, we’d need to compose it with a MappingStore, something like a JsonStore or an XmlStore that maps Elixir terms into JSON strings or XML documents.
It also uses a new file for every value. There are definitely use cases for that, since we may want to store quite large values, but it seems wasteful for small values.
Oh, and there’s no history of the puts and deletes in the store: once a value is overwritten, it’s gone forever. It would be nice to be able to step through the history of a value for things like auditing and bug-hunting.
We’ll build LogStore with these limitations in mind.
The code in this implementation is heavily inspired by the approach in Build a Simple Persistent Key-Value Store in Elixir, using Logs – Part 2. I’ve made some upgrades to deal with arbitrary terms for keys and values, along with deleting values, but this article helped me immensely in understanding how append-only log-backed key-value stores work, along with giving me ideas on how to structure my log entries.
The secret sauce to LogStore is :erlang.term_to_binary/1 and :erlang.binary_to_term/1, which encode and decode Erlang (and Elixir) terms to and from binary. This lets us serialize any kind of reference and value we want to the log file, which fixes the first issue with FilesystemStore (only being able to read and write string references and string values).
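A quick illustration of the round trip:

value = %{user: "alice", scores: [1, 2, 3]}
binary = :erlang.term_to_binary(value)

# binary_to_term/1 exactly reverses term_to_binary/1
^value = :erlang.binary_to_term(binary)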
Whenever we update data (via put/3 or delete/2 on LogStore), we’ll append a log entry, in binary encoding, to the log file. Since we’re not overwriting the data that stores the current state, we’ll have a timeline of all the values that have been recorded in LogStore. Whenever we retrieve a value, we use the most up-to-date version of that value, but we could theoretically parse through the log file and show the history of a value for a given reference, and all the changes are timestamped.
After appending the entry to the log file, we’ll then store the position and size (both in bytes) of the value in an index map (with shape reference -> {position, size}), which will allow us to read exactly size bytes from the file every time we want to retrieve (get or fetch) a value.
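As a purely illustrative example, the index might look like this after a couple of writes:

# ref => {position of value in bytes, size of value in bytes}
%{"users/alice" => {28, 57}, "users/bob" => {113, 61}}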
LogStore is a lot more complicated than all the other stores we’ve written, so I broke it up into four modules:
Module | Functionality
---|---
Encoding | Define how entries in the log file are stored.
Deserialize | Read data from the log file.
Serialize | Write data to the log file.
LogStore | Implement the Storage protocol by translating storage calls (put, get, fetch, delete) into reads and writes on the log file.
The Encoding module is mainly for bookkeeping of things like how we format our log entries. We’ll have two kinds of log entries: writes (for when we put data into the LogStore), and deletes (for when we delete data from the LogStore).
Both kinds of log entries will start with a timestamp, then a one-letter atom which represents whether the rest of the log entry is a write entry (:w) or a delete entry (:d).
Then we need some more metadata to indicate the size of the rest of the entry—if the log entry is a write, we’ll need to store both the size of the key and the size of the value, but if the log entry is a delete, we’ll only need to store the size of the key.
Finally, we have the actual data: the key itself and, for write entries, the value.
All integers will be stored as big-endian, unsigned integers (since neither Unix timestamps nor sizes can be negative).
So the entries look like this:
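Write entry:  [timestamp (64 bits)][:w (4 bytes)][key size (16 bits)][value size (32 bits)][key][value]
Delete entry: [timestamp (64 bits)][:d (4 bytes)][key size (16 bits)][key]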
Timestamps are stored in a 64-bit int because that’s enough space to hold a millisecond-precision Unix timestamp.
/lib/matryoshka/impl/log_store/encoding.ex
defmodule Matryoshka.Impl.LogStore.Encoding do
  @timestamp_bitsize 64
  def timestamp_bitsize, do: @timestamp_bitsize

  @time_unit :millisecond
  def time_unit, do: @time_unit
  ...
Oh, and we’re also providing functions in Encoding that let the Deserialize and Serialize modules access the module attributes we’re defining. Normally, module attributes are only accessible inside the module that defines them, but since we’re using Encoding to store all our magic numbers (with names!), we need to expose those values to other modules through functions.
We’ll define the maximum key and value lengths to be 2^16 bytes (~66 kB) and 2^32 bytes (~4.3 GB) respectively, so we’ll store the key and value lengths in a 16-bit int and a 32-bit int.
/lib/matryoshka/impl/log_store/encoding.ex
  ...
  @key_bitsize 16
  def key_bitsize, do: @key_bitsize

  @value_bitsize 32
  def value_bitsize, do: @value_bitsize
  ...
Since we’re using single-letter atoms to represent write vs. delete entries in the log file, we need 4 bytes to store them (single-letter atoms have a length of 4 after converting them to binary with :erlang.term_to_binary/1).
/lib/matryoshka/impl/log_store/encoding.ex
  ...
  @atom_bytesize 4
  def atom_bytesize, do: @atom_bytesize

  @atom_write :w
  def atom_write, do: @atom_write
  def atom_write_binary, do: :erlang.term_to_binary(@atom_write)

  @atom_delete :d
  def atom_delete, do: @atom_delete
  def atom_delete_binary, do: :erlang.term_to_binary(@atom_delete)
  ...
We also need a helper function to convert a size in bits to a size in bytes, which will be useful for calculating entry sizes:
/lib/matryoshka/impl/log_store/encoding.ex
  ...
  def bits_to_bytes(bits), do: div(bits, 8)
  ...
We want to minimise how much data the LogStore has to read from disk whenever we retrieve a value. To do so, we’ll be storing an in-memory index, which maps references to a combination of file position (where the value starts in the file) and the value size. Whenever we want to retrieve the value for a reference, we’ll go to the file position for that reference, then read the number of bytes the index tells us.
By adding the position of the start of a write entry to the size of that entry up to (but not including) the value, we get the absolute position of the value, which we can save in our index, so that we know what position to start reading the file from in order to retrieve the value.
As such, whenever we’re appending entries to the log file, or reading the log file on a cold start, we’ll need to be able to calculate the sizes of the delete entry, the write entry, or the size of the write entry up until the value starts.
/lib/matryoshka/impl/log_store/encoding.ex
  ...
  def delete_entry_size(key_size) do
    Enum.sum([
      bits_to_bytes(@timestamp_bitsize),
      atom_bytesize(),
      bits_to_bytes(@key_bitsize),
      key_size
    ])
  end

  def write_entry_size(key_size, value_size) do
    Enum.sum([
      bits_to_bytes(@timestamp_bitsize),
      atom_bytesize(),
      bits_to_bytes(@key_bitsize),
      bits_to_bytes(@value_bitsize),
      key_size,
      value_size
    ])
  end

  def write_entry_pre_value_size(key_size) do
    Enum.sum([
      bits_to_bytes(@timestamp_bitsize),
      atom_bytesize(),
      bits_to_bytes(@key_bitsize),
      bits_to_bytes(@value_bitsize),
      key_size
    ])
  end
end
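As a sanity check on the arithmetic: for an encoded key of 10 bytes and an encoded value of 100 bytes, a write entry takes 8 + 4 + 2 + 4 + 10 + 100 = 128 bytes, with the value starting 28 bytes into the entry:

Encoding.write_entry_size(10, 100)      # => 128
Encoding.write_entry_pre_value_size(10) # => 28
Encoding.delete_entry_size(10)          # => 24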
Now that we’ve defined our log entry formats, let’s work on serializing them to disk. We’ll start with a function to retrieve the system time, then serialize it to binary:
/lib/matryoshka/impl/log_store/serialize.ex
defmodule Matryoshka.Impl.LogStore.Serialize do
  alias Matryoshka.Impl.LogStore.Encoding
  import :erlang, only: [term_to_binary: 1]

  # ------------------------ Timestamp -----------------------

  def binary_timestamp() do
    timestamp = System.system_time(Encoding.time_unit())
    <<timestamp::big-unsigned-integer-size(Encoding.timestamp_bitsize())>>
  end
  ...
Then we’d also like functions for formatting the log entries in binary. These functions will return a tuple in the shape {entry, entry_size, value_size} where:

- entry is the complete binary log entry, timestamp included;
- entry_size is the size in bytes of the entry up to where the value starts (for write entries) or of the whole entry (for delete entries);
- value_size is the byte size of the binary-encoded value (nil for delete entries).
/lib/matryoshka/impl/log_store/serialize.ex
  ...
  # ----------------------- Formatting -----------------------

  def format_write_log_entry(key, value) do
    {key_size, key_size_data, key} = pack_key(key)
    {value_size, value_size_data, value} = pack_value(value)

    entry =
      Enum.join([
        Encoding.atom_write_binary(),
        key_size_data,
        value_size_data,
        key,
        value
      ])

    entry_size = Encoding.write_entry_pre_value_size(key_size)

    {prepend_timestamp(entry), entry_size, value_size}
  end

  def format_delete_log_entry(key) do
    {key_size, key_size_data, key} = pack_key(key)

    entry =
      Enum.join([
        Encoding.atom_delete_binary(),
        key_size_data,
        key
      ])

    entry_size = Encoding.delete_entry_size(key_size)

    {prepend_timestamp(entry), entry_size, nil}
  end
  ...
These entry formatting functions will need some helper functions. We want to prepend the timestamp to all log entries, which will be done with prepend_timestamp/1:
/lib/matryoshka/impl/log_store/serialize.ex
  ...
  def prepend_timestamp(data) when is_binary(data) do
    timestamp = binary_timestamp()
    timestamp <> data
  end
  ...
We also want some helper functions to pack keys and values. pack_term/2 encodes a term to binary, then packs its byte size into an unsigned int of a given bit width (16 bits for keys, 32 bits for values), returning a tuple {size, size_data, binary} where:

- size is the byte size of the binary-encoded term.
- size_data is that size encoded as a big-endian unsigned int of int_size bits.
- binary is the binary representation of the term.

/lib/matryoshka/impl/log_store/serialize.ex
  ...
  def pack_term(term, int_size) do
    binary = term |> term_to_binary()
    size = byte_size(binary)
    size_data = <<size::big-unsigned-integer-size(int_size)>>
    {size, size_data, binary}
  end

  def pack_key(key), do: pack_term(key, Encoding.key_bitsize())

  def pack_value(value), do: pack_term(value, Encoding.value_bitsize())
  ...
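For instance, packing the key :name (on OTP 23 and later, where a small atom encodes to a 3-byte header plus its name) gives a 7-byte binary and a 2-byte big-endian size field:

Serialize.pack_key(:name)
# => {7, <<0, 7>>, <<131, 119, 4, 110, 97, 109, 101>>}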
Now that we can format log entries, we just need to append entries to the log file:
/lib/matryoshka/impl/log_store/serialize.ex
  ...
  # ------------------- Writing to Log File ------------------

  def append_write_log_entry(fd, key, value) do
    {entry, relative_offset, value_size} = format_write_log_entry(key, value)
    IO.binwrite(fd, entry)
    {relative_offset, value_size}
  end

  def append_delete_log_entry(fd, key) do
    {entry, relative_offset, value_size} = format_delete_log_entry(key)
    IO.binwrite(fd, entry)
    {relative_offset, value_size}
  end
end
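A minimal sketch of appending an entry (the file name and terms are just for illustration):

{:ok, fd} = File.open("store.log", [:binary, :append])

{offset_to_value, value_size} =
  Serialize.append_write_log_entry(fd, :greeting, "hello")
# offset_to_value is relative to the start of the entry;
# value_size is the byte size of the encoded value.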
We’ve finished defining how to write log entries; now it’s time to define how to read them.
We’ll start with some IO helper functions:

- handle_io_result/2 applies a function to the result of IO.binread/2 only if there’s no error.
- binread_then_map/3 reads a set number of bytes from a file, then applies a function to those bytes. This lets us incrementally read and parse data.

/lib/matryoshka/impl/log_store/deserialize.ex
defmodule Matryoshka.Impl.LogStore.Deserialize do
  alias Matryoshka.Impl.LogStore.Encoding
  import :erlang, only: [binary_to_term: 1]

  # __________________ Reading from Log File _________________

  # ----------------------- IO Helpers -----------------------

  def handle_io_result(:eof, _fun), do: :eof
  def handle_io_result({:error, reason}, _fun), do: {:error, reason}
  def handle_io_result(bytes, fun), do: {:ok, fun.(bytes)}

  def binread_then_map(fd, number_bytes, fun) do
    bytes = IO.binread(fd, number_bytes)
    handle_io_result(bytes, fun)
  end
  ...
Next, we’ll add some functions to read and parse data from the log file. These parse the binary into simple types—integers, atoms, timestamps:
/lib/matryoshka/impl/log_store/deserialize.ex
  ...
  # ------------------ Reading Elixir Types ------------------

  def read_big_unsigned_integer(fd, int_size) do
    number_bytes = Encoding.bits_to_bytes(int_size)

    binread_then_map(fd, number_bytes, fn bytes ->
      <<int::big-unsigned-integer-size(int_size)>> = bytes
      int
    end)
  end

  def read_atom(fd) do
    atom_bytesize = Encoding.atom_bytesize()

    binread_then_map(
      fd,
      atom_bytesize,
      fn bytes ->
        <<binary_atom::binary-size(atom_bytesize)>> = bytes
        atom = binary_to_term(binary_atom)
        atom
      end
    )
  end

  def read_timestamp(fd) do
    timestamp_bitsize = Encoding.timestamp_bitsize()

    with {:ok, timestamp_int} <- read_big_unsigned_integer(fd, timestamp_bitsize) do
      DateTime.from_unix(timestamp_int, Encoding.time_unit())
    else
      other -> other
    end
  end
  ...
But the most important part is reading the log entries. When we read a log entry, we consume the timestamp (which, currently, we do nothing with) and the entry-kind atom, which lets us dispatch the rest of the entry reading to either read_write_entry/1 (if the atom is :w) or read_delete_entry/1 (if the atom is :d). If a different atom is encoded, we’ll return an error value.
/lib/matryoshka/impl/log_store/deserialize.ex
  ...
  # ------------------- Reading Log Entries ------------------

  # .................... Read Entire Entry ....................

  def read_log_entry(fd) do
    _timestamp = read_timestamp(fd)
    entry_kind = read_atom(fd)
    atom_write = Encoding.atom_write()
    atom_delete = Encoding.atom_delete()

    case entry_kind do
      {:ok, ^atom_write} -> read_write_entry(fd)
      {:ok, ^atom_delete} -> read_delete_entry(fd)
      {:ok, atom} -> {:error, {:no_entry_kind, atom}}
      other -> other
    end
  end
  ...
For write entries, we need to read:

- the key size (a 16-bit int) and the value size (a 32-bit int);
- the key and the value, each parsed back into a term with :erlang.binary_to_term/1.

Then we return the parsed entry in the form {:w, key, value}.
/lib/matryoshka/impl/log_store/deserialize.ex
  ...
  def read_write_entry(fd) do
    with {:ok, key_size} <-
           read_big_unsigned_integer(fd, Encoding.key_bitsize()),
         {:ok, value_size} <-
           read_big_unsigned_integer(fd, Encoding.value_bitsize()),
         {:ok, key} <-
           binread_then_map(fd, key_size, &binary_to_term/1),
         {:ok, value} <-
           binread_then_map(fd, value_size, &binary_to_term/1) do
      {:ok, {Encoding.atom_write(), key, value}}
    else
      error -> error
    end
  end
  ...
For delete entries, we only need to read the key size and the key, parse the key into a term, then return the parsed entry in the form {:d, key}.
/lib/matryoshka/impl/log_store/deserialize.ex
  ...
  def read_delete_entry(fd) do
    with {:ok, key_size} <-
           read_big_unsigned_integer(fd, Encoding.key_bitsize()),
         {:ok, key} <-
           binread_then_map(fd, key_size, &binary_to_term/1) do
      {:ok, {Encoding.atom_delete(), key}}
    else
      error -> error
    end
  end
  ...
Now that we can read one entry at a time, we need the ability to read through the entire log file, keeping track of:

- the absolute position where each value starts in the file, and
- the size of each value.

These will eventually become the index in LogStore, a map of type reference -> {value_position, value_size}.
We start with a helper function load_offsets/1, which takes the file descriptor of the log file. load_offsets/1 starts off loading the offsets with an empty index map and a position of 0 (i.e. the beginning of the file):
/lib/matryoshka/impl/log_store/deserialize.ex
  ...
  # ............... Load Offsets and Value Size ..............

  def load_offsets(fd) do
    load_offsets(fd, Map.new(), 0)
  end
  ...
OK, so now, how do we read the log file?
Firstly, let’s set our position in the file to the current offset that was passed in. We’ve already read up to this position, so we should continue reading the file from it.
/lib/matryoshka/impl/log_store/deserialize.ex
  ...
  def load_offsets(fd, offsets, current_offset) do
    :file.position(fd, current_offset)
    ...
Afterwards, we’ll read the timestamp and entry-type atom, which we’ll use to read the key term, key size, and value size from the entry (more on this later):
/lib/matryoshka/impl/log_store/deserialize.ex
    ...
    with {:ok, _timestamp} <- read_timestamp(fd),
         {:ok, entry_kind} <- read_atom(fd),
         {:ok, {key, key_size, value_size}} <-
           load_offsets_entry(fd, entry_kind) do
      ...
Now it’s time to calculate the sizes we need. There are two:

- relative_offset_to_value, which is calculated as the size of the log entry from the beginning of the timestamp to the beginning of the value;
- relative_offset_to_end, which is calculated as the size of the whole log entry (including the value for write entries).

/lib/matryoshka/impl/log_store/deserialize.ex
      ...
      relative_offset_to_value =
        case value_size do
          nil ->
            Encoding.delete_entry_size(key_size)

          _nonzero ->
            Encoding.write_entry_pre_value_size(key_size)
        end

      relative_offset_to_end =
        case value_size do
          nil -> Encoding.delete_entry_size(key_size)
          value_size -> Encoding.write_entry_size(key_size, value_size)
        end
      ...
Now we need to calculate the value_offset, which is the absolute position of the value in the log file. This is equal to the current offset (the position where the entry’s timestamp starts) plus the relative offset to the value. We then put the information into the offsets map, which we’ll later use as the index in LogStore:
/lib/matryoshka/impl/log_store/deserialize.ex
      ...
      value_offset =
        current_offset + relative_offset_to_value

      offsets = Map.put(offsets, key, {value_offset, value_size})
      ...
And then we need to calculate the position of the end of the entry, so that we can continue the recursion with the current offset set to the start of the next entry. We continue until we reach an :eof, which informs us that we’ve traversed the entire log file, and therefore we can return the offsets:
/lib/matryoshka/impl/log_store/deserialize.ex
      ...
      absolute_offset =
        current_offset + relative_offset_to_end

      load_offsets(fd, offsets, absolute_offset)
    else
      :eof -> offsets
    end
  end
  ...
OK, so that’s all well and good, but we still need to fill in the loading of the offset entries. load_offsets_entry/2 delegates the offset loading to either load_offsets_write_entry/1 or load_offsets_delete_entry/1, depending on the entry-type atom:
/lib/matryoshka/impl/log_store/deserialize.ex
  ...
  def load_offsets_entry(fd, entry_kind) do
    atom_write = Encoding.atom_write()
    atom_delete = Encoding.atom_delete()

    case entry_kind do
      ^atom_write -> load_offsets_write_entry(fd)
      ^atom_delete -> load_offsets_delete_entry(fd)
      atom when is_atom(atom) -> {:error, {:no_entry_kind, atom}}
      other -> other
    end
  end
  ...
To load the offsets for a write entry, we read:

- the key size (a 16-bit int) and the value size (a 32-bit int);
- the key, parsed back into a term.

Then we return this info in a tuple {key, key_size, value_size}:
/lib/matryoshka/impl/log_store/deserialize.ex
  ...
  def load_offsets_write_entry(fd) do
    with {:ok, key_size} <-
           read_big_unsigned_integer(fd, Encoding.key_bitsize()),
         {:ok, value_size} <-
           read_big_unsigned_integer(fd, Encoding.value_bitsize()) do
      binread_then_map(fd, key_size, fn key_bin ->
        key = binary_to_term(key_bin)
        {key, key_size, value_size}
      end)
    end
  end
  ...
To load the offsets for a delete entry, we read:

- the key size (a 16-bit int);
- the key, parsed back into a term.

We also set the value size to nil, since there’s no value in a delete entry. We’ll take advantage of this when retrieving values in LogStore, as we’ll know that we can return an error for fetch/2 or nil for get/2. load_offsets_delete_entry/1 then returns this info in a tuple {key, key_size, nil}:
/lib/matryoshka/impl/log_store/deserialize.ex
  ...
  def load_offsets_delete_entry(fd) do
    # Unwrap the {:ok, key_size} result (as in load_offsets_write_entry/1)
    # before using key_size as a byte count.
    with {:ok, key_size} <-
           read_big_unsigned_integer(fd, Encoding.key_bitsize()) do
      binread_then_map(fd, key_size, fn key_bin ->
        key = binary_to_term(key_bin)
        {key, key_size, nil}
      end)
    end
  end
  ...
Now we’re done with indexing the log file. We just need to add a function that uses the value offset and value size to read values from the log file, which we do in get_value/3. It:

- reads size bytes from the position offset using the Erlang function :file.pread/3;
- parses those bytes back into a term (or passes through :eof or {:error, reason}).

/lib/matryoshka/impl/log_store/deserialize.ex
  ...
  # ----------------- Read Value at Position -----------------

  def get_value(fd, offset, size) when not is_nil(size) do
    with {:ok, bin} <- :file.pread(fd, offset, size) do
      {:ok, binary_to_term(bin)}
    else
      other -> other
    end
  end
end
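To make the indexing concrete: replaying a log that put :a => "one", put :b => "two", then deleted :a would yield an index like this (byte positions assume the OTP 23+ atom encoding mentioned earlier):

Deserialize.load_offsets(fd)
# => %{a: {80, nil}, b: {53, 9}}
#    :b's value is the 9 bytes starting at byte 53;
#    :a's nil size marks it as deleted.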
Finally, it’s time to wrap up all this functionality into the LogStore. The LogStore will need 3 components:

- a reader file descriptor, for fetch/2 and get/2;
- a writer file descriptor, for put/3 and delete/2;
- the index, the map of reference -> {value position, value size} we receive from Deserialize.load_offsets/1.

We start, as always, with a struct and a constructor function.
We do want to be careful with log_store/1, however. If the log file already exists, we want to load the offsets into the LogStore index beforehand, and open the writer in append mode (so that all writes go to the end of the file). If the log file doesn’t already exist, we’ll open the writer in write mode (which creates an empty log file) before creating the reader and initializing an empty index map.
/lib/matryoshka/impl/log_store/log_store.ex
defmodule Matryoshka.Impl.LogStore do
  alias Matryoshka.Impl.LogStore.Deserialize
  alias Matryoshka.Impl.LogStore.Serialize
  alias Matryoshka.Storage

  @enforce_keys [:reader, :writer, :index]
  defstruct @enforce_keys

  @type t :: %__MODULE__{
          reader: File.io_device(),
          writer: File.io_device(),
          index: map()
        }

  def log_store(log_filepath) do
    {reader, writer, index} =
      case File.open(log_filepath, [:binary, :read]) do
        {:ok, reader} ->
          index = Deserialize.load_offsets(reader)
          {:ok, writer} = File.open(log_filepath, [:binary, :append])
          {reader, writer, index}

        {:error, _reason} ->
          {:ok, writer} = File.open(log_filepath, [:binary, :write])
          {:ok, reader} = File.open(log_filepath, [:binary, :read])
          index = Map.new()
          {reader, writer, index}
      end

    %__MODULE__{reader: reader, writer: writer, index: index}
  end
  ...
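Constructing one is then just (the path is for illustration):

store = LogStore.log_store("/tmp/matryoshka.log")
# On a fresh file, store.index == %{}; after a restart, the index is
# rebuilt from the existing log by Deserialize.load_offsets/1.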
Now, let’s implement the storage protocols. fetch/2 is the most complicated, as we want to return a reason when there’s an error retrieving the value. We tag both steps in the with macro with an atom (:index or :store) so that we can pattern match on the individual steps and reshape any errors into our {:error, reason} format. In the second else clause, you can see that if the index lookup returns a nil size, we know that the value has been deleted, so we return a “no reference” error without needing to check the log file.
/lib/matryoshka/impl/log_store/log_store.ex
  ...
  defimpl Storage do
    def fetch(store, ref) do
      value =
        with {:index, {:ok, {offset, size}}} when not is_nil(size) <-
               {:index, Map.fetch(store.index, ref)},
             {:store, {:ok, value}} <-
               {:store,
                Deserialize.get_value(
                  store.reader,
                  offset,
                  size
                )} do
          value
        else
          {:index, :error} -> {:error, {:no_ref, ref}}
          {:index, {:ok, {_position, nil}}} -> {:error, {:no_ref, ref}}
          {:store, {:error, reason}} -> {:error, reason}
          {:store, :eof} -> {:error, :eof}
        end

      {store, value}
    end
    ...
get/2, on the other hand, is much easier, as we simply return nil on any error:

- the reference is missing from the index (Map.fetch(store.index, ref) returns :error);
- the size stored in the index is nil (the value was deleted);
- the read from the log file fails.

/lib/matryoshka/impl/log_store/log_store.ex
    ...
    def get(store, ref) do
      value =
        with {:ok, {offset, size}} when not is_nil(size) <-
               Map.fetch(store.index, ref),
             {:ok, value} <-
               Deserialize.get_value(store.reader, offset, size) do
          value
        else
          _ -> nil
        end

      {store, value}
    end
    ...
put/3 and delete/2 are even simpler. We append an entry (a write for puts, a delete for deletes) to the log file, work out the absolute position of the value, update the index with the value information, then return the updated LogStore:
/lib/matryoshka/impl/log_store/log_store.ex
    ...
    def put(store, ref, value) do
      # append_write_log_entry/3 returns the offset of the value relative
      # to the start of the entry, so grab the position where the entry
      # will start (the end of the file) to index an absolute position.
      {:ok, entry_start} = :file.position(store.writer, :eof)

      {relative_offset, size} =
        Serialize.append_write_log_entry(store.writer, ref, value)

      index = Map.put(store.index, ref, {entry_start + relative_offset, size})
      %{store | index: index}
    end

    def delete(store, ref) do
      # The stored position is never read for deletes: the nil size is
      # what tells fetch/2 and get/2 that the value is gone.
      {position, size} = Serialize.append_delete_log_entry(store.writer, ref)
      index = Map.put(store.index, ref, {position, size})
      %{store | index: index}
    end
  end
end
And with that, we have finally finished writing the append-only-log-backed store LogStore.
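Here’s a quick end-to-end sketch through the Storage protocol; note that refs and values can now be arbitrary terms:

alias Matryoshka.Impl.LogStore
alias Matryoshka.Storage

store = LogStore.log_store("data.log")
store = Storage.put(store, :greeting, %{text: "hello"})

{_store, {:ok, %{text: "hello"}}} = Storage.fetch(store, :greeting)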
We’ll cap off this post with an example of a new store we construct with LogStore: a persistent store that uses LogStore as a disk-based source of truth, with caching functionality backed by an in-memory MapStore.
/lib/matryoshka/impl/persistent_store.ex
defmodule Matryoshka.Impl.PersistentStore do
  alias Matryoshka.Impl.CachingStore
  alias Matryoshka.Impl.LogStore

  def persistent_store(log_filepath) do
    LogStore.log_store(log_filepath)
    |> CachingStore.caching_store()
  end
end
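Using the composed store looks just like using any other store; a sketch, assuming the Storage protocol calls from earlier posts:

alias Matryoshka.Impl.PersistentStore
alias Matryoshka.Storage

store = PersistentStore.persistent_store("matryoshka.log")
store = Storage.put(store, :greeting, "hello")

# Reads are served from the in-memory cache when possible, falling back
# to the on-disk log otherwise.
{_store, "hello"} = Storage.get(store, :greeting)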
It’s as easy as that. Whenever we want to compose stores and store combinators together, we can just pass them into each other using good old-fashioned function calls and the Elixir pipe operator.
Now that we’ve written the business logic for FilesystemStore, LogStore, and PersistentStore, it’s time to expose them in the Matryoshka module so users of our library can consume them:
/lib/matryoshka.ex
  ...
  # Business logic
  defdelegate backup_store(source_store, target_stores), to: BackupStore
  defdelegate caching_store(main_store), to: CachingStore
  defdelegate caching_store(main_store, cache_store), to: CachingStore
  defdelegate filesystem_store(root_dir), to: FilesystemStore
  defdelegate logging_store(store), to: LoggingStore
  defdelegate log_store(log_filepath), to: LogStore
  defdelegate map_store(), to: MapStore
  defdelegate map_store(map), to: MapStore
  defdelegate mapping_store(store, opts), to: MappingStore
  defdelegate pass_through(store), to: PassThrough
  defdelegate persistent_store(log_filepath), to: PersistentStore
  defdelegate switching_store(path_store_map), to: SwitchingStore
end
OK, we’ve got some disk stores now that will let us persist data, keeping it safe between restarts. Why not go the complete opposite direction and build some remote stores? That is, let’s build some stores which compute their storage call results by sending network requests to other servers.
We’ll be doing that in the next post in this series.
You can see the latest version of Matryoshka at my GitHub.