diff options
Diffstat (limited to 'lib/poolparty/scheduler.ex')
-rw-r--r-- | lib/poolparty/scheduler.ex | 80 |
1 files changed, 80 insertions, 0 deletions
diff --git a/lib/poolparty/scheduler.ex b/lib/poolparty/scheduler.ex new file mode 100644 index 0000000..e0cce5f --- /dev/null +++ b/lib/poolparty/scheduler.ex @@ -0,0 +1,80 @@ +defmodule PoolParty.Scheduler do + use GenServer + + def start_link(pool_size, opts \\ []) do + GenServer.start_link( + __MODULE__, + {pool_size}, + [name: __MODULE__] ++ opts) + end + + def init({pool_size}) do + {:ok, %{max_pool_size: pool_size, + workers: [], + queue: [], + processing: HashDict.new()} + } + end + + def join(worker_pid) do + GenServer.cast(__MODULE__, {:join, worker_pid}) + end + + def ready({:result, result, worker_pid}) do + GenServer.cast(__MODULE__, {:ready, result, worker_pid}) + end + + def process(func, args, from) do + GenServer.cast(__MODULE__, {:process, func, args, from}) + end + + def handle_cast({:process, func, args, from}, state) do + queue = state.queue ++ [{func, args, from}] + case length(state.workers) do + 0 -> + {:noreply, %{state| queue: queue}} + _ -> + {queue, workers, processing} = + schedule_process(queue, state.workers, state.processing) + {:noreply, %{state| + queue: queue, + workers: workers, + processing: processing} + } + end + end + + def handle_cast({:join, pid}, state) do + {:noreply, %{state| workers: [pid | state.workers]}} + end + + def handle_cast({:leave, pid}, state) do + {:noreply, %{state| workers: state.workers -- [pid]}} + end + + def handle_cast({:ready, result, pid}, state) do + {client, processing} = HashDict.pop(state.processing, pid) + send(client, {:result, result}) + workers = [pid | state.workers] + case length(state.queue) do + 0 -> + {:noreply, %{state| workers: workers}} + _ -> + {queue, workers, processing} = + schedule_process(state.queue, workers, processing) + {:noreply, %{state| + workers: workers, + queue: queue, + processing: processing} + } + end + end + + defp schedule_process(queue, workers, processing) do + [{f, args, client} | queue] = queue + [next | workers] = workers + PoolParty.Pool.Worker.process(next, f, args) + processing = HashDict.put(processing, next, client) + {queue, workers, processing} + end +end |