(* Source file: pool_lwt.ml *)

open Lwt.Syntax

(** Limits and timeouts that govern a connection pool. *)
type pool_config = {
  max_open : int;
      (** Ceiling on the pool connection count: [get_connection] only creates
          a new connection while [total_count] is below this value. *)
  max_idle : int;
      (** Ceiling on the idle list: [return_connection] keeps a connection
          only while fewer than this many are already idle. *)
  idle_timeout : float;
      (** Seconds; the cleanup loop drops idle connections whose [last_used]
          stamp is older than this. *)
  max_lifetime : float;
      (** Seconds; the cleanup loop drops idle connections whose [created]
          stamp is older than this. *)
}

(** An idle-list entry: a connection plus the two timestamps (seconds, from
    [now ()]) that the cleanup loop compares against [idle_timeout] and
    [max_lifetime]. Both stamps are written when the connection is returned to
    the pool. *)
type pooled_connection = { conn : Connection.t; mutable last_used : float; created : float }

(** Mutable state of a connection pool. [idle] and [total_count] are read and
    written under [mutex] by the pool operations in this file. *)
type connection_pool = {
  config : pool_config;
      (** Limits applied by the pool operations. *)
  mutable idle : pooled_connection list;
      (** Connections currently parked in the pool; most recently returned
          first. *)
  mutable total_count : int;
      (** Connections accounted to the pool: incremented when one is created,
          decremented when one is discarded or swept by the cleanup loop. *)
  mutex : Lwt_mutex.t;
      (** Serialises access to [idle] and [total_count]. *)
  cond : unit Lwt_condition.t;
      (** Awaited by [get_connection] when the pool is at [max_open];
          signalled by [return_connection]. *)
  create_connection : unit -> Connection.t;
      (** Factory invoked (synchronously) to open a new connection. *)
  mutable cleanup_stopper : bool;
      (** When [true], [cleanup_loop] stops at its next check of the flag. *)
}

(** Configuration used by [create_pool] when no [?config] is supplied:
    at most 20 connections, at most 5 idle, 5-minute idle timeout and
    1-hour lifetime. *)
let default_pool_config : pool_config =
  {
    max_open = 20;
    max_idle = 5;
    idle_timeout = 300.0;
    max_lifetime = 3600.0;
  }

(** Current wall-clock time in seconds, as reported by
    [Unix.gettimeofday]. *)
let now : unit -> float = Unix.gettimeofday

(** [create_pool ?config create_fn] builds an empty pool.

    @param config limits to apply; defaults to [default_pool_config].
    @param create_fn factory called whenever the pool needs to open a new
      connection.
    @return a pool with no idle connections, a zero connection count and the
      cleanup stop flag cleared. *)
let create_pool ?(config = default_pool_config) create_fn =
  let mutex = Lwt_mutex.create () in
  let cond = Lwt_condition.create () in
  {
    config;
    idle = [];
    total_count = 0;
    mutex;
    cond;
    create_connection = create_fn;
    cleanup_stopper = false;
  }

(** [get_connection pool] resolves with a connection checked out of [pool].

    Under the pool mutex it, in order of preference:
    - pops the head of the idle list;
    - creates a new connection if the count is below [max_open];
    - otherwise waits on the pool condition and, once woken, releases the
      mutex and retries from the top. *)
