1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
let src = Logs.Src.create "tcp.window" ~doc:"Mirage TCP Window module"
module Log = (val Logs.src_log src : Logs.LOG)
type time = int64
type t = {
tx_mss: int;
tx_isn: Sequence.t;
rx_isn: Sequence.t;
max_rx_wnd: int32;
tx_wnd_scale: int;
rx_wnd_scale: int;
mutable ack_serviced: bool;
mutable ack_seq: Sequence.t;
mutable ack_win: int;
mutable snd_una: Sequence.t;
mutable tx_nxt: Sequence.t;
mutable rx_nxt: Sequence.t;
mutable rx_nxt_inseq: Sequence.t;
mutable fast_rec_th: Sequence.t;
mutable max_tx_wnd : int32;
mutable tx_wnd: int32;
mutable rx_wnd: int32;
mutable ssthresh: int32;
mutable cwnd: int32;
mutable fast_recovery: bool;
mutable rtt_timer_on: bool;
mutable rtt_timer_reset: bool;
mutable rtt_timer_seq: Sequence.t;
mutable rtt_timer_starttime: time;
mutable srtt: time;
mutable rttvar: time;
mutable rto: int64;
mutable backoff_count: int;
}
let pp fmt t =
Format.fprintf fmt
"Window: rx_nxt=%a rx_nxt_inseq=%a tx_nxt=%a rx_wnd=%lu tx_wnd=%lu snd_una=%a backoffs=%d rto=%Lu"
Sequence.pp t.rx_nxt
Sequence.pp t.rx_nxt_inseq
Sequence.pp t.tx_nxt
t.rx_wnd t.tx_wnd
Sequence.pp t.snd_una
t.backoff_count t.rto
let t ~rx_wnd_scale ~tx_wnd_scale ~rx_wnd ~tx_wnd ~rx_isn ~tx_mss ~tx_isn =
let tx_nxt = tx_isn in
let rx_nxt = Sequence.succ rx_isn in
let rx_nxt_inseq = Sequence.succ rx_isn in
let snd_una = tx_nxt in
let fast_rec_th = tx_nxt in
let ack_serviced = true in
let ack_seq = tx_nxt in
let ack_win = rx_wnd in
let rx_wnd = Int32.(shift_left (of_int rx_wnd) rx_wnd_scale) in
let max_rx_wnd = rx_wnd in
let tx_wnd = Int32.(shift_left (of_int tx_wnd) tx_wnd_scale) in
let max_tx_wnd = tx_wnd in
let ssthresh = tx_wnd in
let cwnd = Int32.of_int (tx_mss * 2) in
let fast_recovery = false in
let rtt_timer_on = false in
let rtt_timer_reset = true in
let rtt_timer_seq = tx_nxt in
let rtt_timer_starttime = 0L in
let srtt = (Duration.of_ms 667) in
let rttvar = 0L in
let rto = (Duration.of_ms 667) in
let backoff_count = 0 in
{ tx_isn; rx_isn; max_rx_wnd; max_tx_wnd;
ack_serviced; ack_seq; ack_win;
snd_una; tx_nxt; tx_wnd; rx_nxt; rx_nxt_inseq;
fast_rec_th; rx_wnd; tx_wnd_scale; rx_wnd_scale;
ssthresh; cwnd; tx_mss; fast_recovery;
rtt_timer_on; rtt_timer_reset;
rtt_timer_seq; rtt_timer_starttime; srtt; rttvar; rto; backoff_count }
let valid t seq =
let redge = Sequence.(add t.rx_nxt (of_int32 t.rx_wnd)) in
let ledge = Sequence.(sub t.rx_nxt (of_int32 t.max_rx_wnd)) in
let r = Sequence.between seq ledge redge in
Log.debug (fun f -> f "sequence validation: seq=%a range=%a[%lu] res=%b"
Sequence.pp seq Sequence.pp t.rx_nxt t.rx_wnd r);
r
let rx_advance t b =
t.rx_nxt <- Sequence.add t.rx_nxt b
let rx_advance_inseq t b =
t.rx_nxt_inseq <- Sequence.add t.rx_nxt_inseq b
let rx_nxt t = t.rx_nxt
let rx_nxt_inseq t = t.rx_nxt_inseq
let rx_wnd t = t.rx_wnd
let rx_wnd_unscaled t = Int32.shift_right t.rx_wnd t.rx_wnd_scale
let ack_serviced t = t.ack_serviced
let ack_seq t = t.ack_seq
let ack_win t = t.ack_win
let set_ack_serviced t v = t.ack_serviced <- v
let set_ack_seq_win t s w =
t.ack_seq <- s;
t.ack_win <- w
let set_rx_wnd t sz =
t.rx_wnd <- max sz (Int32.of_int (3 * t.tx_mss + 1 lsl t.rx_wnd_scale))
let set_tx_wnd t sz =
let wnd = Int32.(shift_left (of_int sz) t.tx_wnd_scale) in
t.tx_wnd <- wnd;
if wnd > t.max_tx_wnd then
t.max_tx_wnd <- wnd
let tx_mss t =
t.tx_mss
let tx_advance t b =
if not t.rtt_timer_on && not t.fast_recovery then begin
t.rtt_timer_on <- true;
t.rtt_timer_seq <- t.tx_nxt;
t.rtt_timer_starttime <- Mirage_mtime.elapsed_ns ();
end;
t.tx_nxt <- Sequence.add t.tx_nxt b
let tx_ack t r win =
set_tx_wnd t win;
if t.fast_recovery then begin
if Sequence.gt r t.snd_una then
t.snd_una <- r;
if Sequence.geq r t.fast_rec_th then begin
Log.debug (fun f -> f "EXITING fast recovery");
t.cwnd <- t.ssthresh;
t.fast_recovery <- false;
end else begin
t.cwnd <- (Int32.add t.cwnd (Int32.of_int t.tx_mss));
end
end else begin
if Sequence.gt r t.snd_una then begin
t.backoff_count <- 0;
t.snd_una <- r;
if t.rtt_timer_on && Sequence.gt r t.rtt_timer_seq then begin
t.rtt_timer_on <- false;
let rtt_m = Int64.sub (Mirage_mtime.elapsed_ns ()) t.rtt_timer_starttime in
if t.rtt_timer_reset then begin
t.rtt_timer_reset <- false;
t.rttvar <- Int64.div rtt_m 2L;
t.srtt <- rtt_m;
end else begin
let (/) = Int64.div
and ( * ) = Int64.mul
and (-) = Int64.sub
and (+) = Int64.add
in
t.rttvar <- (3L * t.rttvar / 4L) + (Int64.abs (t.srtt - rtt_m) / 4L);
t.srtt <- (7L * t.srtt / 8L) + (rtt_m / 8L)
end;
t.rto <- max (Duration.of_ms 667) Int64.(add t.srtt (mul t.rttvar 4L));
end;
end;
let cwnd_incr = match t.cwnd < t.ssthresh with
| true -> Int32.of_int t.tx_mss
| false -> max (Int32.div (Int32.of_int (t.tx_mss * t.tx_mss)) t.cwnd) 1l
in
t.cwnd <- Int32.add t.cwnd cwnd_incr
end
let tx_nxt t = t.tx_nxt
let tx_wnd t = t.tx_wnd
let tx_wnd_unscaled t = Int32.shift_right t.tx_wnd t.tx_wnd_scale
let max_tx_wnd t = t.max_tx_wnd
let tx_una t = t.snd_una
let fast_rec t = t.fast_recovery
let tx_available t =
let inflight = Sequence.to_int32 (Sequence.sub t.tx_nxt t.snd_una) in
let win = min t.cwnd t.tx_wnd in
let avail_win = Int32.sub win inflight in
match avail_win < Int32.of_int t.tx_mss with
| true -> 0l
| false -> avail_win
let tx_inflight t =
t.tx_nxt <> t.snd_una
let alert_fast_rexmit t _ =
if not t.fast_recovery then begin
let inflight = Sequence.to_int32 (Sequence.sub t.tx_nxt t.snd_una) in
let newssthresh = max (Int32.div inflight 2l) (Int32.of_int (t.tx_mss * 2)) in
let newcwnd = Int32.add inflight (Int32.of_int (t.tx_mss * 2)) in
Log.debug (fun fmt ->
fmt "ENTERING fast recovery inflight=%ld, ssthresh=%ld -> %ld, \
cwnd=%ld -> %ld"
inflight t.ssthresh newssthresh t.cwnd newcwnd);
t.fast_recovery <- true;
t.fast_rec_th <- t.tx_nxt;
t.ssthresh <- newssthresh;
t.rtt_timer_on <- false;
t.cwnd <- newcwnd
end
let rto t =
match t.backoff_count with
| 0 -> t.rto
| _ -> Int64.(mul t.rto (shift_left 2L t.backoff_count))
let backoff_rto t =
t.backoff_count <- t.backoff_count + 1;
t.rtt_timer_on <- false;
t.rtt_timer_reset <- true
let max_rexmits_done t =
(t.backoff_count > 5)
let tx_totalbytes t =
Sequence.(to_int (sub t.tx_nxt t.tx_isn))
let rx_totalbytes t =
(-) Sequence.(to_int (sub t.rx_nxt t.rx_isn)) 1