(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
   details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)

(* [Lwt_sequence] is deprecated – we don't want users outside Lwt using it.
However, it is still used internally by Lwt. So, briefly disable warning 3
("deprecated"), and create a local, non-deprecated alias for
[Lwt_sequence] that can be referred to by the rest of the code in this
module without triggering any more warnings. *)
[@@@ocaml.warning "-3"]
module Lwt_sequence = Lwt_sequence
[@@@ocaml.warning "+3"]

open Lwt.Infix

exception Channel_closed of string

(* Smallest buffer size accepted for a channel: *)
let min_buffer_size = 16

(* [check_buffer_size fun_name buffer_size] raises [Invalid_argument] when
   [buffer_size] is smaller than [min_buffer_size] or larger than
   [Sys.max_string_length]. [fun_name] is inserted after the ["Lwt_io."]
   prefix of the message. *)
let check_buffer_size fun_name buffer_size =
  let reject fmt = Printf.ksprintf invalid_arg fmt fun_name in
  if buffer_size < min_buffer_size then
    reject "Lwt_io.%s: too small buffer size"
  else if buffer_size > Sys.max_string_length then
    reject "Lwt_io.%s: too big buffer size"
  else
    ()

(* Same check, applied to the length of an existing buffer. *)
let check_buffer fun_name buffer =
  check_buffer_size fun_name (Lwt_bytes.length buffer)

(* Size used by [make] when no buffer is supplied. *)
let default_buffer_size = ref 4096

(* +-----------------------------------------------------------------+
   | Types                                                           |
+-----------------------------------------------------------------+ *)

type input
type output

type 'a mode =
  | Input : input mode
  | Output : output mode

let input : input mode = Input
let output : output mode = Output

(* A channel state *)
type 'mode state =
  | Busy_primitive
  (* A primitive is running on the channel *)

  | Busy_atomic of 'mode channel
  (* An atomic operation is being performed on the channel. The
     argument is the temporary atomic wrapper. *)

  | Waiting_for_busy
  (* A queued operation has not yet started. *)

  | Idle
  (* The channel is unused *)

  | Closed
  (* The channel has been closed *)

  | Invalid
  (* The channel is a temporary channel created for an atomic
     operation which has terminated. *)

(* A wrapper, which ensures that io operations are atomic: *)
and 'mode channel = {
  mutable state : 'mode state;

  channel : 'mode _channel;
  (* The real channel *)

  mutable queued : unit Lwt.u Lwt_sequence.t;
  (* Queued operations *)
}

and 'mode _channel = {
  mutable buffer : Lwt_bytes.t;
  mutable length : int;

  mutable ptr : int;
  (* Current position *)

  mutable max : int;
  (* Position of the end of data in the buffer. It is equal to
     [length] for output channels. *)

  abort_waiter : int Lwt.t;
  (* Thread which is woken up with an exception when the channel is
     closed. *)
  abort_wakener : int Lwt.u;

  mutable auto_flushing : bool;
  (* Whether the auto-flusher is currently running or not *)

  main : 'mode channel;
  (* The main wrapper *)

  close : unit Lwt.t Lazy.t;
  (* Close function *)

  mode : 'mode mode;
  (* The channel mode *)

  mutable offset : int64;
  (* Number of bytes really read/written *)

  typ : typ;
  (* Type of the channel. *)
}

and typ =
  | Type_normal of
      (Lwt_bytes.t -> int -> int -> int Lwt.t) *
      (int64 -> Unix.seek_command -> int64 Lwt.t)
  (* The channel has been created with [make]. The first argument
     is the refill/flush function and the second is the seek
     function. *)

  | Type_bytes
  (* The channel has been created with [of_bytes]. *)

type input_channel = input channel
type output_channel = output channel

type direct_access = {
  da_buffer : Lwt_bytes.t;
  mutable da_ptr : int;
  mutable da_max : int;
  da_perform : unit -> int Lwt.t;
}

(* The mode ([Input] or [Output]) of the channel behind a wrapper. *)
let mode wrapper = wrapper.channel.mode

(* +-----------------------------------------------------------------+
   | Creations, closing, locking, ...                                |
+-----------------------------------------------------------------+ *)

(* This strange hash function is fine because Lwt_io only ever:
   - adds distinct channels to the hash set,
   - folds over the hash set.
   Lwt_io never looks up individual elements. The constant function is not
   suitable, because then all channels will end up in the same hash bucket.
   A weak hash set is used instead of a weak array to avoid having to include
   resizing and compaction code in Lwt_io. *)
let hash_output_channel =
  let counter = ref 0 in
  fun () ->
    incr counter;
    !counter

module Outputs = Weak.Make(struct
    type t = output_channel
    let hash _ = hash_output_channel ()
    let equal = ( == )
  end)

(* Table of all opened output channels. On exit they are all
   flushed: *)
let outputs = Outputs.create 32

(* Logical position in the channel: the global offset corrected by what
   is still sitting in the buffer. *)
let position : type mode. mode channel -> int64 = fun wrapper ->
  let ch = wrapper.channel in
  match ch.mode with
  | Input ->
    let unread = Int64.of_int (ch.max - ch.ptr) in
    Int64.sub ch.offset unread
  | Output ->
    let pending = Int64.of_int ch.ptr in
    Int64.add ch.offset pending

(* Human-readable direction of a channel, used in error messages. *)
let name : type mode. mode _channel -> string = fun ch ->
  match ch.mode with
  | Input -> "input"
  | Output -> "output"

let closed_channel ch = Channel_closed (name ch)

let invalid_channel ch =
  let msg =
    Printf.sprintf "temporary atomic channel %s no more valid" (name ch)
  in
  Failure msg

(* [true] when an operation is running or queued on the wrapper. Raises
   the "invalid channel" failure for an expired atomic wrapper. *)
let is_busy ch =
  match ch.state with
  | Invalid ->
    raise (invalid_channel ch.channel)
  | Idle | Closed ->
    false
  | Busy_primitive | Busy_atomic _ | Waiting_for_busy ->
    true

