Source file subscriber.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
open Common_
open Trace_core
module Span_tbl = Trace_subscriber.Span_tbl
(** Re-binding of the [on_tracing_error] value that is already in scope here.
    It is not defined in this file, so it presumably comes from one of the
    [open]s above (TODO confirm which). Throughout this module it is
    dereferenced with [!] and applied to a message string to report a problem. *)
let on_tracing_error = on_tracing_error
type span_info = {
  tid: int;  (** Thread id given to [on_enter_span] when the span was opened *)
  name: string;  (** Span name; reused as the name of the emitted event *)
  start_ns: int64;  (** The [time_ns] value received by [on_enter_span] *)
  mutable data: (string * Sub.user_data) list;
      (** Key/value arguments attached to the span. Mutable because
          [on_add_data] prepends further entries while the span is open. *)
}
(** Information we store about a span begin event, so that a single complete
    (begin + end) event can be emitted once the corresponding span end event
    is seen. *)
type t = {
  active: bool A.t;
      (** Set to [true] by [create]; [close] swaps it to [false] and only
          performs its work if the previous value was [true]. *)
  pid: int;  (** Process id used in thread references and kernel objects *)
  spans: span_info Span_tbl.t;
      (** Spans that have been entered but not yet exited *)
  buf_chain: Buf_chain.t;  (** Buffers into which every record is encoded *)
  exporter: Exporter.t;
      (** Receives ready buffers through [write_bufs]; also provides [flush]
          and [close]. *)
}
(** Subscriber state *)
open struct
  (** Hand the buffers that are currently ready to the exporter's
      [write_bufs]. Does nothing when [Buf_chain.has_ready] is [false]. *)
  let[@inline] write_ready_ (self : t) =
    match Buf_chain.has_ready self.buf_chain with
    | false -> ()
    | true -> Buf_chain.pop_ready self.buf_chain ~f:self.exporter.write_bufs

  (** Report, through [on_tracing_error], the spans still present in [spans]:
      first the total count, then one line per distinct span name (in the
      order of [Set.Make (String)]). [stderr] is flushed afterwards. Nothing
      is reported, and nothing is flushed, when the table is empty. *)
  let print_non_closed_spans_warning spans =
    let module Names = Set.Make (String) in
    match Span_tbl.to_list spans with
    | [] -> ()
    | _ :: _ as leftover ->
      !on_tracing_error
        (Printf.sprintf "warning: %d spans were not closed"
           (List.length leftover));
      (* Deduplicate: several open spans may share the same name. *)
      let distinct_names =
        Names.of_list (List.map (fun (_, info) -> info.name) leftover)
      in
      List.iter
        (fun span_name ->
          !on_tracing_error
            (Printf.sprintf " span %S was not closed" span_name))
        (Names.elements distinct_names);
      flush stderr
end
(** Shut the subscriber down. The [active] flag is atomically swapped to
    [false]; only the caller that observed it as [true] does the work, so
    repeated calls are no-ops. That work is: mark non-empty buffers ready,
    write them out, close the exporter, and finally warn about spans that
    were never exited. *)
let close (self : t) : unit =
  let was_active = A.exchange self.active false in
  match was_active with
  | false -> ()
  | true ->
    Buf_chain.ready_all_non_empty self.buf_chain;
    write_ready_ self;
    self.exporter.close ();
    print_non_closed_spans_warning self.spans
