Source file request_sender.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
open Core
open Cohttp_lwt_unix
open Lwt
open Base
(** [handle_response ~logger ~converter ~node ~resp ~body] interprets an HTTP
    response received from [node].

    When the status code of [resp] is in the 2xx class, [body] is logged at
    debug level, parsed as JSON and handed to [converter]; [Some param] is
    returned if the conversion yields [Ok param]. A conversion [Error] or a
    status code outside the 2xx class is logged at error level and results in
    [None].

    Exceptions raised while parsing [body] or by [converter] are not caught
    here and propagate to the caller. *)
let handle_response ~logger ~converter ~node ~resp ~body =
  let status_code = Cohttp.Code.code_of_status (Response.status resp) in
  (* Integer division by 100 yields the status class (2 for any 2xx code). *)
  match status_code / 100 with
  | 2 -> (
      Logger.debug logger
        (Printf.sprintf "Received response from node: %d, body: %s" node.id
           body);
      match converter (Yojson.Safe.from_string body) with
      | Ok param -> Some param
      | Error err ->
          Logger.error logger
            (Printf.sprintf
               "Received an error response from node %d. err: %s, body: %s"
               node.id err body);
          None)
  | _ ->
      Logger.error logger
        (Printf.sprintf "Received an error status code from node %d : %d"
           node.id status_code);
      None
(** [post ~node_id ~logger ~url_path ~request_json ~timeout_millis ~converter
    node] sends [request_json] as the body of an HTTP POST request to [node]
    and resolves to the converted response.

    - [node_id] is sent in the X-Raft-Node-Id request header.
    - [url_path] is appended to the host and port of [node] to build the
      target URL.
    - [timeout_millis] is the maximum time to wait, in milliseconds.
    - [converter] turns the parsed JSON response body into a
      [Params.response], or an error message.

    The promise resolves to [Some response] when the node answers with a 2xx
    status and [converter] succeeds. It resolves to [None] (after logging)
    when the timeout elapses first, the status is not 2xx, the body cannot be
    handled, or sending the request fails with an exception. *)
let post ~node_id ~logger ~url_path ~request_json ~timeout_millis
    ~(converter : Yojson.Safe.t -> (Params.response, string) Result.t) node =
  let request_param =
    request_json |> Yojson.Safe.to_string |> Cohttp_lwt.Body.of_string
  in
  (* This binding must be named [headers]: it is passed to [Client.post]
     below through label punning. *)
  let headers =
    Cohttp.Header.init_with "X-Raft-Node-Id" (string_of_int node_id)
  in
  (* Resolves to [None] once [timeout_millis] has elapsed. *)
  let timeout : Params.response option Lwt.t =
    Lwt_unix.sleep (float_of_int timeout_millis /. 1000.0) >>= fun () ->
    Logger.warn logger
      (Printf.sprintf "Request timeout. node_id: %d, url_path: %s" node.id
         url_path);
    Lwt.return None
  in
  let send_req node =
    Client.post ~body:request_param ~headers
      (Uri.of_string
         (Printf.sprintf "http://%s:%d/%s" node.host node.port url_path))
    >>= fun (resp, body) ->
    body |> Cohttp_lwt.Body.to_string >|= fun body ->
    (* [handle_response] may raise (for instance on a malformed JSON body);
       such failures are logged and reported as [None]. *)
    try handle_response ~logger ~converter ~node ~resp ~body
    with e ->
      let msg = Stdlib.Printexc.to_string e in
      Logger.error logger
        (Printf.sprintf
           "Failed to handle response body. node_id: %d, error: %s" node.id
           msg);
      None
  in
  (* Race the request against the timeout; [Lwt.catch] turns a rejected
     promise into a logged [None]. *)
  Lwt.catch
    (fun () -> Lwt.pick [ send_req node; timeout ])
    (fun e ->
      let msg = Stdlib.Printexc.to_string e in
      Logger.error logger
        (Printf.sprintf "Failed to send a request. node_id: %d, error: %s"
           node.id msg);
      Lwt.return None)