12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879(* Unit tests are in ../../lib_test/thread_safe_test.ml. *)open!Coreopen!Async_kernelopen!Async_unixopen!Importtype'at='aPipe.Writer.t[@@derivingsexp_of]letin_async?wakeup_schedulerf=Thread_safe.run_in_async_exn?wakeup_schedulerfletin_async_waitf=Thread_safe.run_in_async_wait_exnfmoduleWritten_or_closed=structtypet=|Written|ClosedendmoduleIf_closed=structtype'at=|Raise:unitt|Return:Written_or_closed.ttletclosed:typea.at->a=function|Raise->raise_s[%message"pipe is closed"]|Return->Closed;;letwritten:typea.at->a=function|Raise->()|Return->Written;;endletin_async_unless_closed?wakeup_schedulertf~if_closed=in_async?wakeup_scheduler(fun()->ifPipe.is_closedtthenIf_closed.closedif_closedelse(f();If_closed.writtenif_closed));;letin_async_unless_closed_waittf~if_closed=in_async_wait(fun()->ifPipe.is_closedtthenreturn(If_closed.closedif_closed)else(let%map()=f()inIf_closed.writtenif_closed));;letcreate()=ifThread_safe.am_holding_async_lock()thenPipe.create()elsein_asyncPipe.create;;letpushbackt=in_async_wait(fun()->Pipe.pushbackt)lettransfer_int~from~if_closed=in_async_unless_closed_waitt~if_closed(fun()->Pipe.transfer_int~from);;letwriteta~if_closed=in_async_unless_closed_waitt~if_closed(fun()->Pipe.writeta);;lettransfer_in_without_pushback?wakeup_schedulert~from~if_closed=in_async_unless_closed?wakeup_schedulert~if_closed(fun()->Pipe.transfer_in_without_pushbackt~from);;letwrite_without_pushback?wakeup_schedulerta~if_closed=in_async_unless_closed?wakeup_schedulert~if_closed(fun()->Pipe.write_without_pushbackta);;letcloset=in_async(fun()->Pipe.closet)letis_closedt=in_async(fun()->Pipe.is_closedt)letclosedt=in_async_wait(fun()->Pipe.closedt)