1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
open Core
open Core_profiler_disabled
module Unix = Core_unix
module Time_ns = Time_ns_unix
(* When [true], the output filename chosen by [current_output_filename] is
   printed as it is computed. *)
let debug = false
(* Output filename used when the [OUTPUT_FILE] lookup in
   [current_output_filename] yields [None]. *)
let default_output_filename = "profiler.dat"
(* Mutable cell holding the name of the file the profiler output goes to.
   It is initialised from [Check_environment.get_var "OUTPUT_FILE"], falling
   back to [default_output_filename] when that lookup returns [None]. *)
let current_output_filename =
  let filename =
    Check_environment.get_var "OUTPUT_FILE"
    |> Option.value ~default:default_output_filename
  in
  if debug then printf "file = %s\n" filename;
  ref filename
(* Overwrite the filename stored in [current_output_filename]. *)
let set_current_output_filename filename = current_output_filename := filename
] unpacked (id, time)
let%test_unit "0 0" = test 0 1405085600000000000L
let%test_unit "max max" = test 511 1423099998509481983L
let%test_unit "1 1" = test 1 1405085600000000001L
let%test_unit "256 100_000" = test 256 1405085600000100000L
end)
(* Benchmarks [pack_exn] and [pack_unsafe] on one fixed (epoch, id, time)
   triple; the timestamp lies 123_123_000ns after the epoch. *)
let%bench_module "Short message header packing" = (module struct
  (* Turn a nanoseconds-since-epoch [Int64] literal into a [Time_ns.t]. *)
  let time_of_ns ns = ns |> Int64.to_int_exn |> Time_ns.of_int_ns_since_epoch

  let epoch = Profiler_epoch.of_time (time_of_ns 1405085600000000000L)
  let id = Probe_id.of_int_exn 123
  let time = time_of_ns 1405085600123123000L

  let%bench "pack_exn" = ignore (pack_exn epoch id time : int)
  let%bench "pack_unsafe" = ignore (pack_unsafe epoch id time : int)
end)
end
(** Global storage for profiler output.

    Two kinds of storage are kept here: a lazily-allocated header chunk, and a
    sequence of data chunks.  [current_chunk] is the data chunk being filled;
    chunks that have been retired are accumulated internally until
    [get_chunks] hands them out. *)
module Buffer : sig
  (** Lazily-created buffer that header messages are written into.  It is only
      allocated when first forced. *)
  val header_chunk : (read_write, Iobuf.seek) Iobuf.t Lazy.t

  (** The data chunk currently being filled.  The same [Iobuf.t] value is
      re-pointed at fresh memory whenever a new chunk is allocated. *)
  val current_chunk : (read_write, _) Iobuf.t

  (** Is the main (short message) buffer empty?

      NOTE: implemented via [get_chunks], so it retires the current chunk as a
      side effect. *)
  val is_empty : unit -> bool

  (** A read-only view of [header_chunk] obtained by copying its bounds and
      flipping the copy; a zero-length buffer if [header_chunk] has never been
      forced.  The original [header_chunk] window is not modified. *)
  val get_header_chunk : unit -> (read, _) Iobuf.t

  (** Retire the current chunk and return every retired, non-empty chunk,
      oldest first.  [current_chunk] has length zero afterwards. *)
  val get_chunks : unit -> (read_write, Iobuf.no_seek) Iobuf.t list

  (** [ensure_free len] makes sure [current_chunk] has a window of at least
      [len] bytes, allocating a fresh chunk if it does not.  Fails an
      assertion if [len] exceeds the chunk size. *)
  val ensure_free : int -> unit

  module Unsafe_internals : sig
    (** Discard all buffered data (header and data chunks). *)
    val reset : unit -> unit
  end
