1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
open Lwt
open Xs_protocol
let ( |> ) a b = b a
let ( ++ ) f g x = f (g x)
let debug fmt = Logging.debug "xs_server" fmt
let error fmt = Logging.error "xs_server" fmt
let store =
let store = Store.create () in
List.iter
(fun path ->
let p = Store.Path.create path (Store.Path.getdomainpath 0) in
if not (Store.exists store p) then
Store.mkdir store 0 (Perms.of_domain 0) p)
[
"/local"
; "/local/domain"
; "/tool"
; "/tool/xenstored"
; "/tool/xenstored/quota"
; "/tool/xenstored/connection"
; "/tool/xenstored/log"
; "/tool/xenstored/memory"
];
store
module type TRANSPORT = sig
type 'a t = 'a Lwt.t
val return : 'a -> 'a Lwt.t
val ( >>= ) : 'a t -> ('a -> 'b Lwt.t) -> 'b Lwt.t
type server
val listen : unit -> server Lwt.t
type channel
val read : channel -> bytes -> int -> int -> int Lwt.t
val write : channel -> bytes -> int -> int -> unit Lwt.t
val destroy : channel -> unit Lwt.t
val address_of : channel -> Xs_protocol.address Lwt.t
val namespace_of : channel -> (module Namespace.IO) option
val accept_forever : server -> (channel -> unit Lwt.t) -> 'a Lwt.t
end
module Server =
functor
(T : TRANSPORT)
->
struct
module PS = PacketStream (T)
let handle_connection t =
T.address_of t >>= fun address ->
let interface = T.namespace_of t in
let c = Connection.create address interface in
let channel = PS.make t in
let m = Lwt_mutex.create () in
let take_watch_events () =
let q =
List.rev
(Queue.fold (fun acc x -> x :: acc) [] c.Connection.watch_events)
in
Queue.clear c.Connection.watch_events;
q
in
let flush_watch_events q =
Lwt_list.iter_s
(fun (path, token) ->
PS.send channel
Xs_protocol.(Response.(print (Watchevent (path, token)) 0l 0l)))
q
in
let (background_watch_event_flusher : unit Lwt.t) =
let rec forever () =
Lwt_mutex.with_lock m (fun () ->
let rec loop () =
if Queue.length c.Connection.watch_events = 0 then
Lwt_condition.wait ~mutex:m c.Connection.cvar >>= fun () ->
loop ()
else return ()
in
loop () >>= fun () -> flush_watch_events (take_watch_events ()))
>>= fun () -> forever ()
in
forever ()
in
Lwt.catch
(fun () ->
let rec forever () =
(PS.recv channel >>= function
| Ok x -> return x
| Exception e -> Lwt.fail e)
>>= fun request ->
let events = take_watch_events () in
let reply = Call.reply store c request in
Lwt_mutex.with_lock m (fun () ->
flush_watch_events events >>= fun () -> PS.send channel reply)
>>= fun () -> forever ()
in
forever () >>= fun () -> T.destroy t)
(fun _ ->
Lwt.cancel background_watch_event_flusher;
Connection.destroy address;
T.destroy t)
let serve_forever () =
Parser.allow_oversize_packets := false;
T.listen () >>= fun server -> T.accept_forever server handle_connection
end