Source file quorumtool.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
open Ctypes
open Corosync_lib
open Corotypes
open CsError
(** Infix form of [Result.bind]: [r >>= f] applies [f] to the payload of an
    [Ok] and passes an [Error] through unchanged. *)
let ( >>= ) r f = Result.bind r f
(** How a quorum member should be identified when its name is resolved:
    [AddressFormatName] looks up the node's [nodelist.node.N.name] entry,
    [AddressFormatIP] collects the values of its [nodelist.node.N.ring*]
    entries and parses them as IP addresses. *)
type name_format = AddressFormatName | AddressFormatIP
(** Mutex-protected global state holding the most recent quorum membership
    view, filled in by the quorum notification callback. *)
module ViewList = struct
  (** Guards every global reference declared in this module. *)
  let mu = Mutex.create ()

  (* Runs [f] while holding [mu]; the mutex is released even when [f]
     raises, so a failure can never leave the module permanently locked. *)
  let with_lock f =
    Mutex.lock mu ;
    Fun.protect ~finally:(fun () -> Mutex.unlock mu) f

  (** Number of entries reported by the last notification callback. *)
  let g_view_list_entries = ref 0

  (** Resolved identity of a node: a configured name or a list of addresses. *)
  type node_name = Name of string | Ips of Ipaddr.t list

  (** One member of the view. [name] and [vq_info] start out as [None] and
      are filled in by later resolution steps. *)
  type view_list_entry = {
      vq_info: Votequorum.vinfo option
    ; name: node_name option
    ; node_id: int
  }

  (** The current view list; access it through [get_view_list]. *)
  let g_view_list : view_list_entry list ref = ref []

  (** Set to [true] each time the notification callback runs. *)
  let g_called = ref false

  (** [copy_view_list view_list view_list_entries] replaces [g_view_list]
      with one unresolved entry per node id read from the first
      [view_list_entries] elements of the pointer [view_list]. The caller is
      expected to hold [mu]. *)
  let copy_view_list view_list view_list_entries =
    g_view_list :=
      List.init view_list_entries (fun i ->
          let node_id = !@(view_list +@ i) |> Unsigned.UInt32.to_int in
          {vq_info= None; name= None; node_id}
      )

  (* [nodeid_key_index key] is [Some n] when [key] is exactly
     "nodelist.node.<n>.nodeid" and [None] for any other key. *)
  let nodeid_key_index key =
    match Scanf.sscanf key "nodelist.node.%d.nodeid%!" Fun.id with
    | index ->
        Some index
    | exception (Scanf.Scan_failure _ | Failure _ | End_of_file) ->
        None

  (* Finds the nodelist index of the node whose "nodeid" entry equals
     [node_id]. Entries that merely share the value (for example a vote
     count equal to the node id) are skipped instead of being handed to the
     scanner, which used to raise on them. *)
  let find_node_index node_id node_list =
    let id = string_of_int node_id in
    let rec search = function
      | [] ->
          None
      | (key, value) :: rest -> (
        match
          if String.equal value id then nodeid_key_index key else None
        with
        | Some _ as found ->
            found
        | None ->
            search rest
      )
    in
    search node_list

  (** [get_name_by_format node_id format] looks [node_id] up among the cmap
      entries under ["nodelist.node"] and describes it according to [format].

      - [AddressFormatIP]: [Ok (Ips addrs)] built from every value whose key
        starts with ["nodelist.node.N.ring"]; [Error CsErrExist] when there
        is no such value or when any of them is not a valid IP address.
      - [AddressFormatName]: [Ok (Name n)] from ["nodelist.node.N.name"], or
        [Error CsErrExist] when that key is absent.

      Returns [Error CsErrNotExist] when no [nodeid] entry matches
      [node_id]; errors from the cmap calls are propagated unchanged. *)
  let get_name_by_format node_id format =
    let open Cmap in
    with_handle @@ fun chandle ->
    get_prefix chandle "nodelist.node" >>= fun node_list ->
    find_node_index node_id node_list
    |> Option.to_result ~none:CsErrNotExist
    >>= fun index ->
    match format with
    | AddressFormatIP -> (
        let ring_prefix = Printf.sprintf "nodelist.node.%d.ring" index in
        let ring_values =
          List.filter_map
            (fun (k, v) ->
              if Astring.String.is_prefix ~affix:ring_prefix k then
                Some v
              else
                None
            )
            node_list
        in
        match List.map Ipaddr.of_string_exn ring_values with
        | exception Ipaddr.Parse_error _ ->
            Error CsErrExist
        | [] ->
            Error CsErrExist
        | ipaddrs ->
            Ok (Ips ipaddrs)
      )
    | AddressFormatName -> (
      match
        List.assoc_opt (Printf.sprintf "nodelist.node.%d.name" index) node_list
      with
      | Some name ->
          Ok (Name name)
      | None ->
          Error CsErrExist
    )

  (** [resolve_view_list_names format] resolves the name of every entry in
      the view list using [format]. On success the view list is replaced by
      the resolved entries (with [vq_info] reset to [None]) and [Ok ()] is
      returned. If any entry fails to resolve, the view list is left
      untouched and the first error is returned.

      The mutex is released on every path, including errors and exceptions. *)
  let resolve_view_list_names format =
    (* NOTE(review): nothing from [Cfg] appears to be needed here; the open is
       kept only to preserve name resolution. Confirm before removing. *)
    let open Cfg in
    with_lock (fun () ->
        let resolved =
          List.map
            (fun {node_id; _} ->
              get_name_by_format node_id format >>= fun addr ->
              Ok {node_id; name= Some addr; vq_info= None}
            )
            !g_view_list
        in
        match List.find_opt Result.is_error resolved with
        | Some e ->
            Result.get_error e |> Result.error
        | None ->
            g_view_list := List.map Result.get_ok resolved ;
            Ok ()
    )

  (** Quorum notification callback. Records that it was called, stores the
      number of entries and rebuilds the view list from [view_list]. The
      handle, quorate flag and ring sequence arguments are ignored. *)
  let ocaml_quorum_notify_fn _handle _quorate _ring_seq view_list_entries
      view_list =
    with_lock (fun () ->
        g_called := true ;
        g_view_list_entries := Unsigned.UInt32.to_int view_list_entries ;
        copy_view_list view_list !g_view_list_entries
    )

  (** Returns a snapshot of the current view list, i.e. the members recorded
      by the last notification callback, possibly updated by
      [resolve_view_list_names]. *)
  let get_view_list () = with_lock (fun () -> !g_view_list)

  (** Tells whether the notification callback has run since the last
      [reset_g_called]. *)
  let get_g_called () = with_lock (fun () -> !g_called)

  (** Clears the "callback has run" flag. *)
  let reset_g_called () = with_lock (fun () -> g_called := false)
