123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204(*
* Copyright (C) Citrix Systems Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)letdebugfmt=Logging.debug"transaction"fmtopenJunkletnone=0llettest_eagain=reffalseletcheck_parents_perms_identicalroot1root2path=lethierarch=Store.Path.get_hierarchypathinletpermdiff=List.fold_left(funaccpath->letn1=Store.lookuproot1pathandn2=Store.lookuproot2pathinmatchn1,n2with|Somen1,Somen2->(Store.Node.get_permsn1)<>(Store.Node.get_permsn2)||acc|_->true||acc)falsehierarchin(notpermdiff)letget_lowestpath1path2=matchpath2with|None->Somepath1|Somepath2->Some(Store.Path.get_common_prefixpath1path2)lettest_coalesceoldrootcurrentrootpath=letoldnode=Store.lookupoldrootpathandcurrentnode=Store.lookupcurrentrootpathinmatcholdnode,currentnodewith|(Someoldnode),(Somecurrentnode)->ifoldnode==currentnodethen(check_parents_perms_identicaloldrootcurrentrootpath)else(false)|None,None->((* ok then it doesn't exists in the old version and the current version,
just sneak it in as a child of the parent node if it exists, or else fail *)letpnode=Store.lookupcurrentroot(Store.Path.get_parentpath)inmatchpnodewith|None->false(* ok it doesn't exists, just bail out. *)|Some_pnode->true)|_->falseletcan_coalesceoldrootcurrentrootpath=trytest_coalesceoldrootcurrentrootpathwith_->falsetypety=No|Fullof(int32*Store.Node.t*Store.t)typet={ty:ty;store:Store.t;quota:Quota.t;mutablepaths:(Xs_protocol.Op.t*Store.Name.t)list;mutableoperations:(Xs_protocol.Request.payload*Xs_protocol.Response.payload)list;mutableread_lowpath:Store.Path.toption;mutablewrite_lowpath:Store.Path.toption;}letmakeidstore=letty=ifid=nonethenNoelseFull(id,store.Store.root,store)in{ty=ty;store=ifid=nonethenstoreelseStore.copystore;quota=Quota.copystore.Store.quota;paths=[];operations=[];read_lowpath=None;write_lowpath=None;}letget_idt=matcht.tywithNo->none|Full(id,_,_)->idletget_storet=t.storeletget_pathst=t.pathsletadd_wopttypath=t.paths<-(ty,Store.Path.to_namepath)::t.pathsletadd_operationtrequestresponse=t.operations<-(request,response)::t.operationsletget_operationst=List.revt.operationsletset_read_lowpathtpath=t.read_lowpath<-get_lowestpatht.read_lowpathletset_write_lowpathtpath=t.write_lowpath<-get_lowestpatht.write_lowpathletexistst_permspath=Store.existst.storepathletwritetcreatorpermpathvalue=letpath_existed=existstpermpathinStore.writet.storecreatorpermpathvalue;ifpath_existedthenset_write_lowpathtpathelseset_write_lowpatht(Store.Path.get_parentpath);add_woptXs_protocol.Op.Writepathletmkdir?(with_watch=true)tcreatorpermpath=Store.mkdirt.storecreatorpermpath;set_write_lowpathtpath;ifwith_watchthenadd_woptXs_protocol.Op.Mkdirpathletsetpermstpermpathperms=Store.setpermst.storepermpathperms;set_write_lowpathtpath;add_woptXs_protocol.Op.Setpermspathletrmtpermpath=Store.rmt.storepermpath;set_write_lowpatht(Store.Path.get_parentpath);add_woptXs_protocol.Op.Rmpathletlisttpermpath=letr=Store.lst.storepermpathinset_read_lowpathtpath;rletreadtpermpath=letr=Store.readt.storepermpathinset_read_lowpathtpath;rletgetpermstpermpath=letr=Store.getpermst.storepermpathinset_read_lowpathtpath;rletcommit~cont=lethas_write_ops=List.lengtht.paths>0inlethas_coalesced=reffalseinlethas_commited=matcht.tywith|No->true|Full(_id,oldroot,cstore)->letcommit_partialoldrootcstorestore=(* get the lowest path of the query and verify that it hasn't
been modified by others transactions. *)letreadpath_ok=matcht.read_lowpathwith|None->true(* no reads recorded *)|Somepath->can_coalesceoldrootcstore.Store.rootpathinletwritepath_ok=matcht.write_lowpathwith|None->true(* no writes recorded *)|Somepath->can_coalesceoldrootcstore.Store.rootpathinifreadpath_ok&&writepath_okthen(maybe(funp->letn=Store.lookupstore.Store.rootpin(* it has to be in the store, otherwise it means bugs
in the lowpath registration. we don't need to handle none. *)maybe(funn->Store.set_nodecstorepnt.quotastore.Store.quota)n;Logging.write_coalesce~tid:(get_idt)~con(Store.Path.to_stringp);)t.write_lowpath;maybe(funp->Logging.read_coalesce~tid:(get_idt)~con(Store.Path.to_stringp))t.read_lowpath;has_coalesced:=true;cstore.Store.stat_transaction_coalesce<-cstore.Store.stat_transaction_coalesce+1;true)else((* cannot do anything simple, just discard the queries,
and the client need to redo it later *)cstore.Store.stat_transaction_abort<-cstore.Store.stat_transaction_abort+1;false)inlettry_commitoldrootcstorestore=ifoldroot==cstore.Store.rootthen((* move the new root to the current store, if the oldroot
has not been modified *)ifhas_write_opsthen(Store.set_rootcstorestore.Store.root;Store.set_quotacstorestore.Store.quota);true)else(* we try a partial commit if possible *)commit_partialoldrootcstorestoreinif!test_eagain&&Random.int3=0thenfalseelsetry_commitoldrootcstoret.storein(*
if has_commited && has_write_ops then
Disk.write t.store;
*)ifnothas_commitedthenLogging.conflict~tid:(get_idt)~conelseifnot!has_coalescedthenLogging.commit~tid:(get_idt)~con;has_commited