let src = Logs.Src.create "gc"

module Log = (val Logs.src_log src : Logs.LOG)
module Set = Set.Make (Int)

(* This garbage collector is a ‘mark-and-sweep’ garbage collector whose
allocation is protected by a global lock between all writers.
   - [get_free_cell] is the first attempt to allocate a memory area, called
     under the protection of our mutex [free_locker]. This function consists
     of reusing unused cells.
- If there are no unused areas, we ‘sweep’, i.e. we look at the cells that
have been collected and try [get_free_cell] again.
- If we still don't have any available cells, we ‘really allocate’ by moving
the boundary of our segment ([brk]).
- If we cannot move [brk], we end up in the worst case scenario: having to
extend (if possible) our segment (which consists of creating a new segment,
copying the old one and then working on the new one).
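   In pseudo-OCaml (a sketch of the cascade implemented by [alloc] below, with
   simplified types, not its exact code), the allocation path looks like:

   {[
     match get_free_cell t ~len with
     | Some addr -> addr (* re-use a free cell *)
     | None -> (
         ignore (sweep t writer) (* recycle collected cells *);
         match get_free_cell t ~len with
         | Some addr -> addr
         | None ->
             (* move [brk]; may raise [Retry_after_extension] if the segment
                must be extended first *)
             really_alloc t writer ~kind len payloads)
   ]}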
Sweep
   Rowex is based on a fairly simple memory model, where data is created by a
   given task (identified by a number, which simply increments, allowing tasks
   to be ordered from oldest to most recent). According to the Rowex design,
   once a cell is allocated, it is reachable by the task that allocated it, as
   well as by all tasks prior to the one that created the cell.
Thus, if a cell is marked as free, we must "wait" until the oldest active
task is more recent than the one that marked the cell. If this is the case,
then no active task can now reach this cell, and we can therefore consider
it to be truly free.
A collected cell is therefore associated with the identifier of the task
that requested the GC to collect it. Then, during a "sweep" phase, all
collected cells are scanned and those that meet this predicate are swept:
[cell.uid < oldest_active_task_uid]. These cells are then added to our [free]
table (protected by a mutex) to be available when a task wishes to allocate.
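   As a minimal, self-contained sketch of this predicate (with a hypothetical
   [cell] type, not the one defined below):

   {[
     type cell = { addr : int; uid : int }

     (* [free] receives the cells that no active task can reach any more
        ([cell.uid < oldest_active]); [keep] receives the cells that must
        wait for older tasks to terminate. *)
     let sweep ~oldest_active collected =
       List.partition (fun cell -> cell.uid < oldest_active) collected
   ]}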
Collect
The collection occurs when a task wants to delete a cell. This operation is
very simple and consists of adding the cell to be collected to an (atomic)
queue. We keep track of the task that wanted to delete this cell, and as
long as there are tasks that predate it, we keep the cell in our queue. It
is during a sweep and when the predicate is satisfied that we can consider
the cell to be truly free.
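   In other words, collecting a cell amounts to a single enqueue (see
   [collect] below); the real decision is deferred to the sweep:

   {[
     Miou.Queue.enqueue t.collected { addr; len; uid = writer }
   ]}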
Extension of our [brk]
It may happen that there are no free cells. In this case, we move the
boundary of our segment in order to allocate a new cell: we move the [brk].
The format of our rowex file consists of having our [brk] at offset [0], the
root of our tree at offset [size_of_word], and then our rowex (i.e. at
offset [size_of_word * 2]).
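   Concretely, with [size_of_word = Sys.word_size / 8] (8 bytes on a 64-bit
   system), the layout can be summed up as (hypothetical names, for
   illustration only):

   {[
     let size_of_word = Sys.word_size / 8
     let brk_offset = 0 (* current boundary of the segment *)
     let root_offset = size_of_word (* root of our tree *)
     let rowex_offset = size_of_word * 2 (* the rowex itself *)
   ]}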
Extension of our data-segment
The worst case scenario is when the [brk] can no longer be moved. In this
case, we need to extend our segment. The extension is not part of the GC,
but the latter creates a situation where such an extension can be made.
When a task cannot allocate, it will set up a "trap" for all tasks (that
want to write). It will then wait for all these tasks to "release"
   (terminate) or fall into our trap. We can do this thanks to [Clatch]
   (a count-down latch).
In other words, the extension takes effect once all tasks have finished or
fallen into the trap:
   - if a new task attempts to appear, it is not counted by our [clatch]
     (since the [clatch] was created before the task appeared) and will wait
     for the extension
   - if a task ends, it counts down the [clatch]
- it is possible that a task may continue to allocate even if another task
has set up our trap. This is not a problem because if this task continues,
     it will eventually either finish or want to allocate again (and thus
     fall into our trap). But in either case, the task that set the trap will
still be waiting and the extension will still not have started. We say that
tasks "converge" towards our trap.
Readers are a special case because they do not allocate. It does not matter
whether they are working on the new version or the old one. However, they
are important in determining whether cells can be "swept" or not (which is
why they are counted when we want to know "the oldest active task").
   Once the writing tasks have fallen into our trap, we can create a new
   segment, copy the old one into the new one (knowing that no task can write
   to it in the meantime) and resume our work on this new segment.
*)

module Clatch = struct
  type t = {
    mutex : Miou.Mutex.t;
    condition : Miou.Condition.t;
    mutable count : int;
  }

  let create n =
    {
      mutex = Miou.Mutex.create ();
      condition = Miou.Condition.create ();
      count = n;
    }

  let await t =
    Miou.Mutex.protect t.mutex @@ fun () ->
    while t.count > 0 do
      Miou.Condition.wait t.condition t.mutex
    done

  let count_down t =
    Miou.Mutex.protect t.mutex @@ fun () ->
    t.count <- t.count - 1;
    Miou.Condition.broadcast t.condition
end

type cell = { addr : int; len : int; uid : int }

type 'mem t = {
  queue_locker : Miou.Mutex.t;
  active_processes : (int * [ `Wr | `Rd ]) Queue.t;
  released_processes : Set.t ref;
  clatch : Clatch.t option ref;
  free_locker : Miou.Mutex.t;
  free : (int, Set.t) Hashtbl.t;
  free_cells : int Atomic.t;
  older_active_process : int Atomic.t;
  collected : cell Miou.Queue.t;
  in_sync : 'mem Miou.Computation.t option ref;
  extend_and_copy : int -> 'mem -> 'mem * int;
  mutable memory : 'mem;
  memory_from_t : 'mem Atomic.t;
}

(* NOTE(dinosaure): The use of [ref] here is important because these are values
that must be shared between tasks. In particular, we need to make a copy of
[t] for each new task (see [with_memory]). We therefore ensure that we copy
the pointer to these values rather than the values themselves (and thus,
   they are shared between all our tasks). *)

type uid = int

let make ~extend_and_copy memory_from_t =
  let free_locker = Miou.Mutex.create () in
  let free = Hashtbl.create 0x7ff in
  let free_cells = Atomic.make 0 in
  let older_active_process = Atomic.make 0 in
  let collected = Miou.Queue.create () in
  let queue_locker = Miou.Mutex.create () in
  let active_processes = Queue.create () in
  let released_processes = ref Set.empty in
  let clatch = ref None in
  let in_sync = ref None in
  {
    free_locker;
    free;
    free_cells;
    older_active_process;
    collected;
    queue_locker;
    active_processes;
    released_processes;
    clatch;
    in_sync;
    extend_and_copy;
    memory = Atomic.get memory_from_t;
    memory_from_t;
  }

let atomic_memory { memory_from_t; _ } = Atomic.get memory_from_t
let memory { memory; _ } = memory
let with_memory t memory = { t with memory }
let null = 0

let unsafe_add_free_cell t writer ~addr ~len =
  Log.debug (fun m ->
      m "[%016x] add a new free cell %016x (%d byte(s))" writer addr len);
  let () =
    try
      let cells = Hashtbl.find t.free len in
      Hashtbl.replace t.free len (Set.add addr cells)
    with Not_found -> Hashtbl.add t.free len (Set.singleton addr)
  in
  ignore (Atomic.fetch_and_add t.free_cells 1)

let get_free_cell writer ~len =
  if Atomic.get writer.free_cells > 0 then
    Miou.Mutex.protect writer.free_locker @@ fun () ->
    match Set.to_list (Hashtbl.find writer.free len) with
    | [ cell ] ->
        ignore (Atomic.fetch_and_add writer.free_cells (-1));
        Hashtbl.remove writer.free len;
        Some cell
    | cell :: cells ->
        ignore (Atomic.fetch_and_add writer.free_cells (-1));
        Hashtbl.replace writer.free len (Set.of_list cells);
        Some cell
    | [] ->
        Hashtbl.remove writer.free len;
        None
    | exception Not_found -> None
  else None

let can_we_sweep_it writer uid' =
  let older_active_process =
    Atomic.get (Sys.opaque_identity writer.older_active_process)
  in
  older_active_process = null || uid' < older_active_process

let collect t writer addr ~len ~uid =
  let addr = Rowex.Addr.unsafe_to_int addr in
  Log.debug (fun m ->
      m "[%016x] collects %016x (%d byte(s)) made by %016x" writer addr len uid);
  (* TODO(dinosaure): I don't remember if we need to keep the task which
collects the cell ([writer]) or if we need to keep the task which made the
cell ([uid]):
     - If we use [uid], we fall back to an unreachable case on [rowex]:
       nodes which are not in-sync... So we break something.
     - If we use [writer], we don't really re-use cells. *)
  Miou.Queue.enqueue t.collected { addr; len; uid = writer }

let sweep t writer =
  let really_sweep () =
    Log.debug (fun m -> m "sweep: %016x start" writer);
    let collected = Miou.Queue.(to_list (transfer t.collected)) in
    let free, keep =
      List.fold_left
        (fun (free, keep) ({ addr; len; uid } as cell) ->
          if can_we_sweep_it t uid then ((addr, len) :: free, keep)
          else (free, cell :: keep))
        ([], []) collected
    in
    Log.debug (fun m -> m "sweep: keep %d cell(s)" (List.length keep));
    Log.debug (fun m -> m "sweep: free %d cell(s)" (List.length free));
    List.iter (Miou.Queue.enqueue t.collected) keep;
    Miou.Mutex.protect t.free_locker @@ fun () ->
    List.iter (fun (addr, len) -> unsafe_add_free_cell t writer ~addr ~len) free
  in
  if Miou.Queue.length t.collected > 0 then really_sweep ()

exception Retry_after_extension

let unsafe_count_active_writers t =
  let rw = !(t.released_processes) in
  let fn acc (uid, k) =
    match k with `Wr when not (Set.mem uid rw) -> acc + 1 | _ -> acc
  in
  Queue.fold fn 0 t.active_processes

let size_of_word = Sys.word_size / 8

external string_unsafe_get_uint32 : string -> int -> int32
  = "%caml_string_get32"

module type S = sig
  type memory

  val length : memory -> int
  val atomic_fetch_add_leuintnat : memory -> int -> int -> int
  val atomic_set_leuintnat : memory -> int -> int -> unit
  val set_int32 : memory -> int -> int32 -> unit
  val set_uint8 : memory -> int -> int -> unit
end

module Make (C : S) = struct
  type memory = C.memory

  (* XXX(dinosaure): An important note is that the [t.memory] used is always
the correct one (even if it is not atomic) for readers and writers. It may
happen that [t.memory] for readers continues to refer to the old memory
area (the reader takes the memory area, a writer appears and attempts to
extend the memory area: in this case, our reader still refers to the old
area), but we always refer to the latest memory area (regardless of
     extensions) for writers. *)

  let rec blitv payloads memory dst_off =
    match payloads with
    | hd :: tl ->
        let len = String.length hd in
        let len0 = len land 3 in
        let len1 = len asr 2 in
        for i = 0 to len1 - 1 do
          let i = i * 4 in
          let v = string_unsafe_get_uint32 hd i in
          C.set_int32 memory (dst_off + i) v
        done;
        for i = 0 to len0 - 1 do
          let i = (len1 * 4) + i in
          C.set_uint8 memory (dst_off + i) (Char.code hd.[i])
        done;
        blitv tl memory (dst_off + len)
    | [] -> ()

  let really_alloc t writer ~kind len payloads =
    let len = (len + (size_of_word - 1)) / size_of_word * size_of_word in
    Log.debug (fun m -> m "[%016x] try to allocate %d byte(s)" writer len);
    let old_brk = C.atomic_fetch_add_leuintnat t.memory 0 len in
    if old_brk + len <= C.length t.memory then begin
      let addr = old_brk in
      Log.debug (fun m ->
          m "brk: %016x (owner: [%016x]) => %016x" addr writer (old_brk + len));
      blitv payloads t.memory addr;
      if kind = `Node then
        C.atomic_set_leuintnat t.memory (addr + Rowex._header_owner) writer;
      Rowex.Addr.of_int_to_rdwr addr
    end
    else begin
      C.atomic_set_leuintnat t.memory 0 old_brk;
      (* NOTE(dinosaure): we must replace [brk] to be sure that a next usage
         of our rowex file will not fail with a SIGSEGV (because the current
         [brk] is farther than expected). *)
      (* NOTE(dinosaure): the idea here is to "trap" our writers in this part of
         the code. If, by chance, one of them has finished in the meantime, it
         will "count_down" (see [release_writer]) itself but will not
         participate in the extension. A writer will create the "clatch" and
         wait for all the others to fall into the trap as well. Then, we will
         create an ivar and our first writer will perform the extension while
         the others wait for
         the result of this extension. *)
      Log.debug (fun m -> m "start to extend our rowex file");
      Miou.Mutex.lock t.queue_locker;
      match !(t.clatch) with
      | None ->
          let active_writers = unsafe_count_active_writers t in
          assert (active_writers >= 1);
          let clatch = Clatch.create (active_writers - 1) in
          let in_sync = Miou.Computation.create () in
          t.clatch := Some clatch;
          t.in_sync := Some in_sync;
          Log.debug (fun m ->
              m "lucky you are %016x, start to wait %d writer(s)" writer
                active_writers);
          Miou.Mutex.unlock t.queue_locker;
          Clatch.await clatch;
          t.clatch := None;
          let new_memory, new_size = t.extend_and_copy writer t.memory in
          Log.debug (fun m -> m "new memory (new size: %d byte(s))" new_size);
          t.memory <- new_memory;
          Atomic.set t.memory_from_t new_memory;
          assert (Miou.Computation.try_return in_sync new_memory);
          (* TODO(dinosaure): should we clean-up [t.in_sync]? I would like to
             say yes but I'm not sure. *)
          raise Retry_after_extension
      | Some clatch ->
          Log.debug (fun m -> m "%016x trapped" writer);
          Miou.Mutex.unlock t.queue_locker;
          let in_sync = Option.get !(t.in_sync) in
          Clatch.count_down clatch;
          Clatch.await clatch;
          let new_memory = Miou.Computation.await_exn in_sync in
          t.memory <- new_memory;
          raise Retry_after_extension
    end

  let alloc t ~writer ~kind len payloads =
    (* The subtlety here is that if one writer falls into the file extension
case, another writer may want to allocate at the same time. There are
then two possible scenarios:
- the case where this new writer attempts to allocate smaller data and
finds an empty cell; in this case, we continue to work on the "old"
memory area and the extension only takes effect when ALL writers have
fallen into the trap
- the case where this new writer does not find a free cell and therefore
falls into the trap.
In other words, we can continue to work on the old memory area (before
its extension) even if a writer requests the extension at the same time.
We will reach a point where all writers will be unable to allocate and
will fall into the trap of the first writer (and all allocations, even
those made while our first writer waits for all the others to fall into
       the trap, will be effective during the extension). *)
    match get_free_cell t ~len with
    | Some addr ->
        blitv payloads t.memory addr;
        if kind = `Node then
          C.atomic_set_leuintnat t.memory (addr + Rowex._header_owner) writer;
        Rowex.Addr.of_int_to_rdwr addr
    | None -> begin
        ignore (sweep t writer);
        match get_free_cell t ~len with
        | None -> begin
            try really_alloc t writer ~kind len payloads
            with Retry_after_extension ->
              Log.debug (fun m -> m "retry an allocation");
              really_alloc t writer ~kind len payloads
          end
        | Some addr ->
            Log.debug (fun m ->
                m "re-use(2) %016x (owner: [%016x])" addr writer);
            blitv payloads t.memory addr;
            if kind = `Node then
              C.atomic_set_leuintnat t.memory (addr + Rowex._header_owner)
                writer;
            Rowex.Addr.of_int_to_rdwr addr
      end

  (* the goal here is to update [t.older_active_process] to the one we get from
     [t.active_processes]. We **really try** to stay synchronised between the
     last [t.active_processes] and [t.older_active_process]. *)
  let rec update_older_active_process ?(backoff = Miou.Backoff.default) ?older t
      =
    let older =
      match older with
      | Some older -> older
      | None -> begin
          Miou.Mutex.protect t.queue_locker @@ fun () ->
          match Queue.peek t.active_processes with
          | older, _ -> older
          | exception Queue.Empty -> 0
        end
    in
    let seen = Atomic.get t.older_active_process in
    if
      seen <> older
      && not (Atomic.compare_and_set t.older_active_process seen older)
    then update_older_active_process ~backoff:(Miou.Backoff.once backoff) t

  let gen =
    let v = Atomic.make 1 in
    fun () -> Atomic.fetch_and_add v 1

  let add_process t kind =
    let uid = gen () in
    Log.debug (fun m -> m "new writer %016x" uid);
    let set = Atomic.compare_and_set t.older_active_process 0 uid in
    if not set then begin
      let older, _ =
        Miou.Mutex.protect t.queue_locker @@ fun () ->
        (* Here, if a writer attempts to extend the area, and our new writer is
           not counted in its trap, our [in_sync] notifies everyone that the
           extension has finished and that [t.memory] is indeed the new memory
           area. We therefore wait for the extension to
           finish here before launching our new writer. *)
        let () =
          match (kind, !(t.clatch)) with
          | `Wr, Some _ ->
              ignore (Miou.Computation.await_exn (Option.get !(t.in_sync)))
          | _ -> ()
        in
        Queue.push (uid, kind) t.active_processes;
        (* here, we take the previous writer before the apparition of our new
           one. *)
        Queue.peek t.active_processes
      in
      update_older_active_process ~older t;
      uid
    end
    else
      Miou.Mutex.protect t.queue_locker @@ fun () ->
      Queue.push (uid, kind) t.active_processes;
      uid

  let rec unsafe_clean_released_processes t =
    let rw = !(t.released_processes) in
    if not (Set.is_empty rw) then
      match Queue.peek t.active_processes with
      | older, _ ->
          if Set.mem older rw then begin
            t.released_processes := Set.remove older rw;
            ignore (Queue.pop t.active_processes);
            unsafe_clean_released_processes t
          end
      | exception Queue.Empty -> ()

  let release_process t kind ~uid =
    let older =
      Miou.Mutex.protect t.queue_locker @@ fun () ->
      Log.debug (fun m -> m "release writer %016x" uid);
      (* here, if our writer has finished but another writer tries to extend the
file, we count down to prevent the other writer from waiting for us
         indefinitely! *)
      let () =
        match (kind, !(t.clatch)) with
        | `Wr, Some clatch ->
            Log.debug (fun m -> m "writer %016x unlock our extension" uid);
            Clatch.count_down clatch
        | _ -> ()
      in
      match Queue.peek t.active_processes with
      | older, _ ->
          if uid = older then begin
            assert (fst (Queue.pop t.active_processes) = uid);
            (* here, we possibly have a few writers ahead of our writer [uid];
               they must have ended before us. *)
            Log.debug (fun m -> m "clean possible released writers");
            unsafe_clean_released_processes t;
            let older = Queue.peek_opt t.active_processes in
            let older = Option.map fst older in
            Option.value ~default:0 older
          end
          else begin
            Log.debug (fun m ->
                m "there exists an older active writer (%016x) than %016x" older
                  uid);
            let rw = !(t.released_processes) in
            let rw = Set.add uid rw in
            t.released_processes := rw;
            older
          end
      | exception Queue.Empty ->
          Log.err (fun m -> m "we missed writer %016x" uid);
          assert false
    in
    update_older_active_process ~older t

  let collect = collect

  let delete t (addr : 'a Rowex.Addr.t) len =
    Miou.Mutex.protect t.free_locker @@ fun () ->
    unsafe_add_free_cell t 0 ~addr:(Rowex.Addr.unsafe_to_int addr) ~len

  let unsafe_delete t (addr : 'a Rowex.Addr.t) len =
    unsafe_add_free_cell t 0 ~addr:(Rowex.Addr.unsafe_to_int addr) ~len
end