end
(** [dispatch qhandle flag] clears the "callback has run" flag and then keeps
    calling [Quorum.quorum_dispatch qhandle flag] until the notification
    callback has been observed. Stops with the error of the first dispatch
    call that fails. *)
let dispatch qhandle flag =
  ViewList.reset_g_called () ;
  let rec wait_for_callback () =
    match Quorum.quorum_dispatch qhandle flag |> CsError.to_result with
    | Error _ as failure ->
        failure
    | Ok () ->
        if ViewList.get_g_called () then
          Ok ()
        else
          wait_for_callback ()
  in
  wait_for_callback ()
(** [with_quorum_track f] initializes a quorum handle whose notification
    callback is [ViewList.ocaml_quorum_notify_fn], starts tracking,
    dispatches until the callback has fired once, runs [f ()], then stops
    tracking and finalizes the handle.

    Once initialization has succeeded the handle is always finalized, and
    tracking (if it was started) is always stopped, whether the intermediate
    steps fail or [f] raises.

    Result: the first error among initialize, trackstart and dispatch; else
    an error from trackstop or finalize; else the result of [f ()].
    Exceptions raised by [f] are re-raised after cleanup. *)
let with_quorum_track f =
  let open Quorum in
  let qhandle = allocate quorum_handle_t Unsigned.UInt64.zero in
  let qtype = allocate uint32_t Unsigned.UInt32.zero in
  let qcb = make quorum_callbacks_t in
  setf qcb quorum_notify_fn ViewList.ocaml_quorum_notify_fn ;
  quorum_initialize qhandle (addr qcb) qtype |> CsError.to_result >>= fun () ->
  (* From this point the handle is live and must be released on every path. *)
  let finalize () = quorum_finalize !@qhandle |> to_result in
  let stop () = quorum_trackstop !@qhandle |> CsError.to_result in
  let tracked () =
    quorum_trackstart !@qhandle (Unsigned.UInt.of_int cs_track_current)
    |> CsError.to_result
    >>= fun () ->
    match
      dispatch !@qhandle CsDispatchFlag.CsDispatchOne >>= fun () -> Ok (f ())
    with
    | Ok r ->
        (* A trackstop failure takes precedence over the result of [f]. *)
        (stop () >>= fun () -> Ok r)
    | Error e ->
        (* Best-effort cleanup: the dispatch error is the one worth reporting. *)
        ignore (stop ()) ;
        Error e
    | exception exn ->
        ignore (stop ()) ;
        raise exn
  in
  match tracked () with
  | Ok r ->
      (finalize () >>= fun () -> r)
  | Error e ->
      ignore (finalize ()) ;
      Error e
  | exception exn ->
      ignore (finalize ()) ;
      raise exn
