open Core
open Async

(** XXX(seliopou): Replace Angstrom.Buffered with a module like this, while
    also supporting growing the buffer. Clients can use this to buffer and
    then use the unbuffered interface for actually running the parser. *)
module Buffer : sig
  type t

  (** [create size] allocates a buffer backed by a bigstring of [size] bytes
      that initially holds no data. *)
  val create : int -> t

  (** [get t ~f] hands [f] the currently buffered region and drops the number
      of bytes [f] reports, which is also the result. *)
  val get : t -> f:(Bigstring.t -> off:int -> len:int -> int) -> int

  (** [put t ~f] hands [f] the free region after the buffered data and extends
      the buffered region by the number of bytes [f] reports, which is also the
      result. *)
  val put : t -> f:(Bigstring.t -> off:int -> len:int -> int) -> int
end = struct
  type t =
    { buffer : Bigstring.t
    ; mutable off : int (* start of the buffered region *)
    ; mutable len : int (* number of buffered bytes *)
    }

  let create size = { buffer = Bigstring.create size; off = 0; len = 0 }

  (* Move the buffered region to the front of the bigstring so that all of
     the free space is contiguous at the end. *)
  let compress t =
    if t.len = 0
    then
      (* Nothing is buffered: rewinding the offset is sufficient. *)
      t.off <- 0
    else if t.off > 0
    then begin
      Bigstring.blit
        ~src:t.buffer
        ~src_pos:t.off
        ~dst:t.buffer
        ~dst_pos:0
        ~len:t.len;
      t.off <- 0
    end
  ;;

  let get t ~f =
    let consumed = f t.buffer ~off:t.off ~len:t.len in
    let remaining = t.len - consumed in
    t.len <- remaining;
    (* Once drained, rewind so the next [put] sees the whole bigstring. *)
    t.off <- (if remaining = 0 then 0 else t.off + consumed);
    consumed
  ;;

  let put t ~f =
    compress t;
    let free_pos = t.off + t.len in
    let free_len = Bigstring.length t.buffer - t.len in
    let produced = f t.buffer ~off:free_pos ~len:free_len in
    t.len <- t.len + produced;
    produced
  ;;
end

(** [read fd buffer] performs a read from [fd] into the free space of
    [buffer].

    The result is [`Eof] when the descriptor is already closed, becomes closed
    while waiting for readiness, or the read returns zero bytes; otherwise it
    is [`Ok n] with the number of bytes stored. [EWOULDBLOCK]/[EAGAIN] cause a
    wait for read-readiness followed by a retry. A bad descriptor raises via
    [failwithf]; any other error starts closing [fd] and re-raises. *)
let read fd buffer =
  let badfd fd = failwithf "read got back fd: %s" (Fd.to_string fd) () in
  let rec interpret fd buffer result =
    let open Unix.Error in
    match result with
    | `Already_closed | `Ok 0 -> return `Eof
    | `Ok n -> return (`Ok n)
    | `Error (Unix.Unix_error ((EWOULDBLOCK | EAGAIN), _, _)) ->
      Fd.ready_to fd `Read
      >>= (function
        | `Bad_fd -> badfd fd
        | `Closed -> return `Eof
        | `Ready -> attempt fd buffer)
    | `Error (Unix.Unix_error (EBADF, _, _)) -> badfd fd
    | `Error exn ->
      Deferred.don't_wait_for (Fd.close fd);
      raise exn
  and attempt fd buffer =
    if Fd.supports_nonblock fd
    then begin
      (* Non-blocking descriptors are read directly. *)
      let fill_nonblocking file_descr =
        Buffer.put buffer ~f:(fun bigstring ~off ~len ->
          Unix.Syscall_result.Int.ok_or_unix_error_exn
            ~syscall_name:"read"
            (Bigstring.read_assume_fd_is_nonblocking file_descr bigstring ~pos:off ~len))
      in
      interpret fd buffer (Fd.syscall fd ~nonblocking:true fill_nonblocking)
    end
    else begin
      (* Otherwise the read is performed via [Fd.syscall_in_thread]. *)
      let fill_blocking file_descr =
        Buffer.put buffer ~f:(fun bigstring ~off ~len ->
          Bigstring.read file_descr bigstring ~pos:off ~len)
      in
      Fd.syscall_in_thread fd ~name:"read" fill_blocking
      >>= fun result -> interpret fd buffer result
    end
  in
  attempt fd buffer
;;

open Httpaf

module Server = struct
  (** Builds a connection handler taking the client address and the socket.

      The handler creates a [Server_connection], then runs a reader loop and a
      writer loop inside a dedicated monitor whose errors are forwarded to
      [Server_connection.report_exn]. The returned deferred becomes determined
      once both loops have reached their [`Close] operation. *)
  let create_connection_handler ?(config = Config.default) ~request_handler ~error_handler =
    fun client_addr socket ->
      let fd = Socket.fd socket in
      let writev = Faraday_async.writev_of_fd fd in
      let request_handler = request_handler client_addr in
      let error_handler = error_handler client_addr in
      let conn = Server_connection.create ~config ~error_handler request_handler in
      let read_complete = Ivar.create () in
      let buffer = Buffer.create config.read_buffer_size in
      (* Half-close the socket in [direction] unless the fd is already closed. *)
      let shutdown_if_open direction =
        if not (Fd.is_closed fd) then Socket.shutdown socket direction
      in
      (* Pass the buffered bytes to the connection as the final input. *)
      let feed_eof () =
        ignore
          (Buffer.get buffer ~f:(fun bigstring ~off ~len ->
             Server_connection.read_eof conn bigstring ~off ~len))
      in
      (* Pass the buffered bytes to the connection as regular input. *)
      let feed_data () =
        ignore
          (Buffer.get buffer ~f:(fun bigstring ~off ~len ->
             Server_connection.read conn bigstring ~off ~len))
      in
      let rec reader_thread () =
        match Server_connection.next_read_operation conn with
        | `Read ->
          (* Log.Global.printf "read(%d)%!" (Fd.to_int_exn fd); *)
          read fd buffer
          >>> (fun outcome ->
            (match outcome with
             | `Eof -> feed_eof ()
             | `Ok _ -> feed_data ());
            reader_thread ())
        | `Yield ->
          (* Log.Global.printf "read_yield(%d)%!" (Fd.to_int_exn fd); *)
          Server_connection.yield_reader conn reader_thread
        | `Close ->
          (* Log.Global.printf "read_close(%d)%!" (Fd.to_int_exn fd); *)
          Ivar.fill read_complete ();
          shutdown_if_open `Receive
      in
      let write_complete = Ivar.create () in
      let rec writer_thread () =
        match Server_connection.next_write_operation conn with
        | `Write iovecs ->
          (* Log.Global.printf "write(%d)%!" (Fd.to_int_exn fd); *)
          writev iovecs
          >>> (fun result ->
            Server_connection.report_write_result conn result;
            writer_thread ())
        | `Yield ->
          (* Log.Global.printf "write_yield(%d)%!" (Fd.to_int_exn fd); *)
          Server_connection.yield_writer conn writer_thread
        | `Close _ ->
          (* Log.Global.printf "write_close(%d)%!" (Fd.to_int_exn fd); *)
          Ivar.fill write_complete ();
          shutdown_if_open `Send
      in
      let conn_monitor = Monitor.create () in
      Scheduler.within ~monitor:conn_monitor reader_thread;
      Scheduler.within ~monitor:conn_monitor writer_thread;
      Monitor.detach_and_iter_errors conn_monitor ~f:(fun exn ->
        Server_connection.report_exn conn exn);
      (* The Tcp module will close the file descriptor once this becomes
         determined. *)
      Deferred.all_unit [ Ivar.read read_complete; Ivar.read write_complete ]
  ;;
