(*
* Copyright (C) 2020-2021 Anil Madhavapeddy
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

(* Re-export of the internal heap module under [Private]. *)
module Private = struct
  module Heap = Heap
end

module Region = Region
module Int63 = Optint.Int63

(* Common interface of the bit-flag modules below: flags are integers that
   are combined with [+] and tested with [mem]. *)
module type FLAGS = sig
  type t = private int
  val empty : t
  val of_int : int -> t
  val ( + ) : t -> t -> t
  val mem : t -> t -> bool
end

module Flags = struct
  type t = int

  let empty = 0

  let of_int x = x

  (* Combining two flag sets is a bitwise OR. *)
  let ( + ) = ( lor )

  (* [mem a b] is true when every bit set in [a] is also set in [b]. *)
  let mem a b = (a land b) = a
end

(* Flag values taken from [Config]. *)
module Open_flags = struct
  include Flags

  let rdonly = Config.o_rdonly
  let wronly = Config.o_wronly
  let rdwr = Config.o_rdwr
  let creat = Config.o_creat
  let excl = Config.o_excl
  let noctty = Config.o_noctty
  let trunc = Config.o_trunc
  let append = Config.o_append
  let nonblock = Config.o_nonblock
  let dsync = Config.o_dsync
  let direct = Config.o_direct
  let directory = Config.o_directory
  let nofollow = Config.o_nofollow
  let noatime = Config.o_noatime
  let cloexec = Config.o_cloexec
  let sync = Config.o_sync
  let path = Config.o_path
  let tmpfile = Config.o_tmpfile
end

(* Hard-coded flag bits. NOTE(review): presumably these mirror the kernel's
   RESOLVE_* constants used by openat2 - confirm against the C headers. *)
module Resolve = struct
  include Flags

  let no_xdev = 0x01
  let no_magiclinks = 0x02
  let no_symlinks = 0x04
  let beneath = 0x08
  let in_root = 0x10
  let cached = 0x20
end

module Poll_mask = struct
  include Flags

  let pollin = Config.pollin
  let pollout = Config.pollout
  let pollerr = Config.pollerr
  let pollhup = Config.pollhup
end

module Statx = struct
  (* Abstract value created and read by the C stubs below. *)
  type t

  type kind = [
    | `Unknown
    | `Fifo
    | `Character_special
    | `Directory
    | `Block_device
    | `Regular_file
    | `Symbolic_link
    | `Socket
  ]

  (* Print a human-readable name for a file kind. *)
  let pp_kind f k =
    let name =
      match k with
      | `Unknown -> "unknown"
      | `Fifo -> "fifo"
      | `Character_special -> "character special file"
      | `Directory -> "directory"
      | `Block_device -> "block device"
      | `Regular_file -> "regular file"
      | `Symbolic_link -> "symbolic link"
      | `Socket -> "socket"
    in
    Fmt.pf f "%s" name

  external create : unit -> t = "ocaml_uring_make_statx"

  module Flags = struct
    include Flags
    include Config.At
  end

  module Attr = struct
    include Flags
    include Config.Statx.Attr

    (* [check ?mask attr t] is true when all the bits of flag [t] are set in
       [attr].

       If [mask] is given, it is treated as the set of supported attributes:
       when [t] is not fully contained in it, [Invalid_argument] is raised.
       The previous code had this test inverted and raised exactly when the
       attribute *was* present in the mask. *)
    let check ?mask attr t =
      let i = Int64.of_int t in
      (match mask with
       | Some m ->
         if not (Int64.equal (Int64.logand m i) i) then
           invalid_arg "Attribute not supported"
       | None -> ());
      Int64.equal (Int64.logand attr i) i
  end

  module Mask = struct
    include Flags
    include Config.Statx.Mask

    (* [check mask t] is true when all the bits of flag [t] are set in [mask]. *)
    let check mask t =
      let i = Int64.of_int t in
      Int64.equal (Int64.logand mask i) i
  end

  (* Field accessors implemented in C. Each 64-bit accessor has a bytecode
     and a native stub; the native one returns an unboxed int64. *)
  external blksize : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_blksize_bytes" "ocaml_uring_statx_blksize_native" [@@noalloc]

  external attributes : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_attributes_bytes" "ocaml_uring_statx_attributes_native" [@@noalloc]

  external nlink : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_nlink_bytes" "ocaml_uring_statx_nlink_native" [@@noalloc]

  external uid : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_uid_bytes" "ocaml_uring_statx_uid_native" [@@noalloc]

  external gid : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_gid_bytes" "ocaml_uring_statx_gid_native" [@@noalloc]

  external ino : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_ino_bytes" "ocaml_uring_statx_ino_native" [@@noalloc]

  external size : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_size_bytes" "ocaml_uring_statx_size_native" [@@noalloc]

  external blocks : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_blocks_bytes" "ocaml_uring_statx_blocks_native" [@@noalloc]

  external attributes_mask : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_attributes_mask_bytes" "ocaml_uring_statx_attributes_mask_native" [@@noalloc]

  external rdev : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_rdev_bytes" "ocaml_uring_statx_rdev_native" [@@noalloc]

  external dev : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_dev_bytes" "ocaml_uring_statx_dev_native" [@@noalloc]

  external mask : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_mask_bytes" "ocaml_uring_statx_mask_native" [@@noalloc]

  external mnt_id : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_mnt_id_bytes" "ocaml_uring_statx_mnt_id_native" [@@noalloc]

  external dio_mem_align : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_dio_mem_align_bytes" "ocaml_uring_statx_dio_mem_align_native" [@@noalloc]

  external dio_offset_align : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_dio_offset_align_bytes" "ocaml_uring_statx_dio_offset_align_native" [@@noalloc]

  external atime_sec : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_atime_sec_bytes" "ocaml_uring_statx_atime_sec_native" [@@noalloc]

  external btime_sec : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_btime_sec_bytes" "ocaml_uring_statx_btime_sec_native" [@@noalloc]

  external ctime_sec : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_ctime_sec_bytes" "ocaml_uring_statx_ctime_sec_native" [@@noalloc]

  external mtime_sec : t -> (int64 [@unboxed]) =
    "ocaml_uring_statx_mtime_sec_bytes" "ocaml_uring_statx_mtime_sec_native" [@@noalloc]

  external atime_nsec : t -> int = "ocaml_uring_statx_atime_nsec" [@@noalloc]
  external btime_nsec : t -> int = "ocaml_uring_statx_btime_nsec" [@@noalloc]
  external ctime_nsec : t -> int = "ocaml_uring_statx_ctime_nsec" [@@noalloc]
  external mtime_nsec : t -> int = "ocaml_uring_statx_mtime_nsec" [@@noalloc]

  external mode : t -> (int [@untagged]) =
    "ocaml_uring_statx_mode_bytes" "ocaml_uring_statx_mode_native" [@@noalloc]

  external perm : t -> (int [@untagged]) =
    "ocaml_uring_statx_perm_bytes" "ocaml_uring_statx_perm_native" [@@noalloc]

  external kind : t -> kind = "ocaml_uring_statx_kind"
end

module Sockaddr = struct
  (* Abstract socket address built and decoded by the C stubs. *)
  type t

  external of_unix : Unix.sockaddr -> t = "ocaml_uring_make_sockaddr"
  external get : t -> Unix.sockaddr = "ocaml_uring_extract_sockaddr"

  let dummy_addr = Unix.ADDR_UNIX "-"

  (* A fresh value initialised from the placeholder address. *)
  let create () = of_unix dummy_addr
end

module Open_how = struct
  type t

  external make : int -> Unix.file_perm -> int -> string -> t = "ocaml_uring_make_open_how"

  (* Labelled wrapper around [make]. *)
  let v ~open_flags ~perm ~resolve path = make open_flags perm resolve path
end

module Op = Config.Op

(* The C stubs rely on the layout of Cstruct.t, so we just check here that it hasn't changed. *)
module Check_cstruct : sig
  [@@@warning "-34"]
  type t = private {
    buffer : (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t;
    off : int;
    len : int;
  }
end = Cstruct

(*
* A Sketch buffer is an area used to hold objects that remain alive
* until the next `Uring.submit`.
* For example an `iovec` must be passed to io_uring in `readv` and
* `writev`, once we call `Uring.submit` the `iovec` structures are
* copied by the kernel and we can release them, which we do.
*)
module Sketch = struct
  type t = {
    mutable buffer : Cstruct.buffer;            (* Current allocation area. *)
    mutable off : int;                          (* Next free offset in [buffer]. *)
    mutable old_buffers : Cstruct.buffer list;  (* Earlier areas, kept until [release]. *)
  }

  (* A (buffer, offset, length) triple describing one allocation. *)
  type ptr = Cstruct.buffer * int * int

  let create_buffer len = Bigarray.(Array1.create char c_layout len)

  let create () =
    { buffer = Cstruct.empty.buffer; off = 0; old_buffers = [] }

  let length t = Bigarray.Array1.size_in_bytes t.buffer

  (* [round a x] rounds [x] up to a multiple of [a]; the bit trick assumes
     [a] is a power of two. *)
  let round a x = (x + (a - 1)) land (lnot (a - 1))

  (* Specialised to the machine word size in bytes. *)
  let round = round (Sys.word_size / 8)

  let avail t = (length t) - t.off

  (* [alloc t alloc_len] reserves [alloc_len] bytes (rounded up to a word
     multiple) and returns where they live. When the current buffer is too
     small, a new one is created and the old one is remembered in
     [old_buffers] until [release] is called. *)
  let alloc t alloc_len =
    let needed = round alloc_len in
    if needed > avail t then (
      (* At least 64 bytes, at least twice the previous size, and
         at least big enough for the new allocation. *)
      let grown = max 64 (max (2 * length t) needed) in
      let fresh = create_buffer grown in
      t.old_buffers <- t.buffer :: t.old_buffers;
      t.off <- 0;
      t.buffer <- fresh
    );
    let start = t.off in
    t.off <- start + needed;
    (t.buffer, start, needed)

  let _cstruct_of_ptr ((buf, off, len) : ptr) = Cstruct.of_bigarray buf ~off ~len

  (* Forget every allocation: rewind the current buffer and drop the old ones. *)
  let release t =
    t.off <- 0;
    t.old_buffers <- []

  module Iovec = struct
    external set : ptr -> Cstruct.t list -> unit = "ocaml_uring_set_iovec" [@@noalloc]

    let sizeof = Config.sizeof_iovec

    (* Reserve room for one iovec per element of [csl] and fill it in. *)
    let alloc t csl =
      let ptr = alloc t (List.length csl * sizeof) in
      set ptr csl;
      ptr
  end

  module String = struct
    external set : ptr -> string -> unit = "ocaml_uring_set_string" [@@noalloc]

    (* Reserve one byte more than the length of [s], then let the stub copy
       [s] in. NOTE(review): the extra byte is presumably for a NUL
       terminator - confirm in the C stub. *)
    let alloc t s =
      let ptr = alloc t (String.length s + 1) in
      set ptr s;
      ptr
  end
end

(* Used for the sendmsg/recvmsg calls. Liburing doesn't support sendto/recvfrom at the time of writing. *)
module Msghdr = struct
  type msghdr

  type t = msghdr * Sockaddr.t option * Cstruct.t list
  (* `Cstruct.t list` is here only for preventing it being GCed *)

  external make_msghdr : int -> Unix.file_descr list -> Sockaddr.t option -> msghdr = "ocaml_uring_make_msghdr"
  external get_msghdr_fds : msghdr -> Unix.file_descr list = "ocaml_uring_get_msghdr_fds"

  let get_fds (hdr, _, _) = get_msghdr_fds hdr

  (* Create a value with space for [n_fds] file descriptors.
     When sending, [fds] is used to fill those slots. When receiving, they can be left blank. *)
  let create_with_addr ~n_fds ~fds ?addr buffs =
    let hdr = make_msghdr n_fds fds addr in
    hdr, addr, buffs

  let create ?(n_fds = 0) ?addr buffs =
    create_with_addr ~n_fds ~fds:[] ?addr buffs
end

type 'a job = 'a Heap.entry

type clock = Boottime | Realtime

type probe

(* Raw bindings to the C stubs. The [submit_*] functions return a [bool];
   the callers in this file treat [false] as meaning the request was not
   queued. *)
module Uring = struct
  type t

  external create : int -> int option -> t = "ocaml_uring_setup"
  external exit : t -> unit = "ocaml_uring_exit"

  external unregister_buffers : t -> unit = "ocaml_uring_unregister_buffers"
  external register_bigarray : t -> Cstruct.buffer -> unit = "ocaml_uring_register_ba"
  external submit : t -> int = "ocaml_uring_submit"
  external sq_ready : t -> int = "ocaml_uring_sq_ready" [@@noalloc]

  external get_probe_ring : t -> probe = "ocaml_uring_get_probe_ring"
  external opcode_supported : probe -> Op.t -> bool = "ocaml_uring_opcode_supported" [@@noalloc]

  type id = Heap.ptr

  type offset = Optint.Int63.t

  external submit_nop : t -> id -> bool =
    "ocaml_uring_submit_nop" [@@noalloc]

  external submit_timeout : t -> id -> Sketch.ptr -> clock -> bool -> bool =
    "ocaml_uring_submit_timeout" [@@noalloc]

  external submit_poll_add : t -> Unix.file_descr -> id -> Poll_mask.t -> bool =
    "ocaml_uring_submit_poll_add" [@@noalloc]

  external submit_read : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> bool =
    "ocaml_uring_submit_read" [@@noalloc]

  external submit_write : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> bool =
    "ocaml_uring_submit_write" [@@noalloc]

  external submit_readv : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> bool =
    "ocaml_uring_submit_readv" [@@noalloc]

  external submit_writev : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> bool =
    "ocaml_uring_submit_writev" [@@noalloc]

  external submit_readv_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> bool =
    "ocaml_uring_submit_readv_fixed_byte" "ocaml_uring_submit_readv_fixed_native" [@@noalloc]

  external submit_writev_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> bool =
    "ocaml_uring_submit_writev_fixed_byte" "ocaml_uring_submit_writev_fixed_native" [@@noalloc]

  external submit_close : t -> Unix.file_descr -> id -> bool =
    "ocaml_uring_submit_close" [@@noalloc]

  external submit_statx : t -> id -> Unix.file_descr -> Statx.t -> Sketch.ptr -> int -> int -> bool =
    "ocaml_uring_submit_statx_byte" "ocaml_uring_submit_statx_native" [@@noalloc]

  external submit_splice : t -> id -> Unix.file_descr -> Unix.file_descr -> int -> bool =
    "ocaml_uring_submit_splice" [@@noalloc]

  external submit_connect : t -> id -> Unix.file_descr -> Sockaddr.t -> bool =
    "ocaml_uring_submit_connect" [@@noalloc]

  external submit_accept : t -> id -> Unix.file_descr -> Sockaddr.t -> bool =
    "ocaml_uring_submit_accept" [@@noalloc]

  external submit_cancel : t -> id -> id -> bool =
    "ocaml_uring_submit_cancel" [@@noalloc]

  external submit_openat2 : t -> id -> Unix.file_descr -> Open_how.t -> bool =
    "ocaml_uring_submit_openat2" [@@noalloc]

  external submit_linkat : t -> id -> Unix.file_descr -> Sketch.ptr -> Unix.file_descr -> Sketch.ptr -> int -> bool =
    "ocaml_uring_submit_linkat_byte" "ocaml_uring_submit_linkat_native" [@@noalloc]

  external submit_unlinkat : t -> id -> Unix.file_descr -> Sketch.ptr -> bool -> bool =
    "ocaml_uring_submit_unlinkat" [@@noalloc]

  external submit_send_msg : t -> id -> Unix.file_descr -> Msghdr.t -> Sketch.ptr -> bool =
    "ocaml_uring_submit_send_msg" [@@noalloc]

  external submit_recv_msg : t -> id -> Unix.file_descr -> Msghdr.t -> Sketch.ptr -> bool =
    "ocaml_uring_submit_recv_msg" [@@noalloc]

  external submit_fsync : t -> id -> Unix.file_descr -> int64 -> int -> bool =
    "ocaml_uring_submit_fsync" [@@noalloc]

  external submit_fdatasync : t -> id -> Unix.file_descr -> int64 -> int -> bool =
    "ocaml_uring_submit_fdatasync" [@@noalloc]

  (* Values of this type are only ever built by the C stubs. *)
  type cqe_option = private
    | Cqe_none
    | Cqe_some of { user_data_id : id; res : int }
  [@@ocaml.warning "-37" (* Avoids "Unused constructor" warning on OCaml <= 4.09. *)]

  external wait_cqe : t -> cqe_option = "ocaml_uring_wait_cqe"
  external wait_cqe_timeout : float -> t -> cqe_option = "ocaml_uring_wait_cqe_timeout"
  external peek_cqe : t -> cqe_option = "ocaml_uring_peek_cqe"

  external error_of_errno : int -> Unix.error = "ocaml_uring_error_of_errno"

  external register_eventfd : t -> Unix.file_descr -> unit = "ocaml_uring_register_eventfd"
end

type 'a t = {
  id : < >;                              (* Fresh object; compared by [Generic_ring.compare]. *)
  uring : Uring.t;
  mutable fixed_iobuf : Cstruct.buffer;
  data : 'a Heap.t;                      (* One entry per request in flight. *)
  sketch : Sketch.t;
  queue_depth : int;
}

(* A ring with its user-data type hidden, so that rings of different types
   can live in one set. *)
module Generic_ring = struct
  type ring = T : 'a t -> ring
  type t = ring

  let compare (T a) (T b) = compare a.id b.id
end

module Ring_set = Set.Make(Generic_ring)

(* Garbage collection and buffers shared with the Linux kernel.
Many uring operations involve passing Linux the address of a buffer to which it
should write the results. This means that both Linux and OCaml have pointers to the
buffer, and it must not be freed until both have finished with it, but the OCaml
garbage collector doesn't know this. To avoid OCaml's GC freeing the buffer while
Linux is still using it:
- We attach all such buffers to their [t.data] entry, so they don't get freed until
the job is complete, even if the caller loses interest in the buffer.
- We add the ring itself to the global [gc_roots] set, so that [t.data] can't be freed
unless [exit] is called, which checks that there are no operations in progress. *)

let gc_roots = Atomic.make Ring_set.empty

(* Apply [fn] to the global set with a compare-and-set loop, retrying when
   another update got in first. *)
let rec update_gc_roots fn =
  let old_set = Atomic.get gc_roots in
  let new_set = fn old_set in
  if not (Atomic.compare_and_set gc_roots old_set new_set) then
    update_gc_roots fn

let register_gc_root t =
  update_gc_roots (Ring_set.add (Generic_ring.T t))

let unregister_gc_root t =
  update_gc_roots (Ring_set.remove (Generic_ring.T t))

(* Create a ring with room for [queue_depth] requests and add it to
   [gc_roots]. Raises [Invalid_argument] if [queue_depth] is less than 1. *)
let create ?polling_timeout ~queue_depth () =
  if queue_depth < 1 then Fmt.invalid_arg "Non-positive queue depth: %d" queue_depth;
  let uring = Uring.create queue_depth polling_timeout in
  let data = Heap.create queue_depth in
  let id = object end in
  let fixed_iobuf = Cstruct.empty.buffer in
  let sketch = Sketch.create () in
  let t = { id; uring; sketch; fixed_iobuf; data; queue_depth } in
  register_gc_root t;
  t

(* Raise [Invalid_argument] if the ring's heap has already been released. *)
let check t =
  if Heap.is_released t.data then
    invalid_arg "Can't use ring after Uring.exit has been called"

(* Like [check], and also raise if any request is still in flight.
   [op] names the caller in the error message. *)
let ensure_idle t op =
  check t;
  match Heap.in_use t.data with
  | 0 -> ()
  | n -> Fmt.invalid_arg "%s: %d request(s) still active!" op n

(* Replace the ring's fixed buffer with [iobuf].

   Any previously registered buffer is unregistered first. A non-empty
   [iobuf] is then registered with the kernel; an empty one just clears the
   fixed buffer. Returns [Error `ENOMEM] if registration fails with ENOMEM.

   [t.fixed_iobuf] is only set to a non-empty buffer once registration has
   succeeded. Previously it was assigned before registering, so after a
   failure the ring recorded a buffer that had never been registered: later
   fixed reads/writes would refer to it, and the next [set_fixed_buffer]
   would try to unregister it. *)
let set_fixed_buffer t iobuf =
  ensure_idle t "set_fixed_buffer";
  if Bigarray.Array1.dim t.fixed_iobuf > 0 then (
    Uring.unregister_buffers t.uring;
    (* Nothing is registered from here until registration below succeeds. *)
    t.fixed_iobuf <- Cstruct.empty.buffer
  );
  if Bigarray.Array1.dim iobuf > 0 then (
    match Uring.register_bigarray t.uring iobuf with
    | () ->
      t.fixed_iobuf <- iobuf;
      Ok ()
    | exception Unix.Unix_error(Unix.ENOMEM, "io_uring_register_buffers", "") ->
      Error `ENOMEM
  ) else (
    t.fixed_iobuf <- iobuf;
    Ok ()
  )

(* Shut the ring down. Raises [Invalid_argument] if requests are still active. *)
let exit t =
  ensure_idle t "exit";
  Heap.release t.data;
  Uring.exit t.uring;
  unregister_gc_root t

(* Allocate a heap entry holding [datum] and [extra_data], then call [fn]
   with its id to queue the request. If [fn] returns [false] the entry is
   freed again and [None] is returned.

   If [Heap.alloc] raises [Invalid_argument], [check] runs first, so a ring
   that has been released reports that error instead; otherwise the original
   exception is re-raised. *)
let with_id_full : type a. a t -> (Heap.ptr -> bool) -> a -> extra_data:'b -> a job option =
  fun t fn datum ~extra_data ->
  match Heap.alloc t.data datum ~extra_data with
  | exception (Invalid_argument _ as ex) -> check t; raise ex
  | entry ->
    let ptr = Heap.ptr entry in
    let has_space = fn ptr in
    if has_space then Some entry
    else (
      ignore (Heap.free t.data ptr : a);
      None
    )

(* [with_id_full] with no extra data to keep alive. *)
let with_id t fn a = with_id_full t fn a ~extra_data:()

let noop t user_data =
  with_id t (fun id -> Uring.submit_nop t.uring id) user_data

external set_timespec : Sketch.ptr -> int64 -> unit = "ocaml_uring_set_timespec" [@@noalloc]

(* The timespec lives in the sketch buffer. *)
let timeout ?(absolute = false) t clock timeout_ns user_data =
  let timespec_ptr = Sketch.alloc t.sketch Config.sizeof_kernel_timespec in
  set_timespec timespec_ptr timeout_ns;
  with_id t (fun id -> Uring.submit_timeout t.uring id timespec_ptr clock absolute) user_data

(* Reinterprets the integer constant from [Config] as a file descriptor. *)
let at_fdcwd : Unix.file_descr = Obj.magic Config.at_fdcwd

(* [open_how] is attached to the job as its extra data. *)
let openat2 t ~access ~flags ~perm ~resolve ?(fd = at_fdcwd) path user_data =
  let access_flag =
    match access with
    | `R -> Open_flags.rdonly
    | `W -> Open_flags.wronly
    | `RW -> Open_flags.rdwr
  in
  let open_flags = flags lor access_flag in
  let open_how = Open_how.v ~open_flags ~perm ~resolve path in
  with_id_full t (fun id -> Uring.submit_openat2 t.uring id fd open_how) user_data ~extra_data:open_how

module Linkat_flags = struct
  include Flags

  let empty_path = Config.At.empty_path
  let symlink_follow = Config.At.symlink_follow
end

(* Both paths are copied into the sketch buffer. *)
let linkat t ?(old_dir_fd = at_fdcwd) ?(new_dir_fd = at_fdcwd) ~flags ~old_path ~new_path user_data =
  with_id t (fun id ->
      let old_path_buf = Sketch.String.alloc t.sketch old_path in
      let new_path_buf = Sketch.String.alloc t.sketch new_path in
      Uring.submit_linkat t.uring id old_dir_fd old_path_buf new_dir_fd new_path_buf flags
    ) user_data

let unlink t ~dir ?(fd = at_fdcwd) path user_data =
  with_id t (fun id ->
      let buf = Sketch.String.alloc t.sketch path in
      Uring.submit_unlinkat t.uring id fd buf dir
    ) user_data

(* [buf] is attached to the job so it stays alive until completion. *)
let read t ~file_offset fd (buf : Cstruct.t) user_data =
  with_id_full t (fun id -> Uring.submit_read t.uring fd id buf file_offset) user_data ~extra_data:buf

let write t ~file_offset fd (buf : Cstruct.t) user_data =
  with_id_full t (fun id -> Uring.submit_write t.uring fd id buf file_offset) user_data ~extra_data:buf

let iov_max = Config.iov_max

let readv t ~file_offset fd buffers user_data =
  with_id_full t (fun id ->
      let iovec = Sketch.Iovec.alloc t.sketch buffers in
      Uring.submit_readv t.uring fd id iovec file_offset
    ) user_data ~extra_data:buffers

(* [off] and [len] select a range of the ring's fixed buffer. *)
let read_fixed t ~file_offset fd ~off ~len user_data =
  with_id t (fun id ->
      Uring.submit_readv_fixed t.uring fd id t.fixed_iobuf off len file_offset
    ) user_data

(* Raises [Invalid_argument] unless [chunk] is backed by the ring's current
   fixed buffer (physical equality). *)
let read_chunk ?len t ~file_offset fd chunk user_data =
  let { Cstruct.buffer; off; len } = Region.to_cstruct ?len chunk in
  if buffer != t.fixed_iobuf then invalid_arg "Chunk does not belong to ring!";
  with_id t (fun id ->
      Uring.submit_readv_fixed t.uring fd id t.fixed_iobuf off len file_offset
    ) user_data

let write_fixed t ~file_offset fd ~off ~len user_data =
  with_id t (fun id ->
      Uring.submit_writev_fixed t.uring fd id t.fixed_iobuf off len file_offset
    ) user_data

(* Raises [Invalid_argument] unless [chunk] is backed by the ring's current
   fixed buffer (physical equality). *)
let write_chunk ?len t ~file_offset fd chunk user_data =
  let { Cstruct.buffer; off; len } = Region.to_cstruct ?len chunk in
  if buffer != t.fixed_iobuf then invalid_arg "Chunk does not belong to ring!";
  with_id t (fun id ->
      Uring.submit_writev_fixed t.uring fd id t.fixed_iobuf off len file_offset
    ) user_data

let writev t ~file_offset fd buffers user_data =
  with_id_full t (fun id ->
      let iovec = Sketch.Iovec.alloc t.sketch buffers in
      Uring.submit_writev t.uring fd id iovec file_offset
    ) user_data ~extra_data:buffers

let poll_add t fd poll_mask user_data =
  with_id t (fun id -> Uring.submit_poll_add t.uring fd id poll_mask) user_data

let close t fd user_data =
  with_id t (fun id -> Uring.submit_close t.uring fd id) user_data

(* The path is copied into the sketch buffer; [statx] is attached to the job. *)
let statx t ?(fd = at_fdcwd) ~mask path statx flags user_data =
  let spath = Sketch.String.alloc t.sketch path in
  with_id_full t (fun id ->
      Uring.submit_statx t.uring id fd statx spath flags mask
    ) user_data ~extra_data:statx

let splice t ~src ~dst ~len user_data =
  with_id t (fun id -> Uring.submit_splice t.uring id src dst len) user_data

(* The converted address is attached to the job. *)
let connect t fd addr user_data =
  let addr = Sockaddr.of_unix addr in
  with_id_full t (fun id -> Uring.submit_connect t.uring id fd addr) user_data ~extra_data:addr

let accept t fd addr user_data =
  with_id_full t (fun id -> Uring.submit_accept t.uring id fd addr) user_data ~extra_data:addr

let send_msg ?(fds = []) ?dst t fd buffers user_data =
  let addr = Option.map Sockaddr.of_unix dst in
  let n_fds = List.length fds in
  let msghdr = Msghdr.create_with_addr ~n_fds ~fds ?addr buffers in
  (* NOTE: `msghdr` references `buffers`, so it's enough for `extra_data` *)
  with_id_full t (fun id ->
      let iovec = Sketch.Iovec.alloc t.sketch buffers in
      Uring.submit_send_msg t.uring id fd msghdr iovec
    ) user_data ~extra_data:msghdr

let recv_msg t fd msghdr user_data =
  let _, _, buffers = msghdr in
  (* NOTE: `msghdr` references `buffers`, so it's enough for `extra_data` *)
  with_id_full t (fun id ->
      let iovec = Sketch.Iovec.alloc t.sketch buffers in
      Uring.submit_recv_msg t.uring id fd msghdr iovec
    ) user_data ~extra_data:msghdr

let fsync t ?(off = 0L) ?(len = 0) fd user_data =
  with_id t (fun id -> Uring.submit_fsync t.uring id fd off len) user_data

let fdatasync t ?(off = 0L) ?(len = 0) fd user_data =
  with_id t (fun id -> Uring.submit_fdatasync t.uring id fd off len) user_data

let cancel t job user_data =
  ignore (Heap.ptr job : Uring.id);  (* Check it's still valid *)
  with_id t (fun id -> Uring.submit_cancel t.uring id (Heap.ptr job)) user_data

let sqe_ready t = Uring.sq_ready t.uring

(* Free stale entries in the sketch buffer, if possible.
This isn't quite right: a busy system might never have 0 unsubmitted entries.
We should probably track how many requests need to be submitted before each
of [t.sketch.old_buffers] can be released, but this will do for now. *)
let gc_sketch t =
  if Uring.sq_ready t.uring = 0 then Sketch.release t.sketch

(* Submit the queued requests, if there are any, and return the value
   reported by [Uring.submit] (0 when nothing was queued). *)
let submit t =
  check t;
  let submitted =
    if Uring.sq_ready t.uring > 0 then Uring.submit t.uring
    else 0
  in
  (* In non-polling mode, we will almost always be able to free the sketch buffer here.
     However, in polling mode it's unlikely the entries have been consumed by the kernel yet,
     and we must rely on other GC points. *)
  gc_sketch t;
  submitted

(* NOTE: from here on [None] and [Some] refer to these constructors unless
   the expected type says otherwise. *)
type 'a completion_option =
  | None
  | Some of { result : int; data : 'a }

(* Run [fn] on the raw ring. When it yields a completion, free the matching
   heap entry and return its data together with the result code. *)
let fn_on_ring fn t =
  match fn t.uring with
  | Uring.Cqe_none -> None
  | Uring.Cqe_some { user_data_id; res } ->
    let data = Heap.free t.data user_data_id in
    Some { result = res; data }

let get_cqe_nonblocking t =
  check t;
  gc_sketch t;
  fn_on_ring Uring.peek_cqe t

let peek = get_cqe_nonblocking

let register_eventfd t fd =
  check t;
  Uring.register_eventfd t.uring fd

(* Wait for a completion, giving up after [timeout] when one is supplied. *)
let wait ?timeout t =
  check t;
  let r =
    match timeout with
    | None -> fn_on_ring Uring.wait_cqe t
    | Some timeout -> fn_on_ring (Uring.wait_cqe_timeout timeout) t
  in
  (* In polling mode, this is a good time to GC the sketch buffer, because the
     kernel has probably consumed all the entries while we were blocking. *)
  gc_sketch t;
  r

let queue_depth { queue_depth; _ } = queue_depth

let buf { fixed_iobuf; _ } = fixed_iobuf

(* The sign of [e] is ignored. *)
let error_of_errno e =
  Uring.error_of_errno (abs e)

let get_probe t =
  check t;
  Uring.get_probe_ring t.uring

let op_supported probe op =
  Uring.opcode_supported probe op

module Stats = struct
  type t = {
    sqe_ready : int;
    active_ops : int;
    sketch_buffer_size : int;
    sketch_used : int;
    sketch_old_buffers : int;
  }

  let pp f { sqe_ready; active_ops; sketch_used; sketch_buffer_size; sketch_old_buffers } =
    Fmt.pf f "@[<v>SQEs ready: %d@,\
              Operations active: %d@,\
              Sketch buffer: %d/%d (plus %d old buffers)@]"
      sqe_ready
      active_ops
      sketch_used sketch_buffer_size sketch_old_buffers
end

(* Number of heap entries currently in use. *)
let active_ops t = Heap.in_use t.data

(* Snapshot of the ring's counters, for debugging. *)
let get_debug_stats t =
  {
    Stats.sqe_ready = Uring.sq_ready t.uring;
    active_ops = active_ops t;
    sketch_used = t.sketch.off;
    sketch_buffer_size = Bigarray.Array1.dim t.sketch.buffer;
    sketch_old_buffers = List.length t.sketch.old_buffers;
  }