1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
open Base
open State
open Printf
(* Network address (host name and port number) used to describe the leader
   node. In this file it is filled from a peer's [host] and [app_port]
   fields when the current state is reported. *)
type leader_node = { host : string; port : int }
(* Snapshot of a node as produced by the [current_state] closure of [t]:
   the node's mode, its current term, and the address of the leader when a
   leader id is known ([None] otherwise). *)
type current_state = { mode : mode; term : int; leader : leader_node option }
(* Handle returned by [start] on success. *)
type t = {
(* Configuration loaded from the file passed to [start]. *)
conf : Conf.t;
(* Sends a command string to the node currently recorded as leader. The
   promise resolves to the [success] flag of the response, or to [false]
   when no leader is known or no usable response was obtained. *)
post_command : string -> bool Lwt.t;
(* Reads the node's present mode, term and leader address on each call. *)
current_state : unit -> current_state;
}
(* Assemble the node state for [conf].

   The persistent log and the persistent state are both read from
   [conf.state_dir]; the volatile state is created fresh. When the
   persistent log cannot be loaded, its error message is returned as
   [Error msg] and nothing else is loaded. *)
let state ~(conf : Conf.t) ~logger =
  let state_dir = conf.state_dir in
  PersistentLog.load ~state_dir ~logger |> function
  | Error reason -> Error reason
  | Ok persistent_log ->
      Ok
        {
          persistent_state = PersistentState.load ~state_dir;
          persistent_log;
          volatile_state = VolatileState.create ();
        }
(* Main mode loop of the node.

   Starting as FOLLOWER, each iteration records the mode being entered in
   the volatile state, runs the matching module ([Follower], [Candidate] or
   [Leader]) and waits for it to yield the next mode. When a runner returns
   [Error msg], the error is logged and the loop restarts from FOLLOWER.
   The loop has no exit branch: every iteration ends by recursing. *)
let process ~conf ~logger ~apply_log ~state : unit Lwt.t =
  (* Record [mode] as the current mode, then hand control to its runner. *)
  let run_mode mode =
    VolatileState.update_mode state.volatile_state ~logger mode;
    match mode with
    | LEADER -> Leader.run ~conf ~apply_log ~state
    | CANDIDATE -> Candidate.run ~conf ~apply_log ~state
    | FOLLOWER -> Follower.run ~conf ~apply_log ~state
  in
  let rec drive mode =
    let%lwt upcoming =
      match run_mode mode with
      | Error reason ->
          Logger.error logger ~loc:__LOC__
            (sprintf "Failed to process. Starting from %s again. error:[%s]"
               (Base.show_mode Base.FOLLOWER)
               reason);
          Lwt.return FOLLOWER
      (* The runner's promise already resolves to the next mode. *)
      | Ok pending -> pending
    in
    drive upcoming
  in
  drive FOLLOWER
(* Forward the client command [s] to the node currently recorded as leader.

   The command is wrapped in a [Params.client_command_request], posted to
   the "client_command" path with twice the configured request timeout, and
   the reply is decoded as a [Params.client_command_response].

   The returned promise resolves to:
   - the [success] flag of the decoded response;
   - [false] when no leader id is recorded in the volatile state;
   - [false] when the request yields [None] (presumably a failed or
     timed-out request; confirm against [Request_sender.post]);
   - [false], after logging an error, when the reply is of an unexpected
     response variant. *)
let post_command ~(conf : Conf.t) ~logger ~state s =
  let request_json =
    let r : Params.client_command_request = { data = s } in
    Params.client_command_request_to_yojson r
  in
  (* Partially applied sender: it still expects the destination node. *)
  let request =
    Request_sender.post ~logger ~url_path:"client_command"
      ~my_node_id:conf.node_id ~request_json
      ~timeout_millis:(conf.request_timeout_millis * 2)
      ~converter:(fun response_json ->
        match Params.client_command_response_of_yojson response_json with
        | Ok param -> Ok (Params.CLIENT_COMMAND_RESPONSE param)
        | Error _ as err -> err)
  in
  match VolatileState.leader_id state.volatile_state with
  | Some node_id ->
      let current_leader_node = Conf.peer_node conf ~node_id in
      (* Log before issuing the request, so that the "Sending" message
         precedes the exchange and is still emitted when the request hangs
         or its promise is rejected. *)
      Logger.debug logger ~loc:__LOC__
        (Printf.sprintf "Sending command to node(%d) : %s" node_id s);
      let%lwt result = request current_leader_node in
      Lwt.return
        (match result with
        | Some (Params.CLIENT_COMMAND_RESPONSE x) -> x.success
        | Some _ ->
            Logger.error logger ~loc:__LOC__ "Shouldn't reach here";
            false
        | None -> false)
  | None -> Lwt.return false
(* Start a node from the configuration file [conf_file].

   Loads the configuration, creates the logger, prepares the node state and
   launches the mode loop ([process]) in the background without waiting for
   it. [apply_log] is passed through to the mode runners.

   Returns [Ok t] with the configuration, a [post_command] function bound
   to this node, and a [current_state] closure. Returns [Error msg] (also
   logged) when the state cannot be prepared.

   NOTE(review): failures of [Conf.from_file] are not handled here; how it
   reports them is not visible from this file. *)
let start ~conf_file ~apply_log =
  let conf = Conf.from_file conf_file in
  let logger =
    Logger.create ~node_id:conf.node_id ~output_path:conf.log_file
      ~level:conf.log_level ()
  in
  match state ~conf ~logger with
  | Ok state ->
      Logger.info logger ~loc:__LOC__ "Starting Oraft";
      (* Fire-and-forget: the loop runs in the background. A rejection of
         its promise is logged instead of being silently discarded. *)
      Lwt.on_failure
        (process ~conf ~logger ~apply_log ~state)
        (fun exn ->
          Logger.error logger ~loc:__LOC__
            (sprintf "Raft main loop terminated unexpectedly. error:[%s]"
               (Printexc.to_string exn)));
      let post_command = post_command ~conf ~logger ~state in
      Ok
        {
          conf;
          post_command;
          current_state =
            (fun () ->
              let mode = VolatileState.mode state.volatile_state in
              let term = PersistentState.current_term state.persistent_state in
              let leader =
                match VolatileState.leader_id state.volatile_state with
                | Some x ->
                    (* Report the leader's application port, not [port]. *)
                    let leader = Conf.peer_node conf ~node_id:x in
                    Some { host = leader.host; port = leader.app_port }
                | None -> None
              in
              { mode; term; leader });
        }
  | Error msg ->
      let msg = sprintf "Failed to prepare state. error:[%s]" msg in
      Logger.error logger ~loc:__LOC__ msg;
      Error msg