(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2021 Nomadic Labs. <contact@nomadic-labs.com>               *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

(* Heuristic state: one candidate value per peer, together with the two
   bounds ([threshold], [expected]) used by [get_state], the comparison on
   values, and the map module built from that comparison. *)
type 'a t = {
  threshold : int;
  expected : int;
  candidates : 'a P2p_peer.Id.Table.t;
  compare : 'a -> 'a -> int;
  values_map_module : (module Map.S with type key = 'a);
}

(* [update t (peer_id, data)] records [data] as the candidate of [peer_id].

   A peer that is not yet in the table is only admitted while the table
   holds fewer than [t.expected] entries; a peer already present always
   has its value replaced. The table therefore never grows beyond
   [t.expected] entries. Otherwise the call does nothing. *)
let update t (peer_id, data) =
  let table = t.candidates in
  let has_room = P2p_peer.Id.Table.length table < t.expected in
  if has_room || P2p_peer.Id.Table.mem table peer_id then
    P2p_peer.Id.Table.replace table peer_id data

(* Outcome computed by [get_state]:
   - [Consensus v]: some value [v] was counted [threshold] times;
   - [No_consensus l]: [expected] candidates are known and no value reached
     [threshold]; [l] lists each value with its number of occurrences;
   - [Need_more_candidates]: not enough candidates to conclude. *)
type 'a state =
  | Consensus of 'a
  | No_consensus of ('a * int) list
  | Need_more_candidates

(* [create ?compare ~expected ~threshold ()] builds an empty heuristic.

   [compare] defaults to the polymorphic [compare] in scope.

   Raises [Invalid_argument] when [threshold < 0], when
   [expected < threshold], or when [expected >= threshold * 2]. The checks
   are performed in that order. *)
let create (type a) ?(compare = compare) ~expected ~threshold () =
  let () =
    if threshold < 0 then
      invalid_arg "Hash_heuristic.create: threshold negative"
    else if expected < threshold then
      invalid_arg "Hash_heuristic.create: expected lower than threshold"
    else if expected >= threshold * 2 then
      invalid_arg
        "Hash_heuristic.create: expected greater than twice the threshold"
  in
  (* The functor is applied once here and the resulting module is kept in
     the record, so that [get_state] does not have to apply it again on
     every call. *)
  let module Values = Map.Make (struct
    type t = a

    let compare = compare
  end) in
  let candidates = P2p_peer.Id.Table.create 11 in
  {
    threshold;
    expected;
    candidates;
    compare;
    values_map_module = (module Values : Map.S with type key = a);
  }

(* [get_state t] computes the current outcome of the heuristic from the
   candidates stored in [t]. *)
let get_state (type a) (t : a t) =
  let nb_candidates = P2p_peer_id.Table.length t.candidates in
  if nb_candidates < t.threshold then Need_more_candidates
  else
    (* Recover the map module that [create] stored in the record. *)
    let (module Values : Map.S with type key = a) = t.values_map_module in
    (* Count one more occurrence of [value]. [Error] is not a failure
       here: it is used to leave the fold as soon as a value has been
       seen [t.threshold] times. *)
    let tally _peer value counts =
      let occurrences =
        match Values.find value counts with
        | Some seen -> succ seen
        | None -> 1
      in
      let counts' = Values.add value occurrences counts in
      if occurrences = t.threshold then Error value else Ok counts'
    in
    match P2p_peer_id.Table.fold_e tally t.candidates Values.empty with
    | Error value -> Consensus value
    | Ok _ when nb_candidates < t.expected -> Need_more_candidates
    | Ok counts -> No_consensus (Values.bindings counts)

module Worker = struct
  type 'a t = {
    expire_time : Ptime.Span.t;
        (* How long a consensus value stays valid once found. *)
    state : 'a state;
        (* Internal state of the consensus heuristic. *)
    mutable expired : unit Lwt.t;
        (* Promise fulfilled once the current consensus value has expired.
           A timestamp comparison could implement the same mechanism. *)
    job : unit -> 'a state Lwt.t;
        (* Job run by the worker to compute a heuristic state. *)
    mutable result : 'a Lwt.t;
        (* Promise fulfilled with the consensus value once one is found. *)
    restart_delay : Ptime.Span.t;
        (* Pause before running [job] again when it did not produce a
           consensus. *)
    mutable all_consensus_hooks : ('a -> unit) list;
        (* Hooks called on every consensus value found by the worker. *)
    mutable next_consensus_hooks : ('a -> unit) list;
        (* Hooks called only on the next consensus value found. *)
  }

  (* [create ~expire_time ~job ~restart_delay] builds a worker that has not
     run its job yet. [state] and [result] only hold placeholders, and
     [expired] is already resolved so that the first call to [wait] starts
     the job. *)
  let create ~expire_time ~job ~restart_delay =
    {
      expire_time;
      restart_delay;
      job;
      state = Need_more_candidates;
      expired = Lwt.return_unit;
      result = Lwt.fail Lwt.Canceled;
      all_consensus_hooks = [];
      next_consensus_hooks = [];
    }

  (* [loop t ()] runs [t.job] until it yields [Consensus data], sleeping
     [t.restart_delay] between two attempts. On consensus it arms the
     expiration timer, runs the registered hooks and resolves to [data]. *)
  let rec loop t () =
    let open Lwt_syntax in
    (* Cancelling the worker cancels [t.result], which in turn cancels
       [loop] when [t.result] is still pending. If [t.job] were not a
       cancelable promise, the cancellation would stop there, and a job
       returning e.g. [Need_more_candidates] would make [loop] run again.
       To prevent this, the job is wrapped as a cancelable promise. *)
    let* outcome = Lwt.wrap_in_cancelable (t.job ()) in
    match outcome with
    | Consensus data ->
        t.expired <- Systime_os.sleep t.expire_time ;
        let fire hooks = List.iter (fun hook -> hook data) hooks in
        (* Hooks are accumulated by consing, hence the [List.rev] to run
           them in registration order. Both lists are read, and the
           one-shot list is cleared, before any hook is executed. *)
        let one_shot_hooks = List.rev t.next_consensus_hooks in
        t.next_consensus_hooks <- [] ;
        let forever_hooks = List.rev t.all_consensus_hooks in
        fire one_shot_hooks ;
        fire forever_hooks ;
        Lwt.return data
    | No_consensus _ | Need_more_candidates ->
        let* () = Systime_os.sleep t.restart_delay in
        loop t ()

  (* [wait t] returns a protected view of the promise of the consensus
     value, (re)starting the job first when needed.

     The job is ongoing when [t.result] is pending ([Lwt.Sleep]); the last
     value has expired when [t.expired] is resolved ([Lwt.Return ()]). The
     job is started only when it is not ongoing and the last value has
     expired. *)
  let wait t =
    let job_finished =
      match Lwt.state t.result with
      | Lwt.Sleep -> false
      | Lwt.Return _ | Lwt.Fail _ -> true
    in
    let value_expired =
      match Lwt.state t.expired with
      | Lwt.Return () -> true
      | Lwt.Sleep | Lwt.Fail _ -> false
    in
    if job_finished && value_expired then t.result <- loop t () ;
    Lwt.protected t.result

  (* [on_next_consensus t hook] calls [hook] immediately when [t.result] is
     already fulfilled; otherwise [hook] is queued as a one-shot hook. *)
  let on_next_consensus t hook =
    match Lwt.state t.result with
    | Lwt.Return data -> hook data
    | Lwt.Sleep | Lwt.Fail _ ->
        t.next_consensus_hooks <- hook :: t.next_consensus_hooks

  (* [on_all_consensus t hook] registers [hook] for every consensus value
     and, when [t.result] is already fulfilled, also calls it right away
     on that value. *)
  let on_all_consensus t hook =
    t.all_consensus_hooks <- hook :: t.all_consensus_hooks ;
    match Lwt.state t.result with
    | Lwt.Return data -> hook data
    | Lwt.Sleep | Lwt.Fail _ -> ()

  (* [cancel t] cancels both promises held by the worker and then replaces
     each of them with a promise failed with [Lwt.Canceled], so that they
     end up in that state whatever the effect of the cancellation was.
     All hooks are dropped afterwards to avoid memory leaks. *)
  let cancel t =
    Lwt.cancel t.expired ;
    t.expired <- Lwt.fail Lwt.Canceled ;
    Lwt.cancel t.result ;
    t.result <- Lwt.fail Lwt.Canceled ;
    t.all_consensus_hooks <- [] ;
    t.next_consensus_hooks <- []
end