123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193moduleMake(T:Deferred.T)=structopenTopenDeferred.InfixexceptionRetrytype'at={socket:'aZmq.Socket.t;fd:Fd.t;senders:(unit->unit)Queue.t;receivers:(unit->unit)Queue.t;condition:unitCondition.t;fd_condition:unitCondition.t;mutableclosing:bool;}letto_string_humt=letstate=match(Zmq.Socket.eventst.socket)with|Zmq.Socket.No_event->"No_event"|Poll_in->"Poll_in"|Poll_out->"Poll_out"|Poll_in_out->"Poll_in_out"|Poll_error->"Poll_error"|exception_->"Closed"inPrintf.sprintf"State: %s, Senders #%d, Receivers #%d"state(Queue.lengtht.senders)(Queue.lengtht.receivers)(** Small process that will notify of the fd changes *)letrecfd_monitort=Condition.waitt.fd_condition>>=fun()->matcht.closingwith|true->Deferred.return()|false->beginDeferred.catch(fun()->Fd.wait_readablet.fd)>>=fun_->Condition.signalt.condition();matcht.closingwith|true->Deferred.return()|false->fd_monitortend(** The event loop repeats acting on events as long as there are
sends or receives to be processed.
According to the zmq specification, send and receive may update the event,
and the fd can only be trusted after reading the status of the socket.
*)letrecevent_loopt=matcht.closingwith|true->Deferred.return()|false->beginletopenZmq.Socketinletprocessqueue=letf=Queue.peekqueueintryf();(* Success, pop the sender *)(Queue.popqueue:unit->unit)|>ignorewith|Retry->(* If f raised EAGAIN, dont pop the message *)()inmatcheventst.socket,Queue.is_emptyt.senders,Queue.is_emptyt.receiverswith|_,true,true->Condition.waitt.condition>>=fun()->event_loopt|Poll_error,_,_->failwith"Cannot poll socket"(* Prioritize send's to keep network busy *)|Poll_in_out,false,_|Poll_out,false,_->processt.senders;event_loopt|Poll_in_out,_,false|Poll_in,_,false->processt.receivers;event_loopt|Poll_in,_,true|Poll_out,true,_|No_event,_,_->Condition.signalt.fd_condition();Condition.waitt.condition>>=fun()->event_loopt|exceptionUnix.Unix_error(Unix.ENOTSOCK,"zmq_getsockopt","")->Deferred.return()endletof_socket:'aZmq.Socket.t->'at=funsocket->letfd=Fd.create(Zmq.Socket.get_fdsocket)inlett={socket;fd;senders=Queue.create();receivers=Queue.create();condition=Condition.create();fd_condition=Condition.create();closing=false;}inDeferred.don't_wait_for(fun()->event_loopt);Deferred.don't_wait_for(fun()->fd_monitort);ttypeop=Send|Receiveletpost:_t->op->(_Zmq.Socket.t->'a)->'aDeferred.t=funtopf->letf'mailbox()=letres=matchft.socketwith|v->Okv|exceptionUnix.Unix_error(Unix.EAGAIN,_,_)->(* Signal try again *)raiseRetry|exceptionexn->ErrorexninMailbox.sendmailboxresinletqueue=matchopwith|Send->t.senders|Receive->t.receiversinletmailbox=Mailbox.create()inletshould_signal=Queue.is_emptyqueueinQueue.push(f'mailbox)queue;(* Wakeup the thread if the queue was empty *)beginmatchshould_signalwith|true->Condition.signalt.condition()|false->()end;Mailbox.recvmailbox>>=function|Okv->Deferred.returnv|Errorexn->Deferred.failexnletto_sockett=t.socketletrecvs=postsReceive(funs->Zmq.Socket.recv~block:falses)letsendsm=postsSend(funs->Zmq.Socket.send~block:falsesm)letrecv_msgs=postsReceive(funs->Zmq.Socket.recv_msg~block:falses)letsend_msgsm=postsSend(funs->Zmq.Socket.send_msg~block:falsesm)(** Recevie all message blocks. *)letrecv_alls=(* The documentaton says that either all message parts are
transmitted, or none. So once a message becomes available, all
parts can be read wothout blocking.
Also receiving a multipart message must not be interleaved with
another receving thread on the same socket.
We could have a read-mutex and a write mutex in order to limit
potential starvation of other threads while reading large
multipart messages.
*)postsReceive(funs->Zmq.Socket.recv_all~block:falses)letsend_allsparts=(* See the comment in recv_all. *)postsSend(funs->Zmq.Socket.send_all~block:falsesparts)letrecv_msg_alls=postsReceive(funs->Zmq.Socket.recv_msg_all~block:falses)letsend_msg_allsparts=postsSend(funs->Zmq.Socket.send_msg_all~block:falsesparts)letcloset=t.closing<-true;Deferred.catch(fun()->Fd.releaset.fd)>>=fun_->Condition.signalt.fd_condition();Condition.signalt.condition();Zmq.Socket.closet.socket;Deferred.return()moduleRouter=structtypeid_t=stringletid_of_stringt=tletrecvs=recv_alls>>=function|id::message->Deferred.return(id,message)|_->assertfalseletsendsidmessage=send_alls(id::message)endmoduleMonitor=structletrecvs=postsReceive(funs->Zmq.Monitor.recv~block:falses)endend