1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
(** A mutex-protected buffer that accumulates elements of type ['a] so they
    can be handed out together as a batch. *)
type 'a t = {
mutable size: int;
(** Number of elements currently stored in [q]; reset to [0] when a batch is
    popped *)
mutable q: 'a list;
(** The queue is a FIFO represented as a list in reverse order (the most
    recently pushed element is at the head) *)
batch: int; (** Minimum size to batch before popping *)
high_watermark: int; (** Size above which we start dropping signals *)
timeout: Mtime.span option;
(** If set, a non-empty queue becomes ready to pop once at least this much
    time separates [start] from the [now] passed to the pop operation *)
mutable start: Mtime.t;
(** Reference timestamp for [timeout]: set at creation, and refreshed by
    [push] when elements are added to an empty queue and a timeout is set *)
mutex: Mutex.t;
(** Held while [push] and [pop_if_ready] read or modify [q], [size] and
    [start] *)
}
(** [protect_mutex m f] locks [m], runs [f ()] and returns its result. The
    mutex is unlocked again whether [f] returns normally or raises. *)
let[@inline never] protect_mutex m f =
  let release () = Mutex.unlock m in
  Mutex.lock m;
  Fun.protect ~finally:release f
(** [default_high_watermark batch_size] is the high watermark used when the
    caller does not supply one: [100] for a batch size of exactly [1], and
    ten times the batch size otherwise. *)
let default_high_watermark batch_size =
  match batch_size with
  | 1 -> 100
  | n -> n * 10
(** [make ?batch ?high_watermark ?now ?timeout ()] creates an empty batch.

    @param batch minimum number of elements before the batch counts as full
      (default [1]); checked to be strictly positive with [assert].
    @param high_watermark size at which [push] starts dropping; defaults to
      [default_high_watermark batch].
    @param now initial value of the [start] timestamp; defaults to
      [Mtime_clock.now ()].
    @param timeout optional time span stored as-is in the [timeout] field.
    @raise Assert_failure if [batch <= 0] (when assertions are enabled). *)
let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t =
  let high_watermark =
    Option.value high_watermark ~default:(default_high_watermark batch)
  in
  let start =
    (* only query the clock when the caller did not supply a timestamp *)
    match now with
    | None -> Mtime_clock.now ()
    | Some t -> t
  in
  let mutex = Mutex.create () in
  assert (batch > 0);
  { size = 0; q = []; batch; high_watermark; timeout; start; mutex }
(** [timeout_expired_ ~now self] is [true] iff [self] has a timeout and the
    span between [now] and [self.start] is at least that timeout. Without a
    timeout it is always [false]. *)
let timeout_expired_ ~now self : bool =
  match self.timeout with
  | None -> false
  | Some limit -> Mtime.Span.compare (Mtime.span now self.start) limit >= 0
(** [is_full_ self] is [true] iff the number of queued elements has reached
    the configured batch size. *)
let is_full_ { size; batch; _ } : bool = size >= batch
(** [ready_to_pop ~force ~now self] decides whether a batch may be taken: the
    queue must be non-empty, and either [force] is set, the batch is full, or
    the timeout has expired relative to [now]. *)
let ready_to_pop ~force ~now self =
  if self.size <= 0 then
    (* nothing queued: never ready, not even when forced *)
    false
  else
    force || is_full_ self || timeout_expired_ ~now self
(** [pop_if_ready ?force ~now self] removes and returns every queued element,
    oldest first, when [ready_to_pop] holds; otherwise it returns [None] and
    leaves the queue untouched.

    @param force pop even if the batch is neither full nor timed out
      (default [false]); an empty queue still yields [None].
    @param now timestamp used for the timeout check. *)
let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
  (* Runs under the mutex: detach the whole (reversed) queue in O(1). *)
  let take_all () =
    if not (ready_to_pop ~force ~now self) then
      None
    else begin
      assert (self.q <> []);
      let pending = self.q in
      self.q <- [];
      self.size <- 0;
      Some pending
    end
  in
  (* The list is stored newest-first; restore insertion order after the
     mutex has been released. *)
  protect_mutex self.mutex take_all |> Option.map List.rev
(** [push_unprotected self ~elems] prepends each element of [elems], in
    order, to the reversed queue and bumps [size] accordingly. It takes no
    lock itself; [push] calls it while holding [self.mutex]. *)
let push_unprotected (self : _ t) ~(elems : _ list) : unit =
  let push_one x =
    self.q <- x :: self.q;
    self.size <- self.size + 1
  in
  List.iter push_one elems
(** [push self elems] appends [elems] to the queue under the mutex.

    Returns [`Dropped], leaving the queue unchanged, when the current size
    has already reached the high watermark; otherwise enqueues every element
    and returns [`Ok]. When a timeout is configured and the queue was empty,
    the [start] timestamp is reset to the current time first. *)
let push (self : _ t) elems : [ `Dropped | `Ok ] =
  let enqueue () =
    if self.size < self.high_watermark then begin
      (* a new batch begins with the first element pushed into an empty
         queue; only relevant when a timeout is being tracked *)
      let batch_starts_now = self.size = 0 && Option.is_some self.timeout in
      if batch_starts_now then self.start <- Mtime_clock.now ();
      push_unprotected self ~elems;
      `Ok
    end
    else
      `Dropped
  in
  protect_mutex self.mutex enqueue