Ectoが2.0(beta)になって、concurrent acceptance testingができるようになったのでいくつか調べてみた。
参考にした元記事は以下。
この中で、Ectoのownershipのところが気になったのでいくつかコードを追ってみました。
Ownweshipを取得する時、 Ecto.Adapters.SQL.Sandbox.checkout/2 が呼ばれます。
これはEcto2.0で必要になったところですね。
def checkout(repo, opts \\ []) do
{name, opts} =
if Keyword.get(opts, :sandbox, true) do
proxy_pool(repo)
else
repo.__pool__
end
DBConnection.Ownership.ownership_checkout(name, opts)
end
ここの中で、もしsandboxであれば
defp proxy_pool(repo) do
{name, opts} = repo.__pool__
{pool, opts} = Keyword.pop(opts, :ownership_pool, DBConnection.Poolboy)
{name, [repo: repo, sandbox_pool: pool, ownership_pool: Pool] ++ opts}
end
にあるように、ownership_poolのキーを持つDBConnectionのプロセスをとってきます。
これは、以下のdb_connectionのコードを呼びます。
@spec ownership_checkout(GenServer.server, Keyword.t) ::
:ok | {:already, :owner | :allowed} | :error | no_return
def ownership_checkout(manager, opts) do
case Manager.checkout(manager, opts) do
{:init, owner} -> Owner.init(owner, opts)
{:already, _} = already -> already
end
end
この中で case されるのは以下。
@spec checkout(GenServer.server, Keyword.t) ::
{:init, pid} | {:already, :owner | :allowed}
def checkout(manager, opts) do
timeout = Keyword.get(opts, :pool_timeout, @timeout)
GenServer.call(manager, {:checkout, opts}, timeout)
end
この DBConnection.Ownership.Manager はGenServerになってて、その自身に対して call します。
:checkout が call されるところを探すと、以下がみつかります。
def handle_call({:checkout, opts}, {caller, _}, %{checkouts: checkouts} = state) do
if kind = already_checked_out(checkouts, caller) do
{:reply, {:already, kind}, state}
else
{owner, state} = checkout(state, caller, opts)
{:reply, {:init, owner}, state}
end
end
ここで、DBに対して処理を行うプロセスがcheckoutされていない場合、以下のprivateメソッドが呼ばれます。
defp checkout(state, caller, opts) do
%{pool: pool, owner_sup: owner_sup, checkouts: checkouts, owners: owners,
ets: ets} = state
{:ok, owner} = OwnerSupervisor.start_owner(owner_sup, caller, pool, opts)
ref = Process.monitor(owner)
checkouts = Map.put(checkouts, caller, {:owner, ref, owner})
owners = Map.put(owners, ref, {owner, caller, []})
ets && :ets.insert(ets, {caller, owner})
{owner, %{state | checkouts: checkouts, owners: owners}}
end
こう見ると、この段階でetsに対してcheckoutした、ということを保存するのですね。なるほど。
もう1つ。以下の通り allow されることがconcurrentlyにテストを実行する上では必要です。
これは、以下の通りドキュメントに書かれています。
https://hexdocs.pm/ecto/2.0.0-beta.1/Ecto.Adapters.SQL.Sandbox.html
Summing up
– Using allowances – requires explicit allowances via allow/3. Tests may run concurrently.
– Using shared mode – does not require explicit allowances. Tests cannot run concurrently.
このページにおいて、 allowances のところがさらには詳細を言及しています。
ということで、ここを追ってみます。
def allow(repo, owner, allow, _opts \\ []) do
{name, opts} = repo.__pool__
DBConnection.Ownership.ownership_allow(name, owner, allow, opts)
end
DBConnection.Ownership.ownership_allow/4 を見ると…
defdelegate ownership_allow(manager, owner, allow, opts), to: Manager, as: :allow
とデリゲートされて、以下に行き着きます。
@spec allow(GenServer.server, parent :: pid, allow :: pid, Keyword.t) ::
:ok | {:already, :owner | :allowed} | :not_found
def allow(manager, parent, allow, opts) do
timeout = Keyword.get(opts, :pool_timeout, @timeout)
GenServer.call(manager, {:allow, parent, allow}, timeout)
end
この中は、GenServerに :allow のtupleを渡します。
def handle_call({:allow, caller, allow}, _from, %{checkouts: checkouts} = state) do
if kind = already_checked_out(checkouts, allow) do
{:reply, {:already, kind}, state}
else
case Map.get(checkouts, caller, :not_found) do
{:owner, ref, owner} ->
{:reply, :ok, owner_allow(state, allow, ref, owner)}
{:allowed, ref, owner} ->
{:reply, :ok, owner_allow(state, allow, ref, owner)}
:not_found ->
{:reply, :not_found, state}
end
end
end
ここを見ると、まだcheckoutされていないDBコネクションのプロセスであれば、以下
defp owner_allow(%{ets: ets} = state, allow, ref, owner) do
state = put_in(state.checkouts[allow], {:allowed, ref, owner})
state = update_in(state.owners[ref], fn {owner, caller, allowed} ->
{owner, caller, [allow|List.delete(allowed, allow)]}
end)
ets && :ets.insert(ets, {allow, owner})
state
end
ここを見ると、最終的にetsにペアとなるprocess idが保存されます。
このように、ownerとなるプロセスと、実際にDBに処理を渡すプロセスが管理され、各々のDBコネクションをうまいこと回してconcurrentなテストを実施するようにしているのですね。
学び。