(** Current value of the subscriber's [active] flag. *)
let[@inline] active { active = flag; _ } = A.get flag
(** Push out everything buffered so far: call
    [Buf_chain.ready_all_non_empty] on the buffer chain, write the ready
    buffers to the exporter, then call the exporter's own [flush]. *)
let flush (self : t) : unit =
  let chain = self.buf_chain in
  Buf_chain.ready_all_non_empty chain;
  write_ready_ self;
  self.exporter.flush ()
(** Build a fresh subscriber state.
    @param buf_pool pool used by the buffer chain; a new one is created with
      [Buf_pool.create ()] when omitted.
    @param pid process id stored in the state.
    @param exporter sink that will receive the encoded buffers.

    The result starts out active, with an empty span table and a sharded
    buffer chain. *)
let create ?buf_pool ~pid ~exporter () : t =
  let buf_pool =
    match buf_pool with
    | Some pool -> pool
    | None -> Buf_pool.create ()
  in
  let buf_chain = Buf_chain.create ~sharded:true ~buf_pool () in
  let spans = Span_tbl.create () in
  let active = A.make true in
  { active; pid; spans; buf_chain; exporter }
(** Subscriber callbacks: each one encodes the matching record into the
    buffer chain and, where the original did so, writes out ready buffers. *)
module Callbacks = struct
  type st = t

  (** Emit the trace preamble: magic record, initialization record (using
      [default_ticks_per_sec]), and provider info with id [0] and name
      ["ocaml-trace"]. The preamble is then marked ready and written. *)
  let on_init (self : st) ~time_ns:_ : unit =
    Writer.Metadata.Magic_record.encode self.buf_chain;
    (let open Writer.Metadata.Initialization_record in
     encode self.buf_chain ~ticks_per_secs:default_ticks_per_sec ());
    Writer.Metadata.Provider_info.encode self.buf_chain ~id:0
      ~name:"ocaml-trace" ();
    Buf_chain.ready_all_non_empty self.buf_chain;
    write_ready_ self

  (** Shutting down is the same as {!close}. *)
  let on_shutdown (self : st) ~time_ns:_ : unit = close self

  (** Encode a kernel object of type [ty_process] carrying [name], keyed by
      the subscriber's [pid]. The [tid] argument is ignored. *)
  let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit =
    (let open Writer.Kernel_object in
     encode self.buf_chain ~name ~ty:ty_process ~kid:self.pid ~args:[] ());
    write_ready_ self

  (** Encode a kernel object of type [ty_thread] carrying [name], keyed by
      [tid], with a ["process"] argument that refers to the subscriber's
      [pid]. *)
  let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit =
    (let open Writer.Kernel_object in
     encode self.buf_chain ~name ~ty:ty_thread ~kid:tid
       ~args:[ "process", A_kid (Int64.of_int self.pid) ]
       ());
    write_ready_ self

  (** Prepend a ["function"] entry to [data] when a function name is known;
      otherwise return [data] unchanged. *)
  let add_fun_name_ fun_name data : _ list =
    match fun_name with
    | Some fn -> ("function", Sub.U_string fn) :: data
    | None -> data

  (** Record the start of a span in the span table. Nothing is encoded yet:
      the complete event is produced by [on_exit_span]. *)
  let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
      ~__LINE__:_ ~time_ns ~tid ~data ~name span : unit =
    Span_tbl.add self.spans span
      { tid; name; start_ns = time_ns; data = add_fun_name_ fun_name data }

  (** Close a span: look it up, remove it from the table, and encode one
      duration-complete event running from the stored start time to
      [time_ns], using the thread id stored at entry. An unknown span is
      reported through [on_tracing_error]; nothing is written in that case. *)
  let on_exit_span (self : st) ~time_ns ~tid:_ span : unit =
    let found =
      try Some (Span_tbl.find_exn self.spans span) with Not_found -> None
    in
    match found with
    | None -> !on_tracing_error (Printf.sprintf "cannot find span %Ld" span)
    | Some { tid; name; start_ns; data } ->
      Span_tbl.remove self.spans span;
      (let open Writer in
       Event.Duration_complete.encode self.buf_chain ~name
         ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
         ~time_ns:start_ns ~end_time_ns:time_ns ~args:(args_of_user_data data)
         ());
      write_ready_ self

  (** Attach extra key/value pairs to a span that is still open. An empty
      [data] list is a no-op; an unknown span is reported through
      [on_tracing_error]. *)
  let on_add_data (self : st) ~data span =
    match data with
    | [] -> ()
    | _ :: _ ->
      (match Span_tbl.find_exn self.spans span with
      | info -> info.data <- List.rev_append data info.data
      | exception Not_found ->
        !on_tracing_error (Printf.sprintf "cannot find span %Ld" span))

  (** Encode [msg] as an instant event on thread [tid]. *)
  let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit =
    (let open Writer in
     Event.Instant.encode self.buf_chain
       ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
       ~name:msg ~time_ns ~args:(args_of_user_data data) ());
    write_ready_ self

  (** Encode a counter event named [name]; the value [n] is passed as the
      first argument, under the key [name], ahead of the user data. *)
  let on_counter (self : st) ~time_ns ~tid ~data ~name n : unit =
    (let open Writer in
     Event.Counter.encode self.buf_chain
       ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
       ~name ~time_ns
       ~args:((name, A_float n) :: args_of_user_data data)
       ());
    write_ready_ self

  (** Encode the beginning of a manual span as an async-begin event whose
      async id is [trace_id]. Source location, parent, and flavor are
      ignored. *)
  let on_enter_manual_span (self : st) ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_
      ~time_ns ~tid ~parent:_ ~data ~name ~flavor:_ ~trace_id _span : unit =
    (let open Writer in
     Event.Async_begin.encode self.buf_chain ~name
       ~args:(args_of_user_data data)
       ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
       ~time_ns ~async_id:trace_id ());
    write_ready_ self

  (** Encode the end of a manual span as an async-end event whose async id
      is [trace_id]. *)
  let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor:_
      ~trace_id (_ : span) : unit =
    (let open Writer in
     Event.Async_end.encode self.buf_chain ~name ~args:(args_of_user_data data)
       ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
       ~time_ns ~async_id:trace_id ());
    write_ready_ self

  (** Extension events are ignored. *)
  let on_extension_event _st ~time_ns:_ ~tid:_ _ = ()
end
(** Package the state together with {!Callbacks} as a [Sub.t] subscriber. *)
let subscriber (st : t) : Sub.t =
  Sub.Subscriber.Sub { st; callbacks = (module Callbacks) }