12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970openLwt.Infixletsrc=Logs.Src.create"endpoint"~doc:"Send and receive Cap'n'Proto messages"moduleLog=(valLogs.src_logsrc:Logs.LOG)letcompression=`Noneletrecord_sent_messages=falsetypeflow=Flow:(moduleMirage_flow.Swithtypeflow='a)*'a->flowtypet={flow:flow;decoder:Capnp.Codecs.FramedStream.t;switch:Lwt_switch.t;peer_id:Auth.Digest.t;}letpeer_idt=t.peer_idletof_flow(typeflow)~switch~peer_id(moduleF:Mirage_flow.Swithtypeflow=flow)(flow:flow)=letgeneric_flow=Flow((moduleF),flow)inletdecoder=Capnp.Codecs.FramedStream.emptycompressionin{flow=generic_flow;decoder;switch;peer_id}letdump_msg=letnext=ref0infundata->letname=Fmt.str"/tmp/msg-%d.capnp"!nextinLog.info(funf->f"Saved message as %S"name);incrnext;letch=open_out_binnameinoutput_stringchdata;close_outchletsendtmsg=let(Flow((moduleF),flow))=t.flowinletdata=Capnp.Codecs.serialize~compressionmsginifrecord_sent_messagesthendump_msgdata;F.writeflow(Cstruct.of_stringdata)>|=function|Ok()|Error`Closedase->e|Errore->Error(`Msg(Fmt.to_to_stringF.pp_write_errore))letrecrecvt=let(Flow((moduleF),flow))=t.flowinmatchCapnp.Codecs.FramedStream.get_next_framet.decoderwith|_whennot(Lwt_switch.is_ont.switch)->Lwt.return@@Error`Closed|Okmsg->Lwt.return(Ok(Capnp.BytesMessage.Message.readonlymsg))|ErrorCapnp.Codecs.FramingError.Unsupported->failwith"Unsupported Cap'n'Proto frame received"|ErrorCapnp.Codecs.FramingError.Incomplete->Log.debug(funf->f"Incomplete; waiting for more data...");F.readflow>>=function|Ok(`Datadata)->Log.debug(funf->f"Read %d bytes"(Cstruct.lengthdata));Capnp.Codecs.FramedStream.add_fragmentt.decoder(Cstruct.to_stringdata);recvt|Ok`Eof->Log.info(funf->f"Connection closed");Lwt_switch.turn_offt.switch>|=fun()->Error`Closed|ErrorexwhenLwt_switch.is_ont.switch->Capnp_rpc.Debug.failf"recv: %a"F.pp_errorex|Error_->Lwt.return(Error`Closed)letdisconnectt=Lwt_switch.turn_offt.switchletpp_errorf=function|`Closed->Fmt.stringf"Connection closed"|`Msgm->Fmt.stringfm