(*
* Copyright (c) 2013-2017 Thomas Gazagnaire <thomas@gazagnaire.org>
* and Romain Calascibetta <romain.calascibetta@gmail.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

(* Log source used by every debug message emitted from this file. *)
let src = Logs.Src.create "negociator" ~doc:"logs negociator's event"

module Log = (val Logs.src_log src : Logs.LOG)

(* Bit-set of per-commit markers used while walking the commit graph. Each
   flag is one bit of an [int]; a value of type [t] is the bitwise "or" of the
   flags currently set. *)
module Flag = struct
  type t = int

  (* No flag set. *)
  let default = 0

  let complete = 1 lsl 0
  let common = 1 lsl 1
  let common_ref = 1 lsl 2
  let seen = 1 lsl 3
  let popped = 1 lsl 4
  let alternate = 1 lsl 5

  (* [is value x] is [true] iff [value] and [x] share at least one bit. The
     operation is symmetric, so call sites below pass the flag set first and
     the flag second. *)
  let is value x = x land value <> 0

  (* Predicates: is the given flag present in [x]? Names starting with [_]
     are not used in this file. *)
  let _is_complete x = is x complete
  let is_common x = is x common
  let is_common_ref x = is x common_ref
  let is_seen x = is x seen
  let is_popped x = is x popped
  let _is_alternate x = is x alternate

  (* Setters: return [x] with the given flag added. *)
  let _to_complete x = x lor complete
  let to_common x = x lor common
  let _to_common_ref x = x lor common_ref
  let to_seen x = x lor seen
  let to_popped x = x lor popped
  let _to_alternate x = x lor alternate

  (* Pretty-print the names of the flags set in [x], separated by [" |"]
     followed by a break hint. *)
  let pp ppf x =
    let flags =
      [ (if is x complete then ["COMPLETE"] else [])
      ; (if is x common then ["COMMON"] else [])
      ; (if is x common_ref then ["COMMON_REF"] else [])
      ; (if is x seen then ["SEEN"] else [])
      ; (if is x popped then ["POPPED"] else [])
      ; (if is x alternate then ["ALTERNATE"] else []) ]
      |> List.concat
    in
    Fmt.list ~sep:(Fmt.unit " |@ ") Fmt.string ppf flags
end

(* XXX(dinosaure): see this paper
   https://github.com/ocamllabs/papers/blob/master/irmin/2014.08.matthieu/rapport.pdf *)

(* Interface of a negotiation engine over a store. *)
module type S = sig
  module Store : Minimal.S

  module Common :
    Smart.COMMON
    with type hash := Store.Hash.t
     and type reference := Store.Reference.t

  module Decoder :
    Smart.DECODER
    with module Hash := Store.Hash
     and module Reference := Store.Reference
     and module Common := Common

  (* Negotiation state threaded between successive rounds. *)
  type state

  type acks = Store.Hash.t Sync.acks

  (* [find_common store] yields a first set of hashes, the state reached after
     computing it, and a continuation. The continuation takes the
     acknowledgements received and the current state, and answers [`Again]
     with a new set of hashes, [`Ready], or [`Done], together with the updated
     state. *)
  val find_common :
       Store.t
    -> ( Store.Hash.Set.t
       * state
       * (   acks
          -> state
          -> ([`Again of Store.Hash.Set.t | `Done | `Ready] * state) Lwt.t) )
       Lwt.t
end

