(* Based on "Adaptive Functional Programming"
   https://www.cs.cmu.edu/~guyb/papers/popl02.pdf *)

(* The result type of a changeable computation. It carries no data: such
   computations deliver their result by writing to a modifiable. *)
type changeable = unit

(* Detect attempts to change the inputs in the middle of a propagate operation.
   Set by [propagate] and checked by [change]. *)
let in_propagate = ref false

(* A record of a computation that takes an input of type ['a]. *)
type 'a edge = {
  start : Time.t;       (* When this computation started. *)
  fn : 'a -> unit;      (* The operation to run on updates. *)
}

(* The state of an initialised modifiable. *)
type 'a full = {
  value : 'a;                   (* The current value. *)
  readers : 'a edge Queue.t     (* The computations which read this value. *)
}

type 'a modval =
  | Uninitialised
  | Full of 'a full
  | Redirect of (eq:('a -> 'a -> bool) -> 'a -> unit)  (* To write here, just call the function instead. *)

(* A modifiable value starts off [Uninitialised] and then becomes [Full] once the
   initial value is known. When the value changes, it is replaced with a new [Full]
   value. *)
type 'a t = 'a modval ref

module Pq : sig
  (* A priority queue that returns the earliest edge first. *)

  type t

  val create : unit -> t

  val add : t -> unit edge -> unit

  val pop : t -> unit edge option
  (** [pop t] removes and returns the earliest edge in [t]. *)
end = struct
  (* Edges are ordered (and identified) by their start time only. *)
  module Edge_set = Set.Make(struct
      type t = unit edge
      let compare a b = Time.compare a.start b.start
    end)

  type t = Edge_set.t ref

  let create () = ref Edge_set.empty

  (* Queue [edge], unless its start time is no longer valid. A forget callback
     is registered on the start time that removes the edge from the queue again
     (presumably invoked by [Time] when that time is erased - confirm there). *)
  let add t edge =
    if Time.is_valid edge.start then (
      t := Edge_set.add edge !t;
      Time.set_forget edge.start (fun () -> t := Edge_set.remove edge !t)
    )

  (* Take the minimum edge, and drop the forget callback installed by [add],
     since the edge is no longer in the queue. *)
  let pop t =
    match Edge_set.min_elt_opt !t with
    | None -> None
    | Some edge ->
      t := Edge_set.remove edge !t;
      Time.clear_forget edge.start;
      Some edge
end

(* The singleton propagation queue. This contains all edges that need to be recalculated. *)
let q = Pq.create ()

(* The very first and last entry in the timeline. *)
let root_start = Time.root ()
let root_stop = Time.after ~depth:(Time.depth root_start) root_start

(* The current position in the timeline. New times are inserted after it. *)
let now = ref root_start
let depth = ref 0   (* Number of [read] currently open surrounding [now]. *)

(* Insert a new time directly after [now] at the current [depth].
   [now] is advanced to the new time, which is also returned. *)
let insert_now ?on_forget () =
  now := Time.after ?on_forget ~depth:!depth !now;
  !now

(* Open a read: insert a new time, then increase [depth] so that anything
   created until the matching [read_stop] is one level deeper.
   Returns the time that marks the start of the read. *)
let read_start () =
  let start = insert_now () in
  incr depth;
  start

(* Close the innermost read opened by [read_start]. *)
let read_stop () =
  decr depth

(* [create init] makes a fresh [Uninitialised] modifiable, passes it to [init]
   (which is expected to write to it) and returns it. *)
let create init =
  let t = ref Uninitialised in
  init t;
  t

(* Return the [full] state of [t].
   Raises [Failure] if [t] is [Uninitialised] or a [Redirect]. *)
let non_empty (t : 'a t) =
  match !t with
  | Full x -> x
  | Uninitialised -> failwith "Modifiable is empty! (this shouldn't happen)"
  | Redirect _ -> failwith "Got an unexpected Redirect (this shouldn't happen)"

(* If we keep reading a modifiable that doesn't change often, the list of
   readers can build up over time. So each time we add something to the queue,
   we also take one existing item and check that it's still valid.
   A valid item is put back at the end of the queue; an invalid one is dropped. *)
let minor_tidy q =
  match Queue.take_opt q with
  | None -> ()
  | Some edge -> if Time.is_valid edge.start then Queue.add edge q

(* [read t fn] runs [fn] on the current value of [t] inside a new read period,
   then registers [fn] as a reader of [t] so that it can be run again when the
   value changes (see [write]).
   Raises [Failure] if [t] is not [Full]. *)
let read t fn =
  let value = (non_empty t).value in
  let start = read_start () in
  fn value;
  read_stop ();
  let readers = (non_empty t).readers in        (* Readers might have changed by now *)
  let edge = { start; fn } in
  minor_tidy readers;
  Queue.add edge readers;
  (* NOTE(review): this stores the [value] captured before [fn] ran. If [fn]
     could ever write to [t], that write would be overwritten here - confirm
     that this cannot happen. *)
  t := Full { value; readers }

(* [on_release fn] inserts a new time after [now] with [fn] as its [on_forget]
   handler (presumably called by [Time] when that time is erased - confirm there).
   The time itself is not needed by the caller. *)
let on_release fn =
  let _ : Time.t = insert_now ~on_forget:fn () in
  ()

(* A more efficient version of [read], when we already know the start and stop times.
   [reread t reader] is a thunk (suitable as the [fn] of a [unit edge]) that
   re-registers [reader] on the current state of [t] and then runs it on the
   current value. No new time is inserted. *)
let reread t reader () =
  match !t with
  | Uninitialised -> failwith "Modifiable is empty! (this shouldn't happen)"
  | Redirect _ -> failwith "Modifiable is a redirect! (this shouldn't happen)"
  | Full f ->
    minor_tidy f.readers;
    Queue.add reader f.readers;
    reader.fn f.value

(* [write ~eq t value] sets the value of [t]:
   - an [Uninitialised] modifiable becomes [Full] with no readers;
   - a [Redirect] passes [eq] and [value] on to its function;
   - if [eq] says the old and new values are equal, nothing happens;
   - otherwise [t] gets the new value with a fresh reader queue and every
     old reader is scheduled on the propagation queue. *)
let write ~eq t value =
  match !t with
  | Uninitialised -> t := Full { value; readers = Queue.create () }
  | Redirect f -> f ~eq value
  | Full { value = old; readers = _ } when eq old value -> ()
  | Full old ->
    t := Full { value; readers = Queue.create () };
    (* Each scheduled edge keeps the reader's start time, but its action
       re-attaches the reader to [t] before running it. *)
    old.readers |> Queue.iter (fun r -> Pq.add q { r with fn = reread t r })

module Separate (Map : Map.S) = struct
  (* Normally, if we processed all the elements of a set with a function
     then we would automatically invalidate all of the work whenever the set changed.
     Instead, we pretend that the read of the set finishes before any of the
     elements are processed, so that changing the set just calls our [update]
     function. Then we manually remove any time periods that are no longer needed
     and create any new ones (for newly added elements). The result of the user
     function is intercepted and turned into an operation to add the result to
     the results map.
     Note that this might cause the output to be written to many times in a
     single propagate, but that shouldn't cause any problems. The first write
     will add all readers to the queue but the final result will be set before
     any of them actually run. *)

  (* The start time of a computation that processed an element of the set.
     There is no [fn] here because an input element cannot change, it can only
     be removed from the set. When an element is removed, the [start] and
     its children are erased from history. *)
  type period = Time.t   (* When this computation started. *)

  (* [map xs_incr f] returns a modifiable map with one binding per key of the
     set held in [xs_incr] (a map from keys to [()]). [f key dst] is run once
     for each key when it first appears; whatever it writes to [dst] becomes
     the binding for [key] in the result. When a key disappears its period is
     erased and its binding removed.
     Raises [Failure] if [xs_incr] is not [Full]. *)
  let map xs_incr (f : Map.key -> 'b t -> changeable) : 'b Map.t t =
    (* The keys currently being tracked, with the time each one started. *)
    let active : period Map.t ref = ref Map.empty in
    let result = create (fun d -> write ~eq:(==) d Map.empty) in
    let start = read_start () in
    let update xs =
      (* Called initially and whenever [xs] changes, always at time [start] and [depth + 1].
         We exit the scope of [start] so that the Map elements are not spliced out
         after an update of the set. *)
      read_stop ();
      assert (!now == start);
      assert (!depth = Time.depth start);
      active := Map.merge (fun key a b ->
          match a, b with
          | None, Some () ->
            (* A new element has been added. Add it to the timeline: *)
            let start = read_start () in
            (* Run [f key]. When it tries to write the result, add that to [results]: *)
            f key (ref (Redirect (fun ~eq value ->
                let old_map = (non_empty result).value in
                match Map.find_opt key old_map with
                | Some old_value when eq old_value value -> ()
                | _ -> write result (Map.add key value old_map) ~eq:(==);
              )));
            read_stop ();
            (* Record the time period during which [f key] ran, so we can erase it later. *)
            Some start
          | Some _ as existing, Some () ->
            (* An existing element is still present. Keep it. *)
            existing
          | Some old_start, None ->
            (* An element has been removed. Erase it from history: *)
            Time.splice_out old_start;
            Time.forget old_start;
            (* Remove its result from the output: *)
            let old_map = (non_empty result).value in
            write result (Map.remove key old_map) ~eq:(==);
            (* Remove it from [active]: *)
            None
          | None, None -> assert false
        ) !active xs
    in
    begin
      let xs = non_empty xs_incr in
      minor_tidy xs.readers;
      update xs.value
      (* Note: [xs] might have been replaced by now. *)
    end;
    (* Arrange to call [update] again if [xs] changes: *)
    let edge = { start; fn = update } in
    Queue.add edge (non_empty xs_incr).readers;
    result
end

(* [deref t] is the current value of [t], without registering a reader.
   Raises [Failure] if [t] is not [Full]. *)
let deref t = (non_empty t).value

(* [change ~eq t v] writes a new input value to [t] from outside a propagate.
   Raises [Failure] if called while [propagate] is running.
   [now] is saved and restored around the write. *)
let change ~eq t v =
  if !in_propagate then failwith "Current_incr.change called within propagate!";
  let present = !now in
  write ~eq t v;
  now := present

(* Run queued edges, earliest first, until the queue is empty. Edges added
   while running are picked up by the same loop. *)
let rec propagate2 () =
  match Pq.pop q with
  | None -> ()
  | Some { start; fn } ->
    (* Note: The later paper splices out after calling [fn] rather than before - why? *)
    Time.splice_out start;
    (* They also added a [finger] variable - but never use it. *)
    now := start;
    depth := Time.depth start + 1;      (* Inside the read. *)
    fn ();
    propagate2 ()

(* [propagate ()] re-runs every computation that was scheduled by [change].
   It must not be called re-entrantly (checked with an assertion). The global
   [now], [depth] and [in_propagate] are reset afterwards, even if one of the
   computations raises. *)
let propagate () =
  assert (not !in_propagate);
  in_propagate := true;
  Fun.protect propagate2
    ~finally:(fun () ->
      (* Set the current time after all created elements. *)
      now := Time.prev root_stop;
      depth := 0;
      in_propagate := false
    )