1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
open Common_
(** Small helpers around [Unix.inet_addr]. *)
module Inet_addr = struct
  type t = Unix.inet_addr

  (** Alias for [Unix.inet_addr_any]. *)
  let any = Unix.inet_addr_any

  (** Alias for [Unix.inet_addr_loopback]. *)
  let loopback = Unix.inet_addr_loopback

  (** Printable form of an address, as given by
      [Unix.string_of_inet_addr]. *)
  let show = Unix.string_of_inet_addr

  (** [of_string s] parses [s] using [Unix.inet_addr_of_string].
      @return [None] if [s] is rejected by the parser. *)
  let of_string s =
    (* Only [Failure] signals a malformed address; anything else is not a
       parse error and must not be silently turned into [None]. *)
    match Unix.inet_addr_of_string s with
    | addr -> Some addr
    | exception Failure _ -> None

  (** Same as {!of_string}, but raises instead of returning [None].
      @raise Invalid_argument if [s] cannot be parsed. *)
  let of_string_exn s =
    match of_string s with
    | Some addr -> addr
    | None -> invalid_arg "Inet_addr.of_string"
end
(** Constructors and printers for [Unix.sockaddr]. *)
module Sockaddr = struct
  type t = Unix.sockaddr

  (** Printable form: the path for a Unix-domain address, ["addr:port"]
      for an internet address. *)
  let show = function
    | Unix.ADDR_UNIX s -> s
    | Unix.ADDR_INET (addr, port) ->
      spf "%s:%d" (Unix.string_of_inet_addr addr) port

  (** [unix path] is the Unix-domain address [Unix.ADDR_UNIX path]. *)
  let unix s : t = Unix.ADDR_UNIX s

  (** [inet addr port] is the internet address [Unix.ADDR_INET (addr, port)]. *)
  let inet addr port : t = Unix.ADDR_INET (addr, port)

  (** [inet_parse addr port] parses the string [addr] with
      [Unix.inet_addr_of_string] and pairs it with [port].
      @return [None] if [addr] is rejected by the parser. *)
  let inet_parse addr port =
    (* Only [Failure] signals a malformed address; other exceptions are
       left to propagate rather than being reported as a parse failure. *)
    match Unix.inet_addr_of_string addr with
    | ip -> Some (inet ip port)
    | exception Failure _ -> None

  (** Same as {!inet_parse}, but raises instead of returning [None].
      @raise Invalid_argument if [addr] cannot be parsed. *)
  let inet_parse_exn addr port =
    match inet_parse addr port with
    | Some sockaddr -> sockaddr
    | None -> invalid_arg "Sockaddr.inet_parse"

  (** [inet_local port] is [port] on [Unix.inet_addr_loopback]. *)
  let inet_local port = inet Unix.inet_addr_loopback port

  (** [inet_any port] is [port] on [Unix.inet_addr_any]. *)
  let inet_any port = inet Unix.inet_addr_any port
