1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
|
defmodule Boltex.Bolt do
alias Boltex.{Utils, PackStream}
require Logger
@recv_timeout 1_000
@max_chunk_size 65_535
@user_agent "Boltex/1.0"
@hs_magic << 0x60, 0x60, 0xB0, 0x17 >>
@hs_version << 1 :: 32, 0 :: 32, 0 :: 32, 0 :: 32 >>
@zero_chunk << 0, 0 >>
@sig_init 0x01
@sig_ack_failure 0x0E
@sig_reset 0x0F
@sig_run 0x10
@sig_discard_all 0x2F
@sig_pull_all 0x3F
@sig_success 0x70
@sig_record 0x71
@sig_ignored 0x7E
@sig_failure 0x7F
@summary ~w(success ignored failure)a
@moduledoc """
The Boltex.Bolt module handles the Bolt protocol specific steps (i.e.
handshake, init) as well as sending and receiving messages and wrapping
them in chunks.
It abstracts transportation, expecing the transport layer to define
send/2 and recv/3 analogous to :gen_tcp.
"""
@doc "Does the handshake"
def handshake(transport, port) do
transport.send port, @hs_magic <> @hs_version
case transport.recv(port, 4, @recv_timeout) do
{:ok, << 1 :: 32 >>} ->
:ok
response ->
Logger.error "Handshake failed. Received: #{Utils.hex_encode response})"
{:error, :handshake_failed}
end
end
@doc """
Initialises the connection.
"""
def init(transport, port, params \\ %{}) do
send_messages transport, port, [{[@user_agent, params], @sig_init}]
case receive_data(transport, port) do
{:success, %{}} ->
:ok
response ->
Logger.error "Init failed. Received: #{Utils.hex_encode response})"
{:error, :init_failed}
end
end
@doc """
Sends a list of messages using the Bolt protocol and PackStream encoding.
Messages have to be in the form of {[messages], signature}.
"""
def send_messages(transport, port, messages) do
Enum.map(messages, &generate_binary_message/1)
|> generate_chunks
|> Enum.each(&(transport.send(port, &1)))
end
defp generate_binary_message({messages, signature}) do
messages = List.wrap messages
struct_size = length messages
<< 0xB :: 4, struct_size :: 4, signature >> <>
Utils.reduce_to_binary(messages, &PackStream.encode/1)
end
defp generate_chunks(messages, chunks \\ [], current_chunk \\ <<>>)
defp generate_chunks([], chunks, current_chunk) do
[current_chunk | chunks]
|> Enum.reverse
end
defp generate_chunks([message | messages], chunks, current_chunk)
when byte_size(current_chunk <> message) <= @max_chunk_size do
message_size = byte_size message
current_chunk =
current_chunk <>
<< message_size :: 16 >> <>
message <>
@zero_chunk
generate_chunks messages, chunks, current_chunk
end
defp generate_chunks([chunk | chunks], chunks, current_chunk) do
oversized_chunk = current_chunk <> chunk
{first, rest} = binary_part oversized_chunk, 0, @max_chunk_size
first_size = byte_size first
rest_size = byte_size rest
current_chunk = current_chunk <> << first_size :: 16 >> <> first
new_chunk = << rest_size :: 16 >> <> rest
generate_chunks chunks, [current_chunk | chunks], new_chunk
end
@doc """
Runs a statement (most likely Cypher statement) and returns a list of the
records and a summary.
Records are represented using PackStream's record data type. Their Elixir
representation is a Keyword with the indexse `:sig` and `:fields`.
## Examples
iex> Boltex.Bolt.run_statement("MATCH (n) RETURN n")
[
{:record, [sig: 1, fields: [1, "Exmaple", "Labels", %{"some_attribute" => "some_value"}]]},
{:success, %{"type" => "r"}}
]
"""
def run_statement(transport, port, statement, params \\ %{}) do
send_messages transport, port, [
{[statement, params], @sig_run},
{[nil], @sig_pull_all}
]
with {:success, %{}} <- receive_data(transport, port),
do: receive_data transport, port
end
@doc """
Receives data.
This function is supposed to be called after a request to the server has been
made. It receives data chunks, mends them (if they were split between frames)
and decodes them using PackStream.
When just a single message is received (i.e. to acknowledge a command), this
function returns a tuple with two items, the first being the signature and the
second being the message(s) itself. If a list of messages is received it will
return a list of the former.
The same goes for the messages: If there was a single data point in a message
said data point will be returned by itself. If there were multiple data points,
the list will be returned.
The signature is represented as one of the following:
* `:success`
* `:record`
* `:ignored`
* `:failure`
"""
def receive_data(transport, port, previous \\ []) do
case do_receive_data(transport, port) |> unpack do
{:record, _} = data ->
receive_data transport, port, [data | previous]
{status, _} = data when status in @summary and previous == [] ->
data
{status, _} = data when status in @summary ->
Enum.reverse [data | previous]
end
end
defp do_receive_data(transport, port) do
with {:ok, <<chunk_size :: 16>>} <- transport.recv(port, 2, @recv_timeout),
do: do_receive_data(transport, port, chunk_size)
end
defp do_receive_data(transport, port, chunk_size) do
with {:ok, data} <- transport.recv(port, chunk_size, @recv_timeout)
do
case transport.recv(port, 2, @recv_timeout) do
{:ok, @zero_chunk} ->
data
{:ok, <<chunk_size :: 16>>} ->
data <> do_receive_data(transport, port, chunk_size)
end
else
{:error, :timeout} ->
{:error, :no_more_data_received}
other ->
IO.inspect Utils.hex_encode other
raise "receive failed"
end
end
@doc """
Unpacks (or in other words parses) a message.
"""
def unpack(<< 0x0B :: 4, packages :: 4, status, message :: binary >>) do
response = PackStream.decode(message)
response = if packages == 1, do: List.first(response), else: response
case status do
@sig_success -> {:success, response}
@sig_record -> {:record, response}
@sig_ignored -> {:ignored, response}
@sig_failure -> {:failure, response}
other -> raise "Couldn't decode #{Utils.hex_encode << other >>}"
end
end
end
|