1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
open Lwt.Infix
(** Upper bound, in bytes, on the amount of log data returned by a single
    call to [read] (100 KiB). *)
let max_log_chunk_size = Int64.mul 100L 1024L
(** [read ~start path] reads one chunk of the file at [path].

    [start] is a byte offset; a negative value is taken relative to the end of
    the file. The resulting offset is clamped to the range [0, file length].
    At most [max_log_chunk_size] bytes are read from that position.

    Returns [(data, next)], where [data] is the bytes read and [next] is the
    offset immediately after them. The channel is always closed before
    returning, including when an exception escapes. *)
let read ~start path =
  let ic = open_in_bin (Fpath.to_string path) in
  let finally () = close_in ic in
  Fun.protect ~finally (fun () ->
    let size = LargeFile.in_channel_length ic in
    (* A negative [start] counts backwards from the end of the file. *)
    let offset = if start < 0L then Int64.add size start else start in
    (* Clamp the offset so that it lies within the file. *)
    let offset = max 0L (min size offset) in
    LargeFile.seek_in ic offset;
    (* Never hand back more than one chunk's worth of data. *)
    let count = min max_log_chunk_size (Int64.sub size offset) in
    let data = really_input_string ic (Int64.to_int count) in
    (data, Int64.add offset count))
module Make (Current : S.CURRENT) = struct
open Capnp_rpc_lwt
module Job = struct
(** Map from job ID to the capability created for it by [local]. Entries are
    added when [local] builds a new service object and removed by that
    object's [release] method. *)
let job_cache = ref Current.Job.Map.empty
(** [stream_log_data ~job_id ~start] returns the next chunk of [job_id]'s log
    starting at offset [start], as [(data, next_offset)] (see [read]).

    If the log path cannot be determined, the result is a [`Capnp] exception
    error carrying the message. If no data is available at [start] and the
    job is still running, this waits on [Current.Job.wait_for_log_data] and
    then tries again (presumably that resolves once more output has been
    written — confirm against [Current.Job]). If no data is available and the
    job is not running, the empty chunk is returned. *)
let stream_log_data ~job_id ~start =
  match Current.Job.log_path job_id with
  | Ok path ->
    let rec poll () =
      let ((data, _) as chunk) = read ~start path in
      if data <> "" then Lwt_result.return chunk
      else
        match Current.Job.lookup_running job_id with
        | Some running -> Current.Job.wait_for_log_data running >>= poll
        | None -> Lwt_result.return chunk
    in
    poll ()
  | Error (`Msg msg) ->
    let ex = Capnp_rpc.Exception.v msg in
    Lwt_result.fail (`Capnp (`Exception ex))
(** [local engine job_id] returns a Cap'n Proto capability for the job
    [job_id], implementing the log, rebuild, cancel, status and
    approve-early-start methods.

    Capabilities are shared through [job_cache]: if one already exists for
    [job_id] its reference count is incremented and it is returned; otherwise
    a new service object is created, stored in the cache and returned. The
    service's [release] method removes the entry from the cache again. *)
let rec local engine job_id =
  let module Job = Api.Service.Job in
  match Current.Job.Map.find_opt job_id !job_cache with
  | Some cached ->
    Capability.inc_ref cached;
    cached
  | None ->
    (* Find this job among the jobs in the engine's current state. *)
    let lookup () =
      Current.Engine.state engine
      |> Current.Engine.jobs
      |> Current.Job.Map.find_opt job_id
    in
    let cap =
      Job.local (object
        inherit Job.service

        (* Return the next chunk of the job's log from the requested offset. *)
        method log_impl params release_param_caps =
          let open Job.Log in
          release_param_caps ();
          let offset = Params.start_get params in
          Log.info (fun f -> f "log(%S, %Ld)" job_id offset);
          let reply (data, next_offset) =
            let resp, body = Service.Response.create Results.init_pointer in
            Results.log_set body data;
            Results.next_set body next_offset;
            resp
          in
          Service.return_lwt (fun () ->
            stream_log_data ~job_id ~start:offset >|= Result.map reply)

        (* Trigger a rebuild and return a capability for the new job. *)
        method rebuild_impl _params release_param_caps =
          release_param_caps ();
          Log.info (fun f -> f "rebuild(%S)" job_id);
          (match lookup () with
           | Some active ->
             (match active#rebuild with
              | Some trigger ->
                let open Job.Rebuild in
                let resp, body = Service.Response.create Results.init_pointer in
                let fresh = local engine (trigger ()) in
                Results.job_set body (Some fresh);
                (* Drop our own reference now that it has been stored in the
                   results (presumably the response keeps its own — confirm
                   against capnp-rpc). *)
                Capability.dec_ref fresh;
                (* NOTE(review): the pause yields to the Lwt scheduler before
                   replying; the reason is not visible from this file. *)
                Service.return_lwt (fun () ->
                  Lwt.pause () >|= fun () -> Ok resp)
              | None -> Service.fail "Job cannot be rebuilt at the moment")
           | None -> Service.fail "Job is no longer active (cannot rebuild)")

        (* Cancel the job if it is currently running. *)
        method cancel_impl _params release_param_caps =
          release_param_caps ();
          Log.info (fun f -> f "cancel(%S)" job_id);
          (match Current.Job.lookup_running job_id with
           | Some running ->
             Current.Job.cancel running "Cancelled by user";
             Service.return_empty ()
           | None -> Service.fail "Job is no longer active (cannot cancel)")

        (* Report the job's ID, description and which actions are possible. *)
        method status_impl _params release_param_caps =
          let open Job.Status in
          release_param_caps ();
          Log.info (fun f -> f "status(%S)" job_id);
          let resp, body = Service.Response.create Results.init_pointer in
          Results.id_set body job_id;
          let cancellable =
            match Current.Job.lookup_running job_id with
            | None -> false
            | Some running -> Current.Job.cancelled_state running = Ok ()
          in
          (match lookup () with
           | Some active ->
             Results.description_set body (Fmt.str "%t" (active#pp));
             Results.can_cancel_set body cancellable;
             Results.can_rebuild_set body (Option.is_some (active#rebuild))
           | None ->
             (* The cancel/rebuild flags are left unset for inactive jobs. *)
             Results.description_set body "Inactive job");
          Service.return resp

        (* Approve an early start for the job if it is currently running. *)
        method approve_early_start_impl _params release_param_caps =
          release_param_caps ();
          Log.info (fun f -> f "approveEarlyStart(%S)" job_id);
          (match Current.Job.lookup_running job_id with
           | Some running ->
             let resp = Service.Response.create_empty () in
             Current.Job.approve_early_start running;
             Service.return resp
           | None ->
             Service.fail "Job is not running (cannot approve early start)")

        (* Forget the cached capability once the service is released. *)
        method! release =
          let remaining = Current.Job.Map.remove job_id !job_cache in
          job_cache := remaining
      end)
    in
    job_cache := Current.Job.Map.add job_id cap !job_cache;
    cap
(** [local_opt engine job_id] is [Ok (local engine job_id)] when
    [Current.Job.log_path job_id] succeeds, and that call's error otherwise.
    The path itself is not used; only whether it could be obtained. *)
let local_opt engine job_id =
  Current.Job.log_path job_id
  |> Result.map (fun _path -> local engine job_id)
end
(** [job ~engine job_id] is the capability for [job_id], obtained from
    [Job.local]. *)
let job ~engine job_id =
  Job.local engine job_id
(** [engine engine] exposes [engine] as a Cap'n Proto Engine service with two
    methods: one listing the IDs of the jobs in the engine's current state,
    and one returning a job capability for a given ID. *)
let engine engine =
  let module Engine = Api.Service.Engine in
  Engine.local (object
    inherit Engine.service

    (* Reply with the IDs of all jobs in the engine's current state. *)
    method active_jobs_impl _params release_param_caps =
      let open Engine.ActiveJobs in
      release_param_caps ();
      Log.info (fun f -> f "activeJobs");
      let resp, body = Service.Response.create Results.init_pointer in
      let job_ids =
        Current.Engine.state engine
        |> Current.Engine.jobs
        |> Current.Job.Map.bindings
        |> List.map fst
      in
      ignore (Results.ids_set_list body job_ids);
      Service.return resp

    (* Reply with a capability for the requested job, or fail with the
       message from [Job.local_opt]. *)
    method job_impl params release_param_caps =
      let open Engine.Job in
      let job_id = Params.id_get params in
      Log.info (fun f -> f "job(%S)" job_id);
      release_param_caps ();
      let resp, body = Service.Response.create Results.init_pointer in
      match Job.local_opt engine job_id with
      | Ok cap ->
        Results.job_set body (Some cap);
        (* Drop our own reference after storing the capability in the
           results. *)
        Capability.dec_ref cap;
        Service.return resp
      | Error (`Msg msg) -> Service.fail "%s" msg
  end)
end