123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106(*
* Copyright (C) Citrix Systems Inc.
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)openSexplib.Std(* FIXME: This should probably be pushed into xen-evtchn *)openLwttypeport=int[@@derivingsexp_of]letport_of_stringx=`Ok(int_of_stringx)letstring_of_port=string_of_intletnext_port=ref0typeevent=int[@@derivingsexp_of]letinitial=0moduleLwt_condition=structincludeLwt_conditiontype_t=unit[@@derivingsexp]letsexp_of_t__=sexp_of__t()endtypestate=|Unbound|Closed|ConnectedToofchannelandchannel={mutableevents:event;(* incremented on send *)c:unitLwt_condition.t;mutablestate:state;port:port;}[@@derivingsexp_of]letcreate()=letport=!next_portinincrnext_port;letevents=initialinletc=Lwt_condition.create()inletstate=Unboundin{events;c;state;port}letrecrecvchannelevent=ifchannel.events>eventthenreturnchannel.eventselseLwt_condition.waitchannel.c>>=fun()->recvchanneleventletsendchannel=matchchannel.statewith|Unbound->(* This should never happen. This means there must be
a protocol bug. *)failwith"send: channel is unbound"|Closed->(* This will happen when signalling the other side of a
connection to shutdown. It does not indicate a bug. *)()|ConnectedTootherend->otherend.events<-otherend.events+1;Lwt_condition.broadcastotherend.c()moduleIntMap=Map.Make(structtypet=intletcompare(a:int)(b:int)=compareabend)letlistening=refIntMap.emptyletlisten_=lett=create()inlistening:=IntMap.addt.portt!listening;t.port,tletnr_connected=ref0letassert_cleaned_up()=if!nr_connected<>0thenfailwith(Printf.sprintf"%d event channels are still connected"!nr_connected);if!listening<>IntMap.emptythenfailwith(Printf.sprintf"%d"(IntMap.cardinal!listening))letconnect_port=letother=IntMap.findport!listeninginlistening:=IntMap.removeport!listening;letthis=create()inthis.state<-ConnectedToother;other.state<-ConnectedTothis;incrnr_connected;thisletcloset=matcht.statewith|ConnectedToother->other.state<-Closed;t.state<-Closed;decrnr_connected|_->()