aboutsummaryrefslogtreecommitdiff
path: root/lib/boltex/bolt.ex
blob: 5c58a629f020255aecb492fe5e7143b6d7d314a7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
defmodule Boltex.Bolt do
  alias Boltex.{Utils, PackStream}
  require Logger

  @recv_timeout    1_000
  @max_chunk_size  65_535

  @user_agent      "Boltex/1.0"
  @hs_magic        << 0x60, 0x60, 0xB0, 0x17 >>
  @hs_version      << 1 :: 32, 0 :: 32, 0 :: 32, 0 :: 32 >>

  @zero_chunk      << 0, 0 >>

  @sig_init        0x01
  @sig_ack_failure 0x0E
  @sig_reset       0x0F
  @sig_run         0x10
  @sig_discard_all 0x2F
  @sig_pull_all    0x3F
  @sig_success     0x70
  @sig_record      0x71
  @sig_ignored     0x7E
  @sig_failure     0x7F

  @summary         ~w(success ignored failure)a

  @moduledoc """
  The Boltex.Bolt module handles the Bolt protocol specific steps (i.e.
  handshake, init) as well as sending and receiving messages and wrapping
  them in chunks.

  It abstracts transportation, expecing the transport layer to define
  send/2 and recv/3 analogous to :gen_tcp.
  """

  @doc "Does the handshake"
  def handshake(transport, port) do
    transport.send port, @hs_magic <> @hs_version
    case transport.recv(port, 4, @recv_timeout) do
      {:ok, << 1 :: 32 >>} ->
        :ok

      response ->
        Logger.error "Handshake failed. Received: #{Utils.hex_encode response})"
        {:error, :handshake_failed}
    end
  end

  @doc """
  Initialises the connection.

  Expects a transport module (i.e. `gen_tcp`) and a `Port`. Accepts
  authorisation params in the form of {username, password}.

  ## Examples

      iex> Boltex.Bolt.init :gen_tcp, port
      :ok

      iex> Boltex.Bolt.init :gen_tcp, port, {"username", "password"}
      :ok
  """
  def init(transport, port, auth \\ nil) do
    params = auth_params auth
    send_messages transport, port, [{[@user_agent, params], @sig_init}]

    case receive_data(transport, port) do
      {:success, %{}} ->
        :ok

      response ->
        Logger.error "Init failed. Received: #{Utils.hex_encode response})"
        {:error, :init_failed}
    end
  end

  defp auth_params(nil), do: %{}
  defp auth_params({username, password}) do
    %{
      scheme: "basic",
      principal: username,
      credentials: password
    }
  end

  @doc """
  Sends a list of messages using the Bolt protocol and PackStream encoding.

  Messages have to be in the form of {[messages], signature}.
  """
  def send_messages(transport, port, messages) do
    messages
    |> Enum.map(&generate_binary_message/1)
    |> generate_chunks
    |> Enum.each(&(transport.send(port, &1)))
  end

  defp generate_binary_message({messages, signature}) do
    messages    = List.wrap messages
    struct_size = length messages

    << 0xB :: 4, struct_size :: 4, signature >> <>
    Utils.reduce_to_binary(messages, &PackStream.encode/1)
  end

  defp generate_chunks(messages, chunks \\ [], current_chunk \\ <<>>)
  defp generate_chunks([], chunks, current_chunk) do
    [current_chunk | chunks]
    |> Enum.reverse
  end
  defp generate_chunks([message | messages], chunks, current_chunk)
  when byte_size(current_chunk <> message) <= @max_chunk_size do
    message_size  = byte_size message
    current_chunk =
      current_chunk <>
      << message_size :: 16 >> <>
      message <>
      @zero_chunk

    generate_chunks messages, chunks, current_chunk
  end
  defp generate_chunks([chunk | chunks], chunks, current_chunk) do
    oversized_chunk = current_chunk <> chunk
    {first, rest}   = binary_part oversized_chunk, 0, @max_chunk_size
    first_size      = byte_size first
    rest_size       = byte_size rest
    current_chunk   = current_chunk <> << first_size :: 16 >> <> first
    new_chunk       = << rest_size :: 16 >> <> rest

    generate_chunks chunks, [current_chunk | chunks], new_chunk
  end

  @doc """
  Runs a statement (most likely Cypher statement) and returns a list of the
  records and a summary.

  Records are represented using PackStream's record data type. Their Elixir
  representation is a Keyword with the indexse `:sig` and `:fields`.

  ## Examples

      iex> Boltex.Bolt.run_statement("MATCH (n) RETURN n")
      [
        {:record, [sig: 1, fields: [1, "Exmaple", "Labels", %{"some_attribute" => "some_value"}]]},
        {:success, %{"type" => "r"}}
      ]
  """
  def run_statement(transport, port, statement, params \\ %{}) do
    send_messages transport, port, [
      {[statement, params], @sig_run},
      {[nil], @sig_pull_all}
    ]

    with {:success, %{}} = data <- receive_data(transport, port),
    do:  [data | transport |> receive_data(port) |> List.wrap]
  end

  @doc """
  Receives data.

  This function is supposed to be called after a request to the server has been
  made. It receives data chunks, mends them (if they were split between frames)
  and decodes them using PackStream.

  When just a single message is received (i.e. to acknowledge a command), this
  function returns a tuple with two items, the first being the signature and the
  second being the message(s) itself. If a list of messages is received it will
  return a list of the former.

  The same goes for the messages: If there was a single data point in a message
  said data point will be returned by itself. If there were multiple data
  points, the list will be returned.

  The signature is represented as one of the following:

  * `:success`
  * `:record`
  * `:ignored`
  * `:failure`
  """
  def receive_data(transport, port, previous \\ []) do
    case transport |> do_receive_data(transport) |> unpack do
      {:record, _} = data ->
        receive_data transport, port, [data | previous]

      {status, _} = data when status in @summary and previous == [] ->
        data

      {status, _} = data when status in @summary ->
        Enum.reverse [data | previous]
    end
  end

  defp do_receive_data(transport, port) do
    with {:ok, <<chunk_size :: 16>>} <- transport.recv(port, 2, @recv_timeout),
    do:  do_receive_data(transport, port, chunk_size)
  end
  defp do_receive_data(transport, port, chunk_size) do
    with {:ok, data} <- transport.recv(port, chunk_size, @recv_timeout)
    do
      case transport.recv(port, 2, @recv_timeout) do
        {:ok, @zero_chunk} ->
          data
        {:ok, <<chunk_size :: 16>>} ->
          data <> do_receive_data(transport, port, chunk_size)
      end
    else
      {:error, :timeout} ->
        {:error, :no_more_data_received}
      other ->
        raise "receive failed"
    end
  end

  @doc """
  Unpacks (or in other words parses) a message.
  """
  def unpack(<< 0x0B :: 4, packages :: 4, status, message :: binary >>) do
    response = PackStream.decode(message)
    response = if packages == 1, do: List.first(response), else: response

    case status do
      @sig_success -> {:success, response}
      @sig_record  -> {:record,  response}
      @sig_ignored -> {:ignored, response}
      @sig_failure -> {:failure, response}
      other        -> raise "Couldn't decode #{Utils.hex_encode << other >>}"
    end
  end
end