Source file parany.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229

open Printf
module Fn = Filename

(* Verbosity flag of the library; [run] overwrites it with its
   [~verbose] argument on every call. *)
let debug : bool ref = ref false

(* When set, each worker started by [run] calls [Cpu.setcore] with its
   own rank. OFF by default, because of multi-users. *)
let core_pinning : bool ref = ref false

(* Turn core pinning on for the following calls to [run]. *)
let enable_core_pinning () : unit = core_pinning := true

(* Turn core pinning off for the following calls to [run]. *)
let disable_core_pinning () : unit = core_pinning := false

(* End-of-stream signal. A [demux] function raises it once there is no
   more input to hand out; [Shm.receive] raises it when the message it
   reads is the "EOF" marker. *)
exception End_of_input

(* Inter-process queues. Values never travel through the sockets
   themselves: the sender marshals them to a file and sends only the
   file name, as one datagram; the receiver reads the file back and
   deletes it. The literal message "EOF" marks the end of a stream. *)
module Shm = struct

  (* Create one queue: a pair of connected UNIX-domain datagram
     sockets. *)
  let init () =
    Unix.(socketpair PF_UNIX SOCK_DGRAM 0)

  (* Read back the value marshalled into file [fn]. The channel is
     closed on every path, so a failure while unmarshalling (truncated
     or corrupted file) does not leak a file descriptor; the original
     exception is re-raised. *)
  let unmarshal_from_file fn =
    let input = open_in_bin fn in
    match Marshal.from_channel input with
    | res -> close_in input; res
    | exception e -> close_in_noerr input; raise e

  (* Marshal [v] into file [fn] with the [No_sharing] flag. The channel
     is closed on every path, so a failure while marshalling does not
     leak a file descriptor; the original exception is re-raised. *)
  let marshal_to_file fn v =
    let out = open_out_bin fn in
    match Marshal.to_channel out v [Marshal.No_sharing] with
    | () -> close_out out
    | exception e -> close_out_noerr out; raise e

  (* Send the first [n] bytes of [buff] on [sock] as a single message,
     retrying after a short pause for as long as the system answers
     ENOBUFS. *)
  let rec send_loop sock buff n =
    try
      let sent = Unix.send sock buff 0 n [] in
      assert(sent = n)
    with Unix.Unix_error(Unix.ENOBUFS, _, _) ->
      (* send on a UDP socket never blocks on Mac OS X
         and probably several of the BSDs *)
      (* eprintf "sleep\n%!"; *)
      let _ = Unix.select [] [] [] 0.001 in (* wait *)
      (* We should use nanosleep for precision, if only it
         was provided by the Unix module... *)
      send_loop sock buff n

  (* Send string [str] as one message on [sock]. *)
  let raw_send sock str =
    let n = String.length str in
    (* no copy needed: the bytes are only read by [Unix.send] *)
    let buff = Bytes.unsafe_of_string str in
    send_loop sock buff n

  (* Store [to_send] in file [fn], then announce [fn] on [queue]. *)
  let send fn queue to_send =
    marshal_to_file fn to_send;
    raw_send queue fn

  (* Receive one message from [sock] into [buff] and return it as a
     string. At most [Bytes.length buff] bytes are read. *)
  let raw_receive sock buff =
    let n = Bytes.length buff in
    let received = Unix.recv sock buff 0 n [] in
    assert(received > 0);
    Bytes.sub_string buff 0 received

  (* Receive the next value from [queue], using [buff] as scratch space
     for the incoming file name. The file is deleted once read.
     Raises [End_of_input] when the message is the "EOF" marker. *)
  let receive queue buff =
    let fn = raw_receive queue buff in
    if fn = "EOF" then
      raise End_of_input
    else
      let res = unmarshal_from_file fn in
      Sys.remove fn;
      res

end

(* Feeder process body. Jobs are pulled from [demux] and shipped
   through [queue] in chunks of [csize]. Once [demux] raises
   [End_of_input], the incomplete last chunk (if any) is flushed, one
   "EOF" marker per worker ([ncores] of them) is sent and [queue] is
   closed. *)
