
* Copyright (C) Citrix Systems Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)(** A multiplexing xenstore protocol client over a byte-level transport *)openXs_protocolletfinallyfg=tryletresult=f()ing();resultwithe->g();raiseeletwith_mutexmf=Mutex.lockm;finallyf(fun()->Mutex.unlockm)letfind_opthx=ifHashtbl.memhxthenSome(Hashtbl.findhx)elseNonemoduletypeIO=sigtype'at='avalreturn:'a->'atval(>>=):'at->('a->'bt)->'bttypechannelvalcreate:unit->channeltvaldestroy:channel->unittvalread:channel->bytes->int->int->inttvalwrite:channel->bytes->int->int->unittendmoduleStringSet=Xs_handle.StringSetexceptionWatch_overflowmoduleWatcher=struct(** Someone who is watching paths is represented by one of these: *)typet={mutablepaths:StringSet.t;(* we never care about events or ordering, only paths *)mutablecancelling:bool;(* we need to stop watching and clean up *)c:Condition.t;m:Mutex.t;}letmake()={paths=StringSet.empty;cancelling=false;c=Condition.create();m=Mutex.create();}(** Register that a watched path has been changed *)letput(x:t)path=with_mutexx.m(fun()->x.paths<-StringSet.addpathx.paths;Condition.signalx.c)(** Return a set of modified paths, or an empty set if we're cancelling *)letget(x:t)=with_mutexx.m(fun()->whilex.paths=StringSet.empty&¬x.cancellingdoCondition.waitx.cx.mdone;letresults=x.pathsinx.paths<-StringSet.empty;results)(** Called to shutdown the watcher and trigger an orderly cleanup *)letcancel(x:t)=with_mutexx.m(fun()->x.cancelling<-true;Condition.signalx.c)endexceptionMalformed_watch_eventexceptionUnexpected_ridofint32exceptionDispatcher_failedexceptionCancelledmoduleTask=structtype'au={mutablething:'aoption;mutablecancelling:bool;mutableon_cancel:unit->unit;m:Mutex.t;c:Condition.t}letmake()={thing=None;cancelling=false;on_cancel=(fun()->());m=Mutex.create();c=Condition.create();}letwakeuputhing=with_mutexu.m(fun()->u.thing<-Something;Condition.signalu.c)leton_canceluon_cancel=u.on_cancel<-on_cancelletcancelu=with_mutexu.m(fun()->u.cancelling<-true;Condition.signalu.c);u.on_cancel()letwaitu=with_mutexu.m(fun()->letrecloop()=ifu.cancellingthenraiseCancelledelsematchu.thingwith|None->Condition.waitu.cu.m;loop()|Something->thinginloop())endtypewatch_callback=string*string->unitletauto_watch_prefix="auto:"letstartswithprefixx=letprefix'=String.lengthprefixandx'=String.lengthxinx'>=prefix'&&(String.subx0prefix')=prefixmoduleClient=functor(IO:IOwithtype'at='a)->structmodulePS=PacketStream(IO)letlogger=ref(funs->let_:string=sin())leterrorfmt=Printf.ksprintf!loggerfmtletset_loggerf=logger:=f(* Represents a single acive connection to a server *)typeclient={transport:IO.channel;ps:PS.stream;rid_to_wakeup:(int32,Xs_protocol.tTask.u)Hashtbl.t;mutabledispatcher_thread:Thread.toption;mutabledispatcher_shutting_down:bool;mutablewatch_callback_thread:Thread.toption;watchevents:(string,Watcher.t)Hashtbl.t;incoming_watches:(string*string)Queue.t;queue_overflowed:boolref;incoming_watches_m:Mutex.t;incoming_watches_c:Condition.t;mutableextra_watch_callback:((string*string)->unit);m:Mutex.t;}typehandle=clientXs_handle.tletrecv_onet=match(PS.recvt.ps)with|Okx->x|Exceptione->raiseeletsend_onet=PS.sendt.pslethandle_exnte=error"Caught: %s\n%!"(Printexc.to_stringe);beginmatchewith|Xs_protocol.Response_parser_failed_->(* Lwt_io.hexdump Lwt_io.stderr x *)()|_->()end;t.dispatcher_shutting_down<-true;raiseeletenqueue_watchtevent=with_mutext.incoming_watches_m(fun()->ifQueue.lengtht.incoming_watches=65536thent.queue_overflowed:=trueelseQueue.pusheventt.incoming_watches;Condition.signalt.incoming_watches_c)letrecdispatchert=letpkt=tryrecv_onetwithe->handle_exnteinmatchget_typktwith|Op.Watchevent->beginmatchUnmarshal.listpktwith|Some[path;token]->(* All 'extra' non-automatic watches are passed to the extra_watch_callback.
Note this can include old watches which were still queued in
the server when an 'unwatch' is received. *)letw=with_mutext.m(fun()->find_optt.watcheventstoken)inbeginmatchwwith|Somew->Watcher.putwpath|None->ifnot(startswithauto_watch_prefixtoken)thenenqueue_watcht(path,token)end;|_->handle_exntMalformed_watch_eventend;dispatchert|_->letrid=get_ridpktinletu=with_mutext.m(fun()->find_optt.rid_to_wakeuprid)inbeginmatchuwith|Someu->Task.wakeupupkt|None->error"Unexpected rid: %ld\n%!"ridend;dispatchertletdequeue_watchest=whiletruedotryletevent=with_mutext.incoming_watches_m(fun()->whileQueue.is_emptyt.incoming_watches&¬(!(t.queue_overflowed))doCondition.waitt.incoming_watches_ct.incoming_watches_mdone;if!(t.queue_overflowed)thenbeginraiseWatch_overflow;end;Queue.popt.incoming_watches)inlet()=t.extra_watch_callbackeventin()with|Watch_overflowase->error"Caught watch_overflow. Not retrying.";raisee|e->error"Caught '%s' while dequeuing watches. Ignoring.\n%!"(Printexc.to_stringe);doneletmake()=lettransport=IO.create()inlett={transport=transport;ps=PS.maketransport;rid_to_wakeup=Hashtbl.create10;dispatcher_thread=None;dispatcher_shutting_down=false;watch_callback_thread=None;watchevents=Hashtbl.create10;incoming_watches=Queue.create();queue_overflowed=reffalse;incoming_watches_m=Mutex.create();incoming_watches_c=Condition.create();extra_watch_callback=(fun_->());m=Mutex.create();}int.dispatcher_thread<-Some(Thread.createdispatchert);t.watch_callback_thread<-Some(Thread.createdequeue_watchest);tletset_watch_callbackclientcb=client.extra_watch_callback<-cbletmake_rid=letcounter=ref0linletm=Mutex.create()infun()->with_mutexm(fun()->letresult=!counterincounter:=Int32.succ!counter;result)letrpchinthpayloadunmarshal=letopenXs_handleinletrid=make_rid()inletrequest=Request.printpayload(get_tidh)ridinlett=Task.make()inletc=get_clienthinifc.dispatcher_shutting_downthenraiseDispatcher_failedelsebeginwith_mutexc.m(fun()->Hashtbl.addc.rid_to_wakeupridt);send_onecrequest;letres=Task.waittinwith_mutexc.m(fun()->Hashtbl.removec.rid_to_wakeuprid);responsehintrequestresunmarshalendletdirectoryhpath=rpc"directory"(Xs_handle.accessed_pathhpath)Request.(PathOp(path,Directory))Unmarshal.listletreadhpath=rpc"read"(Xs_handle.accessed_pathhpath)Request.(PathOp(path,Read))Unmarshal.stringletwritehpathdata=rpc"write"(Xs_handle.accessed_pathhpath)Request.(PathOp(path,Writedata))Unmarshal.okletrmhpath=rpc"rm"(Xs_handle.accessed_pathhpath)Request.(PathOp(path,Rm))Unmarshal.okletmkdirhpath=rpc"mkdir"(Xs_handle.accessed_pathhpath)Request.(PathOp(path,Mkdir))Unmarshal.okletsetpermshpathacl=rpc"setperms"(Xs_handle.accessed_pathhpath)Request.(PathOp(path,Setpermsacl))Unmarshal.okletdebughcmd_args=rpc"debug"h(Request.Debugcmd_args)Unmarshal.listletrestricthdomid=rpc"restrict"h(Request.Restrictdomid)Unmarshal.okletgetdomainpathhdomid=rpc"getdomainpath"h(Request.Getdomainpathdomid)Unmarshal.stringletwatchhpathtoken=rpc"watch"(Xs_handle.watchhpath)(Request.Watch(path,token))Unmarshal.okletunwatchhpathtoken=rpc"unwatch"(Xs_handle.unwatchhpath)(Request.Unwatch(path,token))Unmarshal.okletintroducehdomidstore_mfnstore_port=rpc"introduce"h(Request.Introduce(domid,store_mfn,store_port))Unmarshal.okletset_targethstubdom_domiddomid=rpc"set_target"h(Request.Set_target(stubdom_domid,domid))Unmarshal.okletimmediateclientf=f(Xs_handle.no_transactionclient)letcounter=ref0lletwaitclientf=letopenStringSetincounter:=Int32.succ!counter;lettoken=Printf.sprintf"%s%ld"auto_watch_prefix!counterin(* When we register the 'watcher', the dispatcher thread will signal us when
watches arrive. *)letwatcher=Watcher.make()inwith_mutexclient.m(fun()->Hashtbl.addclient.watcheventstokenwatcher);(* We signal the caller via this cancellable task: *)lett=Task.make()inTask.on_cancelt(fun()->(* Trigger an orderly cleanup in the background: *)Watcher.cancelwatcher);leth=Xs_handle.watchingclientin(* Adjust the paths we're watching (if necessary) and block (if possible) *)letadjust_paths()=letcurrent_paths=Xs_handle.get_watched_pathshin(* Paths which weren't read don't need to be watched: *)letold_paths=diffcurrent_paths(Xs_handle.get_accessed_pathsh)inList.iter(funp->unwatchhptoken)(elementsold_paths);(* Paths which were read do need to be watched: *)letnew_paths=diff(Xs_handle.get_accessed_pathsh)current_pathsinList.iter(funp->watchhptoken)(elementsnew_paths);(* If we're watching the correct set of paths already then just block *)ifold_paths=empty&&(new_paths=empty)thenbeginletresults=Watcher.getwatcherin(* an empty results set means we've been cancelled: trigger cleanup *)ifresults=emptythenraise(Failure"goodnight")endin(* Main client loop: *)letrecloop()=letfinished=tryletresult=fhinTask.wakeuptresult;truewithEagain->falseinifnotfinishedthenbeginadjust_paths();loop()endinlet(_:Thread.t)=Thread.create(fun()->finallyloop(fun()->letcurrent_paths=Xs_handle.get_watched_pathshinList.iter(funp->unwatchhptoken)(elementscurrent_paths);with_mutexclient.m(fun()->Hashtbl.removeclient.watcheventstoken);))()intlet_transaction_leave_openclientf=lettid=rpc"transaction_start"(Xs_handle.no_transactionclient)Request.Transaction_startUnmarshal.int32inleth=Xs_handle.transactionclienttidinletresult=fhin(h,result)let_commithresult=letres'=rpc"transaction_end"h(Request.Transaction_endtrue)Unmarshal.stringinifres'="OK"thenresultelseraise(Error(Printf.sprintf"Unexpected transaction result: %s"res'))lettransaction_one_tryclientf=let(h,result)=_transaction_leave_openclientfin_commithresultletrectransaction_attemptsattemptsclientf=let(h,result)=_transaction_leave_openclientfintry_commithresultwithEagainwhen(attempts>1)->transaction_attempts(attempts-1)clientf(** Deprecated: retries for ever on repeated Eagain *)letrectransactionclientf=let(h,result)=_transaction_leave_openclientfintry_commithresultwithEagain->transactionclientfend