(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(* Copyright (c) 2020-2021 Nomadic Labs, <contact@nomadic-labs.com>          *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

module Events = P2p_events.P2p_io_scheduler

(* Smoothing factor passed as [~alpha] to [Moving_average.create] for every
   bandwidth counter created in this module. *)
let alpha = 0.2

(* The operations a unidirectional IO channel must provide to be driven by
   the [Scheduler] functor below. *)
module type IO = sig
  (* Name of the channel; used as the worker name and in emitted events. *)
  val name : string

  (* Source the scheduler reads chunks from. *)
  type in_param

  (* A chunk of data moved from [in_param] to [out_param]. *)
  type data

  (* Size of a chunk; this is what is accounted against the quotas. *)
  val length : data -> int

  (* Fetch the next chunk from the source. The scheduler never issues two
     concurrent [pop]s on the same connection. *)
  val pop : in_param -> data tzresult Lwt.t

  (* Destination the scheduler writes chunks to. *)
  type out_param

  (* Deliver one chunk to the destination. *)
  val push : out_param -> data -> unit tzresult Lwt.t

  (* Called once when a connection is canceled, with the error trace that
     caused the cancellation. *)
  val close : out_param -> error list -> unit Lwt.t
end

module Scheduler (IO : IO) = struct
  (* The IO scheduler schedules the bandwidth usage of a unidirectional
     connection.

     It is a worker with two queues: a high-priority queue and a
     low-priority queue.

     A quota of IO is given to each attached connection for a time
     window (currently 1 sec.).

     The quota is the average quantity of IO done by the attached
     connections in the previous time window.

     Connections that have not exhausted their quota are allowed to
     perform an IO first (they are placed in the high-priority queue).
     If all connections have exhausted their quota, the low-priority
     queue is served.

     A quota can also be set for the whole set of connections.

     The implementation of the scheduler is highly dependent on the
     implementation of the moving average worker. *)

  (* Two labels or constructors of the same name are defined in two mutually
     recursive types: fields canceler, counter and quota. *)
  [@@@ocaml.warning "-30"]

  (* State of one scheduler (one direction, all connections). *)
  type t = {
    ma_state : Moving_average.state;
    canceler : Lwt_canceler.t;
    mutable worker : unit Lwt.t;
    (* Global bandwidth counter of this scheduler. *)
    counter : Moving_average.t;
    (* Global speed limit; [None] means the global quota is ignored. *)
    max_speed : int option;
    (* Remaining global quota for the current time window. *)
    mutable quota : int;
    (* Broadcast by [update_quota] when a new global quota is set. *)
    quota_updated : unit Lwt_condition.t;
    (* Broadcast by [waiter] when a result is queued into empty queues. *)
    readys : unit Lwt_condition.t;
    (* Pop results of connections whose own quota is still positive. *)
    readys_high : (connection * IO.data tzresult) Queue.t;
    (* Pop results of connections that exhausted their quota. *)
    readys_low : (connection * IO.data tzresult) Queue.t;
  }

  (* One connection attached to the scheduler. *)
  and connection = {
    id : int;
    mutable closed : bool;
    canceler : Lwt_canceler.t;
    in_param : IO.in_param;
    out_param : IO.out_param;
    (* The pop currently in flight (or the last one). *)
    mutable current_pop : IO.data tzresult Lwt.t;
    (* The push currently in flight (or the last one). *)
    mutable current_push : unit tzresult Lwt.t;
    (* Bandwidth counter of this connection. *)
    counter : Moving_average.t;
    (* Remaining per-connection quota for the current time window. *)
    mutable quota : int;
  }

  [@@@ocaml.warning "+30"]

  (* If the connection is not already closed:
     - mark it closed,
     - call the closer on the out_param (exceptions are swallowed),
     - cancel its canceler. *)
  let cancel (conn : connection) err =
    let open Lwt_syntax in
    if conn.closed then Lwt.return_unit
    else
      let* () = Events.(emit connection_closed) ("cancel", conn.id, IO.name) in
      conn.closed <- true ;
      let* () = Unit.catch_s (fun () -> IO.close conn.out_param err) in
      Error_monad.cancel_with_exceptions conn.canceler

  (* [waiter] starts an asynchronous thread that triggers an IO and then
     puts [conn] back in a queue for further IO treatment.

     Once an IO has been treated (the current pop and push are resolved),
     [conn] and its last pop result are pushed into the high-priority
     queue if [conn] has not exhausted its quota, or into the
     low-priority queue otherwise.

     If both of the scheduler's queues were empty, the scheduler is
     notified. *)
  let waiter st conn =
    assert (Lwt.state conn.current_pop <> Sleep) ;
    conn.current_pop <- IO.pop conn.in_param ;
    Lwt.dont_wait
      (fun () ->
        let open Lwt_syntax in
        (* To ensure that there are no concurrent calls to IO.pop, we
           wait for the promise to be fulfilled. *)
        let* res = conn.current_pop in
        let* _ = conn.current_push in
        let was_empty =
          Queue.is_empty st.readys_high && Queue.is_empty st.readys_low
        in
        if conn.quota > 0 then Queue.push (conn, res) st.readys_high
        else Queue.push (conn, res) st.readys_low ;
        if was_empty then Lwt_condition.broadcast st.readys () ;
        Lwt.return_unit)
      (fun exc ->
        Format.eprintf "Uncaught exception: %s\n%!" (Printexc.to_string exc))

  (* Wait for a connection to be available, with data in one of the
     queues. *)
  let wait_data st =
    let is_empty =
      Queue.is_empty st.readys_high && Queue.is_empty st.readys_low
    in
    if is_empty then Lwt_condition.wait st.readys else Lwt.return_unit

  (* Check whether the global quota has been reached.
     If so, wait until the moving average worker updates the quota.
     The quota is ignored if no max speed is set. *)
  let check_quota st =
    if st.max_speed <> None && st.quota < 0 then
      let open Lwt_syntax in
      let* () = Events.(emit wait_quota) IO.name in
      Lwt_condition.wait st.quota_updated
    else Lwt.pause ()

  (* Main worker loop:
     - check that the global IO limit is not reached (or wait);
     - wait for available data;
     - take the first connection [conn] ready in the queues (looking
       at the high-priority queue first). Connections come with the
       chunk of bytes ready in their input;
     - push its chunk of bytes into its output pipe;
     - add the byte count to the global counter;
     - remove the byte count from the global quota;
     - add the byte count to [conn]'s counter;
     - remove the byte count from [conn]'s quota (only when it was
       served from the high-priority queue);
     - call [waiter] to trigger an asynchronous "perform IO, enqueue
       conn" on [conn].

     During the loop, if an IO result is an error, the associated
     connection is canceled.

     The loop is stopped on cancellation of st.canceler.

     Warning: it can be stuck, pending forever before reaching the
     canceler, if the global quota has been surpassed and the
     moving_average worker is stopped beforehand.

     Implicit assumption: the quotas of the scheduler and of the
     connections are updated asynchronously by the moving_average
     worker. *)
  let rec worker_loop st =
    let open Lwt_syntax in
    let* () = check_quota st in
    let* () = Events.(emit wait) IO.name in
    let* () =
      Lwt.pick [Lwt_canceler.when_canceling st.canceler; wait_data st]
    in
    if Lwt_canceler.canceled st.canceler then Lwt.return_unit
    else
      let prio, (conn, msg) =
        if not (Queue.is_empty st.readys_high) then
          (true, Queue.pop st.readys_high)
        else (false, Queue.pop st.readys_low)
      in
      match msg with
      | Error (Canceled :: _) -> worker_loop st
      | Error (P2p_errors.Connection_closed :: _ as err)
      | Error (Exn Lwt_pipe.Closed :: _ as err) ->
          let* () = Events.(emit connection_closed) ("pop", conn.id, IO.name) in
          let* () = cancel conn err in
          worker_loop st
      | Error err ->
          let* () =
            Events.(emit unexpected_error) ("pop", conn.id, IO.name, err)
          in
          let* () = cancel conn err in
          worker_loop st
      | Ok msg ->
          conn.current_push <-
            (let* r = IO.push conn.out_param msg in
             match r with
             | Ok () | Error (Canceled :: _) -> return_ok_unit
             | Error (P2p_errors.Connection_closed :: _ as err)
             | Error (Exn Lwt_pipe.Closed :: _ as err) ->
                 let* () =
                   Events.(emit connection_closed) ("push", conn.id, IO.name)
                 in
                 let* () = cancel conn err in
                 return_ok_unit
             | Error err ->
                 let* () =
                   Events.(emit unexpected_error)
                     ("push", conn.id, IO.name, err)
                 in
                 let* () = cancel conn err in
                 return_error err) ;
          let len = IO.length msg in
          let* () = Events.(emit handle_connection) (len, conn.id, IO.name) in
          Moving_average.add st.counter len ;
          st.quota <- st.quota - len ;
          Moving_average.add conn.counter len ;
          if prio then conn.quota <- conn.quota - len ;
          waiter st conn ;
          worker_loop st

  (* Create an IO scheduler over a moving average state and an optional
     maximum speed, and start its worker. *)
  let create ma_state max_speed =
    let st =
      {
        ma_state;
        canceler = Lwt_canceler.create ();
        worker = Lwt.return_unit;
        counter = Moving_average.create ma_state ~init:0 ~alpha;
        max_speed;
        (* if max_speed is None the quota will be ignored anyway *)
        quota = Option.value ~default:0 max_speed;
        quota_updated = Lwt_condition.create ();
        readys = Lwt_condition.create ();
        readys_high = Queue.create ();
        readys_low = Queue.create ();
      }
    in
    st.worker <-
      Lwt_utils.worker
        IO.name
        ~on_event:Internal_event.Lwt_worker_logger.on_event
        ~run:(fun () -> worker_loop st)
        ~cancel:(fun () -> Error_monad.cancel_with_exceptions st.canceler) ;
    st

  (* Attach a new connection to the scheduler and start its first pop. *)
  let create_connection st in_param out_param canceler id =
    Events.(emit__dont_wait__use_with_care create_connection (id, IO.name)) ;
    let conn =
      {
        id;
        closed = false;
        canceler;
        in_param;
        out_param;
        (* dummy: an already-rejected promise satisfies the assertion in
           [waiter] that no pop is pending *)
        current_pop = Lwt.fail Not_found;
        current_push = Lwt_result_syntax.return_unit;
        counter = Moving_average.create st.ma_state ~init:0 ~alpha;
        quota = 0;
      }
    in
    waiter st conn ;
    conn

  (* Update the global quota of the set of scheduled connections.

     If a max_speed is set, the new time window's quota is max_speed,
     unless the previous window consumed more than its quota, in which
     case the excess consumption is deducted from the max quota.

     The low-priority queue is then scanned, and connections whose own
     quota is positive again are moved to the high-priority queue. *)
  let update_quota st =
    Events.(emit__dont_wait__use_with_care update_quota IO.name) ;
    Option.iter
      (fun max_speed ->
        st.quota <- min st.quota 0 + max_speed ;
        Lwt_condition.broadcast st.quota_updated ())
      st.max_speed ;
    if not (Queue.is_empty st.readys_low) then (
      let tmp = Queue.create () in
      Queue.iter
        (fun (((conn : connection), _) as msg) ->
          if conn.quota > 0 then Queue.push msg st.readys_high
          else Queue.push msg tmp)
        st.readys_low ;
      Queue.clear st.readys_low ;
      Queue.transfer tmp st.readys_low)

  (* On shutdown, cancel the scheduler's canceler and wait for the
     worker to terminate.
     The canceler does not have any attached callback. *)
  let shutdown st =
    let open Lwt_syntax in
    let* () = Error_monad.cancel_with_exceptions st.canceler in
    let* () = st.worker in
    Events.(emit shutdown) IO.name
end

module ReadIO = struct
  (* The ReadScheduler schedules the connections' reads.

     Data are popped from the fd in chunks of at most maxlen bytes and
     pushed into a pipe.

     As the quota update is set to 1 sec., maxlen should be smaller
     than what is expected to be used by one connection in such a time
     window. Otherwise, the bandwidth usage will not be regular. *)
  let name = "io_scheduler(read)"

  type in_param = {
    fd : P2p_fd.t;  (* File descriptor from which data are read *)
    maxlen : int;  (* Length of data we want to read from the file descriptor *)
    read_buffer : Circular_buffer.t;  (* Cache where data will be stored *)
  }

  type data = Circular_buffer.data

  let length = Circular_buffer.length

  (* [pop] reads at most [maxlen] bytes from the given [fd] and writes them
     into the circular buffer [read_buffer].

     A zero-length read is reported as [Connection_closed].

     Invariant: given a connection, there is no concurrent call to
     pop. *)
  let pop {fd; maxlen; read_buffer} =
    Error_monad.catch_es (fun () ->
        let open Lwt_result_syntax in
        let*! data_result =
          Circular_buffer.write ~maxlen ~fill_using:(P2p_fd.read fd) read_buffer
        in
        match data_result with
        | Ok data ->
            if Circular_buffer.length data = 0 then
              tzfail P2p_errors.Connection_closed
            else return data
        | Error
            ( `Connection_lost _ | `Connection_closed_by_peer
            | `Connection_locally_closed ) ->
            tzfail P2p_errors.Connection_closed
        | Error (`Unexpected_error_when_closing _ | `Unexpected_error _) ->
            tzfail P2p_errors.Connection_error)

  type out_param = Circular_buffer.data tzresult Lwt_pipe.Maybe_bounded.t

  (* [push] data into the pipe, feeding the application's data consumer. *)
  let push p msg =
    Error_monad.catch_s (fun () -> Lwt_pipe.Maybe_bounded.push p (Ok msg))

  (* On [close], we push the given [err] toward the data consumer. *)
  let close p err =
    Unit.catch_s (fun () -> Lwt_pipe.Maybe_bounded.push p (Error err))
end

module ReadScheduler = Scheduler (ReadIO)

module WriteIO = struct
  (* The WriteScheduler schedules the connections' writes.

     Data are popped from a pipe fed by the application and pushed
     into the fd.

     Nothing here takes care of splitting the data into chunks. The
     component's user should take care of sending small enough data
     chunks to avoid irregular bandwidth usage. *)
  let name = "io_scheduler(write)"

  type in_param = Bytes.t Lwt_pipe.Maybe_bounded.t

  type data = Bytes.t

  let length = Bytes.length

  (* [pop] the next bytes to be sent from the queue. A closed pipe is
     reported as an [Exn Lwt_pipe.Closed] error; any other exception is
     treated as an invariant violation. *)
  let pop p =
    Lwt.catch
      (fun () -> Lwt_result.ok @@ Lwt_pipe.Maybe_bounded.pop p)
      (function
        | Lwt_pipe.Closed -> fail_with_exn Lwt_pipe.Closed | _ -> assert false)

  type out_param = P2p_fd.t

  (* [push] bytes into the network. *)
  let push fd buf =
    let open Lwt_result_syntax in
    let*! r = P2p_fd.write fd buf in
    match r with
    | Ok () -> return_unit
    | Error
        ( `Connection_closed_by_peer | `Connection_lost _
        | `Connection_locally_closed | `Unexpected_error_when_closing _
        | `Unexpected_error Lwt.Canceled ) ->
        tzfail P2p_errors.Connection_closed
    | Error (`Unexpected_error ex) -> fail_with_exn ex

  (* [close] does nothing; it will still be possible to push values to
     the network. *)
  let close _p _err = Lwt.return_unit
end

module WriteScheduler = Scheduler (WriteIO)

(* Type of a bidirectional scheduled connection. The read and write halves
   share the same [canceler]. *)
type connection = {
  fd : P2p_fd.t;
  canceler : Lwt_canceler.t;
  readable : P2p_buffer_reader.readable;
  read_conn : ReadScheduler.connection;
  write_conn : WriteScheduler.connection;
  write_queue : Bytes.t Lwt_pipe.Maybe_bounded.t;
  remove_from_connection_table : unit -> unit;
}

let to_readable connection = connection.readable

(* Bidirectional scheduler: one read scheduler, one write scheduler and the
   table of registered connections. *)
type t = {
  mutable closed : bool;
  ma_state : Moving_average.state;
  connected : connection P2p_fd.Table.t;
  read_scheduler : ReadScheduler.t;
  write_scheduler : WriteScheduler.t;
  max_upload_speed : int option;  (* bytes per second. *)
  max_download_speed : int option;  (* bytes per second. *)
  read_buffer_size : int;
  read_queue_size : int option;
  write_queue_size : int option;
}

(* Update the quotas of the schedulers and connections on each
   Moving_average update (approximately once per second).

   Each connection's quota is the average bandwidth consumption divided
   by the number of connections, minus the over-consumption of the
   previous round. *)
let reset_quota st =
  Events.(emit__dont_wait__use_with_care reset_quota ()) ;
  let {Moving_average.average = current_inflow; _} =
    Moving_average.stat st.read_scheduler.counter
  and {Moving_average.average = current_outflow; _} =
    Moving_average.stat st.write_scheduler.counter
  in
  let nb_conn = P2p_fd.Table.length st.connected in
  (if nb_conn > 0 then
     let fair_read_quota = current_inflow / nb_conn
     and fair_write_quota = current_outflow / nb_conn in
     P2p_fd.Table.iter
       (fun _id conn ->
         conn.read_conn.quota <- min conn.read_conn.quota 0 + fair_read_quota ;
         conn.write_conn.quota <-
           min conn.write_conn.quota 0 + fair_write_quota)
       st.connected) ;
  ReadScheduler.update_quota st.read_scheduler ;
  WriteScheduler.update_quota st.write_scheduler

(* [create] a scheduler for reading and writing on registered
   connections, and start the associated moving average worker.
   The worker calls [reset_quota] on each update. *)
let create ?max_upload_speed ?max_download_speed ?read_queue_size
    ?write_queue_size ~read_buffer_size () =
  Events.(emit__dont_wait__use_with_care create ()) ;
  let ma_state =
    Moving_average.fresh_state ~id:"p2p-io-sched" ~refresh_interval:1.0
  in
  let st =
    {
      closed = false;
      ma_state;
      connected = P2p_fd.Table.create 53;
      read_scheduler = ReadScheduler.create ma_state max_download_speed;
      write_scheduler = WriteScheduler.create ma_state max_upload_speed;
      max_upload_speed;
      max_download_speed;
      read_buffer_size;
      read_queue_size;
      write_queue_size;
    }
  in
  Moving_average.on_update ma_state (fun () -> reset_quota st) ;
  st

let ma_state {ma_state; _} = ma_state

(* Raised by [register] when the scheduler has been shut down. *)
exception Closed

(* Estimated memory footprint of one element of a read queue. *)
let read_size = function
  | Ok data ->
      (Sys.word_size / 8 * 8)
      + Circular_buffer.length data
      + Lwt_pipe.Maybe_bounded.push_overhead
  | Error _ ->
      (* we push Error only when we close the socket, we don't fear memory
         leaks in that case... *)
      0

(* Estimated memory footprint of one element of a write queue. *)
let write_size bytes =
  (Sys.word_size / 8 * 6)
  + Bytes.length bytes
  + Lwt_pipe.Maybe_bounded.push_overhead

(* [register] a socket for scheduling by [st].

   Creates the read/write pipes and attaches them to their respective
   schedulers.

   Attaches to the freshly created canceler of the connection:
   - removal from the set of scheduled fds,
   - destruction of the moving average counters,
   - closing of the read/write pipes,
   - closing of the underlying socket (p2p_fd).

   If [st] is already shut down, the fd is closed in the background and
   [Closed] is raised. *)
let register st fd =
  if st.closed then (
    Error_monad.dont_wait
      (fun () -> P2p_fd.close fd)
      (function
        | `Unexpected_error ex ->
            Format.eprintf "Uncaught error: %s\n%!" (Printexc.to_string ex))
      (fun exc ->
        Format.eprintf "Uncaught exception: %s\n%!" (Printexc.to_string exc)) ;
    raise Closed)
  else
    let id = P2p_fd.id fd in
    let canceler = Lwt_canceler.create () in
    (* Optional bounds for the pipes, paired with the size functions above. *)
    let read_bound = Option.map (fun v -> (v, read_size)) st.read_queue_size in
    let write_bound =
      Option.map (fun v -> (v, write_size)) st.write_queue_size
    in
    let read_queue = Lwt_pipe.Maybe_bounded.create ?bound:read_bound () in
    let write_queue = Lwt_pipe.Maybe_bounded.create ?bound:write_bound () in
    (* This buffer is allocated once and is reused every time we read
       a message from the corresponding file descriptor. *)
    let read_buffer =
      Circular_buffer.create ~maxlength:(st.read_buffer_size * 2) ()
    in
    let read_conn =
      ReadScheduler.create_connection
        st.read_scheduler
        {fd; maxlen = st.read_buffer_size; read_buffer}
        read_queue
        canceler
        id
    and write_conn =
      WriteScheduler.create_connection
        st.write_scheduler
        write_queue
        fd
        canceler
        id
    in
    Lwt_canceler.on_cancel canceler (fun () ->
        let open Lwt_syntax in
        P2p_fd.Table.remove st.connected fd ;
        Moving_average.destroy st.ma_state read_conn.counter ;
        Moving_average.destroy st.ma_state write_conn.counter ;
        Lwt_pipe.Maybe_bounded.close write_queue ;
        Lwt_pipe.Maybe_bounded.close read_queue ;
        let* r = P2p_fd.close fd in
        let* () =
          match r with
          | Ok () -> Lwt.return_unit
          | Error (`Unexpected_error ex) ->
              (* Do not prevent the closing if an exception is raised *)
              let* () = Events.(emit close_error) (id, error_of_exn ex) in
              Lwt.return_unit
        in
        return_unit) ;
    let readable = P2p_buffer_reader.mk_readable ~read_buffer ~read_queue in
    let conn =
      {
        fd;
        canceler;
        readable;
        read_conn;
        write_queue;
        write_conn;
        remove_from_connection_table =
          (fun () -> P2p_fd.Table.remove st.connected fd);
      }
    in
    P2p_fd.Table.add st.connected conn.fd conn ;
    (* Events.(emit register) id) *)
    conn

(* Push bytes into the write pipe; any failure is traced as
   [Connection_closed]. *)
let write ?canceler {write_queue; _} msg =
  trace P2p_errors.Connection_closed
  @@ protect ?canceler (fun () ->
         Lwt_result.ok @@ Lwt_pipe.Maybe_bounded.push write_queue msg)

(* Push bytes into the write pipe, or return false if it is bounded and
   full. *)
let write_now {write_queue; _} msg =
  Lwt_pipe.Maybe_bounded.push_now write_queue msg

let convert ~ws ~rs =
  {
    P2p_stat.total_sent = ws.Moving_average.total;
    total_recv = rs.Moving_average.total;
    current_outflow = ws.average;
    current_inflow = rs.average;
  }

let global_stat {read_scheduler; write_scheduler; _} =
  let rs = Moving_average.stat read_scheduler.counter
  and ws = Moving_average.stat write_scheduler.counter in
  convert ~rs ~ws

let stat {read_conn; write_conn; _} =
  let rs = Moving_average.stat read_conn.counter
  and ws = Moving_average.stat write_conn.counter in
  convert ~rs ~ws

(* Emit a [close_error] event for every exception reported by the
   cancellation of a connection's canceler. The errors are only reported:
   they never prevent the connection from being closed. *)
let emit_close_errors id = function
  | Ok () | Error [] -> Lwt.return_unit
  | Error excs ->
      List.iter_p
        (fun exc -> Events.(emit close_error) (id, error_of_exn exc))
        excs

(* [close conn] prevents further data from being pushed to the remote peer
   and starts a cascade of effects that should close the connection. *)
let close ?timeout conn =
  let open Lwt_result_syntax in
  let id = P2p_fd.id conn.fd in
  conn.remove_from_connection_table () ;
  Lwt_pipe.Maybe_bounded.close conn.write_queue ;
  (* Here, the WriteScheduler will drain the write_queue, then get an
     [Exn Lwt_pipe.Closed; ...] trace and thus cancel the
     [write_conn.canceler], which is the connection's canceler (by
     construction of the connection).

     And remember, the canceler has the following callbacks attached:
     - removal from the set of scheduled fds,
     - destruction of the moving average counters,
     - closing of the read/write pipes,
     - closing of the underlying socket (p2p_fd).

     We wait for the cancellation to finish. Both branches report every
     exception raised during the cancellation. *)
  let* () =
    match timeout with
    | None ->
        let*! r = Lwt_canceler.when_canceled conn.canceler in
        let*! () = emit_close_errors id r in
        return_unit
    | Some timeout ->
        with_timeout
          ~canceler:conn.canceler
          (Lwt_unix.sleep timeout)
          (fun canceler ->
            let*! r = Lwt_canceler.when_canceled canceler in
            let*! () = emit_close_errors id r in
            return_unit)
  in
  (* And here we wait for one push into the socket, not for all the
     values in the pipe to be pushed. *)
  let*! res = conn.write_conn.current_push in
  let*! () = Events.(emit close) id in
  Lwt.return res

let iter_connection {connected; _} f =
  P2p_fd.Table.iter (fun _ conn -> f conn) connected

(* Shut down the whole scheduler: refuse new registrations, stop the
   reader, close every registered connection, then stop the writer. *)
let shutdown ?timeout st =
  let open Lwt_syntax in
  st.closed <- true ;
  (* Stop the reader loop, unless it is stuck due to (max_speed + dead
     moving average worker). *)
  let* () = ReadScheduler.shutdown st.read_scheduler in
  (* Trigger the closing of the connections and wait for the start of the
     cancellation of every connection. [close] removes each connection from
     [st.connected] synchronously, so take a snapshot first rather than
     mutating the table while iterating over it. *)
  let connections =
    P2p_fd.Table.fold (fun _fd conn acc -> conn :: acc) st.connected []
  in
  let* () =
    List.iter_p
      (fun conn ->
        let* _ = close ?timeout conn in
        Lwt.return_unit)
      connections
  in
  (* Stop the writer loop, unless it is stuck due to (max_speed + dead
     moving average worker). *)
  let* () = WriteScheduler.shutdown st.write_scheduler in
  Events.(emit shutdown_scheduler) ()

let id conn = P2p_fd.id conn.fd