1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
open Core
open Poly
open Async
(** Identifier for a worker. The representation is [Uuid.Stable.V1.t], and the module
    exposes comparison, hashing, and string conversion derived from it. *)
module Worker_id = struct
  module T = Uuid.Stable.V1

  type t = T.t [@@deriving sexp, bin_io]

  (* Fresh identifiers come straight from [Uuid_unix.create]. *)
  let create = Uuid_unix.create

  include Comparable.Make_binable (T)
  include Hashable.Make_binable (T)
  include Sexpable.To_stringable (T)

  (* Pretty-prints the id on [fmt] as the string form of its s-expression. *)
  let pp fmt id = id |> sexp_of_t |> Sexp.to_string |> String.pp fmt
end
(* Integer-backed unique identifiers, obtained by applying the generative functor
   [Unique_id.Int]. NOTE(review): presumably one id is allocated per worker type —
   confirm against callers. *)
module Worker_type_id = Unique_id.Int ()
(* Types that pair an [Rpc.Connection.t] with the worker and connection state
   associated with it. *)
module Internal_connection_state = struct
(* Record bundling a worker state, a connection state, and the [Worker_id.t] they
   belong to. Both state types are left polymorphic. *)
type ('worker_state, 'conn_state) t1 =
{ worker_state : 'worker_state
; conn_state : 'conn_state
; worker_id : Worker_id.t
}
(* An RPC connection together with a [Set_once.t] cell that holds the [t1] record.
   NOTE(review): presumably the cell is filled once the connection state has been
   initialized — confirm against the code that sets it. *)
type ('worker_state, 'conn_state) t =
Rpc.Connection.t * ('worker_state, 'conn_state) t1 Set_once.t
end
(** [try_within ~monitor f] starts [f] under [monitor] via [Scheduler.within], wrapped in
    [Monitor.try_with ~run:`Now ~rest:`Raise]. The returned deferred is determined with
    [Ok v] when [f] produces [v], or with [Error e] where [e] is the exception converted
    by [Error.of_exn]. *)
let try_within ~monitor f =
  let result = Ivar.create () in
  (* Records the outcome of [f] in [result], turning an exception into an [Error.t]. *)
  let run_and_record () =
    Deferred.upon
      (Monitor.try_with ~run:`Now ~rest:`Raise f)
      (fun outcome -> Ivar.fill_exn result (Result.map_error outcome ~f:Error.of_exn))
  in
  Scheduler.within ~monitor run_and_record;
  Ivar.read result
;;
(** Same as [try_within], except that an [Error e] outcome is re-raised with
    [Error.raise e] instead of being returned. *)
let try_within_exn ~monitor f = try_within ~monitor f >>| Or_error.ok_exn
;;
(** [our_binary ()] returns the path ["/proc/<pid>/exe"], where [<pid>] is the pid of the
    current process. The string is built on the first call and reused afterwards. *)
let our_binary =
  let path = lazy (sprintf "/proc/%d/exe" (Pid.to_int (Unix.getpid ()))) in
  fun () -> Lazy.force path
;;
(** [our_md5 ()] runs [md5sum] on [our_binary ()] and yields the part of its output that
    precedes the first space. The process is launched on the first call only; later
    calls return the same deferred.

    Raises (inside the deferred computation) if the output contains no space, because
    [String.lsplit2_exn] is used. *)
let our_md5 =
  let digest =
    lazy
      (let md5sum_output = Process.run ~prog:"md5sum" ~args:[ our_binary () ] () in
       md5sum_output >>|? fun line -> fst (String.lsplit2_exn line ~on:' '))
  in
  fun () -> Lazy.force digest
;;
(* Name of the environment variable that marks a process as a worker: [whoami] returns
   [`Worker] whenever it is set (whatever its value), and [create_worker_env] appends it
   with an empty value to the environment it builds. *)
let is_child_env_var = "ASYNC_PARALLEL_IS_CHILD_MACHINE"
(** Reports whether this process is a worker or the master: [`Worker] when
    [is_child_env_var] is present in the environment (its value is ignored), [`Master]
    otherwise. *)
let whoami () =
  if Option.is_some (Sys.getenv is_child_env_var) then `Worker else `Master
;;
(* Unsets [is_child_env_var] in the current process's environment.
   NOTE(review): presumably done so that processes spawned later do not inherit the
   worker marker — confirm against callers. *)
let clear_env () = Unix.unsetenv is_child_env_var
(** Checks that the caller-supplied environment [env] (a list of [key, value] pairs)
    does not define [is_child_env_var]. Returns [Ok ()] when it does not; otherwise
    returns an error that carries the first offending binding. *)
let validate_env env =
  let conflicts_with_machinery (key, _) = String.equal key is_child_env_var in
  match List.find env ~f:conflicts_with_machinery with
  | None -> Ok ()
  | Some binding ->
    Or_error.error
      "Environment variable conflicts with Rpc_parallel machinery"
      binding
      [%sexp_of: string * string]
;;
(** Extra environment binding [FORCE_DROP_INLINE_TEST=""], present only when
    [Core.am_running_test] is true; the empty list otherwise. Evaluated once, when this
    module is initialized. *)
let force_drop_inline_test =
  match Core.am_running_test with
  | true -> [ "FORCE_DROP_INLINE_TEST", "" ]
  | false -> []
;;
(** [create_worker_env ~extra] builds the environment (a list of [key, value] pairs) to
    hand to a worker process.

    [extra] is first checked with [validate_env]; if it defines [is_child_env_var] the
    result is that error. Otherwise the result is [Ok] of, in order: [extra],
    [force_drop_inline_test], [For_testing_internal.worker_environment ()], and finally
    the [is_child_env_var] marker bound to the empty string. *)
let create_worker_env ~extra =
  let open Or_error.Let_syntax in
  let%map () = validate_env extra in
  extra
  @ force_drop_inline_test
  @ For_testing_internal.worker_environment ()
  (* The marker goes last; [whoami] only checks that it is present. *)
  @ [ is_child_env_var, "" ]
;;
(** Rebuilds a redirection tag ([`Dev_null], [`File_append path] or
    [`File_truncate path]) with the same constructor and payload. Reconstructing the
    value, rather than returning the argument, lets the result be used at a different
    polymorphic-variant type than the input. *)
let to_daemon_fd_redirection redirection =
  match redirection with
  | `File_truncate path -> `File_truncate path
  | `File_append path -> `File_append path
  | `Dev_null -> `Dev_null