tate] defines the global state of which there is one instance for each call to
[Incremental.Make].
This module does not have an mli because it would essentially duplicate
[Incremental.S], except adding an extra [State.t] argument to functions. *)

open Core_kernel
open Import
open Types.Kind

(* The stabilization status of a state.  This re-exports the constructors of
   [Types.Status.t] so that they can be used unqualified in this module. *)
type status = Types.Status.t =
  | Stabilizing
  | Running_on_update_handlers
  | Not_stabilizing
  | Stabilize_previously_raised of Raised_exn.t
[@@deriving sexp_of]

module Node_update = On_update_handler.Node_update

module Run_on_update_handlers = struct
  (* A node packed together with a [Node_update.t] of the same value type. *)
  type t = Types.Run_on_update_handlers.t = T : 'a Node.t * 'a Node_update.t -> t
  [@@deriving sexp_of]

  (* Only the node is checked; the node update is ignored. *)
  let invariant (T (node, _node_update) as t) =
    Invariant.invariant [%here] t [%sexp_of: t] (fun () -> Node.invariant ignore node)
  ;;
end

module Only_in_debug = struct
  (* Extra state kept only when [debug] for the purpose of writing assertions. *)
  type t = Types.Only_in_debug.t =
    { mutable currently_running_node : Node.Packed.t option
    ; mutable expert_nodes_created_by_current_node : Node.Packed.t list
    }
  [@@deriving fields, sexp_of]

  (* No field carries a checkable invariant; [Fields.iter] is used so that adding a
     field forces this function to be revisited. *)
  let invariant t =
    Invariant.invariant [%here] t [%sexp_of: t] (fun () ->
      Fields.iter
        ~currently_running_node:ignore
        ~expert_nodes_created_by_current_node:ignore)
  ;;

  let create () =
    { currently_running_node = None; expert_nodes_created_by_current_node = [] }
  ;;
end

module Packed_weak_hashtbl = struct
  (* A weak hashtable with its key and value types hidden. *)
  type t = Types.Packed_weak_hashtbl.t = T : (_, _) Weak_hashtbl.t -> t
  [@@deriving sexp_of]
end

type t = Types.State.t =
  { mutable status : status
  ; bind_lhs_change_should_invalidate_rhs : bool
  ; (* [stabilization_num] starts at zero, and is incremented at the end of each
       stabilization. *)
    mutable stabilization_num : Stabilization_num.t
  ; mutable current_scope : Scope.t
  ; recompute_heap : Recompute_heap.t
  ; adjust_heights_heap : Adjust_heights_heap.t
  ; (* [propagate_invalidity] holds nodes that have invalid children that should be
       considered for invalidation.  It is only used during graph restructuring:
       [invalidate_node] and [add_parent].  Once an element is added to the stack, we then
       iterate until invalidity has propagated to all ancestors as necessary, according to
       [Node.should_be_invalidated]. *)
    propagate_invalidity : Node.Packed.t Stack.t
  ; (* [num_active_observers] is the number of observers whose state is [Created] or
       [In_use]. *)
    mutable num_active_observers : int
  ; (* [all_observers] is the doubly-linked list of all observers in effect, or that have
       been disallowed since the most recent start of a stabilization -- these have
       [state] as [In_use] or [Disallowed]. *)
    mutable all_observers : Internal_observer.Packed.t Uopt.t
  ; (* We enqueue finalized observers in a thread-safe queue, for handling during
       stabilization.  We use a thread-safe queue because OCaml finalizers can run in any
       thread. *)
    finalized_observers : Internal_observer.Packed.t Thread_safe_queue.t
  ; (* [new_observers] holds observers created since the most recent start of a
       stabilization -- these have [state] as [Created] or [Unlinked].  At the start of
       stabilization, we link into [all_observers] all observers in [new_observers] whose
       state is [Created] and add them to the [observers] of the node they are observing.
       We structure things this way to allow observers to be created during stabilization
       while running user code ([map], [bind], etc), but to not have to deal with nodes
       becoming necessary and the graph changing during such code. *)
    new_observers : Internal_observer.Packed.t Stack.t
  ; (* [disallowed_observers] holds all observers that have been disallowed since the most
       recent start of a stabilization -- these have [state = Disallowed].  At the start
       of stabilization, these are unlinked from [all_observers] and their state is
       changed to [Unlinked].  We structure things this way to allow user code running
       during stabilization to call [disallow_future_use], but to not have to deal with
       nodes becoming unnecessary and the graph changing during such code. *)
    disallowed_observers : Internal_observer.Packed.t Stack.t
  ; (* We delay all [Var.set] calls that happen during stabilization so that they take
       effect after stabilization.  All variables set during stabilization are pushed on
       [set_during_stabilization] rather than setting them.  Then, after the graph has
       stabilized, we do all the sets, so that they take effect at the start of the next
       stabilization. *)
    set_during_stabilization : Var.Packed.t Stack.t
  ; (* [handle_after_stabilization] has all nodes with handlers to consider running at the
       end of the next stabilization.  At the end of stabilization, we consider each node
       in [handle_after_stabilization], and if we decide to run its on-update handlers,
       push it on [run_on_update_handlers].  Then, once we've considered all nodes in
       [handle_after_stabilization], we iterate through [run_on_update_handlers] and
       actually run the handlers.

       These two passes are essential for correctness.  During the first pass, we haven't
       run any user handlers, so we know that the state is exactly as it was when
       stabilization finished.  In particular, we know that if a node is necessary, then
       it has a stable value; once user handlers run, we don't know this.  During the
       second pass, user handlers can make calls to any incremental function except for
       [stabilize].  In particular, some functions push nodes on
       [handle_after_stabilization].  But no functions (except for [stabilize]) modify
       [run_on_update_handlers]. *)
    handle_after_stabilization : Node.Packed.t Stack.t
  ; run_on_update_handlers : Run_on_update_handlers.t Stack.t
  ; mutable only_in_debug : Only_in_debug.t
  ; weak_hashtbls : Packed_weak_hashtbl.t Thread_safe_queue.t
  ; mutable keep_node_creation_backtrace : bool
  ; (* Stats.  These are all incremented at the appropriate place, and never decremented. *)
    mutable num_nodes_became_necessary : int
  ; mutable num_nodes_became_unnecessary : int
  ; mutable num_nodes_changed : int
  ; mutable num_nodes_created : int
  ; mutable num_nodes_invalidated : int
  ; mutable num_nodes_recomputed : int
  ; mutable num_nodes_recomputed_directly_because_one_child : int
  ; mutable num_nodes_recomputed_directly_because_min_height : int
  ; mutable num_var_sets : int
  }
[@@deriving fields, sexp_of]

