From ccc7ba834e11b0da450800a8695954baee9c06b7 Mon Sep 17 00:00:00 2001 From: kballou Date: Thu, 16 Oct 2014 19:22:50 -0600 Subject: PoolParty Scheduler, supervisor, and workers Add PoolParty supervisor, scheduler, pool supervisor, and pool workers --- lib/poolparty/pool/supervisor.ex | 16 ++++++++ lib/poolparty/pool/worker.ex | 22 +++++++++++ lib/poolparty/scheduler.ex | 80 ++++++++++++++++++++++++++++++++++++++++ lib/poolparty/supervisor.ex | 14 +++++++ 4 files changed, 132 insertions(+) create mode 100644 lib/poolparty/pool/supervisor.ex create mode 100644 lib/poolparty/pool/worker.ex create mode 100644 lib/poolparty/scheduler.ex create mode 100644 lib/poolparty/supervisor.ex diff --git a/lib/poolparty/pool/supervisor.ex b/lib/poolparty/pool/supervisor.ex new file mode 100644 index 0000000..e95ceac --- /dev/null +++ b/lib/poolparty/pool/supervisor.ex @@ -0,0 +1,16 @@ +defmodule PoolParty.Pool.Supervisor do + use Supervisor + + def start_link(pool_size, opts \\ []) do + Supervisor.start_link(__MODULE__, {pool_size}, opts) + end + + def init({pool_size}) do + children = (1..pool_size) |> + Enum.map(fn (id) -> + worker(PoolParty.Pool.Worker, [], id: id) + end) + supervise(children, strategy: :one_for_one) + end + +end diff --git a/lib/poolparty/pool/worker.ex b/lib/poolparty/pool/worker.ex new file mode 100644 index 0000000..c4685f5 --- /dev/null +++ b/lib/poolparty/pool/worker.ex @@ -0,0 +1,22 @@ +defmodule PoolParty.Pool.Worker do + use GenServer + + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, {}, opts) + end + + def init(_) do + PoolParty.Scheduler.join(self) + {:ok, nil} + end + + def process(pid, function, args) do + GenServer.cast(pid, {:compute, function, args}) + end + + def handle_cast({:compute, function, args}, _) do + PoolParty.Scheduler.ready({:result, function.(args), self}) + {:noreply, nil} + end + +end diff --git a/lib/poolparty/scheduler.ex b/lib/poolparty/scheduler.ex new file mode 100644 index 0000000..e0cce5f --- /dev/null +++ b/lib/poolparty/scheduler.ex @@ -0,0 +1,80 @@ +defmodule PoolParty.Scheduler do + use GenServer + + def start_link(pool_size, opts \\ []) do + GenServer.start_link( + __MODULE__, + {pool_size}, + [name: __MODULE__] ++ opts) + end + + def init({pool_size}) do + {:ok, %{max_pool_size: pool_size, + workers: [], + queue: [], + processing: HashDict.new()} + } + end + + def join(worker_pid) do + GenServer.cast(__MODULE__, {:join, worker_pid}) + end + + def ready({:result, result, worker_pid}) do + GenServer.cast(__MODULE__, {:ready, result, worker_pid}) + end + + def process(func, args, from) do + GenServer.cast(__MODULE__, {:process, func, args, from}) + end + + def handle_cast({:process, func, args, from}, state) do + queue = state.queue ++ [{func, args, from}] + case length(state.workers) do + 0 -> + {:noreply, %{state| queue: queue}} + _ -> + {queue, workers, processing} = + schedule_process(queue, state.workers, state.processing) + {:noreply, %{state| + queue: queue, + workers: workers, + processing: processing} + } + end + end + + def handle_cast({:join, pid}, state) do + {:noreply, %{state| workers: [pid | state.workers]}} + end + + def handle_cast({:leave, pid}, state) do + {:noreply, %{state| workers: state.workers -- [pid]}} + end + + def handle_cast({:ready, result, pid}, state) do + {client, processing} = HashDict.pop(state.processing, pid) + send(client, {:result, result}) + workers = [pid | state.workers] + case length(state.queue) do + 0 -> + {:noreply, %{state| workers: workers}} + _ -> + {queue, workers, processing} = + schedule_process(state.queue, workers, processing) + {:noreply, %{state| + workers: workers, + queue: queue, + processing: processing} + } + end + end + + defp schedule_process(queue, workers, processing) do + [{f, args, client} | queue] = queue + [next | workers] = workers + PoolParty.Pool.Worker.process(next, f, args) + processing = HashDict.put(processing, next, client) + {queue, workers, processing} + end +end diff --git a/lib/poolparty/supervisor.ex b/lib/poolparty/supervisor.ex new file mode 100644 index 0000000..2f4d150 --- /dev/null +++ b/lib/poolparty/supervisor.ex @@ -0,0 +1,14 @@ +defmodule PoolParty.Supervisor do + use Supervisor + + def start_link(opts \\ []) do + Supervisor.start_link(__MODULE__, {}, [name: __MODULE__] ++ opts) + end + + def init(_) do + pool_size = Application.get_env(:poolparty, :pool_size) + children = [worker(PoolParty.Scheduler, [pool_size]), + worker(PoolParty.Pool.Supervisor, [pool_size])] + supervise(children, strategy: :one_for_one) + end +end -- cgit v1.2.1