(* Flush/refill the buffer. No race condition could happen because
this function is always called atomically: *)letperform_io:typemode.mode_channel->intLwt.t=funch->matchch.main.statewith|Closed->Lwt.fail(closed_channelch)|Invalid->Lwt.fail(invalid_channelch)|Idle|Waiting_for_busy->assertfalse|Busy_primitive|Busy_atomic_->matchch.typwith|Type_normal(perform,_)->letptr,len=matchch.modewith|Input->(* Size of data in the buffer *)letsize=ch.max-ch.ptrin(* If there are still data in the buffer, keep them: *)ifsize>0thenLwt_bytes.unsafe_blitch.bufferch.ptrch.buffer0size;(* Update positions: *)ch.ptr<-0;ch.max<-size;(size,ch.length-size)|Output->(0,ch.ptr)inletperform=ifSys.win32thenLwt.catch(fun()->performch.bufferptrlen)(function|Unix.Unix_error(Unix.EPIPE,_,_)->Lwt.return0|exn->Lwt.failexn)[@ocaml.warning"-4"]elseperformch.bufferptrleninLwt.pick[ch.abort_waiter;perform]>>=funn->(* Never trust user functions... *)ifn<0||n>lenthenLwt.fail(Failure(Printf.sprintf"Lwt_io.perform_io: invalid result of the [%s] function"(matchch.modewithInput->"read"|Output->"write")))elsebegin(* Update the global offset: *)ch.offset<-Int64.addch.offset(Int64.of_intn);(* Update buffer positions: *)beginmatchch.modewith|Input->ch.max<-ch.max+n|Output->(* Shift remaining data: *)letlen=len-ninLwt_bytes.unsafe_blitch.buffernch.buffer0len;ch.ptr<-lenend;Lwt.returnnend|Type_bytes->beginmatchch.modewith|Input->Lwt.return0|Output->Lwt.fail(Failure"cannot flush a channel created with Lwt_io.of_string")endletrefill=perform_ioletflush_partial=perform_ioletrecflush_totaloc=ifoc.ptr>0thenflush_partialoc>>=fun_->flush_totalocelseLwt.return_unitletsafe_flush_totaloc=Lwt.catch(fun()->flush_totaloc)(fun_->Lwt.return_unit)letdeepest_wrapperch=letrecloopwrapper=matchwrapper.statewith|Busy_atomicwrapper->loopwrapper|Busy_primitive|Waiting_for_busy|Idle|Closed|Invalid->wrapperinloopch.mainletauto_flushoc=Lwt.pause()>>=fun()->letwrapper=deepest_wrapperocinmatchwrapper.statewith|Busy_primitive|Waiting_for_busy->(* The channel is used, cancel auto flushing. It will be
restarted when the channel Lwt.returns to the [Idle] state: *)oc.auto_flushing<-false;Lwt.return_unit|Busy_atomic_->(* Cannot happen since we took the deepest wrapper: *)assertfalse|Idle->oc.auto_flushing<-false;wrapper.state<-Busy_primitive;safe_flush_totaloc>>=fun()->ifwrapper.state=Busy_primitivethenwrapper.state<-Idle;ifnot(Lwt_sequence.is_emptywrapper.queued)thenLwt.wakeup_later(Lwt_sequence.take_lwrapper.queued)();Lwt.return_unit|Closed|Invalid->Lwt.return_unit(* A ``locked'' channel is a channel in the state [Busy_primitive] or
[Busy_atomic] *)letunlock:typem.mchannel->unit=funwrapper->matchwrapper.statewith|Busy_primitive|Busy_atomic_->ifLwt_sequence.is_emptywrapper.queuedthenwrapper.state<-Idleelsebeginwrapper.state<-Waiting_for_busy;Lwt.wakeup_later(Lwt_sequence.take_lwrapper.queued)()end;(* Launches the auto-flusher: *)letch=wrapper.channelinif(* Launch the auto-flusher only if the channel is not busy: *)(wrapper.state=Idle&&(* Launch the auto-flusher only for output channel: *)(matchch.modewithInput->false|Output->true)&&(* Do not launch two auto-flusher: *)notch.auto_flushing&&(* Do not launch the auto-flusher if operations are queued: *)Lwt_sequence.is_emptywrapper.queued)thenbeginch.auto_flushing<-true;ignore(auto_flushch)end|Closed|Invalid->(* Do not change channel state if the channel has been closed *)ifnot(Lwt_sequence.is_emptywrapper.queued)thenLwt.wakeup_later(Lwt_sequence.take_lwrapper.queued)()|Idle|Waiting_for_busy->(* We must never unlock an unlocked channel *)assertfalse(* Wrap primitives into atomic io operations: *)letprimitivefwrapper=matchwrapper.statewith|Idle->wrapper.state<-Busy_primitive;Lwt.finalize(fun()->fwrapper.channel)(fun()->unlockwrapper;Lwt.return_unit)|Busy_primitive|Busy_atomic_|Waiting_for_busy->(Lwt.add_task_r[@ocaml.warning"-3"])wrapper.queued>>=fun()->beginmatchwrapper.statewith|Closed->(* The channel has been closed while we were waiting *)unlockwrapper;Lwt.fail(closed_channelwrapper.channel)|Idle|Waiting_for_busy->wrapper.state<-Busy_primitive;Lwt.finalize(fun()->fwrapper.channel)(fun()->unlockwrapper;Lwt.return_unit)|Invalid->Lwt.fail(invalid_channelwrapper.channel)|Busy_primitive|Busy_atomic_->assertfalseend|Closed->Lwt.fail(closed_channelwrapper.channel)|Invalid->Lwt.fail(invalid_channelwrapper.channel)(* Wrap a sequence of io operations into an atomic operation: 
*)
let atomic f wrapper =
  (* Install a fresh temporary wrapper, run [f] on it, then invalidate
     it and release the main wrapper whatever the outcome. *)
  let run_atomic () =
    let tmp_wrapper = {
      state = Idle;
      channel = wrapper.channel;
      queued = Lwt_sequence.create ();
    } in
    wrapper.state <- Busy_atomic tmp_wrapper;
    Lwt.finalize
      (fun () -> f tmp_wrapper)
      (fun () ->
         (* The temporary wrapper is no more valid: *)
         tmp_wrapper.state <- Invalid;
         unlock wrapper;
         Lwt.return_unit)
  in
  match wrapper.state with
  | Idle ->
    run_atomic ()

  | Busy_primitive | Busy_atomic _ | Waiting_for_busy ->
    (Lwt.add_task_r [@ocaml.warning "-3"]) wrapper.queued >>= fun () ->
    begin match wrapper.state with
      | Closed ->
        (* The channel has been closed while we were waiting *)
        unlock wrapper;
        Lwt.fail (closed_channel wrapper.channel)

      | Idle | Waiting_for_busy ->
        run_atomic ()

      | Invalid ->
        Lwt.fail (invalid_channel wrapper.channel)

      | Busy_primitive | Busy_atomic _ ->
        assert false
    end

  | Closed ->
    Lwt.fail (closed_channel wrapper.channel)

  | Invalid ->
    Lwt.fail (invalid_channel wrapper.channel)

let rec abort wrapper =
  match wrapper.state with
  | Busy_atomic tmp_wrapper ->
    (* Close the deepest opened wrapper: *)
    abort tmp_wrapper
  | Closed ->
    (* Double close, just returns the same thing as before *)
    Lazy.force wrapper.channel.close
  | Invalid ->
    Lwt.fail (invalid_channel wrapper.channel)
  | Idle | Busy_primitive | Waiting_for_busy ->
    wrapper.state <- Closed;
    (* Abort any current real reading/writing operation on the
       channel: *)
    Lwt.wakeup_exn
      wrapper.channel.abort_wakener (closed_channel wrapper.channel);
    Lazy.force wrapper.channel.close

let close : type mode. mode channel -> unit Lwt.t = fun wrapper ->
  let channel = wrapper.channel in
  if channel.main != wrapper then
    Lwt.fail
      (Failure
         "Lwt_io.close: cannot close a channel obtained via Lwt_io.atomic")
  else
    match channel.mode with
    | Input ->
      (* Just close it now: *)
      abort wrapper
    | Output ->
      Lwt.catch
        (fun () ->
           (* Performs all pending actions, flush the buffer, then close
              it: *)
           primitive
             (fun channel ->
                safe_flush_total channel >>= fun () -> abort wrapper)
             wrapper)
        (fun _ ->
           abort wrapper)

let is_closed wrapper =
  match wrapper.state with
  | Closed ->
    true
  | Busy_primitive | Busy_atomic _ | Waiting_for_busy | Idle | Invalid ->
    false

(* Flush every output channel still registered in [outputs], in
   parallel, ignoring individual failures. *)
let flush_all () =
  let wrappers = Outputs.fold (fun x l -> x :: l) outputs [] in
  Lwt_list.iter_p
    (fun wrapper ->
       Lwt.catch
         (fun () -> primitive safe_flush_total wrapper)
         (fun _ -> Lwt.return_unit))
    wrappers

let () =
  (* Flush all opened output channels on exit: *)
  Lwt_main.at_exit flush_all

(* Default [seek] function of [make]: always fails. *)
let no_seek _pos _cmd =
  Lwt.fail (Failure "Lwt_io.seek: seek not supported on this channel")

(* [make ?buffer ?close ?seek ~mode perform_io] builds a channel whose
   buffer is refilled/flushed with [perform_io]. Raises
   [Invalid_argument] if a supplied [buffer] has an unacceptable size.
   Output channels are registered in [outputs] so that they are flushed
   at exit. *)
let make :
  type m.
    ?buffer : Lwt_bytes.t ->
    ?close : (unit -> unit Lwt.t) ->
    ?seek : (int64 -> Unix.seek_command -> int64 Lwt.t) ->
    mode : m mode ->
    (Lwt_bytes.t -> int -> int -> int Lwt.t) ->
    m channel =
  fun ?buffer ?(close=Lwt.return) ?(seek=no_seek) ~mode perform_io ->
  let (buffer, size) =
    match buffer with
    | Some buffer ->
      (* [check_buffer] already prefixes the message with "Lwt_io.", so
         only the bare function name is passed; passing "Lwt_io.make"
         produced "Lwt_io.Lwt_io.make: ...". *)
      check_buffer "make" buffer;
      (buffer, Lwt_bytes.length buffer)
    | None ->
      let size = !default_buffer_size in
      (Lwt_bytes.create size, size)
  in
  let abort_waiter, abort_wakener = Lwt.wait () in
  let rec ch = {
    buffer = buffer;
    length = size;
    ptr = 0;
    max = (match mode with
        | Input -> 0
        | Output -> size);
    (* Exceptions raised synchronously by [close] become failed
       promises; the result is memoized by the lazy value. *)
    close = lazy (Lwt.catch close Lwt.fail);
    abort_waiter = abort_waiter;
    abort_wakener = abort_wakener;
    main = wrapper;
    auto_flushing = false;
    mode = mode;
    offset = 0L;
    typ =
      Type_normal
        (perform_io,
         fun pos cmd -> try seek pos cmd with e -> Lwt.fail e);
  } and wrapper = {
    state = Idle;
    channel = ch;
    queued = Lwt_sequence.create ();
  } in
  (match mode with
   | Input -> ()
   | Output -> Outputs.add outputs wrapper);
  wrapper

