open! Core
open Async

(* A Kerberos context wrapped in a sequencer, so jobs that use the context are run
   through [Throttle.Sequencer]. *)
type t = Context.t Throttle.Sequencer.t

(* The global sequencer. It is created on first force. Forcing fails if Async is already
   shutting down, or if [Context.init] returns an error code (reported through [ok_exn]
   with the message built below). *)
let the_t : t Lazy_deferred.t =
  Lazy_deferred.create (fun () ->
    (* Before calling down into libkrb5, we release the OCaml runtime. We do our calls in
       another thread, so it is possible for caml_sys_exit to be initiated while we are
       still doing Kerberos things. Because libkrb5 is dynamically linked, this can cause
       undefined behavior, most often segfaults or assertion failures.

       We make sure all outstanding calls (including the [Context.init] call) are
       completed before letting Async shutdown complete.

       Because [shutdown] only calls the handlers that have been registered at the time
       that [shutdown] is called, we have to be careful not to call [Context.init] if we
       are already shutting down. *)
    if Shutdown.is_shutting_down ()
    then
      failwith
        "Not initializing global Kerberos context because async is already shutting down";
    Krb_debug.log_s (fun () -> [%message "Initializing global Kerberos context"]);
    (* Filled as soon as [Context.init] has returned, so that shutdown waits for the
       in-flight initialization call. *)
    let context_initialized = Ivar.create () in
    Shutdown.don't_finish_before (Ivar.read context_initialized);
    let%map t =
      In_thread.run Context.init
      >>| (fun result ->
            Ivar.fill context_initialized ();
            result)
      >>| Result.map_error ~f:(fun code ->
            let krb_error = Krb_error.to_string ~info:"krb5_init_context" code in
            match Krb_info.sandbox_tag with
            | Some tag ->
              Error.create_s
                [%message
                  "Failed to initialize global Krb context"
                    ~_:(krb_error : string)
                    (code : int32)
                    (tag : Sexp.t)]
            | None ->
              Error.create_s
                [%message
                  "Failed to initialize global Krb context"
                    ~_:(krb_error : string)
                    (code : int32)])
      >>| ok_exn
      >>| Throttle.Sequencer.create
    in
    (* Let jobs that are already enqueued finish before shutdown completes. *)
    Shutdown.at_shutdown (fun () -> Throttle.prior_jobs_done t);
    t)
;;

(* This is the monitor that we run in when calling [Gc.add_finalizer]. [Gc.add_finalizer]
stores a reference to the current monitor. Because finalizers are GC roots, this
prevents the monitor from being GC'd until the finalizer is called. It isn't too
difficult to get yourself into a situation where the monitor holds a reference to the
thing you are adding a finalizer for. When this happens, the finalizer will never run
and the monitor will never be GC'd.
To make the above a bit more concrete, take a look at
lib/krb/jane/test/bin/finalizer_memory_leak.ml *)
let finalizer_monitor =
  let make_detached_monitor () =
    let detached = Monitor.create ~name:"Krb.Context_sequencer" () in
    (* Detach so the new monitor keeps no reference to its parent (whichever monitor is
       current when the lazy is forced). Finalizers are not expected to raise and there
       is nothing useful to do with such an exception, so errors are dropped. *)
    Monitor.detach_and_iter_errors detached ~f:(ignore : exn -> unit);
    detached
  in
  lazy (make_detached_monitor ())
;;

(* This will raise if [f] raises or if forcing [the_t] raises. The latter can happen if
you are in the kerberos sandbox. *)
let enqueue_job_internal_exn ~f =
  Lazy_deferred.force_exn the_t >>= fun t -> Throttle.enqueue t (fun c -> f c)
;;

(* Like [enqueue_job_internal_exn], but failures are returned instead of raised:
   [`Raised] when forcing [the_t] fails or [f] raises, and [`Krb_error code] when [f]
   returns [Error code]. *)
