1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
open! Core_kernel
open! Import
module Infinite_or_finite = struct
module T = struct
type 'a t =
| Infinite
| Finite of 'a
[@@deriving sexp, bin_io]
end
include T
let compare compare t1 t2 =
match t1, t2 with
| Infinite, Infinite -> 0
| Infinite, Finite _ -> 1
| Finite _, Infinite -> -1
| Finite a, Finite b -> compare a b
;;
end
(** Mutable version of Infinite_or_finite, for internal use, to avoid allocation *)
module Iofm : sig
type 'a t [@@deriving sexp_of]
val infinite : unit -> 'a t
val finite : 'a -> 'a t
val is_infinite : 'a t -> bool
val is_finite : 'a t -> bool
val set_infinite : 'a t -> unit
val set_finite : 'a t -> 'a -> unit
val get_finite_exn : 'a t -> 'a
val to_ordinary : 'a t -> 'a Infinite_or_finite.t
val of_ordinary : 'a Infinite_or_finite.t -> 'a t
end = struct
type 'a t = 'a Moption.t [@@deriving sexp_of]
let infinite () = Moption.create ()
let finite v =
let t = Moption.create () in
Moption.set_some t v;
t
;;
let is_infinite = Moption.is_none
let is_finite = Moption.is_some
let set_infinite = Moption.set_none
let set_finite = Moption.set_some
let get_finite_exn = Moption.get_some_exn
let[@inline always] to_ordinary t : _ Infinite_or_finite.t =
if Moption.is_none t then Infinite else Finite (Moption.get_some_exn t)
;;
let[@inline always] of_ordinary (ext : _ Infinite_or_finite.t) =
match ext with
| Infinite -> infinite ()
| Finite v -> finite v
;;
end
open Infinite_or_finite.T
module Try_take_result = struct
type t =
| Taken
| Unable
| Asked_for_more_than_bucket_limit
end
module Try_return_to_bucket_result = struct
type t =
| Returned_to_bucket
| Unable
end
module Tokens_may_be_available_result = struct
type t =
| At of Time_ns.t
| Never_because_greater_than_bucket_limit
| When_return_to_hopper_is_called
end
module Try_increase_bucket_limit_result = struct
type t =
| Increased
| Unable
[@@deriving sexp_of]
end
module Time_ns = struct
include Time_ns
let sexp_of_t = Time_ns.Alternate_sexp.sexp_of_t
end
type t =
{ start_time : Time_ns.t
(** The current time of the rate limiter. Note that when this is moved forward,
[in_hopper] must be updated accordingly. *)
; mutable time : Time_ns.t
(** the amount of time that has passed expressed in token terms, since start_time. *)
; time_in_token_space : int Iofm.t (** number of tokens in the bucket *)
; mutable in_bucket : int (** number of tokens in the hopper. May be [inf] *)
; in_hopper : int Iofm.t
(** Everything that has been taken from bucket but not returned to hopper *)
; mutable in_flight : int (** maximum size allowable in the bucket *)
; mutable bucket_limit : int (** maximum size allowable in flight *)
; in_flight_limit : int Iofm.t
(** rate at which tokens "fall" from the hopper into the bucket *)
; hopper_to_bucket_rate_per_ns : Float.t Iofm.t
}
[@@deriving sexp_of, fields]
let fill_rate_must_be_positive fill_rate =
if Iofm.is_finite fill_rate
then (
let rate = Iofm.get_finite_exn fill_rate in
if Float.( < ) rate Float.zero
then raise_s [%message "hopper_to_bucket_rate_per_ns must be >= 0" (rate : Float.t)])
;;
let in_system t =
if Iofm.is_infinite t.in_hopper
then Infinite
else Finite (t.in_flight + Iofm.get_finite_exn t.in_hopper + t.in_bucket)
;;
let invariant t =
fill_rate_must_be_positive t.hopper_to_bucket_rate_per_ns;
if t.in_bucket > t.bucket_limit
then
failwithf
!"amount in_bucket (%{Int}) cannot be greater than bucket_limit (%{Int})"
t.in_bucket
t.bucket_limit
();
if t.bucket_limit <= 0
then failwithf !"bucket_limit (burst_size) (%{Int}) must be > 0" t.bucket_limit ();
if t.in_bucket < 0 then failwithf !"in_bucket (%{Int}) must be >= 0." t.in_bucket ();
(match Iofm.to_ordinary t.in_hopper with
| Infinite -> ()
| Finite in_hopper ->
if in_hopper < 0 then failwithf !"in_hopper (%{Int}) must be >= 0." in_hopper ());
if t.in_flight < 0 then failwithf !"in_flight (%{Int}) must be >= 0." t.in_flight ();
match
( Iofm.to_ordinary t.hopper_to_bucket_rate_per_ns
, Iofm.to_ordinary t.time_in_token_space )
with
| Infinite, Finite _ | Finite _, Infinite ->
failwith
"hopper_to_bucket_rate_per_sec can only be infinite if time_in_token_space is \
infinite"
| Infinite, Infinite | Finite _, Finite _ -> ()
;;
type limiter = t [@@deriving sexp_of]
let create_exn
~now
~hopper_to_bucket_rate_per_sec
~bucket_limit
~in_flight_limit
~initial_bucket_level
~initial_hopper_level
=
let in_hopper = Iofm.of_ordinary initial_hopper_level in
let time_in_token_space =
match hopper_to_bucket_rate_per_sec with
| Infinite -> Iofm.infinite ()
| Finite _ -> Iofm.finite 0
in
let hopper_to_bucket_rate_per_ns =
match hopper_to_bucket_rate_per_sec with
| Infinite -> Iofm.infinite ()
| Finite rate_per_sec -> Iofm.finite (rate_per_sec /. 1E9)
in
let t =
{ start_time = now
; time = now
; time_in_token_space
; in_bucket = initial_bucket_level
; in_hopper
; in_flight = 0
; bucket_limit
; in_flight_limit = Iofm.of_ordinary in_flight_limit
; hopper_to_bucket_rate_per_ns
}
in
invariant t;
t
;;
let move_from_hopper_to_bucket t max_move =
let space_in_bucket = t.bucket_limit - t.in_bucket in
let actual_move = Int.min max_move space_in_bucket in
if actual_move > 0
then (
t.in_bucket <- t.in_bucket + actual_move;
if Iofm.is_finite t.in_hopper
then Iofm.set_finite t.in_hopper (Iofm.get_finite_exn t.in_hopper - actual_move))
;;
let update_time_in_token_space (t : t) =
if Iofm.is_finite t.hopper_to_bucket_rate_per_ns
then (
let tokens_per_ns = Iofm.get_finite_exn t.hopper_to_bucket_rate_per_ns in
let time_elapsed_since_start_in_ns =
Time_ns.Span.to_ns (Time_ns.diff t.time t.start_time)
in
let time_in_token_space =
Float.iround_down_exn (time_elapsed_since_start_in_ns *. tokens_per_ns)
in
Iofm.set_finite t.time_in_token_space time_in_token_space)
;;
let advance_time =
let update_tokens t =
if Iofm.is_infinite t.time_in_token_space
then (
let max_move =
if Iofm.is_infinite t.in_hopper
then t.bucket_limit
else Iofm.get_finite_exn t.in_hopper
in
move_from_hopper_to_bucket t max_move)
else (
let previous_time_in_token_space = Iofm.get_finite_exn t.time_in_token_space in
update_time_in_token_space t;
let new_time_in_token_space = Iofm.get_finite_exn t.time_in_token_space in
let amount_that_could_fall =
new_time_in_token_space - previous_time_in_token_space
in
let max_move =
if Iofm.is_infinite t.in_hopper
then amount_that_could_fall
else Int.min (Iofm.get_finite_exn t.in_hopper) amount_that_could_fall
in
move_from_hopper_to_bucket t max_move)
in
fun t ~now ->
if Time_ns.( > ) now t.time then t.time <- now;
update_tokens t
;;
let can_put_n_tokens_in_flight t ~n =
if Iofm.is_infinite t.in_flight_limit
then true
else t.in_flight + n <= Iofm.get_finite_exn t.in_flight_limit
;;
let try_take t ~now amount : Try_take_result.t =
advance_time t ~now;
if not (can_put_n_tokens_in_flight t ~n:amount)
then Unable
else if amount > t.bucket_limit
then Asked_for_more_than_bucket_limit
else if amount > t.in_bucket
then Unable
else (
t.in_bucket <- t.in_bucket - amount;
t.in_flight <- t.in_flight + amount;
Taken)
;;
let return_to_hopper t ~now amount =
if amount < 0
then failwithf !"return_to_hopper passed a negative amount (%{Int})" amount ();
if amount > t.in_flight
then
failwithf
!"return_to_hopper passed an amount (%{Int}) > in_flight (%{Int})"
amount
t.in_flight
();
advance_time t ~now;
t.in_flight <- t.in_flight - amount;
if Iofm.is_finite t.in_hopper
then Iofm.set_finite t.in_hopper (Iofm.get_finite_exn t.in_hopper + amount)
;;
let try_return_to_bucket t ~now amount : Try_return_to_bucket_result.t =
advance_time t ~now;
let space_in_bucket = t.bucket_limit - t.in_bucket in
if amount < 0 || amount > t.in_flight || amount > space_in_bucket
then Unable
else (
t.in_flight <- t.in_flight - amount;
t.in_bucket <- t.in_bucket + amount;
Returned_to_bucket)
;;
let tokens_may_be_available_when t ~now amount : Tokens_may_be_available_result.t =
if not (can_put_n_tokens_in_flight t ~n:amount)
then When_return_to_hopper_is_called
else if amount > t.bucket_limit
then Never_because_greater_than_bucket_limit
else (
advance_time t ~now;
let amount_missing = amount - t.in_bucket in
if amount_missing <= 0
then At t.time
else if Iofm.is_infinite t.hopper_to_bucket_rate_per_ns
then When_return_to_hopper_is_called
else (
let tokens_per_ns = Iofm.get_finite_exn t.hopper_to_bucket_rate_per_ns in
let min_seconds_left = Float.of_int amount_missing /. (tokens_per_ns *. 1E9) in
let (min_time : Tokens_may_be_available_result.t) =
At (Time_ns.add t.time (Time_ns.Span.of_sec min_seconds_left))
in
if Iofm.is_infinite t.in_hopper
then min_time
else if amount_missing > Iofm.get_finite_exn t.in_hopper
then When_return_to_hopper_is_called
else min_time))
;;
let in_bucket t ~now =
advance_time t ~now;
t.in_bucket
;;
let in_hopper t ~now =
advance_time t ~now;
Iofm.to_ordinary t.in_hopper
;;
let in_flight t ~now =
advance_time t ~now;
t.in_flight
;;
let in_limiter t ~now =
match in_hopper t ~now with
| Infinite -> Infinite
| Finite in_hopper -> Finite (in_bucket t ~now + in_hopper)
;;
let in_system t ~now =
advance_time t ~now;
in_system t
;;
let bucket_limit t = t.bucket_limit
let hopper_to_bucket_rate_per_sec t =
if Iofm.is_infinite t.hopper_to_bucket_rate_per_ns
then Infinite
else Finite (Iofm.get_finite_exn t.hopper_to_bucket_rate_per_ns *. 1E9)
;;
module Token_bucket = struct
type t = limiter [@@deriving sexp_of]
let create_exn
~now
~burst_size:bucket_limit
~sustained_rate_per_sec:fill_rate
?(initial_bucket_level = 0)
()
=
create_exn
~now
~bucket_limit
~in_flight_limit:Infinite
~hopper_to_bucket_rate_per_sec:(Finite fill_rate)
~initial_bucket_level
~initial_hopper_level:Infinite
;;
let try_take = try_take
module Starts_full = struct
type nonrec t = t [@@deriving sexp_of]
let create_exn ~now ~burst_size =
create_exn ~now ~burst_size ~initial_bucket_level:burst_size ()
;;
let try_increase_bucket_limit t ~new_limit : Try_increase_bucket_limit_result.t =
if new_limit < t.bucket_limit
then Unable
else (
let increase_amount = new_limit - t.bucket_limit in
t.in_bucket <- t.in_bucket + increase_amount;
t.bucket_limit <- new_limit;
Increased)
;;
end
end
module Throttled_rate_limiter = struct
type t = limiter [@@deriving sexp_of]
let create_exn ~now ~burst_size ~sustained_rate_per_sec:fill_rate ~max_concurrent_jobs =
let bucket_limit = burst_size in
let initial_bucket_level = Int.min bucket_limit max_concurrent_jobs in
let initial_hopper_level =
Finite (Int.max 0 (max_concurrent_jobs - initial_bucket_level))
in
create_exn
~now
~bucket_limit
~in_flight_limit:Infinite
~hopper_to_bucket_rate_per_sec:(Finite fill_rate)
~initial_bucket_level
~initial_hopper_level
;;
let try_start_job t ~now =
match try_take t ~now 1 with
| Asked_for_more_than_bucket_limit -> assert false
| Taken -> `Start
| Unable ->
(match tokens_may_be_available_when t ~now 1 with
| Never_because_greater_than_bucket_limit -> assert false
| When_return_to_hopper_is_called -> `Max_concurrent_jobs_running
| At time -> `Unable_until_at_least time)
;;
let finish_job t ~now = return_to_hopper t ~now 1
end
module Throttle = struct
include Throttled_rate_limiter
let create_exn ~now ~max_concurrent_jobs =
let sustained_rate_unused = 1. in
let t =
create_exn
~now
~burst_size:max_concurrent_jobs
~sustained_rate_per_sec:sustained_rate_unused
~max_concurrent_jobs
in
Iofm.set_infinite t.hopper_to_bucket_rate_per_ns;
Iofm.set_infinite t.time_in_token_space;
t.in_bucket <- t.bucket_limit;
t
;;
let try_start_job t ~now =
match try_start_job t ~now with
| `Start -> `Start
| `Max_concurrent_jobs_running -> `Max_concurrent_jobs_running
| `Unable_until_at_least _ -> assert false
;;
end
module Expert = struct
let create_exn = create_exn
let try_take = try_take
let return_to_hopper = return_to_hopper
let try_return_to_bucket = try_return_to_bucket
let tokens_may_be_available_when = tokens_may_be_available_when
end