Source file p2p_protocol.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
module Events = P2p_events.P2p_protocol
(** Dependencies and mutable swap bookkeeping shared by the answerers
    built in this file. *)
type ('msg, 'peer, 'conn) config = {
swap_linger : Time.System.Span.t option;
(** Minimum span that must have elapsed since the last swap for
    [Default_answerer.swap_request] to consider a new one. With [None],
    [swap_request] and [swap_ack] only log the reception and do nothing
    else. *)
pool : ('msg, 'peer, 'conn) P2p_pool.t;
(** Pool queried for known points and for existing connections. *)
log : P2p_connection.P2p_event.t -> unit;
(** Callback invoked by [Default_answerer] with an event for each
    message received and for each swap outcome. *)
connect : P2p_point.Id.t -> ('msg, 'peer, 'conn) P2p_conn.t tzresult Lwt.t;
(** Used by [Default_answerer.swap] to open the connection to the point
    proposed in a swap. *)
mutable latest_accepted_swap : Time.System.t;
(** Set to the current time when [Default_answerer.swap] starts, and
    rolled back to [latest_successful_swap] when the connection attempt
    fails. *)
mutable latest_successful_swap : Time.System.t;
(** Set to the current time when [Default_answerer.swap] manages to
    connect to the new point. *)
}
open P2p_answerer
(** [message conn _request size msg] pushes the pair [(size, msg)] onto
    [conn.messages] and returns whatever
    [Lwt_pipe.Maybe_bounded.push] returns. The request information is not
    used. *)
let message conn _request size msg =
  let entry = (size, msg) in
  entry |> Lwt_pipe.Maybe_bounded.push conn.messages
(** Answerer whose peer-management callbacks (advertise, bootstrap, swap
    request, swap ack) never act on their payload: each one only emits a
    [private_node_*] event carrying the sender's peer id. Regular messages
    are still queued through [message]. *)
module Private_answerer = struct
  (** Emit [private_node_new_peers] for the sender; the advertised points
      are ignored. *)
  let advertise conn _request _points =
    conn.peer_id |> Events.(emit private_node_new_peers)

  (** Emit [private_node_peers_request] for the sender and wrap the
      resulting promise with [Lwt_result.ok]. *)
  let bootstrap conn _request =
    Events.(emit private_node_peers_request) conn.peer_id |> Lwt_result.ok

  (** Emit [private_node_swap_request] for the sender; the proposed point
      and peer are ignored. *)
  let swap_request conn _request _new_point _peer =
    conn.peer_id |> Events.(emit private_node_swap_request)

  (** Emit [private_node_swap_ack] for the sender; the acknowledged point
      and peer are ignored. *)
  let swap_ack conn _request _point _peer_id =
    conn.peer_id |> Events.(emit private_node_swap_ack)

  (** Build the callback record for [conn] from the functions above. *)
  let create conn =
    let open P2p_answerer in
    {
      message = message conn;
      advertise = advertise conn;
      bootstrap = bootstrap conn;
      swap_request = swap_request conn;
      swap_ack = swap_ack conn;
    }
end
(** Answerer that acts on peer-management messages: it registers advertised
    points, answers bootstrap requests with the known points, and takes
    part in connection swaps. Every callback reports what it received
    through [config.log]. *)
module Default_answerer = struct
  open P2p_connection.P2p_event

  (** Log the advertisement and register [points] in the pool, recording
      the sender as their source. *)
  let advertise config conn _request points =
    let sender = conn.peer_id in
    config.log (Advertise_received {source = sender}) ;
    P2p_pool.register_list_of_new_points
      ~medium:"advertise"
      ~source:sender
      config.pool
      points

  (** Answer a bootstrap request.

      For a private connection only the [private_node_request] event is
      emitted. Otherwise the known points of the pool (private ones
      excluded) are written back with [conn.write_advertise]; nothing is
      written when the list is empty. A write error is reported through
      [advertise_sending_failed] and returned to the caller. *)
  let bootstrap config conn _request_info =
    let open Lwt_result_syntax in
    let requester = conn.peer_id in
    config.log (Bootstrap_received {source = requester}) ;
    if not conn.is_private then (
      let*! known =
        P2p_pool.list_known_points ~ignore_private:true config.pool
      in
      match known with
      | [] -> return_unit
      | _ :: _ -> (
          match conn.write_advertise known with
          | Ok sent ->
              (* [Ok false] means the message was not sent: nothing is
                 logged in that case. *)
              if sent then config.log (Advertise_sent {source = requester}) ;
              return_unit
          | Error err as failure ->
              let*! () =
                Events.(emit advertise_sending_failed) (requester, err)
              in
              Lwt.return failure))
    else
      let*! () = Events.(emit private_node_request) requester in
      return_unit

  (** [swap t pool source_peer_id ~connect current_peer_id new_point]
      connects to [new_point] and, on success, disconnects the connection
      to [current_peer_id] if the pool still has one.

      [t.latest_accepted_swap] is set to the current time before
      connecting. On success [t.latest_successful_swap] is updated as
      well; on failure [t.latest_accepted_swap] is rolled back to
      [t.latest_successful_swap]. Both outcomes are logged, counted in the
      swap metrics and emitted as events. *)
  let swap t pool source_peer_id ~connect current_peer_id new_point =
    let open Lwt_syntax in
    t.latest_accepted_swap <- Time.System.now () ;
    let* outcome = connect new_point in
    match outcome with
    | Error err -> (
        (* The attempt does not count: restore the previous timestamp. *)
        t.latest_accepted_swap <- t.latest_successful_swap ;
        t.log (Swap_failure {source = source_peer_id}) ;
        Prometheus.Counter.inc_one P2p_metrics.Swap.fail ;
        match err with
        | [Timeout] -> Events.(emit swap_interrupted) (new_point, err)
        | _ -> Events.(emit swap_failed) (new_point, err))
    | Ok _new_conn ->
        t.latest_successful_swap <- Time.System.now () ;
        t.log (Swap_success {source = source_peer_id}) ;
        Prometheus.Counter.inc_one P2p_metrics.Swap.success ;
        let* () = Events.(emit swap_succeeded) new_point in
        let replaced =
          P2p_pool.Connection.find_by_peer_id pool current_peer_id
        in
        Option.fold
          ~none:return_unit
          ~some:(fun old_conn -> P2p_conn.disconnect old_conn)
          replaced

  (** Handle a swap acknowledgement.

      The swap is performed only when swapping is enabled
      ([config.swap_linger] is [Some _]), a swap request was previously
      sent ([request.last_sent_swap_request] is [Some _]) and the pool has
      no connection to [new_point] yet. The peer dropped by the swap is
      the one recorded in the last sent request. In every other case the
      function only logs the reception. *)
  let swap_ack config conn request new_point _peer =
    let open Lwt_syntax in
    let sender = conn.peer_id in
    config.log (Swap_ack_received {source = sender}) ;
    match config.swap_linger with
    | None -> return_unit
    | Some _linger -> (
        match request.last_sent_swap_request with
        | None -> return_unit
        | Some (_time, proposed_peer_id) -> (
            match P2p_pool.Connection.find_by_point config.pool new_point with
            | Some _already_connected -> return_unit
            | None ->
                swap
                  config
                  config.pool
                  sender
                  ~connect:config.connect
                  proposed_peer_id
                  new_point))

  (** Handle a swap request.

      The request is ignored (logged, counted and emitted as
      [swap_request_ignored]) when the last swap is more recent than
      [config.swap_linger], or when [new_point] is known to the pool and
      not disconnected. Otherwise a random non-private connection other
      than the sender's is picked, acknowledged to the sender with
      [conn.write_swap_ack], and swapped for [new_point].

      Failures along the way end the handling silently, except the absence
      of a swap candidate which is emitted as [no_swap_candidate]. *)
  let swap_request config conn _request new_point _peer =
    let open Result_syntax in
    let requester = conn.peer_id in
    config.log (Swap_request_received {source = requester}) ;
    let decision =
      let* linger =
        Option.to_result ~none:`Swap_is_disabled config.swap_linger
      in
      let last_swap =
        Time.System.max
          config.latest_successful_swap
          config.latest_accepted_swap
      in
      let elapsed = Ptime.diff (Time.System.now ()) last_swap in
      let known_point = P2p_pool.Points.info config.pool new_point in
      let swapped_too_recently = Ptime.Span.compare elapsed linger < 0 in
      (* Kept as a thunk so that the point state is only inspected when
         the first test does not already reject the request. *)
      let point_is_available () =
        match known_point with
        | None -> true
        | Some info -> P2p_point_state.is_disconnected info
      in
      if swapped_too_recently || not (point_is_available ()) then (
        config.log (Swap_request_ignored {source = requester}) ;
        Prometheus.Counter.inc_one P2p_metrics.Swap.ignored ;
        return (Events.(emit swap_request_ignored) requester))
      else
        let* requester_conn =
          Option.to_result
            ~none:`Couldnt_find_by_peer
            (P2p_pool.Connection.find_by_peer_id config.pool requester)
        in
        let* candidate_point, candidate_peer =
          Option.to_result
            ~none:(`No_swap_candidate requester)
            (P2p_pool.Connection.random_addr
               config.pool
               ~different_than:requester_conn
               ~no_private:true)
        in
        let* () =
          match conn.write_swap_ack candidate_point candidate_peer with
          | Ok true -> return_unit
          | Ok false ->
              fail (`Couldnt_write_swap_ack "Buffer is full. Message dropped.")
          | Error _ -> fail (`Couldnt_write_swap_ack "Connection closed")
        in
        config.log (Swap_ack_sent {source = requester}) ;
        return
          (swap
             config
             config.pool
             requester
             ~connect:config.connect
             candidate_peer
             new_point)
    in
    match decision with
    | Ok pending -> pending
    | Error (`No_swap_candidate peer) ->
        Events.(emit no_swap_candidate) peer
    | Error
        (`Couldnt_find_by_peer | `Swap_is_disabled | `Couldnt_write_swap_ack _)
      ->
        Lwt.return_unit

  (** Build the callback record for [conn], closing the handlers above
      over [config]. *)
  let create config conn =
    let open P2p_answerer in
    {
      message = message conn;
      advertise = advertise config conn;
      bootstrap = bootstrap config conn;
      swap_request = swap_request config conn;
      swap_ack = swap_ack config conn;
    }
end
(** [create_default config conn] is [Default_answerer.create config conn]. *)
let create_default config conn = Default_answerer.create config conn
(** [create_private ()] is [Private_answerer.create]; the unit argument
    carries no information. *)
let create_private = fun () -> Private_answerer.create