1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
open Catapult_utils
module P = Catapult
module Tracing = P.Tracing
module Atomic = P.Atomic_shim_
type event = P.Ser.Event.t
module type ARG = sig
val writer : Writer.t
end
module Make(A : ARG) : P.BACKEND = struct
let writer = A.writer
type local_buf = {
t_id: int;
buf: Buffer.t;
mutable evs: string list;
mutable n_evs: int;
}
let batch_size = (try int_of_string @@ Sys.getenv "TRACE_BATCH_SIZE" with _ -> 100)
let max_batch_interval_us = 3. *. 1e6
let last_batch_flush = Atomic.make (P.Clock.now_us())
let flush_batch (self:local_buf) : unit =
if self.n_evs > 0 then (
let b = List.rev self.evs in
self.evs <- [];
self.n_evs <- 0;
Writer.write_string_l writer b;
)
let check_batch (self:local_buf) ~now : unit =
if self.n_evs > batch_size ||
(self.n_evs > 0 && now -. Atomic.get last_batch_flush > max_batch_interval_us)
then (
Atomic.set last_batch_flush now;
flush_batch self;
)
let buf : local_buf Thread_local.t =
Thread_local.create
~init:(fun ~t_id ->
{t_id;buf=Buffer.create 1024; n_evs=0; evs=[]})
~close:flush_batch
()
let teardown () =
Thread_local.clear buf;
Writer.close writer
let tick () =
let now = P.Clock.now_us() in
Thread_local.iter buf ~f:(check_batch ~now)
module Out = Catapult_utils.Json_out
let[@inline] field_col oc = Out.char oc ':'
let[@inline] field_sep oc = Out.char oc ','
let any_val oc (j:string) = Out.raw_string oc j
let field oc k f v : unit =
Out.raw_string oc k;
field_col oc;
f oc v
let[@inline] opt_iter o f = match o with
| None -> ()
| Some x -> f x
let emit
~id ~name ~ph ~tid ~pid ~cat ~ts_us ~args ~stack ~dur ? () : unit =
let lbuf = Thread_local.get_or_create buf in
let j =
let buf = lbuf.buf in
Buffer.clear buf;
Out.char buf '{';
field buf {|"name"|} Out.str_val name;
field_sep buf;
field buf {|"ph"|} Out.char_val (P.Event_type.to_char ph);
field_sep buf;
field buf {|"tid"|} any_val (string_of_int tid);
field_sep buf;
field buf {|"ts"|} Out.float ts_us;
field_sep buf;
opt_iter dur (fun dur ->
field buf {|"dur"|} Out.float dur;
field_sep buf;
);
opt_iter id (fun i ->
field buf {|"id"|} Out.str_val i;
field_sep buf;
);
opt_iter stack (fun s ->
Out.raw_string buf {|"stack"|};
field_col buf;
Out.char buf '[';
List.iteri (fun i x -> if i>0 then field_sep buf; any_val buf x) s;
Out.char buf ']';
field_sep buf;
);
opt_iter cat (fun cs ->
Out.raw_string buf {|"cat"|};
field_col buf;
Out.char buf '"';
List.iteri (fun i x -> if i>0 then field_sep buf; Out.raw_string buf x) cs;
Out.char buf '"';
field_sep buf;
);
opt_iter args (fun args ->
Out.raw_string buf {|"args"|};
field_col buf;
Out.char buf '{';
List.iteri (fun i (k,v) ->
if i>0 then field_sep buf;
Out.str_val buf k; field_col buf; Out.arg buf (v:P.Arg.t))
args;
Out.char buf '}';
field_sep buf;
);
opt_iter extra (fun l ->
List.iter (fun (x,y) ->
Out.str_val buf x; field_col buf; Out.str_val buf y;
field_sep buf)
l);
field buf {|"pid"|} Out.int pid;
Out.char buf '}';
Buffer.contents buf
in
lbuf.evs <- j :: lbuf.evs;
lbuf.n_evs <- 1 + lbuf.n_evs;
check_batch lbuf ~now:ts_us;
Gc_stats.maybe_emit ~now:ts_us ~pid ();
()
end