(** Signature of a zeromq socket wrapper whose operations return deferred
    values instead of blocking the calling thread. *)
module type Socket = sig
  (** The deferred (promise) type of the underlying concurrency library. *)
  type 'a deferred

  (** A concurrent zeromq socket *)
  type 'a t

  (** Shape of the {!of_socket} constructor; left abstract so that an
      implementation can decide what wrapping a socket looks like. *)
  type 'a of_socket_args

  (** [of_socket s] wraps the zeromq socket [s] *)
  val of_socket : ('a Zmq.Socket.t -> 'a t) of_socket_args

  (** [to_socket s] extracts the raw zeromq socket from [s] *)
  val to_socket : 'a t -> 'a Zmq.Socket.t

  (** [recv socket] waits for a message on [socket] without blocking
      other concurrent threads *)
  val recv : 'a t -> string deferred

  (** [send socket m] sends the message [m] on [socket] without blocking
      other concurrent threads *)
  val send : 'a t -> string -> unit deferred

  (** [recv_all socket] waits for a multi-part message on [socket] without
      blocking other concurrent threads *)
  val recv_all : 'a t -> string list deferred

  (** [send_all socket m] sends all parts of the multi-part message [m] on
      [socket] without blocking other concurrent threads *)
  val send_all : 'a t -> string list -> unit deferred

  (** [recv_msg socket] waits for a message on [socket] without blocking
      other concurrent threads *)
  val recv_msg : 'a t -> Zmq.Msg.t deferred

  (** [send_msg socket m] sends the message [m] on [socket] without blocking
      other concurrent threads *)
  val send_msg : 'a t -> Zmq.Msg.t -> unit deferred

  (** [recv_msg_all socket] waits for a multi-part message on [socket] without
      blocking other concurrent threads *)
  val recv_msg_all : 'a t -> Zmq.Msg.t list deferred

  (** [send_msg_all socket m] sends all parts of the multi-part message [m] on
      [socket] without blocking other concurrent threads *)
  val send_msg_all : 'a t -> Zmq.Msg.t list -> unit deferred

  (** [close socket] stops the wrapper and closes the underlying zeromq
      socket. *)
  val close : 'a t -> unit deferred

  module Router : sig
    (** Identity of a socket connected to the router. *)
    type id_t

    (** [id_of_string s] coerces [s] into an {!id_t}. *)
    val id_of_string : string -> id_t

    (** [recv socket] waits for a message on [socket] without blocking other
        concurrent threads. The first frame is returned as the sender
        identity, the remaining frames as the message. *)
    val recv : [ `Router ] t -> (id_t * string list) deferred

    (** [send socket id message] sends [message] on [socket] to [id] without
        blocking other concurrent threads. *)
    val send : [ `Router ] t -> id_t -> string list -> unit deferred
  end

  module Monitor : sig
    (** [recv socket] waits for a monitoring event on [socket] without
        blocking other concurrent threads. *)
    val recv : [ `Monitor ] t -> Zmq.Monitor.event deferred
  end
end

(** [Make (T)] implements the socket operations on top of the concurrency
    primitives provided by [T] (deferred values, conditions, mailboxes and
    file-descriptor readiness). *)
module Make (T : Deferred.T) = struct
  open T
  open Deferred.Infix

  type 'a deferred = 'a T.t
  type 'a of_socket_args = 'a

  (** Raised by a queued operation when the socket reported EAGAIN; tells
      the event loop to leave the operation at the head of its queue. *)
  exception Retry

  type 'a t = {
    socket : 'a Zmq.Socket.t;  (** The wrapped zeromq socket. *)
    fd : Fd.t;  (** Created from the descriptor returned by [Zmq.Socket.get_fd]. *)
    senders : (unit -> unit) Queue.t;  (** Pending send operations. *)
    receivers : (unit -> unit) Queue.t;  (** Pending receive operations. *)
    condition : unit Condition.t;  (** Wakes up {!event_loop}. *)
    fd_condition : unit Condition.t;  (** Wakes up {!fd_monitor}. *)
    mutable closing : bool;  (** Set by {!close}; makes both loops terminate. *)
  }

  (** Small process that will notify of the fd changes.

      It sleeps on [fd_condition]; once woken it waits for the fd to become
      readable and then signals [condition] so that the event loop re-reads
      the socket state. The loop ends as soon as [closing] is observed. *)
  let rec fd_monitor t =
    Condition.wait t.fd_condition >>= fun () ->
    match t.closing with
    | true -> Deferred.return ()
    | false ->
      begin
        (* The outcome of waiting is discarded: whatever happened, the event
           loop is woken and inspects the socket itself. *)
        Deferred.catch (fun () -> Fd.wait_readable t.fd) >>= fun _ ->
        Condition.signal t.condition ();
        match t.closing with
        | true -> Deferred.return ()
        | false -> fd_monitor t
      end

  (** The event loop repeats acting on events as long as there are
      sends or receives to be processed.

      According to the zmq specification, send and receive may update the
      event, and the fd can only be trusted after reading the status of the
      socket. The socket events are therefore re-read on every iteration, and
      the fd monitor is only asked to wait when the socket reports nothing
      that a queued operation could use. *)
  let rec event_loop t =
    match t.closing with
    | true -> Deferred.return ()
    | false ->
      begin
        let open Zmq.Socket in
        (* Run the operation at the head of [queue]; it is removed only when
           it completed, i.e. did not raise [Retry]. *)
        let process queue =
          let f = Queue.peek queue in
          try
            f ();
            (* Success, pop the operation *)
            (Queue.pop queue : unit -> unit) |> ignore
          with
          | Retry ->
            (* If f raised EAGAIN, do not pop the message *)
            ()
        in
        match events t.socket, Queue.is_empty t.senders, Queue.is_empty t.receivers with
        | _, true, true ->
          (* Nothing queued: sleep until [post] or [close] signals. *)
          Condition.wait t.condition >>= fun () ->
          event_loop t
        | Poll_error, _, _ -> failwith "Cannot poll socket"
        (* Prioritize sends to keep network busy *)
        | Poll_in_out, false, _
        | Poll_out, false, _ ->
          process t.senders;
          event_loop t
        | Poll_in_out, _, false
        | Poll_in, _, false ->
          process t.receivers;
          event_loop t
        | Poll_in, _, true
        | Poll_out, true, _
        | No_event, _, _ ->
          (* Work is queued but the socket is not ready for it: let the fd
             monitor wake us when the fd becomes readable. *)
          Condition.signal t.fd_condition ();
          Condition.wait t.condition >>= fun () ->
          event_loop t
        | exception Unix.Unix_error (Unix.ENOTSOCK, "zmq_getsockopt", "") ->
          (* Reading the events failed because the socket is no longer a
             socket; stop the loop quietly. *)
          Deferred.return ()
      end

  (** [of_socket socket] wraps [socket] and starts its event loop and fd
      monitor in the background. *)
  let of_socket : ('a Zmq.Socket.t -> 'a t) of_socket_args = fun socket ->
    let fd = Fd.create (Zmq.Socket.get_fd socket) in
    let t =
      { socket;
        fd;
        senders = Queue.create ();
        receivers = Queue.create ();
        condition = Condition.create ();
        fd_condition = Condition.create ();
        closing = false;
      }
    in
    Deferred.don't_wait_for (fun () -> event_loop t);
    Deferred.don't_wait_for (fun () -> fd_monitor t);
    t

  (** Which queue an operation is posted to. *)
  type op = Send | Receive

  (** [post t op f] queues the operation [f] on the sender or receiver queue
      selected by [op] and returns a deferred holding its outcome.

      When the event loop runs the queued closure, [f t.socket] is called:
      - a normal result is delivered through the mailbox and the deferred is
        determined with it;
      - [Unix.EAGAIN] is turned into {!Retry}, so the event loop keeps the
        operation queued and nothing is delivered yet;
      - any other exception is delivered and the deferred fails with it. *)
  let post : _ t -> op -> (_ Zmq.Socket.t -> 'a) -> 'a Deferred.t = fun t op f ->
    let f' mailbox () =
      let res =
        match f t.socket with
        | v -> Ok v
        | exception Unix.Unix_error (Unix.EAGAIN, _, _) ->
          (* Signal try again *)
          raise Retry
        | exception exn -> Error exn
      in
      Mailbox.send mailbox res
    in
    let queue =
      match op with
      | Send -> t.senders
      | Receive -> t.receivers
    in
    let mailbox = Mailbox.create () in
    (* Must be sampled before pushing: it records whether the queue was
       empty when this operation arrived. *)
    let should_signal = Queue.is_empty queue in
    Queue.push (f' mailbox) queue;
    (* Wake up the event loop if the queue was empty *)
    if should_signal then Condition.signal t.condition ();
    Mailbox.recv mailbox >>= function
    | Ok v -> Deferred.return v
    | Error exn -> Deferred.fail exn

  (** [to_socket t] returns the wrapped zeromq socket. *)
  let to_socket t = t.socket

  (** [recv s] posts a non-blocking [Zmq.Socket.recv] on [s]. *)
  let recv s = post s Receive (fun s -> Zmq.Socket.recv ~block:false s)

  (** [send s m] posts a non-blocking [Zmq.Socket.send] of [m] on [s]. *)
  let send s m = post s Send (fun s -> Zmq.Socket.send ~block:false s m)

  (** [recv_msg s] posts a non-blocking [Zmq.Socket.recv_msg] on [s]. *)
  let recv_msg s = post s Receive (fun s -> Zmq.Socket.recv_msg ~block:false s)

  (** [send_msg s m] posts a non-blocking [Zmq.Socket.send_msg] of [m] on [s]. *)
  let send_msg s m = post s Send (fun s -> Zmq.Socket.send_msg ~block:false s m)

  (** Receive all message blocks. *)
  let recv_all s =
    (* The documentation says that either all message parts are
       transmitted, or none. So once a message becomes available, all
       parts can be read without blocking.
       Also receiving a multipart message must not be interleaved with
       another receiving thread on the same socket.
       We could have a read-mutex and a write mutex in order to limit
       potential starvation of other threads while reading large
       multipart messages.
    *)
    post s Receive (fun s -> Zmq.Socket.recv_all ~block:false s)

  (** [send_all s parts] posts a non-blocking [Zmq.Socket.send_all] of
      [parts] on [s]. *)
  let send_all s parts =
    (* See the comment in recv_all. *)
    post s Send (fun s -> Zmq.Socket.send_all ~block:false s parts)

  (** [recv_msg_all s] posts a non-blocking [Zmq.Socket.recv_msg_all] on [s]. *)
  let recv_msg_all s =
    post s Receive (fun s -> Zmq.Socket.recv_msg_all ~block:false s)

  (** [send_msg_all s parts] posts a non-blocking [Zmq.Socket.send_msg_all]
      of [parts] on [s]. *)
  let send_msg_all s parts =
    post s Send (fun s -> Zmq.Socket.send_msg_all ~block:false s parts)

  (** [close t] marks [t] as closing, releases its fd, wakes the fd monitor
      and the event loop so that both observe [closing] and stop, and finally
      closes the zeromq socket.

      NOTE(review): operations still sitting in [senders] / [receivers] are
      not run or failed here, so their deferreds appear to stay undetermined
      after [close] - confirm whether callers rely on that. *)
  let close t =
    t.closing <- true;
    (* Whatever releasing the fd yields is discarded; shutdown continues
       regardless. *)
    Deferred.catch (fun () -> Fd.release t.fd) >>= fun _ ->
    Condition.signal t.fd_condition ();
    Condition.signal t.condition ();
    Zmq.Socket.close t.socket;
    Deferred.return ()

  module Router = struct
    (** Identity of a peer; represented as the raw identity frame. *)
    type id_t = string

    (** [id_of_string s] is [s] itself. *)
    let id_of_string t = t

    (** [recv s] receives a multi-part message and splits off its first
        frame as the sender identity. *)
    let recv s =
      recv_all s >>= function
      | id :: message -> Deferred.return (id, message)
      | [] ->
        (* A message without any frame is treated as impossible here. *)
        assert false

    (** [send s id message] sends [message] prefixed with the [id] frame. *)
    let send s id message = send_all s (id :: message)
  end

  module Monitor = struct
    (** [recv s] posts a non-blocking [Zmq.Monitor.recv] on [s]. *)
    let recv s = post s Receive (fun s -> Zmq.Monitor.recv ~block:false s)
  end
end