From 5ba91fecec283003690072773638f2f143a2b0fa Mon Sep 17 00:00:00 2001 From: kballou Date: Mon, 20 Oct 2014 18:56:23 -0600 Subject: Add event manager and event notification The PoolParty module was turned into an application that creates a generic genEvent server, passing it to the supervisor. The supervisor, in turn, distributes the genevent server to all child processes, etc. This allows all processes to send events to the manager for logging, etc. --- lib/poolparty.ex | 8 ++++++++ lib/poolparty/pool/supervisor.ex | 8 ++++---- lib/poolparty/pool/worker.ex | 12 ++++++------ lib/poolparty/scheduler.ex | 12 ++++++++---- lib/poolparty/supervisor.ex | 13 ++++++++----- 5 files changed, 34 insertions(+), 19 deletions(-) diff --git a/lib/poolparty.ex b/lib/poolparty.ex index 293f579..4cc0430 100644 --- a/lib/poolparty.ex +++ b/lib/poolparty.ex @@ -1,2 +1,10 @@ defmodule PoolParty do + use Application + require Logger + + def start(_type, _args) do + Logger.debug("[#{__MODULE__}]: Starting a Pool Party!") + {:ok, event_manager} = GenEvent.start_link() + PoolParty.Supervisor.start_link(event_manager) + end end diff --git a/lib/poolparty/pool/supervisor.ex b/lib/poolparty/pool/supervisor.ex index 3132f46..751a942 100644 --- a/lib/poolparty/pool/supervisor.ex +++ b/lib/poolparty/pool/supervisor.ex @@ -2,17 +2,17 @@ defmodule PoolParty.Pool.Supervisor do use Supervisor require Logger - def start_link(pool_size, opts \\ []) do + def start_link(pool_size, event_manager, opts \\ []) do Logger.debug("[#{__MODULE__}]: Starting Work Pool Supervisor") - Supervisor.start_link(__MODULE__, {pool_size}, opts) + Supervisor.start_link(__MODULE__, {pool_size, event_manager}, opts) end - def init({pool_size}) do + def init({pool_size, event_manager}) do Logger.debug("[#{__MODULE__}]: Initializing Work Pool Supervisor") children = (1..pool_size) |> Enum.map(fn (id) -> Logger.debug("[#{__MODULE__}]: Starting child worker: #{id}") - worker(PoolParty.Pool.Worker, [], id: id) + worker(PoolParty.Pool.Worker, [event_manager], id: id) end) supervise(children, strategy: :one_for_one) end diff --git a/lib/poolparty/pool/worker.ex b/lib/poolparty/pool/worker.ex index 5ae4161..2131f8e 100644 --- a/lib/poolparty/pool/worker.ex +++ b/lib/poolparty/pool/worker.ex @@ -2,15 +2,15 @@ defmodule PoolParty.Pool.Worker do use GenServer require Logger - def start_link(opts \\ []) do + def start_link(event_manager, opts \\ []) do Logger.debug("[#{__MODULE__}]: Starting worker") - GenServer.start_link(__MODULE__, {}, opts) + GenServer.start_link(__MODULE__, {event_manager}, opts) end - def init(_) do + def init({event_manager}) do Logger.debug("[#{__MODULE__}]: Initializing Worker") PoolParty.Scheduler.join(self) - {:ok, nil} + {:ok, %{events: event_manager}} end def process(pid, function, args) do @@ -18,10 +18,10 @@ defmodule PoolParty.Pool.Worker do GenServer.cast(pid, {:compute, function, args}) end - def handle_cast({:compute, function, args}, _) do + def handle_cast({:compute, function, args}, state) do Logger.debug("[#{__MODULE__}]: Process request received") PoolParty.Scheduler.ready({:result, function.(args), self}) - {:noreply, nil} + {:noreply, state} end end diff --git a/lib/poolparty/scheduler.ex b/lib/poolparty/scheduler.ex index 225556a..dc9e1b3 100644 --- a/lib/poolparty/scheduler.ex +++ b/lib/poolparty/scheduler.ex @@ -2,20 +2,21 @@ defmodule PoolParty.Scheduler do use GenServer require Logger - def start_link(pool_size, opts \\ []) do + def start_link(pool_size, event_manager, opts \\ []) do Logger.debug("[#{__MODULE__}]: Starting Pool Scheduler") GenServer.start_link( __MODULE__, - {pool_size}, + {pool_size, event_manager}, [name: __MODULE__] ++ opts) end - def init({pool_size}) do + def init({pool_size, event_manager}) do Logger.debug("[#{__MODULE__}]: Initializing Pool Scheduler") {:ok, %{max_pool_size: pool_size, workers: [], queue: [], - processing: HashDict.new()} + processing: HashDict.new(), + events: event_manager} } end @@ -36,6 +37,7 @@ defmodule PoolParty.Scheduler do def handle_cast({:process, func, args, from}, state) do Logger.debug("[#{__MODULE__}]: Work request received") + GenEvent.notify(state.events, {:work_queued, func, args, from}) queue = state.queue ++ [{func, args, from}] case length(state.workers) do 0 -> @@ -54,11 +56,13 @@ defmodule PoolParty.Scheduler do def handle_cast({:join, pid}, state) do Logger.debug("[#{__MODULE__}]: Worker joined pool") + GenEvent.notify(state.events, {:worker_joining, pid}) {:noreply, %{state| workers: [pid | state.workers]}} end def handle_cast({:leave, pid}, state) do Logger.debug("[#{__MODULE__}]: Worker left pool") + GenEvent.notify(state.events, {:worker_leaving, pid}) {:noreply, %{state| workers: state.workers -- [pid]}} end diff --git a/lib/poolparty/supervisor.ex b/lib/poolparty/supervisor.ex index 9995a3d..712f0dc 100644 --- a/lib/poolparty/supervisor.ex +++ b/lib/poolparty/supervisor.ex @@ -2,17 +2,20 @@ defmodule PoolParty.Supervisor do use Supervisor require Logger - def start_link(opts \\ []) do + def start_link(event_manager, opts \\ []) do Logger.debug("[#{__MODULE__}]: Starting Pool Party Supervisor") - Supervisor.start_link(__MODULE__, {}, [name: __MODULE__] ++ opts) + Supervisor.start_link( + __MODULE__, + {event_manager}, + [name: __MODULE__] ++ opts) end - def init(_) do + def init({event_manager}) do Logger.debug("[#{__MODULE__}]: Initializing Pool Party Supervisor") pool_size = Application.get_env(:poolparty, :pool_size) Logger.debug("[#{__MODULE__}]: Pool size: #{pool_size}") - children = [worker(PoolParty.Scheduler, [pool_size]), - worker(PoolParty.Pool.Supervisor, [pool_size])] + children = [worker(PoolParty.Scheduler, [pool_size, event_manager]), + worker(PoolParty.Pool.Supervisor, [pool_size, event_manager])] supervise(children, strategy: :one_for_one) end end -- cgit v1.2.1