open Core
open Import
module Fd = Raw_fd

let debug = Debug.interruptor

(* An interruptor is a pipe plus a flag recording whether a wake-up byte has been written
   to that pipe since the last [clear]. *)
type t =
  { pipe : Fd.t Read_write_pair.t
  ; (* [already_interrupted] keeps track of whether we've already interrupted since the
       most recent call to [clear], and if so, avoid writing to the pipe again.
       [already_interrupted] does not exactly track the state of [pipe].  It is possible
       for [already_interrupted] to be false and for the [pipe] to be nonempty.  The key
       property is that if [already_interrupted] is true then [pipe] is nonempty. *)
    mutable already_interrupted : bool
  ; (* Scratch buffer that [clear] reads into; its contents are never inspected. *)
    clearbuffer : (Bytes.t[@sexp.opaque])
  }
[@@deriving sexp_of]

(* No invariant is checked; this always returns unit. *)
let invariant _ = ()

(* [read_fd t] returns the read end of [t]'s pipe. *)
let read_fd t = Read_write_pair.get t.pipe `Read

(* [create ~create_fd] opens a fresh pipe, marks both ends close-on-exec, and wraps each
   end with [create_fd] as a [Fifo].  The result starts with [already_interrupted = false]
   and a 1024-byte scratch buffer. *)
let create ~create_fd =
  let pipe_read, pipe_write = Unix.pipe () in
  Unix.set_close_on_exec pipe_read;
  Unix.set_close_on_exec pipe_write;
  let pipe_read =
    create_fd Fd.Kind.Fifo pipe_read (Info.of_string "interruptor_pipe_read")
  in
  let pipe_write =
    create_fd Fifo pipe_write (Info.of_string "interruptor_pipe_write")
  in
  { pipe = Read_write_pair.create ~read:pipe_read ~write:pipe_write
  ; already_interrupted = false
  ; clearbuffer = Bytes.make 1024 ' '
  }
;;

(* [bytes_w] is a toplevel to make sure it's not allocated multiple times. *)
let bytes_w = Bytes.of_string "w"

(* [thread_safe_interrupt]
   As the name implies, it is safe to call from any thread; [thread_safe_interrupt] does
   not assume the scheduler lock is held, although it is fine if it is.  Because of
   OCaml's compilation, the test-and-set of [t.already_interrupted] is atomic, so
   we will only ever write one byte to the pipe before it is cleared.

   NOTE(review): the atomicity claim presumably relies on there being no allocation or
   other thread-switch point between the read and the write of the flag -- confirm this
   still holds for the runtime in use.

   A write that fails with [EWOULDBLOCK] or [EAGAIN] is deliberately ignored. *)
let thread_safe_interrupt t =
  if debug then Debug.log_string "Interruptor.thread_safe_interrupt";
  (* BEGIN ATOMIC *)
  if not t.already_interrupted
  then (
    t.already_interrupted <- true;
    (* END ATOMIC *)
    if debug then Debug.log_string "writing to interrupt_pipe_write";
    Fd.syscall_exn
      (Read_write_pair.get t.pipe `Write)
      ~nonblocking:true
      (fun file_descr ->
         try ignore (Unix.write_assume_fd_is_nonblocking file_descr bytes_w : int) with
         | Unix.Unix_error ((EWOULDBLOCK | EAGAIN), _, _) -> ()))
;;

(* [clear t] drains the pipe (only if [already_interrupted] is set) and then resets
   [already_interrupted] to [false]. *)
let clear t =
  if debug then Debug.log_string "Interruptor.clear";
  (* We only need to clear the pipe if it was written to.  This saves a system call in the
     common case. *)
  if t.already_interrupted
  then
    Fd.syscall_exn
      (Read_write_pair.get t.pipe `Read)
      ~nonblocking:true
      (fun file_descr ->
         (* Keep reading into the scratch buffer until the read reports
            [EWOULDBLOCK]/[EAGAIN]. *)
         let rec loop () =
           let read_again =
             try
               let bytes_read =
                 Unix.read_assume_fd_is_nonblocking
                   file_descr
                   t.clearbuffer
                   ~pos:0
                   ~len:(Bytes.length t.clearbuffer)
               in
               ignore (bytes_read : int);
               true
             with
             | Unix.Unix_error ((EWOULDBLOCK | EAGAIN), _, _) -> false
           in
           if read_again then loop ()
         in
         loop ());
  (* We must clear [already_interrupted] after emptying the pipe.  If we did it before,
     a [thread_safe_interrupt] could come along in between.  We would then be left with
     [already_interrupted = true] and an empty pipe, which would then cause a
     [thread_safe_interrupt] after [clear] returns to incorrectly be a no-op. *)
  t.already_interrupted <- false
;;