(** Runs [getquorate] on a handle supplied by [with_handle] (both resolved
    with [Quorum] opened) and returns its result. *)
let is_quorate () =
  let open Quorum in
  with_handle getquorate
(** [true] exactly when the cmap key ["quorum.provider"] can be read as a
    string and equals ["corosync_votequorum"]; any lookup failure counts as
    [false]. *)
let using_votequorum () =
  match Cmapctl.get Cmap.CmapValue.string "quorum.provider" with
  | Ok provider ->
      String.equal "corosync_votequorum" provider
  | Error _ ->
      false
(** [votequorum_info id] queries [Votequorum.get_info] for [id] on a handle
    supplied by [Votequorum.with_handle]. Returns [Error CsErrNoVoteQuorum]
    without querying when [using_votequorum ()] is [false]. *)
let votequorum_info id =
  if using_votequorum () then (
    let open Votequorum in
    with_handle (fun vq_handle -> get_info vq_handle id)
  ) else
    Error CsErrNoVoteQuorum
(** Passes the value obtained from [Cfg.cfg_local_get] (presumably the local
    node id; confirm against [Cfg]) to [votequorum_info]. An error from the
    [Cfg] call is returned unchanged. *)
let my_votequorum_info () =
  let local_id = Cfg.(with_handle cfg_local_get) in
  local_id >>= fun id -> votequorum_info id
(** [quorum_members name_format] returns the current view list with every
    entry's name resolved according to [name_format] and its [vq_info] filled
    in from [votequorum_info]. The work runs inside [with_quorum_track].

    Any failure from name resolution is returned as is. [votequorum_info] is
    queried for every member, and the first failing query (in list order)
    becomes the result. *)
let quorum_members name_format =
  with_quorum_track (fun () ->
      let open ViewList in
      match resolve_view_list_names name_format with
      | Error e ->
          Error e
      | Ok () -> (
          let members = get_view_list () in
          (* Query every member first; errors are only inspected afterwards. *)
          let lookups =
            List.map
              (fun entry -> (entry, votequorum_info entry.node_id))
              members
          in
          match
            List.find_opt (fun (_, info) -> Result.is_error info) lookups
          with
          | Some (_, failed) ->
              Error (Result.get_error failed)
          | None ->
              Ok
                (List.map
                   (fun (entry, info) ->
                     {entry with vq_info= Some (Result.get_ok info)}
                   )
                   lookups
                )
        )
  )