1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
open Stdune
open Dune_util
open Result.O
open Cache_intf
type t =
{ socket : out_channel
; fd : Unix.file_descr
; input : in_channel
; cache : Local.t
; thread : Thread.t
; finally : (unit -> unit) option
; version : Messages.version
}
let versions_supported_by_dune : Messages.version list =
[ { major = 1; minor = 1 } ]
let err msg = User_error.E (User_error.make [ Pp.text msg ])
let errf msg = User_error.E (User_error.make msg)
let read version input =
let* sexp = Csexp.input input in
let+ (Dedup v) = Messages.incoming_message_of_sexp version sexp in
Dedup v
let make ?finally ?duplication_mode ~command_handler () =
let () = Sys.set_signal Sys.sigpipe Sys.Signal_ignore in
let* cache =
Result.map_error ~f:err
(Local.make ?duplication_mode ~command_handler:ignore ())
in
let* port =
let cmd =
Format.sprintf "%s cache start --display progress --exit-no-client"
Sys.executable_name
and f stdout =
match Io.input_lines stdout with
| [] -> Result.Error (err "empty output starting cache")
| [ line ] -> Result.Ok line
| _ -> Result.Error (err "unrecognized output starting cache")
and finally stdout = ignore (Unix.close_process_in stdout) in
Exn.protectx (Unix.open_process_in cmd) ~finally ~f
in
let* addr, port =
match String.split_on_char ~sep:':' port with
| [ addr; port ] -> (
match Int.of_string port with
| Some i -> (
try Result.Ok (Unix.inet_addr_of_string addr, i)
with Failure _ ->
Result.Error (errf [ Pp.textf "invalid address: %s" addr ]) )
| None -> Result.Error (errf [ Pp.textf "invalid port: %s" port ]) )
| _ -> Result.Error (errf [ Pp.textf "invalid endpoint: %s" port ])
in
let fd = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
let* _ =
Result.try_with (fun () -> Unix.connect fd (Unix.ADDR_INET (addr, port)))
in
let socket = Unix.out_channel_of_descr fd in
let input = Unix.in_channel_of_descr fd in
let+ version =
Result.map_error ~f:err
(Messages.negotiate_version ~versions_supported_by_dune fd input socket)
in
Log.info
[ Pp.textf "negotiated version: %s" (Messages.string_of_version version) ];
let rec thread input =
match
let+ command = read version input in
Log.info
[ (let open Pp.O in
Pp.text "dune-cache command: " ++ Dyn.pp (command_to_dyn command))
];
command_handler command
with
| Result.Error e ->
Log.info [ Pp.textf "dune-cache read error: %s" e ];
Option.iter ~f:(fun f -> f ()) finally
| Result.Ok () -> (thread [@tailcall]) input
in
let thread = Thread.create thread input in
{ socket; fd; input; cache; thread; finally; version }
let with_repositories client repositories =
Messages.send client.version client.socket (SetRepos repositories);
client
let promote (client : t) files key metadata ~repository ~duplication =
let duplication =
Some
(Option.value ~default:(Local.duplication_mode client.cache) duplication)
in
try
Messages.send client.version client.socket
(Promote { key; files; metadata; repository; duplication });
Result.Ok ()
with Sys_error _ ->
Result.Error "lost connection to cache daemon"
let set_build_dir client path =
Messages.send client.version client.socket (SetBuildRoot path);
client
let search client key = Local.search client.cache key
let retrieve client file = Local.retrieve client.cache file
let deduplicate client file = Local.deduplicate client.cache file
let teardown client =
( try Unix.shutdown client.fd Unix.SHUTDOWN_SEND
with Unix.Unix_error (Unix.ENOTCONN, _, _) -> () );
Thread.join client.thread;
Local.teardown client.cache