end = struct
  let header_chunk = lazy (Iobuf.create ~len:561152)

  let get_header_chunk () =
    let copy = Iobuf.create ~len:0 in
    (* Avoid forcing (and therefore allocating) the header buffer just to
       report that it is empty. *)
    if Lazy.is_val header_chunk
    then begin
      Iobuf.set_bounds_and_buffer ~src:(Lazy.force header_chunk) ~dst:copy;
      Iobuf.flip_lo copy
    end;
    (copy :> (read, _) Iobuf.t)

  let current_chunk = Iobuf.create ~len:0

  (* Size, in bytes, of every chunk allocated by [ensure_free]. *)
  let chunk_size = 10_000_000

  (* Retired chunks, most recently retired first. *)
  let previous_chunks = ref []

  (* Retire [current_chunk] (remembering it only if something was written to
     it) and re-point it at [len] bytes of fresh memory. *)
  let allocate_new_chunk len =
    Iobuf.flip_lo current_chunk;
    if not (Iobuf.is_empty current_chunk) then begin
      let copy = Iobuf.sub_shared current_chunk in
      previous_chunks := copy :: !previous_chunks
    end;
    let new_memory = Iobuf.create ~len in
    Iobuf.set_bounds_and_buffer ~src:new_memory ~dst:current_chunk;
    (* Write a zero byte every 512 bytes of the new chunk -- presumably to get
       the memory paged in up front rather than on the hot path (TODO confirm
       intent). *)
    if len > 0
    then
      for i = 0 to (len - 1) / 512 do
        Iobuf.Unsafe.Poke.uint8_trunc current_chunk ~pos:(i * 512) 0
      done

  let ensure_free len =
    assert (len <= chunk_size);
    if Iobuf.length current_chunk < len then allocate_new_chunk chunk_size

  let get_chunks () =
    (* Allocating a zero-length chunk pushes the current one (if non-empty)
       onto [previous_chunks]. *)
    allocate_new_chunk 0;
    List.rev !previous_chunks

  let is_empty () = List.is_empty (get_chunks ())

  module Unsafe_internals = struct
    let reset () =
      if Lazy.is_val header_chunk then Iobuf.reset (Lazy.force header_chunk);
      allocate_new_chunk 0;
      previous_chunks := []
  end

  let%test_unit "allocate_new_chunk" =
    protect
      ~f:(fun () ->
        allocate_new_chunk 1000;
        [%test_eq: int] (Iobuf.length current_chunk) 1000;
        Iobuf.Fill.stringo current_chunk "the first chunk\n";
        Iobuf.Fill.stringo current_chunk "still the first chunk\n";
        allocate_new_chunk 500;
        (* The chunk allocated just above is still empty, so retiring it must
           not add an entry to [previous_chunks]. *)
        allocate_new_chunk 500;
        Iobuf.Fill.stringo current_chunk "the second chunk\n";
        allocate_new_chunk 0;
        [%test_eq: int] (Iobuf.length current_chunk) 0;
        [%test_eq: int] (List.length !previous_chunks) 2;
        begin
          match !previous_chunks with
          | [second; first] ->
            [%test_eq: string]
              (Iobuf.to_string first)
              "the first chunk\nstill the first chunk\n";
            [%test_eq: string]
              (Iobuf.to_string second)
              "the second chunk\n"
          | _ -> assert false
        end
      )
      ~finally:Unsafe_internals.reset

  let%test_unit "ensure_free" =
    protect
      ~f:(fun () ->
        ensure_free 100;
        [%test_eq: int] (Iobuf.length current_chunk) chunk_size;
        [%test_eq: int] (List.length !previous_chunks) 0;
        Iobuf.advance current_chunk (chunk_size - 50);
        ensure_free 500;
        [%test_eq: int] (Iobuf.length current_chunk) chunk_size;
        [%test_eq: int] (List.length !previous_chunks) 1;
      )
      ~finally:Unsafe_internals.reset

  let%test_unit "get_header_chunk" =
    protect
      ~f:(fun () ->
        let header_chunk = Lazy.force header_chunk in
        Iobuf.Fill.stringo header_chunk "some data";
        let contents =
          get_header_chunk ()
          |> Iobuf.to_string
        in
        [%test_eq: string] contents "some data"
      )
      ~finally:Unsafe_internals.reset

  let%test_unit "get_chunks" =
    protect
      ~f:(fun () ->
        allocate_new_chunk 1000;
        Iobuf.Fill.stringo current_chunk "the first chunk";
        allocate_new_chunk 1000;
        Iobuf.Fill.stringo current_chunk "the second chunk";
        let contents =
          get_chunks ()
          |> List.map ~f:Iobuf.to_string
        in
        [%test_eq: string list] contents ["the first chunk"; "the second chunk"]
      )
      ~finally:Unsafe_internals.reset
end
(** Serialises profiler messages into the buffers owned by [Buffer] and
    arranges for them to be dumped (by default to a file) at exit. *)
