(*
* Copyright (C) Citrix Systems Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

(** Per-client connection state for the xenstore server: the registry of
    live connections, their watches and transactions, and a read-only
    introspection namespace ([Interface]) exposing that state. *)

let info fmt = Logging.info "connection" fmt

(* NOTE(review): errors are routed through Logging.debug; confirm that this
   level is intentional. *)
let error fmt = Logging.debug "connection" fmt

exception End_of_file

(** A watch registered by a connection on a store name. *)
type watch = {
  con: t;                 (* connection that registered the watch *)
  token: string;          (* client-chosen token echoed in each event *)
  name: Store.Name.t;     (* name as registered (may be relative) *)
  mutable count: int;     (* number of times fire_one ran for this watch *)
}

(** State of one connection. *)
and t = {
  address: Xs_protocol.address;
  interface: (module Namespace.IO) option;
  domid: int;
  domstr: string;
  idx: int; (* unique counter *)
  transactions: (int32, Transaction.t) Hashtbl.t;
  mutable next_tid: int32;
  watches: (Store.Name.t, watch list) Hashtbl.t;
  mutable nb_watches: int;
  mutable nb_dropped_watches: int;
  mutable stat_nb_ops: int;
  mutable perm: Perms.t;
  watch_events: (string * string) Queue.t;
  cvar: unit Lwt_condition.t;
  domainpath: Store.Path.t;
}

(** Live connections indexed by address and by their unique [idx]. *)
let by_address : (Xs_protocol.address, t) Hashtbl.t = Hashtbl.create 128
let by_index : (int, t) Hashtbl.t = Hashtbl.create 128

(** All watches of all connections, keyed by absolute store key. *)
let watches : (string, watch list) Trie.t ref = ref (Trie.create ())

(** Render every trie node as a line naming the connections watching it. *)
let list_of_watches () =
  Trie.fold
    (fun path v_opt acc ->
      match v_opt with
      | None -> Printf.sprintf "%s <- None" path :: acc
      | Some vs ->
        let owners = List.map (fun v -> v.con.domstr) vs in
        Printf.sprintf "%s <- %s" path (String.concat ", " owners) :: acc)
    !watches []

let watch_create ~con ~name ~token = {
  con = con;
  token = token;
  name = name;
  count = 0;
}

let get_con w = w.con

let number_of_transactions con =
  Hashtbl.length con.transactions

let anon_id_next = ref 1

(** Remove the connection registered for [address] together with all of its
    watches in the global trie. Logs (and otherwise ignores) an unknown
    address. *)
let destroy address =
  try
    let c = Hashtbl.find by_address address in
    Logging.end_connection ~tid:Transaction.none ~con:c.domstr;
    watches := Trie.map
      (fun watches ->
        (* Physical comparison: drop exactly the watches owned by [c]. *)
        match List.filter (fun w -> w.con != c) watches with
        | [] -> None
        | ws -> Some ws)
      !watches;
    Hashtbl.remove by_address address;
    Hashtbl.remove by_index c.idx
  with Not_found ->
    error "Failed to remove connection for: %s" (Xs_protocol.string_of_address address)

(** Source of the unique [idx] given to each new connection. *)
let counter = ref 0

(** Create and register a connection for [address]. An existing connection
    with the same address is destroyed first. *)
let create address interface =
  if Hashtbl.mem by_address address then begin
    info "Connection.create: found existing connection for %s: closing" (Xs_protocol.string_of_address address);
    destroy address
  end;
  let dom = Xs_protocol.domain_of_address address in
  let con = {
    address = address;
    interface = interface;
    domid = dom;
    idx = !counter;
    domstr = Xs_protocol.string_of_address address;
    transactions = Hashtbl.create 5;
    next_tid = 1l;
    watches = Hashtbl.create 8;
    nb_watches = 0;
    nb_dropped_watches = 0;
    stat_nb_ops = 0;
    perm = Perms.of_domain dom;
    watch_events = Queue.create ();
    cvar = Lwt_condition.create ();
    domainpath = Store.Path.getdomainpath dom;
  } in
  incr counter;
  Logging.new_connection ~tid:Transaction.none ~con:con.domstr;
  Hashtbl.replace by_address address con;
  Hashtbl.replace by_index con.idx con;
  con

let restrict con domid =
  con.perm <- Perms.restrict con.perm domid

(** Watches registered by [con] on [name]; empty when there are none. *)
let get_watches (con: t) name =
  try Hashtbl.find con.watches name
  with Not_found -> []

(** Register a watch on [name] with [token] for [con].
    Raises [Quota.Limit_reached] when the connection is at its watch quota
    and [Store.Already_exists] when the same name/token pair is already
    registered. Returns the new watch. *)
let add_watch con name token =
  if con.nb_watches >= (Quota.maxwatch_of_domain con.domid)
  then raise Quota.Limit_reached;

  let l = get_watches con name in
  if List.exists (fun w -> w.token = token) l
  then raise (Store.Already_exists (Printf.sprintf "%s:%s" (Store.Name.to_string name) token));
  let watch = watch_create ~con ~token ~name in
  Hashtbl.replace con.watches name (watch :: l);
  con.nb_watches <- con.nb_watches + 1;

  watches := (
    (* The global trie is keyed by the absolute form of the name. *)
    let key = Store.Name.(to_key (make_absolute name (Store.Path.to_string con.domainpath))) in
    let ws =
      if Trie.mem !watches key
      then Trie.find !watches key
      else [] in
    Trie.set !watches key (watch :: ws));
  watch

(** Unregister the watch on [name] with [token] from [con].
    Raises [Not_found] (from the lookups) when no such watch exists. *)
let del_watch con name token =
  let ws = Hashtbl.find con.watches name in
  let w = List.find (fun w -> w.token = token) ws in
  begin match List.filter (fun e -> e != w) ws with
    | [] -> Hashtbl.remove con.watches name
    | filtered -> Hashtbl.replace con.watches name filtered
  end;
  con.nb_watches <- con.nb_watches - 1;

  watches := (
    let key = Store.Name.(to_key (make_absolute name (Store.Path.to_string con.domainpath))) in
    match List.filter (fun x -> x != w) (Trie.find !watches key) with
    | [] -> Trie.unset !watches key
    | ws -> Trie.set !watches key ws)

(** Queue one watch event for [watch]. [name] is the modified name, or
    [None] to report the name the watch was registered with. The event is
    dropped (and counted in [nb_dropped_watches]) when the connection's
    event queue is at its quota. *)
let fire_one name watch =
  let name = match name with
    | None ->
      (* If no specific path was modified then we fire the generic watch *)
      watch.name
    | Some name ->
      (* If the watch was registered as a relative path, then we make
         all the watch events relative too *)
      if Store.Name.is_relative watch.name
      then Store.Path.make_relative watch.con.domainpath name
      else name in
  let name = Store.Name.to_string name in
  let open Xs_protocol in
  Logging.response ~tid:0l ~con:watch.con.domstr (Response.Watchevent(name, watch.token));
  watch.count <- watch.count + 1;
  if Queue.length watch.con.watch_events >= (Quota.maxwatchevent_of_domain watch.con.domid) then begin
    error "domid %d reached watch event quota (%d >= %d): dropping watch %s:%s" watch.con.domid (Queue.length watch.con.watch_events) (Quota.maxwatchevent_of_domain watch.con.domid) name watch.token;
    watch.con.nb_dropped_watches <- watch.con.nb_dropped_watches + 1
  end else begin
    Queue.add (name, watch.token) watch.con.watch_events;
    Lwt_condition.signal watch.con.cvar ()
  end

(** Fire the watches visited by [Trie.iter_path] for [name]; for an [Rm]
    operation, additionally fire every watch in the sub-trie at [name]
    with its own registered name. *)
let fire (op, name) =
  let key = Store.Name.to_key name in
  Trie.iter_path
    (fun _ w -> match w with
      | None -> ()
      | Some ws -> List.iter (fire_one (Some name)) ws)
    !watches key;

  if op = Xs_protocol.Op.Rm
  then Trie.iter
    (fun _ w -> match w with
      | None -> ()
      | Some ws -> List.iter (fire_one None) ws)
    (Trie.sub !watches key)

(** Allocate the next transaction id of [con].
    The int32 counter eventually wraps around, so ids that are still in use
    and 0l (passed elsewhere in this file as the tid of non-transactional
    operations) are skipped rather than handed out again. *)
let rec find_next_tid con =
  let ret = con.next_tid in
  con.next_tid <- Int32.add con.next_tid 1l;
  if ret = 0l || Hashtbl.mem con.transactions ret
  then find_next_tid con
  else ret

(** Start a transaction over [store] on [con] and return its id.
    Raises [Quota.Limit_reached] when the transaction quota is reached. *)
let register_transaction con store =
  if Hashtbl.length con.transactions >= (Quota.maxtransaction_of_domain con.domid)
  then raise Quota.Limit_reached;

  let id = find_next_tid con in
  let ntrans = Transaction.make id store in
  Hashtbl.add con.transactions id ntrans;
  Logging.start_transaction ~tid:id ~con:con.domstr;
  id

let unregister_transaction con tid =
  Hashtbl.remove con.transactions tid

(** Look up transaction [tid] on [con]; logs and re-raises [Not_found]. *)
let get_transaction con tid =
  try
    Hashtbl.find con.transactions tid
  with Not_found as e ->
    error "Failed to find transaction %lu on %s" tid con.domstr;
    raise e

let mark_symbols con =
  Hashtbl.iter (fun _ t -> Store.mark_symbols (Transaction.get_store t)) con.transactions

(** Number of watched names and number of operations recorded for [con]. *)
let stats con =
  Hashtbl.length con.watches, con.stat_nb_ops

(** One text line per watch of [con]. *)
let debug con =
  let list_watches con =
    let ll = Hashtbl.fold
      (fun _ watches acc -> List.map (fun watch -> watch.name, watch.token) watches :: acc)
      con.watches [] in
    List.concat ll in
  let watches =
    List.map
      (fun (name, token) -> Printf.sprintf "watch %s: %s %s\n" con.domstr (Store.Name.to_string name) token)
      (list_watches con) in
  String.concat "" watches

(** Introspection namespace over the live connections, laid out as
    socket/<idx>/... and domain/<domid>/... *)
module Interface = struct
  include Namespace.Unsupported

  (* Parse a numeric path component. A malformed number names a node that
     does not exist, so report it that way instead of leaking Failure. *)
  let int_of_component path s =
    try int_of_string s
    with Failure _ -> Store.Path.doesnt_exist path

  (* Connection with unique index [idx], or Doesnt_exist for [path]. *)
  let connection_of_socket path idx =
    try Hashtbl.find by_index (int_of_component path idx)
    with Not_found -> Store.Path.doesnt_exist path

  (* Connection of domain [domid], or Doesnt_exist for [path]. *)
  let connection_of_domain path domid =
    try Hashtbl.find by_address (Xs_protocol.Domain (int_of_component path domid))
    with Not_found -> Store.Path.doesnt_exist path

  (* The [n]th watch of [c], in the same order list_connection enumerates
     them. Valid indices are 0 .. length - 1; anything else is Doesnt_exist. *)
  let watch_of_index path c n =
    let all = Hashtbl.fold (fun _ w acc -> w @ acc) c.watches [] in
    let n = int_of_component path n in
    if n < 0 || n >= List.length all then Store.Path.doesnt_exist path;
    List.nth all n

  (** Read the node designated by the remaining path components below the
      connection [c]. [path] is the full path, used for error reporting. *)
  let read_connection t perms path c = function
    | [] -> ""
    | "address" :: [] -> Xs_protocol.string_of_address c.address
    | "current-transactions" :: [] -> string_of_int (Hashtbl.length c.transactions)
    | "total-operations" :: [] -> string_of_int c.stat_nb_ops
    | "current-watch-queue-length" :: [] -> string_of_int (Queue.length c.watch_events)
    | "total-dropped-watches" :: [] -> string_of_int c.nb_dropped_watches
    | "watch" :: [] -> ""
    | "watch" :: n :: [] ->
      (* Directory node: only its existence matters. *)
      ignore (watch_of_index path c n);
      ""
    | "watch" :: n :: "name" :: [] ->
      Store.Name.to_string (watch_of_index path c n).name
    | "watch" :: n :: "token" :: [] ->
      (watch_of_index path c n).token
    | "watch" :: n :: "total-events" :: [] ->
      string_of_int (watch_of_index path c n).count
    | "backend" :: rest ->
      begin match c.interface with
        | None -> Store.Path.doesnt_exist path
        | Some i ->
          let module I = (val i: Namespace.IO) in
          I.read t perms (Store.Path.of_string_list rest)
      end
    | _ -> Store.Path.doesnt_exist path

  (** Read [path]; requires the CONFIGURE permission. *)
  let read t (perms: Perms.t) (path: Store.Path.t) =
    Perms.has perms Perms.CONFIGURE;
    match Store.Path.to_string_list path with
    | [] -> ""
    | "socket" :: [] -> ""
    | "socket" :: idx :: rest ->
      let c = connection_of_socket path idx in
      read_connection t perms path c rest
    | "domain" :: [] -> ""
    | "domain" :: domid :: rest ->
      let c = connection_of_domain path domid in
      read_connection t perms path c rest
    | _ -> Store.Path.doesnt_exist path

  (** A path exists exactly when reading it does not raise Doesnt_exist. *)
  let exists t perms path =
    try
      ignore (read t perms path);
      true
    with Store.Path.Doesnt_exist _ -> false

  (** Only the backend subtree of a connection is writable. *)
  let write_connection t creator perms path c v = function
    | "backend" :: rest ->
      begin match c.interface with
        | None -> Store.Path.doesnt_exist path
        | Some i ->
          let module I = (val i: Namespace.IO) in
          I.write t creator perms (Store.Path.of_string_list rest) v
      end
    | _ -> raise Perms.Permission_denied

  (** Write [v] at [path]; requires the CONFIGURE permission. *)
  let write t creator (perms: Perms.t) (path: Store.Path.t) v =
    Perms.has perms Perms.CONFIGURE;
    match Store.Path.to_string_list path with
    | "socket" :: idx :: rest ->
      let c = connection_of_socket path idx in
      write_connection t creator perms path c v rest
    | "domain" :: domid :: rest ->
      let c = connection_of_domain path domid in
      write_connection t creator perms path c v rest
    | _ -> raise Perms.Permission_denied

  (* Integers from [start] to [finish] inclusive; empty when start > finish. *)
  let rec between start finish =
    if start > finish
    then []
    else start :: (between (start + 1) finish)

  (** Children of the node designated by the remaining components below
      the connection [c]. *)
  let list_connection t perms c = function
    | [] ->
      [ "address"; "current-transactions"; "total-operations"; "watch";
        "current-watch-queue-length"; "total-dropped-watches"; "backend" ]
    | [ "watch" ] ->
      let all = Hashtbl.fold (fun _ w acc -> w @ acc) c.watches [] in
      List.map string_of_int (between 0 (List.length all - 1))
    | [ "watch"; _ ] -> [ "name"; "token"; "total-events" ]
    | "backend" :: rest ->
      begin match c.interface with
        | None -> []
        | Some i ->
          let module I = (val i: Namespace.IO) in
          I.list t perms (Store.Path.of_string_list rest)
      end
    | _ -> []

  (** List the children of [path]; requires the CONFIGURE permission. *)
  let list t perms path =
    Perms.has perms Perms.CONFIGURE;
    match Store.Path.to_string_list path with
    | [] -> [ "socket"; "domain" ]
    | [ "socket" ] ->
      Hashtbl.fold
        (fun x c acc -> match x with
          | Xs_protocol.Unix _ -> string_of_int c.idx :: acc
          | _ -> acc)
        by_address []
    | [ "domain" ] ->
      Hashtbl.fold
        (fun x _ acc -> match x with
          | Xs_protocol.Domain x -> string_of_int x :: acc
          | _ -> acc)
        by_address []
    | "domain" :: domid :: rest ->
      let c = connection_of_domain path domid in
      list_connection t perms c rest
    | "socket" :: idx :: rest ->
      let c = connection_of_socket path idx in
      list_connection t perms c rest
    | _ -> []
end