summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/poolparty.ex8
-rw-r--r--lib/poolparty/pool/supervisor.ex8
-rw-r--r--lib/poolparty/pool/worker.ex12
-rw-r--r--lib/poolparty/scheduler.ex12
-rw-r--r--lib/poolparty/supervisor.ex13
5 files changed, 34 insertions, 19 deletions
diff --git a/lib/poolparty.ex b/lib/poolparty.ex
index 293f579..4cc0430 100644
--- a/lib/poolparty.ex
+++ b/lib/poolparty.ex
@@ -1,2 +1,10 @@
defmodule PoolParty do
+ use Application
+ require Logger
+
+ def start(_type, _args) do
+ Logger.debug("[#{__MODULE__}]: Starting a Pool Party!")
+ {:ok, event_manager} = GenEvent.start_link()
+ PoolParty.Supervisor.start_link(event_manager)
+ end
end
diff --git a/lib/poolparty/pool/supervisor.ex b/lib/poolparty/pool/supervisor.ex
index 3132f46..751a942 100644
--- a/lib/poolparty/pool/supervisor.ex
+++ b/lib/poolparty/pool/supervisor.ex
@@ -2,17 +2,17 @@ defmodule PoolParty.Pool.Supervisor do
use Supervisor
require Logger
- def start_link(pool_size, opts \\ []) do
+ def start_link(pool_size, event_manager, opts \\ []) do
Logger.debug("[#{__MODULE__}]: Starting Work Pool Supervisor")
- Supervisor.start_link(__MODULE__, {pool_size}, opts)
+ Supervisor.start_link(__MODULE__, {pool_size, event_manager}, opts)
end
- def init({pool_size}) do
+ def init({pool_size, event_manager}) do
Logger.debug("[#{__MODULE__}]: Initializing Work Pool Supervisor")
children = (1..pool_size) |>
Enum.map(fn (id) ->
Logger.debug("[#{__MODULE__}]: Starting child worker: #{id}")
- worker(PoolParty.Pool.Worker, [], id: id)
+ worker(PoolParty.Pool.Worker, [event_manager], id: id)
end)
supervise(children, strategy: :one_for_one)
end
diff --git a/lib/poolparty/pool/worker.ex b/lib/poolparty/pool/worker.ex
index 5ae4161..2131f8e 100644
--- a/lib/poolparty/pool/worker.ex
+++ b/lib/poolparty/pool/worker.ex
@@ -2,15 +2,15 @@ defmodule PoolParty.Pool.Worker do
use GenServer
require Logger
- def start_link(opts \\ []) do
+ def start_link(event_manager, opts \\ []) do
Logger.debug("[#{__MODULE__}]: Starting worker")
- GenServer.start_link(__MODULE__, {}, opts)
+ GenServer.start_link(__MODULE__, {event_manager}, opts)
end
- def init(_) do
+ def init({event_manager}) do
Logger.debug("[#{__MODULE__}]: Initializing Worker")
PoolParty.Scheduler.join(self)
- {:ok, nil}
+ {:ok, %{events: event_manager}}
end
def process(pid, function, args) do
@@ -18,10 +18,10 @@ defmodule PoolParty.Pool.Worker do
GenServer.cast(pid, {:compute, function, args})
end
- def handle_cast({:compute, function, args}, _) do
+ def handle_cast({:compute, function, args}, state) do
Logger.debug("[#{__MODULE__}]: Process request received")
PoolParty.Scheduler.ready({:result, function.(args), self})
- {:noreply, nil}
+ {:noreply, state}
end
end
diff --git a/lib/poolparty/scheduler.ex b/lib/poolparty/scheduler.ex
index 225556a..dc9e1b3 100644
--- a/lib/poolparty/scheduler.ex
+++ b/lib/poolparty/scheduler.ex
@@ -2,20 +2,21 @@ defmodule PoolParty.Scheduler do
use GenServer
require Logger
- def start_link(pool_size, opts \\ []) do
+ def start_link(pool_size, event_manager, opts \\ []) do
Logger.debug("[#{__MODULE__}]: Starting Pool Scheduler")
GenServer.start_link(
__MODULE__,
- {pool_size},
+ {pool_size, event_manager},
[name: __MODULE__] ++ opts)
end
- def init({pool_size}) do
+ def init({pool_size, event_manager}) do
Logger.debug("[#{__MODULE__}]: Initializing Pool Scheduler")
{:ok, %{max_pool_size: pool_size,
workers: [],
queue: [],
- processing: HashDict.new()}
+ processing: HashDict.new(),
+ events: event_manager}
}
end
@@ -36,6 +37,7 @@ defmodule PoolParty.Scheduler do
def handle_cast({:process, func, args, from}, state) do
Logger.debug("[#{__MODULE__}]: Work request received")
+ GenEvent.notify(state.events, {:work_queued, func, args, from})
queue = state.queue ++ [{func, args, from}]
case length(state.workers) do
0 ->
@@ -54,11 +56,13 @@ defmodule PoolParty.Scheduler do
def handle_cast({:join, pid}, state) do
Logger.debug("[#{__MODULE__}]: Worker joined pool")
+ GenEvent.notify(state.events, {:worker_joining, pid})
{:noreply, %{state| workers: [pid | state.workers]}}
end
def handle_cast({:leave, pid}, state) do
Logger.debug("[#{__MODULE__}]: Worker left pool")
+ GenEvent.notify(state.events, {:worker_leaving, pid})
{:noreply, %{state| workers: state.workers -- [pid]}}
end
diff --git a/lib/poolparty/supervisor.ex b/lib/poolparty/supervisor.ex
index 9995a3d..712f0dc 100644
--- a/lib/poolparty/supervisor.ex
+++ b/lib/poolparty/supervisor.ex
@@ -2,17 +2,20 @@ defmodule PoolParty.Supervisor do
use Supervisor
require Logger
- def start_link(opts \\ []) do
+ def start_link(event_manager, opts \\ []) do
Logger.debug("[#{__MODULE__}]: Starting Pool Party Supervisor")
- Supervisor.start_link(__MODULE__, {}, [name: __MODULE__] ++ opts)
+ Supervisor.start_link(
+ __MODULE__,
+ {event_manager},
+ [name: __MODULE__] ++ opts)
end
- def init(_) do
+ def init({event_manager}) do
Logger.debug("[#{__MODULE__}]: Initializing Pool Party Supervisor")
pool_size = Application.get_env(:poolparty, :pool_size)
Logger.debug("[#{__MODULE__}]: Pool size: #{pool_size}")
- children = [worker(PoolParty.Scheduler, [pool_size]),
- worker(PoolParty.Pool.Supervisor, [pool_size])]
+ children = [worker(PoolParty.Scheduler, [pool_size, event_manager]),
+ worker(PoolParty.Pool.Supervisor, [pool_size, event_manager])]
supervise(children, strategy: :one_for_one)
end
end