
* Copyright (C) Citrix Systems Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)openXs_protocolopenJunklet(|>)ab=baletdebugfmt=Logging.debug"call"fmtleterrorfmt=Logging.error"call"fmtexceptionParse_failureexceptionTransaction_againexceptionTransaction_nestedletget_namespace_implementationpath=matchStore.Path.to_string_listpathwith|"tool"::"xenstored"::"quota"::rest->Store.Path.of_string_listrest,(moduleQuota_interface:Namespace.IO)|"tool"::"xenstored"::"connection"::rest->Store.Path.of_string_listrest,(moduleConnection.Interface:Namespace.IO)|"tool"::"xenstored"::"log"::rest->Store.Path.of_string_listrest,(moduleLogging_interface:Namespace.IO)|"tool"::"xenstored"::"memory"::rest->Store.Path.of_string_listrest,(moduleHeap_debug_interface:Namespace.IO)|_->path,(moduleTransaction:Namespace.IO)(* Perform a 'simple' operation (not a Transaction_start or Transaction_end)
and create a response. *)letop_exn_storect(payload:Request.payload):Response.payload=letconnection_path=c.Connection.domainpathinletresolvedata=Store.Path.createdataconnection_pathinletopenRequestinmatchpayloadwith|Transaction_start|Transaction_end_|Watch(_,_)|Unwatch(_,_)|Debug_|Introduce(_,_,_)|Resume(_)|Release(_)|Set_target(_,_)|Restrict_|Isintroduced_|Error_|Watchevent_->assertfalse|Getdomainpathdomid->letv=Store.Path.getdomainpathdomid|>Store.Path.to_stringinResponse.Getdomainpathv|PathOp(path,op)->letpath=resolvepathinletpath,m=get_namespace_implementationpathinletmoduleImpl=(valm:Namespace.IO)inletmkdir_ptcreatorpermpath=letdirname=Store.Path.get_parentpathinifnot(Impl.existstpermdirname)then(letreccheck_pathp=matchpwith|[]->[]|h::l->ifImpl.existstpermhthencheck_pathlelsepinletret=check_path(List.tl(Store.Path.get_hierarchydirname))inList.iter(funs->Impl.mkdir~with_watch:falsetcreatorperms)ret)inbeginmatchopwith|Read->letv=Impl.readtc.Connection.permpathinResponse.Readv|Directory->letentries=Impl.listtc.Connection.permpathinResponse.Directoryentries|Getperms->letv=Impl.getpermstc.Connection.permpathinResponse.Getpermsv|Writevalue->mkdir_ptc.Connection.domidc.Connection.permpath;Impl.writetc.Connection.domidc.Connection.permpathvalue;Response.Write|Mkdir->mkdir_ptc.Connection.domidc.Connection.permpath;Impl.mkdirtc.Connection.domidc.Connection.permpath;Response.Mkdir|Rm->Impl.rmtc.Connection.permpath;Response.Rm|Setpermsperms->Impl.setpermstc.Connection.permpathperms;Response.Setpermsend(* Replay a stored transaction against a fresh store, check the responses are
all equivalent: if so, commit the transaction. Otherwise send the abort to
the client. *)lettransaction_replaystorect=letops=Transaction.get_operationstinlettid=Connection.register_transactioncstoreinlett=Transaction.maketidstoreinletcon="replay request:"^c.Connection.domstrinletperform_exn(request,response)=Logging.request~tid~con:("replay request:"^c.Connection.domstr)request;Logging.response~tid~con:("replay reply1: "^c.Connection.domstr)response;letresponse'=op_exnstorectrequestinLogging.response~tid~con:("replay reply2: "^c.Connection.domstr)response';Logging.response~tid~conresponse';ifresponse<>response'thenbeginraiseTransaction_againendintryLogging.start_transaction~con~tid;List.iterperform_exnops;Logging.end_transaction~tid~con;Transaction.commit~contwithe->error"transaction_replay caught: %s"(Printexc.to_stringe);falseletreply_exnstorec(request:t):Response.payload=lettid=get_tidrequestinlett=iftid=Transaction.nonethenTransaction.maketidstoreelseConnection.get_transactionctidinletpayload:Xs_protocol.Request.payload=matchXs_protocol.Request.parse(request:t)with|None->error"Failed to parse request: got %s"(hexify(Bytes.to_string@@Xs_protocol.to_bytesrequest));raiseParse_failure|Somex->xinLogging.request~tid~con:c.Connection.domstrpayload;letresponse_payload=matchpayloadwith|Request.Transaction_start->iftid<>Transaction.nonethenraiseTransaction_nested;lettid=Connection.register_transactioncstoreinResponse.Transaction_starttid|Request.Transaction_endcommit->Connection.unregister_transactionctid;ifcommitthenbeginLogging.end_transaction~tid~con:c.Connection.domstr;iftrue&¬(Transaction.commit~con:c.Connection.domstrt)&¬(transaction_replaystorect)thenraiseTransaction_again;Transaction.get_pathst|>List.rev|>List.iterConnection.fire;Response.Transaction_endendelsebegin(* Don't log an explicit abort *)Response.Transaction_endend|Request.Watch(path,token)->letwatch=Connection.add_watchc(Store.Name.of_stringpath)tokeninConnection.fire_oneNonewatch;Response.Watch|Request.Unwatch(path,token)->Connection.del_watchc(Store.Name.of_stringpath)token;Response.Unwatch|Request.Debugcmd->Perms.hasc.Connection.permPerms.DEBUG;Response.Debug(trymatchcmdwith|"print"::msg::_->Logging.debug_print~tid:0l~con:c.Connection.domstrmsg;[]|_->[]with_->[])|Request.Introduce(domid,mfn,remote_port)->Perms.hasc.Connection.permPerms.INTRODUCE;Introduce.(introduce{domid=domid;mfn=mfn;remote_port=remote_port});Connection.fire(Xs_protocol.Op.Write,Store.Name.introduceDomain);Response.Introduce|Request.Resume_->Perms.hasc.Connection.permPerms.RESUME;(* register domain *)Response.Resume|Request.Release_->Perms.hasc.Connection.permPerms.RELEASE;(* unregister domain *)Connection.fire(Xs_protocol.Op.Write,Store.Name.releaseDomain);Response.Release|Request.Set_target(mine,yours)->Perms.hasc.Connection.permPerms.SET_TARGET;Hashtbl.iter(funaddressc->ifXs_protocol.domain_of_addressaddress=minethenc.Connection.perm<-Perms.set_targetc.Connection.permyours;)Connection.by_address;Response.Set_target|Request.Restrictdomid->Perms.hasc.Connection.permPerms.RESTRICT;c.Connection.perm<-Perms.restrictc.Connection.permdomid;Response.Restrict|Request.Isintroduced_->Perms.hasc.Connection.permPerms.ISINTRODUCED;Response.Isintroducedfalse|Request.Errormsg->error"client sent us an error: %s"(hexifymsg);raiseParse_failure|Request.Watcheventmsg->error"client sent us a watch event: %s"(hexifymsg);raiseParse_failure|op->letreply=op_exnstorectopiniftid<>Transaction.nonethenTransaction.add_operationtopreply;replyiniftid=Transaction.nonethenTransaction.get_pathst|>List.rev|>List.iterConnection.fire;response_payloadletgcstore=ifSymbol.created()>1000||Symbol.used()>20000thenbegindebug"Started symbol GC";Symbol.mark_all_as_unused();Store.mark_symbolsstore;Hashtbl.iter(fun_c->Connection.mark_symbolsc)Connection.by_address;Symbol.garbage()endletreplystorecrequest=gcstore;c.Connection.stat_nb_ops<-c.Connection.stat_nb_ops+1;lettid=get_tidrequestinletrid=get_ridrequestinletresponse_payload,info=tryreply_exnstorecrequest,Nonewithe->letdefault=Some(Printexc.to_stringe)inletreplycode=Response.Errorcodeinbeginmatchewith|Store.Already_existsp->reply"EEXIST",Somep|Store.Path.Doesnt_existp->reply"ENOENT",Somep|Perms.Permission_denied->reply"EACCES",default|Not_found->reply"ENOENT",default|Parse_failure->reply"EINVAL",default|Invalid_argumenti->reply"EINVAL",Somei|Transaction_again->reply"EAGAIN",default|Transaction_nested->reply"EBUSY",default|Quota.Limit_reached->reply"EQUOTA",default|Quota.Data_too_big->reply"E2BIG",default|Quota.Transaction_opened->reply"EQUOTA",default|Failure_->reply"EINVAL",default|Namespace.Unsupported->reply"ENOTSUP",default|_->reply"EIO",defaultendinLogging.response~tid~con:c.Connection.domstr?inforesponse_payload;Response.printresponse_payloadtidrid