Source file oraft.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
open Base
open State
open Printf

type leader_node = { host : string; port : int }
type current_state = { mode : mode; term : int; leader : leader_node option }

type t = {
  conf : Conf.t;
  post_command : string -> bool Lwt.t;
  current_state : unit -> current_state;
}

let state ~(conf : Conf.t) ~logger =
  match PersistentLog.load ~state_dir:conf.state_dir ~logger with
  | Ok persistent_log ->
      Ok
        {
          persistent_state = PersistentState.load ~state_dir:conf.state_dir;
          persistent_log;
          volatile_state = VolatileState.create ();
        }
  | Error msg -> Error msg


let process ~conf ~logger ~apply_log ~state : unit Lwt.t =
  let exec next_mode =
    VolatileState.update_mode state.volatile_state ~logger next_mode;
    match next_mode with
    | FOLLOWER -> Follower.run ~conf ~apply_log ~state
    | CANDIDATE -> Candidate.run ~conf ~apply_log ~state
    | LEADER -> Leader.run ~conf ~apply_log ~state
  in
  let rec loop next_mode =
    let%lwt next_mode =
      match exec next_mode with
      | Ok next ->
          let%lwt next_mode = next in
          Lwt.return next_mode
      | Error msg ->
          let msg =
            sprintf "Failed to process. Starting from %s again. error:[%s]"
              (Base.show_mode Base.FOLLOWER)
              msg
          in
          Logger.error logger ~loc:__LOC__ msg;
          Lwt.return FOLLOWER
    in
    loop next_mode
  in
  loop FOLLOWER


let post_command ~(conf : Conf.t) ~logger ~state s =
  let request_json =
    let r : Params.client_command_request = { data = s } in
    Params.client_command_request_to_yojson r
  in
  let request =
    Request_sender.post ~logger ~url_path:"client_command"
      ~my_node_id:conf.node_id
      ~request_json
        (* Afford to allow a connection timeout to unavailable server *)
      ~timeout_millis:(conf.request_timeout_millis * 2)
      ~converter:(fun response_json ->
        match Params.client_command_response_of_yojson response_json with
        | Ok param -> Ok (Params.CLIENT_COMMAND_RESPONSE param)
        | Error _ as err -> err
    )
  in
  match VolatileState.leader_id state.volatile_state with
  | Some node_id ->
      let current_leader_node = Conf.peer_node conf ~node_id in
      let%lwt result = request current_leader_node in
      Logger.debug logger ~loc:__LOC__
        (Printf.sprintf "Sending command to node(%d) : %s" node_id s);
      Lwt.return
        ( match result with
        | Some (Params.CLIENT_COMMAND_RESPONSE x) -> x.success
        | Some _ ->
            Logger.error logger ~loc:__LOC__ "Shouldn't reach here";
            false
        | None -> false
        )
  | None -> Lwt.return false


let start ~conf_file ~apply_log =
  let conf = Conf.from_file conf_file in
  let logger =
    Logger.create ~node_id:conf.node_id ~output_path:conf.log_file
      ~level:conf.log_level ()
  in
  match state ~conf ~logger with
  | Ok state ->
      Logger.info logger ~loc:__LOC__ "Starting Oraft";
      ignore
        (let%lwt _ = process ~conf ~logger ~apply_log ~state in
         Lwt.return ()
        );
      let post_command = post_command ~conf ~logger ~state in
      Ok
        {
          conf;
          post_command;
          current_state =
            (fun () ->
              let mode = VolatileState.mode state.volatile_state in
              let term = PersistentState.current_term state.persistent_state in
              let leader =
                match VolatileState.leader_id state.volatile_state with
                | Some x ->
                    let leader = Conf.peer_node conf ~node_id:x in
                    Some { host = leader.host; port = leader.app_port }
                | None -> None
              in
              { mode; term; leader }
            );
        }
  | Error msg ->
      let msg = sprintf "Failed to prepare state. error:[%s]" msg in
      Logger.error ~loc:__LOC__ logger msg;
      Error msg