module Make (G : Minimal.S) = struct
  module Store = G

  (* Value stored in the priority queue and in the bucket: a commit together
     with its mutable negotiation flags. *)
  module V = struct
    type t = {commit: Store.Value.Commit.t; mutable flags: Flag.t}

    (* The arguments are deliberately swapped: this is the reverse of the
       order given by [Store.Value.Commit.compare]. *)
    let compare a b = Store.Value.Commit.compare b.commit a.commit

    let pp ppf {commit; flags} =
      Fmt.pf ppf "{ @[<hov>commit = %a;@ flags = [ %a ];@] }"
        (Fmt.hvbox Store.Value.Commit.pp)
        commit (Fmt.hvbox Flag.pp) flags
  end

  module Pq = Psq.Make (Store.Hash) (V)
  module Common = Smart.Common (Store.Hash) (Store.Reference)
  module Decoder = Smart.Decoder (Store.Hash) (Store.Reference) (Common)

  (* XXX(dinosaure): short-cut of the smart decoder module. The common way is
     to load the [Sync] module but I do not want to. Because we annotated some
     constraints on the types (specifically on the hash), this compiles. *)

  module Bucket = struct
    type t = (Store.Hash.t, V.t) Hashtbl.t

    (* XXX(dinosaure): because the commit graph is a DAG, we are sometimes
       asked to read a commit that was already computed. During the
       negotiation we keep some information for each commit (see {!Flag.t}) as
       a mutable value, so we keep a mutable "bucket" which contains every
       commit/value already processed. When we compute the parents of a
       commit, we use this function to either load the commit with
       {!Store.read} or return the already computed one with its mutable
       [flags] field preserved.

       The flags are what prevents re-computing the same commit n times (see
       the flag SEEN). *)

    (* [get t bucket hash] returns the cached value bound to [hash], or reads
       it from the store [t] and caches it with [Flag.default].

       Returns [Error (`Invalid_hash hash)] when the object is a tree, a tag
       or a blob, and [Error (`Store err)] when the store fails to read. *)
    let get t (bucket : t) hash =
      let open Lwt.Infix in
      try Hashtbl.find bucket hash |> fun v -> Lwt.return (Ok v)
      with Not_found -> (
        Store.read t hash
        >>= function
        | Ok (Store.Value.Commit commit) ->
            let value = {V.commit; flags= Flag.default} in
            Hashtbl.add bucket hash value ;
            Lwt.return (Ok value)
        | Ok (Store.Value.Tree _ | Store.Value.Tag _ | Store.Value.Blob _) ->
            Lwt.return (Error (`Invalid_hash hash))
        | Error err -> Lwt.return (Error (`Store err)) )
  end

  (* Pending commits plus a counter of the queued commits that are not
     flagged COMMON (see [push], [mark_common] and [get] for its updates). *)
  type rev = {pq: Pq.t; non_common_revs: int}

  let pp_rev ppf rev =
    Fmt.pf ppf "{ @[<hov>pq = %a;@ non_common_revs = %d;@] }"
      (Fmt.hvbox (Pq.pp Fmt.Dump.(pair Store.Hash.pp V.pp)))
      rev.pq rev.non_common_revs

  (* [push hash value mark rev] does nothing when [value] already carries a
     bit of [mark]. Otherwise it adds [mark] to the flags of [value] (in
     place), inserts the binding in the queue, and increments
     [non_common_revs] unless the updated flags contain COMMON. *)
  let push hash value mark rev =
    if (value.V.flags :> int) land mark = 0 then (
      value.V.flags <- (value.V.flags :> int) lor mark ;
      { pq= Pq.add hash value rev.pq
      ; non_common_revs=
          ( if not (Flag.is_common value.V.flags) then rev.non_common_revs + 1
          else rev.non_common_revs ) } )
    else rev

  (* XXX(dinosaure): this function marks a rev and its ancestors as common. In
     some cases, it is desirable to mark only the ancestors (for example, when
     only the server does not yet know that they are common). *)
  let rec mark_common t bucket ~ancestors hash value rev =
    let open Lwt.Infix in
    if not (Flag.is_common value.V.flags) then (
      (* With [~ancestors:true] the commit itself is left untouched. *)
      if not ancestors then value.V.flags <- Flag.to_common value.V.flags ;
      if not (Flag.is_seen value.V.flags) then
        push hash value Flag.seen rev |> fun rev -> Lwt.return rev
      else
        let rec go rev = function
          | [] -> Lwt.return rev
          | hash :: rest -> (
              Bucket.get t bucket hash
              >>= function
              | Error _ -> Lwt.return rev
              (* XXX(dinosaure): in git, if we can not get a commit from a
                 hash (for any reason), we just skip the rest of the
                 computation (so we skip the rest of the ancestors).

                 In my opinion we should report the error to the user (that
                 is, stop the computation and return only the error). We need
                 to think about whether that is what we really want. TODO! *)
              | Ok value ->
                  mark_common t bucket ~ancestors:false hash value rev
                  >>= fun rev -> go rev rest )
        in
        let non_common_revs =
          if (not ancestors) && not (Flag.is_popped value.V.flags) then
            rev.non_common_revs - 1
          else rev.non_common_revs
        in
        go
          {rev with non_common_revs}
          (Store.Value.Commit.parents value.V.commit) )
    else Lwt.return rev

  (* [get t bucket rev] pops commits from the queue until it finds one to
     announce. It returns [(Some hash, rev)] for such a commit, or
     [(None, rev)] when [non_common_revs] is zero or the queue is empty.
     Parents of every popped commit are pushed on the way. *)
  let get t bucket rev =
    let open Lwt.Infix in
    let rec go rev =
      if rev.non_common_revs = 0 then Lwt.return (None, rev)
      else
        match Pq.pop rev.pq with
        | None -> Lwt.return (None, rev)
        | Some ((hash, ({V.commit; flags} as value)), pq) ->
            value.V.flags <- Flag.to_popped flags ;
            (* [flags] below is the value read before POPPED was added. *)
            let non_common_revs =
              if not (Flag.is_common flags) then rev.non_common_revs - 1
              else rev.non_common_revs
            in
            let leave, marks =
              if Flag.is_common flags then
                (false, Flag.to_common @@ Flag.to_seen @@ Flag.default)
                (* XXX(dinosaure): do not send "have", and ignore
                   ancestors. *)
              else if Flag.is_common_ref flags then
                (true, Flag.to_common @@ Flag.to_seen @@ Flag.default)
                (* XXX(dinosaure): send "have" and ignore ancestors. *)
              else (true, Flag.to_seen @@ Flag.default)
              (* XXX(dinosaure): send "have", also for its ancestors. *)
            in
            let rec gogo rev = function
              | [] -> Lwt.return rev
              | hash :: rest -> (
                  Bucket.get t bucket hash
                  >>= function
                  | Ok ({V.flags; _} as value) ->
                      let rev =
                        if not (Flag.is_seen flags) then
                          push hash value marks rev
                        else rev
                      in
                      ( if Flag.is_common marks then
                        mark_common t bucket ~ancestors:true hash value rev
                      else Lwt.return rev )
                      >>= fun rev -> gogo rev rest
                  (* A parent that cannot be loaded stops the walk over the
                     remaining parents; the error itself is dropped. *)
                  | Error _ -> Lwt.return rev )
            in
            gogo {pq; non_common_revs} (Store.Value.Commit.parents commit)
            >>= fun rev ->
            if leave then Lwt.return (Some hash, rev) else go rev
    in
    go rev

  type state =
    { ready: bool  (* got ready detail *)
    ; continue: bool  (* got continue detail *)
    ; finish: bool  (* want to finish *)
    ; count: int  (* how many hashes we get *)
    ; flush: int  (* how many hashes before flush *)
    ; vain: int  (* how many vainly hashes we have *)
    ; rev: rev  (* priority queue *)
    ; in_fly: Store.Hash.t list }

  type acks = Store.Hash.t Sync.acks

  let _pp_state ppf state =
    Fmt.pf ppf
      "{ @[<hov>ready = %b;@ continue = %b;@ finish = %b;@ count = %d;@ flush \
       = %d;@ vain = %d;@ rev = %a;@ in_fly = %a;@] }"
      state.ready state.continue state.finish state.count state.flush
      state.vain (Fmt.hvbox pp_rev) state.rev
      Fmt.Dump.(list Store.Hash.pp)
      state.in_fly

  (* XXX(dinosaure): to be clear, this implementation is very bad and we need
     to change it (TODO). For example, the [in_fly] field is used only once
     (in the second call of [continue]); after that, the field is never used
     again. You need to understand that this implementation is close to what
     git does - and apparently, it's not the best. *)

  (* Raised inside [continue] to leave the acknowledgement loop as soon as a
     plain [`ACK] is met; it is caught by [continue] itself. *)
  exception Jump of Store.Hash.t list * state

  (* Raised (as a failed promise) when an acknowledged hash can not be loaded
     as a commit from the store. *)
  exception
    Invalid_commit of [`Store of Store.error | `Invalid_hash of Store.Hash.t]

  let _INITIAL_FLUSH = 16
  let _PIPESAFE_FLUSH = 32

  (* XXX(dinosaure): the canonical implementation will send up to 32 "have"
     at a time, then will send a flush packet line. The canonical
     implementation will skip ahead and send the next 32 immediately, so that
     there is always a block of 32 "in-flight on the wire" at a time. *)

  (* Threshold on [state.vain] above which [continue] asks to finish. *)
  let _VAIN = 128

  (* Next flush limit: double [count] while it is below [_PIPESAFE_FLUSH],
     then grow by [_PIPESAFE_FLUSH]. *)
  let update_flush count =
    if count < _PIPESAFE_FLUSH then count lsl 1 else count + _PIPESAFE_FLUSH

  (* [find_common t] seeds the queue with the commits pointed to by the local
     references of [t], consumes up to [_INITIAL_FLUSH] hashes as the first
     set to return, then one more window kept in [in_fly], and returns the
     first set, the state and the continuation to call with the
     acknowledgements. *)
  let find_common t =
    let bucket = Hashtbl.create 1024 in
    let rev = {pq= Pq.empty; non_common_revs= 0} in
    let open Lwt.Infix in
    Store.Ref.list t
    >>= (fun refs ->
          Log.debug (fun l ->
              l "Local references: %a."
                (Fmt.hvbox
                   (Fmt.list (Fmt.pair Store.Reference.pp Store.Hash.pp)))
                refs ) ;
          (* References whose target can not be loaded as a commit are
             skipped. *)
          Lwt_list.fold_left_s
            (fun rev (_, hash) ->
              Bucket.get t bucket hash
              >|= function
              | Ok value -> push hash value Flag.seen rev | Error _ -> rev )
            rev refs )
    >>= fun rev ->
    (* [consume have state n] pulls at most [n] hashes out of [state.rev].
       The hashes are returned in the order they were produced. *)
    let rec consume have state = function
      | 0 -> Lwt.return (List.rev have, `Continue state)
      | n -> (
          get t bucket state.rev
          >>= function
          | None, rev ->
              Lwt.return (List.rev have, `Not_enough {state with rev})
          | Some hash, rev ->
              consume (hash :: have)
                {state with rev; count= state.count + 1; vain= state.vain + 1}
                (n - 1) )
    in
    let continue {Sync.acks; _} state =
      let rec go state have = function
        | [] -> Lwt.return (state, have)
        | (_, `ACK) :: _ ->
            (* XXX(dinosaure): without multi-ack or multi-ack-detailed,
               upload-pack sends 'ACK obj-id' on the first common object it
               finds. After that it says nothing until the client gives it a
               "done". *)
            Lwt.fail (Jump (have, state))
        | (hash, ((`Common | `Ready | `Continue) as ack)) :: rest -> (
            Bucket.get t bucket hash
            >>= function
            | Error err -> Lwt.fail (Invalid_commit err)
            | Ok value ->
                let vain, have' =
                  if ack = `Common && Flag.is_common value.V.flags then
                    (0, hash :: have)
                    (* XXX(dinosaure): we need to replay the have for this
                       object on the next RPC request so the peer knows it is
                       in common with us. *)
                  else if ack <> `Common then (0, have)
                    (* XXX(dinosaure): reset [limit] because an ACK for this
                       commit has not been seen. *)
                  else (state.vain + 1, have)
                in
                mark_common t bucket ~ancestors:false hash value state.rev
                >|= ( if ack = `Ready then
                      (* NOTE(review): [rev] here is the value bound right
                         after the references were pushed, not the result of
                         [mark_common], which is discarded. It looks like the
                         intent was only to clear the queue - confirm before
                         changing. *)
                      fun _ -> {rev with pq= Pq.empty}
                    else fun x -> x )
                >>= fun rev ->
                go
                  { state with
                    rev; continue= true; ready= (ack = `Ready); vain }
                  have' rest )
      in
      if state.finish && state.in_fly = [] then Lwt.return (`Done, state)
      else
        Lwt.try_bind
          (fun () -> go {state with in_fly= []} state.in_fly acks)
          (fun (state, have) ->
            (* XXX(dinosaure): this test does not appear in the Git code base
               but it's the default case to end the negotiation engine. *)
            if state.ready then Lwt.return (`Ready, state)
            else if state.continue && state.vain > _VAIN then
              Lwt.return
                ( `Again (Store.Hash.Set.of_list have)
                , {state with finish= true} )
            else
              let rec go state have =
                (* XXX(dinosaure): consume to the [state.flush] limit. *)
                get t bucket state.rev
                >>= function
                | Some hash, rev ->
                    let state =
                      { state with
                        vain= state.vain + 1; count= state.count + 1; rev }
                    in
                    if state.count >= state.flush then
                      Lwt.return
                        ( `Again (Store.Hash.Set.of_list (hash :: have))
                        , {state with flush= update_flush state.count} )
                    else go state (hash :: have)
                | None, rev ->
                    Lwt.return
                      ( `Again (Store.Hash.Set.of_list have)
                      , {state with rev; finish= true} )
                (* XXX(dinosaure): in the next step, we stop. *)
              in
              go state have )
          (function
            | Jump (have, state) ->
                (* XXX(dinosaure): if we received a ready flag, it is not
                   necessary to continue the negotiation (the server found
                   an acceptable common base commit) and we will then receive
                   the PACK file even if we don't write "done". *)
                if not state.ready then (
                  Log.debug (fun l ->
                      l ~header:"find_common"
                        "We catch a jump exception to get out of the main \
                         loop and return `Again." ) ;
                  Lwt.return
                    ( `Again (Store.Hash.Set.of_list have)
                    , {state with finish= true} ) )
                else (
                  Log.debug (fun l ->
                      l ~header:"find_common"
                        "We catch a jump exception to get out of the main \
                         loop and return `Ready." ) ;
                  Lwt.return (`Ready, state) )
            | exn -> Lwt.fail exn )
      (* XXX(dinosaure): you need to take care of the exception
         [Invalid_commit]. It can happen when the server ACKs a wrong commit
         (that is, the client does not have this commit). In this case, the
         best option is to abort everything.

         NOTE: if you read the code, you can notice that we silence any error
         from the store (for example, when we try to get an object from the
         store and catch an error, we continue the computation without any
         warning or exception). *)
    in
    (* XXX(dinosaure): first, we consume [_INITIAL_FLUSH] hashes of the
       priority queue. *)
    consume []
      { ready= false
      ; continue= false
      ; finish= false
      ; count= 0
      ; flush= _INITIAL_FLUSH
      ; vain= 0
      ; rev
      ; in_fly= [] }
      _INITIAL_FLUSH
    >>= function
    | have, `Not_enough state ->
        Lwt.return
          ( Store.Hash.Set.of_list have
          , {state with finish= true}
          , fun _ state -> Lwt.return (`Done, {state with finish= true}) )
    | have, `Continue state -> (
        (* XXX(dinosaure): then, we update the flush limit: in other words,
           when we send the next have list. *)
        let state = {state with flush= update_flush state.count} in
        (* XXX(dinosaure): we keep one window "ahead" of the other side, and
           will wait for an ACK only on the next one. *)
        consume [] state state.flush
        >>= function
        | _, `Not_enough state ->
            Lwt.return
              ( Store.Hash.Set.of_list have
              , {state with finish= true}
              , fun _ state -> Lwt.return (`Done, {state with finish= true})
              )
        | in_fly, `Continue state ->
            Lwt.return
              (Store.Hash.Set.of_list have, {state with in_fly}, continue) )
end