Source file pipeline.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208

(* This file is free software, part of dolmen. See file "LICENSE" for more information *)

(* Exceptions that may interrupt the processing of an input. These are
   aliases, so matching on them is the same as matching on the original
   exceptions. *)

(* User interruption (Ctrl-C): [Make] installs a handler so that SIGINT
   raises this exception. *)
exception Sigint = Sys.Break
(* NOTE(review): presumably raised by the alarm set up in [Make.run] when the
   time limit is exceeded — confirm in the Alarm module. *)
exception Out_of_time = Alarm.Out_of_time
(* NOTE(review): presumably raised by the alarm set up in [Make.run] when the
   size limit is exceeded — confirm in the Alarm module. *)
exception Out_of_space = Alarm.Out_of_space

module Make(State : State.S) = struct

  (* Setup *)
  (* ************************************************************************ *)

  (* We want to catch user interruptions (Ctrl-C) as the [Sigint] exception,
     so that the pipeline can handle them like any other exception.
     On Windows/Cygwin, [Sys.catch_break true] makes the runtime raise
     [Sys.Break] (aliased as [Sigint]); on Unix an explicit SIGINT handler
     raises it. This is done each time the functor is applied. *)
  let () =
    match Sys.os_type with
    | "Win32" | "Cygwin" ->
      Sys.catch_break true
    | "Unix" ->
      Sys.set_signal Sys.sigint (
        Sys.Signal_handle (fun _ ->
            raise Sigint)
      )
    | _ -> ()

  (* Pipeline and execution *)
  (* ************************************************************************ *)

  (* Combines two states into one. In a [Fix] expansion it is called with the
     state from before the operator ran and the state obtained after all
     generated values have been processed. *)
  type 'st merge = 'st -> 'st -> 'st

  (* Result of an early-exit operator: [`Done v] stops the pipeline with the
     final value [v], [`Continue v] feeds [v] to the rest of the pipeline. *)
  type ('a, 'b) cont = [ `Done of 'a | `Continue of 'b ]

  (* Result of a fixpoint operator: [`Ok] sends the input through the rest of
     the pipeline; [`Gen (merge, g)] expands the input into the values
     produced by the generator [g] (until it returns [None]). Each of those
     values goes through the whole fixpoint pipeline again, and the final
     state is computed with [merge]. *)
  type ('st, 'a) fix = [ `Ok | `Gen of 'st merge * ('st -> 'st * 'a option) ]

  (* Exception continuation, called with the current state, the backtrace and
     the exception raised by an operator or a generator. Its result type is
     universally quantified, so it cannot return normally. *)
  type 'st k_exn = { k : 'a. 'st -> Printexc.raw_backtrace -> exn -> 'a; }

  (* A named operation: [f st x] returns the new state and the result of
     processing [x]. *)
  type ('st, 'a, 'b) op = {
    name : string;
    f : 'st -> 'a -> 'st * 'b;
  }

  (* Type for pipelines, i.e. a series of transformations to
      apply to the input. An ('st, 'a, 'b) t is a pipeline that
      takes an input of type ['a] and returns a value of type
      ['b]. *)
  type (_, _, _) t =
    (* The end of the pipeline, the identity/reflexive constructor *)
    | End :
        ('st, 'a, 'a) t
    (* Apply a single function and then proceed with the rest of the pipeline *)
    | Map :
        ('st, 'a, 'c) op * ('st, 'c, 'b) t -> ('st, 'a, 'b) t
    (* Allow early exiting from the loop *)
    | Cont :
        ('st, 'a, ('b, 'c) cont) op * ('st, 'c, 'b) t -> ('st, 'a, 'b) t
    (* Concatenate two pipelines. Not tail recursive. *)
    | Concat :
        ('st, 'a, 'b) t * ('st, 'b, 'c) t -> ('st, 'a, 'c) t
    (* Fixpoint expansion *)
    | Fix :
        ('st, 'a, ('st, 'a) fix) op * ('st, 'a, unit) t -> ('st, 'a, unit) t

  (* Creating operators. *)

  (* [op ?name f] wraps the stateful function [f] as an operator
     (the name defaults to [""]). *)
  let op ?(name="") f = { name; f; }

  (* [apply ?name f] lifts a pure function: the state is left untouched. *)
  let apply ?name f = op ?name (fun st x -> st, f x)

  (* [iter_ ?name f] runs the side effect [f] on the value, then passes the
     value and the state through unchanged. *)
  let iter_ ?name f = op ?name (fun st x -> f x; st, x)

  (* [f_map ?name ?test f] builds an early-exit operator. When [test st x]
     holds (by default it always does), [f] is applied and the pipeline
     continues with its result; otherwise the pipeline stops with [x] as
     its final value and the state unchanged. *)
  let f_map ?name ?(test=(fun _ _ -> true)) f =
    op ?name (fun st x ->
        if test st x then begin
          let st', y = f st x in
          st', `Continue y
        end else
          st, `Done x
      )

  (* Creating pipelines. *)

  let _end = End
  let (@>>>) op t = Map(op, t)
  let (@>|>) op t = Cont(op, t)
  let (@|||) t t' = Concat (t, t')

  let fix op t = Fix(op, t)

  (* Evaluate an operator. Any exception it raises is passed, together with
     its backtrace and the input state, to the [exn] continuation. *)
  let eval_op ~exn op st x =
    try op.f st x
    with e ->
      let bt = Printexc.get_raw_backtrace () in
      exn.k st bt e

  (* Eval a pipeline into the corresponding function *)
  let rec eval : type st a b.
    exn:st k_exn -> (st, a, b) t -> st -> a -> st * b =
    fun ~exn pipe st x ->
    match pipe with
    | End -> st, x
    | Map (op, t) ->
      let st', y = eval_op ~exn op st x in
      eval ~exn t st' y
    | Cont (op, t) ->
      let st', y = eval_op ~exn op st x in
      begin match y with
        | `Continue res -> eval ~exn t st' res
        | `Done res -> st', res
      end
    | Concat (t, t') ->
      let st', y = eval ~exn t st x in
      eval ~exn t' st' y
    | Fix (op, t) ->
      let st', y = eval_op ~exn op st x in
      begin match y with
        | `Ok -> eval ~exn t st' x
        | `Gen (merge, g) ->
          (* Every generated value goes through the whole fixpoint pipeline
             again, so it can itself be expanded. *)
          let st'' = eval_gen_fold ~exn pipe st' g in
          (* [merge] receives the state from before the operator ran. *)
          let st''' = merge st st'' in
          st''', ()
      end

  (* Feed every value produced by [g] into [pipe], threading the state,
     until [g] returns [None]. Exceptions raised by [g] go to [exn]. *)
  and eval_gen_fold : type st a.
    exn: st k_exn -> (st, a, unit) t -> st -> (st -> st * a option) -> st =
    fun ~exn pipe st g ->
    match g st with
    | st, None -> st
    | st, Some x ->
      let st', () = eval ~exn pipe st x in
      eval_gen_fold ~exn pipe st' g
    | exception e ->
      let bt = Printexc.get_raw_backtrace () in
      exn.k st bt e

  (* Aux function to eval a pipeline on the current value of a generator.
     Returns [`Done st] when the generator is exhausted, and
     [`Continue (st, ())] after processing one value. *)
  let run_aux ~exn pipe g st =
    match g st with
    | st, None -> `Done st
    | st, Some x -> `Continue (eval ~exn pipe st x)
    | exception e ->
      let bt = Printexc.get_raw_backtrace () in
      exn.k st bt e

  (* Effectively run a pipeline on all values that come from a generator.
     Time/size limits apply for the complete evaluation of each input
     (so all expanded values count toward the same limit).

     After each input, [finally] is called: with [None] when the input was
     processed successfully, and with [Some (bt, e)] when processing it (or
     running [finally st None]) raised [e]. Its result is the state used
     for the next input. Exceptions raised by [finally] when handling an
     error are not caught and escape [run]; this lets [finally] stop the
     loop, e.g. on [Sigint]. *)
  let rec run :
    type a.
    finally:(State.t -> (Printexc.raw_backtrace * exn) option -> State.t) ->
    ?alarm:Alarm.t -> (State.t -> State.t * a option) -> State.t ->
    (State.t, a, unit) t -> State.t
    = fun ~finally ?(alarm=Alarm.default) g st pipe ->
      let exception Exn of State.t * Printexc.raw_backtrace * exn in
      let module A = (val alarm) in
      let time = State.get State.time_limit st in
      let size = State.get State.size_limit st in
      let al = A.setup ~time ~size in
      let exn = { k = fun st bt e ->
          (* delete alarm as soon as possible *)
          let () = A.delete al in
          (* go to the correct handler *)
          raise (Exn (st, bt, e));
        }
      in
      (* Shared error recovery: flush the standard formatters (in case the
         exception was raised in the middle of printing), let [finally]
         handle the exception, then go on with the next input. *)
      let recover st bt e =
        Format.pp_print_flush Format.std_formatter ();
        Format.pp_print_flush Format.err_formatter ();
        let st' = finally st (Some (bt, e)) in
        run ~finally ~alarm g st' pipe
      in
      begin
        match run_aux ~exn pipe g st with

        (* End of the run, yay ! *)
        | `Done st ->
          let () = A.delete al in
          st

        (* Regular case, we finished running the pipeline on one input
           value, let's get to the next one. An exception raised by
           [finally] itself is reported through the error path rather than
           silently dropped (dropping it would, e.g., lose a Ctrl-C). *)
        | `Continue (st', ()) ->
          let () = A.delete al in
          begin match finally st' None with
            | st'' -> run ~finally ~alarm g st'' pipe
            | exception e ->
              let bt = Printexc.get_raw_backtrace () in
              recover st' bt e
          end

        (* "Normal" exception case: the exn was raised by an operator, and caught
           then re-raised by the {exn} continuation passed to run_aux *)
        | exception Exn (st, bt, e) ->
          let () = A.delete al in
          recover st bt e

        (* Exception case for exceptions that can realistically occur: any
           asynchronous exception, or an exception from an operator that was
           not properly wrapped. In this error case, we might use a rather old
           and outdated state, but this should not happen often, and should
           not matter for asynchronous exceptions. *)
        | exception e ->
          let bt = Printexc.get_raw_backtrace () in
          let () = A.delete al in
          recover st bt e
      end

end