open! Core
open! Async
open! Composition_infix

(** [parse_sexp of_sexp sexp] converts [sexp] with [of_sexp].

    Any exception raised by [of_sexp] is replaced by an error that carries both
    the original exception and the offending sexp, so the user can see which
    input could not be converted. *)
let parse_sexp of_sexp sexp =
  match of_sexp sexp with
  | x -> x
  | exception exn ->
    raise_s [%message "Malformed row sexp." (exn : exn) (sexp : Sexp.t)]
;;

(** A single input row: a map from field name to field value. *)
module Row = struct
  type t = string String.Map.t [@@deriving sexp]

  (** Converts a sexp into a row, raising the "Malformed row sexp." error
      produced by the top-level [parse_sexp] on failure. *)
  let parse_sexp = parse_sexp [%of_sexp: t]

  (** [to_csv_row t ~header ~row_number] returns the values of [t] in the order
      given by [header].

      [row_number] is only used in error messages.

      Raises if [t] does not have exactly as many fields as [header], or if a
      field named in [header] is absent from [t]. *)
  let to_csv_row t ~header ~row_number:row =
    if Map.length t <> List.length header
    then
      raise_s
        [%message
          "Row has wrong number of fields."
            (header : string list)
            (row : int)
            ~_:(t : t)]
    else
      List.map header ~f:(fun field ->
        match Map.find t field with
        | None ->
          raise_s
            [%message
              "Missing field in a row." (field : string) (row : int) ~_:(t : t)]
        | Some value -> value)
  ;;
end

(** [get_header sexps] determines the CSV header from the first sexp in the
    pipe without consuming it.

    Returns [`Eof] if [Pipe.values_available] reports end of input. Otherwise
    the first sexp is peeked, parsed as a [(string * string) list], and the
    first component of every pair is returned as [`Ok header]. Because the
    value is only peeked, the same sexp is still available to later readers as
    the first data row. *)
let get_header sexps =
  match%bind Pipe.values_available sexps with
  | `Eof -> return `Eof
  | `Ok ->
    (match Pipe.peek sexps with
     (* In general this is valid. Here we know nobody else is reading from the
        pipe. *)
     | None ->
       raise_s [%message "Pipe has no values despite [values_available]."]
     | Some row ->
       return
         (`Ok
           (List.map
              (parse_sexp [%of_sexp: (string * string) list] row)
              ~f:fst)))
;;

(** Command that reads row sexps from stdin and prints them as CSV.

    The header is taken from the field names of the first row (see
    [get_header]) and is printed unless the [no_header] parameter is set.
    Empty input produces no output. Each subsequent row is parsed with
    [Row.parse_sexp], numbered starting from 1, converted with
    [Row.to_csv_row], and printed batch by batch with [Csvlib.Csv.print].

    [sep] and [no_header] come from [Csv_param], which is defined outside this
    file. *)
let command =
  let summary = "of sexp" in
  Async.Command.async
    ~summary
    (let%map_open.Csv_param separator = sep
     and include_header = map ~f:not no_header in
     fun () ->
       let sexps = Reader.read_sexps (Lazy.force Reader.stdin) in
       match%bind get_header sexps with
       | `Eof -> return ()
       | `Ok header ->
         if include_header then Csvlib.Csv.print ~separator [ header ];
         (* Thread the row number through the pipe so conversion errors can
            report which row failed. *)
         Pipe.folding_map sexps ~init:1 ~f:(fun row_number row ->
           let row = Row.parse_sexp row in
           row_number + 1, Row.to_csv_row row ~header ~row_number)
         |> Pipe.iter' ~f:(fun rows ->
           Csvlib.Csv.print ~separator (Queue.to_list rows);
           return ()))
    ~behave_nicely_in_pipeline:false
;;