[Elixir]PubSub機構をサクッとつくる

Elixirには receive があらかじめ用意されている。そのため、 send で特定のプロセスに送ったメッセージを受け取って、何らかの処理をするというコードが容易に書ける。それを使って PubSub を実現するコードを以下に示す。


defmodule PubSubServer do
  @moduledoc """
  A minimal publish/subscribe server built on plain processes and `receive`.

  The server process keeps a list of subscriber pids. Every published message
  is forwarded as-is to each subscriber currently in that list.
  """

  @doc """
  Spawns the server process and returns its pid.

  `subscriber_callback` is optional. When given, the server invokes it with
  the pid of each new subscriber at the moment the subscription is processed.
  """
  @spec start((pid() -> any()) | nil) :: pid()
  def start(subscriber_callback \\ nil) do
    spawn(__MODULE__, :run, [[], subscriber_callback])
  end

  @doc """
  Sends `message` to `server` for delivery to every current subscriber.
  """
  @spec publish(pid(), term()) :: {:publish, term()}
  def publish(server, message) do
    send(server, {:publish, message})
  end

  @doc """
  Registers the calling process with `server` and then loops forever,
  passing every message the calling process receives to `handler`.

  This function never returns; call it from a dedicated process.
  """
  @spec subscribe(pid(), (term() -> any())) :: no_return()
  def subscribe(server, handler) do
    send(server, {:subscribe, self()})
    listen(handler)
  end

  @doc """
  Receive loop of a subscriber: waits for any message, hands it to `handler`,
  then waits for the next one.
  """
  @spec listen((term() -> any())) :: no_return()
  def listen(handler) do
    receive do
      message ->
        handler.(message)
        listen(handler)
    end
  end

  @doc """
  Receive loop of the server process.

  Public only because `start/1` spawns it by module/function/args; it is not
  meant to be called directly.
  """
  @spec run([pid()], (pid() -> any()) | nil) :: no_return()
  def run(subscribers, subscriber_callback) do
    receive do
      {:publish, message} ->
        Enum.each(subscribers, &send(&1, message))
        run(subscribers, subscriber_callback)

      {:subscribe, subscriber} ->
        # Notify with the pid of the new subscriber (not the callback itself).
        if subscriber_callback, do: subscriber_callback.(subscriber)
        run([subscriber | subscribers], subscriber_callback)
    end
  end
end
# Demo script: start a server, attach `listener_count` subscribers, publish
# one message, and wait until every subscriber has printed it.
main = self()

# The server reports each processed subscription back to this process so we
# know when it is safe to publish.
server =
  PubSubServer.start(fn subscriber ->
    send(main, {:subscriber, subscriber})
  end)

listener_count = 10

# Each listener runs in its own process because `subscribe/2` never returns.
for _ <- 1..listener_count do
  spawn(fn ->
    PubSubServer.subscribe(server, fn message ->
      IO.puts("#{inspect(self())} received: #{message}")
      send(main, {:written, message})
    end)
  end)
end

# Block until the server has processed all subscriptions.
for _ <- 1..listener_count do
  receive do
    {:subscriber, _} -> true
  end
end

PubSubServer.publish(server, "Hello everyone!")

# Block until every listener has handled the published message.
for _ <- 1..listener_count do
  receive do
    {:written, _} -> true
  end
end

以下の記事を参考にしました。

http://tech.noredink.com/post/141444822213/pubsub-in-30-lines-of-elixir

Leave a Comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.