summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkballou <kballou@devnulllabs.io>2014-10-20 18:56:23 -0600
committerkballou <kballou@devnulllabs.io>2014-10-20 18:56:23 -0600
commit5ba91fecec283003690072773638f2f143a2b0fa (patch)
tree057a1d695256043546ce652f9419f23f2100ca22
parent482e23b272bbb5221e09634c16ab0be9edd40592 (diff)
downloadpoolparty-5ba91fecec283003690072773638f2f143a2b0fa.tar.gz
poolparty-5ba91fecec283003690072773638f2f143a2b0fa.tar.xz
Add event manager and event notification
The PoolParty module was turned into an application that creates a generic genEvent server, passing it to the supervisor. The supervisor, in turn, distributes the genevent server to all child processes, etc. This allows all processes to send events to the manager for logging, etc.
-rw-r--r--lib/poolparty.ex8
-rw-r--r--lib/poolparty/pool/supervisor.ex8
-rw-r--r--lib/poolparty/pool/worker.ex12
-rw-r--r--lib/poolparty/scheduler.ex12
-rw-r--r--lib/poolparty/supervisor.ex13
5 files changed, 34 insertions, 19 deletions
diff --git a/lib/poolparty.ex b/lib/poolparty.ex
index 293f579..4cc0430 100644
--- a/lib/poolparty.ex
+++ b/lib/poolparty.ex
@@ -1,2 +1,10 @@
defmodule PoolParty do
+ use Application
+ require Logger
+
+ def start(_type, _args) do
+ Logger.debug("[#{__MODULE__}]: Starting a Pool Party!")
+ {:ok, event_manager} = GenEvent.start_link()
+ PoolParty.Supervisor.start_link(event_manager)
+ end
end
diff --git a/lib/poolparty/pool/supervisor.ex b/lib/poolparty/pool/supervisor.ex
index 3132f46..751a942 100644
--- a/lib/poolparty/pool/supervisor.ex
+++ b/lib/poolparty/pool/supervisor.ex
@@ -2,17 +2,17 @@ defmodule PoolParty.Pool.Supervisor do
use Supervisor
require Logger
- def start_link(pool_size, opts \\ []) do
+ def start_link(pool_size, event_manager, opts \\ []) do
Logger.debug("[#{__MODULE__}]: Starting Work Pool Supervisor")
- Supervisor.start_link(__MODULE__, {pool_size}, opts)
+ Supervisor.start_link(__MODULE__, {pool_size, event_manager}, opts)
end
- def init({pool_size}) do
+ def init({pool_size, event_manager}) do
Logger.debug("[#{__MODULE__}]: Initializing Work Pool Supervisor")
children = (1..pool_size) |>
Enum.map(fn (id) ->
Logger.debug("[#{__MODULE__}]: Starting child worker: #{id}")
- worker(PoolParty.Pool.Worker, [], id: id)
+ worker(PoolParty.Pool.Worker, [event_manager], id: id)
end)
supervise(children, strategy: :one_for_one)
end
diff --git a/lib/poolparty/pool/worker.ex b/lib/poolparty/pool/worker.ex
index 5ae4161..2131f8e 100644
--- a/lib/poolparty/pool/worker.ex
+++ b/lib/poolparty/pool/worker.ex
@@ -2,15 +2,15 @@ defmodule PoolParty.Pool.Worker do
use GenServer
require Logger
- def start_link(opts \\ []) do
+ def start_link(event_manager, opts \\ []) do
Logger.debug("[#{__MODULE__}]: Starting worker")
- GenServer.start_link(__MODULE__, {}, opts)
+ GenServer.start_link(__MODULE__, {event_manager}, opts)
end
- def init(_) do
+ def init({event_manager}) do
Logger.debug("[#{__MODULE__}]: Initializing Worker")
PoolParty.Scheduler.join(self)
- {:ok, nil}
+ {:ok, %{events: event_manager}}
end
def process(pid, function, args) do
@@ -18,10 +18,10 @@ defmodule PoolParty.Pool.Worker do
GenServer.cast(pid, {:compute, function, args})
end
- def handle_cast({:compute, function, args}, _) do
+ def handle_cast({:compute, function, args}, state) do
Logger.debug("[#{__MODULE__}]: Process request received")
PoolParty.Scheduler.ready({:result, function.(args), self})
- {:noreply, nil}
+ {:noreply, state}
end
end
diff --git a/lib/poolparty/scheduler.ex b/lib/poolparty/scheduler.ex
index 225556a..dc9e1b3 100644
--- a/lib/poolparty/scheduler.ex
+++ b/lib/poolparty/scheduler.ex
@@ -2,20 +2,21 @@ defmodule PoolParty.Scheduler do
use GenServer
require Logger
- def start_link(pool_size, opts \\ []) do
+ def start_link(pool_size, event_manager, opts \\ []) do
Logger.debug("[#{__MODULE__}]: Starting Pool Scheduler")
GenServer.start_link(
__MODULE__,
- {pool_size},
+ {pool_size, event_manager},
[name: __MODULE__] ++ opts)
end
- def init({pool_size}) do
+ def init({pool_size, event_manager}) do
Logger.debug("[#{__MODULE__}]: Initializing Pool Scheduler")
{:ok, %{max_pool_size: pool_size,
workers: [],
queue: [],
- processing: HashDict.new()}
+ processing: HashDict.new(),
+ events: event_manager}
}
end
@@ -36,6 +37,7 @@ defmodule PoolParty.Scheduler do
def handle_cast({:process, func, args, from}, state) do
Logger.debug("[#{__MODULE__}]: Work request received")
+ GenEvent.notify(state.events, {:work_queued, func, args, from})
queue = state.queue ++ [{func, args, from}]
case length(state.workers) do
0 ->
@@ -54,11 +56,13 @@ defmodule PoolParty.Scheduler do
def handle_cast({:join, pid}, state) do
Logger.debug("[#{__MODULE__}]: Worker joined pool")
+ GenEvent.notify(state.events, {:worker_joining, pid})
{:noreply, %{state| workers: [pid | state.workers]}}
end
def handle_cast({:leave, pid}, state) do
Logger.debug("[#{__MODULE__}]: Worker left pool")
+ GenEvent.notify(state.events, {:worker_leaving, pid})
{:noreply, %{state| workers: state.workers -- [pid]}}
end
diff --git a/lib/poolparty/supervisor.ex b/lib/poolparty/supervisor.ex
index 9995a3d..712f0dc 100644
--- a/lib/poolparty/supervisor.ex
+++ b/lib/poolparty/supervisor.ex
@@ -2,17 +2,20 @@ defmodule PoolParty.Supervisor do
use Supervisor
require Logger
- def start_link(opts \\ []) do
+ def start_link(event_manager, opts \\ []) do
Logger.debug("[#{__MODULE__}]: Starting Pool Party Supervisor")
- Supervisor.start_link(__MODULE__, {}, [name: __MODULE__] ++ opts)
+ Supervisor.start_link(
+ __MODULE__,
+ {event_manager},
+ [name: __MODULE__] ++ opts)
end
- def init(_) do
+ def init({event_manager}) do
Logger.debug("[#{__MODULE__}]: Initializing Pool Party Supervisor")
pool_size = Application.get_env(:poolparty, :pool_size)
Logger.debug("[#{__MODULE__}]: Pool size: #{pool_size}")
- children = [worker(PoolParty.Scheduler, [pool_size]),
- worker(PoolParty.Pool.Supervisor, [pool_size])]
+ children = [worker(PoolParty.Scheduler, [pool_size, event_manager]),
+ worker(PoolParty.Pool.Supervisor, [pool_size, event_manager])]
supervise(children, strategy: :one_for_one)
end
end