Source file state.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
(* (c) 2017-2019 Hannes Mehnert, all rights reserved *)

(* Log source "tcp.tracing", used through the [Tracing] module. The name [src]
   is rebound further down in this file to the "tcp.state" source; [Tracing]
   keeps using this one because it is instantiated here. *)
let src = Logs.Src.create "tcp.tracing" ~doc:"TCP tracing"
module Tracing = (val Logs.src_log src : Logs.LOG)

(* States of the TCP state machine that a connection can be in. LISTEN and
   CLOSED from the textbook diagram are intentionally missing: no TCP socket
   exists in either of them. *)
type tcp_state =
  | Syn_sent
  | Syn_received
  | Established
  | Close_wait
  | Fin_wait_1
  | Closing
  | Last_ack
  | Fin_wait_2
  | Time_wait

(* [behind_established s] is [false] exactly for the two handshake states
   [Syn_sent] and [Syn_received], [true] for every other state. *)
let behind_established state =
  match state with
  | Syn_sent | Syn_received -> false
  | Established | Close_wait | Fin_wait_1 | Closing | Last_ack | Fin_wait_2
  | Time_wait -> true

(* [is_connected s] holds from [Established] through the closing states; it
   is [false] during the handshake and in [Time_wait]. *)
let is_connected state =
  match state with
  | Syn_sent | Syn_received | Time_wait -> false
  | Established | Close_wait | Fin_wait_1 | Closing | Last_ack | Fin_wait_2 ->
    true

(* Human-readable name of a state; also used as a metrics field name. *)
let fsm_to_string state =
  match state with
  | Syn_sent -> "syn sent"
  | Syn_received -> "syn received"
  | Established -> "established"
  | Close_wait -> "close wait"
  | Fin_wait_1 -> "fin wait 1"
  | Closing -> "closing"
  | Last_ack -> "last ack"
  | Fin_wait_2 -> "fin wait 2"
  | Time_wait -> "time wait"

(* Print a state using its [fsm_to_string] name. *)
let pp_fsm ppf state = Fmt.pf ppf "%s" (fsm_to_string state)

(* hostTypes:182 *)
(* Round-trip time estimator state, stored in the [t_rttinf] field of a
   control block. *)
type rttinf = {
  t_rttupdated : int ; (*: number of times rtt sampled :*)
  tf_srtt_valid : bool ; (*: estimate is currently believed to be valid :*)
  t_srtt : Duration.t ; (*: smoothed round-trip time :*)
  t_rttvar : Duration.t ; (*: variance in round-trip time :*)
  t_rttmin : Duration.t ; (*: minimum rtt allowed :*)
  t_lastrtt : Duration.t option ; (*: most recent instantaneous RTT obtained :*)
  (*: [[None]] if no value has been obtained (it starts as [[None]] in
    [initial_cb]). The same applies to [[t_lastshift]] below. :*)
  (* in BSD, this is the local variable rtt in tcp_xmit_timer(); we put it here
     because we don't want to store rxtcur in the tcpcb *)
  t_lastshift : int option ; (*: the last retransmission shift used :*)
  t_wassyn : bool (*: whether that shift was [[RexmtSyn]] or not :*)
  (* these two also are to avoid storing rxtcur in the tcpcb; they are somewhat
     annoying because they are *only* required for the tcp_output test that
     returns to slow start if the connection has been idle for >=1RTO *)
}

(* Print every field of an RTT estimator on one (breakable) line; absent
   optional values are shown as "none". *)
let pp_rttinf ppf { t_rttupdated ; tf_srtt_valid ; t_srtt ; t_rttvar ;
                    t_rttmin ; t_lastrtt ; t_lastshift ; t_wassyn } =
  let or_none pp = Fmt.option ~none:(Fmt.any "none") pp in
  Fmt.pf ppf "rttinf: #updated %u@ valid %B@ smoothed %a@ variance %a@ min %a@ \
              last %a@ shift %a@ wassyn %B"
    t_rttupdated
    tf_srtt_valid
    Duration.pp t_srtt
    Duration.pp t_rttvar
    Duration.pp t_rttmin
    (or_none Duration.pp) t_lastrtt
    (or_none Fmt.int) t_lastshift
    t_wassyn

(* Mode of the retransmission timer [tt_rexmt] of a control block, which pairs
   the mode with a shift count. *)
type rexmtmode = RexmtSyn | Rexmt | Persist

(* [mode_of timer] extracts the mode from an optional timed
   ((mode, shift), deadline) value, or [None] if there is no timer. *)
let mode_of timer =
  Option.map (fun ((mode, _shift), _deadline) -> mode) timer

module Reassembly_queue = struct
  (* An out-of-order segment held until the bytes before it have arrived. *)
  type reassembly_segment = {
    seq : Sequence.t ; (* sequence number of the first byte of [data] *)
    fin : bool ; (* FIN flag carried with (or merged into) this segment *)
    data : Rope.t ;
  }

  (* we take care that the list is sorted by the sequence number *)
  type t = reassembly_segment list

  let empty = []

  let is_empty = function [] -> true | _ -> false

  let length t = List.length t

  (* Print a segment as its start sequence number and payload length. *)
  let pp_rseg ppf { seq ; data ; _ } =
    Fmt.pf ppf "%a (len %u)" Sequence.pp seq (Rope.length data)

  let pp = Fmt.(list ~sep:(any ", ") pp_rseg)

  (* [insert_seg t (seq, fin, data)] inserts a segment into the sorted queue
     [t], coalescing it with existing segments it touches or overlaps. Where
     bytes overlap, the newly inserted data wins.

     The fold walks [t] in order and builds the result reversed in [acc]:
     (1) while [inserted] is [None], look for the place of the new segment
         and merge it with the current element if they touch or overlap;
     (2) afterwards [inserted = Some (elt, seq_end)], where [elt] is the
         merged segment at the head of [acc] and [seq_end] is the sequence
         number just past its data; following elements that start at or
         before [seq_end] are merged into [elt].
     If no place was found, the new segment is appended at the end. *)
  let insert_seg t (seq, fin, (data : Rope.t)) =
    let inserted, segq =
      List.fold_left (fun (inserted, acc) e ->
          match inserted with
          | Some (elt, seq_end) ->
            (* 2 - the current "e" may be merged into the head of acc *)
            let acc' = match acc with [] -> [] | _hd :: tl -> tl in
            if Sequence.less_equal e.seq seq_end then
              let overlap = Sequence.sub seq_end e.seq in
              if overlap = 0 then
                (* overlap = 0, we can just merge them *)
                let elt = { elt with fin = e.fin || elt.fin ; data = Rope.concat elt.data e.data } in
                Some (elt, Sequence.addi elt.seq (Rope.length elt.data)), elt :: acc'
              else
                let e_end = Sequence.addi e.seq (Rope.length e.data) in
                if Sequence.less_equal e_end seq_end then
                  (* e lies entirely within elt (e.g. a retransmission that
                     covers several queued segments): none of its bytes are
                     needed, and shifting e.data by [overlap] would consume
                     all of it or more *)
                  let elt = { elt with fin = e.fin || elt.fin } in
                  Some (elt, seq_end), elt :: acc'
                else
                  (* cut the first [overlap] bytes (already in elt) from e *)
                  let data = Rope.shift e.data overlap in
                  let data = Rope.concat elt.data data in
                  let elt = { elt with fin = e.fin || elt.fin ; data } in
                  Some (elt, Sequence.addi elt.seq (Rope.length data)), elt :: acc'
            else
              (* there's still a hole, nothing to merge *)
              (inserted, e :: acc)
          | None ->
            (* 1 *)
            (* there are three cases:
               - (a) the new seq is before the existing e.seq -> prepend
                     (and figure out whether to merge with e)
                     seq <= e.seq
               - (b) the new seq is within e.seq + len e -> append (partially)
                     seq <= e.seq + len
               - (c) the new seq is way behind e.seq + len e -> move along
                     seq > e.seq + len
            *)
            if Sequence.less_equal seq e.seq then
              (* case (a) *)
              let seq_e = Sequence.addi seq (Rope.length data) in
              (* case (1): a new segment that is way before the existing one:
                 seq <= e.seq && seq_e <= e.seq -> e must be retained
                 case (2): a new segment that is partially before the existing:
                 seq <= e.seq && seq_e > e.seq -> e may be partially retained:
                  (i) seq_e >= e.seq_e -> drop e
                  (ii) seq_e < e.seq_e -> retain the last bytes of e
              *)
              if Sequence.less_equal seq_e e.seq then
                if Sequence.equal seq_e e.seq then
                  let e = { seq ; fin = fin || e.fin ; data = Rope.concat data e.data } in
                  Some (e, Sequence.addi seq (Rope.length e.data)), e :: acc
                else
                  let e' = { seq ; fin ; data } in
                  Some (e', Sequence.addi seq (Rope.length data)), e :: e' :: acc
              else
                let e_seq_e = Sequence.addi e.seq (Rope.length e.data) in
                if Sequence.greater_equal seq_e e_seq_e then
                  let e' = { seq ; fin ; data } in
                  Some (e', seq_e), e' :: acc
                else
                  (* we've to retain some parts of seq *)
                  let post =
                    let retain_data = Sequence.sub e_seq_e seq_e in
                    let skip_data = Rope.length e.data - retain_data in
                    Rope.shift e.data skip_data
                  in
                  let e = { seq ; fin = fin || e.fin ; data = Rope.concat data post } in
                  Some (e, Sequence.addi seq (Rope.length e.data)), e :: acc
            else
              let e_seq_e = Sequence.addi e.seq (Rope.length e.data) in
              if Sequence.less_equal seq e_seq_e then
                (* case (b) we append to the thing *)
                if Sequence.equal seq e_seq_e then
                  let e = { e with fin = fin || e.fin ; data = Rope.concat e.data data } in
                  Some (e, Sequence.addi e_seq_e (Rope.length data)), e :: acc
                else
                  let overlap = Sequence.sub e_seq_e seq in
                  let pre = Rope.chop e.data (Rope.length e.data - overlap) in
                  let seq_e = Sequence.addi seq (Rope.length data) in
                  let end_ = Sequence.max e_seq_e seq_e in
                  let post =
                    if Sequence.greater e_seq_e seq_e then
                      let retain_data = Sequence.sub e_seq_e seq_e in
                      let skip_data = Rope.length e.data - retain_data in
                      Rope.shift e.data skip_data
                    else
                      Rope.empty
                  in
                  let e = { e with fin = fin || e.fin ; data = Rope.concat (Rope.concat pre data) post } in
                  Some (e, end_), e :: acc
              else
                (None, e :: acc))
        (None, []) t
    in
    (* no existing segment took the new one: it belongs at the very end *)
    let segq =
      match inserted with
      | None -> { seq ; fin ; data } :: segq
      | Some _ -> segq
    in
    List.rev segq

  (* [maybe_take t seq] looks for queued data covering [seq]. Walking the
     queue in order until a match is found:
     - a segment starting at [seq] is taken whole;
     - a segment starting before [seq] and ending after it is taken with the
       bytes before [seq] cut off;
     - a segment ending at or before [seq] is discarded;
     - a segment starting after [seq] is kept.
     Segments after the match are kept. Returns the remaining queue together
     with [Some (data, fin)] of the taken segment, or [None]. *)
  let maybe_take t seq =
    let r, t' =
      List.fold_left (fun (r, acc) e ->
          match r with
          | None ->
            if Sequence.equal seq e.seq then
              Some (e.data, e.fin), acc
            else if Sequence.greater seq e.seq then
              let e_end = Sequence.addi e.seq (Rope.length e.data) in
              if Sequence.less seq e_end then
                let to_cut = Sequence.sub seq e.seq in
                let data = Rope.shift e.data to_cut in
                Some (data, e.fin), acc
              else
                None, acc
            else
              None, e :: acc
          | Some _ -> (r, e :: acc))
        (None, []) t
    in
    List.rev t', r
end

(* hostTypes:230 but dropped urg and ts stuff *)
(* Per-connection TCP control block: timers, flags, send and receive sequence
   variables, window scaling, RTT estimation, retransmission bookkeeping and
   the out-of-order reassembly queue. [initial_cb] is the fresh value. *)
type control_block = {
  (*: timers :*)
  (* TODO pretty sure we can consolidate them to one or two fields *)
  (* additionally, not all are allowed in all tcp states *)
  tt_rexmt : (rexmtmode * int) Timers.timed option; (*: retransmit timer, with mode and shift; [[NONE]] is idle :*)
    (*: see |tcp_output.c:356ff| for more info. :*)
    (*: as in BSD, the shift starts at zero, and is incremented each
        time the timer fires.  So it is zero during the first interval,
        1 after the first retransmit, etc. :*)
  (* tt_keep : unit Timers.timed option ; (\*: keepalive timer :*\) *)
  tt_2msl : unit Timers.timed option ; (*: $2*\mathit{MSL}$ [[TIME_WAIT]] timer :*)
  tt_delack : unit Timers.timed option ; (*: delayed [[ACK]] timer :*)
  tt_conn_est : unit Timers.timed option ; (*: connection-establishment timer, overlays keep in BSD :*)
  tt_fin_wait_2 : unit Timers.timed option ; (*: [[FIN_WAIT_2]] timer, overlays 2msl in BSD :*)
  t_idletime : Mtime.t ; (*: time since last segment received :*)
  (* NOTE(review): the type is a timestamp rather than a span, so this
     presumably holds the time the last segment was received; confirm *)

  (*: flags, some corresponding to BSD |TF_| flags :*)
  tf_needfin : bool ;
  tf_shouldacknow : bool ;

  (*: send variables :*)
  snd_una : Sequence.t ; (*: lowest unacknowledged sequence number :*)
  snd_max : Sequence.t ; (*: highest sequence number sent; used to recognise retransmits :*)
  snd_nxt : Sequence.t ; (*: next sequence number to send :*)
  snd_wl1 : Sequence.t ; (*: seq number of most recent window update segment :*)
  snd_wl2 : Sequence.t ; (*: ack number of most recent window update segment :*)
  iss : Sequence.t ; (* initial send sequence number *)
  snd_wnd : int ; (*: send window size: always between 0 and 65535*2**14 :*)
  snd_cwnd : int ; (*: congestion window :*)
  snd_ssthresh : int ; (*: threshold between exponential and linear [[snd_cwnd]] expansion (for slow start):*)

  (*: receive variables :*)
  rcv_wnd : int ; (*: receive window size :*)
  tf_rxwin0sent : bool ; (*: have advertised a zero window to receiver :*)
  rcv_nxt : Sequence.t ; (*: lowest sequence number not yet received :*)
  irs : Sequence.t ; (*: initial receive sequence number :*)
  rcv_adv : Sequence.t ; (*: most recently advertised window :*)
  last_ack_sent : Sequence.t ; (*: last acknowledged sequence number :*)

  (*: connection parameters :*)
  (* TODO move into tcp_state, at least t_advmss; tf_doing_ws/request_r_scale *)
  (* we also don't need that many options: we will do window scaling and MSS! *)
  t_maxseg : int ; (*: maximum segment size on this connection :*)
  t_advmss : int ; (*: the mss advertisement sent in our initial SYN :*)

  (* currently: false, None, 0, 0 in initial_cb;
     deliver_in_1 sets tf_doing_ws, request_r_scale, snd_scale, rcv_scale
     connect_1 sets request_r_scale
     Segment.make_syn/make_syn_ack use request_r_scale!
     deliver_in_2 sets tf_doing_ws, snd_scale, rcv_scale
     timer_tt_rexmtsyn may set request_r_scale to None
     --> only once we're in established, the values should be used! (retransmissions handle this?)
 *)
  tf_doing_ws : bool ; (*: doing window scaling on this connection?  (result of negotiation) :*)
  request_r_scale : int option ; (*: pending window scaling, if any (used during negotiation) :*)
  snd_scale : int ; (*: window scaling for send window (0..14), applied to received advertisements (RFC1323) :*)
  rcv_scale : int ; (*: window scaling for receive window (0..14), applied when we send advertisements (RFC1323) :*)

  (*: round-trip time estimation :*)
  t_rttseg : (Mtime.t * Sequence.t) option ; (*: start time and sequence number of segment being timed :*)
  t_rttinf : rttinf ; (*: round-trip time estimator values :*)

  (*: retransmission :*)
  t_dupacks : int ; (*: number of consecutive duplicate acks received (typically 0..3ish; should this wrap at 64K/4G ack burst?) :*)
  t_badrxtwin : Mtime.t ; (*: deadline for bad-retransmit recovery :*)
  snd_cwnd_prev : int ; (*: [[snd_cwnd]] prior to retransmit (used in bad-retransmit recovery) :*)
  snd_ssthresh_prev : int ; (*: [[snd_ssthresh]] prior to retransmit (used in bad-retransmit recovery) :*)
  snd_recover : Sequence.t ; (*: highest sequence number sent at time of receipt of partial ack (used in RFC2581/RFC2582 fast recovery) :*)

  (*: other :*)
  t_segq :  Reassembly_queue.t;  (*: segment reassembly queue :*)
  t_softerror : string option      (*: current transient error; reported only if failure becomes permanent :*)
  (*: could cut this down to the actually-possible errors? :*)

}

(* auxFns:1066*)
let initial_cb =
  (* RTT estimator before any sample has been taken: base values from
     [Params], nothing measured yet. *)
  let fresh_rttinf = {
    t_rttupdated = 0 ;
    tf_srtt_valid = false ;
    t_srtt = Params.tcptv_rtobase ;
    t_rttvar = Params.tcptv_rttvarbase ;
    t_rttmin = Params.tcptv_min ;
    t_lastrtt = None ;
    t_lastshift = None ;
    t_wassyn = false ; (* only meaningful together with a t_lastshift *)
  } in
  (* Largest window: [Params.tcp_maxwin] shifted by the maximal window scale;
     used as the initial congestion window and slow-start threshold. *)
  let max_window = Params.tcp_maxwin lsl Params.tcp_maxwinscale in
  let epoch = Mtime.of_uint64_ns 0L in
  {
    (* timers: all idle *)
    tt_rexmt = None ;
    tt_2msl = None ;
    tt_delack = None ;
    tt_conn_est = None ;
    tt_fin_wait_2 = None ;
    t_idletime = epoch ;
    (* flags *)
    tf_needfin = false ;
    tf_shouldacknow = false ;
    (* send side *)
    snd_una = Sequence.zero ;
    snd_max = Sequence.zero ;
    snd_nxt = Sequence.zero ;
    snd_wl1 = Sequence.zero ;
    snd_wl2 = Sequence.zero ;
    iss = Sequence.zero ;
    snd_wnd = 0 ;
    snd_cwnd = max_window ;
    snd_ssthresh = max_window ;
    (* receive side *)
    rcv_wnd = 0 ;
    tf_rxwin0sent = false ;
    rcv_nxt = Sequence.zero ;
    irs = Sequence.zero ;
    rcv_adv = Sequence.zero ;
    last_ack_sent = Sequence.zero ;
    (* segment size and window scaling *)
    t_maxseg = Params.mssdflt ;
    t_advmss = Params.mssdflt ;
    tf_doing_ws = false ;
    request_r_scale = None ;
    snd_scale = 0 ;
    rcv_scale = 0 ;
    (* round-trip time estimation *)
    t_rttseg = None ;
    t_rttinf = fresh_rttinf ;
    (* retransmission *)
    t_dupacks = 0 ;
    t_badrxtwin = epoch ;
    snd_cwnd_prev = 0 ;
    snd_ssthresh_prev = 0 ;
    snd_recover = Sequence.zero ;
    (* other *)
    t_segq = Reassembly_queue.empty ;
    t_softerror = None ;
  }

(* Print the time left until [deadline], measured from [now]. When
   [Mtime.sub_span] yields [None] (deadline already passed), the value of
   [Mtime.min_stamp] is printed instead. The timer payload is ignored. *)
let pp_timer now ppf (_, deadline) =
  let remaining =
    match
      Mtime.sub_span deadline (Mtime.Span.of_uint64_ns (Mtime.to_uint64_ns now))
    with
    | Some left -> left
    | None -> Mtime.min_stamp
  in
  Duration.pp ppf (Mtime.to_uint64_ns remaining)

(* Print a retransmission timer as its mode, shift and time left (relative to
   [now]). *)
let pp_rexmt now ppf ((mode, shift), deadline) =
  let mode_name =
    match mode with
    | RexmtSyn -> "syn"
    | Rexmt -> "rexmt"
    | Persist -> "persist"
  in
  Fmt.pf ppf "%s, shift %u, deadline %a"
    mode_name shift (pp_timer now) ((), deadline)

(* Pretty-print a control block. Timer deadlines and the age of the segment
   being RTT-timed (printed with a leading "-") are shown relative to [now].
   Not printed: t_idletime, t_softerror, snd_cwnd_prev, snd_ssthresh_prev,
   t_badrxtwin, last_ack_sent and t_segq. *)
let pp_control now ppf c =
  Fmt.pf ppf "needfin %B@ shouldacknow %B@ snd_una %a@ snd_max %a@ snd_nxt %a@ \
              snd_wl1 %a@ snd_wl2 %a@ iss %a@ snd_wnd %d@ snd_cwnd %d@ \
              snd_ssthresh %d@ rcv_wnd %d@ tf_rxwin0sent %B@ rcv_nxt %a@ \
              irs %a@ rcv_adv %a@ snd_recover %a@ t_maxseg %d@ t_advmss %d@ \
              snd_scale %d@ rcv_scale %d@ request_r_scale %a@ tf_doing_ws %B@ \
              tt_rexmt %a@ tt_2msl %a@ tt_delack %a@ tt_conn_est %a@ \
              tt_fin_wait_2 %a@ dupacks %u@ rttinf %a@ rttseg %a"
    c.tf_needfin c.tf_shouldacknow
    Sequence.pp c.snd_una Sequence.pp c.snd_max Sequence.pp c.snd_nxt
    Sequence.pp c.snd_wl1 Sequence.pp c.snd_wl2 Sequence.pp c.iss
    c.snd_wnd c.snd_cwnd c.snd_ssthresh c.rcv_wnd c.tf_rxwin0sent
    Sequence.pp c.rcv_nxt Sequence.pp c.irs Sequence.pp c.rcv_adv
    Sequence.pp c.snd_recover c.t_maxseg c.t_advmss
    c.snd_scale c.rcv_scale Fmt.(option ~none:(any "no") int) c.request_r_scale c.tf_doing_ws
    Fmt.(option ~none:(any "none") (pp_rexmt now)) c.tt_rexmt
    Fmt.(option ~none:(any "none") (pp_timer now)) c.tt_2msl
    Fmt.(option ~none:(any "none") (pp_timer now)) c.tt_delack
    Fmt.(option ~none:(any "none") (pp_timer now)) c.tt_conn_est
    Fmt.(option ~none:(any "none") (pp_timer now)) c.tt_fin_wait_2
    c.t_dupacks pp_rttinf c.t_rttinf
    Fmt.(option ~none:(any "none") (pair ~sep:(any ", ")
                                      (any "-" ++ Duration.pp) Sequence.pp))
    (Option.map (fun (ts, seg) ->
         (* age of the timed segment: now - ts, or min_stamp if negative *)
         let sent = Mtime.Span.of_uint64_ns (Mtime.to_uint64_ns ts) in
         let ts' =
           Mtime.to_uint64_ns
             (Option.value ~default:Mtime.min_stamp (Mtime.sub_span now sent))
         in
         ts', seg) c.t_rttseg)
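(* Fields of control_block that pp_control does not print, together with the
   values initial_cb gives them (t_segq is not printed either):
    (* tt_keep = None; *)
    t_idletime = Mtime.of_uint64_ns 0L;
    t_softerror = None;
    snd_cwnd_prev = 0;
    snd_ssthresh_prev = 0;
    t_badrxtwin = Mtime.of_uint64_ns 0L;
    last_ack_sent = Sequence.zero;
  *)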

(* Monomorphic integer comparison (avoids the generic polymorphic compare). *)
let compare_int (a : int) (b : int) = Int.compare a b

module Connection = struct
  (* A connection 4-tuple: source address, source port, destination address,
     destination port. *)
  type t = Ipaddr.t * int * Ipaddr.t * int

  let pp ppf ((src_ip, src_port, dst_ip, dst_port) : t) =
    Fmt.pf ppf "%a:%d -> %a:%d" Ipaddr.pp src_ip src_port Ipaddr.pp dst_ip dst_port

  (* Lexicographic chaining: the first comparison result unless it is 0. *)
  let andThen a b =
    match a with
    | 0 -> b
    | decided -> decided

  (* Total order on 4-tuples: source port, then destination port, then source
     address, then destination address. *)
  let compare ((src, srcp, dst, dstp) : t) ((src', srcp', dst', dstp') : t) =
    match compare_int srcp srcp' with
    | 0 ->
      (match compare_int dstp dstp' with
       | 0 ->
         (match Ipaddr.compare src src' with
          | 0 -> Ipaddr.compare dst dst'
          | by_src -> by_src)
       | by_dstp -> by_dstp)
    | by_srcp -> by_srcp
end

(* Map from a connection 4-tuple ([Connection.t]) to that connection's state,
   ordered by [Connection.compare]. *)
module CM = Map.Make(Connection)

(* maybe timer information should go in here?
   -- put into tcp_state (allowing SYN_SENT (and closing states) to be slimmer)?
   -- segments to be retransmitted need to be preserved as well somewhere!
   --> and they may change whenever an ACK is received *)
(* sndq/rcvq: ownership discipline - as defined by the docs:
  - listen (mirage-net): the ownership of packet is transferred to the callback
  - send (mirage-flow) says that buffer ownership is now at the flow
*)
(* State of a single connection: its FSM state and control block, shutdown
   flags, buffer sizes, receive/send queues, one notifier per direction and
   its creation time. The notifier type is left abstract as the parameter. *)
type 'a conn_state = {
  tcp_state : tcp_state ;
  control_block : control_block ; (* control_block should go into state, allowing smaller control blocks for initial states *)
  cantrcvmore : bool ; (* NOTE(review): presumably set once no more data can be received; confirm *)
  cantsndmore : bool ; (* NOTE(review): presumably set once no more data may be sent; confirm *)
  rcvbufsize : int ;
  sndbufsize : int ;
  rcvq : Rope.t ; (* received data not yet consumed; starts empty *)
  sndq : Rope.t ; (* data queued for sending; starts empty *)
  rcv_notify : 'a;
  snd_notify : 'a;
  created : Mtime.t; (* creation timestamp; pp_conn_state and metrics report the age *)
}

(* A fresh connection state: both directions open, both queues empty, and a
   separate notifier created with [mk_notify] for each direction. *)
let conn_state created mk_notify ~rcvbufsize ~sndbufsize tcp_state control_block =
  let nothing_queued = Rope.empty in
  {
    created ;
    tcp_state ;
    control_block ;
    rcvbufsize ;
    sndbufsize ;
    rcvq = nothing_queued ;
    sndq = nothing_queued ;
    cantrcvmore = false ;
    cantsndmore = false ;
    rcv_notify = mk_notify () ;
    snd_notify = mk_notify () ;
  }

(* Print a connection: its age relative to [now] (falling back to
   [Mtime.min_stamp] if [created] lies after [now]), its FSM state and its
   control block. *)
let pp_conn_state now ppf c =
  let age_ns =
    Mtime.Span.of_uint64_ns (Mtime.to_uint64_ns c.created)
    |> Mtime.sub_span now
    |> Option.value ~default:Mtime.min_stamp
    |> Mtime.to_uint64_ns
  in
  Fmt.pf ppf "TCP (since %a) %a cb %a"
    Duration.pp age_ns
    pp_fsm c.tcp_state
    (pp_control now) c.control_block

(* Sets of integers (used for listening port numbers), in ascending order. *)
module IS = Set.Make (Int)

module Stats = struct
  (* Mutable connection counters of a stack; only ever incremented by the
     [incr_*] functions of this module. *)
  type t = {
    mutable total_established : int ;
    mutable total_passive_connections : int ;
    mutable total_active_connections : int ;
  }

  (* A fresh set of counters, all zero. *)
  let empty () =
    { total_established = 0 ;
      total_passive_connections = 0 ;
      total_active_connections = 0 }

  let incr_passive stats =
    stats.total_passive_connections <- stats.total_passive_connections + 1

  let incr_established stats =
    stats.total_established <- stats.total_established + 1

  let incr_active stats =
    stats.total_active_connections <- stats.total_active_connections + 1
end

(* Stack-wide TCP state. NOTE(review): an earlier remark here read
   path mtu (it is global to a stack), but this record has no path-MTU
   field; confirm whether that note is stale. *)
type 'a t = {
  listeners : IS.t ; (* ports added by start_listen and removed by stop_listen *)
  connections : 'a conn_state CM.t ; (* state of each known connection *)
  stats : Stats.t ; (* mutable connection counters *)
  id : string ; (* reported as the stack-id metrics tag *)
  mutable ctr : int ; (* NOTE(review): only initialised (to 0) in this file *)
  metrics : (string -> Metrics.field list, Mtime.t * 'a conn_state CM.t * Stats.t -> Metrics.data) Metrics.src; (* sampled by add_metrics *)
  transitions : (string -> Metrics.field list, string -> Metrics.data) Metrics.src; (* bumped by rule *)
  mk_notify : unit -> 'a; (* creates notifier values; presumably passed to conn_state *)
}

(* Maps keyed by FSM state (used to count connections per state); keys are
   ordered with the polymorphic [Stdlib.compare]. *)
module States = Map.Make (struct
    type t = tcp_state
    let compare : t -> t -> int = Stdlib.compare
  end)

(* Log source "tcp.state" for this module's messages. This rebinds [src]; the
   [Tracing] module defined earlier still logs to the "tcp.tracing" source. *)
let src = Logs.Src.create "tcp.state" ~doc:"TCP state"
module Log = (val Logs.src_log src : Logs.LOG)

(* Metrics source "utcp" (tagged with stack-id). Each sample reports a
   connection count per FSM state, the number of active connections, the
   [Stats] counters and the total receive/send queue sizes. Connections older
   than one minute are logged at info level while sampling. *)
let metrics () =
  let all_states =
    [ Syn_sent ; Syn_received ; Established ; Close_wait ; Fin_wait_1 ;
      Closing ; Last_ack ; Fin_wait_2 ; Time_wait ]
  in
  let open Metrics in
  let doc = "uTCP metrics" in
  let data (now, connections, stats) =
    let step key conn (rcv_bytes, snd_bytes, per_state) =
      let age = Mtime.(Span.to_uint64_ns (span now conn.created)) in
      if age > Duration.of_min 1 then
        Log.info (fun m -> m "%a in %a" Connection.pp key (pp_conn_state now) conn);
      let bump = function None -> Some 1 | Some n -> Some (succ n) in
      (rcv_bytes + Rope.length conn.rcvq,
       snd_bytes + Rope.length conn.sndq,
       States.update conn.tcp_state bump per_state)
    in
    let rcvq, sndq, per_state = CM.fold step connections (0, 0, States.empty) in
    let count_in st =
      match States.find_opt st per_state with None -> 0 | Some n -> n
    in
    let state_fields =
      List.map (fun st -> int (fsm_to_string st) (count_in st)) all_states
    in
    let total = States.fold (fun _ n sum -> n + sum) per_state 0 in
    Data.v
      (state_fields @ [
          int "active connections" total
        ; int "total established" stats.Stats.total_established
        ; int "total server" stats.total_passive_connections
        ; int "total client" stats.total_active_connections
        ; int "receive queue size" rcvq
        ; int "send queue size" sndq
        ])
  in
  let tag = Tags.string "stack-id" in
  Src.v ~doc ~tags:Tags.[ tag ] ~data "utcp"

(* Record one sample of the stack's "utcp" metrics, tagged with its id. *)
let add_metrics t now =
  let tags_of tag = tag t.id in
  let data_of sample = sample (now, t.connections, t.stats) in
  Metrics.add t.metrics tags_of data_of

(* Metrics source "utcp_transition": every sample first increments the
   counter of the given rule name, then reports all per-rule counters plus
   their sum under "total". The counters live in a table private to this
   source. *)
let transitions () =
  let counters : (string, int) Hashtbl.t = Hashtbl.create 7 in
  let bump rule_name =
    let previous =
      Option.value ~default:0 (Hashtbl.find_opt counters rule_name)
    in
    Hashtbl.replace counters rule_name (succ previous)
  in
  let snapshot () =
    let fields, total =
      Hashtbl.fold
        (fun rule_name hits (fields, total) ->
           (Metrics.uint rule_name hits :: fields, hits + total))
        counters ([], 0)
    in
    Metrics.uint "total" total :: fields
  in
  let open Metrics in
  let doc = "uTCP transition metrics" in
  let data rule_name = bump rule_name; Data.v (snapshot ()) in
  let tag = Tags.string "stack-id" in
  Src.v ~doc ~tags:Metrics.Tags.[ tag ] ~data "utcp_transition"

(* Count one firing of the transition rule [name] for this stack. *)
let rule t name =
  let with_stack_id tag = tag t.id in
  Metrics.add t.transitions with_stack_id (fun sample -> sample name)

(* Print the listening ports and every connection with its state; connection
   ages and timers are relative to [now]. *)
let pp now ppf t =
  let pp_ports = Fmt.(list ~sep:(any ", ") int) in
  let pp_conns =
    Fmt.(list ~sep:(any "@.") (pair ~sep:(any ": ") Connection.pp (pp_conn_state now)))
  in
  Fmt.pf ppf "listener %a, connections: %a"
    pp_ports (IS.elements t.listeners)
    pp_conns (CM.bindings t.connections)

(* Return a copy of [t] that also listens on [port]. *)
let start_listen t port =
  let listeners = IS.add port t.listeners in
  { t with listeners }

(* Return a copy of [t] that no longer listens on [port]. *)
let stop_listen t port =
  let listeners = IS.remove port t.listeners in
  { t with listeners }

(* A fresh stack state identified by [id]: no listeners, no connections, zero
   counters, and new "utcp" / "utcp_transition" metrics sources. *)
let empty mk_notify id =
  {
    listeners = IS.empty ;
    connections = CM.empty ;
    stats = Stats.empty () ;
    id ;
    ctr = 0 ;
    metrics = metrics () ;
    transitions = transitions () ;
    mk_notify ;
  }