(*
* Copyright (C) Citrix Systems Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

(** Lwt connection multiplexer. Multiplexes between parallel requests from
    multiple clients over a single output channel to a server that may send
    responses out of order. Each request and response carries an [id] that is
    used to match responses to requests. *)

open Lwt.Infix

module type RPC = sig
  (** The transport mechanism used to send and receive messages *)
  type transport

  (** Each [request_hdr] and [response_hdr] carries an [id] that is used to
      match responses to requests. *)
  type id

  type request_hdr

  type request_body

  type response_hdr

  type response_body

  val recv_hdr : transport -> (id option * response_hdr) Lwt.t
  (** Receive the header of the next message. The multiplexer treats a result
      of [None] for the id as an unrequested packet and hands the header to
      [handle_unrequested_packet]. *)

  val recv_body :
       transport
    -> request_hdr
    -> response_hdr
    -> response_body
    -> (unit, Protocol.Error.t) result Lwt.t
  (** [recv_body transport request_hdr response_hdr response_body] returns [Ok ()]
      and receives and writes the body of the response into [response_body] if
      the request has been successful, otherwise returns an [Error]. The
      [response_hdr] parameter is the output of a preceding [recv_hdr] call;
      [request_hdr] is the header of the request that the response answers. *)

  val send_one : transport -> request_hdr -> request_body -> unit Lwt.t
  (** Send a single request. Invocations of this function will not be interleaved
      because they are protected by a mutex *)

  val id_of_request : request_hdr -> id
  (** The id under which the response to this request will be looked up. *)

  val handle_unrequested_packet : transport -> response_hdr -> unit Lwt.t
  (** Called by the multiplexer for every message whose header carries no id. *)
end

module Make (R : RPC) : sig
  type client

  val rpc :
       R.request_hdr
    -> R.request_body
    -> R.response_body
    -> client
    -> (unit, Protocol.Error.t) Result.t Lwt.t
  (** [rpc req_hdr req_body response_body client] sends a request to the server, and
      saves the response into [response_body]. Will block until a response to
      this request is received from the server. *)

  val create : R.transport -> client Lwt.t
  (** [create transport] creates a new client that manages parallel requests
      over the given transport channel. All communication over this channel
      must go through the returned client. *)
end = struct
  (* Raised inside the dispatcher when a response carries an id for which no
     request is registered in [id_to_wakeup]. *)
  exception Unexpected_id of R.id

  (* Raised by [rpc] once the dispatcher has recorded a failure. *)
  exception Shutdown

  type client = {
      transport: R.transport
    ; outgoing_mutex: Lwt_mutex.t  (** held around every [R.send_one] call *)
    ; id_to_wakeup:
        ( R.id
        , R.request_hdr
          * (unit, Protocol.Error.t) result Lwt.u
          * R.response_body )
        Hashtbl.t
          (** in-flight requests: request header, resolver of the caller's
              promise, and the buffer the response body is written into *)
    ; mutable dispatcher_thread: unit Lwt.t
    ; mutable dispatcher_shutting_down: bool
          (** set once the dispatcher has failed; [rpc] then refuses new
              requests *)
  }

  (* Receive loop. Each iteration reads one header and then either
     - hands it to [R.handle_unrequested_packet] when it carries no id,
     - fails with [Unexpected_id] when the id is not registered, or
     - reads the body and resolves the matching caller with the result.
     Any exception marks the client as shutting down, fails every request
     still registered with that exception, and ends the loop by re-raising. *)
  let rec dispatcher t =
    let th =
      Lwt.catch
        (fun () ->
          R.recv_hdr t.transport >>= fun (id, pkt) ->
          match id with
          | None ->
              R.handle_unrequested_packet t.transport pkt
          | Some id -> (
            match Hashtbl.find t.id_to_wakeup id with
            | exception Not_found ->
                Lwt.fail (Unexpected_id id)
            | request_hdr, waker, response_body ->
                R.recv_body t.transport request_hdr pkt response_body
                >>= fun response ->
                (* Unregister before waking: [Lwt.wakeup] may run the
                   callbacks of the waiter straight away, and a waiter that
                   reissues a request with the same id would otherwise have
                   its fresh binding removed here, leaving the already
                   resolved waker in the table. *)
                Hashtbl.remove t.id_to_wakeup id ;
                Lwt.wakeup waker response ;
                Lwt.return ()
          )
        )
        (fun e ->
          t.dispatcher_shutting_down <- true ;
          Hashtbl.iter
            (fun _ (_, u, _) -> Lwt.wakeup_later_exn u e)
            t.id_to_wakeup ;
          Lwt.fail e
        )
    in
    th >>= fun () -> dispatcher t

  (* Register a waiter under the id of the request, send the request while
     holding [outgoing_mutex], then wait for the dispatcher to resolve the
     waiter. Fails with [Shutdown] without sending anything if the dispatcher
     has already failed. *)
  let rpc req_hdr req_body response_body t =
    if t.dispatcher_shutting_down then
      Lwt.fail Shutdown
    else
      let sleeper, waker = Lwt.wait () in
      let id = R.id_of_request req_hdr in
      (* Registered before sending so the entry exists by the time the
         dispatcher sees the response. *)
      Hashtbl.add t.id_to_wakeup id (req_hdr, waker, response_body) ;
      (* NOTE(review): if [R.send_one] fails, the entry added above stays in
         [id_to_wakeup] - confirm whether it should be removed here. *)
      Lwt_mutex.with_lock t.outgoing_mutex (fun () ->
          R.send_one t.transport req_hdr req_body
      )
      >>= fun () -> sleeper

  (* Build the client record and start the dispatcher on it. The record is
     created with an already-resolved placeholder for [dispatcher_thread]
     because the dispatcher needs the record itself as its argument. *)
  let create transport =
    let t =
      {
        transport
      ; outgoing_mutex= Lwt_mutex.create ()
      ; id_to_wakeup= Hashtbl.create 10
      ; dispatcher_thread= Lwt.return ()
      ; dispatcher_shutting_down= false
      }
    in
    t.dispatcher_thread <- dispatcher t ;
    Lwt.return t
end