Source file p2p_protocol.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
module Events = P2p_events.P2p_protocol
type ('msg, 'peer, 'conn) config = {
swap_linger : Time.System.Span.t;
pool : ('msg, 'peer, 'conn) P2p_pool.t;
log : P2p_connection.P2p_event.t -> unit;
connect : P2p_point.Id.t -> ('msg, 'peer, 'conn) P2p_conn.t tzresult Lwt.t;
mutable latest_accepted_swap : Time.System.t;
mutable latest_successful_swap : Time.System.t;
}
open P2p_answerer
let message conn _request size msg =
Lwt_pipe.Maybe_bounded.push conn.messages (size, msg)
module Private_answerer = struct
let advertise conn _request _points =
Events.(emit private_node_new_peers) conn.peer_id
let bootstrap conn _request =
Lwt_result.ok @@ Events.(emit private_node_peers_request) conn.peer_id
let swap_request conn _request _new_point _peer =
Events.(emit private_node_swap_request) conn.peer_id
let swap_ack conn _request _point _peer_id =
Events.(emit private_node_swap_ack) conn.peer_id
let create conn =
P2p_answerer.
{
message = message conn;
advertise = advertise conn;
bootstrap = bootstrap conn;
swap_request = swap_request conn;
swap_ack = swap_ack conn;
}
end
module Default_answerer = struct
open P2p_connection.P2p_event
let advertise config conn _request points =
let log = config.log in
let source_peer_id = conn.peer_id in
log (Advertise_received {source = source_peer_id}) ;
P2p_pool.register_list_of_new_points
~medium:"advertise"
~source:conn.peer_id
config.pool
points
let bootstrap config conn _request_info =
let open Lwt_result_syntax in
let log = config.log in
let source_peer_id = conn.peer_id in
log (Bootstrap_received {source = source_peer_id}) ;
if conn.is_private then
let*! () = Events.(emit private_node_request) conn.peer_id in
return_unit
else
let*! points =
P2p_pool.list_known_points ~ignore_private:true config.pool
in
match points with
| [] -> return_unit
| points -> (
match conn.write_advertise points with
| Ok true ->
log (Advertise_sent {source = source_peer_id}) ;
return_unit
| Ok false ->
return_unit
| Error err as error ->
let*! () =
Events.(emit advertise_sending_failed) (source_peer_id, err)
in
Lwt.return error)
let swap t pool source_peer_id ~connect current_peer_id new_point =
let open Lwt_syntax in
t.latest_accepted_swap <- Time.System.now () ;
let* r = connect new_point in
match r with
| Ok _new_conn -> (
t.latest_successful_swap <- Time.System.now () ;
t.log (Swap_success {source = source_peer_id}) ;
let* () = Events.(emit swap_succeeded) new_point in
match P2p_pool.Connection.find_by_peer_id pool current_peer_id with
| None -> Lwt.return_unit
| Some conn -> P2p_conn.disconnect conn)
| Error err -> (
t.latest_accepted_swap <- t.latest_successful_swap ;
t.log (Swap_failure {source = source_peer_id}) ;
match err with
| [Timeout] -> Events.(emit swap_interrupted) (new_point, err)
| _ -> Events.(emit swap_failed) (new_point, err))
let swap_ack config conn request new_point _peer =
let open Lwt_syntax in
let source_peer_id = conn.peer_id in
let pool = config.pool in
let connect = config.connect in
let log = config.log in
log (Swap_ack_received {source = source_peer_id}) ;
let* () = Events.(emit swap_ack_received) source_peer_id in
match request.last_sent_swap_request with
| None -> Lwt.return_unit
| Some (_time, proposed_peer_id) -> (
match P2p_pool.Connection.find_by_peer_id pool proposed_peer_id with
| None ->
swap config pool source_peer_id ~connect proposed_peer_id new_point
| Some _ -> Lwt.return_unit)
let swap_request config conn _request new_point _peer =
let open Lwt_syntax in
let source_peer_id = conn.peer_id in
let pool = config.pool in
let swap_linger = config.swap_linger in
let connect = config.connect in
let log = config.log in
log (Swap_request_received {source = source_peer_id}) ;
let* () = Events.(emit swap_request_received) source_peer_id in
let span_since_last_swap =
Ptime.diff
(Time.System.now ())
(Time.System.max
config.latest_successful_swap
config.latest_accepted_swap)
in
let new_point_info = P2p_pool.register_point pool new_point in
if
Ptime.Span.compare span_since_last_swap swap_linger < 0
|| not (P2p_point_state.is_disconnected new_point_info)
then (
log (Swap_request_ignored {source = source_peer_id}) ;
Events.(emit swap_request_ignored) source_peer_id)
else
match P2p_pool.Connection.random_addr pool ~no_private:true with
| None -> Events.(emit no_swap_candidate) source_peer_id
| Some (proposed_point, proposed_peer_id) -> (
match conn.write_swap_ack proposed_point proposed_peer_id with
| Ok true ->
log (Swap_ack_sent {source = source_peer_id}) ;
swap
config
pool
source_peer_id
~connect
proposed_peer_id
new_point
| Ok false | Error _ -> Lwt.return_unit)
let create config conn =
P2p_answerer.
{
message = message conn;
advertise = advertise config conn;
bootstrap = bootstrap config conn;
swap_request = swap_request config conn;
swap_ack = swap_ack config conn;
}
end
let create_default = Default_answerer.create
let create_private () = Private_answerer.create