summaryrefslogtreecommitdiff
path: root/lib/poolparty/scheduler.ex
blob: 225556ac17efd76797c557a668855c00579c79df (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
defmodule PoolParty.Scheduler do
  use GenServer
  require Logger

  def start_link(pool_size, opts \\ []) do
    Logger.debug("[#{__MODULE__}]: Starting Pool Scheduler")
    GenServer.start_link(
      __MODULE__,
      {pool_size},
      [name: __MODULE__] ++ opts)
  end

  def init({pool_size}) do
    Logger.debug("[#{__MODULE__}]: Initializing Pool Scheduler")
    {:ok, %{max_pool_size: pool_size,
            workers: [],
            queue: [],
            processing: HashDict.new()}
    }
  end

  def join(worker_pid) do
    Logger.debug("[#{__MODULE__}]: Worker joining pool")
    GenServer.cast(__MODULE__, {:join, worker_pid})
  end

  def ready({:result, result, worker_pid}) do
    Logger.debug("[#{__MODULE__}]: Worker jumping into the pool")
    GenServer.cast(__MODULE__, {:ready, result, worker_pid})
  end

  def process(func, args, from) do
    Logger.debug("[#{__MODULE__}]: Casting work request")
    GenServer.cast(__MODULE__, {:process, func, args, from})
  end

  def handle_cast({:process, func, args, from}, state) do
    Logger.debug("[#{__MODULE__}]: Work request received")
    queue = state.queue ++ [{func, args, from}]
    case length(state.workers) do
      0 ->
        Logger.debug("[#{__MODULE__}]: No workers available")
        {:noreply, %{state| queue: queue}}
      _ ->
        {queue, workers, processing} =
          schedule_process(queue, state.workers, state.processing)
        {:noreply, %{state|
                     queue: queue,
                     workers: workers,
                     processing: processing}
     }
    end
  end

  def handle_cast({:join, pid}, state) do
    Logger.debug("[#{__MODULE__}]: Worker joined pool")
    {:noreply, %{state| workers: [pid | state.workers]}}
  end

  def handle_cast({:leave, pid}, state) do
    Logger.debug("[#{__MODULE__}]: Worker left pool")
    {:noreply, %{state| workers: state.workers -- [pid]}}
  end

  def handle_cast({:ready, result, pid}, state) do
    Logger.debug("[#{__MODULE__}]: Worker making a splash in the pool")
    {client, processing} = HashDict.pop(state.processing, pid)
    send(client, {:result, result})
    workers = [pid | state.workers]
    case length(state.queue) do
      0 ->
        {:noreply, %{state| workers: workers}}
      _ ->
        {queue, workers, processing} =
          schedule_process(state.queue, workers, processing)
        {:noreply, %{state|
                     workers: workers,
                     queue: queue,
                     processing: processing}
        }
    end
  end

  defp schedule_process(queue, workers, processing) do
    [{f, args, client} | queue] = queue
    [next | workers] = workers
    PoolParty.Pool.Worker.process(next, f, args)
    processing = HashDict.put(processing, next, client)
    {queue, workers, processing}
  end
end