module Writer = struct
  (* The epoch is taken as one minute before the moment this module is
     initialised. *)
  let epoch =
    Time_ns.now ()
    |> Fn.flip Time_ns.sub (Time_ns.Span.of_min 1.)
    |> Profiler_epoch.of_time

  (* [epoch] advanced by [Short_header.max_time_diff] -- presumably the latest
     time a short header can encode (TODO confirm against [Short_header]). *)
  let max_time = Profiler_epoch.add epoch Short_header.max_time_diff

  (** Append the epoch message to the header chunk. *)
  let write_epoch () =
    let header_chunk = Lazy.force Buffer.header_chunk in
    let written = Header_protocol.Epoch.write ~epoch header_chunk in
    Iobuf.advance header_chunk written

  (** Append the end-of-header message to the header chunk. *)
  let write_end_of_header () =
    let header_chunk = Lazy.force Buffer.header_chunk in
    let written = Header_protocol.End_of_header.write header_chunk in
    Iobuf.advance header_chunk written

  (** Append a [New_single] message declaring probe [id] with the given [name]
      and [spec] to the header chunk. *)
  let write_new_single id name spec =
    let header_chunk = Lazy.force Buffer.header_chunk in
    let written =
      Header_protocol.New_single.write
        ~id ~spec ~name header_chunk
    in
    Iobuf.advance header_chunk written

  (** Append a [New_group] message declaring group [id] with the given [name]
      and [spec] to the header chunk. *)
  let write_new_group id name spec =
    let header_chunk = Lazy.force Buffer.header_chunk in
    let written =
      Header_protocol.New_group.write
        ~id ~spec ~name header_chunk
    in
    Iobuf.advance header_chunk written

  (** Append a [New_group_point] message for point [id] of group [group_id],
      followed by one source entry per element of [sources], to the header
      chunk. *)
  let write_new_group_point ~group_id ~id name sources =
    let header_chunk = Lazy.force Buffer.header_chunk in
    let module NPP = Header_protocol.New_group_point in
    let sources_count = Array.length sources in
    let len = NPP.write ~group_id ~id ~name ~sources_count header_chunk in
    (* The source entries are written before the buffer is advanced past the
       message. *)
    Array.iteri sources ~f:(fun index id ->
      NPP.write_sources header_chunk ~count:sources_count ~index ~source_id:id
    );
    Iobuf.advance header_chunk len

  (** Write an 8-byte short message (packed header only) for timer [id] at
      [time] into the current data chunk. *)
  let write_timer_at id time =
    Buffer.ensure_free 8;
    Iobuf.Unsafe.Fill.int64_le
      Buffer.current_chunk
      (Short_header.pack_unsafe epoch id time)

  (** Write a 16-byte short message (packed header followed by [value]) for
      probe [id] at [time] into the current data chunk. *)
  let write_probe_at id time value =
    let current_chunk = Buffer.current_chunk in
    Buffer.ensure_free 16;
    Iobuf.Unsafe.Poke.int64_le
      current_chunk ~pos:0
      (Short_header.pack_unsafe epoch id time);
    Iobuf.Unsafe.Poke.int64_le
      current_chunk ~pos:8
      value;
    Iobuf.unsafe_advance current_chunk 16

  (* A group reset is written exactly like a timer message. *)
  let write_group_reset = write_timer_at

  let%test_module "write header messages" = (module struct
    (* Flip the header chunk and decode the single message it should hold,
       checking that the message spans the whole window. *)
    let unpack_one () =
      let chunk = Lazy.force Buffer.header_chunk in
      Iobuf.flip_lo chunk;
      match Header_protocol.to_unpacked chunk with
      | Ok (unpacked, length) ->
        [%test_eq: int] (Iobuf.length chunk) length;
        unpacked
      | _ ->
        failwith "to_unpacked failed"

    let%test_unit "write_new_single" =
      protect
        ~finally:Buffer.Unsafe_internals.reset
        ~f:(fun () ->
          write_new_single
            (Probe_id.of_int_exn 100)
            "unittest"
            (Probe_type.Timer);
          match unpack_one () with
          | New_single { id; spec; name; message_length=_; message_type=_ } ->
            [%test_eq: Probe_id.t] id (Probe_id.of_int_exn 100);
            [%test_eq: Probe_type.t] spec Probe_type.Timer;
            [%test_eq: string] name "unittest"
          | _ -> failwith "Incorrect message type"
        )

    let%test_unit "write_new_group" =
      protect
        ~finally:Buffer.Unsafe_internals.reset
        ~f:(fun () ->
          write_new_group
            (Probe_id.of_int_exn 100)
            "unittest"
            (Probe_type.Probe Profiler_units.Seconds);
          match (unpack_one ()) with
          | New_group { id; spec; name; message_length=_; message_type=_ } ->
            [%test_eq: Probe_id.t] id (Probe_id.of_int_exn 100);
            [%test_eq: Probe_type.t] spec (Probe_type.Probe Profiler_units.Seconds);
            [%test_eq: string] name "unittest"
          | _ -> failwith "Incorrect message type"
        )

    let%test_unit "write_new_group_point" =
      protect
        ~finally:Buffer.Unsafe_internals.reset
        ~f:(fun () ->
          write_new_group_point
            ~group_id:(Probe_id.of_int_exn 100)
            ~id:(Probe_id.of_int_exn 300)
            "unittest"
            (Array.map ~f:Probe_id.of_int_exn [|500; 700|]);
          match unpack_one () with
          | New_group_point
              { group_id; id; name; sources_grp; message_length=_; message_type=_ } ->
            [%test_eq: int] (Probe_id.to_int_exn group_id) 100;
            [%test_eq: int] (Probe_id.to_int_exn id) 300;
            [%test_eq: string] name "unittest";
            [%test_eq: int array]
              (Array.map sources_grp ~f:(fun r ->
                 let r : Header_protocol.New_group_point.Unpacked.t_sources = r in
                 Probe_id.to_int_exn r.source_id))
              [|500; 700|];
          | _ ->
            assert false
        )
  end)

  (** [write_to_fd fd header_chunk chunks] writes the window of [header_chunk]
      and then of each element of [chunks], in order, to [fd].  The windows
      and bounds of the buffers are left as they were. *)
  let write_to_fd fd header_chunk chunks =
    List.iter
      (header_chunk :: chunks)
      ~f:(fun chunk ->
        Iobuf.protect_window_and_bounds chunk ~f:(fun chunk ->
          Bigstring_unix.really_write fd (Iobuf.Peek.bigstringo ~pos:0 chunk)
        )
      )

  let%test_unit "write_to_fd" =
    let (filename, fd) = Unix.mkstemp "/tmp/core-profiler-tests" in
    protect
      ~f:(fun () ->
        let header_chunk = Iobuf.of_string "the header chunk\n" in
        let chunks =
          [ Iobuf.of_string "the first chunk\n"
          ; Iobuf.of_string "the second chunk\n"
          ]
        in
        write_to_fd fd header_chunk chunks;
        Unix.close fd;
        [%test_eq: string]
          (In_channel.read_all filename)
          "the header chunk\nthe first chunk\nthe second chunk\n";
      )
      ~finally:(fun () ->
        (* On the success path [fd] has already been closed, so a failure to
           close it again is expected and ignored. *)
        begin
          try Unix.close fd
          with Unix.Unix_error _ -> ()
        end;
        Unix.unlink filename;
      )

  (** [write_to_file name_ref header_chunk chunks] writes the buffers to the
      file named by the current contents of [name_ref].  If a file of that
      name already exists it is first renamed to [name ^ ".old"]. *)
  let write_to_file name_ref header_chunk chunks =
    (* The reference is read at call time, so later changes to the filename
       are honoured. *)
    let name = !name_ref in
    begin
      match Sys_unix.file_exists name with
      | `Yes -> Unix.rename ~src:name ~dst:(name ^ ".old")
      | `No | `Unknown -> ()
    end;
    Unix.with_file
      name
      ~mode:[ Unix.O_CREAT; Unix.O_WRONLY; Unix.O_TRUNC ]
      ~f:(fun fd ->
        write_to_fd fd header_chunk chunks
      )

  (* What to do with the buffers when stats are dumped; [None] disables
     dumping.  Defaults to writing to [current_output_filename]. *)
  let at_exit_handler = ref (Some (write_to_file current_output_filename))

  (** Replace the dump handler: write to a fixed file name, call an arbitrary
      function with the header chunk and data chunks, or do nothing. *)
  let set_at_exit_handler = function
    | `Write_file name -> at_exit_handler := Some (write_to_file (ref name))
    | `Function f -> at_exit_handler := Some f
    | `Disable -> at_exit_handler := None

  (* Finish the header (epoch, then end-of-header), retire the current data
     chunk and pass everything to [handler].  [handler] is not called when no
     data chunk holds any data. *)
  let dump_stats_internal handler =
    write_epoch ();
    write_end_of_header ();
    let chunks = Buffer.get_chunks () in
    if not (List.is_empty chunks)
    then handler (Buffer.get_header_chunk ())
           (chunks :> (read, Iobuf.no_seek) Iobuf.t list)

  (** Dump the stats collected so far using the current handler (a no-op when
      the handler is disabled).  The header chunk's window is restored
      afterwards, even if the handler raises, so the epoch and end-of-header
      messages written for this dump are not kept. *)
  let dump_stats () =
    Option.iter !at_exit_handler ~f:(fun handler ->
      let header_chunk = Lazy.force Buffer.header_chunk in
      let lo_bound = Iobuf.Lo_bound.window header_chunk in
      let hi_bound = Iobuf.Hi_bound.window header_chunk in
      protect
        ~f:(fun () -> dump_stats_internal handler)
        ~finally:(fun () ->
          Iobuf.Lo_bound.restore lo_bound header_chunk;
          Iobuf.Hi_bound.restore hi_bound header_chunk))

  (* Dump once more when the program exits, using whichever handler is
     installed at that point. *)
  let () = at_exit (fun () ->
    Option.iter !at_exit_handler ~f:dump_stats_internal
  )

  module Unsafe_internals = struct
    let write_epoch = write_epoch
    let write_end_of_header = write_end_of_header
  end
end