Source file resource_cache0.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
(* Versioned, serializable ([sexp] and [bin_io]) definitions of the status types.
   The [Status] types inside [Make_wrapped] are declared equal to these [V1] types. *)
module Stable = struct
  open! Core_kernel.Core_kernel_stable

  module Resource = struct
    module V1 = struct
      (* Status of a single resource: the state it is in, and a time span associated
         with that state. *)
      type t =
        { state : [`Busy | `Closing | `Idle]
        ; since : Time_ns.Span.V2.t
        }
      [@@deriving sexp, bin_io]
    end
  end

  (* What a key type has to provide for a status to be serializable. *)
  module type Key = sig
    type t [@@deriving sexp, bin_io]
  end

  module Resource_list = struct
    module V1 = struct
      (* Status of all the resources that share one [key], together with information
         about the queue of jobs for that key. *)
      type 'key t =
        { key : 'key
        ; resources : Resource.V1.t list
        ; queue_length : int
        ; max_time_on_queue : Time_ns.Span.V2.t option
        }
      [@@deriving sexp, bin_io]
    end
  end

  module Status = struct
    module V1 = struct
      (* Status of a whole cache: one [Resource_list] status per key plus a job
         count. *)
      type 'key t =
        { resource_lists : 'key Resource_list.V1.t list
        ; num_jobs_in_cache : int
        }
      [@@deriving sexp, bin_io]
    end
  end
end

open! Core_kernel
open! Async_kernel
open! Import


include Resource_cache_intf
(* Fresh unique-id type for this module; every [Resource.t] created below carries one
   and [Resource.equal] compares resources by it. *)
module Uid = Unique_id.Int ()

module Make_wrapped (R : Resource.S_wrapped) = struct
  (* Status types reported by the cache. Each record type here is declared equal to
     the corresponding [Stable.*.V1] type, with [Key] fixed to [R.Key] in the
     unparameterized [t] aliases. *)
  module Status = struct
    module Key = R.Key

    module Resource = struct
      type state =
        [ `Busy
        | `Idle
        | `Closing ]
      [@@deriving sexp_of, compare]

      (* [state] of one resource and a time span associated with that state. *)
      type t = Stable.Resource.V1.t =
        { state : state
        ; since : Time_ns.Span.t
        }
      [@@deriving fields, sexp_of, compare]
    end

    module Resource_list = struct
      (* Key-polymorphic form, equal to the stable type. *)
      type 'key t_ = 'key Stable.Resource_list.V1.t =
        { key : 'key
        ; resources : Resource.t list
        ; queue_length : int
        ; max_time_on_queue : Time_ns.Span.t option
        }
      [@@deriving fields, sexp_of, compare]

      (* The same type specialized to this cache's key. *)
      type t = Key.t t_ [@@deriving sexp_of, compare]
    end

    (* Key-polymorphic form of the whole-cache status, equal to the stable type. *)
    type 'key t_ = 'key Stable.Status.V1.t =
      { resource_lists : 'key Resource_list.t_ list
      ; num_jobs_in_cache : int
      }
    [@@deriving fields, sexp_of, compare]

    (* The same type specialized to this cache's key. *)
    type t = Key.t t_ [@@deriving sexp_of, compare]

    (* Given a serializable version of the key type, [Make_stable.V1] provides [sexp]
       and [bin_io] conversions for the status. *)
    module Make_stable = struct
      module V1 (Key : Stable.Key with type t = Key.t) = struct
        type t = Key.t Stable.Status.V1.t [@@deriving sexp, bin_io]
      end
    end
  end

  (* Failure cases that can appear in the deferred outcome of a request:
     - [`Error_opening_resource (key, err)]: opening the resource for [key] failed
       with [err];
     - [`Cache_is_closed]: the cache was closed. *)
  module Delayed_failures = struct
    type t =
      [ `Error_opening_resource of R.Key.t * Error.t
      | `Cache_is_closed ]
  end

  (* A [Job] is a single request to run [f] against a resource. Its outcome is stored by
     filling an ivar with a deferred, so "an outcome has been chosen" ([has_result]) is
     distinct from "the outcome is known" ([result] is determined). *)
  module Job : sig
    type 'a t

    (* [create] stamps the job with the current time. When [give_up] becomes determined
       and no outcome has been chosen yet, the outcome becomes
       [`Gave_up_waiting_for_resource]. *)
    val create
      :  ?open_timeout:Time_ns.Span.t
      -> give_up:unit Deferred.t
      -> f:(R.t -> 'a Deferred.t)
      -> 'a t

    (* Use [has_result t] instead of [Deferred.is_determined (result t)] to prevent a race
       condition. It is possible that the result ivar was filled but [result] is not yet
       determined. *)

    val has_result : _ t -> bool

    val result
      :  'a t
      -> [`Ok of R.Key.t * 'a | `Gave_up_waiting_for_resource | Delayed_failures.t]
           Deferred.t

    val f : 'a t -> R.t -> 'a Deferred.t
    val open_timeout : 'a t -> Time_ns.Span.t option
    val created_at : 'a t -> Time_ns.t

    (* Record that the job is being run on an already open resource for the given key;
       [result] is the deferred value of that run. The job must not have an outcome
       yet. *)
    val mark_result_from_available_resource
      :  'a t
      -> R.Key.t
      -> result:'a Deferred.t
      -> unit

    (* Record that the job is being run as the first use of a newly created resource.
       The job must not have an outcome yet. *)
    val mark_result_from_resource_creation
      :  'a t
      -> result:[ `Ok of R.Key.t * 'a
                | Delayed_failures.t
                | (* This case is not possible, but the compiler gets mad otherwise *)
                  `Gave_up_waiting_for_resource ]
                  Deferred.t
      -> unit

    (* Give the job the outcome [`Cache_is_closed] unless it already has one. *)
    val mark_cache_closed : 'a t -> unit
  end = struct
    type 'a t =
      { f : R.t -> 'a Deferred.t
      ; result_ivar :
          [`Ok of R.Key.t * 'a | `Gave_up_waiting_for_resource | Delayed_failures.t]
            Deferred.t
            Ivar.t
      ; open_timeout : Time_ns.Span.t option
      ; created_at : Time_ns.t
      }
    [@@deriving fields]

    let create ?open_timeout ~give_up ~f =
      let t =
        { f; result_ivar = Ivar.create (); open_timeout; created_at = Time_ns.now () }
      in
      (* Giving up only takes effect if nothing else chose an outcome first. *)
      upon give_up (fun () ->
        Ivar.fill_if_empty t.result_ivar (return `Gave_up_waiting_for_resource));
      t
    ;;

    let has_result { result_ivar; _ } = Ivar.is_full result_ivar

    (* Flatten the "deferred inside an ivar" into a single deferred. *)
    let result { result_ivar; _ } = Deferred.join (Ivar.read result_ivar)

    let mark_cache_closed { result_ivar; _ } =
      Ivar.fill_if_empty result_ivar (return `Cache_is_closed)
    ;;

    let mark_result_from_resource_creation { result_ivar; _ } ~result =
      Ivar.fill result_ivar result
    ;;

    let mark_result_from_available_resource { result_ivar; _ } args ~result =
      let tagged = Deferred.map result ~f:(fun res -> `Ok (args, res)) in
      Ivar.fill result_ivar tagged
    ;;
  end

  (* [Resource] wraps [R] taking care that uses of [with_] don't cross paths, and that
     [close] and [close_finished] are well behaved.

     It will trigger [close] once the [max_resource_reuse] or [idle_cleanup_after] are
     exceeded. *)
  module Resource : sig
    type t

    (* [create] will immediately produce a [Resource.t] that is initially
       busy with:
       - calling [R.open_]
       - calling [immediate ~f:with_] with the resource created (if successful)

       If [R.open_] fails, this resource is immediately closed
       otherwise the resource will become idle after the initial use.

       If [close_when_idle] was called while [R.open_] was still running, [with_] is not
       run: the freshly opened resource is closed and the result is [`Cache_is_closed].

       @see [immediate]. *)

    val create
      :  ?open_timeout:Time_ns.Span.t
      -> Config.t
      -> R.Key.t
      -> R.Common_args.t
      -> with_:(R.t -> 'a Deferred.t)
      -> log_error:(string -> unit)
      -> t * [> `Ok of R.Key.t * 'a | Delayed_failures.t] Deferred.t

    (* Current state of the resource and how long it has been in that state. *)
    val status : t -> Status.Resource.t

    (* [close_when_idle] forces the resource to shutdown either now or when the currently
       running [f] completes *)

    val close_when_idle : t -> unit Deferred.t

    (* [close_finished] becomes determined when this [Resource] has been closed.
       We guarantee that this will become determined, even if the underlying
       resource implementation is not well behaved. *)

    val close_finished : t -> unit Deferred.t

    (* Acquire an exclusive lock on this resource and call [f]. If [f] fails, or if the
       number of calls exceeds [max_resource_reuse] this resource will be closed.
       Otherwise this resource will be marked as idle and will close if not used again
       within a predefined timeout. *)

    val immediate
      :  t
      -> f:(R.t -> 'a Deferred.t)
      -> [ `Ok of 'a Deferred.t
         | `Resource_unavailable_until of unit Deferred.t
         | `Resource_closed ]

    (* Two values are equal iff they come from the same call to [create]. *)
    val equal : t -> t -> bool
  end = struct
    type t =
      { uid : Uid.t
      ; key : R.Key.t
      ; args : R.Common_args.t
      ; (* Empty until [R.open_] has succeeded. *)
        resource : R.t Set_once.t
      ; (* The ivar in [`In_use_until] is filled when the current use ends, either by
           [set_idle] or once closing has finished. *)
        mutable state : [`Idle | `In_use_until of unit Ivar.t | `Closing]
      ; mutable in_state_since : Time_ns.t
      ; config : Config.t
      ; (* Decremented on every use; forced to [0] by [close_when_idle] to request a
           close at the end of the current use. *)
        mutable remaining_uses : int
      ; close_finished : unit Ivar.t
      ; log_error : string -> unit
      }

    let equal a b = Uid.equal a.uid b.uid

    let status t =
      let state =
        match t.state with
        | `Idle -> `Idle
        | `In_use_until _ -> `Busy
        | `Closing -> `Closing
      in
      { Status.Resource.state; since = Time_ns.diff (Time_ns.now ()) t.in_state_since }
    ;;

    (* Every state change goes through here so that [in_state_since] stays accurate. *)
    let set_state t state =
      t.state <- state;
      t.in_state_since <- Time_ns.now ()
    ;;

    let close_finished t = Ivar.read t.close_finished

    (* [close t] closes the underlying resource (if one was opened and its close has not
       already started) and fills [t.close_finished]. Exceptions raised while closing are
       reported through [log_error], and closing is given at most 10 seconds, so
       [t.close_finished] is filled even if [R.close] misbehaves. Calling [close] on a
       resource that is already closing just waits for that close. *)
    let close t =
      let really_close () =
        set_state t `Closing;
        let closed =
          match Set_once.get t.resource with
          | None -> Deferred.unit
          | Some r ->
            (match%map
               Monitor.try_with (fun () ->
                 if R.has_close_started r then Deferred.unit else R.close r)
             with
             | Ok () -> ()
             | Error exn ->
               t.log_error (sprintf !"Exception closing resource: %{Exn}" exn))
        in
        match%map Clock_ns.with_timeout (Time_ns.Span.of_sec 10.) closed with
        | `Result ()
        | `Timeout -> Ivar.fill t.close_finished ()
      in
      match t.state with
      | `Closing -> close_finished t
      | `Idle -> really_close ()
      | `In_use_until done_ ->
        assert (not (Ivar.is_full done_));
        (* Anyone waiting for the current use to end is released once closing is done. *)
        close_finished t >>> Ivar.fill done_;
        really_close ()
    ;;

    let close_when_idle t =
      match t.state with
      | `Closing -> close_finished t
      | `Idle -> close t
      | `In_use_until _ ->
        (* This will trigger a [close] when the current task completes. *)
        t.remaining_uses <- 0;
        close_finished t
    ;;

    (* Called at the end of a successful use. Closes the resource if it has no uses
       left; otherwise marks it idle, releases waiters, and schedules a check that
       closes it if it is still idle after [idle_cleanup_after]. *)
    let set_idle t =
      match t.state with
      | `Closing -> failwith "Impossible, can't set a closed resource to idle"
      | `Idle -> failwith "Impossible, already marked as idle"
      | `In_use_until done_ ->
        assert (Ivar.is_empty done_);
        if t.remaining_uses <= 0
        then don't_wait_for (close t)
        else (
          set_state t `Idle;
          Ivar.fill done_ ();
          Clock_ns.after t.config.idle_cleanup_after
          >>> fun () ->
          match t.state with
          | `Closing | `In_use_until _ -> ()
          | `Idle ->
            (* The resource may have been used and become idle again in the meantime,
               so check how long the current idle period has lasted. *)
            let idle_time = Time_ns.diff (Time_ns.now ()) t.in_state_since in
            if Time_ns.Span.( >= ) idle_time t.config.idle_cleanup_after
            then don't_wait_for (close t))
    ;;

    (* Run [f] on the opened resource. The caller must already have put [t] in the
       [`In_use_until] state and [t.resource] must be set. If [f] raises, the resource
       is closed and the exception is re-raised. *)
    let unsafe_immediate t ~f =
      match t.state with
      | `Closing -> failwith "Can't [unsafe_immediate] a closed resource"
      | `Idle -> failwith "Can't [unsafe_immediate] an idle resource"
      | `In_use_until done_ ->
        assert (Ivar.is_empty done_);
        assert (t.remaining_uses > 0);
        t.remaining_uses <- t.remaining_uses - 1;
        (* deliberately not filling [done_] here.
           It is filled in [set_idle] or [close]. *)
        (match%map
           Monitor.try_with (fun () -> f (Set_once.get_exn t.resource [%here]))
         with
         | Ok res ->
           set_idle t;
           res
         | Error exn ->
           don't_wait_for (close t);
           raise exn)
    ;;

    let immediate t ~f =
      match t.state with
      | `Closing -> `Resource_closed
      | `In_use_until done_ -> `Resource_unavailable_until (Ivar.read done_)
      | `Idle ->
        (* It is possible that [R.close] was called but [R.close_finished] is not
           determined yet. Use [R.has_close_started] to prevent this race. *)
        if R.has_close_started (Set_once.get_exn t.resource [%here])
        then `Resource_closed
        else (
          set_state t (`In_use_until (Ivar.create ()));
          `Ok (unsafe_immediate t ~f))
    ;;

    let create ?open_timeout config key args ~with_ ~log_error =
      let t =
        { uid = Uid.create ()
        ; key
        ; args
        ; resource = Set_once.create ()
        ; state = `In_use_until (Ivar.create ())
        ; in_state_since = Time_ns.now ()
        ; config
        ; remaining_uses = config.Config.max_resource_reuse
        ; close_finished = Ivar.create ()
        ; log_error
        }
      in
      let res =
        match%bind
          Deferred.Or_error.try_with_join (fun () ->
            match open_timeout with
            | None -> R.open_ key args
            | Some timeout ->
              let resource_ivar = Ivar.create () in
              (match%map
                 Clock_ns.with_timeout
                   timeout
                   (let%map r = R.open_ key args in
                    Ivar.fill resource_ivar r;
                    r)
               with
               | `Result r -> r
               | `Timeout ->
                 (* In case we timeout, make sure we cleanup after ourselves *)
                 (Ivar.read resource_ivar
                  >>> function
                  | Error _ -> ()
                  | Ok r -> don't_wait_for (R.close r));
                 Or_error.error_string "Exceeded open timeout while creating resource"))
        with
        | Ok res ->
          (* A call to [close_and_flush] might have occurred *)
          if t.remaining_uses > 0
          then (
            (* If the underlying resource closes on its own, close this wrapper too. *)
            don't_wait_for
              (let%bind () = R.close_finished res in
               close_when_idle t);
            Set_once.set_exn t.resource [%here] res;
            let%map r = unsafe_immediate t ~f:with_ in
            `Ok (key, r))
          else (
            (* The resource was opened but will never be used. Record it so that
               [close] closes it, and run [close] so that [close_finished] gets
               filled; otherwise the opened resource would leak and everything
               waiting on [close_finished] would wait forever. *)
            Set_once.set_exn t.resource [%here] res;
            don't_wait_for (close t);
            return `Cache_is_closed)
        | Error err ->
          (* Ensure [close_finished] gets filled *)
          don't_wait_for (close t);
          return (`Error_opening_resource (key, err))
      in
      t, res
    ;;
  end

  (* Limit the number of concurrent [Resource.t]s globally *)
  module Global_resource_limiter : sig
    type t

    val create : Config.t -> t

    (* create a single resource, and block a slot until the resource has been cleaned
       up *)

    val create_resource
      :  ?open_timeout:Time_ns.Span.t
      -> t
      -> R.Key.t
      -> R.Common_args.t
      -> with_:(R.t -> 'a Deferred.t)
      -> log_error:(string -> unit)
      -> [ `Ok of Resource.t * [> `Ok of R.Key.t * 'a | Delayed_failures.t] Deferred.t
         | `Cache_is_closed
         | `No_resource_available_until of unit Deferred.t ]

    val close_and_flush : t -> unit Deferred.t
  end = struct
    (* The throttle is used purely as a slot counter: each live resource occupies one
       throttle job that finishes when the resource has finished closing. *)
    type t =
      { config : Config.t
      ; throttle : unit Throttle.t
      }

    let create config =
      let throttle =
        Throttle.create
          ~continue_on_error:true
          ~max_concurrent_jobs:config.Config.max_resources
      in
      { config; throttle }
    ;;

    let create_resource ?open_timeout { config; throttle } key args ~with_ ~log_error =
      if Throttle.is_dead throttle
      then `Cache_is_closed
      else (
        let in_use = Throttle.num_jobs_running throttle in
        let limit = Throttle.max_concurrent_jobs throttle in
        if in_use >= limit
        then (
          (* Every slot is taken: tell the caller when it is worth asking again. *)
          let slot_freed = Throttle.capacity_available throttle in
          let shut_down = Throttle.cleaned throttle in
          `No_resource_available_until (Deferred.any [ slot_freed; shut_down ]))
        else (
          assert (Throttle.num_jobs_waiting_to_start throttle = 0);
          let resource, response =
            Resource.create ?open_timeout config key args ~with_ ~log_error
          in
          (* Hold a slot until the resource has finished closing. *)
          let occupy_slot () = Resource.close_finished resource in
          don't_wait_for (Throttle.enqueue throttle occupy_slot);
          `Ok (resource, response)))
    ;;

    (* Kill the throttle and wait until it reports that it has been cleaned. *)
    let close_and_flush { throttle; _ } =
      Throttle.kill throttle;
      Throttle.cleaned throttle
    ;;
  end

  (* Limit the number of concurrent [Resource.t]s locally *)
  module Resource_list : sig
    type t

    (* [create] also starts the background loop that hands resources to queued jobs. *)
    val create
      :  Config.t
      -> Global_resource_limiter.t
      -> R.Key.t
      -> R.Common_args.t
      -> log_error:(string -> unit)
      -> t

    val status : t -> Status.Resource_list.t

    (* [is_empty] is true iff there are no currently connected/connecting resources and
       no jobs waiting on the queue. *)

    val is_empty : t -> bool

    (* [close_and_flush'] will mark this resource list for removal and start tearing down
       all its resources. Jobs still waiting on the queue are given the result
       [`Cache_is_closed]. Calling it again has no effect. *)

    val close_and_flush' : t -> unit

    (* [close_finished] becomes determined after [close_and_flush'] was called and all
       resources have been closed. *)

    val close_finished : t -> unit Deferred.t

    (* [find_available_resource] and [create_resource] can be used to bypass [enqueue] in
       the case where there is an idle resource or an available slot. *)

    val find_available_resource
      :  t
      -> f:(R.t -> 'a Deferred.t)
      -> [`Immediate of 'a Deferred.t | `None_until of unit Deferred.t]

    val create_resource
      :  ?open_timeout:Time_ns.Span.t
      -> t
      -> f:(R.t -> 'a Deferred.t)
      -> [> `Ok of R.Key.t * 'a | Delayed_failures.t] Deferred.t option

    val enqueue : t -> 'a Job.t -> unit
  end = struct
    (* Jobs with different result types share one queue, so the result type is hidden. *)
    type job = T : 'a Job.t -> job

    type t =
      { config : Config.t
      ; key : R.Key.t
      ; args : R.Common_args.t
      ; global_resource_limiter : Global_resource_limiter.t
      ; mutable resources : Resource.t list
      ; waiting_jobs : job Queue.t
      ; (* Set whenever something happened that might let a queued job make progress;
           the background loop takes it and re-examines the queue. *)
        trigger_queue_manager : unit Mvar.Read_write.t
      ; mutable close_started : bool
      ; close_finished : unit Ivar.t
      ; log_error : string -> unit
      }

    let status t =
      (* The job at the front of the queue is the one that was enqueued first. *)
      let max_time_on_queue =
        Queue.peek t.waiting_jobs
        |> Option.map ~f:(fun (T job) ->
          Time_ns.diff (Time_ns.now ()) (Job.created_at job))
      in
      { Status.Resource_list.key = t.key
      ; resources = List.map t.resources ~f:Resource.status
      ; queue_length = Queue.length t.waiting_jobs
      ; max_time_on_queue
      }
    ;;

    (* Run [f] on the first resource that accepts it. If none does, return a deferred
       that becomes determined as soon as one of the busy resources stops being busy
       (it never becomes determined if no resource is busy). *)
    let find_available_resource t ~f =
      let rec loop ~until = function
        | [] -> `None_until (Deferred.any until)
        | r :: rs ->
          (match Resource.immediate r ~f with
           | `Ok r -> `Immediate r
           | `Resource_unavailable_until u -> loop ~until:(u :: until) rs
           | `Resource_closed -> loop ~until rs)
      in
      loop t.resources ~until:[]
    ;;

    (* Open a new resource whose first use is [f], provided both the per-key limit and
       the global limiter allow it. Returns [None] when no resource was created. *)
    let create_resource ?open_timeout t ~f =
      if List.length t.resources >= t.config.max_resources_per_id
      then None
      else (
        match
          Global_resource_limiter.create_resource
            ?open_timeout
            t.global_resource_limiter
            t.key
            t.args
            ~with_:f
            ~log_error:t.log_error
        with
        | `Cache_is_closed -> None
        | `No_resource_available_until u ->
          (* Trigger when there is global capacity available *)
          upon u (Mvar.set t.trigger_queue_manager);
          None
        | `Ok (resource, response) ->
          t.resources <- resource :: t.resources;
          (Resource.close_finished resource
           >>> fun () ->
           t.resources
           <- List.filter t.resources ~f:(fun r -> not (Resource.equal resource r));
           (* Trigger that capacity is now available *)
           Mvar.set t.trigger_queue_manager ();
           if t.close_started && List.is_empty t.resources
           then Ivar.fill t.close_finished ());
          (* Trigger when this resource is now available. This is needed because
             [create_resource] is called from outside this module *)
          upon response (fun _ -> Mvar.set t.trigger_queue_manager ());
          Some response)
    ;;

    (* Serve jobs from the front of the queue for as long as a resource can be found or
       created for them; stop at the first job that has to keep waiting. *)
    let allocate_resources t =
      let rec loop () =
        match Queue.peek t.waiting_jobs with
        | None -> ()
        | Some (T job) ->
          (* Skip if this job has a result already *)
          if Job.has_result job
          then (
            let (_ : _) = Queue.dequeue_exn t.waiting_jobs in
            loop ())
          else (
            match find_available_resource t ~f:(Job.f job) with
            | `Immediate result ->
              Job.mark_result_from_available_resource job t.key ~result;
              let (_ : _) = Queue.dequeue_exn t.waiting_jobs in
              loop ()
            | `None_until until ->
              (* Trigger when a resource is available *)
              upon until (Mvar.set t.trigger_queue_manager);
              (match
                 create_resource ?open_timeout:(Job.open_timeout job) t ~f:(Job.f job)
               with
               | Some result ->
                 Job.mark_result_from_resource_creation job ~result;
                 let (_ : _) = Queue.dequeue_exn t.waiting_jobs in
                 loop ()
               | None -> ()))
      in
      loop ()
    ;;

    (* Background loop: each time the trigger is set, try to serve queued jobs. Once
       closing has started, fail every queued job with [`Cache_is_closed], empty the
       queue and stop. *)
    let start_background_resource_allocator t =
      let rec loop () =
        let%bind () = Mvar.take t.trigger_queue_manager in
        if t.close_started
        then (
          Queue.iter t.waiting_jobs ~f:(fun (T job) -> Job.mark_cache_closed job);
          Queue.clear t.waiting_jobs;
          Deferred.unit)
        else (
          allocate_resources t;
          loop ())
      in
      loop ()
    ;;

    let create config global_resource_limiter key args ~log_error =
      let t =
        { config
        ; key
        ; args
        ; global_resource_limiter
        ; resources = []
        ; waiting_jobs = Queue.create ()
        ; trigger_queue_manager = Mvar.create ()
        ; close_started = false
        ; close_finished = Ivar.create ()
        ; log_error
        }
      in
      don't_wait_for (start_background_resource_allocator t);
      t
    ;;

    let enqueue t job =
      Queue.enqueue t.waiting_jobs (T job);
      (* Trigger that a new job is on the queue *)
      Mvar.set t.trigger_queue_manager ();
      upon (Job.result job) (fun _ ->
        (* The job may still be queued here if it got its result some other way. *)
        Queue.filter_inplace t.waiting_jobs ~f:(fun (T job') ->
          not (phys_same job job'));
        (* Trigger that a resource is now available *)
        Mvar.set t.trigger_queue_manager ())
    ;;

    let is_empty t = List.is_empty t.resources && Queue.is_empty t.waiting_jobs
    let close_finished t = Ivar.read t.close_finished

    let close_and_flush' t =
      if not t.close_started
      then (
        t.close_started <- true;
        (* Always wake the background loop, even when there are no resources to close:
           it is what fails the jobs still on the queue with [`Cache_is_closed], and
           waking it also lets the loop terminate. *)
        Mvar.set t.trigger_queue_manager ();
        if List.is_empty t.resources
        then Ivar.fill t.close_finished ()
        else
          (* [close_finished] is filled by the callback installed in [create_resource]
             once the last of these resources has finished closing. *)
          List.iter t.resources ~f:(fun r -> don't_wait_for (Resource.close_when_idle r)))
    ;;
  end

  (* State of a whole cache. *)
  type t =
    { config : Config.t
    ; global_resource_limiter : Global_resource_limiter.t
    ; (* One [Resource_list.t] per key. *)
      cache : Resource_list.t R.Key.Table.t
    ; args : R.Common_args.t
    ; mutable num_jobs_in_cache : int
    ; mutable close_started : bool
    ; close_finished : unit Ivar.t
    ; log_error : string -> unit
    }

  (* Snapshot of the cache: the status of every resource list currently in the table
     together with the current job count. *)
  let status { cache; num_jobs_in_cache; _ } =
    { Status.resource_lists = Hashtbl.data cache |> List.map ~f:Resource_list.status
    ; num_jobs_in_cache
    }
  ;;

  (* Look up the resource list for [key], creating and registering a new one if the
     table has none yet. *)
  let get_resource_list t key =
    let make_list () =
      Resource_list.create
        t.config
        t.global_resource_limiter
        key
        t.args
        ~log_error:t.log_error
    in
    Hashtbl.find_or_add t.cache key ~default:make_list
  ;;

  (* Go through [keys] in order and run [f] on the first resource that can take it right
     away. Returns the key that was used together with the deferred result of [f], or
     [None] if no key had such a resource. *)
  let find_any_available_resource t keys ~f =
    let try_key key =
      match Resource_list.find_available_resource (get_resource_list t key) ~f with
      | `None_until _ -> None
      | `Immediate res -> Some (key, res)
    in
    List.find_map keys ~f:try_key
  ;;

  (* Go through [keys] in order and open a new resource, with [f] as its first use, for
     the first key whose resource list allows it. [None] if no key did. *)
  let create_any_resource ?open_timeout t keys ~f =
    let create_for key =
      Resource_list.create_resource ?open_timeout (get_resource_list t key) ~f
    in
    List.find_map keys ~f:create_for
  ;;

  (* Create a single job for [f] and put it on the queue of every key in [keys]. The
     returned deferred is the result of that job. *)
  let enqueue_all ?open_timeout t ~give_up keys ~f =
    let job = Job.create ?open_timeout ~give_up ~f in
    List.iter keys ~f:(fun key -> Resource_list.enqueue (get_resource_list t key) job);
    Job.result job
  ;;

  (* Run [f] on a resource for one of [keys]. In order of preference: use a resource
     that is available right now, open a new one, or wait in the queues of all [keys]
     (unless [give_up] is already determined). The call is counted in
     [num_jobs_in_cache] from now until its result is determined. *)
  let with_any' ?open_timeout ?(give_up = Deferred.never ()) t keys ~f =
    let on_wrapped resource = f (R.underlying resource) in
    t.num_jobs_in_cache <- t.num_jobs_in_cache + 1;
    let acquire_or_enqueue () =
      match find_any_available_resource t keys ~f:on_wrapped with
      | Some (key, running) -> Deferred.map running ~f:(fun value -> `Ok (key, value))
      | None ->
        (match create_any_resource ?open_timeout t keys ~f:on_wrapped with
         | Some outcome -> outcome
         | None ->
           if Deferred.is_determined give_up
           then return `Gave_up_waiting_for_resource
           else enqueue_all ?open_timeout ~give_up t keys ~f:on_wrapped)
    in
    let result =
      if t.close_started then return `Cache_is_closed else acquire_or_enqueue ()
    in
    upon result (fun _ -> t.num_jobs_in_cache <- t.num_jobs_in_cache - 1);
    result
  ;;

  (* Same as [with_any'], with every failure case turned into an [Error]. *)
  let with_any ?open_timeout ?give_up t keys ~f =
    Deferred.map
      (with_any' ?open_timeout t ?give_up keys ~f)
      ~f:(function
        | `Cache_is_closed -> Or_error.error_string "Cache is closed"
        | `Gave_up_waiting_for_resource ->
          Or_error.error_string "Gave up waiting for resource"
        | `Error_opening_resource (key, err) ->
          let tag = sprintf !"Error creating required resource: %{sexp:R.Key.t}" key in
          Error (Error.tag ~tag err)
        | `Ok args_and_res -> Ok args_and_res)
  ;;

  (* [with_any] for a single key; the key is dropped from a successful result. *)
  let with_ ?open_timeout ?give_up t key ~f =
    Deferred.map (with_any ?open_timeout ?give_up t [ key ] ~f) ~f:(Or_error.map ~f:snd)
  ;;

  (* [with_any'] for a single key; the key is dropped from the [`Ok] and
     [`Error_opening_resource] cases. *)
  let with_' ?open_timeout ?give_up t key ~f =
    Deferred.map
      (with_any' ?open_timeout ?give_up t [ key ] ~f)
      ~f:(function
        | `Cache_is_closed -> `Cache_is_closed
        | `Gave_up_waiting_for_resource -> `Gave_up_waiting_for_resource
        | `Error_opening_resource (_key, err) -> `Error_opening_resource err
        | `Ok (_key, res) -> `Ok res)
  ;;

  (* Like [with_any'], but when opening a resource for some key fails, that key is
     dropped and the remaining keys are tried again. Once no key is left the result is
     [`Error_opening_all_resources], listing the failures in the order they happened. *)
  let with_any_loop ?open_timeout ?give_up t keys ~f =
    let rec attempt remaining ~failed =
      match remaining with
      | [] -> return (`Error_opening_all_resources (List.rev failed))
      | _ :: _ ->
        Deferred.bind
          (with_any' ?open_timeout ?give_up t remaining ~f)
          ~f:(function
            | `Error_opening_resource (bad_key, err) ->
              let still_to_try =
                List.filter remaining ~f:(fun key -> not (R.Key.equal key bad_key))
              in
              attempt still_to_try ~failed:((bad_key, err) :: failed)
            | (`Ok _ | `Gave_up_waiting_for_resource | `Cache_is_closed) as res ->
              return res)
    in
    attempt keys ~failed:[]
  ;;

  (* Create an empty cache. Until the cache has finished closing, every
     [idle_cleanup_after] the table is swept: resource lists that are empty are closed
     and removed from it. *)
  let init ~config ~log_error args =
    let close_finished = Ivar.create () in
    let t =
      { config
      ; global_resource_limiter = Global_resource_limiter.create config
      ; cache = R.Key.Table.create ()
      ; args
      ; num_jobs_in_cache = 0
      ; close_started = false
      ; close_finished
      ; log_error
      }
    in
    (* Returns whether the list should stay in the table. *)
    let keep_if_in_use res_list =
      match Resource_list.is_empty res_list with
      | false -> true
      | true ->
        Resource_list.close_and_flush' res_list;
        false
    in
    let sweep () = Hashtbl.filter_inplace t.cache ~f:keep_if_in_use in
    Clock_ns.every ~stop:(Ivar.read close_finished) t.config.idle_cleanup_after sweep;
    t
  ;;

  (* Close the cache: close every resource list in the table and the global limiter,
     and fill [close_finished] once all of them are done. A second call only waits for
     the first one to finish. *)
  let close_and_flush t =
    if t.close_started
    then Ivar.read t.close_finished
    else (
      t.close_started <- true;
      let lists_closed =
        List.map (Hashtbl.data t.cache) ~f:(fun res_list ->
          Resource_list.close_and_flush' res_list;
          Resource_list.close_finished res_list)
      in
      let limiter_closed =
        Global_resource_limiter.close_and_flush t.global_resource_limiter
      in
      Deferred.map
        (Deferred.all_unit (limiter_closed :: lists_closed))
        ~f:(fun () -> Ivar.fill t.close_finished ()))
  ;;

  (* Read-only views of the cache state. *)
  let config { config; _ } = config
  let close_started { close_started; _ } = close_started
  let close_finished { close_finished; _ } = Ivar.read close_finished
end

(* [Make] builds a cache for a resource that is handed to users as it is: the type
   [resource] is [R.t] itself and [underlying] is the identity. *)
module Make (R : Resource.S) = struct
  include Make_wrapped (struct
      include R

      type resource = t

      let underlying t = t
    end)
end

(* [Make_simple] builds a cache from a [Resource.Simple] by first passing it through
   [Resource.Make_simple] and then applying [Make_wrapped] to the result. *)
module Make_simple (R : Resource.Simple) = struct
  include Make_wrapped (struct
      include Resource.Make_simple (R)
    end)
end