(* Channel backed directly by [bytes]; it cannot be refilled, flushed or
   resized. *)
let of_bytes (type m) ~(mode : m mode) bytes =
  let length = Lwt_bytes.length bytes in
  let abort_waiter, abort_wakener = Lwt.wait () in
  let rec ch = {
    buffer = bytes;
    length = length;
    ptr = 0;
    max = length;
    close = lazy (Lwt.return_unit);
    abort_waiter = abort_waiter;
    abort_wakener = abort_wakener;
    main = wrapper;
    (* Auto flush is set to [true] to prevent writing functions from
       trying to launch the auto-flusher. *)
    auto_flushing = true;
    mode = mode;
    offset = (match mode with
        | Output -> 0L
        | Input -> Int64.of_int length);
    typ = Type_bytes;
  } and wrapper = {
    state = Idle;
    channel = ch;
    queued = Lwt_sequence.create ();
  } in
  wrapper

let of_fd :
  type m.
    ?buffer : Lwt_bytes.t ->
    ?close : (unit -> unit Lwt.t) ->
    mode : m mode ->
    Lwt_unix.file_descr ->
    m channel =
  fun ?buffer ?close ~mode fd ->
  let perform_io = match mode with
    | Input -> Lwt_bytes.read fd
    | Output -> Lwt_bytes.write fd
  in
  make
    ?buffer
    ~close:(match close with
        | Some f -> f
        | None -> (fun () -> Lwt_unix.close fd))
    ~seek:(fun pos cmd -> Lwt_unix.LargeFile.lseek fd pos cmd)
    ~mode
    perform_io

let of_unix_fd :
  type m.
    ?buffer : Lwt_bytes.t ->
    ?close : (unit -> unit Lwt.t) ->
    mode : m mode ->
    Unix.file_descr ->
    m channel =
  fun ?buffer ?close ~mode fd ->
  of_fd ?buffer ?close ~mode (Lwt_unix.of_unix_file_descr fd)

(* Number of bytes currently held in the buffer: unread bytes for an
   input channel, unflushed bytes for an output channel. *)
let buffered : type m. m channel -> int = fun ch ->
  match ch.channel.mode with
  | Input -> ch.channel.max - ch.channel.ptr
  | Output -> ch.channel.ptr

let buffer_size ch = ch.channel.length

let resize_buffer : type m. m channel -> int -> unit Lwt.t = fun wrapper len ->
  if len < min_buffer_size then
    invalid_arg "Lwt_io.resize_buffer: buffer size too small";
  match wrapper.channel.typ with
  | Type_bytes ->
    Lwt.fail
      (Failure
         ("Lwt_io.resize_buffer: cannot resize the buffer of a channel " ^
          "created with Lwt_io.of_string"))
  | Type_normal _ ->
    let f : type m. m _channel -> unit Lwt.t = fun ch ->
      match ch.mode with
      | Input ->
        let unread_count = ch.max - ch.ptr in
        (* Fail if we want to decrease the buffer size and there is
           too much unread data in the buffer: *)
        if len < unread_count then
          Lwt.fail
            (Failure
               ("Lwt_io.resize_buffer: cannot decrease buffer size, too much " ^
                "unread data"))
        else begin
          let buffer = Lwt_bytes.create len in
          Lwt_bytes.unsafe_blit ch.buffer ch.ptr buffer 0 unread_count;
          ch.buffer <- buffer;
          ch.length <- len;
          ch.ptr <- 0;
          ch.max <- unread_count;
          Lwt.return_unit
        end
      | Output ->
        (* If we decrease the buffer size, flush the buffer until
           the number of buffered bytes fits into the new buffer: *)
        let rec loop () =
          if ch.ptr > len then
            flush_partial ch >>= fun _ ->
            loop ()
          else
            Lwt.return_unit
        in
        loop () >>= fun () ->
        let buffer = Lwt_bytes.create len in
        Lwt_bytes.unsafe_blit ch.buffer 0 buffer 0 ch.ptr;
        ch.buffer <- buffer;
        ch.length <- len;
        ch.max <- len;
        Lwt.return_unit
    in
    primitive f wrapper

(* +-----------------------------------------------------------------+
   | Byte-order                                                      |
+-----------------------------------------------------------------+ *)

(* Byte positions, inside a 2-, 4- or 8-byte field, of each byte of a
   number, from least significant ([_0]) to most significant. *)
module ByteOrder =
struct
  module type S = sig
    val pos16_0 : int
    val pos16_1 : int
    val pos32_0 : int
    val pos32_1 : int
    val pos32_2 : int
    val pos32_3 : int
    val pos64_0 : int
    val pos64_1 : int
    val pos64_2 : int
    val pos64_3 : int
    val pos64_4 : int
    val pos64_5 : int
    val pos64_6 : int
    val pos64_7 : int
  end

  (* Little-endian layout. *)
  module LE =
  struct
    let pos16_0 = 0
    let pos16_1 = 1
    let pos32_0 = 0
    let pos32_1 = 1
    let pos32_2 = 2
    let pos32_3 = 3
    let pos64_0 = 0
    let pos64_1 = 1
    let pos64_2 = 2
    let pos64_3 = 3
    let pos64_4 = 4
    let pos64_5 = 5
    let pos64_6 = 6
    let pos64_7 = 7
  end

  (* Big-endian layout. *)
  module BE =
  struct
    let pos16_0 = 1
    let pos16_1 = 0
    let pos32_0 = 3
    let pos32_1 = 2
    let pos32_2 = 1
    let pos32_3 = 0
    let pos64_0 = 7
    let pos64_1 = 6
    let pos64_2 = 5
    let pos64_3 = 4
    let pos64_4 = 3
    let pos64_5 = 2
    let pos64_6 = 1
    let pos64_7 = 0
  end
end

