1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
open Core
open Lwt
open Base
(** Followers ($B!x(B5.2):
* - Respond to RPCs from candidates and leaders
*
* - If election timeout elapses without receiving AppendEntries
* RPC from current leader or granting vote to candidate:
* convert to candidate
*)
let mode = Some FOLLOWER
let lock = Lwt_mutex.create ()
type t = {
conf : Conf.t;
logger : Logger.t;
apply_log : apply_log;
state : State.common;
}
let init ~conf ~apply_log ~state =
{
conf;
logger =
Logger.create ~node_id:conf.node_id ~mode ~output_path:conf.log_file
~level:conf.log_level;
apply_log;
state;
}
let request_handlers t ~election_timer =
let handlers = Stdlib.Hashtbl.create 2 in
let open Params in
Stdlib.Hashtbl.add handlers
(`POST, "/append_entries")
( (fun json ->
match append_entries_request_of_yojson json with
| Ok x -> Ok (APPEND_ENTRIES_REQUEST x)
| Error _ as e -> e),
function
| APPEND_ENTRIES_REQUEST x ->
Append_entries_handler.handle ~conf:t.conf ~state:t.state
~logger:t.logger
~apply_log:
t.apply_log
~cb_valid_request:(fun () -> Timer.update election_timer)
~cb_new_leader:(fun () -> ())
~param:x
| _ -> failwith "Unexpected state" );
Stdlib.Hashtbl.add handlers
(`POST, "/request_vote")
( (fun json ->
match request_vote_request_of_yojson json with
| Ok x -> Ok (REQUEST_VOTE_REQUEST x)
| Error _ as e -> e),
function
| REQUEST_VOTE_REQUEST x ->
Request_vote_handler.handle ~state:t.state
~logger:
t.logger
~cb_valid_request:(fun () -> Timer.update election_timer)
~cb_new_leader:(fun () -> ())
~param:x
| _ -> failwith "Unexpected state" );
handlers
let run t () =
Logger.info t.logger "### Follower: Start ###";
State.log t.state ~logger:t.logger;
let election_timer =
Timer.create ~logger:t.logger ~timeout_millis:t.conf.election_timeout_millis
in
let handlers = request_handlers t ~election_timer in
let server, server_stopper =
Request_dispatcher.create ~port:(Conf.my_node t.conf).port ~logger:t.logger
~lock ~table:handlers
in
let election_timer_thread =
Timer.start election_timer ~on_stop:(fun () -> Lwt.wakeup server_stopper ())
in
Logger.debug t.logger "Starting";
Lwt.join [ election_timer_thread; server ] >>= fun () -> Lwt.return CANDIDATE