end
(** Stream-socket server whose accept loop runs in a fiber. *)
module TCP_server = struct
  (** Handle on a running server; wraps the fiber that owns the accept
      loop. *)
  type t = { fiber: unit Fiber.t } [@@unboxed]

  (** Exception used as the cancellation reason when a server is stopped. *)
  exception Stop

  (* Cancel [fiber], using [Stop] as the reason. *)
  let stop_ fiber =
    let ebt = Exn_bt.get Stop in
    Fuseau.Fiber.Private_.cancel fiber ebt

  (** [stop self] cancels the server's fiber with {!Stop}. *)
  let stop self = stop_ self.fiber

  (** [join self] awaits the server's fiber. *)
  let join self = Fuseau.await self.fiber

  (** [with_serve' addr handle_client f] listens on [addr], starts an accept
      loop in a child fiber, and calls [f] with the server handle. Each
      accepted connection is passed to [handle_client client_addr client_sock]
      in a freshly spawned fiber; the client socket is closed when
      [handle_client] returns or raises. When [f] returns or raises, the
      accept loop is cancelled and the listening socket is closed.

      The socket family is derived from [addr], so IPv4, IPv6 and Unix-domain
      addresses are all accepted.

      @raise Unix.Unix_error if the listening socket cannot be set up. *)
  let with_serve' (addr : Sockaddr.t) handle_client (f : t -> 'a) : 'a =
    (* [TCP_NODELAY] is only meaningful on internet sockets. *)
    let is_inet =
      match addr with
      | Unix.ADDR_INET _ -> true
      | Unix.ADDR_UNIX _ -> false
    in
    let sock = Unix.socket (Unix.domain_of_sockaddr addr) Unix.SOCK_STREAM 0 in
    (* From here on the listening socket is closed on every exit path,
       including a failure in [bind]/[listen]. Closing is best-effort so a
       close error does not mask the real outcome. *)
    let close_sock () = try Unix.close sock with Unix.Unix_error _ -> () in
    let@ () = Fun.protect ~finally:close_sock in
    (* [SO_REUSEADDR] must be set before [bind] to have any effect on it. *)
    Unix.setsockopt sock Unix.SO_REUSEADDR true;
    Unix.bind sock addr;
    Unix.set_nonblock sock;
    Unix.listen sock 32;

    let fiber = Fuseau.Fiber.Private_.create () in
    let self = { fiber } in

    (* Serve one client; the socket is closed whatever happens, including a
       failure while configuring it. *)
    let loop_client client_sock client_addr : unit =
      let@ () =
        Fun.protect ~finally:(fun () ->
            try Unix.close client_sock with Unix.Unix_error _ -> ())
      in
      Unix.set_nonblock client_sock;
      if is_inet then Unix.setsockopt client_sock Unix.TCP_NODELAY true;
      handle_client client_addr client_sock
    in

    (* Accept connections until [fiber] is done. *)
    let loop () =
      while not (Fiber.is_done fiber) do
        match Unix.accept sock with
        | client_sock, client_addr ->
          ignore
            (Fuseau.spawn ~propagate_cancel_to_parent:false (fun () ->
                 loop_client client_sock client_addr)
              : _ Fiber.t)
        | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
          (* Nothing to accept yet: suspend until the event loop reports the
             listening socket as readable, then retry. The handle passed to
             the callback is cancelled so the callback is one-shot. *)
          let ev_loop = U_loop.cur () in
          Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup ->
              ignore
                (ev_loop#on_readable sock (fun ev ->
                     wakeup ();
                     Cancel_handle.cancel ev)
                  : Cancel_handle.t))
      done
    in

    let loop_fiber =
      let sched = Fuseau.get_scheduler () in
      Fuseau.spawn_as_child_of ~propagate_cancel_to_parent:true sched fiber loop
    in
    (* Inner cleanup runs first: the accept loop is cancelled before the
       listening socket is closed by the outer [Fun.protect]. *)
    let@ () = Fun.protect ~finally:(fun () -> stop_ loop_fiber) in
    f self

  (** Same as {!with_serve'}, but [handle_client] receives an input stream and
      an output stream built from the client socket (via
      [IO_unix.In.of_unix_fd] and [IO_unix.Out.of_unix_fd]) instead of the raw
      file descriptor. *)
  let with_serve (addr : Sockaddr.t) handle_client (f : t -> 'a) : 'a =
    with_serve' addr
      (fun client_addr client_sock ->
        let ic = IO_unix.In.of_unix_fd client_sock in
        let oc = IO_unix.Out.of_unix_fd client_sock in
        handle_client client_addr ic oc)
      f
end
(** Stream-socket client using a non-blocking connect. *)
module TCP_client = struct
  (** [with_connect' addr f] opens a stream socket, connects it to [addr]
      without blocking the thread (the calling fiber is suspended until the
      socket is writable), then calls [f] with the connected socket. The
      socket is closed when [f] returns or raises, and also when connecting
      fails.

      The socket family is derived from [addr], so IPv4, IPv6 and Unix-domain
      addresses are all accepted.

      @raise Unix.Unix_error if the socket cannot be created or connected. *)
  let with_connect' addr (f : Unix.file_descr -> 'a) : 'a =
    let sock = Unix.socket (Unix.domain_of_sockaddr addr) Unix.SOCK_STREAM 0 in
    (* Installed before connecting so that a failed connection does not leak
       the file descriptor. Closing is best-effort. *)
    let finally () = try Unix.close sock with Unix.Unix_error _ -> () in
    let@ () = Fun.protect ~finally in
    Unix.set_nonblock sock;
    (* [TCP_NODELAY] is only meaningful on internet sockets. *)
    (match addr with
    | Unix.ADDR_INET _ -> Unix.setsockopt sock Unix.TCP_NODELAY true
    | Unix.ADDR_UNIX _ -> ());

    (* Retry [connect] until it reports success. *)
    let rec connect_loop () =
      match Unix.connect sock addr with
      | () -> ()
      | exception Unix.Unix_error (Unix.EISCONN, _, _) ->
        (* A retry after the connection completed in the background can
           report "already connected"; that is success. *)
        ()
      | exception
          Unix.Unix_error
            ( ( Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN
              | Unix.EALREADY ),
              _,
              _ ) ->
        (* Connection still in progress: suspend until the event loop
           reports the socket as writable, then try again. The handle passed
           to the callback is cancelled so the callback is one-shot. *)
        Fuseau.Private_.suspend ~before_suspend:(fun ~wakeup ->
            let ev_loop = U_loop.cur () in
            ignore
              (ev_loop#on_writable sock (fun ev ->
                   wakeup ();
                   Cancel_handle.cancel ev)
                : Cancel_handle.t));
        connect_loop ()
    in
    connect_loop ();
    f sock

  (** Same as {!with_connect'}, but [f] receives an input stream and an
      output stream built from the socket (via [IO_unix.In.of_unix_fd] and
      [IO_unix.Out.of_unix_fd]) instead of the raw file descriptor. *)
  let with_connect addr (f : Iostream.In.t -> Iostream.Out.t -> 'a) : 'a =
    with_connect' addr (fun sock ->
        let ic = IO_unix.In.of_unix_fd sock in
        let oc = IO_unix.Out.of_unix_fd sock in
        f ic oc)
end