Source file rrd_updates.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
(**
* @group Performance Monitoring
*)
open Rrd
type row = {time: int64; row_data: float array}
type t = {
start_time: int64
; step: int64
; end_time: int64
; legend: string array
; data: row array
}
(** Debugging only *)
let string_of t =
let leg_string =
Printf.sprintf "[%s]"
(String.concat ";"
(List.map (fun l -> Printf.sprintf "\"%s\"" l) (Array.to_list t.legend))
)
in
let data_string =
Printf.sprintf "[|%s|]"
(String.concat ";\n"
(List.map
(fun row ->
Printf.sprintf "{time=%Ld; row_data=[|%s|]}" row.time
(String.concat "; "
(List.map
(fun f -> Printf.sprintf "%0.4f" f)
(Array.to_list row.row_data)
)
)
)
(Array.to_list t.data)
)
)
in
Printf.sprintf
"start_time:\t%Ld\nstep:\t\t%Ld\nend_time:\t%Ld\nlegend:\t\t%s\ndata:\n%s\n"
t.start_time t.step t.end_time leg_string data_string
let create rra_timestep rras first_rra last_cdp_time first_cdp_time start
legends =
let rec do_data i accum =
let time = Int64.(sub last_cdp_time (mul (of_int i) rra_timestep)) in
if time < start || i >= first_rra.rra_row_cnt then
List.rev accum
else
let rra =
List.map (fun ring -> Fring.peek ring i) (Array.to_list rra.rra_data)
in
let values = List.concat (List.map extract_row rras) in
do_data (i + 1) ({time; row_data= Array.of_list values} :: accum)
in
let data = Array.of_list (do_data 0 []) in
{
start_time= first_cdp_time
; step= rra_timestep
; end_time= last_cdp_time
; legend= legends
; data
}
let xml_of t output =
let tag tag next () =
Xmlm.output output (`El_start (("", tag), [])) ;
List.iter (fun x -> x ()) next ;
Xmlm.output output `El_end
in
let data dat () = Xmlm.output output (`Data dat) in
let xml_of_row row =
let values =
List.map
(fun v -> tag "v" [data (Utils.f_to_s v)])
(Array.to_list row.row_data)
in
tag "row" (tag "t" [data (Printf.sprintf "%Ld" row.time)] :: values)
in
let rows = List.map xml_of_row (Array.to_list t.data) in
let mydata = tag "data" rows in
let meta =
tag "meta"
[
tag "start" [data (Printf.sprintf "%Ld" t.start_time)]
; tag "step" [data (Printf.sprintf "%Ld" t.step)]
; tag "end" [data (Printf.sprintf "%Ld" t.end_time)]
; tag "rows" [data (Printf.sprintf "%d" (List.length rows))]
; tag "columns" [data (Printf.sprintf "%d" (Array.length t.legend))]
; tag "legend"
(List.map (fun x -> tag "entry" [data x]) (Array.to_list t.legend))
]
in
Xmlm.output output (`Dtd None) ;
tag "xport" [meta; mydata] ()
let of_xml input =
let open Utils.Xmlm_utils in
let read_row i =
read_block "row"
(fun i ->
let time = get_el "t" i in
let values = read_all "v" (get_el "v") i [] in
{
time= Int64.of_string time
; row_data= Array.of_list (List.map (fun v -> float_of_string v) values)
}
)
i
in
let read_data i = Array.of_list (read_all "row" read_row i []) in
let read_meta i =
read_block "meta"
(fun i ->
let start_time = get_el "start" i |> Int64.of_string in
let step = get_el "step" i |> Int64.of_string in
let end_time = get_el "end" i |> Int64.of_string in
let rows = get_el "rows" i |> int_of_string in
let columns = get_el "columns" i |> int_of_string in
let legend =
read_block "legend"
(fun i -> read_all "entry" (get_el "entry") i [])
i
|> Array.of_list
in
let data = [||] in
let meta = {start_time; step; end_time; legend; data} in
(meta, rows, columns)
)
i
in
accept (`Dtd None) input ;
read_block "xport"
(fun i ->
let meta, _, _ = read_meta i in
let data = read_block "data" read_data i in
{meta with data}
)
input
let json_of_t t =
let buffer = Buffer.create 4096 in
let do_data row =
Printf.bprintf buffer "{t:%Ld,values:[%s]}" row.time
(String.concat "," (List.map Utils.f_to_s (Array.to_list row.row_data)))
in
Printf.bprintf buffer "{meta: {start:%Ld,step:%Ld,end:%Ld,rows:%d,columns:%d,"
t.start_time t.step t.end_time (Array.length t.data) (Array.length t.legend) ;
Printf.bprintf buffer "legend:[%s]},"
(String.concat ","
(List.map (fun x -> "\"" ^ x ^ "\"") (Array.to_list t.legend))
) ;
Printf.bprintf buffer "data:[" ;
for i = 0 to Array.length t.data - 2 do
do_data t.data.(i) ;
Printf.bprintf buffer "%s" ","
done ;
do_data t.data.(Array.length t.data - 1) ;
Printf.bprintf buffer "]}" ;
Buffer.contents buffer
(** Export data from a bunch of rrds. Specify a prefix per rrd to be
put onto legend. Note that each rrd *must* have the same timestep
and have been updated at the same time, and *must* have
homogeneous rras too. If not, those that dont look like the 1st
one will be silently dropped. The export format is the rrdtool
'xport' format. *)
let create_multi prefixandrrds start interval cfopt =
let first_rrd = snd (List.hd prefixandrrds) in
let pdp_interval = Int64.to_int (Int64.div interval first_rrd.timestep) in
let prefixandrrds =
List.filter
(fun (_prefix, rrd) -> rrd.timestep = first_rrd.timestep)
prefixandrrds
in
let start =
prefixandrrds
|> List.map (fun (_, rrd) ->
if start < 0L then
Int64.(add start (of_float rrd.last_updated))
else
start
)
|> List.fold_left min Int64.max_int
in
let rras =
List.map
(fun (_prefix, rrd) ->
Rrd.find_best_rras rrd pdp_interval cfopt start
)
prefixandrrds
in
let first_rra =
rras |> List.find_opt (fun x -> x <> []) |> function
| Some (x :: _) ->
x
| Some [] | None ->
raise No_RRA_Available
in
let rras =
let only_valid_pdp_and_num_rows rra =
rra.rra_pdp_cnt = first_rra.rra_pdp_cnt
&& rra.rra_row_cnt = first_rra.rra_row_cnt
in
List.map (List.filter only_valid_pdp_and_num_rows) rras
in
let legends =
Array.concat
(List.map2
(fun (prefix, rrd) rras ->
let ds_legends =
Array.map (fun ds -> prefix ^ ds.ds_name) rrd.rrd_dss
in
let ds_legends_with_cf_prefix =
Array.concat
(List.map
(fun rra ->
Array.map
(fun name -> cf_type_to_string rra.rra_cf ^ ":" ^ name)
ds_legends
)
rras
)
in
ds_legends_with_cf_prefix
)
prefixandrrds rras
)
in
let rras = List.flatten rras in
let rra_timestep =
Int64.mul first_rrd.timestep (Int64.of_int first_rra.rra_pdp_cnt)
in
let last_cdp_time, _age = get_times first_rrd.last_updated rra_timestep in
let first_cdp_time_minus_one, _age =
get_times (Int64.to_float start) rra_timestep
in
let first_cdp_time = Int64.add first_cdp_time_minus_one rra_timestep in
create rra_timestep rras first_rra last_cdp_time first_cdp_time start legends
let export ?(json = false) prefixandrrds start interval cfopt =
let t = create_multi prefixandrrds start interval cfopt in
if json then
json_of_t t
else
let buffer = Buffer.create 10 in
let output = Xmlm.make_output (`Buffer buffer) in
xml_of t output ; Buffer.contents buffer
let of_string s =
let input = Xmlm.make_input (`String (0, s)) in
of_xml input