Source file mutable_outputs.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
open! Core
open Async_kernel
open! Import
module State = struct
type t =
{ outputs : Output.t list
; previous_outputs_flushed : unit Deferred.t
; stop_watching_for_background_errors : unit Ivar.t
}
[@@deriving fields ~getters ~iterators:create]
let watch_for_background_errors outputs ~stop ~on_background_output_error:on_error =
List.iter outputs ~f:(fun output ->
match Output.Private.buffered_background_error output with
| `Error e -> don't_wait_for (choose [ choice stop Fn.id; choice e on_error ])
| `Output_is_unbuffered -> ())
;;
let flushed t =
let open Eager_deferred.Use in
let%bind () = t.previous_outputs_flushed in
Deferred.List.iter t.outputs ~how:`Sequential ~f:Output.flush
;;
let create outputs ~previous ~on_background_output_error =
let stop_watching_for_background_errors = Ivar.create () in
let previous_outputs_flushed =
match previous with
| None -> Deferred.unit
| Some previous ->
let%map.Eager_deferred () = flushed previous in
Ivar.fill_if_empty previous.stop_watching_for_background_errors ()
in
watch_for_background_errors
outputs
~stop:(Ivar.read stop_watching_for_background_errors)
~on_background_output_error;
{ outputs; previous_outputs_flushed; stop_watching_for_background_errors }
;;
let is_empty t = List.is_empty t.outputs
let write t msg = List.iter t.outputs ~f:(fun output -> Output.write output msg)
end
type t =
{ mutable state : State.t
; mutable last_update : [ `Flush of unit Deferred.t | `Not_a_flush ]
; on_background_output_error : exn -> unit
}
let create outputs ~on_background_output_error =
{ state = State.create outputs ~previous:None ~on_background_output_error
; last_update = `Not_a_flush
; on_background_output_error
}
;;
let is_empty t = State.is_empty t.state
let write t =
t.last_update <- `Not_a_flush;
State.write t.state
;;
let flushed t =
match t.last_update with
| `Flush flush -> flush
| `Not_a_flush ->
let flush = State.flushed t.state in
t.last_update <- `Flush flush;
flush
;;
let update_outputs t outputs =
t.last_update <- `Not_a_flush;
t.state
<- State.create
outputs
~previous:(Some t.state)
~on_background_output_error:t.on_background_output_error
;;
let current_outputs t = t.state.outputs