Source file current_term.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
module S = S
module Output = Output
module Make (Metadata : sig type t end) = struct
(* Free-form text describing something. *)
type description = string
(* The result of a primitive operation: an incremental value holding the
   operation's output together with optional metadata. *)
type 'a primitive = ('a Output.t * Metadata.t option) Current_incr.t
(* Term nodes, specialised to this functor's metadata type. *)
module Node = Node.Make(Metadata)
(* The node record fields ([id], [v], [ty], [bind]) and the [ty] constructors
   are used unqualified throughout the rest of this functor. *)
open Node
(* A term is a node. *)
type 'a t = 'a Node.t
(* Mutable counters summarising the state of the tracked terms built with
   this functor instance. A term is tracked when its constructor calls
   [update_total]; its current state is then reported through [update].
   Both functions register a [Current_incr.on_release] hook that undoes the
   increment. *)
module Quick_stats = struct
  (* Number of tracked terms currently in each state. The [blocked] field is
     never updated in place: [get] derives it from [total]. *)
  let v =
    ref { S.
          ok = 0;
          waiting_for_confirmation = 0;
          ready = 0;
          running = 0;
          failed = 0;
          blocked = 0;
        }

  (* Number of tracked terms. *)
  let total = ref 0

  (* Release hooks: each one undoes the matching increment in [update]. *)
  let dec_ok () = v := { !v with ok = !v.ok - 1 }
  let dec_waiting_for_confirmation () =
    v := { !v with waiting_for_confirmation = !v.waiting_for_confirmation - 1 }
  let dec_ready () = v := { !v with ready = !v.ready - 1 }
  let dec_running () = v := { !v with running = !v.running - 1 }
  let dec_failed () = v := { !v with failed = !v.failed - 1 }

  (* [update ~id value] counts the term [id] in the bucket matching [value].
     An error whose source is some other term is not counted here; such
     terms end up in the [blocked] remainder computed by [get]. *)
  let update ~id : _ Dyn.t -> unit = function
    | Ok _ ->
      v := { !v with ok = !v.ok + 1 };
      Current_incr.on_release dec_ok
    | Error (src, _) when not (Id.equal src id) -> ()
    | Error (_, `Active `Waiting_for_confirmation) ->
      v := { !v with waiting_for_confirmation = !v.waiting_for_confirmation + 1 };
      Current_incr.on_release dec_waiting_for_confirmation
    | Error (_, `Active `Ready) ->
      v := { !v with ready = !v.ready + 1 };
      Current_incr.on_release dec_ready
    | Error (_, `Active `Running) ->
      v := { !v with running = !v.running + 1 };
      Current_incr.on_release dec_running
    | Error (_, `Msg _) ->
      v := { !v with failed = !v.failed + 1 };
      Current_incr.on_release dec_failed

  let dec_total () = decr total

  (* Count one more tracked term, until the enclosing computation is
     released. *)
  let update_total () =
    incr total;
    Current_incr.on_release dec_total

  (* [get ()] is a snapshot of the counters, with [blocked] set to the number
     of tracked terms that are in none of the explicitly counted states.
     Every counted state is subtracted, including
     [waiting_for_confirmation]; otherwise a term waiting for confirmation
     would be reported both there and as blocked. *)
  let get () =
    let v = !v in
    let accounted =
      v.ok + v.waiting_for_confirmation + v.ready + v.running + v.failed
    in
    { v with blocked = !total - accounted }
end
(* The bind context given to newly created nodes: [node] copies the current
   contents into each node's [bind] field, and [with_bind_context] replaces
   it for the duration of a callback. [None] when no context is active. *)
let bind_context : bind_context ref = ref None
(* [node ?id ty v] builds a term node of kind [ty] carrying the incremental
   value [v]. A fresh id is minted when [id] is not supplied, and the node
   records the bind context that is current at the time of the call. *)
let node ?id ty v =
  let id =
    match id with
    | Some id -> id
    | None -> Id.mint ()
  in
  { id; v; ty; bind = !bind_context }
(* [with_bind_context bc f] runs [f ()] with the bind context set to
   [Some bc]. The previous context is put back afterwards, whether [f]
   returns or raises. *)
let with_bind_context bc f =
  let saved = !bind_context in
  let restore () = bind_context := saved in
  bind_context := Some bc;
  Fun.protect ~finally:restore f
(* [with_id id r] attaches [id] to the error carried by [r]; a successful
   result is returned unchanged. *)
let with_id id r =
  match r with
  | Error e -> Error (id, e)
  | Ok _ as ok -> ok
(* [active s] is a constant term whose value is [Dyn.active ~id s], where
   [id] is the id of the new node itself. *)
let active s =
  let id = Id.mint () in
  let v = Current_incr.const (Dyn.active ~id s) in
  node ~id (Constant None) v
(* [return ?label x] is a constant term holding [Dyn.return x]. The optional
   [label] is stored in the node's [Constant] kind. *)
let return ?label x =
  let v = Current_incr.const (Dyn.return x) in
  node (Constant label) v
(* [map_input ~label source x] is a [Map_input] node with the constant value
   [x], recording [source] as its origin and [label] as its info. *)
let map_input ~label source x =
  Current_incr.const x
  |> node (Map_input { source = Term source; info = label })
(* [option_input source x] is an [Opt_input] node with the constant value
   [x], recording [source] as its origin. *)
let option_input source x =
  Current_incr.const x
  |> node (Opt_input { source = Term source })
(* [fail msg] is a constant term whose value is [Dyn.fail ~id msg], where
   [id] is the id of the new node itself. *)
let fail msg =
  let id = Id.mint () in
  let v = Current_incr.const (Dyn.fail ~id msg) in
  node ~id (Constant None) v
(* [incr_map ?eq fn v] is [Current_incr.map fn v], comparing results with
   [Dyn.equal ?eq]. *)
let incr_map ?eq fn v =
  let equal = Dyn.equal ?eq in
  Current_incr.map ~eq:equal fn v
(* [state ?hidden t] is a [State] node over [t] whose value is obtained by
   applying [Dyn.state] to the value of [t]. Successive results are compared
   with [Output.equal], using physical equality on the payload. *)
let state ?(hidden=false) t =
  let same_output = Output.equal (==) in
  let v = incr_map ~eq:same_output Dyn.state t.v in
  node (State { source = Term t; hidden }) v
(* [catch ?hidden t] is a [Catch] node over [t] whose value is obtained by
   applying [Dyn.catch] to the value of [t]. Successive results are compared
   with [Result.equal], using physical equality on both sides. *)
let catch ?(hidden=false) t =
  let same_result = Result.equal ~ok:(==) ~error:(==) in
  let v = incr_map ~eq:same_result Dyn.catch t.v in
  node (Catch { source = Term t; hidden }) v
(* [component fmt ...] formats its arguments to a string with [Fmt.str],
   wrapping [fmt] in a vertical box. *)
let component fmt =
  let boxed = "@[<v>" ^^ fmt ^^ "@]" in
  Fmt.str boxed
(* [join ?eq x] flattens an incremental value holding a term into the value
   of that term: it reads [x], then reads the [v] field of the term found
   there, and writes the result using [Dyn.equal ?eq] as the equality. *)
let join ?eq x =
  Current_incr.of_cc (
    Current_incr.read x (fun inner ->
      Current_incr.read inner.v (fun value ->
        Current_incr.write ~eq:(Dyn.equal ?eq) value)))
(* [bind ?info ?eq f x] is a term that follows the term [f y], where [y] is
   the successful value of [x]. While [x] holds an error, the result carries
   that error instead and [f] is not called.
   - [info] is stored in the [Bind_in] node.
   - [eq] is passed to [join] for comparing the final values. *)
let bind ?(info="") ?eq (f:'a -> 'b t) (x:'a t) =
(* Count this bind among the tracked terms. *)
Quick_stats.update_total ();
(* A node with the same value as [x]; it becomes the bind context of every
   node created while [f] runs. *)
let bind_in = node (Bind_in (Term x, info)) x.v in
let t =
x.v |> Current_incr.map @@ fun v ->
(* Record the state of [x] in the quick stats, attributed to [x.id]. *)
Quick_stats.update ~id:x.id v;
with_bind_context (Term bind_in) @@ fun () ->
match v with
(* [as e] lets the error be reused at the result type. *)
| Error _ as e -> node (Constant None) @@ Current_incr.const e
| Ok y -> f y
in
(* The same incremental term, type-erased for storage in [Bind_out]. *)
let nested = Current_incr.map (fun t -> Term t) t in
node (Bind_out nested) (join ?eq t)
(* [map ?eq f x] is a [Map] node over [x] whose value is obtained with
   [Dyn.map ~id f], where [id] is the id of the new node. [eq] is used when
   comparing successive results. *)
let map ?eq f x =
  let id = Id.mint () in
  let v = incr_map ?eq (Dyn.map ~id f) x.v in
  node ~id (Map (Term x)) v
(* [cutoff ~eq x] is [x] mapped through the identity function, with [eq] as
   the equality used to compare successive values. *)
let cutoff ~eq x = map ~eq Fun.id x
(* [map_error f x] is a [Map] node over [x] whose value is obtained with
   [Dyn.map_error ~id f], where [id] is the id of the new node. *)
let map_error f x =
  let id = Id.mint () in
  let v = incr_map (Dyn.map_error ~id f) x.v in
  node ~id (Map (Term x)) v
(* [ignore_value x] is [x] with its successful value replaced by [()]. *)
let ignore_value x = map (fun _ -> ()) x
(* [pair a b] is a [Pair] node whose value combines the current values of
   [a] and [b] with [Dyn.pair]. [a] is read first, then [b]. *)
let pair a b =
  let both =
    Current_incr.of_cc (
      Current_incr.read a.v (fun left ->
        Current_incr.read b.v (fun right ->
          Current_incr.write (Dyn.pair left right))))
  in
  node (Pair (Term a, Term b)) both
(* [primitive ~info f x] is a [Primitive] node that applies the primitive
   operation [f] to the successful value of [x]. The node keeps both the
   operation's output and its optional metadata. While [x] holds an error,
   that error is passed through with no metadata and [f] is not called. *)
let primitive ~info (f:'a -> 'b primitive) (x:'a t) =
(* Count this primitive among the tracked terms. *)
Quick_stats.update_total ();
let id = Id.mint () in
(* The output paired with its metadata, recomputed whenever [x] changes. *)
let v_meta =
Current_incr.of_cc begin
Current_incr.read x.v @@ function
| Error _ as e -> Current_incr.write (e, None)
| Ok y ->
let output = f y in
Current_incr.read output @@ fun (v, job) ->
(* Attach this node's id to any error reported by the operation. *)
let v = with_id id v in
Quick_stats.update ~id v;
Current_incr.write (v, job)
end
in
(* Split the pair. Only the value component goes through [incr_map], which
   compares results with [Dyn.equal]. *)
let v = incr_map fst v_meta in
let meta = Current_incr.map snd v_meta in
node ~id (Primitive { x = Term x; info; meta }) v
(* Binding operators for building terms. *)
module Syntax = struct
  (* [let**] is [bind] with an explicit label: the whole
     [let** ... in ...] expression is a function awaiting its [info]. *)
  let (let**) x f = fun info -> bind ~info f x

  (* [let>] is [primitive], likewise awaiting its [info] argument. *)
  let (let>) x f = fun info -> primitive ~info f x
  let (and>) a b = pair a b

  (* [let*] is [bind] with the default (empty) info. *)
  let (let*) x f = bind f x
  let (and*) a b = pair a b

  (* [let+] is [map]. *)
  let (let+) x f = map f x
  let (and+) a b = pair a b
end
open Syntax
(* [collapse ~key ~value ~input t] is a [Collapse] node with the same value
   as [t], recording [key], [value], [input] and [t] itself as the output. *)
let collapse ~key ~value ~input t =
  t.v
  |> node (Collapse { key; value; input = Term input; output = Term t })
(* [with_context ctx f] runs [f ()] with [ctx] added to the bind context.
   If a context is already active, the new context is the pair of the
   existing one and [ctx]; otherwise it is [ctx] alone. *)
let with_context (ctx : _ t) f =
  let combined =
    match !bind_context with
    | Some (Term outer) -> Term (pair outer ctx)
    | None -> Term ctx
  in
  with_bind_context combined f
(* [all terms] combines a list of unit terms into one. The empty list gives
   [return ()], a single term is returned as-is, and a longer list is the
   first term paired with [all] of the remainder. *)
let rec all terms =
  match terms with
  | [] -> return ()
  | [only] -> only
  | first :: rest ->
    pair first (all rest) |> map (fun ((), ()) -> ())
(* [all_labelled items] combines a list of [(label, unit term)] pairs into a
   single unit term bound under the info label [all]. Each term is wrapped
   in a hidden [catch]. The result fails if any of them holds a [`Msg]
   error: when all the failures share one message, that message is
   reported along with the labels; otherwise only the labels are listed. *)
