Source file collector_tef.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
open Common_
open Types
open Trace_core

(** Pool of reusable [Buffer.t] values, backed by [Trace_util.Rpool]. *)
module Buf_pool = struct
  type t = Buffer.t Trace_util.Rpool.t

  (** [create ?max_size ?buf_size ()] builds a new pool.
      @param max_size forwarded as-is to [Trace_util.Rpool.create]
        (default: 32).
      @param buf_size initial size passed to [Buffer.create] for every buffer
        the pool allocates (default: 256). *)
  let create ?(max_size = 32) ?(buf_size = 256) () : t =
    (* factory handed to the pool; [Buffer.reset] is the pool's [clear] hook *)
    let new_buffer () = Buffer.create buf_size in
    Trace_util.Rpool.create ~max_size ~create:new_buffer ~clear:Buffer.reset
      ()
end

open struct
  (** Converts a timestamp expressed in nanoseconds into microseconds.

      The division is performed on integers before the conversion to [float],
      so any sub-microsecond remainder is dropped (truncation toward zero, as
      done by [Int64.div]). *)
  let[@inline] time_us_of_time_ns (t : int64) : float =
    let whole_us = Int64.div t 1_000L in
    Int64.to_float whole_us
end

type t = {
  (* Set to [true] by [create]; [close] swaps it to [false] and only closes
     the exporter if it was still [true]. NOTE(review): [A] is presumably an
     alias of [Atomic] coming from one of the opened modules -- confirm. *)
  active: bool A.t;
  (* Process id passed as [~pid] to every [Writer.emit_*] call. *)
  pid: int;
  (* Pool from which a buffer is borrowed for each event that is written. *)
  buf_pool: Buf_pool.t;
  (* Sink for the written events: receives each filled buffer through
     [on_json], and is driven by [flush] and [close]. *)
  exporter: Exporter.t;
  (* Generator used to obtain a fresh trace id for an async span whose parent
     is not itself an async TEF span. *)
  trace_id_gen: Trace_util.Trace_id64.Gen.t;
}
(** Subscriber state *)

(** [close self] marks the collector as inactive and closes its exporter.

    The exporter is closed only when the [active] flag was still [true] before
    the exchange, so calling [close] again afterwards does nothing. *)
let close (self : t) : unit =
  let was_active = A.exchange self.active false in
  (* FIXME: print_non_closed_spans_warning self.spans; *)
  if was_active then self.exporter.close ()

(** [active self] reads the current value of the [active] flag, which [close]
    sets to [false]. *)
let[@inline] active (self : t) : bool = A.get self.active
(** [flush self] asks the underlying exporter to flush. *)
let[@inline] flush (self : t) : unit =
  self.exporter.flush ()

(** [create ?buf_pool ~pid ~exporter ()] builds a new, active collector state.
    @param buf_pool pool of buffers used to write events; a fresh
      [Buf_pool.create ()] is used when omitted.
    @param pid process id reported in every emitted event.
    @param exporter sink that receives the written events. *)
let create ?buf_pool ~pid ~exporter () : t =
  let buf_pool =
    match buf_pool with
    | Some pool -> pool
    | None -> Buf_pool.create ()
  in
  let trace_id_gen = Trace_util.Trace_id64.Gen.create () in
  { active = A.make true; pid; buf_pool; exporter; trace_id_gen }

open struct
  (* Local alias for the collector state [t]; used to annotate the [self]
     argument of the callbacks defined in this structure. *)
  type st = t

  (** [flavor_of_params params] returns the flavor carried by the first
      [Core_ext.Extension_span_flavor] found in [params], or [`Sync] when the
      list contains none. *)
  let rec flavor_of_params params =
    match params with
    | Core_ext.Extension_span_flavor flavor :: _ -> flavor
    | _ :: rest -> flavor_of_params rest
    | [] -> `Sync

  (** Draws a fresh trace id from the collector's own generator. *)
  let new_trace_id (self : st) =
    self.trace_id_gen |> Trace_util.Trace_id64.Gen.gen
  (** Initialization callback: there is nothing to set up, the state is
      ignored. *)
  let init _st : unit = ()
  (** Shutdown callback: delegates to [close]. *)
  let shutdown (self : st) : unit = close self

  (** [add_fun_name_ fun_name data] prepends a [("function", `String f)] entry
      to [data] when [fun_name] is [Some f]; [data] is returned unchanged when
      it is [None]. *)
  let add_fun_name_ fun_name data : _ list =
    match fun_name with
    | Some fn -> ("function", `String fn) :: data
    | None -> data

  (** [enter_span self ... name] starts a span called [name] and returns its
      handle.

      The flavor of the span is read from [params] (see [flavor_of_params]):
      - [`Sync]: nothing is written yet. The start timestamp is stored in the
        returned span, and [exit_span] later writes a single duration event.
      - [`Async]: a "begin" event is written to the exporter right away. The
        trace id is taken from [parent] when [parent] is itself an async TEF
        span; otherwise a fresh one is generated.

      For both flavors, the function name given through [__FUNCTION__] (if
      any) is stored in the span arguments under the ["function"] key.
      [__FILE__], [__LINE__] and [level] are not used. *)
  let enter_span (self : st) ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_
      ~level:_ ~params ~data ~parent name : span =
    let start_us = time_us_of_time_ns @@ Trace_util.Mock_.now_ns () in
    let flavor = flavor_of_params params in

    let pid = self.pid in
    let tid = Trace_util.Mock_.get_tid () in
    (* Record the function name for every flavor. It used to be added in the
       async branch only, so sync spans silently lost it. *)
    let data = add_fun_name_ fun_name data in
    match flavor with
    | `Sync -> Span_tef_sync { name; pid; tid; args = data; start_us }
    | `Async ->
      let trace_id =
        match parent with
        | P_some (Span_tef_async sp) -> sp.trace_id
        | _ -> new_trace_id self
      in

      (let@ buf = Trace_util.Rpool.with_ self.buf_pool in
       Writer.emit_begin_async buf ~name ~pid ~tid ~trace_id ~ts:start_us
         ~args:data;
       self.exporter.on_json buf);

      Span_tef_async { pid; tid; trace_id; name; args = data }

  (** [exit_span self sp] closes the span [sp].

      - A sync TEF span produces one complete duration event, going from the
        start time stored in the span to now.
      - An async TEF span produces an "end" event carrying the span's trace id.
      - Any other kind of span is ignored and nothing is sent to the exporter.

      The end timestamp is taken before a buffer is borrowed from the pool. *)
  let exit_span (self : st) sp =
    let end_time_us = Trace_util.Mock_.now_ns () |> time_us_of_time_ns in

    let@ buf = Trace_util.Rpool.with_ self.buf_pool in
    match sp with
    | Span_tef_sync { name; pid; tid; args; start_us } ->
      Writer.emit_duration_event buf ~pid ~tid ~name ~start:start_us
        ~end_:end_time_us ~args;
      self.exporter.on_json buf
    | Span_tef_async { name; trace_id; pid; tid; args } ->
      Writer.emit_end_async buf ~pid ~tid ~name ~trace_id ~ts:end_time_us
        ~args;
      self.exporter.on_json buf
    | _ ->
      (* not one of our spans: nothing was written, so nothing to export *)
      ()

  (** [message self ~data msg] writes an instant event named [msg], with
      [data] as its arguments, stamped with the current thread id and time.
      [level], [params] and [span] are ignored. *)
  let message (self : st) ~level:_ ~params:_ ~data ~span:_ msg : unit =
    let tid = Trace_util.Mock_.get_tid () in
    let ts = Trace_util.Mock_.now_ns () |> time_us_of_time_ns in
    let pid = self.pid in
    let@ buf = Trace_util.Rpool.with_ self.buf_pool in
    Writer.emit_instant_event buf ~pid ~tid ~name:msg ~ts ~args:data;
    self.exporter.on_json buf

  (** [counter_float self name n] writes a counter event giving the value [n]
      to the counter [name], stamped with the current thread id and time.
      [params] and [data] are ignored. *)
  let counter_float (self : st) ~params:_ ~data:_ name n : unit =
    let tid = Trace_util.Mock_.get_tid () in
    let ts = Trace_util.Mock_.now_ns () |> time_us_of_time_ns in
    let pid = self.pid in
    let@ buf = Trace_util.Rpool.with_ self.buf_pool in
    Writer.emit_counter buf ~pid ~name ~tid ~ts n;
    self.exporter.on_json buf

  (** [metric self ~params ~data name m] reports the metric [m] as a counter
      event through [counter_float]. Integer metrics are converted to floats;
      any other kind of metric is ignored. [level] is not used. *)
  let metric (self : st) ~level:_ ~params ~data name m : unit =
    let emit value = counter_float self ~params ~data name value in
    match m with
    | Core_ext.Metric_int i -> emit (float_of_int i)
    | Core_ext.Metric_float x -> emit x
    | _ -> ()

  (** [add_data_to_span _ sp data] adds [data] to the arguments stored in the
      TEF span [sp]. The new entries are prepended in reverse order (this is
      [List.rev_append]). Spans that are neither sync nor async TEF spans are
      left untouched. *)
  let add_data_to_span _st span extra =
    match span with
    | Span_tef_async r -> r.args <- List.rev_append extra r.args
    | Span_tef_sync r -> r.args <- List.rev_append extra r.args
    | _ -> ()

  (** [on_name_thread_ self ~tid name] writes, via [Writer.emit_name_thread],
      an event giving the name [name] to thread [tid] of the collector's
      process, then hands the buffer to the exporter. *)
  let on_name_thread_ (self : st) ~tid name : unit =
    let pid = self.pid in
    let@ buf = Trace_util.Rpool.with_ self.buf_pool in
    Writer.emit_name_thread buf ~pid ~tid ~name;
    self.exporter.on_json buf

  (** [on_name_process_ self name] writes, via [Writer.emit_name_process], an
      event giving the name [name] to the collector's process, then hands the
      buffer to the exporter. *)
  let on_name_process_ (self : st) name : unit =
    let pid = self.pid in
    let@ buf = Trace_util.Rpool.with_ self.buf_pool in
    Writer.emit_name_process buf ~pid ~name;
    self.exporter.on_json buf

  (** [extension self ev] handles extension events:
      - [Core_ext.Extension_set_thread_name] names the calling thread (its id
        is obtained from [Trace_util.Mock_.get_tid]);
      - [Core_ext.Extension_set_process_name] names the process;
      - every other event is ignored.

      [level] is not used. *)
  let extension (self : st) ~level:_ ev =
    match ev with
    | Core_ext.Extension_set_process_name name -> on_name_process_ self name
    | Core_ext.Extension_set_thread_name name ->
      on_name_thread_ self ~tid:(Trace_util.Mock_.get_tid ()) name
    | _ -> ()
end

(** Table of callbacks plugging the functions defined above ([init],
    [shutdown], [enter_span], [exit_span], ...) into the [Collector]
    interface. *)
let callbacks_collector : _ Collector.Callbacks.t =
  Collector.Callbacks.make
    (* lifecycle *)
    ~init ~shutdown
    (* spans *)
    ~enter_span ~exit_span ~add_data_to_span
    (* other events *)
    ~message ~metric ~extension ()

(** [collector self] pairs the state [self] with [callbacks_collector] to form
    a [Collector.t]. *)
let collector (self : t) : Collector.t =
  let callbacks = callbacks_collector in
  Collector.C_some (self, callbacks)