let feed_them_all csize ncores demux queue =
  let chunk_id = ref 0 in
  (* this temporary file only reserves a unique prefix for the names
     of the per-chunk files; it is deleted before returning *)
  let prefix = Filename.temp_file "iparany_" "" in
  let pending = ref [] in
  let chunk_file () = sprintf "%s_%d" prefix !chunk_id in
  (* cons [remaining] more jobs onto the pending chunk *)
  let rec fill remaining =
    if remaining > 0 then begin
      pending := (demux ()) :: !pending;
      fill (remaining - 1)
    end in
  (* build and send full chunks until [demux] runs dry *)
  let rec feed () =
    fill csize;
    Shm.send (chunk_file ()) queue !pending;
    pending := [];
    incr chunk_id;
    feed () in
  try feed ()
  with End_of_input ->
    begin
      (* if needed, send remaining jobs (< csize) *)
      (match !pending with
       | [] -> ()
       | leftover -> Shm.send (chunk_file ()) queue leftover);
      (* send an EOF to each worker *)
      for _ = 1 to ncores do
        Shm.raw_send queue "EOF"
      done;
      Sys.remove prefix;
      Unix.close queue
    end

(* Worker process body. Repeatedly takes one chunk of jobs from
   [jobs_queue], applies [work] to each job and pushes the chunk of
   results to [results_queue]. When the "EOF" marker arrives on
   [jobs_queue], it forwards one "EOF" to the collector and closes
   [results_queue]. *)
let go_to_work jobs_queue work results_queue =
  (* let pid = Unix.getpid () in
   * eprintf "worker(%d) started\n%!" pid; *)
  let out_count = ref 0 in
  (* this temporary file only reserves a unique prefix for the names
     of the per-chunk result files *)
  let prfx = Filename.temp_file "oparany_" "" in
  try
    (* Each incoming message is the path of a temporary file. A
       datagram longer than the buffer is cut to the buffer size, so
       the buffer must hold any path the feeder can produce: 80 bytes
       was too short for a long temporary directory. *)
    let buff = Bytes.create 4096 in
    while true do
      let xs = Shm.receive jobs_queue buff in
      (* the feeder conses jobs, so a chunk arrives reversed;
         [List.rev_map] puts the results back in input order *)
      let ys = List.rev_map work xs in
      (* eprintf "worker(%d) did one\n%!" pid; *)
      let fn = sprintf "%s_%d" prfx !out_count in
      Shm.send fn results_queue ys;
      incr out_count
    done
  with End_of_input ->
    begin
      (* tell collector to stop *)
      (* eprintf "worker(%d) finished\n%!" pid; *)
      Sys.remove prfx;
      Shm.raw_send results_queue "EOF";
      Unix.close results_queue
    end

(* Run [f ()] in a forked child process, which then terminates with
   exit code 0; the parent returns immediately.
   NOTE(review): [Unix.fork] presumably reports failure by raising
   [Unix.Unix_error] rather than returning -1, which would make the
   [failwith] case unreachable -- confirm before removing it. *)
let fork_out f =
  let pid = Unix.fork () in
  if pid = -1 then failwith "Parany.fork_out: fork failed"
  else if pid = 0 then (let () = f () in exit 0)
  else ()

(* [run ~verbose ~csize ~nprocs ~demux ~work ~mux] applies [work] to
   every job produced by [demux] and passes each result to [mux].

   - [verbose] is stored into [debug].
   - [demux] must raise [End_of_input] when there are no more jobs.
   - With [nprocs <= 1] everything runs in the calling process, one
     job at a time.
   - Otherwise one feeder process and [nprocs] worker processes are
     forked; jobs travel in chunks of [csize]. [demux] runs in the
     feeder, [work] in the workers and [mux] in the calling process,
     which receives result chunks in the order they come back from
     the workers.

   In the parallel case, assertions require [csize >= 1] and
   [nprocs <= Cpu.numcores ()]. *)
