123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235open!Coreopen!Importmodule_=Io_uring_raw_null[%%import"io_uring_config.h"][%%ifdefJSC_IO_URING]moduleUring=Ocaml_uring.UringmoduleInt63=Optint.Int63modulePoll_mask=Uring.Poll_maskmoduleClock=structtypet=|Boottime|Realtimeletto_uring_clockt=matchtwith|Boottime->Uring.Boottime|Realtime->Uring.Realtime;;endmoduleStatx=Uring.StatxmoduleOpen_flags=Uring.Open_flagsmoduleResolve=Uring.ResolvemoduleSyscall_result=structtypet=(int,Unix.Error.t)Result.t[@@derivingsexp_of]endtypet=Syscall_result.tIvar.tUring.tmoduleStatus=structtypet=|To_prepare|Prepared_or_finished|Cancel_preparedof(Syscall_result.tIvar.t[@sexp.opaque])|Cancelled_early[@@derivingsexp_of]endmoduleHandle=structtypet={result:Syscall_result.tIvar.t;mutablejob:Syscall_result.tIvar.tOcaml_uring.Uring.joboption;mutablestatus:Status.t}[@@derivingfields~getters]letinvariant(t:t)=trymatcht.statuswith|To_prepare->assert(Ivar.is_emptyt.result);assert(Option.is_nonet.job)|Prepared_or_finished|Cancel_prepared_->assert(Option.is_somet.job)|Cancelled_early->assert(Ivar.is_fullt.result)with|exn->raise_s[%message"Io_uring_raw.Status.invariant failed"(exn:exn)];;letset_jobtnew_job=t.job<-new_jobletset_statustnew_status=t.status<-new_statusendletsupports_opsprobe=List.for_all~f:(funop->Uring.op_supportedprobeop)Uring.Op.[nop;read;write;readv;writev;poll_add;openat2;close;linkat;unlinkat;timeout;statx;async_cancel];;letcreate?polling_timeout~queue_depth()=leturing=Uring.create?polling_timeout~queue_depth()inletprobe=Uring.get_probeuringinifsupports_opsprobethenOkuringelse(Uring.exituring;error_s[%sexp"The underlying kernel does not support all the io_uring operations needed"]);;letexit=Uring.exitletsupports_ext_arg=Uring.supports_ext_argletregister_eventfd=Uring.register_eventfdletsubmitt=Uring.submittletcqe_readyt=Uring.cqe_readytletreciter_completions_internalio_uring~fcount=matchUring.get_cqe_nonblockingio_uringwith|Some{result;data}->f~result~data;iter_completions_internalio_uring~f(count+1)|None->count;;letiter_completionsio_uring~f=iter_completions_internalio_uring~f0letfill_syscall_ivar~result~data=ifresult>=0thenIvar.fill_exndata(Okresult)elseIvar.fill_exndata(Error(Unix.Error.of_system_int~errno:(-result)));;letfill_completionst=iter_completionst~f:fill_syscall_ivarletmax_attempts=-1letprepare_internalf=letivar=Ivar.create()inlethandle={Handle.result=ivar;job=None;status=To_prepare}inDeferred.don't_wait_for(letrecsubmit_until_successcount=matchhandle.statuswith|To_prepare->(matchfivarwith|None->ifcount=max_attemptsthenfailwith"Tried resubmitting to the Io_uring queue too many times";let%bind()=Async_kernel_scheduler.yield()insubmit_until_success(count+1)|Somejob->Handle.set_statushandlePrepared_or_finished;Handle.set_jobhandle(Somejob);return())|Cancelled_early->Handle.set_statushandlePrepared_or_finished;return()|Cancel_prepared_|Prepared_or_finished->raise_s[%sexp(("Io_uring_raw syscall found in unexpected state while submitting",Handle.statushandle):string*Status.t)]insubmit_until_success0);Deferred.upon(Ivar.readivar)(fun_->Handle.set_jobhandleNone;(* We need this branching in order to keep the invariant of the submit_until_success
loop. Otherwise, there is a race that can happen: the submit_until_success job gets
scheduled, a cancel job moves the job to Cancelled_early and fills its ivar, but
the ivar being filled triggers this callback that moves it to Prepared_or_finished.
*)matchhandle.statuswith|Cancelled_early|Prepared_or_finished->()|Cancel_prepared_->Handle.set_statushandlePrepared_or_finished|To_prepare->raise_s[%sexp"Io_uring_raw syscall ivar filled while in state To_prepare"]);handle;;letnoopt=prepare_internal(Uring.noopt)letreadt~file_offsetfdbuf=prepare_internal(Uring.readt~file_offsetfdbuf)letwritet~file_offsetfdbuf=prepare_internal(Uring.writet~file_offsetfdbuf)letreadvt~file_offsetfdbufs=prepare_internal(Uring.readvt~file_offsetfdbufs)letwritevt~file_offsetfdbufs=prepare_internal(Uring.writevt~file_offsetfdbufs)letpoll_addtfdflags=prepare_internal(Uring.poll_addtfdflags)letopenat2t~access~flags~perm~resolve?fdfilename=prepare_internal(Uring.openat2t~access~flags~perm~resolve?fdfilename);;letclosetfd=prepare_internal(Uring.closetfd)letlinkt~follow~target~link_name=prepare_internal(Uring.linkatt~follow~target~link_name);;letunlinkt~dir?fdfilename=prepare_internal(Uring.unlinkt~dir?fdfilename)lettimeoutt?absoluteclocktimeout_ns=letclock=Clock.to_uring_clockclockinprepare_internal(Uring.timeoutt?absoluteclocktimeout_ns);;letstatxt?fd~maskpathstatxflags=prepare_internal(Uring.statxt?fd~maskpathstatxflags);;letcancelthandle=letreccancel_until_success()=ifIvar.is_full(Handle.resulthandle)thenreturn()else(matchHandle.statushandlewith|Cancelled_early->return()|Cancel_preparedcancel_ivar->(match%mapIvar.readcancel_ivarwith|Ok_->()|ErrorUnix.Error.ENOENT->(* The job we are trying to cancel has already finished by the time the cancel
was executed. *)()|Errorerr->raise(Unix.Unix_error(err,"cancel","")))|To_prepare->Handle.set_statushandleCancelled_early;Ivar.fill_if_empty(Handle.resulthandle)(Error(Unix.Error.EUNKNOWNERR125));return()|Prepared_or_finished->letcancel_ivar=Ivar.create()in(* [Uring.cancel] requires that the completion wasn't collected yet. We know it
wasn't collected because we just checked [Ivar.is_full result_ivar] earlier.
*)(matchUring.cancelt(Option.value_exnhandle.job)cancel_ivarwith|None->let%bind()=Async_kernel_scheduler.yield()incancel_until_success()|Some_cancel_job->Handle.set_statushandle(Cancel_preparedcancel_ivar);cancel_until_success()))incancel_until_success();;letsyscall_resulthandle=Ivar.read(Handle.resulthandle)[%%else]includeIo_uring_raw_null[%%endif]