Source file Http_Controller.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
open Core
open Async
open Cohttp_async
open Frenetic_netkat.Syntax
open Common
module Server = Cohttp_async.Server
module Comp = Frenetic_netkat.Local_compiler
(* Per-client state kept by the HTTP controller. One record is created the
   first time a client id is seen (see [get_client]). *)
type client = {
(* Policy graph node owned by this client; client policy updates are pushed
   into it and it is attached to the global [pol] node on creation. *)
policy_node: (DynGraph.cannot_receive, policy) DynGraph.t;
(* Read end of the client's event queue; consumed by GET /<client>/event. *)
event_reader: string Pipe.Reader.t;
(* Write end of the same pipe; [propogate_events] writes JSON-encoded
   network events here. *)
event_writer: string Pipe.Writer.t;
}
(* Compiler options most recently accepted through POST /config, starting
   from [Comp.default_compiler_options]. GET /config serialises this value. *)
let current_compiler_options = ref Comp.default_compiler_options
(** [port_to_json port] encodes a 32-bit port number as a JSON [`Int] node.
    Conversion goes through [Int32.to_int_exn], so it raises if the value
    cannot be represented as a native [int]. *)
let port_to_json port =
  let port_number = Int32.to_int_exn port in
  `Int port_number
(** [switch_and_ports_to_json (sw, ports)] builds the JSON object
    [{"switch_id": <int>, "ports": [<int>, ...]}] for one switch.
    The 64-bit switch id is converted with [Int64.to_int_exn] and therefore
    raises if it does not fit in a native [int]. *)
let switch_and_ports_to_json (sw, ports) =
  let port_list = `List (List.map ports ~f:port_to_json) in
  let switch_id = `Int (Int64.to_int_exn sw) in
  `Assoc [ ("switch_id", switch_id); ("ports", port_list) ]
(** [current_switches_to_json lst] turns a list of [(switch, ports)] pairs
    into a JSON array with one object per switch. *)
let current_switches_to_json lst =
  let entries = List.map lst ~f:switch_and_ports_to_json in
  `List entries
(** [current_switches_to_json_string lst] serialises the switch list to a
    JSON string, using Yojson's [~std:true] output mode. *)
let current_switches_to_json_string lst =
  let json = current_switches_to_json lst in
  Yojson.Basic.to_string ~std:true json
(** [unions pols] left-folds [pols] into a single policy by nesting [Union]
    nodes, starting from [drop]. An empty list therefore yields [drop]. *)
let unions (pols : policy list) : policy =
  let join acc next = Union (acc, next) in
  List.fold pols ~init:drop ~f:join
(* Global policy node. Every client's [policy_node] is attached to it (see
   [get_client]) and [listen] forwards the values it emits to the controller.
   NOTE(review): presumably DynGraph recombines the attached client policies
   with [unions] whenever one changes; confirm against DynGraph. *)
let pol : (policy, policy) DynGraph.t = DynGraph.create drop unions
(* Registry of known clients, keyed by the client id taken from the request
   path. Entries are added lazily by [get_client]; nothing in this file
   removes them. *)
let clients : (string, client) Hashtbl.t = Hashtbl.Poly.create ()
(** [iter_clients f] calls [f id client] once for every registered client. *)
let iter_clients (f : string -> client -> unit) : unit =
  let visit ~key ~data = f key data in
  Hashtbl.iteri clients ~f:visit
(** [propogate_events event] loops forever: it waits for the next network
    event produced by [event ()], encodes it as a JSON string and enqueues
    that string on the event pipe of every registered client.

    Writes use [Pipe.write_without_pushback], so a client that never polls
    its events accumulates them in its pipe.

    The (misspelled) name is kept because other code refers to it.

    @param event thunk returning a deferred that resolves to the next event. *)
let rec propogate_events event =
  event () >>= fun evt ->
  let response = Frenetic_netkat.Json.event_to_json_string evt in
  (* Pass the callback under its [~f] label and ignore the unused key. *)
  Hashtbl.iteri clients ~f:(fun ~key:_ ~data:client ->
    Pipe.write_without_pushback client.event_writer response);
  propogate_events event
(** [get_client clientId] returns the state record for [clientId],
    registering the client on first use. Registration logs the new id,
    creates a policy source node initialised to [drop], attaches it to the
    global [pol] node and allocates the client's event pipe. *)
let get_client (clientId: string): client =
  let register () =
    printf ~level:`Info "New client %s" clientId;
    let node = DynGraph.create_source drop in
    DynGraph.attach node pol;
    let (reader, writer) = Pipe.create () in
    { policy_node = node; event_reader = reader; event_writer = writer }
  in
  Hashtbl.find_or_add clients clientId ~default:register
(** [handle_request (module Controller) ~body client_addr request] is the
    HTTP dispatcher. It logs the method and path, then routes on
    [(method, path segments)]:

    - [GET /version]: the literal string ["4"].
    - [GET /port_stats/<switch>/<port>]: port statistics as JSON; replies
      [400 Bad Request] when either id is not a valid integer.
    - [GET /current_switches]: connected switches and their ports as JSON.
    - [GET /query/<name>]: statistics for the named query as JSON.
    - [GET /<client>/event]: waits for, then returns, the client's next event.
    - [POST /pkt_out]: parses a packet-out message and sends it.
    - [POST /<client>/update_json] and [POST /<client>/update]: parse a
      policy and push it into the client's policy node.
    - [POST /config] and [GET /config]: set and read the compiler options.
    - anything else: [404 Not Found].

    @param body request body, used by the POST routes.
    @param client_addr peer address; not used by any route.
    @param request the incoming request. *)
let handle_request
    (module Controller : NetKAT_Controller.CONTROLLER)
    ~(body : Cohttp_async.Body.t)
    (client_addr : Socket.Address.Inet.t)
    (request : Request.t) : Server.response Deferred.t =
  let open Controller in
  (* Shared tail of both policy-update routes. The client is looked up (and
     registered if new) only once the body has parsed successfully. *)
  let push_policy client_id pol =
    DynGraph.push pol (get_client client_id).policy_node;
    Cohttp_async.Server.respond `OK
  in
  Logging.info "%s %s" (Cohttp.Code.string_of_method request.meth)
    (Uri.path (Request.uri request));
  match request.meth, extract_path request with
  | `GET, ["version"] -> Server.respond_string "4"
  | `GET, ["port_stats"; switch_id; port_id] ->
    (* Both ids come straight from the URL, so a malformed value must be
       answered with 400 rather than escape the handler as an exception. *)
    let ids =
      Option.try_with (fun () ->
        (Int64.of_string switch_id, Int32.of_string port_id))
    in
    (match ids with
     | None ->
       Logging.error "Malformed switch or port id in port_stats request";
       Server.respond_string ~status:`Bad_request
         "Malformed switch or port id"
     | Some (sw, pt) ->
       port_stats sw pt >>= fun portStats ->
       Server.respond_string
         (Frenetic_netkat.Json.port_stat_to_json_string portStats))
  | `GET, ["current_switches"] ->
    switches () >>= fun switches ->
    Server.respond_string (current_switches_to_json_string switches)
  | `GET, ["query"; name] ->
    query name >>= fun stats ->
    Server.respond_string (Frenetic_netkat.Json.stats_to_json_string stats)
  | `GET, [clientId; "event"] ->
    let curr_client = get_client clientId in
    Pipe.read curr_client.event_reader
    >>= (function
      (* Nothing in this file closes a client's event pipe. *)
      | `Eof -> assert false
      | `Ok response -> Server.respond_string response)
  | `POST, ["pkt_out"] ->
    handle_parse_errors' body
      (fun str ->
         let json = Yojson.Basic.from_string str in
         Frenetic_netkat.Json.pkt_out_from_json json)
      (fun (sw_id, port_id, payload, policies) ->
         packet_out sw_id port_id payload policies >>= fun () ->
         Cohttp_async.Server.respond `OK)
  | `POST, [clientId; "update_json"] ->
    handle_parse_errors body parse_update_json (push_policy clientId)
  | `POST, [clientId; "update"] ->
    handle_parse_errors body parse_update (push_policy clientId)
  | `POST, ["config"] ->
    printf "POST /config";
    handle_parse_errors body parse_config_json
      (fun conf ->
         (* Keep the local copy (served by GET /config) and the controller's
            own setting in step. *)
         current_compiler_options := conf;
         set_current_compiler_options conf;
         Cohttp_async.Server.respond `OK)
  | `GET, ["config"] ->
    printf "GET /config";
    Comp.options_to_json_string !current_compiler_options
    |> Cohttp_async.Server.respond_string
  | _, _ ->
    Logging.error "Unknown method/path (404 error)";
    Cohttp_async.Server.respond `Not_found
(** [print_error addr exn] is the HTTP server's handler-error callback.
    It unwraps the monitor exception and logs it at error level, except for
    messages containing "writer fd unexpectedly closed", which are only noted
    at info level. [addr] is not used. *)
let print_error addr exn =
  let message = exn |> Monitor.extract_exn |> Exn.to_string in
  if String.is_substring message ~substring:"writer fd unexpectedly closed"
  then Logging.info "Ignoring writer exception"
  else Logging.error "%s" message
(** [listen ~http_port ~openflow_port] wires the controller together:
    it instantiates the NetKAT controller over the OpenFlow 1.0 plugin,
    starts the HTTP server on [http_port] with [handle_request], forwards
    every value emitted by the global [pol] node to [Controller.update],
    starts the controller on [openflow_port] and begins fanning events out
    to clients. None of the background work is awaited; the returned
    deferred is already determined. *)
let listen ~http_port ~openflow_port =
  let module Controller = NetKAT_Controller.Make(OpenFlow0x01_Plugin) in
  let where_to_listen = Tcp.Where_to_listen.of_port http_port in
  let handler = handle_request (module Controller) in
  (* The server handle is deliberately dropped; the server just keeps running. *)
  let _server =
    Cohttp_async.Server.create
      ~on_handler_error:(`Call print_error)
      where_to_listen
      handler
  in
  let (_, pol_reader) = DynGraph.to_pipe pol in
  let _updates =
    Pipe.iter pol_reader ~f:(fun new_policy -> Controller.update new_policy)
  in
  Controller.start openflow_port;
  don't_wait_for (propogate_events Controller.event);
  Deferred.return ()
(** [main http_port openflow_port ()] kicks off [listen] in the background
    and returns immediately. *)
let main (http_port : int) (openflow_port : int) () : unit =
  listen ~http_port ~openflow_port |> don't_wait_for