123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145openCoreopenAsync_kernelopenAsync_unixletdefault_delivery_unit=Byte_units.of_megabytes2.moduleFile=structtypet={path:string;fd:Fd.t;raw_fd:Core.Unix.File_descr.t;bytes_sent:int;bytes_pending:int}[@@derivingfields]letwith_filefile~f=let%bindstat=Unix.statfileinUnix.with_file~mode:[`Rdonly]file~f:(funfd->lett={path=file;fd;raw_fd=Fd.file_descr_exnfd;bytes_sent=0;bytes_pending=Int64.to_int_exn(Unix.Stats.sizestat)}inft);;letupdatet~bytes_sent_now={twithbytes_sent=t.bytes_sent+bytes_sent_now;bytes_pending=t.bytes_pending-bytes_sent_now};;letsendfile=Or_error.ok_exnLinux_ext.sendfileletsendfilet~socket_fd~delivery_unit=letdelivery_unit=Byte_units.bytes_int_exndelivery_unitinifInt.(=)0t.bytes_pendingthenOk`Fully_sentelse(tryletbytes_sent_now=sendfile~pos:t.bytes_sent~len:(Int.mint.bytes_pendingdelivery_unit)~fd:t.raw_fdsocket_fdinifbytes_sent_now<0thenError(Error.create_s[%message"Negative return value from [sendfile]"~return_value:(bytes_sent_now:int)])elseOk(`Sent(bytes_sent_now,updatet~bytes_sent_now))with|exn->Error(Error.of_exnexn));;endmoduleLimiter=structmoduleLimiter=Limiter_async.Token_buckettypet=bytes_sent:int->unitDeferred.tletcreate~rate_per_sec=letrate=Byte_units.bytes_floatrate_per_secinletlimiter=Limiter.create_exn~burst_size:(Float.to_intrate)~sustained_rate_per_sec:rate~continue_on_error:true()infun~bytes_sent->letsent=Ivar.create()inLimiter.enqueue_exnlimiter~allow_immediate_run:truebytes_sent(Ivar.fillsent)();Ivar.readsent;;letno_pushback~bytes_sent:_=Deferred.unitendletoptimization_to_achieve_the_limiter_limitsdeferredf=matchDeferred.peekdeferredwith|None->deferred>>=f|Somev->fv;;let(>>==)=optimization_to_achieve_the_limiter_limitsleterror_socket_fd_closed=Error.of_string"Socket fd is closed."letfailed_to_send~file~error=Error(Error.create_s[%message"Failed to fully send file"~file:(File.pathfile:string)~bytes_sent:(File.bytes_sentfile:int)~bytes_pending:(File.bytes_pendingfile:int)(error:Error.t)]);;letfeed_file~file~socket_fd~delivery_unit~limiter=ifFd.is_closedsocket_fdthenreturn(failed_to_send~file~error:error_socket_fd_closed)else(letraw_client_fd=Fd.file_descr_exnsocket_fdinletrecloopfile=matchFile.sendfile~socket_fd:raw_client_fdfile~delivery_unitwith|Errorerror->return(failed_to_send~file~error)|Ok`Fully_sent->Deferred.Or_error.ok_unit|Ok(`Sent(bytes_sent,file))->letready_to_send=limiter~bytes_sentinletready_to_write=Fd.ready_tosocket_fd`Writeinready_to_send>>==fun()->ready_to_write>>==(function|`Ready->loopfile|`Closed|`Bad_fd->return(failed_to_send~file~error:error_socket_fd_closed))inloopfile);;letsendfile?(limiter=Limiter.no_pushback)?(delivery_unit=default_delivery_unit)~socket_fd~file()=File.with_filefile~f:(funfile->feed_file~file~socket_fd~delivery_unit~limiter);;