123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290(*****************************************************************************)(* *)(* Open Source License *)(* Copyright (c) 2020 Nomadic Labs. <contact@nomadic-labs.com> *)(* *)(* Permission is hereby granted, free of charge, to any person obtaining a *)(* copy of this software and associated documentation files (the "Software"),*)(* to deal in the Software without restriction, including without limitation *)(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)(* and/or sell copies of the Software, and to permit persons to whom the *)(* Software is furnished to do so, subject to the following conditions: *)(* *)(* The above copyright notice and this permission notice shall be included *)(* in all copies or substantial portions of the Software. *)(* *)(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)(* DEALINGS IN THE SOFTWARE. *)(* *)(*****************************************************************************)typestatus=Chain_validator_worker_state.synchronisation_status=|Synchronisedof{is_chain_stuck:bool}|Not_synchronisedtypecandidate=Time.Protocol.t*P2p_peer.Id.t(* An order is used on candidates. This order is given by the
predicate (and implemented in the [earlier] function) :
forall v, None < Some v \/ forall t t' p p', Time.Protocol.(t < t')
-> Some (t,p) < Some (t',p') = true. The reflexive closure
(according to the timestamp) of this order is implemented in
[earlier_or_coincident].
Variants provide specifalised option/non-option versions *)letearlier_olr=match(l,r)with|None,None->false|None,Some_->true|Some(i,_),Some(j,_)->Time.Protocol.(i<j)|Some_,None->falseletearlier_ro(i,_)r=matchrwithSome(j,_)->Time.Protocol.(i<j)|None->falseletearlierl(j,_)=matchlwithNone->true|Some(i,_)->Time.Protocol.(i<j)letcoincident_olr=match(l,r)with|None,None->true|Some(i,_),Some(j,_)->Time.Protocol.(i=j)|_->falseletearlier_or_coincident_olr=earlier_olr||coincident_olrmoduleCore=structtypet={(* Number of peers which are taken into account to be considered as
synchronized *)threshold:int;(* The least recent block validated from the [threshold] peers
should be dated no more than [latency] seconds. *)latency:int;(* Store the [threshold] best validated block timestamps with their peer. *)candidates:candidateoptionArray.t;(* Index of one of the candidates in [candidates] with the oldest timestamp. *)mutableindex_of_oldest_candidate:int;(* Index of one of the candidates in [candidates] with the most recent
timestamp. *)mutableindex_of_youngest_candidate:int;(* Current status of the heuristic. *)mutablecurrent_status:status;}(* Invariants:
- forall state, state.threshold >= 0 -> Array.length
state.candidates = state.threshold
- forall state, state.threshold > 0 -> state.least_index is a valid
index of state.candidates
- forall state, state.threshold > 0 -> state.best_index is a valid
index of state.candidates
- forall state i, state.threshold > 0 -> 0 <= i < state.threshold
-> state.candidates.(state.index_of_oldest_candidate) <= state.candidates.(i)
- forall state i, state.threshold > 0 -> 0 <= i < state.threshold
-> state.candidates.(state.index_of_youngest_candidate) >= state.candidates.(i)
- forall state i j, 0 <= i,j < state.threshold ->
snd(state.candidates(i)) = snd (state.candidates(j)) -> i = j
This is denoted as "valid(state)". *)(* Update only if the timestamp is greater at the given index (see
[earlier]). *)letmay_update_bindingstateindexcandidate=ifearlierstate.candidates.(index)candidatethenstate.candidates.(index)<-Somecandidate(* Return [true] if the candidate's peer is [peer_id] *)letsame_peer(_,peer_id)=function|None->false|Some(_,peer)->P2p_peer.Id.equalpeerpeer_id(* Invariant:
- forall threshold, latency, valid(create ~threshold ~latency). *)letcreate~threshold~latency:t=letcurrent_status=ifthreshold=0thenSynchronised{is_chain_stuck=false}elseNot_synchronisedin{latency;threshold;candidates=(ifthreshold<=0then[||]elseArray.makethresholdNone);index_of_youngest_candidate=0;index_of_oldest_candidate=0;current_status;}letcompute_statusstate=ifstate.threshold<0thenNot_synchronisedelseifstate.threshold=0thenSynchronised{is_chain_stuck=false}elseletnow=Time.System.to_protocol@@Time.System.now()inmatch(state.candidates.(state.index_of_youngest_candidate),state.candidates.(state.index_of_oldest_candidate))with|None,_|_,None->(* The threshold is not reached *)Not_synchronised|Some(best,_),Some(least,_)->letleast_timestamp_drifted=Time.Protocol.addleast(Int64.of_intstate.latency)inifTime.Protocol.(least_timestamp_drifted>=now)thenSynchronised{is_chain_stuck=false}elseifTime.Protocol.(best=least)&&state.threshold<>1then(* The reason why the heuristic does not allow to be stuck
when threshold is one is related to the behavior of the
node. A node should not be bootstrapped while
bootstrapping. When the threshold is one, if the node
validates a block in the past, then it will be declared
as [Synchronised {is_stuck=true}]. Once the threshold is
2, this cannot happen for new validated blocks since a
new validated block comes only from one peer. *)Synchronised{is_chain_stuck=true}elseNot_synchronised(* Invariant:
- forall state candidate, valid(state) ->
valid(update state candidate; state). *)letupdatestatecandidate=ifstate.threshold<=0then()elseifstate.threshold=1thenmay_update_bindingstate0candidateelseifearlier_rocandidatestate.candidates.(state.index_of_oldest_candidate)then()else(* If we find a candidate for the same peer as candidate's, we'll
set this, but otherwise we should update the oldest candidate *)letindex_to_update=refstate.index_of_oldest_candidatein(* We search for the second-worst entry by starting with the best
and just recording whatever is worse than currently known
except for the known-worst *)letindex_of_second_oldest_candidate=refstate.index_of_youngest_candidateinArray.iteri(funiknown_candidate->(* check that we have found the same peer as the candidate *)ifsame_peercandidateknown_candidatethenindex_to_update:=i;(* check if we have found the (an) index for the second oldest candidate *)if(* we are looking for the second-oldest, not the oldest
(remember threshold >= 2 so they are distinct) *)i<>state.index_of_oldest_candidate&&(* has to be at least as old as the previously known second oldest *)earlier_or_coincident_oknown_candidatestate.candidates.(!index_of_second_oldest_candidate)thenindex_of_second_oldest_candidate:=i)state.candidates;(* Properties at this time:
- forall v, index_of_second_oldest_candidate <> state.index_of_oldest_candidate &&
(either v = least or v >= state.candidates.(index_of_second_oldest_candidate))
*)(* patch the candidate that needs patching *)may_update_bindingstate!index_to_updatecandidate;(* patch the pointer to the oldest candidate in case it was rewritten *)if!index_to_update=state.index_of_oldest_candidate&&earlierstate.candidates.(!index_of_second_oldest_candidate)candidatethenstate.index_of_oldest_candidate<-!index_of_second_oldest_candidate;(* patch the pointer to the youngest candidate in case we wrote something
younger *)ifearlierstate.candidates.(state.index_of_youngest_candidate)candidatethenstate.index_of_youngest_candidate<-!index_to_update(* We shadow update to ensure the current_status is updated. *)letupdatestatecandidate=updatestatecandidate;state.current_status<-compute_statusstateletget_statusstate=state.current_statusendmoduleBootstrapping=structtypet={heuristic:Core.t;mutablebootstrapped:bool;when_status_changes:status->unitLwt.t;when_bootstrapped_changes:bool->unitLwt.t;on_bootstrapped:unitLwt_condition.t;}(* [initalisation] is a particular case when the heuristic is
created to ensure that we call the [when_bootstrapped_changes]
callback. *)letset_bootstrapped?(initialisation=false)statebootstrapped=letold_value=state.bootstrappedinstate.bootstrapped<-bootstrapped;ifold_value=false&&bootstrappedthenLwt_condition.signalstate.on_bootstrapped();ifold_value<>bootstrapped||initialisationthenstate.when_bootstrapped_changesbootstrappedelseLwt.return_unitletcreate?(when_bootstrapped_changes=fun_->Lwt.return_unit)?(when_status_changes=fun_->Lwt.return_unit)~threshold~latency():t=letheuristic=Core.create~threshold~latencyin{heuristic;when_status_changes;when_bootstrapped_changes;on_bootstrapped=Lwt_condition.create();bootstrapped=false;}letactivatestate=letopenLwt_syntaxinletis_synchronised=matchstate.heuristic.current_statuswith|Synchronised_->true|_->falseinlet*()=set_bootstrapped~initialisation:truestateis_synchronisedinstate.when_status_changes(Core.get_statusstate.heuristic)letupdatestatecandidate=letopenLwt_syntaxinletold_status=Core.get_statusstate.heuristicinCore.updatestate.heuristiccandidate;letnew_status=Core.get_statusstate.heuristicinlet*()=ifold_status<>new_statusthenstate.when_status_changesnew_statuselseLwt.return_unitinmatchnew_statuswith|Synchronised_whenstate.bootstrapped=false->set_bootstrappedstatetrue|_->Lwt.return_unitletget_statusstate=Core.get_statusstate.heuristicletis_bootstrappedstate=state.bootstrappedletforce_bootstrappedstateb=set_bootstrappedstatebletbootstrappedstate=ifstate.bootstrappedthenLwt.return_unitelseLwt_condition.waitstate.on_bootstrappedend