From fe62ea8ee39e5460b663359daf4759e9a379e538 Mon Sep 17 00:00:00 2001 From: Michael Schaefermeyer Date: Tue, 19 Jul 2016 01:01:18 +0200 Subject: Initial commit --- lib/boltex/bolt.ex | 209 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 209 insertions(+) create mode 100644 lib/boltex/bolt.ex (limited to 'lib/boltex/bolt.ex') diff --git a/lib/boltex/bolt.ex b/lib/boltex/bolt.ex new file mode 100644 index 0000000..dc0c04d --- /dev/null +++ b/lib/boltex/bolt.ex @@ -0,0 +1,209 @@ +defmodule Boltex.Bolt do + alias Boltex.{Utils, PackStream} + require Logger + + @recv_timeout 1_000 + @max_chunk_size 65_535 + + @user_agent "Boltex/1.0" + @hs_magic << 0x60, 0x60, 0xB0, 0x17 >> + @hs_version << 1 :: 32, 0 :: 32, 0 :: 32, 0 :: 32 >> + + @zero_chunk << 0, 0 >> + + @sig_init 0x01 + @sig_ack_failure 0x0E + @sig_reset 0x0F + @sig_run 0x10 + @sig_discard_all 0x2F + @sig_pull_all 0x3F + @sig_success 0x70 + @sig_record 0x71 + @sig_ignored 0x7E + @sig_failure 0x7F + + @summary ~w(success ignored failure)a + + @moduledoc """ + The Boltex.Bolt module handles the Bolt protocol specific steps (i.e. + handshake, init) as well as sending and receiving messages and wrapping + them in chunks. + + It abstracts transportation, expecing the transport layer to define + send/2 and recv/3 analogous to :gen_tcp. + """ + + @doc "Does the handshake" + def handshake(transport, port) do + transport.send port, @hs_magic <> @hs_version + case transport.recv(port, 4, @recv_timeout) do + {:ok, << 1 :: 32 >>} -> + :ok + + response -> + Logger.error "Handshake failed. Received: #{Utils.hex_encode response})" + {:error, :handshake_failed} + end + end + + @doc """ + Initialises the connection. + """ + def init(transport, port, params \\ %{}) do + send_messages transport, port, [{[@user_agent, params], @sig_init}] + + case receive_data(transport, port) do + {:success, %{}} -> + :ok + + response -> + Logger.error "Init failed. Received: #{Utils.hex_encode response})" + {:error, :init_failed} + end + end + + @doc """ + Sends a list of messages using the Bolt protocol and PackStream encoding. + + Messages have to be in the form of {[messages], signature}. + """ + def send_messages(transport, port, messages) do + Enum.map(messages, &generate_binary_message/1) + |> generate_chunks + |> Enum.each(&(transport.send(port, &1))) + end + + defp generate_binary_message({messages, signature}) do + messages = List.wrap messages + struct_size = length messages + + << 0xB :: 4, struct_size :: 4, signature >> <> + Utils.reduce_to_binary(messages, &PackStream.encode/1) + end + + defp generate_chunks(messages, chunks \\ [], current_chunk \\ <<>>) + defp generate_chunks([], chunks, current_chunk) do + [current_chunk | chunks] + |> Enum.reverse + end + defp generate_chunks([message | messages], chunks, current_chunk) + when byte_size(current_chunk <> message) <= @max_chunk_size do + message_size = byte_size message + current_chunk = + current_chunk <> + << message_size :: 16 >> <> + message <> + @zero_chunk + + generate_chunks messages, chunks, current_chunk + end + defp generate_chunks([chunk | chunks], chunks, current_chunk) do + oversized_chunk = current_chunk <> chunk + {first, rest} = binary_part oversized_chunk, 0, @max_chunk_size + first_size = byte_size first + rest_size = byte_size rest + current_chunk = current_chunk <> << first_size :: 16 >> <> first + new_chunk = << rest_size :: 16 >> <> rest + + generate_chunks chunks, [current_chunk | chunks], new_chunk + end + + @doc """ + Runs a statement (most like Cypher statement) and returns a list of the + records and a summary. + + Records are represented using PackStream's record data type. Their Elixir + representation is a Keyword with the indexse `:sig` and `:fields`. + + ## Examples + + iex> Boltex.Bolt.run_statement("MATCH (n) RETURN n") + [ + {:record, [sig: 1, fields: [1, "Exmaple", "Labels", %{"some_attribute" => "some_value"}, + {:success, %{"type" => "r"}} + ] + """ + def run_statement(transport, port, statement, params \\ %{}) do + send_messages transport, port, [ + {[statement, params], @sig_run}, + {[nil], @sig_pull_all} + ] + + with {:success, %{}} <- receive_data(transport, port), + do: receive_data transport, port + end + + @doc """ + Receives data. + + This function is supposed to be called after a request to the server has been + made. It receives data chunks, mends them (if they were split between frames) + and decodes them using PackStream. + + When just a single message is received (i.e. to acknowledge a command), this + function returns a tuple with two items, the first being the signature and the + second being the message(s) itself. If a list of messages is received it will + return a list of the former. + + The same goes for the messages: If there was a single data point in a message + said data point will be returned by itself. If there were multiple data points, + the list will be returned. + + The signature is represented as one of the following: + + * `:success` + * `:record` + * `:ignored` + * `:failure` + """ + def receive_data(transport, port, previous \\ []) do + case do_receive_data(transport, port) |> unpack do + {:record, _} = data -> + receive_data transport, port, [data | previous] + + {status, _} = data when status in @summary and previous == [] -> + data + + {status, _} = data when status in @summary -> + Enum.reverse [data | previous] + end + end + + defp do_receive_data(transport, port) do + with {:ok, <>} <- transport.recv(port, 2, @recv_timeout), + do: do_receive_data(transport, port, chunk_size) + end + defp do_receive_data(transport, port, chunk_size) do + with {:ok, data} <- transport.recv(port, chunk_size, @recv_timeout) + do + case transport.recv(port, 2, @recv_timeout) do + {:ok, @zero_chunk} -> + data + {:ok, <>} -> + data <> do_receive_data(transport, port, chunk_size) + end + else + {:error, :timeout} -> + {:error, :no_more_data_received} + other -> + IO.inspect Utils.hex_encode other + raise "receive failed" + end + end + + @doc """ + Unpacks (or in other words parses) a message. + """ + def unpack(<< 0x0B :: 4, packages :: 4, status, message :: binary >>) do + response = PackStream.decode(message) + response = if packages == 1, do: List.first(response), else: response + + case status do + @sig_success -> {:success, response} + @sig_record -> {:record, response} + @sig_ignored -> {:ignored, response} + @sig_failure -> {:failure, response} + other -> raise "Couldn't decode #{Utils.hex_encode << other >>}" + end + end +end -- cgit v1.2.1