(* Source file: request_sender.ml *)

open Core
open Cohttp_lwt_unix
open Lwt
open Base

(** [handle_response ~logger ~converter ~node ~resp ~body] interprets an HTTP
    reply received from [node].

    When the status code of [resp] is in the 2xx class, [body] is logged at
    debug level, parsed with [Yojson.Safe.from_string] and passed to
    [converter]:
    - [Ok v] yields [Some v];
    - [Error msg] is logged at error level and yields [None].

    Any other status code is logged at error level and yields [None].

    Exceptions raised by the JSON parser or by [converter] are not caught
    here. *)
let handle_response ~logger ~converter ~node ~resp ~body =
  let code = Cohttp.Code.code_of_status (Response.status resp) in
  match code / 100 with
  | 2 -> (
      let received =
        Printf.sprintf "Received response from node: %d, body: %s" node.id body
      in
      Logger.debug logger received;
      match body |> Yojson.Safe.from_string |> converter with
      | Ok param -> Some param
      | Error err ->
          let rejected =
            Printf.sprintf
              "Received an error response from node %d. err: %s, body: %s"
              node.id err body
          in
          Logger.error logger rejected;
          None)
  | _ ->
      let bad_status =
        Printf.sprintf "Received an error status code from node %d : %d"
          node.id code
      in
      Logger.error logger bad_status;
      None


(** [post ~node_id ~logger ~url_path ~request_json ~timeout_millis ~converter
    node] serialises [request_json] and sends it as the body of an HTTP POST
    to [http://<node.host>:<node.port>/<url_path>], with an [X-Raft-Node-Id]
    header carrying [node_id].

    The request is raced, through [Lwt.pick], against a timer of
    [timeout_millis] milliseconds. The returned promise resolves to:
    - the result of [handle_response] when the reply arrives first;
    - [None], after a warning log, when the timer fires first;
    - [None], after an error log, when handling the reply raises or when
      sending the request fails with an exception. *)
let post ~node_id ~logger ~url_path ~request_json ~timeout_millis
    ~(converter : Yojson.Safe.t -> (Params.response, string) Result.t) node =
  let payload =
    Cohttp_lwt.Body.of_string (Yojson.Safe.to_string request_json)
  in
  let headers =
    Cohttp.Header.init_with "X-Raft-Node-Id" (string_of_int node_id)
  in
  (* The timer is created here, before the request is issued. *)
  let deadline : Params.response option Lwt.t =
    let seconds = float_of_int timeout_millis /. 1000.0 in
    Lwt_unix.sleep seconds >>= fun () ->
    Logger.warn logger
      (Printf.sprintf "Request timeout. node_id: %d, url_path: %s" node.id
         url_path);
    Lwt.return None
  in
  (* Turns a fully-read reply into a result; any exception raised while
     interpreting it is logged and mapped to [None]. *)
  let decode resp text =
    try handle_response ~logger ~converter ~node ~resp ~body:text
    with exn ->
      Logger.error logger
        (Printf.sprintf
           "Failed to handle response body. node_id: %d, error: %s" node.id
           (Stdlib.Printexc.to_string exn));
      None
  in
  (* Issues the POST and reads the whole response body before decoding. *)
  let request () =
    let uri =
      Uri.of_string
        (Printf.sprintf "http://%s:%d/%s" node.host node.port url_path)
    in
    Client.post ~body:payload ~headers uri >>= fun (resp, body) ->
    Cohttp_lwt.Body.to_string body >|= decode resp
  in
  (* Fallback for failures of the request/timer race itself. *)
  let on_failure exn =
    Logger.error logger
      (Printf.sprintf "Failed to send a request. node_id: %d, error: %s"
         node.id
         (Stdlib.Printexc.to_string exn));
    Lwt.return None
  in
  Lwt.catch (fun () -> Lwt.pick [ request (); deadline ]) on_failure