Source file impl.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
open Lwt.Infix

let max_log_chunk_size = 102400L  (* 100K at a time *)

let read ~start path =
  let ch = open_in_bin (Fpath.to_string path) in
  Fun.protect ~finally:(fun () -> close_in ch) @@ fun () ->
  let len = LargeFile.in_channel_length ch in
  let (+) = Int64.add in
  let (-) = Int64.sub in
  let start = if start < 0L then len + start else start in
  let start = if start < 0L then 0L else if start > len then len else start in
  LargeFile.seek_in ch start;
  let len = min max_log_chunk_size (len - start) in
  really_input_string ch (Int64.to_int len), start + len

module Make (Current : S.CURRENT) = struct
  open Capnp_rpc_lwt

  module Job = struct
    let job_cache = ref Current.Job.Map.empty

    let stream_log_data ~job_id ~start =
      match Current.Job.log_path job_id with
      | Error `Msg m -> Lwt_result.fail (`Capnp (`Exception (Capnp_rpc.Exception.v m)))
      | Ok path ->
        let rec aux () =
          match read ~start path with
          | ("", _) as x ->
            begin match Current.Job.lookup_running job_id with
              | None -> Lwt_result.return x
              | Some job ->
                Current.Job.wait_for_log_data job >>= aux
            end
          | x -> Lwt_result.return x
        in
        aux ()

    let rec local engine job_id =
      let module Job = Api.Service.Job in
      match Current.Job.Map.find_opt job_id !job_cache with
      | Some job ->
        Capability.inc_ref job;
        job
      | None ->
        let cap =
          let lookup () =
            let state = Current.Engine.state engine in
            Current.Job.Map.find_opt job_id (Current.Engine.jobs state)
          in
          Job.local @@ object
            inherit Job.service

            method log_impl params release_param_caps =
              let open Job.Log in
              release_param_caps ();
              let start = Params.start_get params in
              Log.info (fun f -> f "log(%S, %Ld)" job_id start);
              Service.return_lwt @@ fun () ->
              stream_log_data ~job_id ~start >|= function
              | Error _ as e -> e
              | Ok (log, next) ->
                let response, results = Service.Response.create Results.init_pointer in
                Results.log_set results log;
                Results.next_set results next;
                Ok response

            method rebuild_impl _params release_param_caps =
              release_param_caps ();
              Log.info (fun f -> f "rebuild(%S)" job_id);
              match lookup () with
              | None -> Service.fail "Job is no longer active (cannot rebuild)"
              | Some job ->
                match job#rebuild with
                | None -> Service.fail "Job cannot be rebuilt at the moment"
                | Some rebuild ->
                  let open Job.Rebuild in
                  let response, results = Service.Response.create Results.init_pointer in
                  let new_job = local engine (rebuild ()) in
                  Results.job_set results (Some new_job);
                  Capability.dec_ref new_job;
                  Service.return_lwt @@ fun () ->
                  (* Allow the engine to re-evaluate, so the job will appear
                     active to the caller immediately. *)
                  Lwt.pause () >|= fun () ->
                  Ok response

            method cancel_impl _params release_param_caps =
              release_param_caps ();
              Log.info (fun f -> f "cancel(%S)" job_id);
              match Current.Job.lookup_running job_id with
              | None -> Service.fail "Job is no longer active (cannot cancel)"
              | Some job ->
                Current.Job.cancel job "Cancelled by user";
                Service.return_empty ()

            method status_impl _params release_param_caps =
              let open Job.Status in
              release_param_caps ();
              Log.info (fun f -> f "status(%S)" job_id);
              let response, results = Service.Response.create Results.init_pointer in
              Results.id_set results job_id;
              let can_cancel =
                match Current.Job.lookup_running job_id with
                | Some job -> Current.Job.cancelled_state job = Ok ()
                | None -> false
              in
              begin match lookup () with
                | None -> Results.description_set results "Inactive job"
                | Some job ->
                  Results.description_set results (Fmt.str "%t" job#pp);
                  Results.can_cancel_set results can_cancel;
                  Results.can_rebuild_set results (job#rebuild <> None);
              end;
              Service.return response

            method approve_early_start_impl _params release_param_caps =
              release_param_caps ();
              Log.info (fun f -> f "approveEarlyStart(%S)" job_id);
              match Current.Job.lookup_running job_id with
              | None -> Service.fail "Job is not running (cannot approve early start)"
              | Some job ->
                let response = Service.Response.create_empty () in
                Current.Job.approve_early_start job;
                Service.return response

            method! release =
              job_cache := Current.Job.Map.remove job_id !job_cache
          end
        in
        job_cache := Current.Job.Map.add job_id cap !job_cache;
        cap

    let local_opt engine job_id =
      match Current.Job.log_path job_id with
      | Error _ as e -> e
      | Ok _ -> Ok (local engine job_id)
  end

  let job ~engine id = Job.local engine id

  let engine engine =
    let module Engine = Api.Service.Engine in
    Engine.local @@ object
      inherit Engine.service

      method active_jobs_impl _params release_param_caps =
        let open Engine.ActiveJobs in
        release_param_caps ();
        Log.info (fun f -> f "activeJobs");
        let response, results = Service.Response.create Results.init_pointer in
        let state = Current.Engine.state engine in
        Current.Job.Map.bindings (Current.Engine.jobs state)
        |> List.map fst |> Results.ids_set_list results |> ignore;
        Service.return response

      method job_impl params release_param_caps =
        let open Engine.Job in
        let id = Params.id_get params in
        Log.info (fun f -> f "job(%S)" id);
        release_param_caps ();
        let response, results = Service.Response.create Results.init_pointer in
        match Job.local_opt engine id with
        | Error `Msg m -> Service.fail "%s" m
        | Ok job ->
          Results.job_set results (Some job);
          Capability.dec_ref job;
          Service.return response
    end
end