let all_labelled items =
  (* Fold the list into [Ok ()], [Error (`Same (labels, msg))] or
     [Error (`Diff labels)]. *)
  let rec summarise = function
    | [] -> return (Ok ())
    | (label, term) :: rest ->
      pair (catch term ~hidden:true) (summarise rest)
      |> map (fun (outcome, others) ->
          match outcome with
          | Ok () -> others
          | Error (`Msg e) ->
            (match others with
             | Ok () -> Error (`Same ([label], e))
             | Error (`Same (ls, e2)) when e = e2 -> Error (`Same (label :: ls, e))
             | Error (`Same (ls, _))
             | Error (`Diff ls) -> Error (`Diff (label :: ls))))
  in
  (* Turn the summary into the final term. *)
  let report = function
    | Ok () -> return ()
    | Error (`Same (ls, e)) -> fail (Fmt.str "%a failed: %s" Fmt.(list ~sep:(any ", ") string) ls e)
    | Error (`Diff ls) -> fail (Fmt.str "%a failed" Fmt.(list ~sep:(any ", ") string) ls)
  in
  bind ~info:"all" report (summarise items)
(* [replace old v] is a fresh constant node holding [v], with [old] recorded
   as its bind context. *)
let replace old v =
  let id = Id.mint () in
  let v = Current_incr.const v in
  { id; v; ty = Constant None; bind = Some (Term old) }
(* [option_map ?label f input] applies [f] to the item inside [input], if
   any, and wraps a successful result in [Some]. When [input] is [Ok None]
   the result is [Ok None]; when [input] is an error the result carries that
   error. In both of those cases [f] is still called on a placeholder input
   (presumably so that the structure of the pipeline is still available;
   confirm against the users of [Option_map]). *)
let option_map (type a b) ?label (f : a t -> b t) (input : a option t) : b option t =
let results =
input.v |> Current_incr.map @@ function
| Error _ as r ->
(* Run [f] on an input carrying the error, then replace the output value
   with the error itself. *)
let output = f (option_input input r) in
replace output r
| Ok None ->
(* No item: run [f] on a placeholder reporting [`Active `Ready] under a
   fresh id, then force the result to [Ok None]. *)
let no_item = Error (Id.mint (), `Active `Ready) in
let output = f (option_input input no_item) in
replace output (Ok None)
| Ok (Some item) ->
let output = f (option_input input (Ok item)) in
(* Same node as [output], with successful values wrapped in [Some]. *)
{ output with v = Current_incr.map (Result.map Option.some) output.v }
in
(* The same incremental term, type-erased for storage in [Option_map]. *)
let output = Current_incr.map (fun x -> Term x) results in
node (Option_map { item = Term input; output; label }) (join results)
(* [option_iter ?label f input] is [option_map ?label f input] with the
   result discarded. *)
let option_iter (type a) ?label (f : a t -> unit t) (input : a option t) =
  option_map ?label f input
  |> map (fun (_ : unit option) -> ())
(* [list_seq terms] turns a list of terms into a term holding the list of
   their values, in the same order. Each element is paired with the
   sequenced remainder of the list. *)
let rec list_seq : 'a t list -> 'a list t = fun terms ->
  match terms with
  | [] -> return []
  | head :: tail ->
    pair head (list_seq tail) |> map (fun (y, ys) -> y :: ys)
(* [collapse_list ~key ~value ~input t] groups the terms in [t] under a
   single [Collapse] node whose output is [list_seq t]. It returns:
   - one [Map] node per element of [t], each sourced from the collapse node
     and carrying that element's value;
   - a unit term mapped from the collapse node itself. *)
let collapse_list ~key ~value ~input t =
  let sequenced = list_seq t in
  let collapse_node =
    sequenced.v
    |> node (Collapse { key; value; input = Term input; output = Term sequenced })
  in
  List.map (fun item -> item.v |> node (Map (Term collapse_node))) t,
  map (fun _ -> ()) collapse_node
(* [list_map (module M) ?collapse_key ?label f input] applies [f] to each
   item of [input] and gathers the results into a term holding a list, in
   the order of the input list. Per-item work goes through
   [Current_incr.Separate], keyed by the item. When [collapse_key] is given,
   each per-item output is wrapped with [collapse], using the item's printed
   form as the value. *)
let list_map (type a) (module M : S.ORDERED with type t = a) ?collapse_key ?label (f : a t -> 'b t) (input : a list t) =
let module Map = Map.Make(M) in
let module Sep = Current_incr.Separate(Map) in
(* The current items as the keys of a map; an input that is not [Ok] gives
   the empty map. Duplicate items share one key. *)
let as_map =
input.v |> Current_incr.map @@ function
| Ok items -> items |> List.fold_left (fun acc x -> Map.add x () acc) Map.empty
| _ -> Map.empty
in
(* One output term per distinct item. *)
let results =
Sep.map as_map @@ fun item ->
let label = Fmt.to_to_string M.pp item in
let input = map_input ~label:(Ok label) input (Ok item) in
let output = f input in
match collapse_key with
| None -> Current_incr.write output
| Some key -> Current_incr.write (collapse ~key ~value:label ~input output)
in
let results =
Current_incr.of_cc begin
Current_incr.read input.v @@ function
| Error _ as r ->
(* Input not available: run [f] on a placeholder labelled [`Blocked] and
   replace the output value with the input error. *)
