Source file capnp_rpc_unix.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
open Astring
open Lwt.Infix
module Log = Capnp_rpc.Debug.Log
module Unix_flow = Unix_flow
let () = Mirage_crypto_rng_lwt.initialize (module Mirage_crypto_rng.Fortuna)
type flow = Unix_flow.flow
module CapTP = Vat_network.CapTP
module Vat = Vat_network.Vat
module Network = Network
module Vat_config = Vat_config
module File_store = File_store
module Sturdy_ref = Capnp_rpc_lwt.Sturdy_ref
let error fmt =
fmt |> Fmt.kstr @@ fun msg ->
Error (`Msg msg)
let parse_uri s =
match Uri.of_string s with
| exception ex -> error "Failed to parse URI %S: %a" s Fmt.exn ex
| uri ->
match Network.Address.parse_uri uri with
| Ok _ -> Ok uri
| Error _ as e -> e
module Cap_file = struct
let load_uri path =
try
let ch = open_in_bin path in
let len = in_channel_length ch in
let data = really_input_string ch len in
close_in ch;
parse_uri (String.trim data)
with ex ->
if Sys.file_exists path then
error "Error loading %S: %a" path Fmt.exn ex
else
error "File %S does not exist" path
let load vat path =
match load_uri path with
| Ok uri -> Vat.import vat uri
| Error _ as e -> e
let save_uri uri path =
try
let data = Uri.to_string uri ^ "\n" in
let oc = open_out_gen [Open_wronly; Open_creat; Open_trunc; Open_binary] 0o600 path in
output_string oc data;
close_out oc;
Ok ()
with ex ->
error "Error saving to %S: %a" path Fmt.exn ex
let save_sturdy vat sr path =
save_uri (Vat.export vat sr) path
let save_service vat id path =
let uri = Vat.sturdy_uri vat id in
save_uri uri path
end
let sturdy_uri =
let of_string s =
if String.is_prefix s ~affix:"capnp://" then parse_uri s
else if Sys.file_exists s then Cap_file.load_uri s
else error "Expected a URI starting with \"capnp://\" \
or the path to a file containing such a URI, but got %S." s
in
Cmdliner.Arg.conv (of_string, Uri.pp_hum)
module Console = struct
let messages = ref []
let clear () =
match !messages with
| [] -> ()
| msg :: _ ->
let blank = Stdlib.String.make (String.length msg) ' ' in
Printf.fprintf stderr "\r%s\r%!" blank
let show () =
match !messages with
| [] -> ()
| msg :: _ ->
prerr_string msg;
flush stderr
let with_msg msg f =
clear ();
messages := msg :: !messages;
show ();
Lwt.finalize f
(fun () ->
clear ();
let rec remove_first = function
| [] -> assert false
| x :: xs when x = msg -> xs
| x :: xs -> x :: remove_first xs
in
messages := remove_first !messages;
show ();
Lwt.return_unit
)
end
let addr_of_sr sr =
match Capnp_rpc_net.Capnp_address.parse_uri (Capnp_rpc_lwt.Cast.sturdy_to_raw sr)#to_uri_with_secrets with
| Ok ((addr, _auth), _id) -> addr
| Error (`Msg m) -> failwith m
let rec connect_with_progress ?(mode=`Auto) sr =
let pp = Fmt.using addr_of_sr Capnp_rpc_net.Capnp_address.Location.pp in
match mode with
| `Auto
| `Log ->
let did_log = ref false in
Log.info (fun f -> did_log := true; f "Connecting to %a..." pp sr);
if !did_log then (
Sturdy_ref.connect sr >|= function
| Ok _ as x -> Log.info (fun f -> f "Connected to %a" pp sr); x
| Error _ as e -> e
) else (
if Unix.(isatty stderr) then
connect_with_progress ~mode:`Console sr
else
connect_with_progress ~mode:`Batch sr
)
| `Batch ->
Fmt.epr "Connecting to %a... %!" pp sr;
begin Sturdy_ref.connect sr >|= function
| Ok _ as x -> Fmt.epr "OK@."; x
| Error _ as x -> Fmt.epr "ERROR@."; x
end
| `Console ->
let x = Sturdy_ref.connect sr in
Lwt.choose [Lwt_unix.sleep 0.5; Lwt.map ignore x] >>= fun () ->
if Lwt.is_sleeping x then (
Console.with_msg (Fmt.str "[ connecting to %a ]" pp sr)
(fun () -> x)
) else x
| `Silent -> Sturdy_ref.connect sr
let with_cap_exn ?progress sr f =
connect_with_progress ?mode:progress sr >>= function
| Error ex -> Fmt.failwith "%a" Capnp_rpc.Exception.pp ex
| Ok x -> Capnp_rpc_lwt.Capability.with_ref x f
let handle_connection ?tags ~secret_key vat client =
Lwt.catch (fun () ->
let switch = Lwt_switch.create () in
let raw_flow = Unix_flow.connect ~switch client in
Network.accept_connection ~switch ~secret_key raw_flow >>= function
| Error (`Msg msg) ->
Log.warn (fun f -> f ?tags "Rejecting new connection: %s" msg);
Lwt.return_unit
| Ok ep ->
Vat.add_connection vat ~switch ~mode:`Accept ep >|= fun (_ : CapTP.t) ->
()
)
(fun ex ->
Log.err (fun f -> f "Uncaught exception handling connection: %a" Fmt.exn ex);
Lwt.return_unit
)
let addr_of_host host =
match Unix.gethostbyname host with
| exception Not_found ->
Capnp_rpc.Debug.failf "Unknown host %S" host
| addr ->
if Array.length addr.Unix.h_addr_list = 0 then
Capnp_rpc.Debug.failf "No addresses found for host name %S" host
else
addr.Unix.h_addr_list.(0)
let serve ?switch ?tags ?restore config =
let {Vat_config.backlog; secret_key = _; serve_tls; listen_address; public_address} = config in
let vat =
let auth = Vat_config.auth config in
let secret_key = lazy (fst (Lazy.force config.secret_key)) in
Vat.create ?switch ?tags ?restore ~address:(public_address, auth) ~secret_key ()
in
let socket =
match listen_address with
| `Unix path ->
begin match Unix.lstat path with
| { Unix.st_kind = Unix.S_SOCK; _ } -> Unix.unlink path
| _ -> ()
| exception Unix.Unix_error(Unix.ENOENT, _, _) -> ()
end;
let socket = Unix.(socket PF_UNIX SOCK_STREAM 0) in
Unix.bind socket (Unix.ADDR_UNIX path);
socket
| `TCP (host, port) ->
let socket = Unix.(socket PF_INET SOCK_STREAM 0) in
Unix.setsockopt socket Unix.SO_REUSEADDR true;
Unix.setsockopt socket Unix.SO_KEEPALIVE true;
Keepalive.try_set_idle socket 60;
Unix.bind socket (Unix.ADDR_INET (addr_of_host host, port));
socket
in
Unix.listen socket backlog;
Log.info (fun f -> f ?tags "Waiting for %s connections on %a"
(if serve_tls then "(encrypted)" else "UNENCRYPTED")
Vat_config.Listen_address.pp listen_address);
let lwt_socket = Lwt_unix.of_unix_file_descr socket in
let rec loop () =
Lwt_switch.check switch;
Lwt_unix.accept lwt_socket >>= fun (client, _addr) ->
Log.info (fun f -> f ?tags "Accepting new connection");
let secret_key = if serve_tls then Some (Vat_config.secret_key config) else None in
Lwt.async (fun () -> handle_connection ?tags ~secret_key vat client);
loop ()
in
Lwt.async (fun () ->
Lwt.catch
(fun () ->
let th = loop () in
Lwt_switch.add_hook switch (fun () -> Lwt.cancel th; Lwt.return_unit);
th
)
(function
| Lwt.Canceled -> Lwt.return_unit
| ex -> Lwt.fail ex
)
>>= fun () ->
Lwt_unix.close lwt_socket
);
Lwt.return vat
let client_only_vat ?switch ?tags ?restore () =
let secret_key = lazy (Capnp_rpc_net.Auth.Secret_key.generate ()) in
Vat.create ?switch ?tags ?restore ~secret_key ()
let manpage_capnp_options = Vat_config.docs