open! Core_kernel
open! Import

(* A value that is either unbounded ([Infinite]) or a concrete ['a] ([Finite]). *)
module Infinite_or_finite = struct
  module T = struct
    type 'a t =
      | Infinite
      | Finite of 'a
    [@@deriving sexp, bin_io]
  end

  include T

  (* [Infinite] sorts after every [Finite] value; two [Finite] values are ordered by the
     supplied [compare]. *)
  let compare compare t1 t2 =
    match t1, t2 with
    | Infinite, Infinite -> 0
    | Infinite, Finite _ -> 1
    | Finite _, Infinite -> -1
    | Finite a, Finite b -> compare a b
  ;;
end

(** Mutable version of Infinite_or_finite, for internal use, to avoid allocation *)
module Iofm : sig
  type 'a t [@@deriving sexp_of]

  val infinite : unit -> 'a t
  val finite : 'a -> 'a t
  val is_infinite : 'a t -> bool
  val is_finite : 'a t -> bool
  val set_infinite : 'a t -> unit
  val set_finite : 'a t -> 'a -> unit
  val get_finite_exn : 'a t -> 'a
  val to_ordinary : 'a t -> 'a Infinite_or_finite.t
  val of_ordinary : 'a Infinite_or_finite.t -> 'a t
end = struct
  (* Represented as a [Moption.t]: the none state stands for [Infinite], the some state
     for [Finite]. *)
  type 'a t = 'a Moption.t [@@deriving sexp_of]

  let infinite () = Moption.create ()

  let finite v =
    let t = Moption.create () in
    Moption.set_some t v;
    t
  ;;

  let is_infinite = Moption.is_none
  let is_finite = Moption.is_some
  let set_infinite = Moption.set_none
  let set_finite = Moption.set_some
  let get_finite_exn = Moption.get_some_exn

  let[@inline always] to_ordinary t : _ Infinite_or_finite.t =
    if Moption.is_none t then Infinite else Finite (Moption.get_some_exn t)
  ;;

  let[@inline always] of_ordinary (ext : _ Infinite_or_finite.t) =
    match ext with
    | Infinite -> infinite ()
    | Finite v -> finite v
  ;;
end

open Infinite_or_finite.T

module Try_take_result = struct
  type t =
    | Taken
    | Unable
    | Asked_for_more_than_bucket_limit
end

module Try_return_to_bucket_result = struct
  type t =
    | Returned_to_bucket
    | Unable
end

module Tokens_may_be_available_result = struct
  type t =
    | At of Time_ns.t
    | Never_because_greater_than_bucket_limit
    | When_return_to_hopper_is_called
end

(* Shadow [Time_ns] so that the derived [sexp_of_t] below uses the alternate sexp
   format. *)
module Time_ns = struct
  include Time_ns

  let sexp_of_t = Time_ns.Alternate_sexp.sexp_of_t
end

(* Field comments are written before the field they describe (leading-semicolon record
   style). *)
type t =
  { start_time : Time_ns.t
  (** The current time of the rate limiter.  Note that when this is moved forward,
      [in_hopper] must be updated accordingly. *)
  ; mutable time : Time_ns.t
  (** the amount of time that has passed expressed in token terms, since start_time. *)
  ; time_in_token_space : int Iofm.t
  (** number of tokens in the bucket *)
  ; mutable in_bucket : int
  (** number of tokens in the hopper.  May be [inf] *)
  ; in_hopper : int Iofm.t
  (** Everything that has been taken from bucket but not returned to hopper *)
  ; mutable in_flight : int
  (** maximum size allowable in the bucket *)
  ; mutable bucket_limit : int
  (** maximum size allowable in flight *)
  ; in_flight_limit : int Iofm.t
  (** rate at which tokens "fall" from the hopper into the bucket *)
  ; hopper_to_bucket_rate_per_ns : Float.t Iofm.t
  }
[@@deriving sexp_of, fields]

(* Raises if [fill_rate] is finite and strictly negative.  Despite the name, a rate of
   exactly zero is accepted, matching the error message. *)
let fill_rate_must_be_positive fill_rate =
  if Iofm.is_finite fill_rate
  then begin
    let rate = Iofm.get_finite_exn fill_rate in
    if Float.( < ) rate Float.zero
    then raise_s [%message "hopper_to_bucket_rate_per_ns must be >= 0" (rate : Float.t)]
  end
;;

(* Total number of tokens held by [t] (in flight + hopper + bucket), or [Infinite] when
   the hopper is infinite.  Does not advance time. *)
let in_system t =
  if Iofm.is_infinite t.in_hopper
  then Infinite
  else Finite (t.in_flight + Iofm.get_finite_exn t.in_hopper + t.in_bucket)
;;

(* Checks the structural invariants of [t], raising on the first violation found. *)
let invariant t =
  fill_rate_must_be_positive t.hopper_to_bucket_rate_per_ns;
  (* bucket is limited to size *)
  if t.in_bucket > t.bucket_limit
  then
    failwithf
      !"amount in_bucket (%{Int}) cannot be greater than bucket_limit \
        (%{Int})"
      t.in_bucket
      t.bucket_limit
      ();
  (* sizes must be positive *)
  if t.bucket_limit <= 0
  then failwithf !"bucket_limit (burst_size) (%{Int}) must be > 0" t.bucket_limit ();
  if t.in_bucket < 0 then failwithf !"in_bucket (%{Int}) must be >= 0." t.in_bucket ();
  begin
    match Iofm.to_ordinary t.in_hopper with
    | Infinite -> ()
    | Finite in_hopper ->
      if in_hopper < 0 then failwithf !"in_hopper (%{Int}) must be >= 0." in_hopper ()
  end;
  if t.in_flight < 0 then failwithf !"in_flight (%{Int}) must be >= 0." t.in_flight ();
  (* the rate and the token-space clock must be both infinite or both finite *)
  begin
    match
      ( Iofm.to_ordinary t.hopper_to_bucket_rate_per_ns
      , Iofm.to_ordinary t.time_in_token_space )
    with
    | Infinite, Finite _ | Finite _, Infinite ->
      failwith
        "hopper_to_bucket_rate_per_sec can only be infinite if time_in_token_space is \
         infinite"
    | Infinite, Infinite | Finite _, Finite _ -> ()
  end
;;

type limiter = t [@@deriving sexp_of]

(* Builds a limiter whose clock starts at [now] with nothing in flight, converting the
   per-second rate to a per-nanosecond rate.  Raises (via [invariant]) if the resulting
   state is invalid. *)
let create_exn
      ~now
      ~hopper_to_bucket_rate_per_sec
      ~bucket_limit
      ~in_flight_limit
      ~initial_bucket_level
      ~initial_hopper_level
  =
  let in_hopper = Iofm.of_ordinary initial_hopper_level in
  let time_in_token_space =
    match hopper_to_bucket_rate_per_sec with
    | Infinite -> Iofm.infinite ()
    | Finite _ -> Iofm.finite 0
  in
  let hopper_to_bucket_rate_per_ns =
    match hopper_to_bucket_rate_per_sec with
    | Infinite -> Iofm.infinite ()
    | Finite rate_per_sec -> Iofm.finite (rate_per_sec /. 1E9)
  in
  let t =
    { start_time = now
    ; time = now
    ; time_in_token_space
    ; in_bucket = initial_bucket_level
    ; in_hopper
    ; in_flight = 0
    ; bucket_limit
    ; in_flight_limit = Iofm.of_ordinary in_flight_limit
    ; hopper_to_bucket_rate_per_ns
    }
  in
  invariant t;
  t
;;

(* Moves up to [max_move] tokens into the bucket, capped by the free space in the
   bucket.  A finite hopper is decremented by the amount actually moved; an infinite
   hopper is left untouched. *)
let move_from_hopper_to_bucket t max_move =
  let space_in_bucket = t.bucket_limit - t.in_bucket in
  let actual_move = Int.min max_move space_in_bucket in
  if actual_move > 0
  then begin
    t.in_bucket <- t.in_bucket + actual_move;
    if Iofm.is_finite t.in_hopper
    then Iofm.set_finite t.in_hopper (Iofm.get_finite_exn t.in_hopper - actual_move)
  end
;;

(* Computes the number of tokens that would have dropped since start_time given the
   current rate *)
let update_time_in_token_space (t : t) =
  (* if it's infinite then time_in_token_space was set to infinite in [create_exn] *)
  if Iofm.is_finite t.hopper_to_bucket_rate_per_ns
  then begin
    let tokens_per_ns = Iofm.get_finite_exn t.hopper_to_bucket_rate_per_ns in
    let time_elapsed_since_start_in_ns =
      Time_ns.Span.to_ns (Time_ns.diff t.time t.start_time)
    in
    (* this will raise when there is an int overflow, but in a way that will be annoying
       to understand/track down if it fails.  This comment is here to help while keeping
       the common case fast. *)
    let time_in_token_space =
      Float.iround_down_exn (time_elapsed_since_start_in_ns *. tokens_per_ns)
    in
    Iofm.set_finite t.time_in_token_space time_in_token_space
  end
;;

(* advances [t]s notion of time, moving tokens from the hopper down into the bucket as
   dictated by the passage of time and the hopper_to_bucket_rate_per_ns. *)
let advance_time =
  (* Just updates [t] to match the current value of [t.time].  We write it this way to
     make it clear that now is not directly used in update_tokens. *)
  let update_tokens t =
    if Iofm.is_infinite t.time_in_token_space
    then begin
      (* infinite rate: everything in the hopper may fall at once (the move itself is
         still capped by the free space in the bucket) *)
      let max_move =
        if Iofm.is_infinite t.in_hopper
        then t.bucket_limit
        else Iofm.get_finite_exn t.in_hopper
      in
      move_from_hopper_to_bucket t max_move
    end
    else begin
      let previous_time_in_token_space = Iofm.get_finite_exn t.time_in_token_space in
      update_time_in_token_space t;
      let new_time_in_token_space = Iofm.get_finite_exn t.time_in_token_space in
      let amount_that_could_fall =
        (* this will always be >= 0 because time always moves forward *)
        new_time_in_token_space - previous_time_in_token_space
      in
      let max_move =
        if Iofm.is_infinite t.in_hopper
        then amount_that_could_fall
        else Int.min (Iofm.get_finite_exn t.in_hopper) amount_that_could_fall
      in
      move_from_hopper_to_bucket t max_move
    end
  in
  fun t ~now ->
    (* time never moves backwards: an older [now] leaves [t.time] unchanged *)
    if Time_ns.( > ) now t.time then t.time <- now;
    (* this has to be run even if time doesn't move forward to handle the case of an
       Infinite hopper to bucket drop rate.  In that case tokens in the hopper may
       instantaneously move into the bucket. *)
    update_tokens t
;;

(* True when taking [n] more tokens would not exceed the in-flight limit (always true
   for an infinite limit). *)
let can_put_n_tokens_in_flight t ~n =
  Iofm.is_infinite t.in_flight_limit
  || t.in_flight + n <= Iofm.get_finite_exn t.in_flight_limit
;;

(* Advances time to [now], then tries to move [amount] tokens from the bucket into
   flight.

   NOTE(review): a negative [amount] is not rejected here (unlike [return_to_hopper] and
   [try_return_to_bucket]); it would take the [Taken] path and increase [in_bucket].
   Confirm whether callers can ever pass one. *)
let try_take t ~now amount : Try_take_result.t =
  advance_time t ~now;
  if not (can_put_n_tokens_in_flight t ~n:amount)
  then Unable
  else if amount > t.bucket_limit
  then Asked_for_more_than_bucket_limit
  else if amount > t.in_bucket
  then Unable
  else begin
    t.in_bucket <- t.in_bucket - amount;
    t.in_flight <- t.in_flight + amount;
    Taken
  end
;;

(* Moves [amount] in-flight tokens back to the hopper (a no-op on the hopper count when
   the hopper is infinite).  Raises if [amount] is negative or exceeds [in_flight]; the
   checks run before time is advanced. *)
let return_to_hopper t ~now amount =
  if amount < 0
  then failwithf !"return_to_hopper passed a negative amount (%{Int})" amount ();
  if amount > t.in_flight
  then
    failwithf
      !"return_to_hopper passed an amount (%{Int}) > in_flight (%{Int})"
      amount
      t.in_flight
      ();
  advance_time t ~now;
  t.in_flight <- t.in_flight - amount;
  if Iofm.is_finite t.in_hopper
  then Iofm.set_finite t.in_hopper (Iofm.get_finite_exn t.in_hopper + amount)
;;

(* Advances time to [now], then moves [amount] in-flight tokens straight back into the
   bucket.  Returns [Unable] if [amount] is negative, exceeds [in_flight], or does not
   fit in the free space of the bucket. *)
let try_return_to_bucket t ~now amount : Try_return_to_bucket_result.t =
  advance_time t ~now;
  let space_in_bucket = t.bucket_limit - t.in_bucket in
  if amount < 0 || amount > t.in_flight || amount > space_in_bucket
  then Unable
  else begin
    t.in_flight <- t.in_flight - amount;
    t.in_bucket <- t.in_bucket + amount;
    Returned_to_bucket
  end
;;

(* Estimates the earliest time at which [amount] tokens could be in the bucket.  Time is
   only advanced once the in-flight and bucket-limit checks have passed. *)
let tokens_may_be_available_when t ~now amount : Tokens_may_be_available_result.t =
  if not (can_put_n_tokens_in_flight t ~n:amount)
  then When_return_to_hopper_is_called
  else if amount > t.bucket_limit
  then Never_because_greater_than_bucket_limit
  else begin
    advance_time t ~now;
    let amount_missing = amount - t.in_bucket in
    if amount_missing <= 0
    then At t.time
    else if Iofm.is_infinite t.hopper_to_bucket_rate_per_ns
    then
      (* with an infinite rate the bucket has already been topped up by [advance_time],
         so the missing tokens can only come from a return *)
      When_return_to_hopper_is_called
    else begin
      let tokens_per_ns = Iofm.get_finite_exn t.hopper_to_bucket_rate_per_ns in
      (* NOTE(review): a finite rate of 0 passes [fill_rate_must_be_positive], in which
         case this is a division by zero; the behaviour of [Time_ns.Span.of_sec] on the
         result is not visible from this file. *)
      let min_seconds_left = Float.of_int amount_missing /. (tokens_per_ns *. 1E9) in
      (* computed before the hopper check below, as in the original evaluation order *)
      let (min_time : Tokens_may_be_available_result.t) =
        At (Time_ns.add t.time (Time_ns.Span.of_sec min_seconds_left))
      in
      if Iofm.is_infinite t.in_hopper
      then min_time
      else if amount_missing > Iofm.get_finite_exn t.in_hopper
      then When_return_to_hopper_is_called
      else min_time
    end
  end
;;

(* Accessors that first advance time to [now]; they shadow the field accessors generated
   by [@@deriving fields]. *)
let in_bucket t ~now =
  advance_time t ~now;
  t.in_bucket
;;

let in_hopper t ~now =
  advance_time t ~now;
  Iofm.to_ordinary t.in_hopper
;;

let in_flight t ~now =
  advance_time t ~now;
  t.in_flight
;;

(* Tokens in the hopper plus tokens in the bucket, or [Infinite] for an infinite
   hopper. *)
let in_limiter t ~now =
  match in_hopper t ~now with
  | Infinite -> Infinite
  | Finite in_hopper -> Finite (in_bucket t ~now + in_hopper)
;;

let in_system t ~now =
  advance_time t ~now;
  in_system t
;;

let bucket_limit t = t.bucket_limit

(* The fill rate converted back from per-nanosecond to per-second. *)
let hopper_to_bucket_rate_per_sec t =
  if Iofm.is_infinite t.hopper_to_bucket_rate_per_ns
  then Infinite
  else Finite (Iofm.get_finite_exn t.hopper_to_bucket_rate_per_ns *. 1E9)
;;

(* A limiter with an infinite hopper and no in-flight limit. *)
module Token_bucket = struct
  type t = limiter

  let create_exn
        ~now
        ~burst_size:bucket_limit
        ~sustained_rate_per_sec:fill_rate
        ?(initial_bucket_level = 0)
        ()
    =
    create_exn
      ~now
      ~bucket_limit
      ~in_flight_limit:Infinite
      ~hopper_to_bucket_rate_per_sec:(Finite fill_rate)
      ~initial_bucket_level
      ~initial_hopper_level:Infinite
  ;;

  let try_take = try_take
end

(* A limiter holding exactly [max_concurrent_jobs] tokens in total, split between the
   bucket and a finite hopper; each job takes one token and returns it on finish. *)
module Throttled_rate_limiter = struct
  type t = limiter

  let create_exn ~now ~burst_size ~sustained_rate_per_sec:fill_rate ~max_concurrent_jobs =
    let bucket_limit = burst_size in
    let initial_bucket_level = Int.min bucket_limit max_concurrent_jobs in
    (* whatever does not fit in the bucket starts in the hopper *)
    let initial_hopper_level =
      Finite (Int.max 0 (max_concurrent_jobs - initial_bucket_level))
    in
    create_exn
      ~now
      ~bucket_limit
      ~in_flight_limit:Infinite
      ~hopper_to_bucket_rate_per_sec:(Finite fill_rate)
      ~initial_bucket_level
      ~initial_hopper_level
  ;;

  let try_start_job t ~now =
    match try_take t ~now 1 with
    | Asked_for_more_than_bucket_limit -> assert false (* see create *)
    | Taken -> `Start
    | Unable ->
      begin
        match tokens_may_be_available_when t ~now 1 with
        | Never_because_greater_than_bucket_limit -> assert false (* see create *)
        | When_return_to_hopper_is_called -> `Max_concurrent_jobs_running
        | At time -> `Unable_until_at_least time
      end
  ;;

  let finish_job t ~now = return_to_hopper t ~now 1
end

(* A [Throttled_rate_limiter] with an infinite fill rate, so only the number of
   concurrent jobs is limited. *)
module Throttle = struct
  include Throttled_rate_limiter

  let create_exn ~now ~max_concurrent_jobs =
    (* the sustained rate passed here is a placeholder: it is immediately overridden
       below by setting the hopper-to-bucket rate to infinite *)
    let sustained_rate_unused = 1. in
    let t =
      create_exn
        ~now
        ~burst_size:max_concurrent_jobs
        ~sustained_rate_per_sec:sustained_rate_unused
        ~max_concurrent_jobs
    in
    (* both must be made infinite together to satisfy [invariant] *)
    Iofm.set_infinite t.hopper_to_bucket_rate_per_ns;
    Iofm.set_infinite t.time_in_token_space;
    (* Since we set the hopper rate to infinite then the bucket can immediately be
       filled. *)
    t.in_bucket <- t.bucket_limit;
    t
  ;;

  (* Narrows the result type: the time-based answer is treated as impossible here. *)
  let try_start_job t ~now =
    match try_start_job t ~now with
    | `Start -> `Start
    | `Max_concurrent_jobs_running -> `Max_concurrent_jobs_running
    | `Unable_until_at_least _ -> assert false
  ;;
end

(* Direct access to the general-purpose limiter operations. *)
module Expert = struct
  let create_exn = create_exn
  let try_take = try_take
  let return_to_hopper = return_to_hopper
  let try_return_to_bucket = try_return_to_bucket
  let tokens_may_be_available_when = tokens_may_be_available_when
end