summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkballou <kballou@devnulllabs.io>2014-10-16 19:22:50 -0600
committerkballou <kballou@devnulllabs.io>2014-10-20 16:28:38 -0600
commitccc7ba834e11b0da450800a8695954baee9c06b7 (patch)
treece9f64744ca8fdeaa184b139eb70364e9c331946
parent226970e89bb4b8b32c2c08c2ef2999a3e8e4abbd (diff)
downloadpoolparty-ccc7ba834e11b0da450800a8695954baee9c06b7.tar.gz
poolparty-ccc7ba834e11b0da450800a8695954baee9c06b7.tar.xz
PoolParty Scheduler, supervisor, and workers
Add PoolParty supervisor, scheduler, pool supervisor, and pool workers
-rw-r--r--lib/poolparty/pool/supervisor.ex16
-rw-r--r--lib/poolparty/pool/worker.ex22
-rw-r--r--lib/poolparty/scheduler.ex80
-rw-r--r--lib/poolparty/supervisor.ex14
4 files changed, 132 insertions, 0 deletions
diff --git a/lib/poolparty/pool/supervisor.ex b/lib/poolparty/pool/supervisor.ex
new file mode 100644
index 0000000..e95ceac
--- /dev/null
+++ b/lib/poolparty/pool/supervisor.ex
@@ -0,0 +1,16 @@
+defmodule PoolParty.Pool.Supervisor do
+ use Supervisor
+
+ def start_link(pool_size, opts \\ []) do
+ Supervisor.start_link(__MODULE__, {pool_size}, opts)
+ end
+
+ def init({pool_size}) do
+ children = (1..pool_size) |>
+ Enum.map(fn (id) ->
+ worker(PoolParty.Pool.Worker, [], id: id)
+ end)
+ supervise(children, strategy: :one_for_one)
+ end
+
+end
diff --git a/lib/poolparty/pool/worker.ex b/lib/poolparty/pool/worker.ex
new file mode 100644
index 0000000..c4685f5
--- /dev/null
+++ b/lib/poolparty/pool/worker.ex
@@ -0,0 +1,22 @@
+defmodule PoolParty.Pool.Worker do
+ use GenServer
+
+ def start_link(opts \\ []) do
+ GenServer.start_link(__MODULE__, {}, opts)
+ end
+
+ def init(_) do
+ PoolParty.Scheduler.join(self)
+ {:ok, nil}
+ end
+
+ def process(pid, function, args) do
+ GenServer.cast(pid, {:compute, function, args})
+ end
+
+ def handle_cast({:compute, function, args}, _) do
+ PoolParty.Scheduler.ready({:result, function.(args), self})
+ {:noreply, nil}
+ end
+
+end
diff --git a/lib/poolparty/scheduler.ex b/lib/poolparty/scheduler.ex
new file mode 100644
index 0000000..e0cce5f
--- /dev/null
+++ b/lib/poolparty/scheduler.ex
@@ -0,0 +1,80 @@
+defmodule PoolParty.Scheduler do
+ use GenServer
+
+ def start_link(pool_size, opts \\ []) do
+ GenServer.start_link(
+ __MODULE__,
+ {pool_size},
+ [name: __MODULE__] ++ opts)
+ end
+
+ def init({pool_size}) do
+ {:ok, %{max_pool_size: pool_size,
+ workers: [],
+ queue: [],
+ processing: HashDict.new()}
+ }
+ end
+
+ def join(worker_pid) do
+ GenServer.cast(__MODULE__, {:join, worker_pid})
+ end
+
+ def ready({:result, result, worker_pid}) do
+ GenServer.cast(__MODULE__, {:ready, result, worker_pid})
+ end
+
+ def process(func, args, from) do
+ GenServer.cast(__MODULE__, {:process, func, args, from})
+ end
+
+ def handle_cast({:process, func, args, from}, state) do
+ queue = state.queue ++ [{func, args, from}]
+ case length(state.workers) do
+ 0 ->
+ {:noreply, %{state| queue: queue}}
+ _ ->
+ {queue, workers, processing} =
+ schedule_process(queue, state.workers, state.processing)
+ {:noreply, %{state|
+ queue: queue,
+ workers: workers,
+ processing: processing}
+ }
+ end
+ end
+
+ def handle_cast({:join, pid}, state) do
+ {:noreply, %{state| workers: [pid | state.workers]}}
+ end
+
+ def handle_cast({:leave, pid}, state) do
+ {:noreply, %{state| workers: state.workers -- [pid]}}
+ end
+
+ def handle_cast({:ready, result, pid}, state) do
+ {client, processing} = HashDict.pop(state.processing, pid)
+ send(client, {:result, result})
+ workers = [pid | state.workers]
+ case length(state.queue) do
+ 0 ->
+ {:noreply, %{state| workers: workers}}
+ _ ->
+ {queue, workers, processing} =
+ schedule_process(state.queue, workers, processing)
+ {:noreply, %{state|
+ workers: workers,
+ queue: queue,
+ processing: processing}
+ }
+ end
+ end
+
+ defp schedule_process(queue, workers, processing) do
+ [{f, args, client} | queue] = queue
+ [next | workers] = workers
+ PoolParty.Pool.Worker.process(next, f, args)
+ processing = HashDict.put(processing, next, client)
+ {queue, workers, processing}
+ end
+end
diff --git a/lib/poolparty/supervisor.ex b/lib/poolparty/supervisor.ex
new file mode 100644
index 0000000..2f4d150
--- /dev/null
+++ b/lib/poolparty/supervisor.ex
@@ -0,0 +1,14 @@
+defmodule PoolParty.Supervisor do
+ use Supervisor
+
+ def start_link(opts \\ []) do
+ Supervisor.start_link(__MODULE__, {}, [name: __MODULE__] ++ opts)
+ end
+
+ def init(_) do
+ pool_size = Application.get_env(:poolparty, :pool_size)
+ children = [worker(PoolParty.Scheduler, [pool_size]),
+ worker(PoolParty.Pool.Supervisor, [pool_size])]
+ supervise(children, strategy: :one_for_one)
+ end
+end