let output = f (map_input input ~label:(Error `Blocked) r) in
Current_incr.write @@ replace output r
| Ok [] ->
(* Empty list: run [f] on a placeholder labelled [`Empty_list] and force
   the result to [Ok []]. *)
let no_items = Error (Id.mint (), `Active `Ready) in
let output = f (map_input input ~label:(Error `Empty_list) no_items) in
Current_incr.write @@ replace output (Ok [])
| Ok items ->
Current_incr.read results @@ fun results ->
(* Look up each item's term in list order and sequence them.
   NOTE(review): [Map.find] raises [Not_found] if [results] lacks a key
   for an item; this relies on [results] being in step with [input]. *)
let results = items |> List.map (fun item -> Map.find item results) |> list_seq in
Current_incr.write results
end
in
(* The same incremental term, type-erased for storage in [List_map]. *)
let output = Current_incr.map (fun x -> Term x) results in
node (List_map { items = Term input; output; label }) (join results)
(* [list_iter (module M) ?collapse_key ?label f xs] is
   [list_map (module M) ?collapse_key ?label f xs] with the resulting list
   discarded. *)
let list_iter (type a) (module M : S.ORDERED with type t = a) ?collapse_key ?label f (xs : a list t) =
  list_map (module M) ?collapse_key ?label f xs
  |> map (fun (_ : unit list) -> ())
(* [option_seq o] turns an optional term into a term holding an option:
   [None] becomes [return None], and [Some x] becomes [x] with its value
   wrapped in [Some]. *)
let option_seq : 'a t option -> 'a option t = fun o ->
  match o with
  | Some term -> map (fun y -> Some y) term
  | None -> return None
(* [gate ~on t] is a [Gate_on] node whose value is [Dyn.bind] applied to the
   value of [on] and a function returning the value of [t]. [t] is read
   before [on]. Results are compared with [Dyn.equal], using physical
   equality on the payload. *)
let gate ~on t =
  let same = Dyn.equal ~eq:(==) in
  let gated =
    Current_incr.of_cc (
      Current_incr.read t.v (fun value ->
        Current_incr.read on.v (fun ctrl ->
          Current_incr.write ~eq:same (Dyn.bind ctrl (fun () -> value)))))
  in
  node (Gate_on { ctrl = Term on; value = Term t }) gated
(* [of_output x] is a constant term holding [x], with the new node's id
   attached to any error in [x]. The term is counted in the quick stats. *)
let of_output x =
  let id = Id.mint () in
  let tagged = with_id id x in
  Quick_stats.update_total ();
  Quick_stats.update ~id tagged;
  Current_incr.const tagged |> node ~id (Constant None)
(* [observe t] reads the current value of [t] and classifies it:
   - [Ok v] for a successful value;
   - [Error (`Active a)] when the value is an active state;
   - [Error (`Msg e)] when [t] itself is the source of the failure;
   - [Error `Blocked] when the failure came from some other term. *)
