Source file buffered_output.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
open! Core
open! Async_kernel
open! Import
module Update = struct
type t =
| Msg of Message_event.Unstable.t
| Flush of unit Ivar.t
| Rotate of unit Ivar.t
[@@deriving sexp_of]
end
module State = struct
type t =
{ write : Message_event.t Queue.t -> unit Deferred.t
; rotate : unit -> unit Deferred.t
; flush : unit -> unit Deferred.t
; msgs : Message_event.t Queue.t
}
[@@deriving fields ~getters]
let batch_size = 100
let create ~flush ~write ~rotate = { flush; write; rotate; msgs = Queue.create () }
let write_msgs_to_output t ~and_then =
if Queue.length t.msgs = 0
then and_then ()
else (
let%bind () = t.write t.msgs in
Queue.clear t.msgs;
and_then ())
;;
let process_exn t (updates : Update.t Queue.t) =
let rec loop yield_every =
let yield_every = yield_every - 1 in
if yield_every = 0
then (
let%bind () = Async_kernel_scheduler.yield () in
loop batch_size)
else (
match Queue.dequeue updates with
| None -> write_msgs_to_output t ~and_then:return
| Some update ->
(match update with
| Rotate i ->
write_msgs_to_output t ~and_then:(fun () ->
let%bind () = t.rotate () in
Ivar.fill_exn i ();
loop yield_every)
| Flush i ->
write_msgs_to_output t ~and_then:(fun () ->
let%bind () = t.flush () in
Ivar.fill_exn i ();
loop yield_every)
| Msg msg ->
Queue.enqueue t.msgs msg;
loop yield_every))
in
loop batch_size
;;
end
type t =
{ updates : Update.t Pipe.Writer.t
; background_error : exn Deferred.t
; mutable last_update : [ `Flush of unit Deferred.t | `Not_a_flush ]
}
[@@deriving fields ~getters]
let create ~flush ~rotate ~write =
let r, w = Pipe.create () in
let process_log = State.create ~write ~rotate ~flush in
let background_error =
match%map
Monitor.try_with ~rest:`Log (fun () ->
Pipe.iter' r ~f:(State.process_exn process_log))
with
| Ok () -> raise_s [%message "Bug: Log processor pipe closed unexpectedly"]
| Error exn -> exn
in
{ updates = w; background_error; last_update = `Not_a_flush }
;;
let push_update t (update : Update.t) =
t.last_update
<- (match update with
| Flush i -> `Flush (Ivar.read i)
| Msg _ | Rotate _ -> `Not_a_flush);
Pipe.write_without_pushback t.updates update
;;
let flushed t =
match t.last_update with
| `Flush f -> f
| `Not_a_flush -> Deferred.create (fun i -> push_update t (Flush i))
;;
let rotate t = Deferred.create (fun i -> push_update t (Rotate i))
let write t msg = push_update t (Msg msg)