Source file collector_tef.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
open Common_
open Types
open Trace_core
module Buf_pool = struct
type t = Buffer.t Trace_util.Rpool.t
let create ?(max_size = 32) ?(buf_size = 256) () : t =
Trace_util.Rpool.create ~max_size ~clear:Buffer.reset
~create:(fun () -> Buffer.create buf_size)
()
end
open struct
let[@inline] time_us_of_time_ns (t : int64) : float =
Int64.div t 1_000L |> Int64.to_float
end
type t = {
active: bool A.t;
pid: int;
buf_pool: Buf_pool.t;
exporter: Exporter.t;
trace_id_gen: Trace_util.Trace_id64.Gen.t;
}
(** Subscriber state *)
let close (self : t) : unit =
if A.exchange self.active false then
self.exporter.close ()
let[@inline] active self = A.get self.active
let[@inline] flush (self : t) : unit = self.exporter.flush ()
let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t =
{
active = A.make true;
exporter;
buf_pool;
pid;
trace_id_gen = Trace_util.Trace_id64.Gen.create ();
}
open struct
type st = t
let rec flavor_of_params = function
| [] -> `Sync
| Core_ext.Extension_span_flavor f :: _ -> f
| _ :: tl -> flavor_of_params tl
let new_trace_id (self : st) = Trace_util.Trace_id64.Gen.gen self.trace_id_gen
let init _ = ()
let shutdown (self : st) = close self
let add_fun_name_ fun_name data : _ list =
match fun_name with
| None -> data
| Some f -> ("function", `String f) :: data
let enter_span (self : st) ~__FUNCTION__ ~__FILE__ ~__LINE__ ~level:_ ~params
~data ~parent name : span =
let start_us = time_us_of_time_ns @@ Trace_util.Mock_.now_ns () in
let flavor = flavor_of_params params in
let pid = self.pid in
let tid = Trace_util.Mock_.get_tid () in
match flavor with
| `Sync -> Span_tef_sync { name; pid; tid; args = data; start_us }
| `Async ->
let trace_id =
match parent with
| P_some (Span_tef_async sp) -> sp.trace_id
| _ -> new_trace_id self
in
let data = add_fun_name_ __FUNCTION__ data in
(let@ buf = Trace_util.Rpool.with_ self.buf_pool in
Writer.emit_begin_async buf ~name ~pid ~tid ~trace_id ~ts:start_us
~args:data;
self.exporter.on_json buf);
Span_tef_async { pid; tid; trace_id; name; args = data }
let exit_span (self : st) sp =
let end_time_us = time_us_of_time_ns @@ Trace_util.Mock_.now_ns () in
let@ buf = Trace_util.Rpool.with_ self.buf_pool in
let did_write =
match sp with
| Span_tef_sync { name; pid; tid; args; start_us } ->
Writer.emit_duration_event buf ~pid ~tid ~name ~start:start_us
~end_:end_time_us ~args;
true
| Span_tef_async { name; trace_id; pid; tid; args } ->
Writer.emit_end_async buf ~pid ~tid ~name ~trace_id ~ts:end_time_us
~args;
true
| _ -> false
in
if did_write then self.exporter.on_json buf
let message (self : st) ~level:_ ~params:_ ~data ~span:_ msg : unit =
let tid = Trace_util.Mock_.get_tid () in
let time_us = time_us_of_time_ns @@ Trace_util.Mock_.now_ns () in
let@ buf = Trace_util.Rpool.with_ self.buf_pool in
Writer.emit_instant_event buf ~pid:self.pid ~tid ~name:msg ~ts:time_us
~args:data;
self.exporter.on_json buf
let counter_float (self : st) ~params:_ ~data:_ name n : unit =
let tid = Trace_util.Mock_.get_tid () in
let time_us = time_us_of_time_ns @@ Trace_util.Mock_.now_ns () in
let@ buf = Trace_util.Rpool.with_ self.buf_pool in
Writer.emit_counter buf ~pid:self.pid ~name ~tid ~ts:time_us n;
self.exporter.on_json buf
let metric (self : st) ~level:_ ~params ~data name m : unit =
match m with
| Core_ext.Metric_float n -> counter_float self ~params ~data name n
| Core_ext.Metric_int i ->
counter_float self ~params ~data name (float_of_int i)
| _ -> ()
let add_data_to_span _st sp data =
match sp with
| Span_tef_sync sp -> sp.args <- List.rev_append data sp.args
| Span_tef_async sp -> sp.args <- List.rev_append data sp.args
| _ -> ()
let on_name_thread_ (self : st) ~tid name : unit =
let@ buf = Trace_util.Rpool.with_ self.buf_pool in
Writer.emit_name_thread buf ~pid:self.pid ~tid ~name;
self.exporter.on_json buf
let on_name_process_ (self : st) name : unit =
let@ buf = Trace_util.Rpool.with_ self.buf_pool in
Writer.emit_name_process ~pid:self.pid ~name buf;
self.exporter.on_json buf
let extension (self : st) ~level:_ ev =
match ev with
| Core_ext.Extension_set_thread_name name ->
let tid = Trace_util.Mock_.get_tid () in
on_name_thread_ self ~tid name
| Core_ext.Extension_set_process_name name -> on_name_process_ self name
| _ -> ()
end
let callbacks_collector : _ Collector.Callbacks.t =
Collector.Callbacks.make ~init ~shutdown ~enter_span ~exit_span ~message
~add_data_to_span ~metric ~extension ()
let collector (self : t) : Collector.t =
Collector.C_some (self, callbacks_collector)