1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
open Core
open Tracing_zero
module Process = struct
type t = { pid : int } [@@deriving sexp_of, compare, hash, equal]
end
module Thread = struct
type t =
{ pid : int
; tid : int
}
[@@deriving sexp_of, compare, hash, equal]
end
module Ticks = struct
type t =
{ ticks_per_second : int
; base_time : Time_ns_unix.Option.t
}
end
module State = struct
module References = struct
(** Keeps track of what dynamic state is referenced in a span of tracing events. *)
type t =
{ string_idxs : Hash_set.M(Parser.String_index).t
; thread_idxs : Hash_set.M(Parser.Thread_index).t
; threads : Hash_set.M(Thread).t
; processes : Hash_set.M(Process).t
}
let create () =
{ string_idxs = Hash_set.create (module Parser.String_index)
; thread_idxs = Hash_set.create (module Parser.Thread_index)
; threads = Hash_set.create (module Thread)
; processes = Hash_set.create (module Process)
}
;;
end
(** Dynamic tracing state updated by tracing events. *)
type t =
{ strings : string Hashtbl.M(Parser.String_index).t (** Interned strings *)
; threads : Parser.Thread.t Hashtbl.M(Parser.Thread_index).t
(** Interned thread IDs *)
; thread_names : Parser.String_index.t Hashtbl.M(Thread).t
; process_names : Parser.String_index.t Hashtbl.M(Process).t
; mutable ticks : Ticks.t option
}
let create () =
{ strings = Hashtbl.create (module Parser.String_index)
; threads = Hashtbl.create (module Parser.Thread_index)
; thread_names = Hashtbl.create (module Thread)
; process_names = Hashtbl.create (module Process)
; ticks = None
}
;;
let copy t =
{ strings = Hashtbl.copy t.strings
; threads = Hashtbl.copy t.threads
; thread_names = Hashtbl.copy t.thread_names
; process_names = Hashtbl.copy t.process_names
; ticks = t.ticks
}
;;
let filter t ~(references : References.t) =
let strings =
Hashtbl.filter_keys t.strings ~f:(fun k0 ->
Hash_set.exists references.string_idxs ~f:(fun k1 ->
Parser.String_index.equal k0 k1))
in
let threads =
Hashtbl.filter_keys t.threads ~f:(fun k0 ->
Hash_set.exists references.thread_idxs ~f:(fun k1 ->
Parser.Thread_index.equal k0 k1))
in
let thread_names =
Hashtbl.filter_keys t.thread_names ~f:(fun k0 ->
Hash_set.exists references.threads ~f:(fun k1 -> Thread.equal k0 k1))
in
let process_names =
Hashtbl.filter_keys t.process_names ~f:(fun k0 ->
Hash_set.exists references.processes ~f:(fun k1 -> Process.equal k0 k1))
in
{ strings; threads; thread_names; process_names; ticks = t.ticks }
;;
let process t record =
let open Parser.Record in
match record with
| Interned_string { index; value } -> Hashtbl.set t.strings ~key:index ~data:value
| Interned_thread { index; value } -> Hashtbl.set t.threads ~key:index ~data:value
| Process_name_change { name; pid } ->
Hashtbl.set t.process_names ~key:{ pid } ~data:name
| Thread_name_change { name; pid; tid } ->
Hashtbl.set t.thread_names ~key:{ pid; tid } ~data:name
| Tick_initialization { ticks_per_second; base_time } ->
t.ticks <- Some Ticks.{ ticks_per_second; base_time }
| Event _ -> ()
;;
let construct_trace t writer =
let open Parser.Record in
let rw = Record_writer.create writer in
Option.iter t.ticks ~f:(fun { ticks_per_second; base_time } ->
let record = Tick_initialization { ticks_per_second; base_time } in
Record_writer.write_record rw ~record);
Hashtbl.iteri t.strings ~f:(fun ~key ~data ->
let record = Interned_string { index = key; value = data } in
Record_writer.write_record rw ~record);
Hashtbl.iteri t.threads ~f:(fun ~key ~data ->
let record = Interned_thread { index = key; value = data } in
Record_writer.write_record rw ~record);
Hashtbl.iteri t.thread_names ~f:(fun ~key ~data ->
let record = Thread_name_change { name = data; pid = key.pid; tid = key.tid } in
Record_writer.write_record rw ~record);
Hashtbl.iteri t.process_names ~f:(fun ~key ~data ->
let record = Process_name_change { name = data; pid = key.pid } in
Record_writer.write_record rw ~record)
;;
end
module Checkpoint = struct
(** Data necessary to describe a span of tracing events. *)
type t =
{ begin_state : State.t
; data : (read_write, Iobuf.seek) Iobuf.t
; destination : (module Writer_intf.Destination)
; refs : State.References.t
}
let create ~size_bits ~begin_state =
let data = Iobuf.create ~len:(Int.pow 2 size_bits) in
let (module Dest : Writer_intf.Destination) =
Destinations.raw_iobuf_destination data
in
{ begin_state; destination = (module Dest); data; refs = State.References.create () }
;;
let process t record =
let open Parser.Record in
match record with
| Interned_string { index; value = _ } -> Hash_set.add t.refs.string_idxs index
| Interned_thread { index; value = _ } -> Hash_set.add t.refs.thread_idxs index
| Process_name_change { name; pid } ->
Hash_set.add t.refs.string_idxs name;
Hash_set.add t.refs.processes { pid }
| Thread_name_change { name; pid; tid } ->
Hash_set.add t.refs.string_idxs name;
Hash_set.add t.refs.threads { pid; tid }
| Event { timestamp = _; thread; category; name; arguments; event_type = _ } ->
Hash_set.add t.refs.string_idxs category;
Hash_set.add t.refs.string_idxs name;
Hash_set.add t.refs.thread_idxs thread;
List.iter arguments ~f:(fun (name, arg) ->
Hash_set.add t.refs.string_idxs name;
match arg with
| Parser.Event_arg.String idx -> Hash_set.add t.refs.string_idxs idx
| _ -> ())
| Tick_initialization _ -> ()
;;
let union_refs t0 t1 =
{ State.References.string_idxs =
Hash_set.union t0.refs.string_idxs t1.refs.string_idxs
; thread_idxs = Hash_set.union t0.refs.thread_idxs t1.refs.thread_idxs
; threads = Hash_set.union t0.refs.threads t1.refs.threads
; processes = Hash_set.union t0.refs.processes t1.refs.processes
}
;;
let write t writer =
Iobuf.flip_lo t.data;
Writer.Expert.write_iobuf writer ~buf:(Iobuf.read_only t.data);
Iobuf.flip_hi t.data
;;
end
module Parser_errors = struct
type t =
{ mutable timestamp_too_large : int
; mutable invalid_size_on_record : int
; mutable invalid_string_ref : int
; mutable invalid_thread_ref : int
; mutable invalid_tick_initialization : int
}
[@@deriving sexp]
let empty =
{ timestamp_too_large = 0
; invalid_size_on_record = 0
; invalid_string_ref = 0
; invalid_thread_ref = 0
; invalid_tick_initialization = 0
}
;;
let clear t =
t.timestamp_too_large <- 0;
t.invalid_size_on_record <- 0;
t.invalid_string_ref <- 0;
t.invalid_thread_ref <- 0;
t.invalid_tick_initialization <- 0
;;
let has_errors t =
t.invalid_size_on_record > 0
|| t.invalid_string_ref > 0
|| t.invalid_thread_ref > 0
|| t.timestamp_too_large > 0
|| t.invalid_tick_initialization > 0
;;
end
type t =
{ writer : Writer.t
; record_writer : Record_writer.t
; parser : Parser.t
; errors : Parser_errors.t
; mutable empty : bool
; mutable prev : Checkpoint.t
; mutable current : Checkpoint.t
; mutable size_bits : int
; state : State.t
}
let process_record t record =
(match record with
| Parser.Record.Event _ -> t.empty <- false
| _ -> ());
State.process t.state record;
Checkpoint.process t.current record;
try
Record_writer.write_record t.record_writer ~record;
true
with
| Failure _ -> false
;;
let create ?num_temp_strs ~size_bits () =
let prev = Checkpoint.create ~size_bits ~begin_state:(State.create ()) in
let current = Checkpoint.create ~size_bits ~begin_state:(State.create ()) in
let writer =
Writer.Expert.create_no_header ?num_temp_strs ~destination:current.destination ()
in
let record_writer = Record_writer.create writer in
{ writer
; record_writer
; empty = true
; parser = Parser.create ()
; errors = Parser_errors.empty
; prev
; current
; size_bits
; state = State.create ()
}
;;
let flip t =
Writer.Expert.flush t.writer;
t.prev <- t.current;
t.current <- Checkpoint.create ~size_bits:t.size_bits ~begin_state:(State.copy t.state);
Writer.Expert.set_destination t.writer ~destination:t.current.destination
;;
let resize t ~size_bits =
Writer.Expert.flush t.writer;
t.size_bits <- size_bits;
t.empty <- true;
t.prev <- Checkpoint.create ~size_bits ~begin_state:(State.copy t.state);
t.current <- Checkpoint.create ~size_bits ~begin_state:(State.copy t.state);
Writer.Expert.set_destination t.writer ~destination:t.current.destination
;;
module Process_result = struct
type t =
| Complete
| Incomplete
| Out_of_space of int
end
let process_until t data =
let rec parse () =
let at = Iobuf.Expert.lo data in
match Parser.parse_next t.parser with
| Ok record ->
if process_record t record then parse () else Process_result.Out_of_space at
| Error Incomplete_record -> Incomplete
| Error No_more_words -> Complete
| Error Timestamp_too_large ->
t.errors.timestamp_too_large <- t.errors.timestamp_too_large + 1;
parse ()
| Error Invalid_size_on_record ->
t.errors.invalid_size_on_record <- t.errors.invalid_size_on_record + 1;
parse ()
| Error Invalid_string_ref ->
t.errors.invalid_string_ref <- t.errors.invalid_string_ref + 1;
parse ()
| Error Invalid_thread_ref ->
t.errors.invalid_thread_ref <- t.errors.invalid_thread_ref + 1;
parse ()
| Error Invalid_tick_initialization ->
t.errors.invalid_tick_initialization <- t.errors.invalid_tick_initialization + 1;
parse ()
in
parse ()
;;
let rec consume' t data =
match process_until t data with
| Complete | Incomplete -> ()
| Out_of_space at ->
Iobuf.Expert.set_lo data at;
flip t;
consume' t data
;;
let consume t data =
Parser.set_buffer t.parser data;
consume' t data;
Parser_errors.clear t.errors
;;
let try_consume t data =
Parser.set_buffer t.parser data;
consume' t data;
let ret =
if Parser_errors.has_errors t.errors
then Or_error.error_s [%message (t.errors : Parser_errors.t)]
else Ok ()
in
Parser_errors.clear t.errors;
ret
;;
let write_empty_trace_event writer =
let name = Writer.set_temp_string_slot writer ~slot:0 "Empty trace!" in
let thread = Writer.set_thread_slot writer ~slot:0 ~pid:1 ~tid:2 in
let =
Writer.Expert.precompute_header
~event_type:Writer.Expert.Event_type.instant
~extra_words:0
~arg_types:Writer.Arg_types.none
~thread
~category:name
~name
in
Writer.Expert.write_from_header_with_tsc writer ~header
;;
let output t final_dest =
Writer.Expert.flush t.writer;
let references = Checkpoint.union_refs t.prev t.current in
let state = State.filter t.prev.begin_state ~references in
State.construct_trace state final_dest;
Checkpoint.write t.prev final_dest;
Checkpoint.write t.current final_dest;
if t.empty then write_empty_trace_event final_dest
;;