123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173open!Coreopen!Asyncopen!Importletcreate?message?close_on_exec?unlink_on_exitpath=In_thread.run(fun()->Lock_file_blocking.create?message?close_on_exec?unlink_on_exitpath);;letcreate_exn?message?close_on_exec?unlink_on_exitpath=create?message?close_on_exec?unlink_on_exitpath>>|funb->ifnotbthenfailwiths"Lock_file.create"path[%sexp_of:string];;letrandom=lazy(Random.State.make_self_init~allow_in_tests:true());;letrepeat_with_abort~abort~f=Deferred.repeat_until_finished()(fun()->f()>>=function|true->return(`Finished`Ok)|false->letdelay=sec(Random.State.float(Lazy.forcerandom)0.3)inchoose[choice(afterdelay)(fun()->`Repeat);choiceabort(fun()->`Abort)]>>|function|`Abort->`Finished`Aborted|`Repeat->`Repeat());;letfail_on_abortpath~held_by=function|`Ok->()|`Aborted->failwiths"Lock_file timed out waiting for existing lock"path(funpath->(matchheld_bywith|None->[%sexp(path:string)]|Someheld_by->[%sexp{lock:string=path;held_by:Sexp.t=held_by;}]));;letwaiting_create?(abort=Deferred.never())?message?close_on_exec?unlink_on_exitpath=repeat_with_abort~abort~f:(fun()->create?message?close_on_exec?unlink_on_exitpath)>>|fail_on_abortpath~held_by:None;;letis_lockedpath=In_thread.run(fun()->Lock_file_blocking.is_lockedpath)moduleNfs=structletget_hostname_and_pidpath=In_thread.run(fun()->Lock_file_blocking.Nfs.get_hostname_and_pidpath);;letget_messagepath=In_thread.run(fun()->Lock_file_blocking.Nfs.get_messagepath);;letunlock_exnpath=In_thread.run(fun()->Lock_file_blocking.Nfs.unlock_exnpath);;letunlockpath=In_thread.run(fun()->Lock_file_blocking.Nfs.unlockpath);;letcreate?messagepath=In_thread.run(fun()->Lock_file_blocking.Nfs.create?messagepath);;letcreate_exn?messagepath=In_thread.run(fun()->Lock_file_blocking.Nfs.create_exn?messagepath);;letwaiting_create?(abort=Deferred.never())?messagepath=repeat_with_abort~abort~f:(fun()->create?messagepath>>|function|Ok()->true|Error_->false)>>|fail_on_abortpath~held_by:None;;letcritical_section?messagepath~abort~f=waiting_create~abort?messagepath>>=fun()->Monitor.protectf~finally:(fun()->unlock_exnpath);;endmoduleFlock=structtypet=Lock_file_blocking.Flock.tletlock_exn~lock_path=In_thread.run(fun()->Lock_file_blocking.Flock.lock_exn~lock_path);;letlock~lock_path=Monitor.try_with_or_error~extract_exn:true(fun()->lock_exn~lock_path);;letunlock_exnt=In_thread.run(fun()->Lock_file_blocking.Flock.unlock_exnt)letunlockt=Monitor.try_with_or_error~extract_exn:true(fun()->unlock_exnt)letwait_for_lock_exn?(abort=Deferred.never())~lock_path()=letlock_handle=Set_once.create()inlet%map()=repeat_with_abort~abort~f:(fun()->match%maplock_exn~lock_pathwith|`We_took_itt->Set_once.set_exnlock_handle[%here]t;true|`Somebody_else_took_it->false)>>|fail_on_abortlock_path~held_by:NoneinSet_once.get_exnlock_handle[%here];;letwait_for_lock?abort~lock_path()=Monitor.try_with_or_error~extract_exn:true(fun()->wait_for_lock_exn?abort~lock_path());;endmoduleSymlink=structtypet=Lock_file_blocking.Symlink.tletlock_exn~lock_path~metadata=In_thread.run(fun()->Lock_file_blocking.Symlink.lock_exn~lock_path~metadata);;letlock~lock_path~metadata=Monitor.try_with_or_error~extract_exn:true(fun()->lock_exn~lock_path~metadata);;letunlock_exnt=In_thread.run(fun()->Lock_file_blocking.Symlink.unlock_exnt)letunlockt=Monitor.try_with_or_error~extract_exn:true(fun()->unlock_exnt)letwait_for_lock_exn?(abort=Deferred.never())~lock_path~metadata()=letlock_handle=Set_once.create()inletlast_lock_holder=refNoneinlet%map()=repeat_with_abort~abort~f:(fun()->match%maplock_exn~lock_path~metadatawith|`We_took_itt->Set_once.set_exnlock_handle[%here]t;true|`Somebody_else_took_itother_party_info->last_lock_holder:=Someother_party_info;false)>>|fail_on_abortlock_path~held_by:(Option.map!last_lock_holder~f:(function|Oks->Sexp.Atoms|Errore->[%sexpError(e:Error.t)]))inSet_once.get_exnlock_handle[%here];;letwait_for_lock?abort~lock_path~metadata()=Monitor.try_with_or_error~extract_exn:true(fun()->wait_for_lock_exn?abort~lock_path~metadata());;end