let run ~verbose ~csize ~nprocs ~demux ~work ~mux =
  debug := verbose;
  if nprocs <= 1 then
    (* sequential version *)
    try
      while true do
        mux (work (demux ()))
      done
    with End_of_input -> ()
  else
    begin
      assert(csize >= 1);
      let max_cores = Cpu.numcores () in
      assert(nprocs <= max_cores);
      (* parallel version *)
      (* create queues *)
      let jobs_in, jobs_out = Shm.init () in
      let res_in, res_out = Shm.init () in
      (* start feeder *)
      Gc.compact (); (* like parmap: reclaim memory prior to forking *)
      fork_out (fun () -> feed_them_all csize nprocs demux jobs_in);
      (* start workers *)
      for worker_rank = 0 to nprocs - 1 do
        fork_out (fun () ->
            if !core_pinning then Cpu.setcore worker_rank;
            go_to_work jobs_out work res_in
          )
      done;
      (* collect results: every worker sends exactly one EOF when it is
         done, so stop after [nprocs] of them *)
      let finished = ref 0 in
      (* Each incoming message is the path of a temporary file. A
         datagram longer than the buffer is cut to the buffer size, so
         the buffer must hold any path a worker can produce: 80 bytes
         was too short for a long temporary directory. *)
      let buff = Bytes.create 4096 in
      while !finished < nprocs do
        try
          while true do
            let xs = Shm.receive res_out buff in
            List.iter mux xs
          done
        with End_of_input ->
          incr finished
      done;
      (* free resources *)
      List.iter Unix.close [jobs_in; jobs_out; res_in; res_out]
    end

(* Wrapper for near-compatibility with Parmap *)
module Parmap = struct

  (* [List.map] built from two tail-recursive passes, so that long
     lists do not overflow the stack. *)
  let tail_rec_map f l =
    List.rev (List.rev_map f l)

  (* Turn list [l] into a [demux] function for [run]: each call hands
     out the next element, and [End_of_input] is raised once [l] is
     exhausted. *)
  let demux_of_list l =
    let input = ref l in
    (fun () -> match !input with
       | [] -> raise End_of_input
       | x :: xs -> (input := xs; x))

  (* [parmap ~ncores ~csize f l] maps [f] over [l] using [ncores]
     worker processes fed with chunks of [csize] elements.
     The result is in the same order as [l], whatever [ncores] is:
     each job is tagged with its rank in [l] and the ranks are used to
     reorder the results, which otherwise arrive in the order workers
     finish. *)
  let parmap ~ncores ?(csize = 1) f l =
    if ncores <= 1 then tail_rec_map f l
    else
      let next = demux_of_list l in
      let rank = ref 0 in
      let demux () =
        let x = next () in
        let i = !rank in
        incr rank;
        (i, x) in
      let output = ref [] in
      let mux ranked =
        output := ranked :: !output in
      (* parallel work *)
      run ~verbose:false ~csize ~nprocs:ncores ~demux
        ~work:(fun (i, x) -> (i, f x)) ~mux;
      (* sort by decreasing rank: [List.rev_map] then both drops the
         ranks and puts the results in increasing rank order *)
      !output
      |> List.sort (fun (i, _) (j, _) -> compare (j : int) i)
      |> List.rev_map snd

  (* [pariter ~ncores ~csize f l] applies [f] to every element of [l]
     for its side effects only. With [ncores > 1], [f] runs in the
     worker processes. *)
  let pariter ~ncores ?(csize = 1) f l =
    if ncores <= 1 then List.iter f l
    else
      (* parallel work *)
      run ~verbose:false ~csize ~nprocs:ncores
        ~demux:(demux_of_list l) ~work:f ~mux:ignore

  (* [parfold ~ncores ~csize f g init l] maps [f] over [l], then folds
     the results with [g], starting from [init]. With [ncores > 1] the
     results are folded in the order they come back from the workers,
     so the result is only deterministic if [g] does not depend on
     that order. *)
  let parfold ~ncores ?(csize = 1) f g init l =
    if ncores <= 1 then List.fold_left g init (tail_rec_map f l)
    else
      let output = ref init in
      let mux x =
        output := g !output x in
      (* parallel work *)
      run ~verbose:false ~csize ~nprocs:ncores
        ~demux:(demux_of_list l) ~work:f ~mux;
      !output
end