1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
let src = Logs.Src.create "git.sync" ~doc:"logs git's sync event"
module Log = (val Logs.src_log src : Logs.LOG)
module Default = struct
let capabilities =
[ `Multi_ack_detailed; `Thin_pack; `Side_band_64k; `Ofs_delta
; `Agent "git/2.0.0"; `Report_status; `No_done ]
end
type 'a shallow_update = {shallow: 'a list; unshallow: 'a list}
type 'a acks =
{ shallow: 'a list
; unshallow: 'a list
; acks: ('a * [`Common | `Ready | `Continue | `ACK]) list }
module type ENDPOINT = sig
type t
val uri : t -> Uri.t
end
module type S = sig
module Store : Minimal.S
module Endpoint : ENDPOINT
type error
val pp_error : error Fmt.t
type command =
[ `Create of Store.Hash.t * Store.Reference.t
| `Delete of Store.Hash.t * Store.Reference.t
| `Update of Store.Hash.t * Store.Hash.t * Store.Reference.t ]
val pp_command : command Fmt.t
val push :
Store.t
-> push:( (Store.Hash.t * Store.Reference.t * bool) list
-> (Store.Hash.t list * command list) Lwt.t)
-> ?capabilities:Capability.t list
-> Endpoint.t
-> ( (Store.Reference.t, Store.Reference.t * string) result list
, error )
result
Lwt.t
val ls :
Store.t
-> ?capabilities:Capability.t list
-> Endpoint.t
-> ((Store.Hash.t * Store.Reference.t * bool) list, error) result Lwt.t
val fetch :
Store.t
-> ?shallow:Store.Hash.t list
-> ?capabilities:Capability.t list
-> notify:(Store.Hash.t shallow_update -> unit Lwt.t)
-> negociate:( Store.Hash.t acks
-> 'state
-> ([`Ready | `Done | `Again of Store.Hash.Set.t] * 'state)
Lwt.t)
* 'state
-> have:Store.Hash.Set.t
-> want:( (Store.Hash.t * Store.Reference.t * bool) list
-> (Store.Reference.t * Store.Hash.t) list Lwt.t)
-> ?deepen:[`Depth of int | `Timestamp of int64 | `Ref of Reference.t]
-> Endpoint.t
-> ((Store.Reference.t * Store.Hash.t) list * int, error) result Lwt.t
val fetch_some :
Store.t
-> ?capabilities:Capability.t list
-> references:Store.Reference.t list Store.Reference.Map.t
-> Endpoint.t
-> ( Store.Hash.t Store.Reference.Map.t
* Store.Reference.t list Store.Reference.Map.t
, error )
result
Lwt.t
val fetch_all :
Store.t
-> ?capabilities:Capability.t list
-> references:Store.Reference.t list Store.Reference.Map.t
-> Endpoint.t
-> ( Store.Hash.t Store.Reference.Map.t
* Store.Reference.t list Store.Reference.Map.t
* Store.Hash.t Store.Reference.Map.t
, error )
result
Lwt.t
val fetch_one :
Store.t
-> ?capabilities:Capability.t list
-> reference:Store.Reference.t * Store.Reference.t list
-> Endpoint.t
-> ( [`AlreadySync | `Sync of Store.Hash.t Store.Reference.Map.t]
, error )
result
Lwt.t
val pp_fetch_one : [ `AlreadySync | `Sync of Store.Hash.t Store.Reference.Map.t ] Fmt.t
val clone :
Store.t
-> ?capabilities:Capability.t list
-> reference:Store.Reference.t * Store.Reference.t
-> Endpoint.t
-> (unit, error) result Lwt.t
val update_and_create :
Store.t
-> ?capabilities:Capability.t list
-> references:Store.Reference.t list Store.Reference.Map.t
-> Endpoint.t
-> ( (Store.Reference.t, Store.Reference.t * string) result list
, error )
result
Lwt.t
val pp_update_and_create : (Store.Reference.t, Store.Reference.t * string) result list Fmt.t
end
module Common (G : Minimal.S) = struct
module Store = G
let src =
Logs.Src.create "git.common.sync" ~doc:"logs git's common sync event"
module Log = (val Logs.src_log src : Logs.LOG)
type command =
[ `Create of Store.Hash.t * Store.Reference.t
| `Delete of Store.Hash.t * Store.Reference.t
| `Update of Store.Hash.t * Store.Hash.t * Store.Reference.t ]
let pp_command ppf = function
| `Create (hash, r) ->
Fmt.pf ppf "(`Create (%a, %a))" Store.Hash.pp hash Store.Reference.pp r
| `Delete (hash, r) ->
Fmt.pf ppf "(`Delete (%a, %a))" Store.Hash.pp hash Store.Reference.pp r
| `Update (_of, _to, r) ->
Fmt.pf ppf "(`Update (of:%a, to:%a, %a))" Store.Hash.pp _of
Store.Hash.pp _to Store.Reference.pp r
let pp_fetch_one ppf = function
| `AlreadySync -> Fmt.pf ppf "Reference is synchronized"
| `Sync map ->
Fmt.pf ppf "Updated @[<hov>%a@]"
(Fmt.iter_bindings Store.Reference.Map.iter Fmt.(pair ~sep:(always " -> ") Store.Reference.pp Store.Hash.pp)) map
let pp_update_and_create ppf lst =
let pp_elt ppf = function
| Ok reference -> Fmt.pf ppf "(Reference <%a> updated)" Store.Reference.pp reference
| Error (reference, err) -> Fmt.pf ppf "(Conflict on <%a>: %s)" Store.Reference.pp reference err in
Fmt.(Dump.list pp_elt) ppf lst
open Lwt.Infix
module Node = struct
type t = {value: Store.Value.t; mutable color: [`Black | `White]}
let compare a b =
match a.value, b.value with
| Store.Value.Commit a, Store.Value.Commit b ->
Store.Value.Commit.compare_by_date b a
| a, b -> Store.Value.compare a b
end
module Pq = Psq.Make (Store.Hash) (Node)
module Q : Ke.Sigs.F = Ke.Fke
exception Store of Store.error
let packer git exclude source =
let store = Hashtbl.create 128 in
let memoize get hash =
try
let ret = Hashtbl.find store hash in
Lwt.return (Some ret)
with Not_found -> (
get hash
>>= function
| Ok value ->
let node = {Node.value; color= `White} in
Hashtbl.add store hash node ;
Lwt.return (Some node)
| Error `Not_found -> Lwt.return None
| Error err ->
Log.err (fun l ->
l "Got an error when we get the object: %a." Store.Hash.pp hash
) ;
Lwt.fail (Store err) )
in
let preds = function
| Store.Value.Commit commit ->
Store.Value.Commit.tree commit :: Store.Value.Commit.parents commit
| Store.Value.Tree tree ->
List.map
(fun {Store.Value.Tree.node; _} -> node)
(Store.Value.Tree.to_list tree)
| Store.Value.Tag tag -> [Store.Value.Tag.obj tag]
| Store.Value.Blob _ -> []
in
let get = memoize (Store.read git) in
let all_blacks pq =
Pq.fold
(fun _ -> function {Node.color= `Black; _} -> ( && ) true
| _ -> ( && ) false )
true pq
in
let propagate {Node.value; color} =
let rec go q =
match Q.pop_exn q with
| hash, q -> (
try
let node = Hashtbl.find store hash in
node.Node.color <- color ;
go (List.fold_left Q.push q (preds node.Node.value))
with Not_found -> go q )
| exception Q.Empty -> ()
in
go (List.fold_left Q.push Q.empty (preds value))
in
let propagate_snapshot {Node.value; color} =
let rec go q =
match Q.pop_exn q with
| hash, q -> (
let k node =
node.Node.color <- color ;
go (List.fold_left Q.push q (preds node.Node.value))
in
try
let node = Hashtbl.find store hash in
k node
with Not_found -> (
get hash
>>= function None -> Lwt.return () | Some node -> k node ) )
| exception Q.Empty -> Lwt.return ()
in
go (List.fold_left Q.push Q.empty (preds value))
in
let rec garbage pq =
if all_blacks pq then Lwt.return ()
else
match Pq.pop pq with
| Some ((_, {Node.value; color= `Black}), pq) ->
Lwt_list.fold_left_s
(fun pq hash ->
get hash
>>= function
| Some ({Node.value= Store.Value.Tree _; _} as node) ->
node.Node.color <- `Black ;
propagate_snapshot node >>= fun () -> Lwt.return pq
| Some ({Node.color= `White; _} as node) ->
node.Node.color <- `Black ;
propagate node ;
Lwt.return (Pq.add hash node pq)
| Some node -> Lwt.return (Pq.add hash node pq)
| None -> Lwt.return pq )
pq (preds value)
>>= garbage
| Some ((_, {Node.value; _}), pq) ->
Lwt_list.fold_left_s
(fun pq hash ->
get hash
>>= function
| None -> Lwt.return pq
| Some node -> Lwt.return (Pq.add hash node pq) )
pq (preds value)
>>= garbage
| None -> Lwt.return ()
in
let collect () =
Hashtbl.fold
(fun hash -> function
| {Node.color= `White; value} -> Store.Hash.Map.add hash value
| _ -> fun acc -> acc )
store Store.Hash.Map.empty
in
Lwt_list.map_s
(fun hash ->
get hash
>>= function
| Some ({Node.value= Store.Value.Commit commit; _} as node) ->
get (Store.Value.Commit.tree commit)
>>= (function
| None -> Lwt.return ()
| Some node_root_tree -> propagate_snapshot node_root_tree)
>>= fun () -> Lwt.return (Some (hash, node))
| Some node -> Lwt.return (Some (hash, node))
| None -> Lwt.return None )
source
>>= fun source ->
Lwt_list.map_s
(fun hash ->
get hash
>>= function
| Some ({Node.value= Store.Value.Commit commit; _} as node) ->
node.Node.color <- `Black ;
get (Store.Value.Commit.tree commit)
>>= (function
| None -> Lwt.return ()
| Some node_root_tree ->
node_root_tree.Node.color <- `Black ;
propagate_snapshot node_root_tree)
>>= fun () -> Lwt.return (Some (hash, node))
| Some node -> Lwt.return (Some (hash, node))
| None -> Lwt.return None )
exclude
>|= List.append source
>|= List.fold_left
(fun acc -> function None -> acc | Some x -> x :: acc)
[]
>|= Pq.of_list
>>= fun pq -> garbage pq >|= collect
let packer ?(window = `Object 10) ?(depth = 50) git ~ofs_delta:_ remote
commands =
let exclude =
List.fold_left
(fun exclude (hash, _, _) -> Store.Hash.Set.add hash exclude)
Store.Hash.Set.empty remote
|> fun exclude ->
List.fold_left
(fun exclude -> function
| `Delete (hash, _) -> Store.Hash.Set.add hash exclude
| `Update (hash, _, _) -> Store.Hash.Set.add hash exclude
| `Create _ -> exclude )
exclude commands
|> Store.Hash.Set.elements
in
let source =
List.fold_left
(fun source -> function
| `Update (_, hash, _) -> Store.Hash.Set.add hash source
| `Create (hash, _) -> Store.Hash.Set.add hash source
| `Delete _ -> source )
Store.Hash.Set.empty commands
|> Store.Hash.Set.elements
in
packer git exclude source
>|= Store.Hash.Map.bindings
>|= List.map snd
>>= Store.Pack.make git ~window ~depth
let want_handler git choose remote_refs =
Lwt_list.filter_map_s
(function
| remote_hash, remote_ref, false -> (
choose remote_ref
>>= (function
| false ->
Log.debug (fun l ->
l "We missed the reference %a." Store.Reference.pp
remote_ref ) ;
Lwt.return None
| true -> Lwt.return (Some (remote_ref, remote_hash)))
>>= function
| None -> Lwt.return None
| Some (remote_ref, remote_hash) -> (
Store.mem git remote_hash
>>= function
| true -> Lwt.return None
| false -> Lwt.return (Some (remote_ref, remote_hash)) ) )
| _ -> Lwt.return None)
remote_refs
exception Jump of Store.error
let update_and_create git ~references results =
let results =
List.fold_left
(fun results (remote_ref, hash) ->
Store.Reference.Map.add remote_ref hash results )
Store.Reference.Map.empty results
in
let updated, missed =
Store.Reference.Map.partition
(fun remote_ref _ -> Store.Reference.Map.mem remote_ref results)
references
in
let updated, downloaded =
Store.Reference.Map.fold
(fun remote_ref new_hash (updated', downloaded) ->
try
let local_refs = Store.Reference.Map.find remote_ref updated in
( List.fold_left
(fun updated' local_ref ->
Store.Reference.Map.add local_ref new_hash updated' )
updated' local_refs
, downloaded )
with Not_found ->
updated', Store.Reference.Map.add remote_ref new_hash downloaded )
results
Store.Reference.Map.(empty, empty)
in
Lwt.try_bind
(fun () ->
Lwt_list.iter_s
(fun (local_ref, new_hash) ->
Store.Ref.write git local_ref (Store.Reference.Hash new_hash)
>>= function
| Ok _ -> Lwt.return () | Error err -> Lwt.fail (Jump err) )
(Store.Reference.Map.bindings updated) )
(fun () -> Lwt.return (Ok (updated, missed, downloaded)))
(function Jump err -> Lwt.return (Error err) | exn -> Lwt.fail exn)
let push_handler git references remote_refs =
Store.Ref.list git
>>= fun local_refs ->
let local_refs =
List.fold_left
(fun local_refs (local_ref, local_hash) ->
Store.Reference.Map.add local_ref local_hash local_refs )
Store.Reference.Map.empty local_refs
in
Lwt_list.filter_map_p
(function
| remote_hash, remote_ref, false ->
Lwt.return (Some (remote_ref, remote_hash))
| _ -> Lwt.return None)
remote_refs
>>= fun remote_refs ->
let actions =
Store.Reference.Map.fold
(fun local_ref local_hash actions ->
try
let remote_refs' = Store.Reference.Map.find local_ref references in
List.fold_left
(fun actions remote_ref ->
try
let remote_hash = List.assoc remote_ref remote_refs in
`Update (remote_hash, local_hash, remote_ref) :: actions
with Not_found -> `Create (local_hash, remote_ref) :: actions
)
actions remote_refs'
with Not_found -> actions )
local_refs []
in
Lwt_list.filter_map_s
(fun action ->
match action with
| `Update (remote_hash, local_hash, reference) ->
Store.mem git remote_hash
>>= fun has_remote_hash ->
Store.mem git local_hash
>>= fun has_local_hash ->
Log.debug (fun l ->
l "Check update command on %a for %a to %a (equal = %b)."
Store.Reference.pp reference Store.Hash.pp remote_hash
Store.Hash.pp local_hash
Store.Hash.(equal remote_hash local_hash) ) ;
if
has_remote_hash
&& has_local_hash
&& not (Store.Hash.equal remote_hash local_hash)
then Lwt.return (Some (action :> command))
else Lwt.return None
| `Create (local_hash, _) -> (
Store.mem git local_hash
>>= function
| true -> Lwt.return (Some (action :> command))
| false -> Lwt.return None ) )
actions
end