(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2020 Nomadic Labs <contact@nomadic-labs.com>                *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

open Lwt.Infix

(* Forward compatibility with lwt.5.4.0: run [f ()] without waiting for its
   result; if it raises or rejects, the exception is passed to [h]. *)
let dont_wait f h =
  let p = Lwt.apply f () in
  Lwt.on_failure p h

(* 1. clean-up callback registration/unregistration *)

(* Identifiers are used for unregistering clean-up callbacks. They come from a
   counter that is only ever incremented, so a callback's identifier is greater
   than the identifiers of all the callbacks registered before it. *)
type clean_up_callback_id = int

let clean_up_callback_id_counter = ref min_int

let new_clean_up_callback_id () =
  incr clean_up_callback_id_counter ;
  !clean_up_callback_id_counter

(* Clean-up callbacks are stored in a reference to a map. *)
module Callbacks_map = Map.Make (Int)

(* A registered clean-up callback:
   - [callback] is called with the exit status;
   - [after] lists the callbacks that must be resolved before this one starts;
   - [loc] is a label printed in log messages (presumably the registration
     location; it is supplied by the caller). *)
type callback = {
  callback : int -> unit Lwt.t;
  after : clean_up_callback_id list;
  loc : string;
}

let clean_up_callbacks : callback Callbacks_map.t ref = ref Callbacks_map.empty

(* Adding and removing clean-up callbacks affects the global reference map. *)
let register_clean_up_callback ?(after = []) ~loc callback =
  let id = new_clean_up_callback_id () in
  let callback = {callback; after; loc} in
  clean_up_callbacks := Callbacks_map.add id callback !clean_up_callbacks ;
  id

let unregister_clean_up_callback id =
  clean_up_callbacks := Callbacks_map.remove id !clean_up_callbacks

(* 2. clean-up *)

(* Cleaning up is just calling all the registered clean-up callbacks. Note that
   this function is not exported: it cannot be called directly. It can only be
   triggered as a side effect of calling [exit_and_raise] or [exit_and_wait]
   (or of a soft signal, see [set_soft_handler]).

   The registry is emptied before any callback is started, so each registered
   callback is started at most once. Callbacks are started in increasing order
   of identifiers. A callback with a non-empty [after] list starts once all the
   listed callbacks registered before it have resolved. [after] entries that
   refer to unregistered or later-registered callbacks are ignored. If one of
   the awaited callbacks is rejected, the dependent callback is not called and
   its promise is rejected as well.

   Returns a seq of the clean-up promises, each paired with the [loc] of its
   callback. *)
let clean_up status =
  (* NOTE: [to_seq] iterates in increasing order of keys *)
  let callbacks = Callbacks_map.to_seq !clean_up_callbacks in
  clean_up_callbacks := Callbacks_map.empty ;
  let promises : (string * unit Lwt.t) Callbacks_map.t =
    Seq.fold_left
      (fun promises (id, {callback; after; loc}) ->
        let pre =
          match after with
          | [] -> Lwt.return_unit
          | _ :: _ as after -> (
              Callbacks_map.to_seq promises
              |> Seq.filter_map (fun (dep_id, (_, p)) ->
                     if List.mem dep_id after then Some p else None)
              |> List.of_seq
              |> function
              | [] ->
                  (* This can happen if all after-callbacks were unregistered *)
                  Lwt.return_unit
              | [p] -> p
              | _ :: _ :: _ as ps -> Lwt.join ps)
        in
        (* [Lwt.apply] turns an exception raised synchronously by [callback]
           into a rejected promise. Without it, when [pre] is already resolved,
           [>>=] calls the callback directly and such an exception would escape
           [clean_up], rejecting [clean_up_promises] and breaking the
           invariants asserted in [wait_for_clean_up]. *)
        let promise = pre >>= fun () -> Lwt.apply callback status in
        Lwt.on_failure promise (fun exc ->
            Format.eprintf
              "(%s) Exit: uncaught exception during clean-up (%s): %s\n%!"
              Sys.executable_name
              loc
              (Printexc.to_string exc)) ;
        Callbacks_map.add id (loc, promise) promises)
      Callbacks_map.empty
      callbacks
  in
  Seq.map snd @@ Callbacks_map.to_seq promises

(* 3. synchronisation primitives *)

(* [clean_up_starts] is an exported promise that resolves when the clean-up
   starts. [start_clean_up] is a non-exported resolver for the promise.

   Note that the promise is not cancelable and we never pass an exception to
   the resolver. Consequently, the promise cannot be rejected. *)
let (clean_up_starts_internal, start_clean_up) = Lwt.wait ()

(* [clean_up_starts] is exported with a delay to ensure that the [wrap_and_*]
   functions witness the start of the cleaning up before users of the
   library. *)
let clean_up_starts =
  Lwt.no_cancel
    ( clean_up_starts_internal >>= fun v ->
      Lwt.pause () >>= fun () -> Lwt.return v )

(* [clean_up_promises] resolves with the result of [clean_up] as soon as the
   clean-up starts. *)
let clean_up_promises =
  clean_up_starts_internal >>= fun status -> Lwt.return @@ clean_up status

(* [clean_up_ends] is a promise that resolves, with the exit status, once the
   clean-up is finished, i.e., once every clean-up promise is resolved. Failures
   are not propagated here; they are reported by [clean_up] and
   [wait_for_clean_up]. *)
let clean_up_ends =
  clean_up_starts_internal >>= fun status ->
  clean_up_promises >>= fun promises ->
  (* Turn each clean-up promise into one that is fulfilled however it ends. *)
  let settled (_, promise) =
    Lwt.try_bind
      (fun () -> promise)
      (fun () -> Lwt.return_unit)
      (fun _ -> Lwt.return_unit)
  in
  Lwt.join (List.of_seq (Seq.map settled promises)) >>= fun () ->
  Lwt.return status

(* 4. exiting *)

(* Simple exit is not exported, it is just to factor out exiting: it starts the
   clean-up with status [n]. If the clean-up has already started, it does
   nothing (the first status is kept). *)
let exit n =
  match Lwt.state clean_up_starts_internal with
  | Lwt.Sleep -> Lwt.wakeup start_clean_up n
  | Lwt.Return _ -> ()
  | Lwt.Fail _ ->
      (* Remember [clean_up_starts_internal] cannot be rejected. *)
      assert false

(* [exit_and_raise] is meant to be used deep inside the program after having
   witnessed, say, a fatal error. It raises an exception so that it can be used
   anywhere in the program. *)
let exit_and_raise n =
  exit n ;
  raise Exit

(* [exit_and_wait] is meant to be used near the main invocation of the program,
   right inside of [Lwt_main.run] but presumably after [wrap_and_error]. *)
let exit_and_wait n =
  exit n ;
  clean_up_ends

(* exit codes *)

(* Bit set in the exit code when the clean-up did not complete successfully. *)
let incomplete_clean_up_mask = 128

let signal_exit_code = 127

let uncaught_exception_exit_code = 126

let mask_code_bc_incomplete_clean_up code = code lor incomplete_clean_up_mask

let mask_code_if_incomplete_clean_up ~complete:all_fine code =
  if all_fine then code else mask_code_bc_incomplete_clean_up code

(* 5. signals *)

type signal_setup = {soft : (int * string) list; hard : (int * string) list}

(** Known signals and their names *)
let all_signal_names =
  let open Sys in
  [
    (sigabrt, "ABRT");
    (sigalrm, "ALRM");
    (sigfpe, "FPE");
    (sighup, "HUP");
    (sigill, "ILL");
    (sigint, "INT");
    (sigkill, "KILL");
    (sigpipe, "PIPE");
    (sigquit, "QUIT");
    (sigsegv, "SEGV");
    (sigterm, "TERM");
    (sigusr1, "USR1");
    (sigusr2, "USR2");
    (sigchld, "CHLD");
    (sigcont, "CONT");
    (sigstop, "STOP");
    (sigtstp, "TSTP");
    (sigttin, "TTIN");
    (sigttou, "TTOU");
    (sigvtalrm, "VTALRM");
    (sigprof, "PROF");
    (sigbus, "BUS");
    (sigpoll, "POLL");
    (sigsys, "SYS");
    (sigtrap, "TRAP");
    (sigurg, "URG");
    (sigxcpu, "XCPU");
    (sigxfsz, "XFSZ");
  ]

(** recovering the name of a signal; unknown signals are named by their
    number *)
let signal_name signal =
  match List.assoc_opt signal all_signal_names with
  | Some name -> name
  | None -> Format.asprintf "%d" signal

(* Pair each signal with its name. [signal_name] never fails, so no exception
   is raised here. *)
let make_signal_setup ~soft ~hard =
  let with_names = List.map (fun signal -> (signal, signal_name signal)) in
  {soft = with_names soft; hard = with_names hard}

let default_signal_setup =
  make_signal_setup ~soft:[Sys.sigint; Sys.sigterm] ~hard:[]

let sleep_span s = Lwt_unix.sleep (Ptime.Span.to_float_s s)

(* Arm force-quitting. After [double_signal_safety] (or immediately if it is
   zero), print a hint and set [already_received_once], so that the next
   occurrence of the signal terminates the process (see [set_soft_handler]). *)
let set_already_received_once double_signal_safety already_received_once name =
  let arm () =
    Format.eprintf
      "(%s) %s: send signal again to force-quit.\n%!"
      Sys.executable_name
      name ;
    already_received_once := true
  in
  if Ptime.Span.(equal double_signal_safety zero) then arm ()
  else
    dont_wait
      (fun () ->
        (* Wait [double_signal_safety] for safety, then set force-quitting *)
        sleep_span double_signal_safety >|= arm)
      (fun _exc -> assert false)

let default_double_signal_safety = Option.get @@ Ptime.Span.of_float_s 1.0

(* Soft handling: trigger an exit on the first signal. Once
   [double_signal_safety] has elapsed, a further signal immediately terminates
   the process. *)
let set_soft_handler ?(double_signal_safety = default_double_signal_safety)
    signal name =
  let already_received_once = ref false in
  Lwt_unix.on_signal signal (fun _signal ->
      if !already_received_once then (
        Format.eprintf
          "(%s) %s: signal received again, forcing immediate termination.\n%!"
          Sys.executable_name
          name ;
        Stdlib.exit (mask_code_bc_incomplete_clean_up signal_exit_code))
      else
        match Lwt.state clean_up_starts_internal with
        | Lwt.Sleep ->
            Format.eprintf
              "(%s) %s: triggering shutdown.\n%!"
              Sys.executable_name
              name ;
            exit signal_exit_code ;
            set_already_received_once
              double_signal_safety
              already_received_once
              name
        | Lwt.Return _ ->
            Format.eprintf
              "(%s) %s: already in shutdown.\n%!"
              Sys.executable_name
              name ;
            set_already_received_once
              double_signal_safety
              already_received_once
              name
        | Lwt.Fail _ ->
            (* Remember [clean_up_starts_internal] cannot be rejected. *)
            assert false)

(* Hard handling: immediately terminate the process. *)
let set_hard_handler signal name =
  Lwt_unix.on_signal signal (fun _signal ->
      Format.eprintf "(%s) %s: force-quiting.\n%!" Sys.executable_name name ;
      Stdlib.exit (mask_code_bc_incomplete_clean_up signal_exit_code))

(* Install the soft handlers, then the hard handlers, and return the ids of all
   the installed handlers. *)
let setup_signal_handlers ?double_signal_safety signal_setup =
  let soft_handler_ids =
    List.fold_left
      (fun acc (signal, name) ->
        set_soft_handler ?double_signal_safety signal name :: acc)
      []
      signal_setup.soft
  in
  let all_handler_ids =
    List.fold_left
      (fun acc (signal, name) -> set_hard_handler signal name :: acc)
      soft_handler_ids
      signal_setup.hard
  in
  all_handler_ids

let unset_handlers = List.iter Lwt_unix.disable_signal_handler

(* 6. internal synchronisation *)

(* [wait_for_clean_up max_clean_up_time] must only be called once the clean-up
   has started. It waits for all the clean-up promises to resolve, or for
   [max_clean_up_time] to elapse if provided. It then logs every clean-up
   callback that is still pending or was rejected, and resolves with [true] iff
   all of them were fulfilled. *)
let wait_for_clean_up max_clean_up_time =
  (match Lwt.state clean_up_starts_internal with
  | Lwt.Return _ -> ()
  | Lwt.Fail _ ->
      (* Remember [clean_up_starts_internal] cannot be rejected. *)
      assert false
  | Lwt.Sleep ->
      (* We only call this function after the clean-up has started, and we do
         not export the function *)
      assert false) ;
  Lwt.pause () >>= fun () ->
  (match Lwt.state clean_up_promises with
  | Lwt.Return _ -> ()
  | Lwt.Fail _ ->
      (* [clean_up_promises] cannot be rejected: [clean_up] does not raise *)
      assert false
  | Lwt.Sleep ->
      (* One tick after the clean-up has started, all the promises have been
         collected. *)
      assert false) ;
  (match Lwt.state clean_up_ends with
  | Lwt.Fail _ -> assert false
  | Lwt.Return _ ->
      (* This happens when the clean-up is immediate, e.g., when there are no
         callbacks registered. *)
      Lwt.return_unit
  | Lwt.Sleep -> (
      match max_clean_up_time with
      | None ->
          (* without timeout: just wait *)
          clean_up_ends >>= fun _ -> Lwt.return_unit
      | Some s ->
          (* with timeout: pick first to finish *)
          Lwt.choose
            [(clean_up_ends >>= fun _ -> Lwt.return_unit); sleep_span s]))
  (* pause in case timeout and clean-up needs to deal with cancellation *)
  >>= Lwt.pause
  >|= fun () ->
  match Lwt.state clean_up_promises with
  | Lwt.Sleep | Lwt.Fail _ ->
      (* we have already asserted this earlier in the function *)
      assert false
  | Lwt.Return promises ->
      (* check (and log) whether all clean-up is done successfully *)
      Seq.fold_left
        (fun all_fine (loc, promise) ->
          match Lwt.state promise with
          | Lwt.Sleep ->
              (* If a promise has not been given enough time to resolve, then
                 it means it was interrupted by a timeout:
                 [max_clean_up_time] *)
              assert (max_clean_up_time <> None) ;
              Format.eprintf
                "(%s) Exit: timeout, clean-up callback not terminated (%s)\n%!"
                Sys.executable_name
                loc ;
              false
          | Lwt.Fail exc ->
              Format.eprintf
                "(%s) Exit: clean-up callback failed (%s): %s\n%!"
                Sys.executable_name
                loc
                (Printexc.to_string exc) ;
              false
          | Lwt.Return () -> all_fine)
        true
        promises

(* 7. main interface: wrapping promises *)

(* Once the clean-up has started: wait for it (bounded by
   [max_clean_up_time]), uninstall the signal handlers [handler_ids], and
   resolve with [Error status], where [status] has [incomplete_clean_up_mask]
   set if the clean-up did not complete successfully. *)
let finish_clean_up ~handler_ids ~max_clean_up_time status =
  wait_for_clean_up max_clean_up_time >>= fun complete ->
  unset_handlers handler_ids ;
  Lwt.return (Error (mask_code_if_incomplete_clean_up ~complete status))

(* Start the clean-up because of the uncaught exception [exc] (a no-op if it
   has already started), log it, then finish as [finish_clean_up] with
   [uncaught_exception_exit_code]. *)
let exit_on_uncaught_exception ~handler_ids ~max_clean_up_time exc =
  exit uncaught_exception_exit_code ;
  Format.eprintf
    "(%s) Exit: exit because of uncaught exception: %s\n%!"
    Sys.executable_name
    (Printexc.to_string exc) ;
  finish_clean_up ~handler_ids ~max_clean_up_time uncaught_exception_exit_code

(* Take a promise [p] and wrap its value in [Ok], but also watch for exiting
   (clean-up started, exception raised by [p]) and wrap the final exit status
   in [Error]. Signal handlers from [signal_setup] are installed while [p] is
   pending.
   @raise Invalid_argument if the clean-up has already started. *)
let wrap_and_error ?(signal_setup = default_signal_setup) ?double_signal_safety
    ?max_clean_up_time p =
  (match Lwt.state clean_up_starts_internal with
  | Lwt.Fail _ ->
      (* Remember [clean_up_starts_internal] cannot be rejected. *)
      assert false
  | Lwt.Return _ -> raise (Invalid_argument "Lwt_exit.wrap")
  | Lwt.Sleep -> ()) ;
  match Lwt.state p with
  | Lwt.Fail _ | Lwt.Return _ -> p >>= Lwt.return_ok
  | Lwt.Sleep ->
      let handler_ids =
        setup_signal_handlers ?double_signal_safety signal_setup
      in
      Lwt.try_bind
        (fun () ->
          (* Watch out for both [p] and the start of clean-up *)
          Lwt.choose
            [
              (p >>= Lwt.return_ok);
              (clean_up_starts_internal >>= Lwt.return_error);
            ])
        (function
          | Ok v ->
              (* In this branch, the [Ok] indicates that [p] was resolved
                 before [clean_up_starts_internal]. As a result,
                 [clean_up_starts_internal] must still be pending.
                 It is only possible for two promises to resolve simultaneously
                 if they are physically equal, if one is a proxy for the other,
                 or some other similar situation. Because
                 [clean_up_starts_internal] is not exported, this is not
                 possible. *)
              assert (Lwt.state clean_up_starts_internal = Lwt.Sleep) ;
              unset_handlers handler_ids ;
              Lwt.return (Ok v)
          | Error status ->
              (* Conversely to the previous comment: when
                 [clean_up_starts_internal] resolves first, then [p] cannot
                 have resolved yet. *)
              assert (Lwt.state clean_up_starts_internal = Lwt.Return status) ;
              Lwt.cancel p ;
              finish_clean_up ~handler_ids ~max_clean_up_time status)
        (function
          | Exit -> (
              (* When [Exit] bubbles from the wrapped promise, maybe it called
                 [exit_and_raise] *)
              Lwt.pause () >>= fun () ->
              match Lwt.state clean_up_starts_internal with
              | Lwt.Return status ->
                  finish_clean_up ~handler_ids ~max_clean_up_time status
              | Lwt.Fail _ ->
                  (* Remember [clean_up_starts_internal] cannot be rejected. *)
                  assert false
              | Lwt.Sleep ->
                  exit_on_uncaught_exception
                    ~handler_ids
                    ~max_clean_up_time
                    Exit)
          | exc ->
              exit_on_uncaught_exception ~handler_ids ~max_clean_up_time exc)

(* same but exit on error *)
let wrap_and_exit ?signal_setup ?double_signal_safety ?max_clean_up_time p =
  wrap_and_error ?max_clean_up_time ?double_signal_safety ?signal_setup p
  >>= function
  | Ok v -> Lwt.return v
  | Error status -> Stdlib.exit status

(* same but just return exit status *)
let wrap_and_forward ?signal_setup ?double_signal_safety ?max_clean_up_time p =
  wrap_and_error ?max_clean_up_time ?double_signal_safety ?signal_setup p
  >>= function
  | Ok v -> Lwt.return v
  | Error status -> Lwt.return status