Source file connection.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
open Lwt.Infix
open Capnp_rpc_lwt
(* [p >>!= f] waits for [p]; on [Ok x] it continues with [f x], and on
   [Error e] it short-circuits to [Error e] (this is [Lwt_result.bind]). *)
let ( >>!= ) promise f = Lwt_result.bind promise f
(* Prometheus gauges that count pending submissions by the stage they are
   currently waiting in. All of them are children of a single gauge family
   named "queue_state" (namespace "ocluster", subsystem "ocurrent"),
   distinguished by the value of their "state" label. *)
module Metrics = struct
  open Prometheus

  let namespace = "ocluster"
  let subsystem = "ocurrent"

  (* The family is created (and registered) exactly once; [queue state]
     returns the child gauge for that label value. *)
  let queue =
    Gauge.v_label
      ~label_name:"state"
      ~help:"Items in cluster queue by state"
      ~namespace ~subsystem
      "queue_state"

  (* Waiting for a connection to the scheduler. *)
  let queue_connect = queue "connect"

  (* Waiting for a slot in the per-(pool, urgency) limiter. *)
  let queue_rate_limit = queue "rate-limit"

  (* Submitted; waiting for the ticket capability to settle. *)
  let queue_get_ticket = queue "get-ticket"

  (* Ticket obtained; waiting for the job capability (a worker) to settle. *)
  let queue_get_worker = queue "get-worker"
end
(* Connection state shared by every submission made through one value
   returned by [create]. *)
type t = {
  (* Sturdy ref that [sched] uses to (re)connect to the scheduler's
     submission service. *)
  sr : [`Submission_f4e8a768b32a7c42] Sturdy_ref.t;
  (* The current connection, or the connection attempt in progress.
     [create] initialises it to a failed promise so that the first call to
     [sched] connects; [sched] replaces it when the capability it holds
     reports a problem. *)
  mutable sched : Cluster_api.Submission.t Lwt.t;
  (* One limiter per (pool name, urgent) pair, created on demand by
     [rate_limit]. *)
  rate_limits : ((string * bool), unit Lwt_pool.t) Hashtbl.t;
  (* Number of slots in each limiter stored in [rate_limits]. *)
  max_pipeline : int;
}
(* Return the scheduler's submission capability, connecting if necessary.

   - If the cached connection has resolved to a capability with no problem,
     it is returned directly.
   - If a connection attempt is already pending, callers share it.
   - Otherwise (the initial failed placeholder, or a capability that now
     reports a problem) a new attempt is started and cached in [t.sched].
     That attempt never fails: each error is logged and the connection is
     retried after 10 seconds.

   "Connecting to build cluster..." is written to [job]'s log whenever the
   caller has to wait. *)
let sched ~job t =
  let connect_until_ok () =
    let rec attempt () =
      Lwt.catch
        (fun () ->
           Sturdy_ref.connect_exn t.sr >>= fun cap ->
           Capability.await_settled_exn cap >|= fun () -> cap)
        (fun ex ->
           Log.warn (fun f -> f "Error connecting to build cluster (will retry): %a" Fmt.exn ex);
           Lwt_unix.sleep 10.0 >>= attempt)
    in
    attempt ()
  in
  match Lwt.state t.sched with
  | Lwt.Return cap when Capability.problem cap = None -> Lwt.return cap
  | Lwt.Sleep ->
    (* Someone else is already connecting; join that attempt. *)
    Current.Job.log job "Connecting to build cluster...";
    t.sched
  | Lwt.Return _ | Lwt.Fail _ ->
    Current.Job.log job "Connecting to build cluster...";
    t.sched <- connect_until_ok ();
    t.sched
(* Return the limiter for submissions to [pool] with urgency [urgent],
   creating it on first use. A limiter is an [Lwt_pool] of unit values
   with [t.max_pipeline] slots, cached in [t.rate_limits] under the key
   [(pool, urgent)]. *)
let rate_limit t pool urgent =
  let key = (pool, urgent) in
  match Hashtbl.find t.rate_limits key with
  | limiter -> limiter
  | exception Not_found ->
    let limiter = Lwt_pool.create t.max_pipeline Lwt.return in
    Hashtbl.replace t.rate_limits key limiter;
    limiter
(* Default urgency policy: only [`High]-priority jobs are submitted as
   urgent. *)
