123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581(*
* Copyright (c) 2018-2022 Tarides <contact@tarides.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)openImportopenIrmin_serveropenLwt.SyntaxopenLwt.InfixincludeClient_intfexceptionContinuemoduleConf=structincludeIrmin.Backend.Confletspec=Irmin.Backend.Conf.Spec.v"irmin-client"leturi=Irmin.Type.(mapstring)Uri.of_stringUri.to_stringleturi=Irmin.Backend.Conf.key~spec"uri"uri(Uri.of_string"tcp://127.0.0.1:9181")lettls=Irmin.Backend.Conf.key~spec"tls"Irmin.Type.boolfalselethostname=Irmin.Backend.Conf.key~spec"hostname"Irmin.Type.string"127.0.0.1"endletconfig?(tls=false)?hostnameuri=letdefault_host=Uri.host_with_default~default:"127.0.0.1"uriinletconfig=Irmin.Backend.Conf.add(Irmin.Backend.Conf.emptyConf.spec)Conf.uriuriinletconfig=Irmin.Backend.Conf.addconfigConf.hostname(Option.value~default:default_hosthostname)inIrmin.Backend.Conf.addconfigConf.tlstlsmoduleClient(I:IO)(Codec:Conn.Codec.S)(Store:Irmin.Generic_key.S)=structmoduleC=Command.Make(I)(Codec)(Store)openCmoduleIO=Itypet={ctx:IO.ctx;config:Conf.t;mutableconn:Conn.t;mutableclosed:bool;lock:Lwt_mutex.t;}letcloset=t.closed<-true;IO.close(t.conn.ic,t.conn.oc)letmk_clientconf=leturi=Conf.getconfConf.uriinlethostname=Conf.getconfConf.hostnameinlettls=Conf.getconfConf.tlsinletscheme=Uri.schemeuri|>Option.value~default:"tcp"inletaddr=Uri.host_with_default~default:"127.0.0.1"uriinletclient=matchString.lowercase_asciischemewith|"unix"->`Unix_domain_socket(`File(Uri.pathuri))|"tcp"->letport=Uri.porturi|>Option.value~default:9181inletip=Ipaddr.of_string_exnaddrinifnottlsthen`TCP(`IPip,`Portport)else`TLS(`Hostnamehostname,`IPip,`Portport)|"ws"|"wss"->(letport=Uri.porturi|>Option.value~default:9181inmatchIpaddr.of_stringaddrwith|Okip->ifnottlsthen`Ws(Some(`IPip,`Portport),Uri.to_stringuri)else`TLS(`Hostnamehostname,`IPip,`Portport)|_->`Ws(None,Uri.to_stringuri))|x->invalid_arg("Unknown client scheme: "^x)inclientletlocktf=Lwt_mutex.with_lockt.lockf[@@inline]letsend_command_headert(moduleCmd:C.CMD)=letheader=Conn.Request.v_header~command:Cmd.nameinConn.Request.write_headert.connheaderletrecv(t:t)namety=let*res=Conn.Response.read_headert.conninConn.Response.get_errort.connres>>=function|Someerr->[%log.err"Request error: command=%s, error=%s"nameerr];Lwt.return_error(`Msgerr)|None->let+x=Conn.readt.conntyin[%log.debug"Completed request: command=%s"name];xletrequest(t:t)(typexy)(moduleCmd:C.CMDwithtyperes=xandtypereq=y)(a:y)=ift.closedthenraiseIrmin.Closedelseletname=Cmd.namein[%log.debug"Starting request: command=%s"name];lockt(fun()->let*()=send_command_headert(moduleCmd)inlet*()=Conn.writet.connCmd.req_tainlet*()=IO.flusht.conn.ocinrecvtnameCmd.res_t)letrecv_branch_diff(t:t)=let*_status=Conn.Response.read_headert.conninConn.readt.conn(Irmin.Type.pairStore.Branch.t(Irmin.Diff.tStore.commit_key_t))>|=Error.unwrap"recv_branch_diff"letrecv_branch_key_diff(t:t)=let*_status=Conn.Response.read_headert.conninConn.readt.conn(Irmin.Diff.tStore.commit_key_t)>|=Error.unwrap"recv_branch_key_diff"endmoduleMake(IO:IO)(Codec:Conn.Codec.S)(Store:Irmin.Generic_key.S)=structmoduleClient=Client(IO)(Codec)(Store)moduleCommand=Command.Make(IO)(Codec)(Store)moduleConn=Command.ConnmoduleCommands=Command.Commandsletrequest=Client.requestletrecconnect?ctxconfig=letctx=Option.value~default:(Lazy.forceIO.default_ctx)ctxinletclient=Client.mk_clientconfiginlet*ic,oc=IO.connect~ctxclientinletconn=Conn.vicocinlet+ok=Conn.Handshake.V1.send(moduleStore)conninifnotokthenError.raise_error"invalid handshake"elselett=Client.{config;ctx;conn;closed=false;lock=Lwt_mutex.create()}intandreconnectt=let*()=Lwt.catch(fun()->Client.closet)(fun_->Lwt.return_unit)inlet+conn=connect~ctx:t.ctxt.Client.configint.conn<-conn.conn;t.closed<-falseletdupclient=let*c=connect~ctx:client.Client.ctxclient.Client.configinlet()=ifclient.closedthenc.closed<-trueinLwt.returncleturit=Conf.gett.Client.configConf.urimoduleX=structopenLwt.InfixmoduleSchema=Store.SchemamoduleHash=Store.HashmoduleContents=structtypenonrec'at=Client.topenCommands.ContentsmoduleKey=Store.Backend.Contents.KeymoduleVal=Store.Backend.Contents.ValmoduleHash=Store.Backend.Contents.Hashtypekey=Key.ttypevalue=Val.ttypehash=Hash.tletmemtkey=requestt(moduleMem)key>|=Error.unwrap"Contents.mem"letfindtkey=requestt(moduleFind)key>|=Error.unwrap"Contents.find"letaddtvalue=requestt(moduleAdd)value>|=Error.unwrap"Contents.add"letunsafe_addtkeyvalue=requestt(moduleUnsafe_add)(key,value)>|=Error.unwrap"Contents.unsafe_add"letindexthash=requestt(moduleIndex)hash>|=Error.unwrap"Contents.index"letbatchtf=ftletcloset=Client.closetletmerget=letf~oldab=let*old=old()inmatcholdwith|Okold->requestt(moduleMerge)(old,a,b)>|=Error.unwrap"Contents.merge"|Errore->Lwt.return_erroreinIrmin.Merge.vIrmin.Type.(optionKey.t)fendmoduleNode=structtypenonrec'at=Client.topenCommands.NodemoduleKey=Store.Backend.Node.KeymoduleVal=Store.Backend.Node.ValmoduleHash=Store.Backend.Node.HashmodulePath=Store.Backend.Node.PathmoduleMetadata=Store.Backend.Node.MetadatamoduleContents=Store.Backend.Node.Contentstypekey=Key.ttypevalue=Val.ttypehash=Hash.tletmemtkey=requestt(moduleMem)key>|=Error.unwrap"Node.mem"letfindtkey=requestt(moduleFind)key>|=Error.unwrap"Node.find"letaddtvalue=requestt(moduleAdd)value>|=Error.unwrap"Node.add"letunsafe_addtkeyvalue=requestt(moduleUnsafe_add)(key,value)>|=Error.unwrap"Node.unsafe_add"letindexthash=requestt(moduleIndex)hash>|=Error.unwrap"Node.index"letbatchtf=ftletcloset=Client.closetletmerget=letf~oldab=let*old=old()inmatcholdwith|Okold->requestt(moduleMerge)(old,a,b)>|=Error.unwrap"Node.merge"|Errore->Lwt.return_erroreinIrmin.Merge.vIrmin.Type.(optionKey.t)fendmoduleNode_portable=Store.Backend.Node_portablemoduleCommit=structtypenonrec'at=Client.topenCommands.CommitmoduleKey=Store.Backend.Commit.KeymoduleVal=Store.Backend.Commit.ValmoduleHash=Store.Backend.Commit.HashmoduleInfo=Store.Backend.Commit.InfomoduleNode=Nodetypekey=Key.ttypevalue=Val.ttypehash=Hash.tletmemtkey=requestt(moduleMem)key>|=Error.unwrap"Commit.mem"letfindtkey=requestt(moduleFind)key>|=Error.unwrap"Commit.find"letaddtvalue=requestt(moduleAdd)value>|=Error.unwrap"Commit.add"letunsafe_addtkeyvalue=requestt(moduleUnsafe_add)(key,value)>|=Error.unwrap"Commit.unsafe_add"letindexthash=requestt(moduleIndex)hash>|=Error.unwrap"Commit.index"letbatchtf=ftletcloset=Client.closetletmerget~info=letf~oldab=let*old=old()inmatcholdwith|Okold->requestt(moduleMerge)(info(),(old,a,b))>|=Error.unwrap"Node.merge"|Errore->Lwt.return_erroreinIrmin.Merge.vIrmin.Type.(optionKey.t)fendmoduleCommit_portable=Store.Backend.Commit_portablemoduleBranch=structtypenonrect=Client.topenCommands.BranchmoduleKey=Store.Backend.Branch.KeymoduleVal=Store.Backend.Branch.Valtypekey=Key.ttypevalue=Val.tletmemtkey=requestt(moduleMem)key>|=Error.unwrap"Branch.mem"letfindtkey=requestt(moduleFind)key>|=Error.unwrap"Branch.find"letsettkeyvalue=requestt(moduleSet)(key,value)>|=Error.unwrap"Branch.set"lettest_and_settkey~test~set=requestt(moduleTest_and_set)(key,test,set)>|=Error.unwrap"Branch.test_and_set"letremovetkey=requestt(moduleRemove)key>|=Error.unwrap"Branch.remove"letlistt=requestt(moduleList)()>|=Error.unwrap"Branch.list"typewatch=tletwatcht?initf=let*t=duptinlet*()=requestt(moduleWatch)init>|=Error.unwrap"Branch.watch"inletrecloop()=ift.closed||Conn.is_closedt.connthenLwt.return_unitelseLwt.catch(fun()->Lwt.catch(fun()->Client.recv_branch_difft)(fun_->raiseContinue)>>=fun(key,diff)->fkeydiff>>=loop)(function_->loop())inLwt.asyncloop;Lwt.returntletwatch_keytkey?initf=let*t=duptinlet*()=requestt(moduleWatch_key)(init,key)>|=Error.unwrap"Branch.watch_key"inletrecloop()=ift.closed||Conn.is_closedt.connthenLwt.return_unitelseLwt.catch(fun()->Lwt.catch(fun()->Client.recv_branch_key_difft)(fun_->raiseContinue)>>=f>>=loop)(function_->loop())inLwt.asyncloop;Lwt.returntletunwatch_twatch=let*()=Conn.writewatch.Client.connUnwatch.req_t()inClient.closewatchletcleart=requestt(moduleClear)()>|=Error.unwrap"Branch.clear"letcloset=Client.closetendmoduleSlice=Store.Backend.SlicemoduleRepo=structtypenonrect=Client.tletvconfig=connectconfigletconfig(t:t)=t.Client.configletclose(t:t)=Client.closetletcontents_t(t:t)=tletnode_t(t:t)=tletcommit_t(t:t)=tletbranch_t(t:t)=tletbatch(t:t)f=ftttendmoduleRemote=Irmin.Backend.Remote.None(Commit.Key)(Store.Branch)endincludeIrmin.Of_backend(X)letpingt=requestt(moduleCommands.Ping)()letexport?deptht=requestt(moduleCommands.Export)depth>|=Error.unwrap"export"letimporttslice=requestt(moduleCommands.Import)slice>|=Error.unwrap"import"letcloset=Client.closetletconnect?tls?hostnameuri=letconf=config?tls?hostnameuriinRepo.vconfletrequest_storestore=matchstatusstorewith|`Empty->`Empty|`Branchb->`Branchb|`Commitc->`Commit(Commit.keyc)moduleBatch=structmoduleRequest_tree=Command.Treetypestore=ttypet=(Store.path*[`Contentsof[`HashofStore.Hash.t|`ValueofStore.contents]*Store.metadataoption|`TreeofRequest_tree.t|`Remove])list[@@derivingirmin]letv()=[]letremovekt=(k,`Remove)::tletadd_valuepath?metadatavaluet=(path,`Contents(`Valuevalue,metadata))::tletadd_hashpath?metadatahasht=(path,`Contents(`Hashhash,metadata))::tletadd_treepathtreet=let+tree=matchTree.keytreewith|None->let+concrete_tree=Tree.to_concretetreeinRequest_tree.Concreteconcrete_tree|Somekey->Request_tree.Keykey|>Lwt.returnin(path,`Treetree)::tletapply~info?(path=Store.Path.empty)storet=letrepo=repostoreinletstore=request_storestoreinrequestrepo(moduleCommands.Batch.Apply)((store,path),info(),t)>|=Error.unwrap"Batch.apply"end(* Overrides *)moduleCommit=structincludeCommitmoduleCache=structmoduleKey=Irmin.Backend.Lru.Make(structtypet=commit_keylethash=Hashtbl.hashletequal=Irmin.Type.(unstage(equalcommit_key_t))end)moduleHash=Irmin.Backend.Lru.Make(structtypet=hashlethash=Hashtbl.hashletequal=Irmin.Type.(unstage(equalhash_t))end)letkey:commitKey.t=Key.create32lethash:commitHash.t=Hash.create32endletof_keyrepokey=ifCache.Key.memCache.keykeythenLwt.return_some(Cache.Key.findCache.keykey)elselet+x=of_keyrepokeyinOption.iter(Cache.Key.addCache.keykey)x;xletof_hashrepohash=ifCache.Hash.memCache.hashhashthenLwt.return_some(Cache.Hash.findCache.hashhash)elselet+x=of_hashrepohashinOption.iter(Cache.Hash.addCache.hashhash)x;xendmoduleContents=structincludeContentsmoduleCache=structmoduleHash=Irmin.Backend.Lru.Make(structtypet=hashlethash=Hashtbl.hashletequal=Irmin.Type.(unstage(equalhash_t))end)lethash:contentsHash.t=Hash.create32endletof_hashrepohash=ifCache.Hash.memCache.hashhashthenLwt.return_some(Cache.Hash.findCache.hashhash)elselet+x=of_hashrepohashinOption.iter(Cache.Hash.addCache.hashhash)x;xendletclone~src~dst=letrepo=reposrcinlet*()=Head.findsrc>>=function|None->Branch.removerepodst|Someh->Branch.setrepodsthinof_branchrepodstletrequest_storestore=matchstatusstorewith|`Empty->`Empty|`Branchb->`Branchb|`Commitc->`Commit(Commit.keyc)letmemstorepath=letrepo=repostoreinrequestrepo(moduleCommands.Store.Mem)(request_storestore,path)>|=Error.unwrap"mem"letmem_treestorepath=letrepo=repostoreinrequestrepo(moduleCommands.Store.Mem_tree)(request_storestore,path)>|=Error.unwrap"mem_tree"letfindstorepath=letrepo=repostoreinrequestrepo(moduleCommands.Store.Find)(request_storestore,path)>|=Error.unwrap"find"letremove_exn?clear?retries?allow_empty?parents~infostorepath=letparents=Option.map(List.map(func->Commit.hashc))parentsinletrepo=repostoreinrequestrepo(moduleCommands.Store.Remove)(((clear,retries),(allow_empty,parents)),(request_storestore,path),info())>|=Error.unwrap"remove"letremove?clear?retries?allow_empty?parents~infostorepath=let*x=remove_exn?clear?retries?allow_empty?parents~infostorepathinLwt.return_okxletfind_treestorepath=letrepo=repostoreinlet+concrete=requestrepo(moduleCommands.Store.Find_tree)(request_storestore,path)>|=Error.unwrap"find_tree"inOption.mapTree.of_concreteconcreteend