Source file tcp_channel.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
open! Core
open! Async
let close_channels reader writer =
let%bind () = Output_channel.close writer in
Input_channel.close reader
;;
let collect_errors writer fn =
let monitor = Output_channel.monitor writer in
Monitor.detach monitor;
choose
[ choice (Monitor.get_next_error monitor) (fun e -> Error e)
; choice (Monitor.try_with ~run:`Now ~rest:`Log fn) Fn.id
]
;;
let listen
?max_connections
?max_accepts_per_batch
?backlog
?socket
?max_buffer_size
?buf_len
?write_timeout
?time_source
~on_handler_error
where_to_listen
handler
=
Tcp.Server.create_sock
?max_connections
?max_accepts_per_batch
?backlog
?socket
?time_source
~on_handler_error
where_to_listen
(fun addr socket ->
let fd = Socket.fd socket in
let input_channel = Input_channel.create ?max_buffer_size ?buf_len ?time_source fd in
let output_channel =
Output_channel.create ?max_buffer_size ?buf_len ?write_timeout ?time_source fd
in
let%bind res =
Deferred.any
[ collect_errors output_channel (fun () ->
handler addr input_channel output_channel)
; Output_channel.remote_closed output_channel |> Deferred.ok
]
in
let%bind () = close_channels input_channel output_channel in
match res with
| Ok () -> Deferred.unit
| Error exn -> raise exn)
;;
let listen_inet
?max_connections
?max_accepts_per_batch
?backlog
?socket
?max_buffer_size
?buf_len
?write_timeout
?time_source
~on_handler_error
where_to_listen
handler
=
Tcp.Server.create_sock_inet
?max_connections
?max_accepts_per_batch
?backlog
?socket
?time_source
~on_handler_error
where_to_listen
(fun addr socket ->
let fd = Socket.fd socket in
let input_channel = Input_channel.create ?max_buffer_size ?buf_len ?time_source fd in
let output_channel =
Output_channel.create ?max_buffer_size ?buf_len ?write_timeout ?time_source fd
in
let%bind res =
Deferred.any
[ collect_errors output_channel (fun () ->
handler addr input_channel output_channel)
; Output_channel.remote_closed output_channel |> Deferred.ok
]
in
let%bind () = close_channels input_channel output_channel in
match res with
| Ok () -> Deferred.unit
| Error exn -> raise exn)
;;
let with_connection
?interrupt
?connect_timeout
?max_buffer_size
?buf_len
?write_timeout
?time_source
where_to_connect
f
=
let%bind socket =
Tcp.connect_sock ?interrupt ?timeout:connect_timeout ?time_source where_to_connect
in
let fd = Socket.fd socket in
let input_channel = Input_channel.create ?max_buffer_size ?buf_len ?time_source fd in
let output_channel =
Output_channel.create ?max_buffer_size ?buf_len ?time_source ?write_timeout fd
in
let res = collect_errors output_channel (fun () -> f input_channel output_channel) in
let%bind () =
Deferred.any_unit
[ (res >>| fun _ -> ())
; Output_channel.close_finished output_channel
; Input_channel.closed input_channel
]
in
let%bind () = close_channels input_channel output_channel in
match%map res with
| Ok v -> v
| Error exn ->
Exn.reraise exn "Shuttle.Connection: Unhandled exception in TCP client connection"
;;
let connect
?interrupt
?connect_timeout
?max_buffer_size
?buf_len
?write_timeout
?time_source
where_to_connect
=
let%map socket =
Tcp.connect_sock ?interrupt ?timeout:connect_timeout ?time_source where_to_connect
in
let fd = Socket.fd socket in
let reader = Input_channel.create ?max_buffer_size ?buf_len ?time_source fd in
let writer =
Output_channel.create ?max_buffer_size ?buf_len ?time_source ?write_timeout fd
in
reader, writer
;;