let enqueue_job_internal_krb_result ~f =
  match%bind Lazy_deferred.force the_t with
  | Error error -> Deferred.Result.fail (`Raised error)
  | Ok t ->
    (match%bind Throttle.enqueue' t (fun c -> f c) with
     | `Aborted -> assert false (* We don't call [Throttle.abort] *)
     | `Raised exn -> Deferred.Result.fail (`Raised (Error.of_exn exn))
     | `Ok (Ok res) -> return (Ok res)
     | `Ok (Error code) -> Deferred.Result.fail (`Krb_error code))
;;

(* Enqueues a job that runs [f] on the context with [In_thread.run]. *)
let enqueue_job_exn ~f =
  enqueue_job_internal_exn ~f:(fun c -> In_thread.run (fun () -> f c))
;;

(* Builds the [Error.t] for a Kerberos error [code]: the text from
   [Krb_error.to_string] (computed through [enqueue], since it needs the context),
   followed by the tags from [Krb_info.tags] when there are any. *)
let gen_error_msg ~enqueue ~(info : _ Krb_info.t) code =
  let%bind krb_error =
    enqueue ~f:(fun context -> Krb_error.to_string ~context ~info:info.function_ code)
  in
  match%map Krb_info.tags info code with
  | None -> Error.create_s [%message "" ~_:(krb_error : string)]
  | Some tags -> Error.create_s [%message "" ~_:(krb_error : string) ~_:(tags : Sexp.t)]
;;

(* Logs, through [Krb_debug.log_s], that a Kerberos function is about to be called,
   including the argument tags when [info.tag_arguments] is set. *)
let debug_before_job ~(info : _ Krb_info.t) ~is_blocking () =
  Krb_debug.log_s (fun () ->
    match info.tag_arguments with
    | None ->
      [%message
        "Calling Kerberos function" ~info:(info.function_ : string) (is_blocking : bool)]
    | Some tags ->
      let tags = Lazy.force tags in
      [%message
        "Calling Kerberos function"
          ~info:(info.function_ : string)
          (is_blocking : bool)
          (tags : Sexp.t)])
;;

(* Logs, through [Krb_debug.log_s], that a Kerberos function has returned. The tags are
   [info.tag_result] applied to a successful result (when set), or the sexp of the error.
   Both branches log the same single-line message text. *)
let debug_after_job ~(info : 'a Krb_info.t) result =
  Krb_debug.log_s (fun () ->
    let tags =
      match result, info.tag_result with
      | Ok result, Some get_tags -> Some (get_tags result)
      | Ok _, None -> None
      | Error error, _ -> Some ([%sexp_of: Error.t] error)
    in
    match tags with
    | None -> [%message "Called Kerberos function" ~info:(info.function_ : string)]
    | Some tags ->
      [%message "Called Kerberos function" ~info:(info.function_ : string) (tags : Sexp.t)])
;;

(* Shared implementation of the [enqueue_job_with_info*] functions: logs before the job,
   runs [f] through the sequencer, and logs afterwards. A Kerberos error code is turned
   into an [Error.t] with [error_msg] and returned together with the code. A [`Raised]
   error is passed through without an "after" log line. *)
let enqueue_job_with_info_aux ~info ~error_msg ~is_blocking ~f =
  debug_before_job ~info ~is_blocking ();
  match%bind enqueue_job_internal_krb_result ~f with
  | Ok result ->
    debug_after_job ~info (Ok result);
    return (Ok result)
  | Error (`Raised _ as raised) -> Deferred.Result.fail raised
  | Error (`Krb_error code) ->
    let%bind error = error_msg ~info code in
    debug_after_job ~info (Error error);
    Deferred.Result.fail (`Krb_error (error, code))
;;

let error_msg_non_blocking ~info code =
  gen_error_msg ~enqueue:enqueue_job_exn ~info code
;;

(* Runs [f] with [In_thread.run]; the error keeps the [`Raised] / [`Krb_error]
   distinction. *)
let enqueue_job_with_info' ~info ~f =
  enqueue_job_with_info_aux
    ~info
    ~error_msg:error_msg_non_blocking
    ~is_blocking:false
    ~f:(fun c -> In_thread.run (fun () -> f c))
;;

(* Same as [enqueue_job_with_info'], with both error cases collapsed to an [Error.t]. *)
let enqueue_job_with_info ~info ~f =
  enqueue_job_with_info' ~info ~f
  |> Deferred.Result.map_error ~f:(function
    | `Raised error -> error
    | `Krb_error (error, _code) -> error)
;;

(* Registers a GC finalizer for [arg] from within [finalizer_monitor]. When the
   finalizer fires, [f] is enqueued as a job and called with the context and [arg]. *)
let add_finalizer arg ~f:finalize =
  Scheduler.within ~monitor:(force finalizer_monitor) (fun () ->
    Gc.add_finalizer_exn arg (fun arg ->
      don't_wait_for (enqueue_job_exn ~f:(fun c -> finalize c arg))))
;;

module Expert = struct
  (* Variants that call [f] directly inside the sequencer job ([return (f c)]) instead
     of going through [In_thread.run]. *)
  let enqueue_job_blocking_exn ~f = enqueue_job_internal_exn ~f:(fun c -> return (f c))

  let error_msg_blocking ~info code =
    gen_error_msg ~enqueue:enqueue_job_blocking_exn ~info code
  ;;

  let enqueue_job_with_info_blocking ~info ~f =
    enqueue_job_with_info_aux
      ~info
      ~error_msg:error_msg_blocking
      ~is_blocking:true
      ~f:(fun c -> return (f c))
    |> Deferred.Result.map_error ~f:(function
      | `Raised error -> error
      | `Krb_error (error, _code) -> error)
  ;;
end

(* Payloads up to this threshold will be encrypted on the main thread,
blocking Async. Anything above this threshold will be encrypted on a separate thread.
This was chosen somewhat arbitrarily based on benchmark results. An
encryption/decryption roundtrip for a 1MB payload is around 60ms. *)
let threshold_for_blocking_encryption = 1024 * 1024

(* Picks the enqueue function for a payload of [data_size]: the in-thread
   [enqueue_job_with_info] when the size exceeds the threshold, otherwise the blocking
   [Expert.enqueue_job_with_info_blocking]. *)
let enqueue_blocking_if_below_encryption_size_threshold ~data_size =
  if data_size > threshold_for_blocking_encryption
  then enqueue_job_with_info
  else Expert.enqueue_job_with_info_blocking
;;