12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788(*
* Copyright (C) 2015 David Scott <dave.scott@unikernel.com>
* Copyright (C) 2016 Docker Inc
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*
*)letsrc=letsrc=Logs.Src.create"flow_lwt_hvsock"~doc:"AF_HYPERV flow"inLogs.Src.set_levelsrc(SomeLogs.Debug);srcmoduleLog=(valLogs.src_logsrc:Logs.LOG)moduleMake(Time:Mirage_time.S)(Fn:S.FN)(Socket_family:Hvsock.Af_common.S)=structmoduleBlocking_socket=Socket_familymoduleSocket=Socket.Make(Time)(Fn)(Socket_family)moduleRWBuffering=Buffering.Make(Fn)(Socket_family)typeerror=[`UnixofUnix.error]letpp_errorppf(`Unixe)=Fmt.stringppf(Unix.error_messagee)typewrite_error=[Mirage_flow.write_error|error]letpp_write_errorppf=function|#Mirage_flow.write_errorase->Mirage_flow.pp_write_errorppfe|#errorase->pp_errorppfetypeflow={socket:Socket.t;flow:RWBuffering.flow;mutableclosed:bool;mutableshutdown_write:bool;}letconnect?(message_size=8192)?(buffer_size=262144)socket=letfd=matchSocket.to_fdsocketwithSomex->x|None->assertfalseinletflow=RWBuffering.connect~message_size~buffer_sizefdinletclosed=falseinletshutdown_write=falsein{socket;flow;closed;shutdown_write}letcloset=Log.debug(funf->f"FLOW.close called");matcht.closedwith|false->t.closed<-true;RWBuffering.closet.flow|true->Lwt.return()letshutdown_read_t=(* We don't care about shutdown_read. We care about shutdown_write because
we want to send an EOF to the remote and still receive a response. *)Log.debug(funf->f"FLOW.shutdown_read called and ignored");Lwt.return_unitletshutdown_writet=(* When we shutdown_write we still expect buffered data to be flushed. *)Log.debug(funf->f"FLOW.shutdown_write called");matcht.shutdown_write||t.closedwith|true->Lwt.return()|false->Log.debug(funf->f"shutting down writer thread");t.shutdown_write<-true;RWBuffering.shutdown_writet.flowletreadt=RWBuffering.readt.flowletread_into_flow_buffer=(* Can we drop this function altogether? *)Log.err(funf->f"read_into not implemented");failwith"not implemented read_into"letwritevtbufs=RWBuffering.writevt.flowbufsletwritetbuf=RWBuffering.writet.flowbufend