Source file mutex.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
module Make(IO : S.IO)(Client : S.Client with module IO = IO) = struct
  module IO = IO
  module Client = Client

  exception Error of string

  let (>>=) = IO.(>>=)

  let acquire conn ?(atime=10.) ?(ltime=10) mutex id =
    let etime = Unix.time() +. atime in

    let update_ttl () =
      Client.ttl conn mutex >>= function
      | None -> Client.expire conn mutex ltime >>= fun _ -> IO.return ()
      | _ -> IO.return () in

    let rec loop () =
      Client.setnx conn mutex id >>= function
      | true -> Client.expire conn mutex ltime >>= fun _ -> IO.return ()
      | _ -> update_ttl () >>= fun _ ->
        if Unix.time() < etime then IO.sleep(0.1) >>= loop
        else IO.fail (Error ("could not acquire lock " ^ mutex))
    in
    loop ()

  let release conn mutex id =
    Client.watch conn [mutex] >>= fun _ ->
    Client.get conn mutex >>= function
    | Some x when x = id ->
      Client.multi conn >>= fun _ ->
      Client.queue (fun () -> Client.del conn [mutex]) >>= fun _ ->
      Client.exec conn >>= fun _ ->
      IO.return ()
    | _ ->
      Client.unwatch conn >>= fun _ ->
      IO.fail (Error ("lock was lost: " ^ mutex))

  let with_mutex conn ?atime ?ltime mutex fn =
    let id = Uuidm.(to_string (create `V4) [@ocaml.alert "-deprecated"]) in
    acquire conn ?atime ?ltime mutex id >>= fun _ ->
    IO.catch
      (* try *) (fun () ->
        fn () >>= fun res ->
        release conn mutex id >>= fun _ ->
        IO.return res)
      (* catch *) (function e ->
        release conn mutex id >>= fun _ ->
        IO.fail e)
end