Source file append_entries_handler.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
open Core
open Cohttp_lwt_unix
open Yojson.Basic
open State
(** [append_entries ~conf ~logger ~state ~param ~apply_log ~cb_valid_request
    ~cb_new_leader] applies an AppendEntries request that the caller has
    already accepted to [state].

    Steps, in order:
    - invoke [cb_valid_request];
    - record [param.leader_id] in the volatile state;
    - if [PersistentState.detect_new_leader] reports so for [param.term],
      store that term and invoke [cb_new_leader];
    - append [param.entries] (when non-empty) starting at index
      [param.prev_log_index + 1];
    - if [VolatileState.detect_higher_commit_index] reports so for
      [param.leader_commit], set the commit index to the smaller of
      [param.leader_commit] and the log's last index;
    - call [VolatileState.apply_logs], handing each log entry it selects to
      [apply_log] together with [conf.node_id].

    The commit index is advanced only {e after} the entries are appended.
    Clamping against the last log index before the append would cap the
    commit index at the old end of the log, so entries delivered and
    committed by the same request would not be applied until a later
    request. *)
let append_entries ~(conf : Conf.t) ~logger ~state
    ~(param : Params.append_entries_request) ~(apply_log : Base.apply_log)
    ~cb_valid_request ~cb_new_leader =
  cb_valid_request ();
  VolatileState.update_leader_id state.volatile_state ~logger param.leader_id;
  let persistent_state = state.persistent_state in
  let persistent_log = state.persistent_log in
  let volatile_state = state.volatile_state in
  if PersistentState.detect_new_leader persistent_state ~logger
       ~other_term:param.term
  then (
    PersistentState.update_current_term persistent_state ~term:param.term;
    cb_new_leader ());
  (* Append first: the commit-index clamp below must see the new entries. *)
  (match param.entries with
   | [] -> ()
   | entries ->
     Logger.debug logger
       (Printf.sprintf
          "This param isn't empty, so appending entries(lentgh: %d)"
          (List.length entries));
     PersistentLog.append persistent_log
       ~term:(PersistentState.current_term persistent_state)
       ~start:(param.prev_log_index + 1) ~entries);
  if VolatileState.detect_higher_commit_index volatile_state ~logger
       ~other:param.leader_commit
  then
    (* Never commit past what is actually stored locally. *)
    VolatileState.update_commit_index volatile_state
      (min param.leader_commit (PersistentLog.last_index persistent_log));
  VolatileState.apply_logs volatile_state ~logger ~f:(fun i ->
      let log = PersistentLog.get_exn persistent_log i in
      apply_log ~node_id:conf.node_id ~log_index:log.index ~log_data:log.data)
(** [handle ~conf ~state ~logger ~apply_log ~cb_valid_request ~cb_new_leader
    ~param] answers one AppendEntries request.

    The request is processed as follows:
    - if [PersistentState.detect_old_leader] reports so for [param.term], it
      is refused without further action;
    - otherwise, unless [param.prev_log_term = -1] and
      [param.prev_log_index = 0], the entry stored at [param.prev_log_index]
      must exist and carry [param.prev_log_term]; if not, a warning is
      logged, [cb_valid_request] is invoked and the request is refused;
    - otherwise the request is passed to [append_entries], the state is
      logged via [State.log], and the request succeeds.

    The reply is produced by [Server.respond_string] with status [`OK] and a
    JSON object body holding ["term"] (the current term read after
    processing) and ["success"]. *)
let handle ~conf ~state ~logger ~apply_log ~cb_valid_request ~cb_new_leader
    ~(param : Params.append_entries_request) =
  let term_store = state.persistent_state in
  let log_store = state.persistent_log in
  let prev_entry = PersistentLog.get log_store param.prev_log_index in
  (* The pair (term -1, index 0) bypasses the previous-entry check;
     presumably it is the sender's marker for "no previous entry" —
     TODO confirm against the request builder. *)
  let skips_prev_log_check =
    param.prev_log_index = 0 && param.prev_log_term = -1
  in
  let prev_entry_conflicts =
    match prev_entry with
    | None -> true
    | Some entry -> entry.term <> param.prev_log_term
  in
  (* Refusal path for a request whose previous-entry check fails. *)
  let reject_mismatch () =
    Logger.warn logger
      (Printf.sprintf
         "Received a request that doesn't meet requirement.\nparam:%s,\nstate:%s"
         (Params.show_append_entries_request param)
         (PersistentLog.show log_store));
    cb_valid_request ();
    false
  in
  (* Acceptance path: apply the request, then log the resulting state. *)
  let accept () =
    append_entries ~conf ~logger ~state ~param ~apply_log ~cb_valid_request
      ~cb_new_leader;
    State.log state ~logger;
    true
  in
  let success =
    if PersistentState.detect_old_leader term_store ~logger
         ~other_term:param.term
    then false
    else if prev_entry_conflicts && not skips_prev_log_check
    then reject_mismatch ()
    else accept ()
  in
  let body =
    to_string
      (`Assoc
        [
          ("term", `Int (PersistentState.current_term term_store));
          ("success", `Bool success);
        ])
  in
  Server.respond_string ~status:`OK ~body ()