Source file pool.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
open Lwt.Infix

module Metrics = struct
  open Prometheus

  let namespace = "ocurrent"
  let subsystem = "pool"

  let qlen =
    let help = "Number of users waiting for a resource" in
    Gauge.v_label ~help ~label_name:"name" ~namespace ~subsystem "qlen"

  let wait_time =
    let help = "Time spent waiting for a resource" in
    Summary.v_label ~help ~label_name:"name" ~namespace ~subsystem
      "wait_time_seconds"

  let use_time =
    let help = "Time spent using a resource" in
    Summary.v_label ~help ~label_name:"name" ~namespace ~subsystem
      "use_time_seconds"

  let resources_in_use =
    let help = "Number of resources currently being used" in
    Gauge.v_label ~help ~label_name:"name" ~namespace ~subsystem
      "resources_in_use"

  let capacity =
    let help = "Total pool capacity" in
    Gauge.v_label ~help ~label_name:"name" ~namespace ~subsystem "capacity"
end

type priority = [ `High | `Low ]

type 'a t = {
  name : string;
  get : priority:priority -> switch:Switch.t -> 'a Lwt.t * (unit -> unit Lwt.t);
}

module Local = struct
  type t = {
    label : string;
    mutable used : int;
    capacity : int;
    queue_low : [`Use | `Cancel] Lwt.u Lwt_dllist.t;
    queue_high : [`Use | `Cancel] Lwt.u Lwt_dllist.t;
  }

  let check t =
    if t.used < t.capacity then (
      let next =
        match Lwt_dllist.take_opt_l t.queue_high with
        | None -> Lwt_dllist.take_opt_l t.queue_low
        | Some _ as x -> x
      in
      next |> Option.iter @@ fun waiter ->
      t.used <- t.used + 1;
      Lwt.wakeup_later waiter `Use
    )

  let get t ~priority ~switch =
    let ready, set_ready = Lwt.wait () in
    let queue =
      match priority with
      | `High -> t.queue_high
      | `Low -> t.queue_low
    in
    let node = Lwt_dllist.add_r set_ready queue in
    let cancel () =
        Lwt_dllist.remove node;
        if Lwt.is_sleeping ready then Lwt.wakeup_later set_ready `Cancel;
        Lwt.return_unit
    in
    check t;
    let start_wait = Unix.gettimeofday () in
    Prometheus.Gauge.inc_one (Metrics.qlen t.label);
    let th =
      Switch.add_hook_or_exec switch cancel >>= fun () ->
      ready >|= fun ready ->
      Prometheus.Gauge.dec_one (Metrics.qlen t.label);
      match ready with
      | `Cancel -> Fmt.failwith "Cancelled waiting for resource from pool %S" t.label
      | `Use ->
        let stop_wait = Unix.gettimeofday () in
        Prometheus.Summary.observe (Metrics.wait_time t.label) (stop_wait -. start_wait);
        Prometheus.Gauge.inc_one (Metrics.resources_in_use t.label);
        Switch.add_hook_or_fail switch (fun _reason ->
            assert (t.used > 0);
            Prometheus.Gauge.dec_one (Metrics.resources_in_use t.label);
            t.used <- t.used - 1;
            let release_time = Unix.gettimeofday () in
            Prometheus.Summary.observe (Metrics.use_time t.label) (release_time -. stop_wait);
            check t;
            Lwt.return_unit
          )
    in
    th, cancel

  let create ~label capacity =
    Prometheus.Gauge.set (Metrics.capacity label) (float_of_int capacity);
    let t = { label; used = 0; capacity;
      queue_low = Lwt_dllist.create ();
      queue_high = Lwt_dllist.create ()
    } in
    { name = label; get = get t }
end

let create = Local.create

let of_fn ~label get = { name = label; get }

let get t = t.get

let pp f t =
  Fmt.string f t.name