We’ll start by creating a supervised application:
| $ mix new --sup duper |
| $ cd duper |
| $ git init |
| $ git add . |
| $ git commit -a -m 'raw application' |
Time to start writing servers.
The results server wraps an Elixir map. When it starts, it sets its state to an empty map. The keys of this map are hash values, and the values are the list of one or more paths whose files have that hash.
The server provides two API calls: one to add a hash/path pair to the map, the second to retrieve entries that have more than one path in the value (as these are duplicate files).
This is similar to the code we wrote for the sequence stash:
| defmodule Duper.Results do |
| |
| use GenServer |
| |
| @me __MODULE__ |
| |
| |
| # API |
| |
| def start_link(_) do |
| GenServer.start_link(__MODULE__, :no_args, name: @me) |
| end |
| |
| def add_hash_for(path, hash) do |
| GenServer.cast(@me, { :add, path, hash }) |
| end |
| |
| def find_duplicates() do |
| GenServer.call(@me, :find_duplicates) |
| end |
| |
| # Server |
| |
| def init(:no_args) do |
| { :ok, %{} } |
| end |
| |
| |
| def handle_cast({ :add, path, hash }, results) do |
| results = |
| Map.update( |
| results, # look in this map |
| hash, # for an entry with key |
| [ path ], # if not found, store this value |
| fn existing -> # else update with result of this fn |
| [ path | existing ] |
| end) |
| { :noreply, results } |
| end |
| |
| def handle_call(:find_duplicates, _from, results) do |
| { |
| :reply, |
| hashes_with_more_than_one_path(results), |
| results |
| } |
| end |
| |
| defp hashes_with_more_than_one_path(results) do |
| results |
| |> Enum.filter(fn { _hash, paths } -> length(paths) > 1 end) |
| |> Enum.map(&elem(&1, 1)) |
| end |
| |
| end |
The only mild magic in this code is the use of Map.update/4. This wonderful function takes a map, a key, an initial value, and a function. If the key is not present in the map, then a new map is returned with that key and initial value added. If the key is present, then the corresponding value is passed to the function, and whatever the function returns becomes the updated value in the returned map. In our case, we’re using it to create a single-element path list the first time a hash is encountered, and then to add paths to that list on duplicates.
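As a quick illustration of that behavior outside the server, here is a minimal sketch that folds a few path/hash pairs into a map with Map.update/4 and then picks out the duplicates. The sample paths and hashes are made up for the example.

```elixir
# Hypothetical sample data: {path, hash} pairs.
pairs = [{"path1", 123}, {"path2", 456}, {"path3", 123}]

results =
  Enum.reduce(pairs, %{}, fn {path, hash}, acc ->
    # First sighting of a hash stores [path];
    # later sightings prepend to the existing list.
    Map.update(acc, hash, [path], fn existing -> [path | existing] end)
  end)

# Keep only the path lists that contain more than one entry.
duplicates = for {_hash, paths} <- results, length(paths) > 1, do: paths

IO.inspect(results)
IO.inspect(duplicates)
```

Because new paths are prepended, the most recently added path comes first in each list, which is why the test further on expects path3 before path1.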
We’ll add this server to the list of top-level children in application.ex.
| def start(_type, _args) do |
| children = [ |
» | Duper.Results, |
| ] |
| |
| opts = [strategy: :one_for_one, name: Duper.Supervisor] |
| Supervisor.start_link(children, opts) |
| end |
This code is easy to test:
| defmodule Duper.ResultsTest do |
| use ExUnit.Case |
| alias Duper.Results |
| |
| test "can add entries to the results" do |
| |
| Results.add_hash_for("path1", 123) |
| Results.add_hash_for("path2", 456) |
| Results.add_hash_for("path3", 123) |
| Results.add_hash_for("path4", 789) |
| Results.add_hash_for("path5", 456) |
| Results.add_hash_for("path6", 999) |
| |
| duplicates = Results.find_duplicates() |
| |
| assert length(duplicates) == 2 |
| |
| assert ~w{path3 path1} in duplicates |
| assert ~w{path5 path2} in duplicates |
| end |
| |
| end |
| $ mix test |
| ... |
| |
| Finished in 0.05 seconds |
| 1 doctest, 2 tests, 0 failures |
Structuring Tests | |
---|---|
I like the directory structure of my tests to follow the same structure as the code they are testing. Because results.ex lives in lib/duper, I put the test in a subdirectory of test, also called duper. |
Our next server is responsible for returning all the file paths in a filesystem tree, one at a time.
Elixir doesn’t have a filesystem-traversal API built in, so we look on hex.pm and find dir_walker,[36] which we just need to wrap in a trivial GenServer whose state is the directory walker’s PID. So we add the dependency to our mix.exs file:
| defp deps do |
| [ |
| dir_walker: "~> 0.0.7", |
| ] |
| end |
and code the server in lib/duper/path_finder.ex:
| defmodule Duper.PathFinder do |
| use GenServer |
| |
| @me PathFinder |
| |
| def start_link(root) do |
| GenServer.start_link(__MODULE__, root, name: @me) |
| end |
| |
| def next_path() do |
| GenServer.call(@me, :next_path) |
| end |
| |
| |
| def init(path) do |
| DirWalker.start_link(path) |
| end |
| |
| def handle_call(:next_path, _from, dir_walker) do |
| path = case DirWalker.next(dir_walker) do |
| [ path ] -> path |
| other -> other |
| end |
| |
| { :reply, path, dir_walker } |
| end |
| |
| end |
Finally we add the pathfinder server to our application’s list of children:
| def start(_type, _args) do |
| children = [ |
| Duper.Results, |
| { Duper.PathFinder, "." }, |
| ] |
| |
| opts = [strategy: :one_for_one, name: Duper.Supervisor] |
| Supervisor.start_link(children, opts) |
| end |
Notice we used a tuple to specify the PathFinder server. That’s because it requires us to pass in the root of the tree to be searched as a parameter. Here, I’m using the current working directory, “.”, which will work well for playing with the code.
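To see what the two forms of child entry turn into, here is a small sketch using a hypothetical Demo.Finder module (it is not part of Duper). It relies only on the default child_spec/1 that use GenServer generates.

```elixir
# Demo.Finder is a stand-in module, defined here only for illustration.
defmodule Demo.Finder do
  use GenServer

  def start_link(root), do: GenServer.start_link(__MODULE__, root)
  def init(root), do: {:ok, root}
end

# Tuple form: the second element becomes the argument to start_link/1.
with_arg = Supervisor.child_spec({Demo.Finder, "."}, [])

# Bare module form: start_link/1 is passed an empty list.
bare = Supervisor.child_spec(Demo.Finder, [])

IO.inspect(with_arg.start)
IO.inspect(bare.start)
```

The bare form is why servers that take no configuration, such as Duper.Results, still define start_link with a single ignored parameter.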
When we investigated how we’d get things started, we realized that we’d need a different supervisor for our workers. This supervisor will only manage the worker servers, and it will let us add servers dynamically, after the application has started.
The simplest way to do this is to use a DynamicSupervisor. This type of supervisor allows you to create an arbitrary number of workers at runtime. (A DynamicSupervisor encapsulates what used to be the :simple_one_for_one strategy in regular supervisors. You can still do it the old way, but DynamicSupervisors let you express your intent better.)
Let’s create the supervisor (in lib/duper/worker_supervisor.ex) and then see how it works.
| defmodule Duper.WorkerSupervisor do |
| use DynamicSupervisor |
| |
| @me WorkerSupervisor |
| |
| def start_link(_) do |
| DynamicSupervisor.start_link(__MODULE__, :no_args, name: @me) |
| end |
| |
| |
| def init(:no_args) do |
| DynamicSupervisor.init(strategy: :one_for_one) |
| end |
| |
| def add_worker() do |
| {:ok, _pid} = DynamicSupervisor.start_child(@me, Duper.Worker) |
| end |
| end |
The supervisor is just a regular Elixir module. It starts with use DynamicSupervisor, which gives it its super(visor) powers.
The start_link function works the same in a supervisor as it does in a GenServer: it is called to start the server containing the supervisor code. Inside this server, Elixir automatically calls the init callback, which in turn initializes the supervisor code itself. This initialization receives the supervisor options. In the case of a dynamic supervisor, this can only be strategy: :one_for_one.
Later, we can call the add_worker function. This calls the supervisor, telling it to add another child based on the child specification we pass. In this case, we tell it to start Duper.Worker. A new server is created for each call, and these servers run in parallel. As a result, each time add_worker is called, a new Duper.Worker instance is spawned.
Note: One side effect of the fact that the same module is run in multiple child servers is we can’t give the children a name in their start_link function. If we did, then there’d be multiple servers with the same name, which Elixir doesn’t allow.
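Here is a minimal, standalone sketch of the same idea. It assumes a hypothetical Demo.Worker module and starts an anonymous DynamicSupervisor directly rather than through a wrapper module, so it can be pasted into a script on its own.

```elixir
# Demo.Worker is a stand-in for Duper.Worker. Note that start_link
# does not register a name, so many copies can run at once.
defmodule Demo.Worker do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, :no_args)
  def init(:no_args), do: {:ok, nil}
end

{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

# Each start_child call spawns a separate server from the same module.
pids =
  for _ <- 1..3 do
    {:ok, pid} = DynamicSupervisor.start_child(sup, Demo.Worker)
    pid
  end

%{active: active} = DynamicSupervisor.count_children(sup)
IO.inspect(active)
```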
Let’s remember to add the supervisor to the list of top-level children.
| def start(_type, _args) do |
| children = [ |
| Duper.Results, |
| { Duper.PathFinder, "." }, |
| Duper.WorkerSupervisor, |
| ] |
| |
| opts = [strategy: :one_for_one, name: Duper.Supervisor] |
| Supervisor.start_link(children, opts) |
| end |
Whenever I add a child to a supervisor’s list, I stop and think about the supervision strategy: how do I want the failure of a child managed by this supervisor to affect the other children?
If the results server fails, then all is lost, and we have to restart everything. The same applies to the pathfinder: although we could in theory work out how far into the folder structure we were if it crashed, and restart from there, in practice that would be difficult, so for now we treat a failure of the pathfinder as a failure of the application.
What about the worker supervisor? Here we have to be careful. The worker supervisor handles the actual worker processes. If one of these fails, the worker supervisor simply restarts it and the application continues. But the failure of a worker does not mean that the worker supervisor itself has failed. In fact, in the very unlikely event that the worker supervisor fails, it’s probably best to assume we can’t continue and stop the application.
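So, for all three children we have on this top-level supervisor, a failure of any one means the others can’t usefully carry on, and everything should be stopped and started afresh. The strategy that enforces this is :one_for_all, which terminates and restarts all the children when any one of them fails, so we change our code accordingly.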
| def start(_type, _args) do |
| children = [ |
| Duper.Results, |
| { Duper.PathFinder, "." }, |
| Duper.WorkerSupervisor, |
| ] |
| |
» | opts = [strategy: :one_for_all, name: Duper.Supervisor] |
| Supervisor.start_link(children, opts) |
| end |
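The effect of :one_for_all can be observed in a small experiment. This is only a sketch: it uses two Agent processes as stand-ins for our servers, with made-up ids, and kills one to see what happens to the other.

```elixir
# Two throwaway children; the ids are arbitrary labels for this demo.
children = [
  Supervisor.child_spec({Agent, fn -> :results end}, id: :results),
  Supervisor.child_spec({Agent, fn -> :path_finder end}, id: :path_finder)
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_all)

# Helper: look up the current pid of a child by its id.
pid_of = fn id ->
  {^id, pid, _type, _modules} =
    List.keyfind(Supervisor.which_children(sup), id, 0)

  pid
end

results_before = pid_of.(:results)
finder_before = pid_of.(:path_finder)

# Watch the sibling, then kill the other child.
ref = Process.monitor(results_before)
Process.exit(finder_before, :kill)

receive do
  {:DOWN, ^ref, :process, _pid, _reason} -> :ok
after
  1_000 -> raise "sibling was not stopped"
end

# The supervisor has replaced the sibling with a fresh process.
results_after = pid_of.(:results)
restarted? = is_pid(results_after) and results_after != results_before
IO.inspect(restarted?)
```

With strategy: :one_for_one, the monitored sibling would keep running and the receive would hit its timeout instead.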
That’s all we’re going to do on the worker side of things for now. Let’s write the gatherer, and then circle back and implement the actual worker.
Looking at the sequence diagram, we can see that the gatherer is invoked by the workers. Each worker can tell the gatherer that it has run out of work (by calling done()) or it can give it the results of hashing a file.
The gatherer has one more function, not shown in the diagram. It is responsible for starting the workers, and it is responsible for determining when the application has finished processing the files.
To do this, it maintains a simple state: the number of worker servers that are currently running.
Knowing that, we can write most of the gatherer server:
| defmodule Duper.Gatherer do |
| use GenServer |
| |
| @me Gatherer |
| |
| # api |
| |
| def start_link(worker_count) do |
| GenServer.start_link(__MODULE__, worker_count, name: @me) |
| end |
| |
| def done() do |
| GenServer.cast(@me, :done) |
| end |
| |
| def result(path, hash) do |
| GenServer.cast(@me, { :result, path, hash }) |
| end |
| |
| # server |
| |
| def init(worker_count) do |
| { :ok, worker_count } |
| end |
| |
| def handle_cast(:done, _worker_count = 1) do |
| report_results() |
| System.halt(0) |
| end |
| |
| def handle_cast(:done, worker_count) do |
| { :noreply, worker_count - 1 } |
| end |
| |
| def handle_cast({:result, path, hash}, worker_count) do |
| Duper.Results.add_hash_for(path, hash) |
| { :noreply, worker_count } |
| end |
| |
| defp report_results() do |
| IO.puts "Results:\n" |
| Duper.Results.find_duplicates() |
| |> Enum.each(&IO.inspect/1) |
| end |
| end |
See how the implementation of :done keeps track of the number of running workers? As each signals it is done, the count is decremented, until the last :done is received, at which point we report the results and exit.
That leaves the gatherer’s other job: starting the workers. We could try doing it during initialization, something like this:
| def init(worker_count) do |
| 1..worker_count |
| |> Enum.each(fn _ -> Duper.WorkerSupervisor.add_worker() end) |
| { :ok, worker_count } |
| end |
However, this won’t work, and it won’t work in a fairly ugly way.
Remember when we described how supervisors started children in order? They wait for each child to initialize itself before starting the next.
In the preceding code, we’re still initializing the gatherer when we start adding workers. The workers may start running before the initialization of the gatherer finishes. In this case, messages they send to it may well get lost. If you’re only traversing a small filesystem tree, it is even possible that a worker might signal :done before the gatherer is ready, in which case the system will hang, because it will never know that it has finished.
The answer to this mess is to follow a simple rule: when you’re initializing a server, don’t interact with anything that uses that server.
So how do we get the workers running? The answer will be familiar to anyone who has written JavaScript—we arrange for a callback into our gatherer server after initialization is complete:
| def init(worker_count) do |
» | Process.send_after(self(), :kickoff, 0) |
| { :ok, worker_count } |
| end |
| |
» | def handle_info(:kickoff, worker_count) do |
» | 1..worker_count |
» | |> Enum.each(fn _ -> Duper.WorkerSupervisor.add_worker() end) |
» | { :noreply, worker_count } |
» | end |
Here the init function uses send_after to tell the runtime to queue a message to this server immediately (that is, after waiting 0 ms). When the init function exits, the server is then free to pick up this message, which triggers the handle_info callback, and the workers get started.
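The ordering can be checked with a tiny standalone server. This sketch uses a hypothetical Demo.Kickoff module that reports back to the process that started it, once from init and once from the deferred handler.

```elixir
# Demo.Kickoff exists only for this illustration.
defmodule Demo.Kickoff do
  use GenServer

  def start_link(parent), do: GenServer.start_link(__MODULE__, parent)

  def init(parent) do
    # Queue a message to ourselves; it cannot be handled until init returns.
    Process.send_after(self(), :kickoff, 0)
    send(parent, :init_done)
    {:ok, parent}
  end

  def handle_info(:kickoff, parent) do
    send(parent, :kickoff_handled)
    {:noreply, parent}
  end
end

{:ok, _pid} = Demo.Kickoff.start_link(self())

# Collect the two notifications in the order they arrive.
events =
  for _ <- 1..2 do
    receive do
      msg -> msg
    after
      1_000 -> :timeout
    end
  end

IO.inspect(events)
```

The deferred work always runs after initialization has completed, which is exactly the guarantee the gatherer needs before it starts any workers.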
So, now that the gatherer code is ready, we just have to remember to start it:
| def start(_type, _args) do |
| children = [ |
| Duper.Results, |
| { Duper.PathFinder, "/Users/dave/Pictures" }, |
| Duper.WorkerSupervisor, |
| { Duper.Gatherer, 1 }, |
| ] |
| |
| opts = [strategy: :one_for_all, name: Duper.Supervisor] |
| Supervisor.start_link(children, opts) |
| end |
Referring back one last time to the sequence diagram, we can see that the workers are a little strange: they have no incoming API. All they do is ask for a path, compute the hash of the corresponding file, and send the hash to the gatherer. At some point, there are no paths left, so they then send a :done notification to the gatherer instead.
Here’s the code:
| defmodule Duper.Worker do |
| use GenServer, restart: :transient |
| |
| def start_link(_) do |
| GenServer.start_link(__MODULE__, :no_args) |
| end |
| |
| |
| def init(:no_args) do |
| Process.send_after(self(), :do_one_file, 0) |
| { :ok, nil } |
| end |
| |
| def handle_info(:do_one_file, _) do |
| Duper.PathFinder.next_path() |
| |> add_result() |
| end |
| |
| defp add_result(nil) do |
| Duper.Gatherer.done() |
| {:stop, :normal, nil} |
| end |
| |
| defp add_result(path) do |
| Duper.Gatherer.result(path, hash_of_file_at(path)) |
| send(self(), :do_one_file) |
| { :noreply, nil } |
| end |
| |
| defp hash_of_file_at(path) do |
| File.stream!(path, [], 1024*1024) |
| |> Enum.reduce( |
| :crypto.hash_init(:md5), |
| fn (block, hash) -> |
| :crypto.hash_update(hash, block) |
| end) |
| |> :crypto.hash_final() |
| end |
| end |
Notice we use the same trick in the init() function to call back into ourselves, invoking handle_info(:do_one_file,_). This function asks the pathfinder for the next file, and then passes the returned value to add_result().
If the pathfinder returns nil, it has run out of files, so we tell the gatherer that we’re done. Otherwise, we call a private function to calculate the hash of the file contents, pass the path and the hash to the gatherer, and then send ourselves another :do_one_file message, causing the whole process to repeat.
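The incremental hashing in hash_of_file_at can be tried without touching the filesystem. The sketch below feeds a few in-memory chunks (made-up data standing in for the 1 MB blocks read from a file) through the same init/update/final calls, and compares the result with hashing the whole content in one go.

```elixir
# Stand-in for the blocks that File.stream! would yield.
chunks = ["hello, ", "duplicate ", "world"]

# Same reduce as the worker uses: thread the hash state through each block.
incremental =
  chunks
  |> Enum.reduce(:crypto.hash_init(:md5), fn block, hash ->
    :crypto.hash_update(hash, block)
  end)
  |> :crypto.hash_final()

# Hashing the concatenated content in a single call gives the same digest.
one_shot = :crypto.hash(:md5, Enum.join(chunks))

IO.puts(Base.encode16(incremental, case: :lower))
```

The advantage of the incremental form is that a large file never has to be held in memory all at once.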
Why Can’t We Just Write a Looping Function? | |
---|---|
We can implement a loop in Elixir using a recursive function call. But the worker server doesn’t do this. Instead, it sends itself a message and then returns from the handler after processing each file. The reason is that a server stuck inside one long-running handler can’t process any other message, so no single invocation should hog the server indefinitely. In particular, every GenServer.call carries a timeout (by default 5 seconds). If the handler has not replied in that time, the caller assumes something has gone wrong and exits. Processing a million files in a loop will take more than 5 seconds. So we instead just process one file per entry into the server, and then queue up another message to process the next on a fresh entry. The result: the server stays responsive between files. |
One other thing to note—we flagged this server as being transient:
| use GenServer, restart: :transient |
This means that the supervisor will not restart it if it terminates normally, but will restart it if it fails.
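The option passed to use GenServer ends up in the child specification that the supervisor reads. A minimal sketch, using two hypothetical modules that differ only in that option:

```elixir
# Both modules are stand-ins defined for this comparison.
defmodule Demo.TransientWorker do
  use GenServer, restart: :transient

  def start_link(_), do: GenServer.start_link(__MODULE__, :no_args)
  def init(:no_args), do: {:ok, nil}
end

defmodule Demo.DefaultWorker do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, :no_args)
  def init(:no_args), do: {:ok, nil}
end

transient_spec = Demo.TransientWorker.child_spec([])
default_spec = Demo.DefaultWorker.child_spec([])

IO.inspect(transient_spec.restart)

# When the key is absent, supervisors fall back to :permanent,
# which restarts the child however it terminated.
IO.inspect(Map.get(default_spec, :restart, :permanent))
```

For our workers this matters because each one stops with reason :normal once the pathfinder runs dry. A permanent worker would be restarted at that point and immediately report :done again.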