1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
open! Core
open! Async_kernel
open! Import
module Definitely_a_heap_block : sig
type t
val the_one_and_only : t
end = struct
type t = string
let the_one_and_only = String.make 1 ' '
end
type t =
{ write : Message_event.t -> unit
; rotate : unit -> unit Deferred.t
; flush : unit -> unit Deferred.t
; buffered_background_error : [ `Output_is_unbuffered | `Error of exn Deferred.t ]
; heap_block : Definitely_a_heap_block.t
}
let aux_create ~finalize ~rotate ~flush ~buffered_background_error ~write =
let heap_block = Definitely_a_heap_block.the_one_and_only in
let t = { write; rotate; flush; buffered_background_error; heap_block } in
Option.iter finalize ~f:(fun finalize ->
Gc.add_finalizer (Heap_block.create_exn t) (fun t ->
let t = Heap_block.value t in
don't_wait_for
(let%bind () = t.flush () in
finalize ())));
t
;;
let create_expert ?(rotate = return) ?finalize ~flush write =
let output = Buffered_output.create ~rotate ~flush ~write in
aux_create
~finalize
~rotate:(fun () -> Buffered_output.rotate output)
~flush:(fun () -> Buffered_output.flushed output)
~buffered_background_error:(`Error (Buffered_output.background_error output))
~write:(Buffered_output.write output)
;;
let create_unbuffered ?finalize ~flush write =
aux_create
~finalize
~rotate:return
~flush
~buffered_background_error:`Output_is_unbuffered
~write
;;
let create ?rotate ?finalize ~flush write =
create_expert ?rotate ?finalize ~flush (fun messages ->
Queue.map messages ~f:Message_event.to_serialized_message_lossy |> write)
;;
let create_expert = create_expert ?rotate:None
let empty =
create_expert (fun (_ : Message_event.t Queue.t) -> Deferred.unit) ~flush:return
;;
let write t = t.write
let rotate t = t.rotate ()
let flush t = t.flush ()
let sexp_of_t _ = Sexp.Atom "<opaque>"
let filter_to_level t ~level =
let write message =
if Level.as_or_more_verbose_than
~log_level:level
~msg_level:(Message_event.level message)
then t.write message
in
{ t with write }
;;
let stderr_sync =
lazy
(let zone =
if am_running_test then Timezone.find_exn "nyc" else force Timezone.local
in
create_unbuffered ~flush:return (fun msg ->
Message.to_write_only_text (Message_event.to_serialized_message_lossy msg) zone
|> Core.prerr_endline))
;;
let stderr_async = Set_once.create ()
let stderr =
lazy
(match Set_once.get stderr_async with
| None -> force stderr_sync
| Some stderr -> force stderr)
;;
module Private = struct
let buffered_background_error t = t.buffered_background_error
let set_async_stderr_output t ~here = Set_once.set_exn stderr_async here t
end
module For_testing = struct
let create ~map_output =
create_unbuffered ~flush:return (fun msg ->
map_output (Message_event.message msg) |> print_endline)
;;
let is_async_stderr_output_set () = Set_once.is_some stderr_async
end