let urgent_if_high priority =
  match priority with
  | `Low -> false
  | `High -> true
(* Run [fn ()] while it is counted in the gauge [metric]. The gauge is
   incremented before [fn] is called, and decremented once the resulting
   promise settles, whether it resolves or is rejected ([Lwt.finalize]). *)
let with_state metric fn =
  let leave () =
    Prometheus.Gauge.dec_one metric;
    Lwt.return_unit
  in
  Prometheus.Gauge.inc_one metric;
  Lwt.finalize fn leave
(* Submit [action] to [pool] on the build cluster and wait for a worker.

   Returns [(job_promise, cancel)]:

   - [job_promise] resolves to the build-job capability once a worker has
     been assigned. If a submission attempt fails and the scheduler
     capability then reports a problem, we reconnect and resubmit. If it
     fails while the scheduler is still fine, [job_promise] fails with the
     error text.
   - [cancel ()] abandons the submission at whatever stage it has reached:
     while connecting or rate-limited, [job_promise] fails with
     [Lwt.Canceled]. Once submitted, the ticket is cancelled via
     [Cluster_api.Ticket.cancel]. Presumably [job_promise] then fails with
     the resulting error (TODO confirm scheduler behaviour). After a worker
     has been assigned, [cancel] does nothing here.

   [stage] records how far we got, so that [cancel] knows what to undo:
   [`Init] (connecting), [`Rate_limit] (waiting for a limiter slot),
   [`Get_ticket ticket] (submitted) and [`Got_worker] (done).

   [urgent priority] selects the urgent queue; [~switch] is ignored.

   Every capability obtained here that is not handed back to the caller is
   released, including on the error paths. *)
