Source file subscriber.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
open Common_
open Trace_core
module Span_tbl = Trace_subscriber.Span_tbl

let on_tracing_error = on_tracing_error

type span_info = {
  tid: int;
  name: string;
  start_ns: int64;
  mutable data: (string * Sub.user_data) list;
      (* NOTE: thread safety: this is supposed to only be modified by the thread
      that's running this (synchronous, stack-abiding) span. *)
}
(** Information we store about a span begin event, to emit a complete event when
    we meet the corresponding span end event *)

type t = {
  active: bool A.t;
  pid: int;
  spans: span_info Span_tbl.t;
  buf_chain: Buf_chain.t;
  exporter: Exporter.t;
}
(** Subscriber state *)

open struct
  (** Write the buffers that are ready *)
  let[@inline] write_ready_ (self : t) =
    if Buf_chain.has_ready self.buf_chain then
      Buf_chain.pop_ready self.buf_chain ~f:self.exporter.write_bufs

  let print_non_closed_spans_warning spans =
    let module Str_set = Set.Make (String) in
    let spans = Span_tbl.to_list spans in
    if spans <> [] then (
      !on_tracing_error
      @@ Printf.sprintf "warning: %d spans were not closed" (List.length spans);
      let names =
        List.fold_left
          (fun set (_, span) -> Str_set.add span.name set)
          Str_set.empty spans
      in
      Str_set.iter
        (fun name ->
          !on_tracing_error @@ Printf.sprintf "  span %S was not closed" name)
        names;
      flush stderr
    )
end

let close (self : t) : unit =
  if A.exchange self.active false then (
    Buf_chain.ready_all_non_empty self.buf_chain;
    write_ready_ self;
    self.exporter.close ();

    print_non_closed_spans_warning self.spans
  )

let[@inline] active self = A.get self.active

let flush (self : t) : unit =
  Buf_chain.ready_all_non_empty self.buf_chain;
  write_ready_ self;
  self.exporter.flush ()

let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t =
  let buf_chain = Buf_chain.create ~sharded:true ~buf_pool () in
  { active = A.make true; buf_chain; exporter; pid; spans = Span_tbl.create () }

module Callbacks = struct
  type st = t

  let on_init (self : st) ~time_ns:_ =
    Writer.Metadata.Magic_record.encode self.buf_chain;
    Writer.Metadata.Initialization_record.(
      encode self.buf_chain ~ticks_per_secs:default_ticks_per_sec ());
    Writer.Metadata.Provider_info.encode self.buf_chain ~id:0
      ~name:"ocaml-trace" ();
    (* make sure we write these immediately so they're not out of order *)
    Buf_chain.ready_all_non_empty self.buf_chain;

    write_ready_ self

  let on_shutdown (self : st) ~time_ns:_ = close self

  let on_name_process (self : st) ~time_ns:_ ~tid:_ ~name : unit =
    Writer.Kernel_object.(
      encode self.buf_chain ~name ~ty:ty_process ~kid:self.pid ~args:[] ());
    write_ready_ self

  let on_name_thread (self : st) ~time_ns:_ ~tid ~name : unit =
    Writer.Kernel_object.(
      encode self.buf_chain ~name ~ty:ty_thread ~kid:tid
        ~args:[ "process", A_kid (Int64.of_int self.pid) ]
        ());
    write_ready_ self

  (* add function name, if provided, to the metadata *)
  let add_fun_name_ fun_name data : _ list =
    match fun_name with
    | None -> data
    | Some f -> ("function", Sub.U_string f) :: data

  let[@inline] on_enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_
      ~__LINE__:_ ~time_ns ~tid ~data ~name span : unit =
    let data = add_fun_name_ fun_name data in
    let info = { tid; name; start_ns = time_ns; data } in
    (* save the span so we find it at exit *)
    Span_tbl.add self.spans span info

  let on_exit_span (self : st) ~time_ns ~tid:_ span : unit =
    match Span_tbl.find_exn self.spans span with
    | exception Not_found ->
      !on_tracing_error (Printf.sprintf "cannot find span %Ld" span)
    | { tid; name; start_ns; data } ->
      Span_tbl.remove self.spans span;
      Writer.(
        Event.Duration_complete.encode self.buf_chain ~name
          ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
          ~time_ns:start_ns ~end_time_ns:time_ns ~args:(args_of_user_data data)
          ());
      write_ready_ self

  let on_add_data (self : st) ~data span =
    if data <> [] then (
      try
        let info = Span_tbl.find_exn self.spans span in
        info.data <- List.rev_append data info.data
      with Not_found ->
        !on_tracing_error (Printf.sprintf "cannot find span %Ld" span)
    )

  let on_message (self : st) ~time_ns ~tid ~span:_ ~data msg : unit =
    Writer.(
      Event.Instant.encode self.buf_chain
        ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
        ~name:msg ~time_ns ~args:(args_of_user_data data) ());
    write_ready_ self

  let on_counter (self : st) ~time_ns ~tid ~data ~name n : unit =
    Writer.(
      Event.Counter.encode self.buf_chain
        ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
        ~name ~time_ns
        ~args:((name, A_float n) :: args_of_user_data data)
        ());
    write_ready_ self

  let on_enter_manual_span (self : st) ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_
      ~time_ns ~tid ~parent:_ ~data ~name ~flavor:_ ~trace_id _span : unit =
    Writer.(
      Event.Async_begin.encode self.buf_chain ~name
        ~args:(args_of_user_data data)
        ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
        ~time_ns ~async_id:trace_id ());
    write_ready_ self

  let on_exit_manual_span (self : st) ~time_ns ~tid ~name ~data ~flavor:_
      ~trace_id (_ : span) : unit =
    Writer.(
      Event.Async_end.encode self.buf_chain ~name ~args:(args_of_user_data data)
        ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
        ~time_ns ~async_id:trace_id ());
    write_ready_ self

  let on_extension_event _ ~time_ns:_ ~tid:_ _ev = ()
end

let subscriber (self : t) : Sub.t =
  Sub.Subscriber.Sub { st = self; callbacks = (module Callbacks) }