123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120(*
* Copyright (C) Citrix Systems Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)openLwtopenXs_protocollet(|>)ab=balet(++)fgx=f(gx)letdebugfmt=Logging.debug"xs_server"fmtleterrorfmt=Logging.error"xs_server"fmtletstore=letstore=Store.create()inList.iter(funpath->letp=Store.Path.createpath(Store.Path.getdomainpath0)inifnot(Store.existsstorep)thenStore.mkdirstore0(Perms.of_domain0)p)["/local";"/local/domain";"/tool";"/tool/xenstored";"/tool/xenstored/quota";"/tool/xenstored/connection";"/tool/xenstored/log";"/tool/xenstored/memory"];storemoduletypeTRANSPORT=sigtype'at='aLwt.tvalreturn:'a->'aLwt.tval(>>=):'at->('a->'bLwt.t)->'bLwt.ttypeservervallisten:unit->serverLwt.ttypechannelvalread:channel->bytes->int->int->intLwt.tvalwrite:channel->bytes->int->int->unitLwt.tvaldestroy:channel->unitLwt.tvaladdress_of:channel->Xs_protocol.addressLwt.tvalnamespace_of:channel->(moduleNamespace.IO)optionvalaccept_forever:server->(channel->unitLwt.t)->'aLwt.tendmoduleServer=functor(T:TRANSPORT)->structmodulePS=PacketStream(T)lethandle_connectiont=T.address_oft>>=funaddress->letinterface=T.namespace_oftinletc=Connection.createaddressinterfaceinletchannel=PS.maketinletm=Lwt_mutex.create()inlettake_watch_events()=letq=List.rev(Queue.fold(funaccx->x::acc)[]c.Connection.watch_events)inQueue.clearc.Connection.watch_events;qinletflush_watch_eventsq=Lwt_list.iter_s(fun(path,token)->PS.sendchannel(Xs_protocol.(Response.(print(Watchevent(path,token))0l0l))))qinlet(background_watch_event_flusher:unitLwt.t)=letrecforever()=Lwt_mutex.with_lockm(fun()->letrecloop()=ifQueue.lengthc.Connection.watch_events=0thenbeginLwt_condition.wait~mutex:mc.Connection.cvar>>=fun()->loop()endelsereturn()inloop()>>=fun()->flush_watch_events(take_watch_events()))>>=fun()->forever()inforever()inLwt.catch(fun()->letrecforever()=(PS.recvchannel>>=function|Okx->returnx|Exceptione->Lwt.faile)>>=funrequest->letevents=take_watch_events()inletreply=Call.replystorecrequestinLwt_mutex.with_lockm(fun()->flush_watch_eventsevents>>=fun()->PS.sendchannelreply)>>=fun()->forever()inforever()>>=fun()->T.destroyt)(fun_->Lwt.cancelbackground_watch_event_flusher;Connection.destroyaddress;T.destroyt)letserve_forever()=Parser.allow_oversize_packets:=false;T.listen()>>=funserver->T.accept_foreverserverhandle_connectionend