let submit ~job ~pool ~action ~cache_hint ?src ?secrets ~urgent t ~priority ~switch:_ =
  let urgent = urgent priority in
  let limiter_thread = ref None in
  let stage = ref `Init in
  let cancelled, set_cancelled = Lwt.wait () in
  let cancel () =
    if Lwt.is_sleeping cancelled then Lwt.wakeup set_cancelled (Error `Cancelled);
    match !stage with
    | `Init | `Got_worker -> Lwt.return_unit
    | `Rate_limit ->
      Option.iter Lwt.cancel !limiter_thread;
      Lwt.return_unit
    | `Get_ticket ticket ->
      Cluster_api.Ticket.cancel ticket >|= function
      | Ok () -> ()
      | Error (`Capnp e) -> Current.Job.log job "Cancel ticket failed: %a" Capnp_rpc.Error.pp e
  in
  let rec aux () =
    Prometheus.Gauge.inc_one Metrics.queue_connect;
    let sched = sched ~job t >|= Result.ok in
    Lwt.choose [sched; cancelled] >>= fun sched ->
    Prometheus.Gauge.dec_one Metrics.queue_connect;
    (* [Lwt.choose] picks at random when several promises are already
       resolved (e.g. retrying after a reconnect when [cancel] has already
       been called). Cancellation must win, or we would resubmit a job the
       user cancelled. *)
    let sched = if Lwt.is_sleeping cancelled then sched else Error `Cancelled in
    match sched with
    | Error `Cancelled -> Lwt.(fail Canceled)
    | Ok sched ->
      stage := `Rate_limit;
      Prometheus.Gauge.inc_one Metrics.queue_rate_limit;
      let use_thread =
        Lwt.catch
          (fun () ->
             Lwt_pool.use (rate_limit t pool urgent)
               (fun () ->
                  Prometheus.Gauge.dec_one Metrics.queue_rate_limit;
                  let ticket = Cluster_api.Submission.submit ~urgent ?src ?secrets sched ~pool ~action ~cache_hint in
                  let build_job = Cluster_api.Ticket.job ticket in
                  (* From now on, [cancel] cancels the ticket. *)
                  stage := `Get_ticket ticket;
                  let assigned =
                    with_state Metrics.queue_get_ticket (fun () -> Capability.await_settled ticket) >>!= fun () ->
                    Current.Job.log job "Waiting for worker...";
                    with_state Metrics.queue_get_worker (fun () -> Capability.await_settled build_job)
                  in
                  assigned >|= function
                  | Ok () ->
                    Capability.dec_ref ticket;
                    stage := `Got_worker;
                    Ok build_job
                  | Error err ->
                    (* [build_job] never reaches the caller, so drop our
                       reference now. The ticket is still recorded in
                       [stage] and is released by the error handling below. *)
                    Capability.dec_ref build_job;
                    Error err
               )
          )
          (function
            | Lwt.Canceled as ex ->
              (* Only expected while waiting for a limiter slot ([cancel] in
                 the [`Rate_limit] stage); the slot wait never got to undo
                 the rate-limit gauge itself. *)
              begin match !stage with
                | `Rate_limit -> Prometheus.Gauge.dec_one Metrics.queue_rate_limit
                | `Init | `Get_ticket _ | `Got_worker ->
                  Log.warn (fun f -> f "Cancelled at unexpected point!")
              end;
              Lwt.fail ex
            | ex ->
              Lwt.fail ex
          )
      in
      limiter_thread := Some use_thread;
      use_thread >>= function
      | Ok build_job -> Lwt.return build_job
      | Error err ->
        (* Give a broken connection a chance to show up in
           [Capability.problem] before deciding what went wrong. *)
        Lwt.pause () >>= fun () ->
        (* This attempt is over either way: release its ticket and reset
           [stage] so that a later [cancel] cannot use the released
           capability. *)
        limiter_thread := None;
        begin match !stage with
          | `Get_ticket ticket -> Capability.dec_ref ticket
          | `Init | `Got_worker | `Rate_limit -> ()
        end;
        stage := `Init;
        if Capability.problem sched = None then
          (* Still connected to the scheduler, so the job itself failed:
             report the error. *)
          Lwt.fail_with (Fmt.str "%a" Capnp_rpc.Exception.pp err)
        else
          (* Lost the scheduler: reconnect and resubmit. *)
          aux ()
  in
  aux (), cancel
(* Copy [build_job]'s log into [job]'s log. Starts at position [0L] and
   continues from the [next] value returned with each chunk. Stops with
   [Ok ()] at the first empty chunk. If fetching a chunk fails, returns
   [Error (`Msg _)] describing the Cap'n Proto error. *)
let tail ~job build_job =
  let rec follow position =
    Cluster_api.Job.log build_job position >>= fun chunk ->
    match chunk with
    | Error (`Capnp e) -> Lwt.return @@ Fmt.error_msg "%a" Capnp_rpc.Error.pp e
    | Ok (data, next) ->
      if String.equal data "" then Lwt_result.return ()
      else begin
        Current.Job.write job data;
        follow next
      end
  in
  follow 0L
(* Stream [build_job]'s log into [job], then return the job's result.

   While this runs, cancelling [job] is forwarded to the remote job via
   [Cluster_api.Job.cancel]; a failure to cancel is only logged. The result
   is requested before log streaming starts. If streaming the log fails,
   that error is returned. Otherwise, a Cap'n Proto error from the result
   is converted to [`Msg]. *)
let run_job ~job build_job =
  let on_cancel _reason =
    Cluster_api.Job.cancel build_job >|= function
    | Ok () -> ()
    | Error (`Capnp e) -> Current.Job.log job "Cancel failed: %a" Capnp_rpc.Error.pp e
  in
  Current.Job.with_handler job ~on_cancel (fun () ->
      let outcome = Cluster_api.Job.result build_job in
      tail ~job build_job >>!= fun () ->
      outcome >|= function
      | Ok output -> Ok output
      | Error (`Capnp e) -> Error (`Msg (Fmt.to_to_string Capnp_rpc.Error.pp e)))
(* Create a connection to the scheduler reachable through [sr].
   [max_pipeline] (default 200) is the number of slots in each
   per-(pool, urgency) limiter. No connection is made yet: [sched] starts
   as a failed promise, so the first use connects. *)
let create ?(max_pipeline=200) sr =
  {
    sr;
    sched = Lwt.fail_with "init";
    rate_limits = Hashtbl.create 10;
    max_pipeline;
  }
(* A [Current.Pool] (labelled "OCluster") whose resources are obtained by
   submitting [action] to [pool] on the cluster behind [t] (see [submit]).
   [urgent] maps a job's priority to the cluster's urgent flag; the default
   is [urgent_if_high]. *)
let pool ~job ~pool ~action ~cache_hint ?src ?secrets ?(urgent=urgent_if_high) t =
  let acquire = submit ~job ~pool ~action ~cache_hint ~urgent ?src ?secrets t in
  Current.Pool.of_fn ~label:"OCluster" acquire