module Primitives =
struct

  (* This module contains all primitive operations. They operate
     without protection regarding locking, they are wrapped after into
     safe operations. *)

  (* +---------------------------------------------------------------+
     | Reading                                                       |
+---------------------------------------------------------------+ *)

  (* Next character of the channel; fails with [End_of_file] when a
     refill yields nothing. *)
  let rec read_char ic =
    let ptr = ic.ptr in
    if ptr = ic.max then
      refill ic >>= function
      | 0 -> Lwt.fail End_of_file
      | _ -> read_char ic
    else begin
      ic.ptr <- ptr + 1;
      Lwt.return (Lwt_bytes.unsafe_get ic.buffer ptr)
    end

  let read_char_opt ic =
    Lwt.catch
      (fun () -> read_char ic >|= fun ch -> Some ch)
      (function
        | End_of_file -> Lwt.return_none
        | exn -> Lwt.fail exn)

  (* Reads up to a ["\n"] or ["\r\n"] terminator, which is dropped. A
     ['\r'] not followed by ['\n'] is kept. [End_of_file] is raised only
     if no character at all could be read. *)
  let read_line ic =
    let buf = Buffer.create 128 in
    (* [cr_read] tells whether the previous character was a ['\r'] that
       has not been added to [buf] yet. *)
    let rec loop cr_read =
      Lwt.try_bind (fun _ -> read_char ic)
        (function
          | '\n' ->
            Lwt.return (Buffer.contents buf)
          | '\r' ->
            if cr_read then Buffer.add_char buf '\r';
            loop true
          | ch ->
            if cr_read then Buffer.add_char buf '\r';
            Buffer.add_char buf ch;
            loop false)
        (function
          | End_of_file ->
            if cr_read then Buffer.add_char buf '\r';
            Lwt.return (Buffer.contents buf)
          | exn ->
            Lwt.fail exn)
    in
    read_char ic >>= function
    | '\r' -> loop true
    | '\n' -> Lwt.return ""
    | ch -> Buffer.add_char buf ch; loop false

  let read_line_opt ic =
    Lwt.catch
      (fun () -> read_line ic >|= fun ch -> Some ch)
      (function
        | End_of_file -> Lwt.return_none
        | exn -> Lwt.fail exn)

  (* Copies at most [len] bytes into [buf] at [ofs], refilling at most
     once. No bounds checking is done on [buf]. *)
  let unsafe_read_into ic buf ofs len =
    let avail = ic.max - ic.ptr in
    if avail > 0 then begin
      let len = min len avail in
      Lwt_bytes.unsafe_blit_to_bytes ic.buffer ic.ptr buf ofs len;
      ic.ptr <- ic.ptr + len;
      Lwt.return len
    end else begin
      refill ic >>= fun n ->
      let len = min len n in
      Lwt_bytes.unsafe_blit_to_bytes ic.buffer 0 buf ofs len;
      ic.ptr <- len;
      ic.max <- n;
      Lwt.return len
    end

  let read_into ic buf ofs len =
    if ofs < 0 || len < 0 || ofs + len > Bytes.length buf then
      Lwt.fail (Invalid_argument "Lwt_io.read_into")
    else begin
      if len = 0 then
        Lwt.return 0
      else
        unsafe_read_into ic buf ofs len
    end

  let rec unsafe_read_into_exactly ic buf ofs len =
    unsafe_read_into ic buf ofs len >>= function
    | 0 ->
      Lwt.fail End_of_file
    | n ->
      let len = len - n in
      if len = 0 then
        Lwt.return_unit
      else
        unsafe_read_into_exactly ic buf (ofs + n) len

  let read_into_exactly ic buf ofs len =
    if ofs < 0 || len < 0 || ofs + len > Bytes.length buf then
      Lwt.fail (Invalid_argument "Lwt_io.read_into_exactly")
    else begin
      if len = 0 then
        Lwt.return_unit
      else
        unsafe_read_into_exactly ic buf ofs len
    end

  (* Concatenates the strings of [l], given in reverse order, into a
     buffer of [len] bytes, filling it from the end. *)
  let rev_concat len l =
    let buf = Bytes.create len in
    let _ =
      List.fold_left
        (fun ofs str ->
           let len = String.length str in
           let ofs = ofs - len in
           String.unsafe_blit str 0 buf ofs len;
           ofs)
        len l
    in
    buf

  (* Drains the channel, accumulating buffer contents in [acc] (most
     recent first) until a refill returns 0. *)
  let rec read_all ic total_len acc =
    let len = ic.max - ic.ptr in
    let buf = Bytes.create len in
    Lwt_bytes.unsafe_blit_to_bytes ic.buffer ic.ptr buf 0 len;
    let str = Bytes.unsafe_to_string buf in
    ic.ptr <- ic.max;
    refill ic >>= function
    | 0 ->
      Lwt.return (rev_concat (len + total_len) (str :: acc))
    | _ ->
      read_all ic (len + total_len) (str :: acc)

  let read count ic =
    match count with
    | None ->
      read_all ic 0 [] >|= Bytes.unsafe_to_string
    | Some len ->
      let buf = Bytes.create len in
      unsafe_read_into ic buf 0 len >>= fun real_len ->
      if real_len < len then
        Lwt.return Bytes.(sub buf 0 real_len |> unsafe_to_string)
      else
        Lwt.return (Bytes.unsafe_to_string buf)

  (* Reads one marshaled value: a 20-byte header giving the data size,
     then the data itself. *)
  let read_value ic =
    let header = Bytes.create 20 in
    unsafe_read_into_exactly ic header 0 20 >>= fun () ->
    let bsize = Marshal.data_size header 0 in
    let buffer = Bytes.create (20 + bsize) in
    Bytes.unsafe_blit header 0 buffer 0 20;
    unsafe_read_into_exactly ic buffer 20 bsize >>= fun () ->
    (* Marshal.from_bytes should be used here, but we want 4.01
       compat. *)
    Lwt.return (Marshal.from_string (Bytes.unsafe_to_string buffer) 0)

  (* +---------------------------------------------------------------+
     | Writing                                                       |
+---------------------------------------------------------------+ *)

  let flush = flush_total

  (* Appends one character, flushing first when the buffer is full. *)
  let rec write_char oc ch =
    let ptr = oc.ptr in
    if ptr < oc.length then begin
      oc.ptr <- ptr + 1;
      Lwt_bytes.unsafe_set oc.buffer ptr ch;
      Lwt.return_unit
    end else
      flush_partial oc >>= fun _ ->
      write_char oc ch

  (* Buffers as much of [str.[ofs .. ofs+len-1]] as possible and returns
     the number of bytes that could NOT be taken. No bounds checking is
     done on [str]. *)
  let rec unsafe_write_from oc str ofs len =
    let avail = oc.length - oc.ptr in
    if avail >= len then begin
      Lwt_bytes.unsafe_blit_from_bytes str ofs oc.buffer oc.ptr len;
      oc.ptr <- oc.ptr + len;
      Lwt.return 0
    end else begin
      Lwt_bytes.unsafe_blit_from_bytes str ofs oc.buffer oc.ptr avail;
      oc.ptr <- oc.length;
      flush_partial oc >>= fun _ ->
      let len = len - avail in
      if oc.ptr = 0 then begin
        if len = 0 then
          Lwt.return 0
        else
          (* Everything has been written, try to write more: *)
          unsafe_write_from oc str (ofs + avail) len
      end else
        (* Not everything has been written, just what is
           remaining: *)
        Lwt.return len
    end

  let write_from oc buf ofs len =
    if ofs < 0 || len < 0 || ofs + len > Bytes.length buf then
      Lwt.fail (Invalid_argument "Lwt_io.write_from")
    else begin
      if len = 0 then
        Lwt.return 0
      else
        unsafe_write_from oc buf ofs len >>= fun remaining ->
        Lwt.return (len - remaining)
    end

  let write_from_string oc buf ofs len =
    let buf = Bytes.unsafe_of_string buf in
    write_from oc buf ofs len

  (* Repeats [unsafe_write_from] on the unwritten tail until nothing is
     left. *)
  let rec unsafe_write_from_exactly oc buf ofs len =
    unsafe_write_from oc buf ofs len >>= function
    | 0 ->
      Lwt.return_unit
    | n ->
      unsafe_write_from_exactly oc buf (ofs + len - n) n

  let write_from_exactly oc buf ofs len =
    if ofs < 0 || len < 0 || ofs + len > Bytes.length buf then
      Lwt.fail (Invalid_argument "Lwt_io.write_from_exactly")
    else begin
      if len = 0 then
        Lwt.return_unit
      else
        unsafe_write_from_exactly oc buf ofs len
    end

  let write_from_string_exactly oc buf ofs len =
    let buf = Bytes.unsafe_of_string buf in
    write_from_exactly oc buf ofs len

  let write oc str =
    let buf = Bytes.unsafe_of_string str in
    unsafe_write_from_exactly oc buf 0 (Bytes.length buf)

  let write_line oc str =
    let buf = Bytes.unsafe_of_string str in
    unsafe_write_from_exactly oc buf 0 (Bytes.length buf) >>= fun () ->
    write_char oc '\n'

  let write_value oc ?(flags=[]) x =
    write oc (Marshal.to_string x flags)

  (* +---------------------------------------------------------------+
     | Low-level access                                              |
+---------------------------------------------------------------+ *)

  (* Refills until [size] bytes are available, then hands [f] the buffer
     and the start of the block. Fails with [End_of_file] when a refill
     yields nothing. *)
  let rec read_block_unsafe ic size f =
    if ic.max - ic.ptr < size then
      refill ic >>= function
      | 0 ->
        Lwt.fail End_of_file
      | _ ->
        read_block_unsafe ic size f
    else begin
      let ptr = ic.ptr in
      ic.ptr <- ptr + size;
      f ic.buffer ptr
    end

  (* Flushes until [size] bytes are free, then hands [f] the buffer and
     the start of the reserved block. *)
  let rec write_block_unsafe oc size f =
    if oc.max - oc.ptr < size then
      flush_partial oc >>= fun _ ->
      write_block_unsafe oc size f
    else begin
      let ptr = oc.ptr in
      oc.ptr <- ptr + size;
      f oc.buffer ptr
    end

  let block :
    type m.
      m _channel -> int -> (Lwt_bytes.t -> int -> 'a Lwt.t) -> 'a Lwt.t =
    fun ch size f ->
    if size < 0 || size > min_buffer_size then
      Lwt.fail (Invalid_argument "Lwt_io.block")
    else
    if ch.max - ch.ptr >= size then begin
      let ptr = ch.ptr in
      ch.ptr <- ptr + size;
      f ch.buffer ptr
    end else
      match ch.mode with
      | Input ->
        read_block_unsafe ch size f
      | Output ->
        write_block_unsafe ch size f

  (* Implementation of [da_perform]. [token] is cleared once the
     enclosing [direct_access] call has finished. *)
  let perform token da ch =
    if !token then begin
      if da.da_max <> ch.max || da.da_ptr < ch.ptr || da.da_ptr > ch.max then
        Lwt.fail (Invalid_argument "Lwt_io.direct_access.da_perform")
      else begin
        ch.ptr <- da.da_ptr;
        perform_io ch >>= fun count ->
        da.da_ptr <- ch.ptr;
        da.da_max <- ch.max;
        Lwt.return count
      end
    end else
      Lwt.fail
        (Failure
           ("Lwt_io.perform: this function can not be called outside " ^
            "Lwt_io.direct_access"))

  let direct_access ch f =
    let token = ref true in
    let rec da = {
      da_ptr = ch.ptr;
      da_max = ch.max;
      da_buffer = ch.buffer;
      da_perform = (fun _ -> perform token da ch);
    } in
    f da >>= fun x ->
    token := false;
    (* Only accept the pointer left by [f] if it is consistent with the
       channel. *)
    if da.da_max <> ch.max || da.da_ptr < ch.ptr || da.da_ptr > ch.max then
      Lwt.fail (Failure "Lwt_io.direct_access: invalid result of [f]")
    else begin
      ch.ptr <- da.da_ptr;
      Lwt.return x
    end

  module MakeNumberIO(ByteOrder : ByteOrder.S) =
  struct
    open ByteOrder

    (* +-------------------------------------------------------------+
       | Reading numbers                                             |
       +-------------------------------------------------------------+ *)

    (* Byte at [ptr], as an integer in [0, 255]. *)
    let get buffer ptr = Char.code (Lwt_bytes.unsafe_get buffer ptr)

    let read_int ic =
      read_block_unsafe ic 4
        (fun buffer ptr ->
           let v0 = get buffer (ptr + pos32_0)
           and v1 = get buffer (ptr + pos32_1)
           and v2 = get buffer (ptr + pos32_2)
           and v3 = get buffer (ptr + pos32_3) in
           (* Sign-extend through the most significant byte. *)
           let n3 = if v3 >= 128 then v3 - 256 else v3 in
           let v = v0 + (v1 lsl 8) + (v2 lsl 16) + (n3 lsl 24) in
           Lwt.return v)

    let read_int16 ic =
      read_block_unsafe ic 2
        (fun buffer ptr ->
           let v0 = get buffer (ptr + pos16_0)
           and v1 = get buffer (ptr + pos16_1) in
           let v = v0 lor (v1 lsl 8) in
           if v1 land 0x80 = 0 then
             Lwt.return v
           else
             Lwt.return (v - (1 lsl 16)))

    let read_int32 ic =
      read_block_unsafe ic 4
        (fun buffer ptr ->
           let v0 = get buffer (ptr + pos32_0)
           and v1 = get buffer (ptr + pos32_1)
           and v2 = get buffer (ptr + pos32_2)
           and v3 = get buffer (ptr + pos32_3) in
           Lwt.return
             (Int32.logor
                (Int32.logor
                   (Int32.of_int v0)
                   (Int32.shift_left (Int32.of_int v1) 8))
                (Int32.logor
                   (Int32.shift_left (Int32.of_int v2) 16)
                   (Int32.shift_left (Int32.of_int v3) 24))))

    let read_int64 ic =
      read_block_unsafe ic 8
        (fun buffer ptr ->
           let v0 = get buffer (ptr + pos64_0)
           and v1 = get buffer (ptr + pos64_1)
           and v2 = get buffer (ptr + pos64_2)
           and v3 = get buffer (ptr + pos64_3)
           and v4 = get buffer (ptr + pos64_4)
           and v5 = get buffer (ptr + pos64_5)
           and v6 = get buffer (ptr + pos64_6)
           and v7 = get buffer (ptr + pos64_7) in
           Lwt.return
             (Int64.logor
                (Int64.logor
                   (Int64.logor
                      (Int64.of_int v0)
                      (Int64.shift_left (Int64.of_int v1) 8))
                   (Int64.logor
                      (Int64.shift_left (Int64.of_int v2) 16)
                      (Int64.shift_left (Int64.of_int v3) 24)))
                (Int64.logor
                   (Int64.logor
                      (Int64.shift_left (Int64.of_int v4) 32)
                      (Int64.shift_left (Int64.of_int v5) 40))
                   (Int64.logor
                      (Int64.shift_left (Int64.of_int v6) 48)
                      (Int64.shift_left (Int64.of_int v7) 56)))))

    let read_float32 ic =
      read_int32 ic >>= fun x -> Lwt.return (Int32.float_of_bits x)

    let read_float64 ic =
      read_int64 ic >>= fun x -> Lwt.return (Int64.float_of_bits x)

    (* +-------------------------------------------------------------+
       | Writing numbers                                             |
       +-------------------------------------------------------------+ *)

    (* Stores [x] as a character at [ptr] via [Char.unsafe_chr]. *)
    let set buffer ptr x =
      Lwt_bytes.unsafe_set buffer ptr (Char.unsafe_chr x)

    let write_int oc v =
      write_block_unsafe oc 4
        (fun buffer ptr ->
           set buffer (ptr + pos32_0) v;
           set buffer (ptr + pos32_1) (v lsr 8);
           set buffer (ptr + pos32_2) (v lsr 16);
           set buffer (ptr + pos32_3) (v asr 24);
           Lwt.return_unit)

    let write_int16 oc v =
      write_block_unsafe oc 2
        (fun buffer ptr ->
           set buffer (ptr + pos16_0) v;
           set buffer (ptr + pos16_1) (v lsr 8);
           Lwt.return_unit)

    let write_int32 oc v =
      write_block_unsafe oc 4
        (fun buffer ptr ->
           set buffer (ptr + pos32_0) (Int32.to_int v);
           set buffer (ptr + pos32_1) (Int32.to_int (Int32.shift_right v 8));
           set buffer (ptr + pos32_2) (Int32.to_int (Int32.shift_right v 16));
           set buffer (ptr + pos32_3) (Int32.to_int (Int32.shift_right v 24));
           Lwt.return_unit)

    let write_int64 oc v =
      write_block_unsafe oc 8
        (fun buffer ptr ->
           set buffer (ptr + pos64_0) (Int64.to_int v);
           set buffer (ptr + pos64_1) (Int64.to_int (Int64.shift_right v 8));
           set buffer (ptr + pos64_2) (Int64.to_int (Int64.shift_right v 16));
           set buffer (ptr + pos64_3) (Int64.to_int (Int64.shift_right v 24));
           set buffer (ptr + pos64_4) (Int64.to_int (Int64.shift_right v 32));
           set buffer (ptr + pos64_5) (Int64.to_int (Int64.shift_right v 40));
           set buffer (ptr + pos64_6) (Int64.to_int (Int64.shift_right v 48));
           set buffer (ptr + pos64_7) (Int64.to_int (Int64.shift_right v 56));
           Lwt.return_unit)

    let write_float32 oc v = write_int32 oc (Int32.bits_of_float v)
    let write_float64 oc v = write_int64 oc (Int64.bits_of_float v)
  end

  (* +---------------------------------------------------------------+
     | Random access                                                 |
+---------------------------------------------------------------+ *)

  (* Seeks to the absolute position [pos] and fails unless the seek
     function reports exactly that position. *)
  let do_seek fun_name seek pos =
    seek pos Unix.SEEK_SET >>= fun offset ->
    if offset <> pos then
      Lwt.fail (Failure (Printf.sprintf "Lwt_io.%s: seek failed" fun_name))
    else
      Lwt.return_unit

  let set_position : type m. m _channel -> int64 -> unit Lwt.t = fun ch pos ->
    match ch.typ, ch.mode with
    | Type_normal (_, seek), Output ->
      (* Push out pending data before moving. *)
      flush_total ch >>= fun () ->
      do_seek "set_position" seek pos >>= fun () ->
      ch.offset <- pos;
      Lwt.return_unit

    | Type_normal (_, seek), Input ->
      (* Position corresponding to [ch.ptr]. *)
      let current = Int64.sub ch.offset (Int64.of_int (ch.max - ch.ptr)) in
      if pos >= current && pos <= ch.offset then begin
        (* The target is still in the buffer: just move the pointer. *)
        ch.ptr <- ch.max - (Int64.to_int (Int64.sub ch.offset pos));
        Lwt.return_unit
      end else begin
        do_seek "set_position" seek pos >>= fun () ->
        ch.offset <- pos;
        ch.ptr <- 0;
        ch.max <- 0;
        Lwt.return_unit
      end

    | Type_bytes, _ ->
      if pos < 0L || pos > Int64.of_int ch.length then
        Lwt.fail (Failure "Lwt_io.set_position: out of bounds")
      else begin
        ch.ptr <- Int64.to_int pos;
        Lwt.return_unit
      end

  (* Length of the underlying data. For a normal channel this seeks to
     the end and then back to [ch.offset]. *)
  let length ch =
    match ch.typ with
    | Type_normal (_, seek) ->
      seek 0L Unix.SEEK_END >>= fun len ->
      do_seek "length" seek ch.offset >>= fun () ->
      Lwt.return len
    | Type_bytes ->
      Lwt.return (Int64.of_int ch.length)
end

(* +-----------------------------------------------------------------+
   | Primitive operations                                            |
+-----------------------------------------------------------------+ *)

(* Read one character. Takes a fast path when the wrapper is idle and the
   buffer still holds data; otherwise goes through [primitive]. *)
let read_char wrapper =
  let channel = wrapper.channel in
  let ptr = channel.ptr in
  (* Speed-up in case a character is available in the buffer. It
     increases performances by 10x. *)
  if wrapper.state = Idle && ptr < channel.max then begin
    channel.ptr <- ptr + 1;
    Lwt.return (Lwt_bytes.unsafe_get channel.buffer ptr)
  end else
    primitive Primitives.read_char wrapper

(* Same as [read_char], but the result is wrapped in an option. *)
let read_char_opt wrapper =
  let channel = wrapper.channel in
  let ptr = channel.ptr in
  if wrapper.state = Idle && ptr < channel.max then begin
    channel.ptr <- ptr + 1;
    Lwt.return (Some (Lwt_bytes.unsafe_get channel.buffer ptr))
  end else
    primitive Primitives.read_char_opt wrapper

(* The functions below simply run the matching [Primitives] operation through
   [primitive]. *)

let read_line ic = primitive Primitives.read_line ic

let read_line_opt ic = primitive Primitives.read_line_opt ic

let read ?count ic = primitive (fun ic -> Primitives.read count ic) ic

let read_into ic str ofs len =
  primitive (fun ic -> Primitives.read_into ic str ofs len) ic

let read_into_exactly ic str ofs len =
  primitive (fun ic -> Primitives.read_into_exactly ic str ofs len) ic

let read_value ic = primitive Primitives.read_value ic

let flush oc = primitive Primitives.flush oc

(* Write one character. Takes a fast path when the wrapper is idle and
   [channel.ptr] is below [channel.max]: the byte is stored directly and the
   auto-flusher is started if it is not already running. *)
let write_char wrapper x =
  let channel = wrapper.channel in
  let ptr = channel.ptr in
  if wrapper.state = Idle && ptr < channel.max then begin
    channel.ptr <- ptr + 1;
    Lwt_bytes.unsafe_set channel.buffer ptr x;
    (* Fast launching of the auto flusher: *)
    if not channel.auto_flushing then begin
      channel.auto_flushing <- true;
      ignore (auto_flush channel);
      Lwt.return_unit
    end else
      Lwt.return_unit
  end else
    primitive (fun oc -> Primitives.write_char oc x) wrapper

let write oc str = primitive (fun oc -> Primitives.write oc str) oc

let write_line oc x = primitive (fun oc -> Primitives.write_line oc x) oc

let write_from oc str ofs len =
  primitive (fun oc -> Primitives.write_from oc str ofs len) oc

let write_from_string oc str ofs len =
  primitive (fun oc -> Primitives.write_from_string oc str ofs len) oc

let write_from_exactly oc str ofs len =
  primitive (fun oc -> Primitives.write_from_exactly oc str ofs len) oc

let write_from_string_exactly oc str ofs len =
  primitive (fun oc -> Primitives.write_from_string_exactly oc str ofs len) oc

let write_value oc ?flags x =
  primitive (fun oc -> Primitives.write_value oc ?flags x) oc

let block ch size f = primitive (fun ch -> Primitives.block ch size f) ch

let direct_access ch f = primitive (fun ch -> Primitives.direct_access ch f) ch

let set_position ch pos =
  primitive (fun ch -> Primitives.set_position ch pos) ch

let length ch = primitive Primitives.length ch

(* Signature shared by the little-endian, big-endian and system-endian number
   readers and writers. *)
module type NumberIO = sig
  val read_int : input_channel -> int Lwt.t
  val read_int16 : input_channel -> int Lwt.t
  val read_int32 : input_channel -> int32 Lwt.t
  val read_int64 : input_channel -> int64 Lwt.t
  val read_float32 : input_channel -> float Lwt.t
  val read_float64 : input_channel -> float Lwt.t
  val write_int : output_channel -> int -> unit Lwt.t
  val write_int16 : output_channel -> int -> unit Lwt.t
  val write_int32 : output_channel -> int32 -> unit Lwt.t
  val write_int64 : output_channel -> int64 -> unit Lwt.t
  val write_float32 : output_channel -> float -> unit Lwt.t
  val write_float64 : output_channel -> float -> unit Lwt.t
end

(* Build the channel-level number I/O functions for a given byte order by
   running the corresponding [Primitives.MakeNumberIO] operations through
   [primitive]. *)
module MakeNumberIO (ByteOrder : ByteOrder.S) = struct
  module Primitives = Primitives.MakeNumberIO (ByteOrder)

  let read_int ic = primitive Primitives.read_int ic
  let read_int16 ic = primitive Primitives.read_int16 ic
  let read_int32 ic = primitive Primitives.read_int32 ic
  let read_int64 ic = primitive Primitives.read_int64 ic
  let read_float32 ic = primitive Primitives.read_float32 ic
  let read_float64 ic = primitive Primitives.read_float64 ic

  let write_int oc x = primitive (fun oc -> Primitives.write_int oc x) oc
  let write_int16 oc x = primitive (fun oc -> Primitives.write_int16 oc x) oc
  let write_int32 oc x = primitive (fun oc -> Primitives.write_int32 oc x) oc
  let write_int64 oc x = primitive (fun oc -> Primitives.write_int64 oc x) oc
  let write_float32 oc x = primitive (fun oc -> Primitives.write_float32 oc x) oc
  let write_float64 oc x = primitive (fun oc -> Primitives.write_float64 oc x) oc
end

module LE = MakeNumberIO (ByteOrder.LE)
module BE = MakeNumberIO (ByteOrder.BE)

type byte_order = Lwt_sys.byte_order = Little_endian | Big_endian
let system_byte_order = Lwt_sys.byte_order

(* Expose, at the top level of this module, the number I/O functions matching
   the byte order reported by [Lwt_sys.byte_order]. *)
include (val (match system_byte_order with
    | Little_endian -> (module LE : NumberIO)
    | Big_endian -> (module BE : NumberIO)) : NumberIO)

(* +-----------------------------------------------------------------+
| Other |
+-----------------------------------------------------------------+ *)

(* Stream of the characters of [ic], pulled with [read_char_opt]. *)
let read_chars ic = Lwt_stream.from (fun _ -> read_char_opt ic)

(* Write every character of the stream [chars] to [oc], sequentially. *)
let write_chars oc chars =
  Lwt_stream.iter_s (fun char -> write_char oc char) chars

(* Stream of the lines of [ic], pulled with [read_line_opt]. *)
let read_lines ic = Lwt_stream.from (fun _ -> read_line_opt ic)

(* Write every line of the stream [lines] to [oc], sequentially. *)
let write_lines oc lines =
  Lwt_stream.iter_s (fun line -> write_line oc line) lines

(* Input channel whose read function fills the requested range with zero
   bytes and reports it as fully read. *)
let zero =
  make
    ~mode:input
    ~buffer:(Lwt_bytes.create min_buffer_size)
    (fun str ofs len -> Lwt_bytes.fill str ofs len '\x00'; Lwt.return len)

(* Output channel whose write function discards the data and reports the
   whole range as written. *)
let null =
  make
    ~mode:output
    ~buffer:(Lwt_bytes.create min_buffer_size)
    (fun _str _ofs len -> Lwt.return len)

(* Do not close standard ios on close, otherwise uncaught exceptions
will not be printed *)
let stdin = of_fd ~mode:input Lwt_unix.stdin
let stdout = of_fd ~mode:output Lwt_unix.stdout
let stderr = of_fd ~mode:output Lwt_unix.stderr

(* Printing helpers: the [f*] variants take the channel explicitly, the plain
   ones write to [stdout] and the [e*] ones to [stderr]. The [*l] variants use
   [write_line] instead of [write]. *)

let fprint oc txt = write oc txt
let fprintl oc txt = write_line oc txt
let fprintf oc fmt = Printf.ksprintf (fun txt -> write oc txt) fmt
let fprintlf oc fmt = Printf.ksprintf (fun txt -> write_line oc txt) fmt

let print txt = write stdout txt
let printl txt = write_line stdout txt
let printf fmt = Printf.ksprintf print fmt
let printlf fmt = Printf.ksprintf printl fmt

let eprint txt = write stderr txt
let eprintl txt = write_line stderr txt
let eprintf fmt = Printf.ksprintf eprint fmt
let eprintlf fmt = Printf.ksprintf eprintl fmt

(* Create a pipe and return an (input channel, output channel) pair built on
   its read and write ends. The last argument is ignored. *)
let pipe ?in_buffer ?out_buffer _ =
  let fd_r, fd_w = Lwt_unix.pipe () in
  (of_fd ?buffer:in_buffer ~mode:input fd_r,
   of_fd ?buffer:out_buffer ~mode:output fd_w)

type file_name = string

(* Open [filename] and wrap the descriptor in a channel of the given [mode].
   When [flags] is omitted: read-only + non-blocking for input; write-only +
   create + truncate + non-blocking for output. When [perm] is omitted: 0 for
   input and 0o666 for output. *)
let open_file : type m.
  ?buffer : Lwt_bytes.t ->
  ?flags : Unix.open_flag list ->
  ?perm : Unix.file_perm ->
  mode : m mode ->
  file_name ->
  m channel Lwt.t = fun ?buffer ?flags ?perm ~mode filename ->
  let flags = match flags, mode with
    | Some l, _ ->
      l
    | None, Input ->
      [Unix.O_RDONLY; Unix.O_NONBLOCK]
    | None, Output ->
      [Unix.O_WRONLY; Unix.O_CREAT; Unix.O_TRUNC; Unix.O_NONBLOCK]
  and perm = match perm, mode with
    | Some p, _ ->
      p
    | None, Input ->
      0
    | None, Output ->
      0o666
  in
  Lwt_unix.openfile filename flags perm >>= fun fd ->
  Lwt.return (of_fd ?buffer ~mode fd)

(* Open [filename], run [f] on the channel, and close the channel whether [f]
   succeeds or fails. *)
let with_file ?buffer ?flags ?perm ~mode filename f =
  open_file ?buffer ?flags ?perm ~mode filename >>= fun ic ->
  Lwt.finalize
    (fun () -> f ic)
    (fun () -> close ic)

(* Self-seeded random state, created on first use, for temp file names. *)
let prng = lazy (Random.State.make_self_init ())

(* [temp_dir]/[prefix] followed by six random hexadecimal digits. *)
let temp_file_name temp_dir prefix =
  let rnd = Random.State.int (Lazy.force prng) 0x1000000 in
  Filename.concat temp_dir (Printf.sprintf "%s%06x" prefix rnd)

(* Create a fresh temporary file and open it for output. Returns the file
   name together with the channel. Any [Unix.Unix_error] while opening causes
   a retry with a new random name; after 1000 retries the error is
   propagated. *)
let open_temp_file ?buffer ?flags ?perm ?temp_dir ?prefix () =
  let flags =
    match flags with
    | None -> [Unix.O_WRONLY; Unix.O_CREAT; Unix.O_EXCL; Unix.O_CLOEXEC]
    | Some flags -> flags
  in
  let dir =
    match temp_dir with
    | None -> Filename.get_temp_dir_name ()
    | Some dirname -> dirname
  in
  let prefix =
    match prefix with
    | None -> "lwt_io_temp_file_"
    | Some prefix -> prefix
  in
  let rec attempt n =
    let fname = temp_file_name dir prefix in
    Lwt.catch
      (fun () ->
         open_file ?buffer ~flags ?perm ~mode:Output fname >>= fun chan ->
         Lwt.return (fname, chan))
      (function
        | Unix.Unix_error _ when n < 1000 -> attempt (n + 1)
        | exn -> Lwt.fail exn)
  in
  attempt 0

(* Create a temporary file, run [f] on (name, channel), then close the channel
   and remove the file. *)
let with_temp_file ?buffer ?flags ?perm ?temp_dir ?prefix f =
  open_temp_file ?buffer ?flags ?perm ?temp_dir ?prefix () >>= fun (fname, chan) ->
  Lwt.finalize
    (fun () -> f (fname, chan))
    (fun () ->
       (* The unlink is a finalizer of the close, so that a failing close
          does not leave the temporary file behind. *)
       Lwt.finalize
         (fun () -> close chan)
         (fun () -> Lwt_unix.unlink fname))

(* Length of the file [filename]. Fails with [Unix_error EISDIR] when the path
   is a directory. *)
let file_length filename =
  Lwt_unix.stat filename >>= fun stat ->
  if stat.Unix.st_kind = Unix.S_DIR then
    Lwt.fail (Unix.(Unix_error (EISDIR, "file_length", filename)))
  else
    with_file ~mode:input filename length

(* Shut down both directions of the socket [fd], then close it. The close
   happens even if the shutdown fails. *)
let close_socket fd =
  Lwt.finalize
    (fun () ->
       Lwt.catch
         (fun () ->
            Lwt_unix.shutdown fd Unix.SHUTDOWN_ALL;
            Lwt.return_unit)
         (function
           (* Occurs if the peer closes the connection first. *)
           | Unix.Unix_error (Unix.ENOTCONN, _, _) -> Lwt.return_unit
           | exn -> Lwt.fail exn) [@ocaml.warning "-4"])
    (fun () ->
       Lwt_unix.close fd)

(* Connect to [sockaddr], creating a stream socket unless [fd] is given, and
   return an (input, output) channel pair over it. Both channels share one
   lazy close action, so the socket is closed once. If connecting fails the
   descriptor is closed and the exception re-raised. *)
let open_connection ?fd ?in_buffer ?out_buffer sockaddr =
  let fd =
    match fd with
    | None ->
      Lwt_unix.socket (Unix.domain_of_sockaddr sockaddr) Unix.SOCK_STREAM 0
    | Some fd ->
      fd
  in
  let close = lazy (close_socket fd) in
  Lwt.catch
    (fun () ->
       Lwt_unix.connect fd sockaddr >>= fun () ->
       (try Lwt_unix.set_close_on_exec fd with Invalid_argument _ -> ());
       Lwt.return (make ?buffer:in_buffer
                     ~close:(fun _ -> Lazy.force close)
                     ~mode:input (Lwt_bytes.read fd),
                   make ?buffer:out_buffer
                     ~close:(fun _ -> Lazy.force close)
                     ~mode:output (Lwt_bytes.write fd)))
    (fun exn ->
       Lwt_unix.close fd >>= fun () ->
       Lwt.fail exn)

(* Run [f] on the channel pair, then close whichever of the two channels is
   not already closed. *)
let with_close_connection f (ic, oc) =
  (* If the user already tried to close the socket and got an exception, we
     don't want to raise that exception again during implicit close. *)
  let close_if_not_closed channel =
    if is_closed channel then
      Lwt.return_unit
    else
      close channel
  in
  Lwt.finalize
    (fun () -> f (ic, oc))
    (fun () -> close_if_not_closed ic <&> close_if_not_closed oc)

(* [open_connection] followed by [with_close_connection]. *)
let with_connection ?fd ?in_buffer ?out_buffer sockaddr f =
  open_connection ?fd ?in_buffer ?out_buffer sockaddr >>= fun channels ->
  with_close_connection f channels

(* A server is represented by its lazy shutdown action. *)
type server = {
  shutdown : unit Lwt.t Lazy.t;
}

let shutdown_server server = Lazy.force server.shutdown

(* Variant that starts the shutdown with [Lwt.async] and returns unit. *)
let shutdown_server_deprecated server =
  Lwt.async (fun () -> shutdown_server server)

(* There are several variants of establish_server that have accumulated over the
years in Lwt_io. This is their underlying implementation. The functions
exposed in the API are various wrappers around this one. *)
(* Returns a pair (server, promise): the promise is resolved once
   [bind_function] has completed, the socket is listening, and the accept loop
   has been started. Each accepted connection is handed to
   [connection_handler_callback client_address client_socket]. *)
let establish_server_generic
    bind_function
    ?fd:preexisting_socket_for_listening
    ?(backlog = 5)
    listening_address
    connection_handler_callback =

  let listening_socket =
    match preexisting_socket_for_listening with
    | None ->
      Lwt_unix.socket
        (Unix.domain_of_sockaddr listening_address) Unix.SOCK_STREAM 0
    | Some socket ->
      socket
  in
  Lwt_unix.setsockopt listening_socket Unix.SO_REUSEADDR true;

  (* This promise gets resolved with `Should_stop when the user calls
     Lwt_io.shutdown_server. This begins the shutdown procedure. *)
  let should_stop, notify_should_stop =
    Lwt.wait () in

  (* Some time after Lwt_io.shutdown_server is called, this function
     establish_server_generic will actually close the listening socket. At that
     point, this promise is resolved. This ends the shutdown procedure. *)
  let wait_until_listening_socket_closed, notify_listening_socket_closed =
    Lwt.wait () in

  let rec accept_loop () =
    let try_to_accept =
      Lwt_unix.accept listening_socket >|= fun x ->
      `Accepted x
    in

    (* Wait for whichever comes first: a new client or the stop request. *)
    Lwt.pick [try_to_accept; should_stop] >>= function
    | `Accepted (client_socket, client_address) ->
      begin
        try Lwt_unix.set_close_on_exec client_socket
        with Invalid_argument _ -> ()
      end;

      connection_handler_callback client_address client_socket;

      accept_loop ()

    | `Should_stop ->
      Lwt_unix.close listening_socket >>= fun () ->

      (* Remove the socket file of a Unix-domain address, unless the path is
         empty or starts with a NUL byte. *)
      begin match listening_address with
      | Unix.ADDR_UNIX path when path <> "" && path.[0] <> '\x00' ->
        Unix.unlink path
      | _ ->
        ()
      end [@ocaml.warning "-4"];

      Lwt.wakeup_later notify_listening_socket_closed ();
      Lwt.return_unit
  in

  let server =
    {shutdown =
       lazy begin
         Lwt.wakeup_later notify_should_stop `Should_stop;
         wait_until_listening_socket_closed
       end}
  in

  (* Actually start the server. *)
  let server_has_started =
    bind_function listening_socket listening_address >>= fun () ->
    Lwt_unix.listen listening_socket backlog;

    Lwt.async accept_loop;

    Lwt.return_unit
  in

  server, server_has_started

(* Server variant whose callback [f] receives the raw client socket. Unless
   [no_close] is true, the socket is closed after [f] completes if it is not
   already closed. Exceptions from [f] and from closing are sent to
   [!Lwt.async_exception_hook]. *)
let establish_server_with_client_socket
    ?server_fd ?backlog ?(no_close = false) sockaddr f =

  let handler client_address client_socket =
    Lwt.async begin fun () ->
      (* Not using Lwt.finalize here, to make sure that exceptions from [f]
         reach !Lwt.async_exception_hook before exceptions from closing the
         channels. *)
      Lwt.catch
        (fun () -> f client_address client_socket)
        (fun exn ->
           !Lwt.async_exception_hook exn;
           Lwt.return_unit)

      >>= fun () ->
      if no_close then Lwt.return_unit
      else
      if Lwt_unix.state client_socket = Lwt_unix.Closed then
        Lwt.return_unit
      else
        Lwt.catch
          (fun () -> close_socket client_socket)
          (fun exn ->
             !Lwt.async_exception_hook exn;
             Lwt.return_unit)
    end
  in

  let server, server_started =
    establish_server_generic
      Lwt_unix.bind ?fd:server_fd ?backlog sockaddr handler
  in
  server_started >>= fun () ->
  Lwt.return server

(* Server variant whose callback [handler] receives the client address and an
   (input, output) channel pair over the client socket. Returns the same
   (server, started promise) pair as [establish_server_generic]. *)
let establish_server_with_client_address_generic
    bind_function
    ?fd
    ?(buffer_size = !default_buffer_size)
    ?backlog
    ?(no_close = false)
    sockaddr
    handler =

  let best_effort_close channel =
    (* First, check whether the channel is closed. f may have already tried to
       close the channel, received an exception, and handled it somehow. If so,
       trying to close the channel here will trigger the same exception, which
       will go to !Lwt.async_exception_hook, despite the user's efforts. *)
    (* The Invalid state is not possible on the channel, because it was not
       created using Lwt_io.atomic. *)
    if is_closed channel then
      Lwt.return_unit
    else
      Lwt.catch
        (fun () -> close channel)
        (fun exn ->
           !Lwt.async_exception_hook exn;
           Lwt.return_unit)
  in

  (* This local [handler] is not recursive: the call inside its body refers to
     the [handler] argument above. *)
  let handler client_address client_socket =
    Lwt.async (fun () ->
      (* Both channels share one lazy close action for the socket. *)
      let close = lazy (close_socket client_socket) in
      let input_channel =
        of_fd
          ~buffer:(Lwt_bytes.create buffer_size)
          ~mode:input
          ~close:(fun () -> Lazy.force close)
          client_socket
      in
      let output_channel =
        of_fd
          ~buffer:(Lwt_bytes.create buffer_size)
          ~mode:output
          ~close:(fun () -> Lazy.force close)
          client_socket
      in

      (* Not using Lwt.finalize here, to make sure that exceptions from [f]
         reach !Lwt.async_exception_hook before exceptions from closing the
         channels. *)
      Lwt.catch
        (fun () -> handler client_address (input_channel, output_channel))
        (fun exn ->
           !Lwt.async_exception_hook exn;
           Lwt.return_unit)

      >>= fun () ->
      if no_close then Lwt.return_unit
      else
        best_effort_close input_channel >>= fun () ->
        best_effort_close output_channel)
  in

  establish_server_generic bind_function ?fd ?backlog sockaddr handler

(* Same as the generic version with [Lwt_unix.bind]; the returned promise is
   resolved with the server once startup has completed. *)
let establish_server_with_client_address
    ?fd ?buffer_size ?backlog ?no_close sockaddr handler =
  let server, server_started =
    establish_server_with_client_address_generic
      Lwt_unix.bind ?fd ?buffer_size ?backlog ?no_close sockaddr handler
  in
  server_started >>= fun () ->
  Lwt.return server

(* Same as [establish_server_with_client_address], for a callback [f] that
   does not take the client address. *)
let establish_server ?fd ?buffer_size ?backlog ?no_close sockaddr f =
  let f _addr c = f c in
  establish_server_with_client_address
    ?fd ?buffer_size ?backlog ?no_close sockaddr f

(* Old, deprecated version of [establish_server]. This function has to persist
for a while, in some form, until it is no longer exposed as
[Lwt_io.Versioned.establish_server_1]. *)
(* Returns the server value directly instead of a promise. The user callback
   [f] returns unit, and channels are never closed on its behalf
   ([~no_close:true]). *)
let establish_server_deprecated ?fd ?buffer_size ?backlog sockaddr f =
  (* Wrap the deprecated [Lwt_unix.Versioned.bind_1] so that it has the
     promise-returning shape expected of a bind function. *)
  let blocking_bind fd addr =
    Lwt.return (Lwt_unix.Versioned.bind_1 fd addr) [@ocaml.warning "-3"]
  in
  let f _addr c =
    f c;
    Lwt.return_unit
  in
  let server, server_started =
    establish_server_with_client_address_generic
      blocking_bind ?fd ?buffer_size ?backlog ~no_close:true sockaddr f
  in
  (* Poll for exceptions in server startup that may have occurred synchronously.
     This emulates an old, deprecated behavior. *)
  Lwt.ignore_result server_started;
  server

(* Start closing [ch] and discard the resulting promise. *)
let ignore_close ch =
  ignore (close ch)

(* Build a stream that reads items from a lazily opened channel with [f]. The
   channel is opened on first demand and registered with [Gc.finalise] so that
   it gets closed if it is collected. It is closed explicitly as soon as [f]
   returns [None]. *)
let make_stream f lazy_ic =
  let lazy_ic =
    lazy (Lazy.force lazy_ic >>= fun ic ->
          Gc.finalise ignore_close ic;
          Lwt.return ic)
  in
  Lwt_stream.from (fun _ ->
    Lazy.force lazy_ic >>= fun ic ->
    f ic >>= fun x ->
    if x = None then
      close ic >>= fun () ->
      Lwt.return x
    else
      Lwt.return x)

(* Stream of the lines of the file [filename]. *)
let lines_of_file filename =
  make_stream read_line_opt (lazy (open_file ~mode:input filename))

(* Write the stream [lines] to the file [filename]. *)
let lines_to_file filename lines =
  with_file ~mode:output filename (fun oc -> write_lines oc lines)

(* Stream of the characters of the file [filename]. *)
let chars_of_file filename =
  make_stream read_char_opt (lazy (open_file ~mode:input filename))

(* Write the stream [chars] to the file [filename]. *)
let chars_to_file filename chars =
  with_file ~mode:output filename (fun oc -> write_chars oc chars)

(* Write the lines produced by [Lwt_stream.hexdump stream] to [oc]. *)
let hexdump_stream oc stream = write_lines oc (Lwt_stream.hexdump stream)

(* Same as [hexdump_stream], for the contents of the string [buf]. *)
let hexdump oc buf = hexdump_stream oc (Lwt_stream.of_string buf)

(* Validate [size] with [check_buffer_size], then make it the default. *)
let set_default_buffer_size size =
  check_buffer_size "set_default_buffer_size" size;
  default_buffer_size := size

(* From here on, [default_buffer_size] is a function returning the current
   value of the reference of the same name defined earlier in the file. *)
let default_buffer_size _ = !default_buffer_size

module Versioned =
struct
  let establish_server_1 = establish_server_deprecated
  let establish_server_2 = establish_server
  let shutdown_server_1 = shutdown_server_deprecated
  let shutdown_server_2 = shutdown_server
end