end

module Client = struct
  (** Issues [request] over [socket] and returns the request body produced by
      [Client_connection.request].

      A reader loop and a writer loop run inside a dedicated monitor whose
      errors are forwarded to [Client_connection.report_exn]. Once both loops
      have reached their [`Close] operation, the file descriptor is closed in
      the background if it is not closed already. *)
  let request ?(config = Config.default) socket request ~error_handler ~response_handler =
    let fd = Socket.fd socket in
    let writev = Faraday_async.writev_of_fd fd in
    let request_body, conn =
      Client_connection.request request ~error_handler ~response_handler
    in
    let read_complete = Ivar.create () in
    let buffer = Buffer.create config.read_buffer_size in
    (* Pass the buffered bytes to the connection as the final input. *)
    let feed_eof () =
      ignore
        (Buffer.get buffer ~f:(fun bigstring ~off ~len ->
           Client_connection.read_eof conn bigstring ~off ~len))
    in
    (* Pass the buffered bytes to the connection as regular input. *)
    let feed_data () =
      ignore
        (Buffer.get buffer ~f:(fun bigstring ~off ~len ->
           Client_connection.read conn bigstring ~off ~len))
    in
    let rec reader_thread () =
      match Client_connection.next_read_operation conn with
      | `Read ->
        (* Log.Global.printf "read(%d)%!" (Fd.to_int_exn fd); *)
        read fd buffer
        >>> (fun outcome ->
          (match outcome with
           | `Eof -> feed_eof ()
           | `Ok _ -> feed_data ());
          reader_thread ())
      | `Close ->
        (* Log.Global.printf "read_close(%d)%!" (Fd.to_int_exn fd); *)
        Ivar.fill read_complete ();
        if not (Fd.is_closed fd) then Socket.shutdown socket `Receive
    in
    let write_complete = Ivar.create () in
    let rec writer_thread () =
      match Client_connection.next_write_operation conn with
      | `Write iovecs ->
        (* Log.Global.printf "write(%d)%!" (Fd.to_int_exn fd); *)
        writev iovecs
        >>> (fun result ->
          Client_connection.report_write_result conn result;
          writer_thread ())
      | `Yield ->
        (* Log.Global.printf "write_yield(%d)%!" (Fd.to_int_exn fd); *)
        Client_connection.yield_writer conn writer_thread
      | `Close _ ->
        (* Log.Global.printf "write_close(%d)%!" (Fd.to_int_exn fd); *)
        Ivar.fill write_complete ()
    in
    let conn_monitor = Monitor.create () in
    Scheduler.within ~monitor:conn_monitor reader_thread;
    Scheduler.within ~monitor:conn_monitor writer_thread;
    Monitor.detach_and_iter_errors conn_monitor ~f:(fun exn ->
      Client_connection.report_exn conn exn);
    let both_complete =
      Deferred.all_unit [ Ivar.read read_complete; Ivar.read write_complete ]
    in
    don't_wait_for
      (both_complete
       >>| fun () ->
       if not (Fd.is_closed fd) then don't_wait_for (Fd.close fd));
    request_body
  ;;
end