Source file role.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
open Core

module Replica = struct
  module Connection_state = struct
    type t =
      | Connect
      | Connecting
      | Sync
      | Connected
      | Handshake
      (** [Handshake] Undocumented. Can happen if the replica is unable to connect to the leader. *)
    [@@deriving sexp_of, compare]

    let of_string = function
      | "connect"    -> Ok Connect
      | "connecting" -> Ok Connecting
      | "sync"       -> Ok Sync
      | "connected"  -> Ok Connected
      | "handshake"  -> Ok Handshake
      | connection_state ->
        Or_error.error_s
          [%message "Unrecognized connection state" (connection_state : string)]
    ;;
  end

  type t =
    { leader             : Host_and_port.t
    ; connection_state   : Connection_state.t
    ; replication_offset : int
    }
  [@@deriving fields, sexp_of, compare]

  let keyword = "slave"

  let of_resp3 = function
    | [ Resp3.String k
      ; Resp3.String host
      ; Resp3.Int port
      ; Resp3.String connection_state
      ; Resp3.Int replication_offset
      ]
      when String.equal k keyword ->
      let%bind.Or_error connection_state = Connection_state.of_string connection_state in
      let leader = Host_and_port.create ~host ~port in
      Ok { leader; connection_state; replication_offset }
    | resp3 -> Or_error.error_s [%message "Invalid Replica resp3:" (resp3 : Resp3.t list)]
  ;;
end

module Leader = struct
  module Replica = struct
    type t =
      { where_to_connect   : Host_and_port.t
      ; replication_offset : int
      }
    [@@deriving fields, sexp_of, compare]

    let of_resp3 resp3 =
      match resp3 with
      | Resp3.Array
          [| Resp3.String host; Resp3.String port; Resp3.String replication_offset |] ->
        let%bind.Or_error port = Or_error.try_with (fun () -> Int.of_string port) in
        let%bind.Or_error replication_offset =
          Or_error.try_with (fun () -> Int.of_string replication_offset)
        in
        let where_to_connect = Host_and_port.create ~host ~port in
        Ok { where_to_connect; replication_offset }
      | resp3 ->
        Or_error.error_s [%message "Invalid Leader Replica resp3:" (resp3 : Resp3.t)]
    ;;
  end

  type t =
    { replication_offset : int
    ; replicas           : Replica.t list
    }
  [@@deriving fields, sexp_of, compare]

  let keyword = "master"

  let of_resp3 = function
    | [ Resp3.String k; Resp3.Int replication_offset; Resp3.Array replicas ]
      when String.equal k keyword ->
      let%bind.Or_error replicas =
        Array.to_list replicas |> List.map ~f:Replica.of_resp3 |> Or_error.all
      in
      Ok { replication_offset; replicas }
    | resp3 -> Or_error.error_s [%message "Invalid Leader resp3:" (resp3 : Resp3.t list)]
  ;;
end

module Sentinel = struct
  type t = string list [@@deriving sexp_of, compare]

  let keyword = "sentinel"

  let of_resp3 = function
    | [ Resp3.String k; Resp3.Array leader_names ] when String.equal k keyword ->
      let l = Array.to_list leader_names in
      let leader_names, other =
        List.partition_map l ~f:(function
          | Resp3.String s -> Either.First  s
          | other          -> Either.Second other)
      in
      if not (List.is_empty other)
      then
        Or_error.error_s
          [%message
            "Not all leader names are strings:"
              (leader_names : string list)
              (other : Resp3.t list)]
      else Ok leader_names
    | resp3 ->
      Or_error.error_s [%message "Invalid Sentinel resp3:" (resp3 : Resp3.t list)]
  ;;
end

type t =
  | Leader   of Leader.t
  | Replica  of Replica.t
  | Sentinel of Sentinel.t
[@@deriving sexp_of, compare]

let of_resp3 = function
  | Resp3.Array a ->
    let l = Array.to_list a in
    (match List.hd l with
     | Some (Resp3.String x) when String.equal x Replica.keyword ->
       Replica.of_resp3 l |> Or_error.map ~f:(fun r -> Replica r)
     | Some (Resp3.String x) when String.equal x Leader.keyword ->
       Leader.of_resp3 l |> Or_error.map ~f:(fun l -> Leader l)
     | Some (Resp3.String x) when String.equal x Sentinel.keyword ->
       Sentinel.of_resp3 l |> Or_error.map ~f:(fun s -> Sentinel s)
     | Some role ->
       Or_error.error_s [%message "Invalid Redis role type provided" (role : Resp3.t)]
     | None -> Or_error.error_s [%message "No Redis role type provided"])
  | other -> Or_error.error_s [%message "Unexpected role response:" (other : Resp3.t)]
;;

module For_testing = struct
  let zero_port (host_and_port : Host_and_port.t) =
    Host_and_port.create ~host:host_and_port.host ~port:0
  ;;

  let zero_ports (r : t) =
    match r with
    | Sentinel s -> Sentinel s
    | Leader l   ->
      let replicas =
        List.map l.replicas ~f:(fun r ->
          { r with where_to_connect = zero_port r.where_to_connect })
      in
      Leader { l with replicas }
    | Replica replica -> Replica { replica with leader = zero_port replica.leader }
  ;;
end