123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134openEio.StdmoduleMetrics=structopenPrometheusletnamespace="capnp"letsubsystem="net"letconnections=lethelp="Number of live capnp-rpc connections"inGauge.v~help~namespace~subsystem"connections"letmessages_inbound_received_total=lethelp="Total number of messages received"inCounter.v~help~namespace~subsystem"messages_inbound_received_total"letmessages_outbound_enqueued_total=lethelp="Total number of messages enqueued to be transmitted"inCounter.v~help~namespace~subsystem"messages_outbound_enqueued_total"endmoduleWrite=Eio.Buf_writeletsrc=Logs.Src.create"endpoint"~doc:"Send and receive Cap'n'Proto messages"moduleLog=(valLogs.src_logsrc:Logs.LOG)letcompression=`Noneletrecord_sent_messages=falsetypeflow=Eio.Flow.two_way_tyrtypet={flow:flow;writer:Write.t;decoder:Capnp.Codecs.FramedStream.t;peer_id:Auth.Digest.t;recv_buf:Cstruct.t;}letpeer_idt=t.peer_idletof_flow~peer_idflow=letdecoder=Capnp.Codecs.FramedStream.emptycompressioninletflow=(flow:>flow)inletwriter=Write.create4096inletrecv_buf=Cstruct.create4096in{flow;writer;decoder;peer_id;recv_buf}letdump_msg=letnext=ref0infundata->letname=Fmt.str"/tmp/msg-%d.capnp"!nextinLog.info(funf->f"Saved message as %S"name);incrnext;letch=open_out_binnameinoutput_stringchdata;close_outchletsendtmsg=Log.debug(funf->letmoduleM=Capnp_rpc.Private.Schema.MessageWrapper.Messageinf"queue_send: %d/%d allocated bytes in %d segs"(M.total_sizemsg)(M.total_alloc_sizemsg)(M.num_segmentsmsg));Capnp.Codecs.serialize_iter_copyless~compressionmsg~f:(funxlen->Write.stringt.writerx~len);Prometheus.Counter.inc_oneMetrics.messages_outbound_enqueued_total;ifrecord_sent_messagesthendump_msg(Capnp.Codecs.serialize~compressionmsg)letrecrecv~tagst=matchCapnp.Codecs.FramedStream.get_next_framet.decoderwith|Okmsg->Prometheus.Counter.inc_oneMetrics.messages_inbound_received_total;(* We often want to send multiple response messages while processing a batch of requests,
so pause the writer to collect them. We'll unpause on the next [single_read]. *)Write.pauset.writer;Ok(Capnp.BytesMessage.Message.readonlymsg)|ErrorCapnp.Codecs.FramingError.Unsupported->failwith"Unsupported Cap'n'Proto frame received"|ErrorCapnp.Codecs.FramingError.Incomplete->Log.debug(funf->f~tags"Incomplete; waiting for more data...");(* We probably scheduled one or more application fibers to run while handling the last
batch of messages. Give them a chance to run now while the writer is paused, because
they might want to send more messages immediately. *)Fiber.yield();Write.unpauset.writer;matchEio.Flow.single_readt.flowt.recv_bufwith|got->Log.debug(funf->f~tags"Read %d bytes"got);Capnp.Codecs.FramedStream.add_fragmentt.decoder(Cstruct.to_stringt.recv_buf~len:got);recv~tagst|exceptionEnd_of_file->Log.info(funf->f~tags"Received end-of-stream");Error`Closed|exception(Eio.Io(Eio.Net.EConnection_reset_,_)asex)->Log.info(funf->f~tags"Receive failed: %a"Eio.Exn.ppex);Error`Closedletdisconnectt=tryEio.Flow.shutdownt.flow`AllwithEio.Io(Eio.Net.EConnection_reset_,_)->(* TCP connection already shut down, so TLS shutdown failed. Ignore. *)()letshutdown_sendt=Write.unpauset.writer;Write.closet.writerletrecrun_writer~tagst=matchWrite.await_batcht.writerwith|exceptionEnd_of_file->()(* Due to [shutdown_send] closing it. *)|bufs->matchEio.Flow.single_writet.flowbufswith|n->Write.shiftt.writern;run_writer~tagst|exception(Eio.Io(Eio.Net.EConnection_reset_,_)asex)->Log.info(funf->f~tags"Send failed: %a"Eio.Exn.ppex)|exceptionex->Eio.Fiber.check();Log.warn(funf->f~tags"Error sending messages: %a (will shutdown connection)"Fmt.exnex)letrun_writer~tagst=letcleanup()=Prometheus.Gauge.dec_oneMetrics.connections;disconnectt(* The listen fiber will read end-of-stream soon *)inPrometheus.Gauge.inc_oneMetrics.connections;matchrun_writer~tagstwith|()->cleanup()|exceptionex->letbt=Printexc.get_raw_backtrace()incleanup();Printexc.raise_with_backtraceexbt