(* Source file: parany.ml *)

module Pr = Printf

(* Verbosity flag: [run] overwrites it with its [~verbose] argument and
   [Pqueue] prints a warning on stderr each time it has to wait on a full
   or empty queue while it is set. *)
let debug : bool ref = ref false

(* A named queue together with the memory pool it was created in.
   - [id]: resource id of the memory pool returned by
     [Netmcore_mempool.create_mempool]; [Pqueue.destroy] unlinks it
   - [name]: label only used in the debug messages printed by [Pqueue]
   - [q]: the underlying Netmcore queue *)
type 'a t = { id: Netmcore.res_id;
              name: string;
              q: ('a, unit) Netmcore_queue.squeue }

(* Size handed to [Netmcore_mempool.create_mempool] for each queue created
   by [Pqueue.create]; 1024^3 by default (presumably bytes -- TODO confirm
   against the Netmcore_mempool documentation). *)
let shm_size : int ref = ref (1024 * 1024 * 1024)

(* [set_shm_size n] makes queues created afterwards use a pool of size [n]. *)
let set_shm_size new_size =
  shm_size.contents <- new_size

(* [get_shm_size ()] returns the pool size currently configured. *)
let get_shm_size () =
  shm_size.contents

(* Whether each worker forked by [run] calls [Cpu.setcore] with its rank.
   OFF by default, because of multi-users. *)
let core_pinning : bool ref = ref false

(* Turn core pinning on for the workers of subsequent [run] calls. *)
let enable_core_pinning () =
  core_pinning.contents <- true

(* Turn core pinning off for the workers of subsequent [run] calls. *)
let disable_core_pinning () =
  core_pinning.contents <- false

(* queue for parallel processing *)
module Pqueue = struct

  (* [create name] allocates a fresh memory pool of size [!shm_size] and
     creates a queue inside it. [name] is only used in debug messages. *)
  let create name =
    let mem_pool_id = Netmcore_mempool.create_mempool !shm_size in
    let queue = Netmcore_queue.create mem_pool_id () in
    { id = mem_pool_id;
      name = name;
      q = queue }

  (* [destroy q] unlinks the memory pool that was created for [q]. *)
  let destroy q =
    Netmcore_mempool.unlink_mempool q.id

  (* Unix.sleepf, but for all OCaml versions.
     Sleeps for at least [timeout] seconds; a [select] interrupted by a
     signal (EINTR) is resumed for the remaining time. *)
  let unix_sleepf (timeout: float): unit =
    let elapsed = ref 0.0 in
    while !elapsed < timeout do
      let start = Unix.gettimeofday () in
      begin
        try ignore(Unix.select [] [] [] (timeout -. !elapsed))
        with Unix.Unix_error(Unix.EINTR, _, _) -> ()
      end;
      let stop = Unix.gettimeofday () in
      let dt = stop -. start in
      elapsed := !elapsed +. dt
    done;
    ()

  (* Poll (1ms period, after an initial 1ms pause) until [queue] is seen
     non-empty. *)
  let wait_until_non_empty queue =
    unix_sleepf 0.001;
    while Netmcore_queue.is_empty queue.q do
      unix_sleepf 0.001
    done

  (* [push queue x] appends the message [x] to [queue], waiting and
     retrying for as long as the memory pool is reported full
     ([Netmcore_mempool.Out_of_pool_memory]). *)
  let rec push queue (x: 'a list): unit =
    let could_push =
      try (Netmcore_queue.push x queue.q; true) (* push elt *)
      with Netmcore_mempool.Out_of_pool_memory -> false in
    if not could_push then
      begin (* queue is full *)
        let current_size = Netmcore_queue.length queue.q in
        if !debug then
          Pr.eprintf "warn: Pqueue.push: %s: full: %d messages\n%!"
            queue.name current_size;
        (* apparently, trying to push to a full queue monopolizes the semaphore
           and prevents clients from popping.
           So, we wait for the size to significantly decrease
           before trying again *)
        unix_sleepf 0.001;
        let low_water_mark = (current_size * 10) / 100 in
        (* The comparison must be strict: with fewer than 10 messages in the
           queue the mark is 0, and [length >= 0] would hold forever. *)
        while Netmcore_queue.length queue.q > low_water_mark do
          unix_sleepf 0.001
        done;
        push queue x (* try to push again *)
      end

  (* [process_one_in_place queue f] pops one message from [queue] and runs
     [f] on it, waiting for as long as the queue is empty.
     An exception raised by [f] itself is propagated to the caller. *)
  let rec process_one_in_place queue (f: 'a list -> unit): unit =
    (* set as soon as [f] is entered, so that a [Netmcore_queue.Empty]
       escaping from [f] is not mistaken for "the queue was empty" *)
    let f_started = ref false in
    let could_pop =
      (* Netmcore_queue.pop_p avoids data copy out of the shared heap *)
      try
        Netmcore_queue.pop_p queue.q (fun x -> f_started := true; f x);
        true
      with Netmcore_queue.Empty when not !f_started -> false in
    if not could_pop then
      begin
        if !debug then
          Pr.eprintf "warn: Pqueue.process_one_in_place: empty: %s\n%!"
            queue.name;
        wait_until_non_empty queue;
        process_one_in_place queue f
      end

  (* [process_one_copy queue f] is like [process_one_in_place], but the
     message is obtained with [Netmcore_queue.pop_c] before [f] is called.
     Only the pop is guarded against [Netmcore_queue.Empty]: an exception
     raised by [f] is propagated to the caller. *)
  let rec process_one_copy queue (f: 'a list -> unit): unit =
    let popped =
      try Some (Netmcore_queue.pop_c queue.q)
      with Netmcore_queue.Empty -> None in
    match popped with
    | Some x -> f x
    | None ->
      if !debug then
        Pr.eprintf "warn: Pqueue.process_one_copy: empty: %s\n%!"
          queue.name;
      wait_until_non_empty queue;
      process_one_copy queue f

end

(* should we copy out of the shm prior to each call to the work function?
   (read by [go_to_work] to choose between [Pqueue.process_one_copy] and
   [Pqueue.process_one_in_place]) *)
let copy_on_work : bool ref = ref false

(* should we copy out of the shm prior to each call to the mux function?
   (read by [run] when it collects the results) *)
let copy_on_mux : bool ref = ref false

(* Switch [copy_on_work] on; there is no corresponding "unset". *)
let set_copy_on_work () =
  copy_on_work.contents <- true

(* Switch [copy_on_mux] on; there is no corresponding "unset". *)
let set_copy_on_mux () =
  copy_on_mux.contents <- true

(* To be raised by a [demux] function once there is nothing left to read;
   it is caught by [feed_them_all] and by the sequential branch of [run]. *)
exception End_of_input

(* feeder process main loop.
   Repeatedly reads [csize] items from [demux] and pushes them to [queue]
   as one message (items are accumulated by consing, i.e. in reverse
   reading order). When [demux] raises [End_of_input], the partially
   filled message, if any, is pushed, followed by [nprocs] empty messages:
   one stop signal per worker. *)
let feed_them_all csize nprocs demux queue =
  let pending = ref [] in
  (* read [remaining] more items into [pending] *)
  let rec fill remaining =
    if remaining > 0 then
      begin
        let item = demux () in
        pending := item :: !pending;
        fill (remaining - 1)
      end in
  (* only left via an exception *)
  let rec pump () =
    fill csize;
    Pqueue.push queue !pending;
    pending := [];
    pump () in
  try pump ()
  with End_of_input ->
    begin
      (match !pending with
       | [] -> ()
       | leftover -> Pqueue.push queue leftover);
      (* tell workers to stop *)
      for _ = 1 to nprocs do
        Pqueue.push queue []
      done
    end

(* worker process loop.
   Takes messages from [jobs_queue] until an empty one (the stop signal)
   is seen; each non-empty message is mapped through [work] with
   [List.rev_map] and the resulting list is pushed to [results_queue].
   An empty message is finally pushed to [results_queue] to tell the
   collector that this worker is done. *)
let go_to_work jobs_queue work results_queue =
  let consume =
    if !copy_on_work then Pqueue.process_one_copy
    else Pqueue.process_one_in_place in
  let stop_requested = ref false in
  let handle chunk =
    match chunk with
    | [] -> stop_requested := true
    | _ :: _ -> Pqueue.push results_queue (List.rev_map work chunk) in
  let rec loop () =
    if not !stop_requested then
      begin
        consume jobs_queue handle;
        loop ()
      end in
  loop ();
  (* tell collector to stop *)
  Pqueue.push results_queue []

(* [fork_out f] runs [f ()] in a forked child process; the parent returns
   immediately and does not keep the child's pid.
   The child never returns from this function: it exits with status 0
   once [f] has returned, or reports the exception on stderr and exits
   with status 1 if [f] raised. Without this, an exception raised by [f]
   would unwind into the caller's code inside the child process.
   [Unix.fork] raises [Unix.Unix_error] on failure (it never returns -1);
   that exception is propagated to the caller. *)
let fork_out f =
  match Unix.fork () with
  | 0 ->
    let status =
      try (let () = f () in 0)
      with e ->
        Printf.eprintf "Parany.fork_out: child %d: uncaught exception: %s\n%!"
          (Unix.getpid ()) (Printexc.to_string e);
        1 in
    exit status
  | _pid -> ()

(* [run ~verbose ~csize ~nprocs ~demux ~work ~mux] pipes every item produced
   by [demux] through [work] and hands each result to [mux].
   - [verbose]: stored into [debug] (queue warnings on stderr)
   - [csize]: number of items sent per message; must be >= 1 in parallel mode
   - [nprocs]: number of worker processes; with [nprocs <= 1] everything
     runs sequentially in the calling process, otherwise [nprocs] must not
     exceed [Cpu.numcores ()]
   - [demux]: produces the next item, raises [End_of_input] when exhausted
   - [work]: applied to each item
   - [mux]: consumes each result
   In parallel mode [demux] runs in a forked feeder process, [work] in the
   forked workers and [mux] in the calling process; assignments made by
   [demux] or [work] are therefore not visible to the caller.
   Both queues are destroyed before returning, including when the
   collection loop (e.g. [mux]) raises; the exception is then re-raised.
   NOTE(review): the forked processes are never waited for here, so they
   presumably remain as zombies until the caller exits -- confirm. *)
let run ~verbose ~csize ~nprocs ~demux ~work ~mux =
  debug := verbose;
  if nprocs <= 1 then
    (* sequential version *)
    try
      while true do
        mux (work (demux ()))
      done
    with End_of_input -> ()
  else
    begin
      assert(csize >= 1);
      let max_cores = Cpu.numcores () in
      assert(nprocs <= max_cores);
      (* parallel version *)
      (* create queues *)
      let jobs_queue = Pqueue.create "jobs_in" in
      let results_queue = Pqueue.create "results_out" in
      let free_queues () =
        Pqueue.destroy jobs_queue;
        Pqueue.destroy results_queue in
      (* start feeder *)
      Gc.compact (); (* like parmap: reclaim memory prior to forking *)
      fork_out (fun () -> feed_them_all csize nprocs demux jobs_queue);
      (* start workers *)
      for worker_rank = 0 to nprocs - 1 do
        fork_out (fun () ->
            if !core_pinning then Cpu.setcore worker_rank;
            go_to_work jobs_queue work results_queue
          )
      done;
      (* collect results: each worker sends one empty message when done *)
      let nb_finished = ref 0 in
      let process_one =
        if !copy_on_mux then Pqueue.process_one_copy
        else Pqueue.process_one_in_place in
      begin
        (* only the calling process reaches this point (children exit inside
           [fork_out]), so the handler below cannot run in a child *)
        try
          while !nb_finished < nprocs do
            process_one results_queue (fun msg ->
                match msg with
                | [] -> incr nb_finished
                | xs -> List.iter mux xs
              )
          done
        with e ->
          (* do not leak the memory pools if collecting failed *)
          free_queues ();
          raise e
      end;
      (* free resources *)
      free_queues ()
    end

(* Wrapper for near-compatibility with Parmap *)
module Parmap = struct

  (* [List.map] built from two tail-recursive passes *)
  let tail_rec_map f l =
    List.rev (List.rev_map f l)

  (* [demux_of_list l] is a demux function yielding the elements of [l] one
     by one, in order, then raising [End_of_input]. *)
  let demux_of_list l =
    let input = ref l in
    fun () ->
      match !input with
      | [] -> raise End_of_input
      | x :: xs -> (input := xs; x)

  (* [with_copy_flags ~force_work ~force_mux thunk] runs [thunk ()] with
     [copy_on_work] (resp. [copy_on_mux]) switched on when [force_work]
     (resp. [force_mux]) is true, then puts both flags back to the values
     they had on entry, whether [thunk] returned or raised. This keeps the
     wrappers below from permanently changing the behavior of later direct
     calls to [run]. *)
  let with_copy_flags ~force_work ~force_mux thunk =
    let saved_work = !copy_on_work in
    let saved_mux = !copy_on_mux in
    let restore () =
      copy_on_work := saved_work;
      copy_on_mux := saved_mux in
    if force_work then set_copy_on_work ();
    if force_mux then set_copy_on_mux ();
    let res = try thunk () with e -> (restore (); raise e) in
    restore ();
    res

  (* [parmap ~ncores ~csize f l] applies [f] to every element of [l] using
     [ncores] worker processes and messages of [csize] elements.
     With [ncores <= 1] the result is [List.map f l]. Otherwise results
     are prepended to the output as they reach the collector, so the
     order of the returned list is not the order of [l]. *)
  let parmap ~ncores ?(csize = 1) f l =
    if ncores <= 1 then tail_rec_map f l
    else
      let output = ref [] in
      let mux x =
        output := x :: !output in
      (* copying out of the shm: for safety *)
      with_copy_flags ~force_work:true ~force_mux:true (fun () ->
          (* parallel work *)
          run ~verbose:false ~csize ~nprocs:ncores
            ~demux:(demux_of_list l) ~work:f ~mux);
      !output

  (* [pariter ~ncores ~csize f l] calls [f] on every element of [l]:
     sequentially in the calling process when [ncores <= 1], otherwise in
     [ncores] worker processes. *)
  let pariter ~ncores ?(csize = 1) f l =
    if ncores <= 1 then List.iter f l
    else
      (* copying out of the shm: for safety *)
      with_copy_flags ~force_work:true ~force_mux:false (fun () ->
          (* parallel work *)
          run ~verbose:false ~csize ~nprocs:ncores
            ~demux:(demux_of_list l) ~work:f ~mux:ignore)

  (* [parfold ~ncores ~csize f g init l] maps [f] over [l] and folds the
     results with [g], starting from [init].
     With [ncores <= 1] this is [List.fold_left g init (List.map f l)].
     Otherwise [g] is applied in the calling process to the results in the
     order they reach the collector, which is not the order of [l]. *)
  let parfold ~ncores ?(csize = 1) f g init l =
    if ncores <= 1 then List.fold_left g init (tail_rec_map f l)
    else
      let output = ref init in
      let mux x =
        output := g !output x in
      (* copying out of the shm: for safety *)
      with_copy_flags ~force_work:true ~force_mux:true (fun () ->
          (* parallel work *)
          run ~verbose:false ~csize ~nprocs:ncores
            ~demux:(demux_of_list l) ~work:f ~mux);
      !output
end