123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342openCoreopenImportletdebug=Debug.fdmoduleFile_descr=Unix.File_descrmoduleKind=structtypet=|Char|Fifo|File|Socketof[`Unconnected|`Bound|`Passive|`Active|`Unknown][@@derivingsexp_of]endmoduleState=struct(* [State] is is used to keep track of when the file descriptor is in use or being
closed. Here are the allowed transitions.
Open --> Close_requested --> Closed *)typet=(* [Close_requested (execution_context, do_close_syscall)] indicates that [Fd.close t]
has been called, but that we haven't yet started the close() syscall, because there
are still active syscalls using the file descriptor. Once there are no active
syscalls, we enqueue a job to [do_close_syscall] in [execution_context]. *)|Close_requestedofExecution_context.t*(unit->unit)(* [Closed] indicates that there are no more active syscalls and we have started the
close() syscall. *)|Closed(* [Open] is the initial state of a file descriptor, and the normal state when it is
in use. It indicates that it has not not yet been closed. The argument is an ivar
to be filled when [close] is called. *)|OpenofunitIvar.t[@@derivingsexp_of]lettransition_is_allowedtt'=matcht,t'with|Open_,Close_requested_|Close_requested_,Closed->true|_->false;;letis_open=function|Open_->true|Close_requested_|Closed->false;;endtypeready_to_result=[`Ready|`Bad_fd|`Closed|`Interrupted|`Unsupported][@@derivingsexp_of]moduleWatching=struct(* Every fd can be monitored by a file_descr_watcher for read, for write, for both, or
for neither. Each fd also has its own notion, independent of the file_descr_watcher,
of a [Watching.t], for both read and write that indicates the desired state of the
file_descr_watcher for this fd. That desired state is maintained only in the fd
while async jobs are running, and is then synchronized with the file_descr_watcher's
notion, via calls to [File_descr_watcher.set], just prior to asking the
file_descr_watcher to check fds for ready I/O.
Initially, watching state starts as [Not_watching]. When one initially requests that
the fd be monitored via [request_start_watching], the state transitions to
[Watch_once] or [Watch_repeatedly]. After the file_descr_watcher detects I/O is
available, the job in [Watch_repeatedly] is enqueued, or the ivar in [Watch_once] is
filled and the state transitions to [Stop_requested]. Or, if one calls
[request_stop_watching], the state transitions to [Stop_requested]. Finally,
[Stop_requested] will transition to [Not_watching] when the desired state is
synchronized with the file_descr_watcher. *)typet=|Not_watching|Watch_onceofready_to_resultIvar.t|Watch_repeatedlyofJob.t*[`Bad_fd|`Closed|`Interrupted|`Unsupported]Ivar.t|Stop_requested[@@derivingsexp_of]letinvariantt:unit=trymatchtwith|Not_watching|Stop_requested->()|Watch_onceivar->assert(Ivar.is_emptyivar)|Watch_repeatedly(_,ivar)->assert(Ivar.is_emptyivar)with|exn->raise_s[%message"Watching.invariant failed"(exn:exn)~watching:(t:t)];;endmoduleNonblock_status=struct(* Encodes the knowledge of the O_NONBLOCK flag of an fd. *)typet=|Blocking|Nonblocking|Unknown[@@derivingsexp_of]endmoduleT=structtypet={file_descr:File_descr.t;(* [info] is for debugging info. It is mutable because it changes after [bind],
[listen], or[connect]. *)mutableinfo:Info.t;(* [kind] is mutable because it changes after [bind], [listen], or [connect]. *)mutablekind:Kind.t;(* if [can_set_nonblock] is true, async will switch the underlying file
descriptor into nonblocking mode any time a non-blocking operation is attempted.
It can be [false] if the user explicitly tells async to avoid modifying that
flag on the underlying fd, or if Async detects that the file descriptor
doesn't support nonblocking I/O. *)mutablecan_set_nonblock:bool;mutablenonblock_status:Nonblock_status.t;mutablestate:State.t;watching:Watching.tRead_write_pair.Mutable.t;(* [watching_has_changed] is true if [watching] has changed since the last time
[watching] was synchronized with the file_descr_watcher. In this case, the
fd appears in the scheduler's [fds_whose_watching_has_changed] list so that
it can be synchronized later. *)mutablewatching_has_changed:bool;(* [num_active_syscalls] is used to ensure that we don't call [close] on a file
descriptor until there are no active system calls involving that file descriptor.
This prevents races in which the OS assigns that file descriptor to a new
open file, and thus a system call deals with the wrong open file. If the
state of an fd is [Close_requested], then once [num_active_syscalls] drops to
zero, the close() syscall will start and the state will transition to [Closed],
thus preventing further system calls from using the file descriptor.
[num_active_syscalls] is abused slightly to include the syscall to the
file_descr_watcher to check for ready I/O. Watching for read and for write
each potentially count for one active syscall. *)mutablenum_active_syscalls:int;(* [close_finished] becomes determined after the file descriptor has been closed
and the underlying close() system call has finished. *)close_finished:unitIvar.t}[@@derivingfields~iterators:iter,sexp_of]typet_hum=tletsexp_of_t_hum{file_descr;info;kind;_}=[%sexp{file_descr:File_descr.t;info:Info.t;kind:Kind.t}];;endincludeTletequal(t:t)t'=phys_equaltt'letinvariantt:unit=tryletcheckffield=f(Field.getfieldt)inFields.iter~info:ignore~file_descr:ignore~kind:ignore~can_set_nonblock:ignore~nonblock_status:ignore~state:ignore~watching:(check(funwatching->Read_write_pair.iterwatching~f:Watching.invariant))~watching_has_changed:ignore~num_active_syscalls:(check(funnum_active_syscalls->assert(t.num_active_syscalls>=0);letwatchingread_or_write=matchRead_write_pair.gett.watchingread_or_writewith|Not_watching->0|Stop_requested|Watch_once_|Watch_repeatedly_->1inassert(t.num_active_syscalls>=watching`Read+watching`Write);matcht.statewith|Closed->assert(num_active_syscalls=0)|Close_requested_|Open_->()))~close_finished:(check(funclose_finished->matcht.statewith|Closed->()|Close_requested_->assert(Ivar.is_emptyclose_finished)|Openclose_started->assert(Ivar.is_emptyclose_finished);assert(Ivar.is_emptyclose_started)))with|exn->raise_s[%message"Fd.invariant failed"(exn:exn)~fd:(t:t)];;letto_intt=File_descr.to_intt.file_descrletcreate?(avoid_setting_nonblock=false)(kind:Kind.t)file_descrinfo=letcan_set_nonblock=ifavoid_setting_nonblockthenfalseelse(matchkindwith(* No point in setting nonblocking for files. Unix doesn't care. *)|File->false(* We don't use nonblocking I/O for char devices because we don't want to change the
blocking status of TTYs, which would affect all processes currently attached to
that TTY and even persist after this process terminates.
Also, /dev/null is a char device not supported by epoll.
We don't really care about doing nonblocking I/O on other character devices,
e.g. /dev/random. *)|Char->false|Fifo->true|Socket_->(* All one can do on a `Bound socket is listen() to it, and we don't use listen()
in a nonblocking way.
`Unconnected sockets support nonblocking so we can connect() them.
`Passive sockets support nonblocking so we can accept() them.
`Active sockets support nonblocking so we can read() and write() them.
We need the sockets to be in nonblocking mode for [`Unconnected] and [`Passive]
sockets, because [accept_interruptible] and [connect_interruptible] in
unix_syscalls.ml assume that such sockets are nonblocking. On the other hand,
there is no such assumption about [`Active] sockets.
In any case, returning [true] here makes sense. Not only is it harmless
in case we don't end up using it, but this field is also not updated by
socket state transitions ([Fd.Private.replace]), so its value needs to be the
same across the different socket states.
*)true)inlett={info;file_descr;kind;can_set_nonblock;nonblock_status=Nonblock_status.Unknown;state=State.Open(Ivar.create());watching=Read_write_pair.create_bothWatching.Not_watching;watching_has_changed=false;num_active_syscalls=0;close_finished=Ivar.create()}inifdebugthenDebug.log"Fd.create"t[%sexp_of:t];t;;letinc_num_active_syscallst=matcht.statewith|Close_requested_|Closed->`Already_closed|Open_->t.num_active_syscalls<-t.num_active_syscalls+1;`Ok;;letset_statetnew_state=ifdebugthenDebug.log"Fd.set_state"(new_state,t)[%sexp_of:State.t*t];ifState.transition_is_allowedt.statenew_statethent.state<-new_stateelseraise_s[%message"Fd.set_state attempted disallowed state transition"~fd:(t:t)(new_state:State.t)];;letis_opent=State.is_opent.stateletis_closedt=not(is_opent)letdetermine_nonblock_statust=matcht.nonblock_statuswith|Unknown->letflags=Core_unix.fcntl_getflt.file_descrinletnonblock=Core_unix.Open_flags.is_subsetCore_unix.Open_flags.nonblock~of_:flagsint.nonblock_status<-(matchnonblockwith|false->Blocking|true->Nonblocking);nonblock|Nonblocking->true|Blocking->false;;letsupports_nonblockt=ift.can_set_nonblockthentrueelsedetermine_nonblock_statustletset_nonblock_if_necessary?(nonblocking=false)t=ifnonblockingthen(matchdetermine_nonblock_statustwith|true->()|false->ift.can_set_nonblockthen(Unix.set_nonblockt.file_descr;t.nonblock_status<-Nonblocking)elseraise_s[%message"Fd.set_nonblock_if_necessary called on fd that does not support nonblock"~fd:(t:t)]);;letwith_file_descr_exn?nonblockingtf=ifis_closedtthenraise_s[%message"Fd.with_file_descr_exn got closed fd"~_:(t:t)]else(set_nonblock_if_necessaryt?nonblocking;ft.file_descr);;letwith_file_descr?nonblockingtf=ifis_closedtthen`Already_closedelse(tryset_nonblock_if_necessaryt?nonblocking;`Ok(ft.file_descr)with|exn->`Errorexn);;letsyscall?nonblockingtf=with_file_descrt?nonblocking(funfile_descr->Result.ok_exn(Syscall.syscall(fun()->ffile_descr)));;letsyscall_exn?nonblockingtf=matchsyscalltf?nonblockingwith|`Oka->a|`Already_closed->raise_s[%message"Fd.syscall_exn got closed fd"~_:(t:t)]|`Errorexn->raiseexn;;letsyscall_result_exn?nonblockingtaf=ifis_closedtthenraise_s[%message"Fd.syscall_result_exn got closed fd"~_:(t:t)]else(set_nonblock_if_necessaryt?nonblocking;Syscall.syscall_result2t.file_descraf);;