Source file tcp_connect.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
open! Core
open! Async
let connect_and_handshake'
?time_source
~timeout
~connect
~handshake
~on_handshake_error
()
=
let time_source =
match time_source with
| Some x -> Time_source.read_only x
| None -> Time_source.wall_clock ()
in
Deferred.Or_error.try_with_join ~run:`Now ~rest:`Raise (fun () ->
let finish_handshake_by = Time_float.add (Time_float.now ()) timeout in
let%bind connect_ret = connect time_source in
let timeout = Time_float.diff finish_handshake_by (Time_float.now ()) in
let result = handshake connect_ret in
let return_error err =
let%bind () = on_handshake_error connect_ret in
Deferred.Or_error.fail
(Error.tag err ~tag:"The server logs might have more information.")
in
match%bind
Time_source.with_timeout
time_source
(Time_ns.Span.of_span_float_round_nearest timeout)
result
with
| `Result (Ok res) -> Deferred.Or_error.return res
| `Result (Error error) -> return_error error
| `Timeout ->
return_error
(Error.create_s
[%message "Timed out doing Krb.Rpc handshake" (timeout : Time_float.Span.t)]))
;;
let close_connection_via_reader_and_writer reader writer =
Writer.close writer ~force_close:(Clock.after (sec 30.))
>>= fun () -> Reader.close reader
;;
let connect_and_handshake
?buffer_age_limit
?interrupt
?reader_buffer_size
?writer_buffer_size
?(timeout =
Time_ns.Span.to_span_float_round_nearest
Async_rpc_kernel.Async_rpc_kernel_private.default_handshake_timeout)
?time_source
where_to_connect
~handshake
=
connect_and_handshake'
?time_source
~timeout
~connect:(fun time_source ->
Tcp.connect
?buffer_age_limit
?interrupt
?reader_buffer_size
?writer_buffer_size
~timeout
~time_source
where_to_connect)
~handshake:(fun (socket, tcp_reader, tcp_writer) ->
handshake ~socket ~tcp_reader ~tcp_writer)
~on_handshake_error:(fun (_socket, tcp_reader, tcp_writer) ->
close_connection_via_reader_and_writer tcp_reader tcp_writer)
()
;;
let connect_sock_and_handshake
?interrupt
?(timeout =
Time_ns.Span.to_span_float_round_nearest
Async_rpc_kernel.Async_rpc_kernel_private.default_handshake_timeout)
?time_source
where_to_connect
~handshake
=
let open Deferred.Or_error.Let_syntax in
connect_and_handshake'
?time_source
~timeout
~connect:(fun time_source ->
Tcp.connect_sock ?interrupt ~timeout ~time_source where_to_connect)
~handshake:(fun socket ->
let%bind conn = handshake ~socket in
return (conn, socket))
~on_handshake_error:(fun _socket -> Deferred.return ())
()
;;