Source file lwt_domain.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
open Lwt.Infix
module C = Domainslib.Chan
module T = Domainslib.Task
type pool = Domainslib.Task.pool
let setup_pool ?name num_additional_domains =
T.setup_pool ?name ~num_additional_domains ()
let teardown_pool = T.teardown_pool
let lookup_pool = T.lookup_pool
let get_num_domains = T.get_num_domains
let init_result = Error (Failure "Lwt_domain.detach")
let detach pool f args =
if (get_num_domains pool = 1) then
Lwt.wrap1 f args
else begin
let result = ref init_result in
let task () =
result := try Ok (f args) with exn -> Error exn
in
let waiter, wakener = Lwt.wait () in
let id =
Lwt_unix.make_notification ~once:true
(fun () -> Lwt.wakeup_result wakener !result)
in
let _ = T.async pool (fun _ -> task ();
Lwt_unix.send_notification id) in
waiter
end
let jobs = C.make_unbounded ()
let job_done = C.make_bounded 0
let job_notification =
Lwt_unix.make_notification
(fun () ->
let thunk = C.recv jobs in
ignore (thunk ()))
let run_in_main f =
let res = ref init_result in
let job () =
Lwt.try_bind f
(fun ret -> Lwt.return (Result.Ok ret))
(fun exn -> Lwt.return (Result.Error exn)) >>= fun result ->
res := result;
C.send job_done ();
Lwt.return_unit
in
C.send jobs job;
Lwt_unix.send_notification job_notification;
C.recv job_done;
match !res with
| Result.Ok ret -> ret
| Result.Error exn -> raise exn