123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106openCoreopenAsyncmoduleWorker_id=structletcreate=Uuid_unix.create(* If we do not use the stable sexp serialization, when running
inline tests, we will create UUIDs that fail tests *)moduleT=Uuid.Stable.V1typet=T.t[@@derivingsexp,bin_io]includeComparable.Make_binable(T)includeHashable.Make_binable(T)includeSexpable.To_stringable(T)letppfmtt=String.ppfmt(Sexp.to_string([%sexp_of:t]t))endmoduleWorker_type_id=Unique_id.Int()moduleInternal_connection_state=structtype('worker_state,'conn_state)t1={worker_state:'worker_state;conn_state:'conn_state;worker_id:Worker_id.t}type('worker_state,'conn_state)t=Rpc.Connection.t*('worker_state,'conn_state)t1Set_once.tendlettry_within~monitorf=letivar=Ivar.create()inScheduler.within~monitor(fun()->Monitor.try_with~run:`Now~rest:`Raisef>>>funr->Ivar.fillivar(Result.map_errorr~f:Error.of_exn));Ivar.readivar;;lettry_within_exn~monitorf=match%maptry_within~monitorfwith|Okx->x|Errore->Error.raisee;;(* Use /proc/PID/exe to get the currently running executable.
- argv[0] might have been deleted (this is quite common with jenga)
- `cp /proc/PID/exe dst` works as expected while `cp /proc/self/exe dst` does not *)letour_binary=letour_binary_lazy=lazy(Unix.getpid()|>Pid.to_int|>sprintf"/proc/%d/exe")infun()->Lazy.forceour_binary_lazy;;letour_md5=letour_md5_lazy=lazy(Process.run~prog:"md5sum"~args:[our_binary()]()>>|?funour_md5->letour_md5,_=String.lsplit2_exn~on:' 'our_md5inour_md5)infun()->Lazy.forceour_md5_lazy;;letis_child_env_var="ASYNC_PARALLEL_IS_CHILD_MACHINE"letwhoami()=matchSys.getenvis_child_env_varwith|Some_->`Worker|None->`Master;;letclear_env()=Unix.unsetenvis_child_env_varletvalidate_envenv=matchList.findenv~f:(fun(key,_)->key=is_child_env_var)with|Somee->Or_error.error"Environment variable conflicts with Rpc_parallel machinery"e[%sexp_of:string*string]|None->Ok();;(* Don't run tests in the worker if we are running an expect test. A call to
[Rpc_parallel.For_testing.initialize] will initialize the worker and start the
Async scheduler. *)letforce_drop_inline_test=ifam_running_inline_testthen["FORCE_DROP_INLINE_TEST",""]else[];;letcreate_worker_env~extra=letopenOr_error.Let_syntaxinlet%map()=validate_envextrainextra@force_drop_inline_test@For_testing_internal.worker_environment()@[is_child_env_var,""];;letto_daemon_fd_redirection=function|`Dev_null->`Dev_null|`File_appends->`File_appends|`File_truncates->`File_truncates;;