let rec get_connection pool : Connection.t Lwt.t =
  let try_acquire () =
    match pool.idle with
    | entry :: remaining ->
        pool.idle <- remaining;
        Lwt.return (`Conn entry.conn)
    | [] ->
        let at_capacity = pool.total_count >= pool.config.max_open in
        if at_capacity then
          (* [wait ~mutex] releases the pool mutex while suspended and
             re-acquires it before resolving. *)
          Lwt.map
            (fun () -> `Wait)
            (Lwt_condition.wait ~mutex:pool.mutex pool.cond)
        else begin
          let fresh = pool.create_connection () in
          (* Count the connection only after the factory has succeeded. *)
          pool.total_count <- pool.total_count + 1;
          Lwt.return (`Conn fresh)
        end
  in
  Lwt.bind (Lwt_mutex.with_lock pool.mutex try_acquire) (function
    | `Conn c -> Lwt.return c
    | `Wait -> get_connection pool)

(** [return_connection pool conn] hands [conn] back to [pool].

    If fewer than [max_idle] connections are idle, [conn] is pushed onto the
    idle list. Otherwise it is removed from the pool count and disconnected.
    In both cases one waiter blocked in [get_connection] is signalled: either
    an idle connection or a free slot below [max_open] is now available.

    The disconnect runs after the pool mutex has been released, so a slow
    disconnect does not stall other pool operations.

    @return a promise that resolves once the pool state is updated and, when
      the connection was discarded, once [Connection.disconnect] completes
      (propagating its failure, if any).

    NOTE(review): [get_connection] hands out the bare [Connection.t], so the
    original creation time is not available here and [created] is re-stamped
    on every return. [max_lifetime] therefore measures time since the last
    return rather than the true age of the connection. *)
let return_connection pool (conn : Connection.t) : unit Lwt.t =
  let* kept =
    Lwt_mutex.with_lock pool.mutex (fun () ->
        (* [compare_length_with] stops after [max_idle] cells instead of
           walking the whole idle list. *)
        let has_room =
          List.compare_length_with pool.idle pool.config.max_idle < 0
        in
        if has_room then begin
          let stamp = now () in
          pool.idle <- { conn; last_used = stamp; created = stamp } :: pool.idle
        end
        else pool.total_count <- pool.total_count - 1;
        (* Wake a waiter in both branches: discarding a connection frees a
           slot under [max_open] just as parking one frees a connection. *)
        Lwt_condition.signal pool.cond ();
        Lwt.return has_room)
  in
  if kept then Lwt.return_unit else Connection.disconnect conn

(** [with_connection pool f] checks a connection out of [pool], runs [f] on
    it, and returns the connection to the pool whether [f] succeeds or
    fails. The result (or failure) of [f] is propagated to the caller. *)
let with_connection pool (f : Connection.t -> 'a Lwt.t) : 'a Lwt.t =
  Lwt.bind (get_connection pool) (fun conn ->
      let run () = f conn in
      let release () = return_connection pool conn in
      Lwt.finalize run release)

(** [cleanup_loop pool period] repeatedly sleeps for [period] seconds and then
    sweeps the idle list, removing every entry whose [last_used] stamp is
    older than [idle_timeout] or whose [created] stamp is older than
    [max_lifetime]. Removed entries are subtracted from the pool count and
    disconnected outside the pool mutex.

    The loop returns as soon as it observes [pool.cleanup_stopper] set, both
    before sleeping and right after waking up.

    Every dropped connection gets a disconnect attempt even if an earlier one
    fails. If any attempt failed, the loop then fails with the first such
    exception. *)
let rec cleanup_loop pool period : unit Lwt.t =
  if pool.cleanup_stopper then Lwt.return_unit
  else
    let* () = Lwt_unix.sleep period in
    (* The stop flag may have been set while we slept; honour it now instead
       of running one more sweep. *)
    if pool.cleanup_stopper then Lwt.return_unit
    else
      (* Single timestamp so both cutoffs refer to the same instant. *)
      let current = now () in
      let cutoff_idle = current -. pool.config.idle_timeout in
      let cutoff_create = current -. pool.config.max_lifetime in
      let* drop_list =
        Lwt_mutex.with_lock pool.mutex (fun () ->
            let keep, drop =
              List.partition
                (fun pc ->
                  pc.last_used >= cutoff_idle && pc.created >= cutoff_create)
                pool.idle
            in
            pool.idle <- keep;
            pool.total_count <- pool.total_count - List.length drop;
            Lwt.return drop)
      in
      (* The dropped entries are already gone from the pool, so each must be
         disconnected here or it leaks. Remember only the first failure and
         keep going through the rest of the list. *)
      let* first_failure =
        Lwt_list.fold_left_s
          (fun failure pc ->
            Lwt.catch
              (fun () ->
                let+ () = Connection.disconnect pc.conn in
                failure)
              (fun exn ->
                Lwt.return
                  (match failure with
                  | None -> Some exn
                  | Some _ -> failure)))
          None drop_list
      in
      match first_failure with
      | Some exn -> Lwt.fail exn
      | None -> cleanup_loop pool period

(** [start_cleanup pool ~period] clears the stop flag and launches
    [cleanup_loop pool period] as a detached Lwt task. *)
let start_cleanup pool ~period =
  let sweep () = cleanup_loop pool period in
  (* Clear the flag first: the loop checks it as soon as it starts. *)
  pool.cleanup_stopper <- false;
  Lwt.async sweep

(** [stop_cleanup pool] sets the stop flag; [cleanup_loop] exits the next time
    it checks the flag. *)
let stop_cleanup (pool : connection_pool) : unit =
  pool.cleanup_stopper <- true

(** [pool_stats pool] resolves with [(total, idle)]: the pool connection
    count and the length of the idle list, both read under the pool mutex. *)
let pool_stats pool : (int * int) Lwt.t =
  let snapshot () =
    let idle_count = List.length pool.idle in
    Lwt.return (pool.total_count, idle_count)
  in
  Lwt_mutex.with_lock pool.mutex snapshot