(*
* Copyright (c) 2018 Hannes Mehnert <hannes@mehnert.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

(*************)
(* influxdb line protocol reporter *)
(* from https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_reference/ *)
(* example line: weather,location=us-midwest temperature=82 1465839830100400200 *)
(*************)

module S = Set.Make (String)

(* [avoid_keyword m] returns [m] with the letter o prepended when the
   ASCII-uppercased form of [m] is one of the keywords listed below, and
   returns [m] unchanged otherwise. The keyword set is built once, when this
   binding is evaluated, and is captured by the returned function. *)
let avoid_keyword =
  let reserved =
    S.of_list
      [
        "ALL"; "ALTER"; "ANY"; "AS"; "ASC"; "BEGIN"; "BY"; "CREATE";
        "CONTINUOUS"; "DATABASE"; "DATABASES"; "DEFAULT"; "DELETE"; "DESC";
        "DESTINATIONS"; "DIAGNOSTICS"; "DISTINCT"; "DROP"; "DURATION"; "END";
        "EVERY"; "EXPLAIN"; "FIELD"; "FOR"; "FROM"; "GRANT"; "GRANTS";
        "GROUP"; "GROUPS"; "IN"; "INF"; "INSERT"; "INTO"; "KEY"; "KEYS";
        "KILL"; "LIMIT"; "SHOW"; "MEASUREMENT"; "MEASUREMENTS"; "NAME";
        "OFFSET"; "ON"; "ORDER"; "PASSWORD"; "POLICY"; "POLICIES";
        "PRIVILEGES"; "QUERIES"; "QUERY"; "READ"; "REPLICATION"; "RESAMPLE";
        "RETENTION"; "REVOKE"; "SELECT"; "SERIES"; "SET"; "SHARD"; "SHARDS";
        "SLIMIT"; "SOFFSET"; "STATS"; "SUBSCRIPTION"; "SUBSCRIPTIONS"; "TAG";
        "TO"; "USER"; "USERS"; "VALUES"; "WHERE"; "WITH"; "WRITE";
      ]
  in
  fun m ->
    let is_reserved = S.mem (String.uppercase_ascii m) reserved in
    if is_reserved then "o" ^ m else m

(* [escape chars s] rewrites [s] so that every occurrence of each character
   in [chars] is preceded by a backslash. The characters are processed one
   after the other, starting with the last element of [chars]: the string is
   split on the character and re-joined with a backslash followed by
   [Char.escaped] of that character. *)
let escape chars s =
  let protect c acc =
    let pieces = String.split_on_char c acc in
    String.concat ("\\" ^ Char.escaped c) pieces
  in
  List.fold_right protect chars s

(* Measurement names: keyword avoidance first, then comma and space are
   escaped. *)
let escape_measurement m = m |> avoid_keyword |> escape [ ','; ' ' ]

(* Other names: keyword avoidance first, then comma, space and equal sign are
   escaped. *)
let escape_name m = m |> avoid_keyword |> escape [ ','; ' '; '=' ]

(* [pp_value str ppf f] prints the value of field [f] on [ppf].
   - a string value is handed to the caller-supplied printer [str];
   - every integer flavour is printed in decimal followed by the suffix i;
   - anything else is delegated to a [pp_value] that is already in scope.
     This binding is not recursive, so the name cannot refer to the function
     being defined; presumably it is the one exposed by the opened [Metrics]
     module. *)
let pp_value (str : string Fmt.t) ppf f =
  let open Metrics in
  match value f with
  | V (String, text) -> str ppf text
  | V (Int, n) -> Fmt.pf ppf "%di" n
  | V (Int32, n32) -> Fmt.pf ppf "%ldi" n32
  | V (Int64, n64) -> Fmt.pf ppf "%Ldi" n64
  | V (Uint, n) -> Fmt.pf ppf "%ui" n
  | V (Uint32, n32) -> Fmt.pf ppf "%lui" n32
  | V (Uint64, n64) -> Fmt.pf ppf "%Lui" n64
  | _ -> pp_value ppf f

(* we need to:
- avoid keywords
- escape comma and space in measurement name
- escape comma, space and equal in tag key, tag value, field key of type string
- double-quote field value of type string
- data type number is a float, suffix i for integers *)letencode_line_protocoltagsdataname=letdata_fields=Metrics.Data.fieldsdatainletpp_field_strppfs=Fmt.pfppf"%S"sinletpp_fieldppff=Fmt.(pair~sep:(any"=")string(pp_valuepp_field_str))ppf(escape_name(Metrics.keyf),f)inletpp_fields=Fmt.(list~sep:(any",")pp_field)inletpp_tag_strppfs=Fmt.stringppf(escape_names)inletpp_tagppff=Fmt.(pair~sep:(any"=")string(pp_valuepp_tag_str))ppf(escape_name(Metrics.keyf),f)inletpp_tags=Fmt.(list~sep:(any",")pp_tag)inFmt.str"%s,%a %a\n"(escape_measurementname)pp_tagstagspp_fieldsdata_fieldsmoduleSM=Map.Make(Metrics.Src)letlwt_reporter?tags:(more_tags=[])?intervalsendnow=letm=refSM.emptyinleti=matchintervalwithNone->0L|Somes->Duration.of_mssinletreport~tags~data~oversrck=letsend()=m:=SM.addsrc(now())!m;letstr=encode_line_protocol(more_tags@tags)data(Metrics.Src.namesrc)inletunblock()=over();Lwt.return_unitinLwt.finalize(fun()->sendstr)unblock|>Lwt.ignore_result;k()inmatchSM.find_optsrc!mwith|None->send()|Somelast->ifnow()>Int64.addlastithensend()else(over();k())in{Metrics.report;now;at_exit=(fun()->())}