module Clock = struct
  type t = Types.Clock.t =
    { (* We use [timing_wheel] for time-based incrementals.  [now] is a variable holding
         the current time.  [handle_fired] is the closure passed to
         [Timing_wheel.advance_clock].  It links all the fired alarm values into
         [fired_alarm_values].  After [Timing_wheel.advance_clock] returns, it then
         walks through the linked list and actually fires them.  This two-pass approach is
         necessary because one is not allowed to call [Timing_wheel] functions from the
         [handle_fired] that one passes to [Timing_wheel.advance_clock]. *)
      timing_wheel : Alarm_value.t Timing_wheel.t
    ; now : Time_ns.t Var.t
    ; handle_fired : Alarm.t -> unit
    ; mutable fired_alarm_values : Alarm_value.t Uopt.t
    }
  [@@deriving fields, sexp_of]

  (* Checks that [now] agrees with the timing wheel's notion of now, that no fired alarm
     values are pending, and that the timing wheel itself is well formed. *)
  let invariant t =
    Invariant.invariant [%here] t [%sexp_of: t] (fun () ->
      let check f = Invariant.check_field t f in
      Fields.iter
        ~now:
          (check (fun (now : _ Var.t) ->
             assert (Time_ns.equal now.value (Timing_wheel.now t.timing_wheel))))
        ~handle_fired:ignore
        ~fired_alarm_values:
          (check (fun fired_alarm_values -> assert (Uopt.is_none fired_alarm_values)))
        ~timing_wheel:(check (Timing_wheel.invariant Alarm_value.invariant)))
  ;;

  (* The incremental state that the clock's [now] variable belongs to. *)
  let incr_state t = Var.incr_state t.now
end

(* Simple accessors. *)
let now (clock : Clock.t) = clock.now.value
let timing_wheel_length (clock : Clock.t) = Timing_wheel.length clock.timing_wheel
let num_stabilizes t = Stabilization_num.to_int t.stabilization_num
let max_height_allowed t = Adjust_heights_heap.max_height_allowed t.adjust_heights_heap
let max_height_seen t = Adjust_heights_heap.max_height_seen t.adjust_heights_heap

(* [iter_observers t ~f] applies [f] to each observer in the linked list that starts at
   [t.all_observers].  The successor is read before [f] is called on the current
   observer. *)
let iter_observers t ~f =
  let r = ref t.all_observers in
  while Uopt.is_some !r do
    let observer = Uopt.unsafe_value !r in
    r := Internal_observer.Packed.next_in_all observer;
    f observer
  done
;;

(* The nodes observed by the observers in [t.all_observers].  The list is built by
   consing, so it is in the reverse of iteration order. *)
let directly_observed t =
  let r : Node.Packed.t list ref = ref [] in
  iter_observers t ~f:(fun (T internal_observer) ->
    r := T internal_observer.observing :: !r);
  !r
;;

let save_dot t file = Node.Packed.save_dot file (directly_observed t)

let iter_observer_descendants t ~f =
  Node.Packed.iter_descendants (directly_observed t) ~f
;;

module Stats = struct
  type t =
    { max_num_parents : int
    ; percentage_of_nodes_by_num_parents : (int * Percent.t) list
    }
  [@@deriving sexp]
end

(* [stats t] walks the descendants of the directly observed nodes twice: once to count
   them and find the largest [num_parents], and once to build a histogram indexed by
   [num_parents].  It returns that maximum (or [-1] if no node was visited) together
   with, for each parent count that occurs, the fraction of visited nodes having it, in
   increasing order of parent count. *)
let stats t =
  let max_num_parents = ref (-1) in
  let num_necessary_nodes = ref 0 in
  iter_observer_descendants t ~f:(fun (T node) ->
    incr num_necessary_nodes;
    max_num_parents := Int.max !max_num_parents node.num_parents);
  let max_num_parents = !max_num_parents in
  let num_nodes_by_num_parents = Array.create ~len:(max_num_parents + 1) 0 in
  iter_observer_descendants t ~f:(fun (T node) ->
    let num_parents = node.num_parents in
    num_nodes_by_num_parents.(num_parents)
    <- num_nodes_by_num_parents.(num_parents) + 1);
  let percentage_of_nodes_by_num_parents =
    Array.foldi num_nodes_by_num_parents ~init:[] ~f:(fun i ac num_nodes ->
      if num_nodes = 0
      then ac
      else (i, Percent.of_mult (float num_nodes /. float !num_necessary_nodes)) :: ac)
    |> List.rev
  in
  { Stats.max_num_parents; percentage_of_nodes_by_num_parents }
;;

(* [am_stabilizing t] is [true] while stabilizing or running on-update handlers, and
   [false] when not stabilizing.  It raises if a previous stabilization raised. *)
let am_stabilizing t =
  match t.status with
  | Running_on_update_handlers | Stabilizing -> true
  | Not_stabilizing -> false
  | Stabilize_previously_raised raised_exn ->
    failwiths
      ~here:[%here]
      "cannot call am_stabilizing -- stabilize previously raised"
      raised_exn
      [%sexp_of: Raised_exn.t]
;;

(* [invariant t] checks nothing once a stabilization has raised.  Otherwise it checks
   every observer in [all_observers], every descendant of an observed node, that the two
   heaps agree on the maximum allowed height, and then each field of [t]. *)
let invariant t =
  match t.status with
  | Stabilize_previously_raised _ -> ()
  | Running_on_update_handlers | Stabilizing | Not_stabilizing ->
    Invariant.invariant [%here] t [%sexp_of: t] (fun () ->
      let check f = Invariant.check_field t f in
      iter_observers t ~f:(fun (T internal_observer) ->
        (match internal_observer.state with
         | In_use | Disallowed -> ()
         | Created | Unlinked ->
           failwiths
             ~here:[%here]
             "member of all_observers with unexpected state"
             internal_observer
             [%sexp_of: _ Internal_observer.t]);
        Internal_observer.invariant ignore internal_observer);
      iter_observer_descendants t ~f:(fun (T node) ->
        Node.invariant ignore node;
        if not (am_stabilizing t) then assert (Uopt.is_none node.old_value_opt);
        assert (
          node.height <= Adjust_heights_heap.max_height_seen t.adjust_heights_heap));
      assert (
        Adjust_heights_heap.max_height_allowed t.adjust_heights_heap
        = Recompute_heap.max_height_allowed t.recompute_heap);
      Fields.iter
        ~status:ignore
        ~bind_lhs_change_should_invalidate_rhs:ignore
        ~stabilization_num:(check Stabilization_num.invariant)
        ~current_scope:
          (check (fun current_scope -> assert (phys_equal current_scope Scope.top)))
        ~recompute_heap:(check Recompute_heap.invariant)
        ~adjust_heights_heap:
          (check (fun adjust_heights_heap ->
             assert (Adjust_heights_heap.length adjust_heights_heap = 0);
             Adjust_heights_heap.invariant adjust_heights_heap))
        ~propagate_invalidity:
          (check (fun propagate_invalidity ->
             assert (Stack.is_empty propagate_invalidity)))
        ~num_active_observers:
          (check (fun num_active_observers -> assert (num_active_observers >= 0)))
        ~all_observers:ignore
        ~finalized_observers:ignore
        ~new_observers:
          (check
             (Stack.invariant (fun packed ->
                Internal_observer.Packed.invariant packed;
                let (T internal_observer) = packed in
                (* When an observer is added to [new_observers], it has [state = Created].
                   The only possible transitions from there are to [Unlinked] or to
                   [In_use], which also removes it from [new_observers], never to be added
                   again.  Thus it is impossible for an observer in [new_observers] to be
                   [In_use] or [Disallowed]. *)
                match internal_observer.state with
                | Created | Unlinked -> ()
                | In_use | Disallowed -> assert false)))
        ~disallowed_observers:
          (check
             (Stack.invariant (fun packed ->
                Internal_observer.Packed.invariant packed;
                let (T internal_observer) = packed in
                match internal_observer.state with
                | Disallowed -> ()
                | Created | In_use | Unlinked -> assert false)))
        ~set_during_stabilization:
          (check (fun set_during_stabilization ->
             match t.status with
             | Stabilize_previously_raised _ -> assert false
             | Running_on_update_handlers | Not_stabilizing ->
               assert (Stack.is_empty set_during_stabilization)
             | Stabilizing ->
               Stack.invariant
                 (fun (Var.Packed.T var) ->
                    assert (Uopt.is_some var.value_set_during_stabilization))
                 set_during_stabilization))
        ~handle_after_stabilization:(check (Stack.invariant Node.Packed.invariant))
        ~run_on_update_handlers:
          (check (Stack.invariant Run_on_update_handlers.invariant))
        ~only_in_debug:(check Only_in_debug.invariant)
        ~weak_hashtbls:ignore
        ~keep_node_creation_backtrace:ignore
        ~num_nodes_became_necessary:ignore
        ~num_nodes_became_unnecessary:ignore
        ~num_nodes_changed:ignore
        ~num_nodes_created:ignore
        ~num_nodes_invalidated:ignore
        ~num_nodes_recomputed:ignore
        ~num_nodes_recomputed_directly_because_one_child:ignore
        ~num_nodes_recomputed_directly_because_min_height:ignore
        ~num_var_sets:ignore)
;;

(* [ensure_not_stabilizing t ~name ~allow_in_update_handler] returns unit when [t] is not
   stabilizing, and also when on-update handlers are running and
   [allow_in_update_handler] is true.  In every other status it raises, using [name] in
   the message and attaching the current backtrace. *)
let ensure_not_stabilizing t ~name ~allow_in_update_handler =
  match t.status with
  | Not_stabilizing -> ()
  | Running_on_update_handlers ->
    if not allow_in_update_handler
    then (
      let backtrace = Backtrace.get () in
      failwiths
        ~here:[%here]
        (sprintf "cannot %s during on-update handlers" name)
        backtrace
        [%sexp_of: Backtrace.t])
  | Stabilize_previously_raised raised_exn ->
    let backtrace = Backtrace.get () in
    failwiths
      ~here:[%here]
      (sprintf "cannot %s -- stabilize previously raised" name)
      (raised_exn, backtrace)
      [%sexp_of: Raised_exn.t * Backtrace.t]
  | Stabilizing ->
    let backtrace = Backtrace.get () in
    failwiths
      ~here:[%here]
      (sprintf "cannot %s during stabilization" name)
      backtrace
      [%sexp_of: Backtrace.t]
;;

(* Sets [node]'s height via the adjust-heights heap of the state that owns [node]. *)
let set_height (node : _ Node.t) height =
  let t = node.state in
  Adjust_heights_heap.set_height t.adjust_heights_heap node height
;;

(* Updates the maximum allowed height in both heaps.  Raises (via
   [ensure_not_stabilizing]) during stabilization, but is permitted from on-update
   handlers. *)
let set_max_height_allowed t height =
  ensure_not_stabilizing t ~name:"set_max_height_allowed" ~allow_in_update_handler:true;
  Adjust_heights_heap.set_max_height_allowed t.adjust_heights_heap height;
  Recompute_heap.set_max_height_allowed t.recompute_heap height
;;

(* Pushes [node] on [handle_after_stabilization] unless it is already flagged as being
   there; the flag keeps a node from being pushed twice. *)
let handle_after_stabilization : type a. a Node.t -> unit =
  fun node ->
  if not node.is_in_handle_after_stabilization
  then (
    let t = node.state in
    node.is_in_handle_after_stabilization <- true;
    Stack.push t.handle_after_stabilization (T node))
;;

(* Disconnecting a parent from its children can make the children unnecessary, which in
   turn disconnects them from their own children, hence the mutual recursion. *)
let rec remove_children : type a. a Node.t -> unit =
  fun parent ->
  Node.iteri_children parent ~f:(fun child_index (T child) ->
    remove_child ~child ~parent ~child_index)

and remove_child : type a b. child:b Node.t -> parent:a Node.t -> child_index:int -> unit =
  fun ~child ~parent ~child_index ->
  Node.remove_parent ~child ~parent ~child_index;
  check_if_unnecessary child

and check_if_unnecessary : type a. a Node.t -> unit =
  fun node -> if not (Node.is_necessary node) then became_unnecessary node

and became_unnecessary : type a. a Node.t -> unit =
  fun node ->
  let t = node.state in
  t.num_nodes_became_unnecessary <- t.num_nodes_became_unnecessary + 1;
  if node.num_on_update_handlers > 0 then handle_after_stabilization node;
  node.height <- -1;
  remove_children node;
  (match node.kind with
   | Unordered_array_fold u -> Unordered_array_fold.force_full_compute u
   | Expert p -> Expert.observability_change p ~is_now_observable:false
   | _ -> ());
  if debug then assert (not (Node.needs_to_be_computed node));
  if Node.is_in_recompute_heap node then Recompute_heap.remove t.recompute_heap node
;;

(* Removes [alarm] from the clock's timing wheel if it is still scheduled there. *)
let remove_alarm (clock : Clock.t) alarm =
  if Timing_wheel.mem clock.timing_wheel alarm
  then Timing_wheel.remove clock.timing_wheel alarm
;;

(* An invalid node is a node whose kind is [Invalid]. A node's kind is set to [Invalid]
when the lhs of its scope changes, or one of its children propagate the invalidity
upward (see [Node.should_be_invalidated] to see in which case invalidity propagation
stops). Invalidating a node disconnects it from its children, which means:
1. an invalid node cannot end up on the scheduler (if it is on the scheduler when
it is invalidated, it is removed)
2. an invalid node doesn't make its children necessary anymore.
Invalid nodes usually have no parents, because the upward invalidity propagation means
that their parents will themselves become invalid and disconnect from their children.
However, [if], [join] or [bind] are not invalidated by the upward propagation, so an
invalid node can still have parents. Invalid nodes can be necessary, in the case where
they have parents, and also when they are observed.
The upward propagation of invalidity happens both when a node becomes invalid, and when
trying to add an edge from an invalid child node to another node. Because invalidity
is only propagated upward, and because the rhs of a bind is invalidated before it
executes, a node cannot be both computed and invalidated in the same stabilization.
When invalidating, we can't assume much about the nodes we visit. We cannot assume
that nodes are valid (the rhs can contain previously invalidated nodes), or that nodes
are unnecessary (nodes can be made necessary without going through their containing
binds). *)

(* [invalidate_node node] does nothing if [node] is already invalid.  Otherwise it clears
   the node's value, stamps [changed_at] and [recomputed_at] with the current
   stabilization number, disconnects the node from its children if it is necessary,
   cancels any alarm the node owns, sets its kind to [Invalid], pushes each of its
   parents on [t.propagate_invalidity], and removes it from the recompute heap.  It does
   not itself drain [t.propagate_invalidity]. *)
let rec invalidate_node : type a. a Node.t -> unit =
  fun node ->
  if Node.is_valid node
  then (
    let t = node.state in
    if node.num_on_update_handlers > 0 then handle_after_stabilization node;
    node.value_opt <- Uopt.none;
    if debug then assert (Uopt.is_none node.old_value_opt);
    node.changed_at <- t.stabilization_num;
    node.recomputed_at <- t.stabilization_num;
    t.num_nodes_invalidated <- t.num_nodes_invalidated + 1;
    if Node.is_necessary node
    then (
      remove_children node;
      (* The node doesn't have children anymore, so we can lower its height as much as
         possible, to one greater than the scope it was created in.  Also, because we
         are lowering the height, we don't need to adjust any of its ancestors' heights.
         We could leave the height alone, but we may as well lower it as much as
         possible to avoid making the heights of any future ancestors unnecessarily
         large. *)
      node.height <- Scope.height node.created_in + 1);
    (* We don't set [node.created_in] or [node.next_node_in_same_scope]; we leave [node]
       in the scope it was created in.  If that scope is ever invalidated, then that
       will clear [node.next_node_in_same_scope] *)
    (match node.kind with
     | At at -> remove_alarm at.clock at.alarm
     | At_intervals at_intervals -> remove_alarm at_intervals.clock at_intervals.alarm
     | Bind_main bind -> invalidate_nodes_created_on_rhs bind.all_nodes_created_on_rhs
     | Step_function { alarm; clock; _ } -> remove_alarm clock alarm
     | _ -> ());
    Node.set_kind node Invalid;
    (* If we called [propagate_invalidity] right away on the parents, we would get into
       trouble.  The parent would disconnect itself from the current node, thus
       modifying the list of parents we iterate on.  Even if we made a special case, it
       still wouldn't be enough to handle other cases where the list of parents is
       modified (e.g. when [lhs] is invalidated in the example in the comment about
       [can_recompute_now] far below). *)
    for index = 0 to node.num_parents - 1 do
      Stack.push t.propagate_invalidity (Node.get_parent node ~index)
    done;
    if debug then assert (not (Node.needs_to_be_computed node));
    if Node.is_in_recompute_heap node then Recompute_heap.remove t.recompute_heap node)

(* Invalidates every node in the linked list that starts at [node], following
   [next_node_in_same_scope].  Each node's link is read and then cleared before the node
   is invalidated, so the traversal does not depend on what invalidation does to it. *)
and invalidate_nodes_created_on_rhs node =
  let r = ref node in
  while Uopt.is_some !r do
    let (T node_on_rhs) = Uopt.unsafe_value !r in
    r := node_on_rhs.next_node_in_same_scope;
    node_on_rhs.next_node_in_same_scope <- Uopt.none;
    invalidate_node node_on_rhs
  done
;;

(* When [not t.bind_lhs_change_should_invalidate_rhs] and a bind's lhs changes, we move
nodes created on the bind's rhs up to its parent bind, as opposed to [Scope.Top]. This
maintains their dependence on valid bind left-hand sides, and keeps them higher in the
graph. This in turn means that we will continue to compute those nodes after the
parent bind's lhs, which gives them more of a chance to become unnecessary and not be
computed should the parent bind's lhs change. *)
let rescope_nodes_created_on_rhs _t (first_node_on_rhs : Node.Packed.t Uopt.t) ~new_scope =
  (* The state argument is unused.  Each node's link is read and cleared before the node
     is handed to [Scope.add_node new_scope]. *)
  let r = ref first_node_on_rhs in
  while Uopt.is_some !r do
    let (T node_on_rhs) = Uopt.unsafe_value !r in
    r := node_on_rhs.next_node_in_same_scope;
    node_on_rhs.next_node_in_same_scope <- Uopt.none;
    node_on_rhs.created_in <- new_scope;
    Scope.add_node new_scope node_on_rhs
  done
;;

(* [propagate_invalidity t] drains [t.propagate_invalidity].  Each popped node that is
   still valid is either invalidated (which may push more nodes on the stack) or, if
   [Node.should_be_invalidated] says no, added to the recompute heap if it is not
   already there. *)
let propagate_invalidity t =
  while not (Stack.is_empty t.propagate_invalidity) do
    let (T node) = Stack.pop_exn t.propagate_invalidity in
    if Node.is_valid node
    then
      if Node.should_be_invalidated node
      then invalidate_node node
      else (
        (* [Node.needs_to_be_computed node] is true because
           - node is necessary.  This is because children can only point to necessary
             parents
           - node is stale.  This is because: For bind, if, join, this is true because
             - either the invalidation is caused by the lhs changing (in which case the
               lhs-change node being newer makes us stale).
             - or a child became invalid this stabilization cycle, in which case it has
               t.changed_at of [t.stabilization_num], and so [node] is stale
             - or [node] just became necessary and tried connecting to an already invalid
               child.  In that case, [child.changed_at > node.recomputed_at] for that child,
               because if we had been recomputed when that child changed, we would have been
               made invalid back then.  For expert nodes, the argument is the same, except
               that instead of lhs-change nodes make the expert nodes stale, it's made stale
               explicitly when adding or removing children. *)
        if debug then assert (Node.needs_to_be_computed node);
        (match node.kind with
         | Expert expert ->
           (* If multiple children are invalid, they will push us as many times on the
              propagation stack, so we count them right. *)
           Expert.incr_invalid_children expert
         | kind ->
           if debug
           then (
             match kind with
             | Bind_main _ | If_then_else _ | Join_main _ -> ()
             | _ -> assert false (* nodes with no children are never pushed on the stack *)));
        (* We do not check [Node.needs_to_be_computed node] here, because it should be
           true, and because computing it takes O(number of children), node can be pushed
           on the stack once per child, and expert nodes can have lots of children. *)
        if not (Node.is_in_recompute_heap node)
        then Recompute_heap.add t.recompute_heap node)
  done
;;

(* [add_parent_without_adjusting_heights t ~child ~parent] adds [parent] as a parent of
[child], and makes [child] and all its descendants necessary, ensuring their heights
are accurate. There is no guarantee about the relative heights of [child] and [parent]
though. *)letrecadd_parent_without_adjusting_heights:typeab.child:aNode.t->parent:bNode.t->child_index:int->unit=fun~child~parent~child_index->ifdebugthenassert(Node.is_necessaryparent);lett=child.stateinletwas_necessary=Node.is_necessarychildinNode.add_parent~child~parent~child_index;ifnot(Node.is_validchild)thenStack.pusht.propagate_invalidity(Tparent);ifnotwas_necessarythenbecame_necessarychild;matchparent.kindwith|Experte->Expert.run_edge_callbacke~child_index|_->()andbecame_necessary:typea.aNode.t->unit=funnode->(* [Scope.is_necessary node.created_in] is true (assuming the scope itself is valid)
because [Node.iter_children] below first visits the lhs-change of bind nodes and
then the rhs. *)ifNode.is_validnode&¬(Scope.is_necessarynode.created_in)thenfailwiths~here:[%here]"Trying to make a node necessary whose defining bind is not necessary"node[%sexp_of:_Node.t];lett=node.stateint.num_nodes_became_necessary<-t.num_nodes_became_necessary+1;ifnode.num_on_update_handlers>0thenhandle_after_stabilizationnode;(* Since [node] became necessary, to restore the invariant, we need to:
- add parent pointers to [node] from its children.
- set [node]'s height.
- add [node] to the recompute heap, if necessary. *)set_heightnode(Scope.heightnode.created_in+1);Node.iteri_childrennode~f:(funchild_index(Tchild)->add_parent_without_adjusting_heights~child~parent:node~child_index;(* Now that child is necessary, it should have a valid height. *)ifdebugthenassert(child.height>=0);ifchild.height>=node.heightthenset_heightnode(child.height+1));(* Now that the height is correct, maybe add [node] to the recompute heap. [node]
just became necessary, so it can't have been in the recompute heap. Since [node]
is necessary, we should add it to the recompute heap iff it is stale. *)ifdebugthenassert(not(Node.is_in_recompute_heapnode));ifdebugthenassert(Node.is_necessarynode);ifNode.is_stalenodethenRecompute_heap.addt.recompute_heapnode;matchnode.kindwith|Expertp->Expert.observability_changep~is_now_observable:true|_->();;letbecame_necessarynode=became_necessarynode;propagate_invaliditynode.state;;letadd_parent~child~parent~child_index=ifdebugthenassert(Node.is_necessaryparent);lett=parent.statein(* In the case when the edge being added creates a cycle, it is possible for the
recursion in [add_parent_without_adjusting_heights] to reach [parent] as a descendant
of [child]. In that case, the recursion terminates, because [Node.is_necessary
parent]. We then return here and subsequently detect the cycle in
[adjust_heights]. *)add_parent_without_adjusting_heights~child~parent~child_index;(* We adjust heights so that we ensure there are no cycles before calling
[propagate_invalidity]. *)ifchild.height>=parent.heightthenAdjust_heights_heap.adjust_heightst.adjust_heights_heapt.recompute_heap~child~parent;propagate_invalidityt;ifdebugthenassert(Node.is_necessaryparent);(* we only add necessary parents *)if(not(Node.is_in_recompute_heapparent))&&(Stabilization_num.is_noneparent.recomputed_at||Node.edge_is_stale~child~parent)thenRecompute_heap.addt.recompute_heapparent;;letrun_with_scopetscope~f=letsaved=t.current_scopeint.current_scope<-scope;tryletv=f()int.current_scope<-saved;vwith|exn->t.current_scope<-saved;raiseexn;;letwithin_scopetscope~f=ifnot(Scope.is_validscope)thenfailwiths~here:[%here]"attempt to run within an invalid scope"t[%sexp_of:t];run_with_scopetscope~f;;letchange_child:typeab.parent:aNode.t->old_child:bNode.tUopt.t->new_child:bNode.t->child_index:int->unit=fun~parent~old_child~new_child~child_index->ifUopt.is_noneold_childthenadd_parent~child:new_child~parent~child_indexelse(letold_child=Uopt.unsafe_valueold_childinifnot(phys_equalold_childnew_child)then((* We remove [old_child] before adding [new_child], because they share the same
child index. *)Node.remove_parent~child:old_child~parent~child_index;(* We force [old_child] to temporarily be necessary so that [add_parent] can't
mistakenly think it is unnecessary and transition it to necessary (which would
add duplicate edges and break things horribly). *)old_child.force_necessary<-true;add_parent~child:new_child~parent~child_index;old_child.force_necessary<-false;(* We [check_if_unnecessary] after [add_parent], so that we don't unnecessarily
transition nodes from necessary to unnecessary and then back again. *)check_if_unnecessaryold_child));;letadd_alarmclock~atalarm_value=ifdebugthenassert(Time_ns.(>)at(nowclock));Timing_wheel.addclock.timing_wheel~atalarm_value;;letrecrecompute:typea.aNode.t->unit=funnode->lett=node.stateinifdebugthen(t.only_in_debug.currently_running_node<-Some(Tnode);t.only_in_debug.expert_nodes_created_by_current_node<-[]);t.num_nodes_recomputed<-t.num_nodes_recomputed+1;node.recomputed_at<-t.stabilization_num;matchnode.kindwith|Array_foldarray_fold->maybe_change_valuenode(Array_fold.computearray_fold)|At{at;clock;_}->(* It is a bug if we try to compute an [At] node after [at]. [advance_clock] was
supposed to convert it to a [Const] at the appropriate time. *)ifdebugthenassert(Time_ns.(>)at(nowclock));maybe_change_valuenodeBefore|At_intervals_->maybe_change_valuenode()|Bind_lhs_change({main;f;lhs;rhs_scope;rhs=old_rhs;all_nodes_created_on_rhs=old_all_nodes_created_on_rhs;_}asbind)->(* We clear [all_nodes_created_on_rhs] so it will hold just the nodes created by
this call to [f]. *)bind.all_nodes_created_on_rhs<-Uopt.none;letrhs=run_with_scopetrhs_scope~f:(fun()->f(Node.value_exnlhs))inbind.rhs<-Uopt.somerhs;(* Anticipate what [maybe_change_value] will do, to make sure Bind_main is stale
right away. This way, if the new child is invalid, we'll satisfy the invariant
saying that [needs_to_be_computed bind_main] in [propagate_invalidity] *)node.changed_at<-t.stabilization_num;change_child~parent:main~old_child:old_rhs~new_child:rhs~child_index:Kind.bind_rhs_child_index;ifUopt.is_someold_rhsthen((* We invalidate after [change_child], because invalidation changes the [kind] of
nodes to [Invalid], which means that we can no longer visit their children.
Also, the [old_rhs] nodes are typically made unnecessary by [change_child], and
so by invalidating afterwards, we will not waste time adding them to the
recompute heap and then removing them. *)ift.bind_lhs_change_should_invalidate_rhstheninvalidate_nodes_created_on_rhsold_all_nodes_created_on_rhselserescope_nodes_created_on_rhstold_all_nodes_created_on_rhs~new_scope:main.created_in;propagate_invalidityt);(* [node] was valid at the start of the [Bind_lhs_change] branch, and invalidation
only visits higher nodes, so [node] is still valid. *)ifdebugthenassert(Node.is_validnode);maybe_change_valuenode()|Bind_main{rhs;_}->copy_child~parent:node~child:(Uopt.value_exnrhs)|Consta->maybe_change_valuenodea|Freeze{child;only_freeze_when;_}->letvalue=Node.value_exnchildinifonly_freeze_whenvaluethen(remove_childrennode;Node.set_kindnode(Constvalue);ifNode.is_necessarynodethenset_heightnode0elsebecame_unnecessarynode);maybe_change_valuenodevalue|If_test_change({main;current_branch;test;then_;else_;_}asif_then_else)->letdesired_branch=ifNode.value_exntestthenthen_elseelse_inif_then_else.current_branch<-Uopt.somedesired_branch;(* see the comment in Bind_lhs_change *)node.changed_at<-t.stabilization_num;change_child~parent:main~old_child:current_branch~new_child:desired_branch~child_index:Kind.if_branch_child_index;maybe_change_valuenode()|If_then_else{current_branch;_}->copy_child~parent:node~child:(Uopt.value_exncurrent_branch)|Invalid->(* We never have invalid nodes in the recompute heap; they are never stale. *)assertfalse|Join_lhs_change({lhs;main;rhs=old_rhs;_}asjoin)->letrhs=Node.value_exnlhsinjoin.rhs<-Uopt.somerhs;(* see the comment in Bind_lhs_change *)node.changed_at<-t.stabilization_num;change_child~parent:main~old_child:old_rhs~new_child:rhs~child_index:Kind.join_rhs_child_index;maybe_change_valuenode()|Join_main{rhs;_}->copy_child~parent:node~child:(Uopt.value_exnrhs)|Map(f,n1)->maybe_change_valuenode(f(Node.value_exnn1))|Snapshot{at;before;clock;_}->(* It is a bug if we try to compute a [Snapshot] and the alarm should have fired.
[advance_clock] was supposed to convert it to a [Freeze] at the appropriate
time. *)ifdebugthenassert(Time_ns.(>)at(nowclock));maybe_change_valuenodebefore|Step_function({child;clock;_}asstep_function_node)->ifUopt.is_somechildthen(letchild=Uopt.value_exnchildinifStabilization_num.comparechild.changed_atstep_function_node.extracted_step_function_from_child_at>0then(step_function_node.extracted_step_function_from_child_at<-child.changed_at;remove_alarmclockstep_function_node.alarm;letstep_function=Node.value_exnchildinstep_function_node.value<-Uopt.some(Step_function.initstep_function);step_function_node.upcoming_steps<-Step_function.stepsstep_function;(* If the child is a constant, we drop our reference to it, to avoid holding on to
the entire step function. *)ifNode.is_constchildthen(remove_childrennode;step_function_node.child<-Uopt.none;set_heightnode(Scope.heightnode.created_in+1))));Step_function_node.advancestep_function_node~to_:(nowclock);letstep_function_value=Uopt.value_exnstep_function_node.valuein(matchSequence.hdstep_function_node.upcoming_stepswith|None->ifUopt.is_nonechildthenNode.set_kindnode(Conststep_function_value)|Some(at,_)->step_function_node.alarm<-add_alarmclock~atstep_function_node.alarm_value);maybe_change_valuenodestep_function_value|Unordered_array_foldu->maybe_change_valuenode(Unordered_array_fold.computeu)|Uninitialized->assertfalse|Varvar->maybe_change_valuenodevar.value|Map2(f,n1,n2)->maybe_change_valuenode(f(Node.value_exnn1)(Node.value_exnn2))|Map3(f,n1,n2,n3)->maybe_change_valuenode(f(Node.value_exnn1)(Node.value_exnn2)(Node.value_exnn3))|Map4(f,n1,n2,n3,n4)->maybe_change_valuenode(f(Node.value_exnn1)(Node.value_exnn2)(Node.value_exnn3)(Node.value_exnn4))|Map5(f,n1,n2,n3,n4,n5)->maybe_change_valuenode(f(Node.value_exnn1)(Node.value_exnn2)(Node.value_exnn3)(Node.value_exnn4)(Node.value_exnn5))|Map6(f,n1,n2,n3,n4,n5,n6)->maybe_change_valuenode(f(Node.value_exnn1)(Node.value_exnn2)(Node.value_exnn3)(Node.value_exnn4)(Node.value_exnn5)(Node.value_exnn6))|Map7(f,n1,n2,n3,n4,n5,n6,n7)->maybe_change_valuenode(f(Node.value_exnn1)(Node.value_exnn2)(Node.value_exnn3)(Node.value_exnn4)(Node.value_exnn5)(Node.value_exnn6)(Node.value_exnn7))|Map8(f,n1,n2,n3,n4,n5,n6,n7,n8)->maybe_change_valuenode(f(Node.value_exnn1)(Node.value_exnn2)(Node.value_exnn3)(Node.value_exnn4)(Node.value_exnn5)(Node.value_exnn6)(Node.value_exnn7)(Node.value_exnn8))|Map9(f,n1,n2,n3,n4,n5,n6,n7,n8,n9)->maybe_change_valuenode(f(Node.value_exnn1)(Node.value_exnn2)(Node.value_exnn3)(Node.value_exnn4)(Node.value_exnn5)(Node.value_exnn6)(Node.value_exnn7)(Node.value_exnn8)(Node.value_exnn9))|Map10(f,n1,n2,n3,n4,n5,n6,n7,n8,n9,n10)->maybe_change_valuenode(f(Node.value_exnn1)(Node.value_exnn2)(Node.value_e
xnn3)(Node.value_exnn4)(Node.value_exnn5)(Node.value_exnn6)(Node.value_exnn7)(Node.value_exnn8)(Node.value_exnn9)(Node.value_exnn10))|Map11(f,n1,n2,n3,n4,n5,n6,n7,n8,n9,n10,n11)->maybe_change_valuenode(f(Node.value_exnn1)(Node.value_exnn2)(Node.value_exnn3)(Node.value_exnn4)(Node.value_exnn5)(Node.value_exnn6)(Node.value_exnn7)(Node.value_exnn8)(Node.value_exnn9)(Node.value_exnn10)(Node.value_exnn11))|Map12(f,n1,n2,n3,n4,n5,n6,n7,n8,n9,n10,n11,n12)->maybe_change_valuenode(f(Node.value_exnn1)(Node.value_exnn2)(Node.value_exnn3)(Node.value_exnn4)(Node.value_exnn5)(Node.value_exnn6)(Node.value_exnn7)(Node.value_exnn8)(Node.value_exnn9)(Node.value_exnn10)(Node.value_exnn11)(Node.value_exnn12))|Map13(f,n1,n2,n3,n4,n5,n6,n7,n8,n9,n10,n11,n12,n13)->maybe_change_valuenode(f(Node.value_exnn1)(Node.value_exnn2)(Node.value_exnn3)(Node.value_exnn4)(Node.value_exnn5)(Node.value_exnn6)(Node.value_exnn7)(Node.value_exnn8)(Node.value_exnn9)(Node.value_exnn10)(Node.value_exnn11)(Node.value_exnn12)(Node.value_exnn13))|Map14(f,n1,n2,n3,n4,n5,n6,n7,n8,n9,n10,n11,n12,n13,n14)->maybe_change_valuenode(f(Node.value_exnn1)(Node.value_exnn2)(Node.value_exnn3)(Node.value_exnn4)(Node.value_exnn5)(Node.value_exnn6)(Node.value_exnn7)(Node.value_exnn8)(Node.value_exnn9)(Node.value_exnn10)(Node.value_exnn11)(Node.value_exnn12)(Node.value_exnn13)(Node.value_exnn14))|Map15(f,n1,n2,n3,n4,n5,n6,n7,n8,n9,n10,n11,n12,n13,n14,n15)->maybe_change_valuenode(f(Node.value_exnn1)(Node.value_exnn2)(Node.value_exnn3)(Node.value_exnn4)(Node.value_exnn5)(Node.value_exnn6)(Node.value_exnn7)(Node.value_exnn8)(Node.value_exnn9)(Node.value_exnn10)(Node.value_exnn11)(Node.value_exnn12)(Node.value_exnn13)(Node.value_exnn14)(Node.value_exnn15))|Expertexpert->(matchExpert.before_main_computationexpertwith|`Invalid->invalidate_nodenode;propagate_invalidityt|`Ok->maybe_change_valuenode(expert.f()))andcopy_child:typea.parent:aNode.t->child:aNode.t->unit=fun~parent~child->ifNode.is_validchildthenmaybe_change_valueparent(Node.
value_exnchild)else(invalidate_nodeparent;propagate_invalidityparent.state)andmaybe_change_value:typea.aNode.t->a->unit=funnodenew_value->lett=node.stateinletold_value_opt=node.value_optinifUopt.is_noneold_value_opt||not(Cutoff.should_cutoffnode.cutoff~old_value:(Uopt.unsafe_valueold_value_opt)~new_value)then(node.value_opt<-Uopt.somenew_value;node.changed_at<-t.stabilization_num;t.num_nodes_changed<-t.num_nodes_changed+1;ifnode.num_on_update_handlers>0then(node.old_value_opt<-old_value_opt;handle_after_stabilizationnode);ifnode.num_parents>=1then(forparent_index=1tonode.num_parents-1dolet(Tparent)=Uopt.value_exnnode.parent1_and_beyond.(parent_index-1)in(matchparent.kindwith|Expertexpert->letchild_index=node.my_child_index_in_parent_at_index.(parent_index)inExpert.run_edge_callback~child_indexexpert|Unordered_array_foldu->Unordered_array_fold.child_changedu~child:node~child_index:node.my_child_index_in_parent_at_index.(parent_index)~old_value_opt~new_value|_->());ifdebugthenassert(Node.needs_to_be_computedparent);(* We don't do the [can_recompute_now] optimization. Since most nodes only have
one parent, it is probably not a big loss. If we did it anyway, we'd
have to be careful, because while we iterate over the list of parents, we
would execute them, and in particular we can execute lhs-change nodes who can
change the structure of the list of parents we iterate on. Think about:
{[
lhs >>= fun b -> if b then lhs >>| Fn.id else const b
]}
If the optimization kicks in when we propagate change to the parents of [lhs]
(which changes from [true] to [false]), we could execute the [lhs-change]
first, which would disconnect the [map] node from [lhs]. And then we
would execute the second parent of [lhs], which doesn't exist anymore, and
incremental would segfault (there may be a less naive way of making this work
though). *)ifnot(Node.is_in_recompute_heapparent)thenRecompute_heap.addt.recompute_heapparentdone;let(Tparent)=Uopt.value_exnnode.parent0in(matchparent.kindwith|Expertp->letchild_index=node.my_child_index_in_parent_at_index.(0)inExpert.run_edge_callback~child_indexp|Unordered_array_foldu->Unordered_array_fold.child_changedu~child:node~child_index:node.my_child_index_in_parent_at_index.(0)~old_value_opt~new_value|_->());ifdebugthenassert(Node.needs_to_be_computedparent);ifnot(Node.is_in_recompute_heapparent)then(letcan_recompute_now=matchparent.kindwith|Uninitialized->assertfalse(* These nodes aren't parents. *)|At_->assertfalse|At_intervals_->assertfalse|Const_|Invalid|Snapshot_|Var_->assertfalse(* These nodes have more than one child. *)|Array_fold_|Map2_|Map3_|Map4_|Map5_|Map6_|Map7_|Map8_|Map9_|Map10_|Map11_|Map12_|Map13_|Map14_|Map15_|Unordered_array_fold_|Expert_->false(* We can immediately recompute [parent] if no other node needs to be stable
before computing it. If [parent] has a single child (i.e. [node]), then
this amounts to checking that [parent] won't be invalidated, i.e. that
[parent]'s scope has already stabilized. *)|Bind_lhs_change_->node.height>Scope.heightparent.created_in|Freeze_->node.height>Scope.heightparent.created_in|If_test_change_->node.height>Scope.heightparent.created_in|Join_lhs_change_->node.height>Scope.heightparent.created_in|Map_->node.height>Scope.heightparent.created_in|Step_function_->node.height>Scope.heightparent.created_in(* For these, we need to check that the "_change" child has already been
evaluated (if needed). If so, this also implies:
{[
node.height > Scope.height parent.created_in
]} *)|Bind_mainb->node.height>b.lhs_change.height|If_then_elsei->node.height>i.test_change.height|Join_mainj->node.height>j.lhs_change.heightinifcan_recompute_nowthen(t.num_nodes_recomputed_directly_because_one_child<-t.num_nodes_recomputed_directly_because_one_child+1;recomputeparent)elseifparent.height<=Recompute_heap.min_heightt.recompute_heapthen((* If [parent.height] is [<=] the height of all nodes in the recompute heap
(possibly because the recompute heap is empty), then we can recompute
[parent] immediately and save adding it to and then removing it from the
recompute heap. *)t.num_nodes_recomputed_directly_because_min_height<-t.num_nodes_recomputed_directly_because_min_height+1;recomputeparent)else(ifdebugthenassert(Node.needs_to_be_computedparent);ifdebugthenassert(not(Node.is_in_recompute_heapparent));Recompute_heap.addt.recompute_heapparent))));ifdebugtheninvariantt;;letrecompute_everything_that_is_necessaryt=letmoduleR=Recompute_heapinletr=t.recompute_heapinwhileR.lengthr>0dolet(Tnode)=R.remove_minrinifdebug&¬(Node.needs_to_be_computednode)thenfailwiths~here:[%here]"node unexpectedly does not need to be computed"node[%sexp_of:_Node.t];recomputenodedone;ifdebugthen(t.only_in_debug.currently_running_node<-None;t.only_in_debug.expert_nodes_created_by_current_node<-[]);;letunlink_disallowed_observerst=whileStack.lengtht.disallowed_observers>0doletpacked=Stack.pop_exnt.disallowed_observersinletmodulePacked=Internal_observer.Packedinlet(Tinternal_observer)=packedinifdebugthenassert(matchinternal_observer.statewith|Disallowed->true|_->false);internal_observer.state<-Unlinked;let(Tall_observers)=Uopt.value_exnt.all_observersinifInternal_observer.sameinternal_observerall_observersthent.all_observers<-internal_observer.next_in_all;Internal_observer.unlinkinternal_observer;check_if_unnecessaryinternal_observer.observingdone;;letdisallow_future_useinternal_observer=lett=Internal_observer.incr_stateinternal_observerinmatchinternal_observer.statewith|Disallowed|Unlinked->()|Created->t.num_active_observers<-t.num_active_observers-1;internal_observer.state<-Unlinked;internal_observer.on_update_handlers<-[]|In_use->t.num_active_observers<-t.num_active_observers-1;internal_observer.state<-Disallowed;Stack.pusht.disallowed_observers(Tinternal_observer);;letdisallow_finalized_observerst=whileThread_safe_queue.lengtht.finalized_observers>0dolet(Tinternal_observer)=Thread_safe_queue.dequeue_exnt.finalized_observersinifList.is_emptyinternal_observer.on_update_handlersthendisallow_future_useinternal_observerdone;;letobserver_finalizert=st
age(funobserver->letinternal_observer=!observerinThread_safe_queue.enqueuet.finalized_observers(Tinternal_observer));;letcreate_observer?(should_finalize=true)(observing:_Node.t)=lett=observing.stateinletinternal_observer:_Internal_observer.t={state=Created;observing;on_update_handlers=[];prev_in_all=Uopt.none;next_in_all=Uopt.none;prev_in_observing=Uopt.none;next_in_observing=Uopt.none}inStack.pusht.new_observers(Tinternal_observer);letobserver=refinternal_observerinifshould_finalizethenGc.Expert.add_finalizer_exnobserver(unstage(observer_finalizert));t.num_active_observers<-t.num_active_observers+1;observer;;letadd_new_observerst=whileStack.lengtht.new_observers>0doletpacked=Stack.pop_exnt.new_observersinletmodulePacked=Internal_observer.Packedinlet(Tinternal_observer)=packedinmatchinternal_observer.statewith|In_use|Disallowed->assertfalse|Unlinked->()|Created->internal_observer.state<-In_use;letold_all_observers=t.all_observersinifUopt.is_someold_all_observersthen(internal_observer.next_in_all<-old_all_observers;Packed.set_prev_in_all(Uopt.unsafe_valueold_all_observers)(Uopt.somepacked));t.all_observers<-Uopt.somepacked;letobserving=internal_observer.observinginletwas_necessary=Node.is_necessaryobservinginobserving.num_on_update_handlers<-observing.num_on_update_handlers+List.lengthinternal_observer.on_update_handlers;letold_observers=observing.observersinifUopt.is_someold_observersthen(internal_observer.next_in_observing<-old_observers;(Uopt.unsafe_valueold_observers).prev_in_observing<-Uopt.someinternal_observer);observing.observers<-Uopt.someinternal_observer;(* By adding [internal_observer] to [observing.observers], we may have added
on-update handlers to [observing]. We need to handle [observing] after this
stabilization to give those handlers a chance to run. *)handle_after_stabilizationobserving;ifdebugthenassert(Node.is_necessaryobserving);ifnotwas_necessarythenbecame_necessaryobservingdone;;letobserver_value_exnobserver=lett=Observer.incr_stateobserverinmatcht.statuswith|Not_stabilizing|Running_on_update_handlers->Observer.value_exnobserver|Stabilize_previously_raisedraised_exn->failwiths~here:[%here]"Observer.value_exn called after stabilize previously raised"raised_exn[%sexp_of:Raised_exn.t]|Stabilizing->failwiths~here:[%here]"Observer.value_exn called during stabilization"observer[%sexp_of:_Observer.t];;letobserver_valueobserver=tryOk(observer_value_exnobserver)with|exn->Error(Error.of_exnexn);;letnode_on_update(typea)(node:aNode.t)~f=lett=node.stateinNode.on_updatenode(On_update_handler.createf~at:t.stabilization_num);handle_after_stabilizationnode;;letobserver_on_update_exnobserver~f=lett=Observer.incr_stateobserverinObserver.on_update_exnobserver(On_update_handler.createf~at:t.stabilization_num);handle_after_stabilization(Observer.observingobserver);;letset_var_while_not_stabilizingvarvalue=lett=Var.incr_statevarint.num_var_sets<-t.num_var_sets+1;var.value<-value;ifStabilization_num.comparevar.set_att.stabilization_num<0then(var.set_at<-t.stabilization_num;letwatch=var.watchinifdebugthenassert(Node.is_stalewatch);ifNode.is_necessarywatch&¬(Node.is_in_recompute_heapwatch)thenRecompute_heap.addt.recompute_heapwatch);;letset_varvarvalue=lett=Var.incr_statevarinmatcht.statuswith|Running_on_update_handlers|Not_stabilizing->set_var_while_not_stabilizingvarvalue|Stabilize_previously_raisedraised_exn->failwiths~here:[%here]"cannot set var -- stabilization previously 
raised"raised_exn[%sexp_of:Raised_exn.t]|Stabilizing->ifUopt.is_nonevar.value_set_during_stabilizationthenStack.pusht.set_during_stabilization(Tvar);var.value_set_during_stabilization<-Uopt.somevalue;;letreclaim_space_in_weak_hashtblst=whileThread_safe_queue.lengtht.weak_hashtbls>0dolet(Tweak_hashtbl)=Thread_safe_queue.dequeue_exnt.weak_hashtblsinWeak_hashtbl.reclaim_space_for_keys_with_unused_dataweak_hashtbldone;;letstabilizet=ensure_not_stabilizingt~name:"stabilize"~allow_in_update_handler:false;tryt.status<-Stabilizing;disallow_finalized_observerst;(* Just like for binds, we add new observers before removing disallowed observers to
potentially avoid switching the observability of some nodes back and forth. *)add_new_observerst;unlink_disallowed_observerst;ifdebugtheninvariantt;recompute_everything_that_is_necessaryt;(* We increment [t.stabilization_num] before handling variables set during
stabilization, so that they are treated as set during the new stabilization cycle.
Also, we increment before running on-update handlers, to avoid running on update
handlers created during on update handlers. *)t.stabilization_num<-Stabilization_num.add1t.stabilization_num;whilenot(Stack.is_emptyt.set_during_stabilization)dolet(Tvar)=Stack.pop_exnt.set_during_stabilizationinletvalue=Uopt.value_exnvar.value_set_during_stabilizationinvar.value_set_during_stabilization<-Uopt.none;set_var_while_not_stabilizingvarvaluedone;whilenot(Stack.is_emptyt.handle_after_stabilization)dolet(Tnode)=Stack.pop_exnt.handle_after_stabilizationinnode.is_in_handle_after_stabilization<-false;letold_value=node.old_value_optinnode.old_value_opt<-Uopt.none;letnode_update:_Node_update.t=ifnot(Node.is_validnode)thenInvalidatedelseifnot(Node.is_necessarynode)thenUnnecessaryelse(letnew_value=Uopt.value_exnnode.value_optinifUopt.is_noneold_valuethenNecessarynew_valueelseChanged(Uopt.unsafe_valueold_value,new_value))inStack.pusht.run_on_update_handlers(T(node,node_update))done;t.status<-Running_on_update_handlers;letnow=t.stabilization_numinwhilenot(Stack.is_emptyt.run_on_update_handlers)dolet(T(node,node_update))=Stack.pop_exnt.run_on_update_handlersinNode.run_on_update_handlersnodenode_update~nowdone;t.status<-Not_stabilizing;reclaim_space_in_weak_hashtblstwith|exn->t.status<-Stabilize_previously_raised(Raised_exn.createexn);raiseexn;;letcreate_node_intcreated_inkind=t.num_nodes_created<-t.num_nodes_created+1;Node.createtcreated_inkind;;letcreate_nodetkind=create_node_intt.current_scopekindletcreate_node_toptkind=create_node_intScope.topkindletcreate_vart?(use_current_scope=false)value=letscope=ifuse_current_scopethent.current_scopeelseScope.topinletwatch=create_node_intscopeUninitializedinletvar={Var.value;value_set_during_stabilization=Uopt.none;set_at=t.stabilization_num;watch}inNode.set_kindwatch(Varvar);var;;(* A [const] value could come from the right-hand side of an outer bind. So, we create a
[const] node in the current scope, not in [Scope.top]. *)letconstta=create_nodet(Consta)letmap(n:_Node.t)~f=create_noden.state(Map(f,n))letmap2(n1:_Node.t)n2~f=create_noden1.state(Map2(f,n1,n2))letmap3(n1:_Node.t)n2n3~f=create_noden1.state(Map3(f,n1,n2,n3))letmap4(n1:_Node.t)n2n3n4~f=create_noden1.state(Map4(f,n1,n2,n3,n4))letmap5(n1:_Node.t)n2n3n4n5~f=create_noden1.state(Map5(f,n1,n2,n3,n4,n5));;letmap6(n1:_Node.t)n2n3n4n5n6~f=create_noden1.state(Map6(f,n1,n2,n3,n4,n5,n6));;letmap7(n1:_Node.t)n2n3n4n5n6n7~f=create_noden1.state(Map7(f,n1,n2,n3,n4,n5,n6,n7));;letmap8(n1:_Node.t)n2n3n4n5n6n7n8~f=create_noden1.state(Map8(f,n1,n2,n3,n4,n5,n6,n7,n8));;letmap9(n1:_Node.t)n2n3n4n5n6n7n8n9~f=create_noden1.state(Map9(f,n1,n2,n3,n4,n5,n6,n7,n8,n9));;letmap10(n1:_Node.t)n2n3n4n5n6n7n8n9n10~f=create_noden1.state(Map10(f,n1,n2,n3,n4,n5,n6,n7,n8,n9,n10));;letmap11(n1:_Node.t)n2n3n4n5n6n7n8n9n10n11~f=create_noden1.state(Map11(f,n1,n2,n3,n4,n5,n6,n7,n8,n9,n10,n11));;letmap12(n1:_Node.t)n2n3n4n5n6n7n8n9n10n11n12~f=create_noden1.state(Map12(f,n1,n2,n3,n4,n5,n6,n7,n8,n9,n10,n11,n12));;letmap13(n1:_Node.t)n2n3n4n5n6n7n8n9n10n11n12n13~f=create_noden1.state(Map13(f,n1,n2,n3,n4,n5,n6,n7,n8,n9,n10,n11,n12,n13));;letmap14(n1:_Node.t)n2n3n4n5n6n7n8n9n10n11n12n13n14~f=create_noden1.state(Map14(f,n1,n2,n3,n4,n5,n6,n7,n8,n9,n10,n11,n12,n13,n14));;letmap15(n1:_Node.t)n2n3n4n5n6n7n8n9n10n11n12n13n14n15~f=create_noden1.state(Map15(f,n1,n2,n3,n4,n5,n6,n7,n8,n9,n10,n11,n12,n13,n14,n15));;letpreserve_cutoff~(input:_Node.t)~output=Node.set_cutoffoutput(Cutoff.create(fun~old_value:_~new_value:_->Stabilization_num.equalinput.changed_atoutput.changed_at));;letdepend_oninput~depend_on=letoutput=map2inputdepend_on~f:(funa_->a)inpreserve_cutoff~input~output;output;;letnecessary_if_aliveinput=(* If [output] is alive, then [observer] is alive, then [input] is necessary. If
[output] is unnecessary, then [output] is not a parent of [input], and thus
[output]'s liveness is dependent solely on user code. And in particular, if [output]
dies, then [observer] will be finalized, and then upon the next stabilization,
[input] will become unnecessary (at least with respect to [output]). *)letobserver=create_observerinputinletoutput=mapinput~f:(funa->Gc.keep_aliveobserver;a)inpreserve_cutoff~input~output;output;;letbind(lhs:_Node.t)~f=lett=lhs.stateinletlhs_change=create_nodetUninitializedinletmain=create_nodetUninitializedinletbind={Bind.main;f;lhs;lhs_change;rhs=Uopt.none;rhs_scope=Scope.top;all_nodes_created_on_rhs=Uopt.none}in(* We set [lhs_change] to never cutoff so that whenever [lhs] changes, [main] is
recomputed. This is necessary to handle cases where [f] returns an existing stable
node, in which case the [lhs_change] would be the only thing causing [main] to be
stale. *)Node.set_cutofflhs_changeCutoff.never;bind.rhs_scope<-Bindbind;Node.set_kindlhs_change(Bind_lhs_changebind);Node.set_kindmain(Bind_mainbind);main;;letbind2n1n2~f=bind(map2n1n2~f:(funv1v2->v1,v2))~f:(fun(v1,v2)->fv1v2);;letbind3n1n2n3~f=bind(map3n1n2n3~f:(funv1v2v3->v1,v2,v3))~f:(fun(v1,v2,v3)->fv1v2v3);;letbind4n1n2n3n4~f=bind(map4n1n2n3n4~f:(funv1v2v3v4->v1,v2,v3,v4))~f:(fun(v1,v2,v3,v4)->fv1v2v3v4);;letjoin(lhs:_Node.t)=lett=lhs.stateinletlhs_change=create_nodetUninitializedinletmain=create_nodetUninitializedinletjoin={Join.lhs;lhs_change;rhs=Uopt.none;main}inNode.set_cutofflhs_changeCutoff.never;Node.set_kindlhs_change(Join_lhs_changejoin);Node.set_kindmain(Join_mainjoin);main;;letif_(test:_Node.t)~then_~else_=lett=test.stateinlettest_change=create_nodetUninitializedinletmain=create_nodetUninitializedinletif_then_else={If_then_else.test;then_;else_;test_change;main;current_branch=Uopt.none}inNode.set_cutofftest_changeCutoff.never;Node.set_kindtest_change(If_test_changeif_then_else);Node.set_kindmain(If_then_elseif_then_else);main;;letlazy_from_funt~f=letscope=t.current_scopeinLazy.from_fun(fun()->within_scopetscope~f);;letdefault_hash_table_initial_size=4letmemoize_fun_by_key?(initial_size=default_hash_table_initial_size)thashableproject_keyf=(* Here's an explanation of why we get [t.current_scope] here, and then call
[within_scope] below. Consider this (impossible) alternate implementation of
[memoize_fun_by_key]:
{[
let table =
Hashtbl.of_alist_exn hashable
(List.map all_possible_a_values ~f:(fun a -> (project_key a, f a)))
in
stage (fun a -> Hashtbl.find_exn table (project_key a))
]}
This implementation doesn't use [current_scope] or [within_scope]. All calls to [f]
naturally occur in [t.current_scope].
Such an implementation is impossible because we do not have [all_possible_a_values].
The implementation below uses [within_scope] to call [f a] in the scope that was
current at the point of the call to [memoize_fun_by_key] so that we can think of the
[table] as having been created then, when it in reality is created on-demand. *)letscope=t.current_scopeinlettable=Hashtbl.createhashable~size:initial_sizeinstage(funa->letkey=project_keyainmatchHashtbl.findtablekeywith|Someb->b|None->letb=within_scopetscope~f:(fun()->fa)inHashtbl.add_exntable~key~data:b;b);;letarray_foldtchildren~init~f=ifArray.lengthchildren=0thenconsttinitelsecreate_nodet(Array_fold{init;f;children});;letalltts=array_foldt(Array.of_list_revts)~init:[]~f:(funaca->a::ac)moduleUnordered_array_fold_update=Unordered_array_fold.Updateletunordered_array_foldt?(full_compute_every_n_changes=Int.max_value)children~init~f~update=ifArray.lengthchildren=0thenconsttinitelseiffull_compute_every_n_changes<=0thenfailwiths~here:[%here]"unordered_array_fold got non-positive full_compute_every_n_changes"full_compute_every_n_changes[%sexp_of:int]else(letmain=create_nodetUninitializedinNode.set_kindmain(Unordered_array_fold(Unordered_array_fold.create~init~f~update~full_compute_every_n_changes~children~main));main);;letopt_unordered_array_foldt?full_compute_every_n_changests~init~f~f_inverse=letf(accum,num_invalid)x=matchxwith|None->accum,num_invalid+1|Somex->faccumx,num_invalidinletf_inverse(accum,num_invalid)x=matchxwith|None->accum,num_invalid-1|Somex->f_inverseaccumx,num_invalidinmap(unordered_array_foldtts~init:(init,0)~f~update:(F_inversef_inverse)?full_compute_every_n_changes)~f:(fun(accum,num_invalid)->ifnum_invalid=0thenSomeaccumelseNone);;letat_least_k_oftnodes~k=letbool_to_intb=ifbthen1else0inmap~f:(funi->i>=k)(unordered_array_foldtnodes~init:0~f:(funnum_trueb->num_true+bool_to_intb)~update:(F_inverse(funnum_trueb->num_true-bool_to_intb)));;letexiststnodes=at_least_k_oftnodes~k:1letfor_alltnodes=at_least_k_oftnodes~k:(Array.lengthnodes)letsumt?full_compute_every_n_changesnodes~zero~add~sub=unordered_array_foldtnodes~init:zero~f:add~update:(F_inversesub)?full_compute_every_n_changes;;letopt_sumt?full_compute_every_n_changesnodes~zero~add~sub=opt_unordered_ar
ray_foldtnodes~init:zero~f:add~f_inverse:sub?full_compute_every_n_changes;;letsum_inttnodes=sumtnodes~zero:0~add:(+)~sub:(-)letsum_floattnodes=sumtnodes~zero:0.~add:(+.)~sub:(-.)~full_compute_every_n_changes:(Array.lengthnodes);;letset_freeze(node:_Node.t)~child~only_freeze_when=ifdebugthenassert(Scope.is_topnode.created_in);(* By making [node.kind] be [Freeze], we are making [Node.is_necessary node]. *)letwas_necessary=Node.is_necessarynodeinNode.set_kindnode(Freeze{main=node;child;only_freeze_when});ifwas_necessarythenadd_parent~child~parent:node~child_index:Kind.freeze_child_indexelsebecame_necessarynode;;letfreeze(child:_Node.t)~only_freeze_when=lett=child.stateinletnode=create_node_toptUninitializedinset_freezenode~child~only_freeze_when;node;;letatclocktime=lett=Clock.incr_stateclockinifTime_ns.(<=)time(nowclock)thenconsttBefore_or_after.Afterelse(letmain=create_nodetUninitializedinletat={At.at=time;main;alarm=Alarm.null;clock}inNode.set_kindmain(Atat);at.alarm<-add_alarmclock~at:time(Alarm_value.create(Atat));main);;letafterclockspan=atclock(Time_ns.add(nowclock)span)letnext_interval_alarm_strict(clock:Clock.t)~base~interval=letafter=nowclockinletat=Time_ns.next_multiple~base~after~interval~can_equal_after:false()inifdebugthenassert(Time_ns.(>)atafter);at;;letat_intervals(clock:Clock.t)interval=lett=Clock.incr_stateclockinifTime_ns.Span.(<)interval(Timing_wheel.alarm_precisionclock.timing_wheel)thenfailwiths~here:[%here]"at_intervals got too small interval"interval[%sexp_of:Time_ns.Span.t];letmain=create_nodetUninitializedinletbase=nowclockinletat_intervals={At_intervals.main;base;interval;alarm=Alarm.null;clock}inNode.set_kindmain(At_intervalsat_intervals);(* [main : unit Node.t], so we make it never cutoff so it changes each time it is
recomputed. *)Node.set_cutoffmainCutoff.never;at_intervals.alarm<-add_alarmclock~at:(next_interval_alarm_strictclock~base~interval)(Alarm_value.create(At_intervalsat_intervals));main;;letsnapshotclockvalue_at~at~before=lett=Clock.incr_stateclockinifTime_ns.(<=)at(nowclock)thenifTime_ns.(<)at(nowclock)thenOr_error.error"cannot take snapshot in the past"at[%sexp_of:Time_ns.t]elseOk(freezevalue_at~only_freeze_when:(Fn.consttrue))else(letmain=create_node_toptUninitializedinletsnapshot={Snapshot.main;at;before;value_at;clock}inNode.set_kindmain(Snapshotsnapshot);(* Unlike other time-based incrementals, a snapshot is created in [Scope.top] and
cannot be invalidated by its scope. Thus, there is no need to keep track of the
alarm that is added, because it will never need to be removed early. *)ignore(add_alarmclock~at(Alarm_value.create(Snapshotsnapshot)):Alarm.t);Okmain);;letincremental_step_functionclockchild=lett=Clock.incr_stateclockinletmain=create_nodetUninitializedinletstep_function_node={Step_function_node.main;value=Uopt.none;child=Uopt.somechild;extracted_step_function_from_child_at=Stabilization_num.none;upcoming_steps=Sequence.empty;alarm=Alarm.null;alarm_value=Obj.magicNone(* set below *);clock}instep_function_node.alarm_value<-Alarm_value.create(Step_functionstep_function_node);Node.set_kindmain(Step_functionstep_function_node);main;;letmake_stale(node:_Node.t)=lett=node.stateinnode.recomputed_at<-Stabilization_num.none;(* force the node to be stale *)ifNode.needs_to_be_computednode&¬(Node.is_in_recompute_heapnode)thenRecompute_heap.addt.recompute_heapnode;;letadvance_clock(clock:Clock.t)~to_=lett=Clock.incr_stateclockinensure_not_stabilizingt~name:"advance_clock"~allow_in_update_handler:true;ifdebugtheninvariantt;ifTime_ns.(>)to_(nowclock)then(set_var_while_not_stabilizingclock.nowto_;Timing_wheel.advance_clockclock.timing_wheel~to_~handle_fired:clock.handle_fired;Timing_wheel.fire_past_alarmsclock.timing_wheel~handle_fired:clock.handle_fired;whileUopt.is_someclock.fired_alarm_valuesdoletalarm_value=Uopt.unsafe_valueclock.fired_alarm_valuesinclock.fired_alarm_values<-alarm_value.next_fired;alarm_value.next_fired<-Uopt.none;matchalarm_value.actionwith|At{main;_}->ifNode.is_validmainthen(Node.set_kindmain(ConstAfter);make_stalemain)|At_intervals({main;base;interval;_}asat_intervals)->ifNode.is_validmainthen(at_intervals.alarm<-add_alarmclock~at:(next_interval_alarm_strictclock~base~interval)alarm_value;make_stalemain)|Snapshot{main;value_at;_}->ifdebugthenassert(Node.is_validmain);set_freezemain~child:value_at~only_freeze_when:(fun_->true);make_stalemain|Step_function{main;_}->ifNode.is_validmainthenmake_stalemaindone;ifdebugtheninvariantt);;letcreate_clockt~timing_wheel_c
onfig~start=lettiming_wheel=Timing_wheel.create~config:timing_wheel_config~startinletrecclock:Clock.t={now=create_vartstart;handle_fired;fired_alarm_values=Uopt.none;timing_wheel}andhandle_firedalarm=letalarm_value=Timing_wheel.Alarm.valueclock.timing_wheelalarminalarm_value.next_fired<-clock.fired_alarm_values;clock.fired_alarm_values<-Uopt.somealarm_valueinclock;;letcreate(moduleConfig:Config.Incremental_config)~max_height_allowed=letadjust_heights_heap=Adjust_heights_heap.create~max_height_allowedinletrecompute_heap=Recompute_heap.create~max_height_allowedinlett={status=Not_stabilizing;bind_lhs_change_should_invalidate_rhs=Config.bind_lhs_change_should_invalidate_rhs;stabilization_num=Stabilization_num.zero;current_scope=Scope.top;adjust_heights_heap;recompute_heap;propagate_invalidity=Stack.create();num_active_observers=0;all_observers=Uopt.none;finalized_observers=Thread_safe_queue.create();disallowed_observers=Stack.create();new_observers=Stack.create();set_during_stabilization=Stack.create();handle_after_stabilization=Stack.create();run_on_update_handlers=Stack.create();only_in_debug=Only_in_debug.create();weak_hashtbls=Thread_safe_queue.create();keep_node_creation_backtrace=false;num_nodes_became_necessary=0;num_nodes_became_unnecessary=0;num_nodes_changed=0;num_nodes_invalidated=0;num_nodes_created=0;num_nodes_recomputed=0;num_nodes_recomputed_directly_because_one_child=0;num_nodes_recomputed_directly_because_min_height=0;num_var_sets=0}int;;letweak_memoize_fun_by_key?(initial_size=default_hash_table_initial_size)thashableproject_keyf=letscope=t.current_scopeinlettable=Weak_hashtbl.create~size:initial_sizehashableinletpacked=Packed_weak_hashtbl.TtableinWeak_hashtbl.set_run_when_unused_datatable~thread_safe_f:(fun()->Thread_safe_queue.enqueuet.weak_hashtblspacked);stage(funa->letkey=project_keyainmatchWeak_hashtbl.findtablekeywith|Someb->b|None->letb=within_scopetscope~f:(fun()->fa)inWeak_hashtbl.add_exntable~key~data:b;b);;moduleExpert=struct(* Given that 
invalid nodes are an attempt at avoiding breaking the entire incremental
computation on problems, let's just ignore any operation on an invalid incremental
rather than raising. *)

  (* [expert_kind_of_node node] inspects [node.kind]: an [Expert] kind is returned wrapped
     in [Uopt.some], an [Invalid] kind yields [Uopt.none], and every other kind raises. *)
  let expert_kind_of_node (node : _ Node.t) =
    match node.kind with
    | Invalid -> Uopt.none
    | Expert expert -> Uopt.some expert
    | other -> raise_s [%sexp "unexpected kind for expert node", (other : _ Kind.t)]
  ;;

  (* [create state ~on_observability_change f] builds an expert node in [state].  When
     [debug] is on and some node is currently running, the new node is also recorded in
     [expert_nodes_created_by_current_node]. *)
  let create state ~on_observability_change f =
    let expert = Expert.create ~f ~on_observability_change in
    let node = create_node state (Expert expert) in
    if debug && Option.is_some state.only_in_debug.currently_running_node
    then (
      state.only_in_debug.expert_nodes_created_by_current_node
      <- T node :: state.only_in_debug.expert_nodes_created_by_current_node);
    node
  ;;

  (* [currently_running_node_exn state name] returns the node recorded in
     [state.only_in_debug.currently_running_node], raising an error that mentions [name]
     when there is none. *)
  let currently_running_node_exn state name =
    match state.only_in_debug.currently_running_node with
    | Some running -> running
    | None -> raise_s [%sexp ("can only call " ^ name ^ " during stabilization" : string)]
  ;;

  (* Note that the two following functions are not symmetric of one another: in [let y =
map x], [x] is always a child of [y] (assuming [x] doesn't become invalid) but [y] is
only a parent of [x] if [y] is necessary. *)

  (* Raises unless the currently running node (as recorded in the debug-only state) is a
     child of [node].  [name] is the name of the calling operation and is only used to
     build the error message. *)
  let assert_currently_running_node_is_child state node name =
    let (T current) = currently_running_node_exn state name in
    if not (Node.has_child node ~child:current)
    then
      raise_s
        [%sexp
          ("can only call " ^ name ^ " on parent nodes" : string)
        , ~~(node.kind : _ Kind.t)
        , ~~(current.kind : _ Kind.t)]
  ;;

  (* Raises unless the currently running node (as recorded in the debug-only state) is a
     parent of [node].  [name] is only used to build the error message. *)
  let assert_currently_running_node_is_parent state node name =
    let (T current) = currently_running_node_exn state name in
    if not (Node.has_parent ~parent:current node)
    then
      raise_s
        [%sexp
          ("can only call " ^ name ^ " on children nodes" : string)
        , ~~(node.kind : _ Kind.t)
        , ~~(current.kind : _ Kind.t)]
  ;;

  (* [make_stale node] marks the expert [node] stale via [Expert.make_stale].  If that
     reports [`Ok] and the node is necessary and not yet in the recompute heap, the node
     is added to the heap.  Nothing happens when [node] is invalid. *)
  let make_stale (node : _ Node.t) =
    let state = node.state in
    let e_opt = expert_kind_of_node node in
    if Uopt.is_some e_opt
    then (
      if debug then assert_currently_running_node_is_child state node "make_stale";
      let e = Uopt.unsafe_value e_opt in
      match Expert.make_stale e with
      | `Already_stale -> ()
      | `Ok ->
        if Node.is_necessary node && not (Node.is_in_recompute_heap node)
        then Recompute_heap.add state.recompute_heap node)
  ;;

  (* [invalidate node] invalidates [node] and then propagates invalidity through
     [state].  In [debug] mode it first checks that the currently running node is a child
     of [node]. *)
  let invalidate (node : _ Node.t) =
    let state = node.state in
    if debug then assert_currently_running_node_is_child state node "invalidate";
    invalidate_node node;
    propagate_invalidity state
  ;;

  (* [add_dependency node dep] appends [dep] as a new child edge of the expert [node].
     If [node] is necessary, the child is linked to its new parent and [node] is
     scheduled in the recompute heap.  Nothing happens when [node] is invalid. *)
  let add_dependency (node : _ Node.t) (dep : _ Expert.edge) =
    let state = node.state in
    let e_opt = expert_kind_of_node node in
    if Uopt.is_some e_opt
    then (
      (* The child check is skipped for nodes that were created by the node currently
         running, and whenever we are not stabilizing. *)
      if debug
      then
        if am_stabilizing state
           && not
                (List.mem
                   ~equal:phys_equal
                   state.only_in_debug.expert_nodes_created_by_current_node
                   (T node))
        then assert_currently_running_node_is_child state node "add_dependency";
      let e = Uopt.unsafe_value e_opt in
      let new_child_index = Expert.add_child_edge e (E dep) in
      if Node.is_necessary node
      then (
        add_parent ~child:dep.child ~parent:node ~child_index:new_child_index;
        if debug then assert (Node.needs_to_be_computed node);
        if not (Node.is_in_recompute_heap node)
        then Recompute_heap.add state.recompute_heap node))
  ;;

  (* [remove_dependency node edge] removes [edge] from the expert [node].  The edge is
     first swapped with the last child edge (when it is not already last) so that the
     removal always happens at the last index; [node] is then scheduled in the recompute
     heap.  Nothing happens when [node] is invalid. *)
  let remove_dependency (node : _ Node.t) (edge : _ Expert.edge) =
    let state = node.state in
    let e_opt = expert_kind_of_node node in
    if Uopt.is_some e_opt
    then (
      if debug then assert_currently_running_node_is_child state node "remove_dependency";
      let e = Uopt.unsafe_value e_opt in
      (* It would require additional thought to check whether allowing the node not to be
         necessary makes sense. *)
      assert (Node.is_necessary node);
      let edge_index = Uopt.value_exn edge.index in
      let (E last_edge) = Expert.last_child_edge_exn e in
      let last_edge_index = Uopt.value_exn last_edge.index in
      if edge_index <> last_edge_index
      then (
        Node.swap_children_except_in_kind
          node
          ~child1:edge.child
          ~child_index1:edge_index
          ~child2:last_edge.child
          ~child_index2:last_edge_index;
        Expert.swap_children e ~child_index1:edge_index ~child_index2:last_edge_index;
        if debug then Node.invariant ignore node);
      Expert.remove_last_child_edge_exn e;
      (* After the swap, [edge.child] sits at [last_edge_index]. *)
      remove_child ~child:edge.child ~parent:node ~child_index:last_edge_index;
      if debug then assert (Node.needs_to_be_computed node);
      if not (Node.is_in_recompute_heap node)
      then Recompute_heap.add state.recompute_heap node;
      if not (Node.is_valid edge.child) then Expert.decr_invalid_children e)
  ;;
end