let observe t =
  match Current_incr.observe t.v with
  | Ok v -> Ok v
  | Error (id, `Msg e) when Id.equal id t.id -> Error (`Msg e)
  | Error (_, `Msg _) -> Error `Blocked
  | Error (_, `Active e) -> Error (`Active e)
(* Evaluation of terms. *)
module Executor = struct
  (* [run t] is the incremental value of [t] mapped through [Dyn.run]. *)
  let run (t : 'a t) =
    t.v |> Current_incr.map Dyn.run
end
(* Analysis functions from [Analysis.Make], extended with access to a
   primitive's metadata and to the quick stats. *)
module Analysis = struct
  include Analysis.Make(Metadata)

  (* [metadata t] is a term holding the metadata of the primitive behind
     [t]. [Map] nodes are followed back to their source.
     @raise Failure if the chain ends at a node that is neither a
     [Primitive] nor a [Map]. *)
  let metadata t =
    let rec find_meta (Term term) =
      match term.ty with
      | Primitive prim -> prim.meta
      | Map source -> find_meta source
      | _ -> failwith "metadata: this is not a primitive term!"
    in
    let meta = find_meta (Term t) in
    node (Constant None) (Current_incr.map Result.ok meta)

  (* Snapshot of the quick stats counters. *)
  let quick_stat = Quick_stats.get
end
end