diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/poolparty.ex | 8 | ||||
-rw-r--r-- | lib/poolparty/pool/supervisor.ex | 8 | ||||
-rw-r--r-- | lib/poolparty/pool/worker.ex | 12 | ||||
-rw-r--r-- | lib/poolparty/scheduler.ex | 12 | ||||
-rw-r--r-- | lib/poolparty/supervisor.ex | 13 |
5 files changed, 34 insertions, 19 deletions
diff --git a/lib/poolparty.ex b/lib/poolparty.ex index 293f579..4cc0430 100644 --- a/lib/poolparty.ex +++ b/lib/poolparty.ex @@ -1,2 +1,10 @@ defmodule PoolParty do + use Application + require Logger + + def start(_type, _args) do + Logger.debug("[#{__MODULE__}]: Starting a Pool Party!") + {:ok, event_manager} = GenEvent.start_link() + PoolParty.Supervisor.start_link(event_manager) + end end diff --git a/lib/poolparty/pool/supervisor.ex b/lib/poolparty/pool/supervisor.ex index 3132f46..751a942 100644 --- a/lib/poolparty/pool/supervisor.ex +++ b/lib/poolparty/pool/supervisor.ex @@ -2,17 +2,17 @@ defmodule PoolParty.Pool.Supervisor do use Supervisor require Logger - def start_link(pool_size, opts \\ []) do + def start_link(pool_size, event_manager, opts \\ []) do Logger.debug("[#{__MODULE__}]: Starting Work Pool Supervisor") - Supervisor.start_link(__MODULE__, {pool_size}, opts) + Supervisor.start_link(__MODULE__, {pool_size, event_manager}, opts) end - def init({pool_size}) do + def init({pool_size, event_manager}) do Logger.debug("[#{__MODULE__}]: Initializing Work Pool Supervisor") children = (1..pool_size) |> Enum.map(fn (id) -> Logger.debug("[#{__MODULE__}]: Starting child worker: #{id}") - worker(PoolParty.Pool.Worker, [], id: id) + worker(PoolParty.Pool.Worker, [event_manager], id: id) end) supervise(children, strategy: :one_for_one) end diff --git a/lib/poolparty/pool/worker.ex b/lib/poolparty/pool/worker.ex index 5ae4161..2131f8e 100644 --- a/lib/poolparty/pool/worker.ex +++ b/lib/poolparty/pool/worker.ex @@ -2,15 +2,15 @@ defmodule PoolParty.Pool.Worker do use GenServer require Logger - def start_link(opts \\ []) do + def start_link(event_manager, opts \\ []) do Logger.debug("[#{__MODULE__}]: Starting worker") - GenServer.start_link(__MODULE__, {}, opts) + GenServer.start_link(__MODULE__, {event_manager}, opts) end - def init(_) do + def init({event_manager}) do Logger.debug("[#{__MODULE__}]: Initializing Worker") PoolParty.Scheduler.join(self) - {:ok, nil} + {:ok, %{events: event_manager}} end def process(pid, function, args) do @@ -18,10 +18,10 @@ defmodule PoolParty.Pool.Worker do GenServer.cast(pid, {:compute, function, args}) end - def handle_cast({:compute, function, args}, _) do + def handle_cast({:compute, function, args}, state) do Logger.debug("[#{__MODULE__}]: Process request received") PoolParty.Scheduler.ready({:result, function.(args), self}) - {:noreply, nil} + {:noreply, state} end end diff --git a/lib/poolparty/scheduler.ex b/lib/poolparty/scheduler.ex index 225556a..dc9e1b3 100644 --- a/lib/poolparty/scheduler.ex +++ b/lib/poolparty/scheduler.ex @@ -2,20 +2,21 @@ defmodule PoolParty.Scheduler do use GenServer require Logger - def start_link(pool_size, opts \\ []) do + def start_link(pool_size, event_manager, opts \\ []) do Logger.debug("[#{__MODULE__}]: Starting Pool Scheduler") GenServer.start_link( __MODULE__, - {pool_size}, + {pool_size, event_manager}, [name: __MODULE__] ++ opts) end - def init({pool_size}) do + def init({pool_size, event_manager}) do Logger.debug("[#{__MODULE__}]: Initializing Pool Scheduler") {:ok, %{max_pool_size: pool_size, workers: [], queue: [], - processing: HashDict.new()} + processing: HashDict.new(), + events: event_manager} } end @@ -36,6 +37,7 @@ defmodule PoolParty.Scheduler do def handle_cast({:process, func, args, from}, state) do Logger.debug("[#{__MODULE__}]: Work request received") + GenEvent.notify(state.events, {:work_queued, func, args, from}) queue = state.queue ++ [{func, args, from}] case length(state.workers) do 0 -> @@ -54,11 +56,13 @@ defmodule PoolParty.Scheduler do def handle_cast({:join, pid}, state) do Logger.debug("[#{__MODULE__}]: Worker joined pool") + GenEvent.notify(state.events, {:worker_joining, pid}) {:noreply, %{state| workers: [pid | state.workers]}} end def handle_cast({:leave, pid}, state) do Logger.debug("[#{__MODULE__}]: Worker left pool") + GenEvent.notify(state.events, {:worker_leaving, pid}) {:noreply, %{state| workers: state.workers -- [pid]}} end diff --git a/lib/poolparty/supervisor.ex b/lib/poolparty/supervisor.ex index 9995a3d..712f0dc 100644 --- a/lib/poolparty/supervisor.ex +++ b/lib/poolparty/supervisor.ex @@ -2,17 +2,20 @@ defmodule PoolParty.Supervisor do use Supervisor require Logger - def start_link(opts \\ []) do + def start_link(event_manager, opts \\ []) do Logger.debug("[#{__MODULE__}]: Starting Pool Party Supervisor") - Supervisor.start_link(__MODULE__, {}, [name: __MODULE__] ++ opts) + Supervisor.start_link( + __MODULE__, + {event_manager}, + [name: __MODULE__] ++ opts) end - def init(_) do + def init({event_manager}) do Logger.debug("[#{__MODULE__}]: Initializing Pool Party Supervisor") pool_size = Application.get_env(:poolparty, :pool_size) Logger.debug("[#{__MODULE__}]: Pool size: #{pool_size}") - children = [worker(PoolParty.Scheduler, [pool_size]), - worker(PoolParty.Pool.Supervisor, [pool_size])] + children = [worker(PoolParty.Scheduler, [pool_size, event_manager]), + worker(PoolParty.Pool.Supervisor, [pool_size, event_manager])] supervise(children, strategy: :one_for_one) end end |