open! Core
open! Import

let ok_exn = Or_error.ok_exn

(* A [Blocker.t] is an ordinary mutex and condition variable used to implement blocking
   when there is lock contention. *)
module Blocker : sig
  type t [@@deriving sexp_of]

  val create : unit -> t
  val critical_section : t -> f:(unit -> 'a) -> 'a
  val wait : t -> unit
  val signal : t -> unit
  val save_unused : t -> unit
end = struct
  (* Our use of mutexes is always via [Mutex.critical_section], so that we always lock
     them and unlock them from a single thread.  So, we use [Core.Mutex], which gives us
     error-checking mutexes that will catch any use that is not what we expect. *)
  module Condition = Condition
  module Mutex = Core.Mutex

  type t =
    { mutex : Mutex.t sexp_opaque
    ; condition : Condition.t sexp_opaque
    }
  [@@deriving sexp_of]

  (* We keep a cache of unused blockers, since they are relatively costly to create, and
     we should never need very many simultaneously.  We should never need more blockers
     than the number of nano mutexes being simultaneously blocked on, which of course is
     no more than the total number of simultaneous threads. *)
  let unused : t Thread_safe_queue.t = Thread_safe_queue.create ()

  (* [save_unused t] should be called when [t] is no longer in use, so it can be returned
     by a future call of [create]. *)
  let save_unused t = Thread_safe_queue.enqueue unused t

  let create () =
    if Thread_safe_queue.length unused > 0
    then Thread_safe_queue.dequeue_exn unused
    else { mutex = Mutex.create (); condition = Condition.create () }
  ;;

  let critical_section t ~f = Mutex.critical_section t.mutex ~f
  let wait t = Condition.wait t.condition t.mutex
  let signal t = Condition.signal t.condition
end
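(* An illustrative sketch (not part of the library) of the recycling contract above:
   after [save_unused], a later [create] hands back a cached blocker rather than
   allocating a fresh one.

   {[
     let b = Blocker.create () in
     Blocker.save_unused b;
     (* With no interleaved threads, the cache now holds [b], so the next [create]
        returns it rather than allocating. *)
     let b' = Blocker.create () in
     assert (phys_equal b b')
   ]}
*)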
(* We represent a nano mutex using an OCaml record.  The [id_of_thread_holding_lock]
   field represents whether the mutex is locked or not, and if it is locked, which thread
   holds the lock.  We do not use an [int option] for performance reasons (doing so slows
   down lock+unlock by a factor of almost two).  Instead, we have
   [id_of_thread_holding_lock = bogus_thread_id] when the mutex is unlocked.

   The mutex record has an optional [blocker] field for use when the mutex is contended.
   We use the OS-level condition variable in [blocker] to [wait] in a thread that desires
   the lock and to [signal] from a thread that is releasing it.

   When thinking about the implementation, it is helpful to remember the following
   desiderata:

   * Safety -- only one thread can acquire the lock at a time.  This is accomplished
   using a test-and-set to set [id_of_thread_holding_lock].

   * Liveness -- if the mutex is unlocked and some threads are waiting on it, then one of
   those threads will be woken up and given a chance to acquire it.  This is accomplished
   by only waiting when we can ensure that there will be a [signal] of the condition
   variable in the future.  See the more detailed comment in [lock].

   * Performance -- do not spin trying to acquire the lock.  This is accomplished by
   waiting on a condition variable if a lock is contended. *)
type t =
  { mutable id_of_thread_holding_lock : int
  ; mutable num_using_blocker : int
  ; mutable blocker : Blocker.t option
  }
[@@deriving fields, sexp_of]

let invariant t =
  try
    assert (t.num_using_blocker >= 0);
    (* It is the case that if [t.num_using_blocker = 0] then [Option.is_none t.blocker];
       however, the converse does not necessarily hold.  The code in [with_blocker]
       doesn't take care to atomically increment [t.num_using_blocker] and set
       [t.blocker] to [Some].  It could, but doing so is not necessary for the
       correctness of [with_blocker], which only relies on test-and-set of [t.blocker]
       to make sure there is an agreed-upon winner in the race to create a blocker. *)
    if t.num_using_blocker = 0 then assert (Option.is_none t.blocker)
  with
  | exn -> failwiths "invariant failed" (exn, t) [%sexp_of: exn * t]
;;

let equal (t : t) t' = phys_equal t t'
let bogus_thread_id = -1

let create () =
  { id_of_thread_holding_lock = bogus_thread_id; num_using_blocker = 0; blocker = None }
;;

let is_locked t = t.id_of_thread_holding_lock <> bogus_thread_id
let current_thread_id () = Thread.id (Thread.self ())
let current_thread_has_lock t = t.id_of_thread_holding_lock = current_thread_id ()

let recursive_lock_error t =
  Error.create
    "attempt to lock mutex by thread already holding it"
    (current_thread_id (), t)
    [%sexp_of: int * t]
;;
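(* An illustrative sketch (not part of the library) of the encoding described above: a
   fresh mutex is unlocked, i.e. [id_of_thread_holding_lock = bogus_thread_id], and a
   successful [try_lock] (defined below) stores the locking thread's id.

   {[
     let t = create () in
     assert (not (is_locked t));
     match try_lock_exn t with
     | `Acquired -> assert (current_thread_has_lock t)
     | `Not_acquired -> assert false
   ]}
*)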
let try_lock t =
  (* The following code relies on an atomic test-and-set of [id_of_thread_holding_lock],
     so that there is a definitive winner in a race between multiple lockers and
     everybody agrees who acquired the lock. *)
  let current_thread_id = current_thread_id () in
  (* BEGIN ATOMIC *)
  if t.id_of_thread_holding_lock = bogus_thread_id
  then (
    t.id_of_thread_holding_lock <- current_thread_id;
    (* END ATOMIC *)
    Ok `Acquired)
  else if current_thread_id = t.id_of_thread_holding_lock
  then Error (recursive_lock_error t)
  else Ok `Not_acquired
;;

let try_lock_exn t = ok_exn (try_lock t)
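(* An illustrative sketch (not part of the library) of using [try_lock] from client code,
   assuming this module is exposed to clients as [Nano_mutex]:

   {[
     let t = Nano_mutex.create () in
     match Nano_mutex.try_lock_exn t with
     | `Acquired ->
       (* ... do the protected work ... *)
       Nano_mutex.unlock_exn t
     | `Not_acquired -> (* do something else rather than blocking *) ()
   ]}
*)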
(* [with_blocker t f] runs [f blocker] in a critical section.  It allocates a blocker
   for [t] if [t] doesn't already have one. *)
let with_blocker t f =
  t.num_using_blocker <- t.num_using_blocker + 1;
  let blocker =
    match t.blocker with
    | Some blocker -> blocker
    | None ->
      let new_blocker = Blocker.create () in
      (* We allocate [new_blocker_opt] here because one cannot allocate inside an atomic
         region. *)
      let new_blocker_opt = Some new_blocker in
      let blocker =
        (* We need the following test-and-set to be atomic so that there is a definitive
           winner in a race between multiple calls to [with_blocker], so that everybody
           agrees what the underlying [blocker] is. *)
        (* BEGIN ATOMIC *)
        match t.blocker with
        | Some blocker -> blocker
        | None ->
          t.blocker <- new_blocker_opt;
          new_blocker
        (* END ATOMIC *)
      in
      if not (phys_equal blocker new_blocker) then Blocker.save_unused new_blocker;
      blocker
  in
  protect
    ~f:(fun () -> Blocker.critical_section blocker ~f:(fun () -> f blocker))
    ~finally:(fun () ->
      (* We need the following decrement-test-and-set to be atomic so that we're sure
         that the last user of [blocker] clears it. *)
      (* BEGIN ATOMIC *)
      t.num_using_blocker <- t.num_using_blocker - 1;
      if t.num_using_blocker = 0
      then (
        t.blocker <- None;
        (* END ATOMIC *)
        Blocker.save_unused blocker))
;;
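(* An illustrative sketch (not part of the library) of how [lock] and [unlock] (both
   defined below) funnel through [with_blocker], so that a waiter's [Blocker.wait] and an
   unlocker's [Blocker.signal] are serialized by the same [Blocker.critical_section]:

   {[
     (* waiter, in [lock]: *)
     with_blocker t (fun blocker -> if is_locked t then Blocker.wait blocker);
     (* unlocker, in [unlock]: *)
     with_blocker t Blocker.signal
   ]}
*)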
let rec lock t =
  (* The following code relies on an atomic test-and-set of [id_of_thread_holding_lock],
     so that there is a definitive winner in a race between multiple [lock]ers, and
     everybody agrees who acquired the lock.

     If [is_locked t], we block the locking thread using [Blocker.wait], until some
     unlocking thread [Blocker.signal]s us.  There is a race between the [wait] and the
     [signal].  If the unlocking thread signals in between our test of
     [t.id_of_thread_holding_lock] and our [wait], then our [wait] could miss the signal
     and block forever.  We avoid this race by committing to waiting inside a
     [with_blocker], which increments [t.num_using_blocker].  If the [signal] occurs
     before the [with_blocker], then it will have cleared [t.id_of_thread_holding_lock],
     which we will notice as [not (is_locked t)], and then not [wait], and loop trying to
     [lock] again.  Otherwise, when an [unlock] occurs, it will see that [is_some
     t.blocker], and will enter a critical section on [blocker].  But then it must wait
     until our critical section on [blocker] finishes, and hence until our call to [wait]
     finishes.  Hence, the [signal] will occur after the [wait].

     The recursive call to [lock] will not spin.  It happens either because we just lost
     the race with an unlocker, in which case the subsequent [lock] will succeed, or
     because we actually had to block while someone held the lock.  The latter is the
     overwhelmingly common case.

     Other threads can change [t.id_of_thread_holding_lock] concurrently with this code.
     However, no other thread can set it to our [current_thread_id], since threads only
     ever set [t.id_of_thread_holding_lock] to their current thread id, or clear it. *)
  let current_thread_id = current_thread_id () in
  (* BEGIN ATOMIC *)
  if t.id_of_thread_holding_lock = bogus_thread_id
  then (
    t.id_of_thread_holding_lock <- current_thread_id;
    (* END ATOMIC *)
    Result.ok_unit)
  else if current_thread_id = t.id_of_thread_holding_lock
  then Error (recursive_lock_error t)
  else (
    with_blocker t (fun blocker -> if is_locked t then Blocker.wait blocker);
    lock t)
;;

let lock_exn t = ok_exn (lock t)
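(* An illustrative sketch (not part of the library): [lock] returns [Ok ()] on success
   and an [Error] on a recursive-lock attempt, so a second [lock] from the same thread
   fails rather than deadlocking.

   {[
     let t = create () in
     lock_exn t;
     (* A second lock attempt by the same thread is an error, not a deadlock: *)
     assert (Result.is_error (lock t));
     unlock_exn t
   ]}
*)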
type message =
  { current_thread_id : int
  ; mutex : t
  }
[@@deriving sexp_of]

let unlock t =
  let current_thread_id = current_thread_id () in
  (* We need the following test-and-set to be atomic so that there is a definitive
     winner in a race between multiple unlockers, so that one unlock succeeds and the
     rest fail. *)
  (* BEGIN ATOMIC *)
  if t.id_of_thread_holding_lock <> bogus_thread_id
  then
    if t.id_of_thread_holding_lock = current_thread_id
    then (
      t.id_of_thread_holding_lock <- bogus_thread_id;
      (* END ATOMIC *)
      if Option.is_some t.blocker then with_blocker t Blocker.signal;
      Result.ok_unit)
    else
      Error
        (Error.create
           "attempt to unlock mutex held by another thread"
           { current_thread_id; mutex = t }
           [%sexp_of: message])
  else
    Error
      (Error.create
         "attempt to unlock an unlocked mutex"
         { current_thread_id; mutex = t }
         [%sexp_of: message])
;;

let unlock_exn t = ok_exn (unlock t)

let critical_section t ~f =
  lock_exn t;
  protect ~f ~finally:(fun () -> unlock_exn t)
;;
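(* An illustrative usage sketch (not part of the library), assuming this module is
   exposed to clients as [Nano_mutex].  [critical_section] releases the lock in
   [protect ~finally], so the mutex is unlocked even if [f] raises.

   {[
     let t = Nano_mutex.create ()
     let counter = ref 0
     let incr_counter () = Nano_mutex.critical_section t ~f:(fun () -> incr counter)
   ]}
*)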