(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
   details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)

(* Reading guide
Welcome to the implementation of the Lwt core! This is a big file, but we
hope that reading it (parts at a time!) will not be scary :) Here is why:
* Sectioning
The code is broken up into sections, each one of which is an internal module.
Most of the modules have a signature, which serves as a neat table of
contents.
It is recommended that you read this file with code folding enabled. If you
fold all the modules, you can visualize the logical structure of Lwt quite
easily. You can then expand modules as needed, depending on what part of the
implementation you are interested in. Without code folding, you face an
intimidating wall of code :( You can still visually parse the file, however,
because there are plenty of blank lines to help section things off. You can
also view this file folded online:
https://gist.github.com/aantron/9fab0bdead98a60fccf06e0189186863
https://gist.github.com/aantron/97b58520d5bb4858ccac6f54700a24d7
The signatures are unusual: big comments are absent. They are moved into the
modules, so that they are hidden by code folding when you (the reader!) are
not interested in those modules.
* Documentation
The documentation begins with an overview of major concepts and components.
This overview puts everything into context. You don't have to read the whole
thing. The overview begins with basic concepts, moves on to advanced ones,
and then gets into the truly esoteric. You can read about each concept on an
as-needed basis. However, once you have read the whole overview, you will be
aware of *everything* that is needed to understand, and work with, the core
of Lwt.
Littered in the code are additional comments, that go in-depth on various
local implementation details, opportunities, regrets, and the like.
The sections (modules) of the code correspond closely to sections of the
overview.
* Whitespace
The total line count of this file may seem frightening, but one third of it
is whitespace and comments, both there to help you read the remaining two
thirds!
Also, within those two thirds, there are large groups of functions that are
repetitive and formulaic, so there is much less conceptually-unique code in
Lwt than you might think at first.
* Please edit the code and the docs!
This code is meant to be readable, and to be edited. If you are reading
something, and think there is a better way to express it, please go ahead and
open a pull request to the Lwt repository at
https://github.com/ocsigen/lwt
Even if your pull request somehow doesn't get merged, you will have educated
the maintainers, not to mention other contributors, and users. This is true
even if the change is trivial -- sometimes, maintainers just need to be
educated multiple times before they see the wisdom of it :/
Likewise, if you would like to make a code contribution to the Lwt core, it
is quite welcome, and we hope that this code is readable enough for you to be
able to make it!
Enjoy! *)(* Overview
In this file, there is a "model" function -- [Lwt.bind] -- which pulls
together many (though not all) of the concepts and helpers discussed in this
overview. To find it, search for "let bind," and you can examine it while
reading the overview. The authors of this file intend to put extra effort
into writing nice comments inside [Lwt.bind] :)
0. Main mechanism and two aspects
The Lwt interface ([lwt.mli]) provides one main mechanism, promises, and two
"aspects," which are *not* necessary for understanding promises, but which are
still there:
- promise cancellation
- sequence-associated storage
If you are not interested in cancellation or storage, you can ignore these
two complications, and still get a pretty good understanding of the code. To
help, all identifiers related to cancellation contain the string "cancel,"
and all identifiers related to storage contain "storage."
1. Promises
A promise is a cell that can be in one of two states: "resolved" or
"pending."
- Resolved promises
A resolved promise is either "fulfilled" with a value, or "rejected" with
an exception. The state of a resolved promise will never change again: a
resolved promise is immutable. A resolved promise is basically equivalent
to an [('a, exn) Pervasives.result]. Resolved promises are produced in two
ways:
- [Lwt.return], [Lwt.fail], and related functions, produce "trivial"
promises that are resolved from the start.
- The other way is to resolve a promise that started out pending.
Note that rejected promises have nothing to do with unhandled exceptions.
- Pending promises
...are those that may become resolved in the future. Each pending promise
carries a list of callbacks. These callbacks are added by functions like
[Lwt.bind], and called by Lwt if/when the promise is resolved. These
callbacks typically end up resolving additional promises; see section
"Resolution loop" below.
Pending promises are produced in three ways, according to how they can be
resolved:
- Initial promises
...are created by [Lwt.wait] and [Lwt.task]. The user of Lwt resolves
these promises manually, through the resolvers returned by those
functions.
- Sequential composition
For example, [Lwt.bind]. These promises are only resolved when the
preceding sequence of promises resolves. The user cannot resolve these
promises directly (but see the section on cancellation below).
- Concurrent composition
For example, [Lwt.join] or [Lwt.choose]. These promises are only resolved
when all or one of a set of "preceding" promises resolve. The user cannot
resolve these promises directly (but see the section on cancellation
below).
2. Resolvers
Resolvers are given to the user by [Lwt.wait] and [Lwt.task], and can be used
by the user to resolve the corresponding promises. Note that this means the
user only ever gets resolvers for initial promises.
Internally, resolvers are the exact same objects as the promises they
resolve, even though the resolver is exposed as a reference of a different
type by [lwt.mli]. For details on why, see section "Type system abuse" below.
3. Callbacks
...are attached by Lwt to pending promises, and are run by Lwt if/when those
promises are resolved. These callbacks are not directly exposed through
[lwt.mli] -- they are a low-level mechanism. For example, to implement
[Lwt.bind p f], Lwt attaches a callback to [p] that does some internal Lwt
book-keeping, and then calls [f] if [p] is fulfilled, and does something else
if [p] is rejected.
Callbacks come in two flavors: regular callbacks and cancel callbacks. The
only material differences between them are that:
- regular callbacks are always called when a promise is resolved, but cancel
callbacks are called, in addition, only if the promise is canceled, and
- all cancel callbacks of a promise are called before any regular callback
is called.
Cancellation is a special case of resolution, in particular, a special case
of rejection, but see the section on cancellation later below.
4. Resolution loop
Resolving a pending promise triggers its callbacks, and those might resolve
more pending promises, triggering more callbacks, etc. This behavior is the
*resolution loop*. Lwt has some machinery to avoid stack overflow and other
unfortunate situations during this loop.
This chaining of promise resolutions through callbacks can be seen as a kind
of promise dependency graph, in which the nodes are pending promises, and the
edges are callbacks. During the resolution loop, Lwt starts at some initial
promise that is getting resolved by the user, and recursively resolves all
dependent promises. The graph is modified: resolved promises are no longer
pending, so they are no longer part of the graph.
Some of these dependencies are explicit to Lwt, e.g. the callbacks registered
by [Lwt.bind]. Others are not visible to Lwt, because the user can always
register a callback using a function like [Lwt.on_success], and use that
callback to resolve another initial promise. All the explicit dependencies
are created by Lwt's own sequential and concurrent composition functions
(so, [Lwt.bind], [Lwt.join], etc). Whether dependencies are explicit or not
is relevant only to cancellation.
5. Cancellation
As described above, ordinary promise resolution proceeds from an initial
promise, forward along callbacks through the dependency graph. Since it
starts from an initial promise, it can only be triggered using a resolver.
Cancellation is a sort of dual to ordinary resolution. Instead of starting at
an initial promise/resolver, cancellation starts at *any* promise. It then
goes *backwards* through the explicit dependency graph, looking for
cancelable initial promises to cancel -- those that were created by
[Lwt.task]. After finding them, cancellation resolves them normally with
[Rejected Lwt.Canceled], causing an ordinary promise resolution process.
To summarize, cancellation is a way to trigger an *ordinary* resolution of
promises created with [Lwt.task], by first searching for them in the promise
dependency graph (which is assembled by [Lwt.bind], [Lwt.join], etc).
This backwards search is triggered only by [Lwt.cancel]. It is also possible
for the user to cancel a promise directly by rejecting it with
[Lwt.Canceled], but in all cases where the user can do so, the search would
be redundant anyway -- the user has only two ways of directly rejecting a
promise with [Lwt.Canceled] (or any exception, for that matter):
- The user can create an initial promise, then reject it through its
resolver. The search is redundant because it would find only the same
initial promise to cancel.
- The user can create a trivial promise by calling [Lwt.fail Lwt.Canceled].
The search is again redundant; in this case it would find nothing to
cancel.
Note that there is a quirk: only promises created by [Lwt.task] are
susceptible to being canceled by [Lwt.cancel], but the user can manually
cancel initial promises created by both [Lwt.task] and [Lwt.wait].
Due to [Lwt.cancel], promise cancellation, and therefore resolution, can be
initiated by the user without access to a resolver. This is important for
reasoning about state changes in the implementation of Lwt, and is referenced
in some implementation detail comments.
6. No I/O
The Lwt core deliberately doesn't do I/O. The resolution loop stops running
once no promises can be resolved immediately. It has to be restarted later
by some surrounding I/O loop. This I/O loop typically keeps track of pending
promises that represent blocked or in-progress I/O; other pending promises
that indirectly depend on I/O are not explicitly tracked. They are retained
in memory by references captured inside callbacks.
On Unix and Windows, a separate top-level loop, typically [Lwt_main.run], is
necessary to repeatedly call [select], [epoll], or [kevent], and resolve
blocked I/O promises.
In JavaScript, references to promises are retained by JavaScript code, which
is, in turn, triggered by the JS engine. In other words, the top-level loop
is buried inside the JS engine.
This separation of the Lwt core from the top-level I/O loop keeps the core
portable.
7. Promise "proxying"
In [Lwt.bind : 'a t -> ('a -> 'b t) -> 'b t], the outer ['b t] is created by
[bind] first, and returned to the user. The inner ['b t] is created by the
user later, and then returned to [bind]. At that point, [bind] needs to make
the inner and outer ['b t]s behave identically.
This is accomplished by making one of the promises point to the other. The
first of the promises thus becomes a "proxy," and the other is its
"underlying" promise.
After that, all operations that would be performed by Lwt on the proxy are
instead performed on the underlying promise. This is ensured by the numerous
calls to the internal function [underlying] in this file.
Because of the pervasive use of [underlying], proxies can be more or less
ignored on a first reading of the code. However, becoming a proxy is a kind of
state change, and any promise that is returned by a callback to [bind], or to
a similar Lwt function, might become a proxy. That means: just about any
promise that is handed to the user, might become a proxy promise by the next
time Lwt sees it. This is important for reasoning about possible state
changes in the implementation of Lwt, and is referenced in some implementation
detail comments.
8. Sequence-associated storage
Lwt has a global key-value map. The map can be preserved across sequential
composition functions, so that it has the same state in the user's callback
[f] as it did at the time the user called [Lwt.bind p f].
The details are pretty straightforward, and discussed in module
[Sequence_associated_storage]. The main thing to be aware of is the many
references to [current_storage] throughout Lwt, which are needed to properly
save and restore the mapping.
9. Type system abuse
The implementation uses the type system somewhat extensively. Gentle
introductions can be found here:
https://discuss.ocaml.org/t/161/7
https://discuss.ocaml.org/t/161/16
A short summary follows.
The promise state is, internally, a GADT which encodes the state in its type
parameters. Thus, if you do [let p = underlying p], the shadowing reference
[p] is statically known *not* to be a proxy, and the compiler knows that the
corresponding match case [Proxy _] is impossible.
The external promise type, ['a t], and the external resolver type, ['a u],
are not GADTs. Furthermore, they are, respectively, covariant and
contravariant in ['a], while the internal promise type is invariant in ['a].
For these reasons, there are nasty casts between ['a t], ['a u], and the
internal promise type. The implementation is, of course, written in terms of
the internal type.
Casting from an ['a t] to an internal promise produces a reference for
which the state is "unknown": this is simulated with a helper GADT, which
encodes existential types. There are several similar casts, which are used
to document possible state changes between the time a promise is created,
and the later time it is used in a callback. You can see these casts in
action in [Lwt.bind]. The cast syntax is pretty light, and, besides being
commented in [bind], all such casts are documented in modules [Public_types]
and [Basic_helpers].
If you've made it this far, you are an Lwt expert! Rejoice! *)(* Suppress warning 4, "fragile pattern matching," in this file only, due to
https://caml.inria.fr/mantis/view.php?id=7451
This can be removed if/when Lwt requires a minimum OCaml version 4.05. *)
[@@@ocaml.warning "-4"]



(* [Lwt_sequence] is deprecated – we don't want users outside Lwt using it.
   However, it is still used internally by Lwt. So, briefly disable warning 3
   ("deprecated"), and create a local, non-deprecated alias for
   [Lwt_sequence] that can be referred to by the rest of the code in this
   module without triggering any more warnings. *)
[@@@ocaml.warning "-3"]
module Lwt_sequence = Lwt_sequence
[@@@ocaml.warning "+3"]



(* Some sequence-associated storage types
Sequence-associated storage is defined and documented later, in module
[Sequence_associated_storage]. However, the following types are mentioned in
the definition of [promise], so they must be defined here first. *)

(* Maps keyed by [int]. *)
module Storage_map =
  Map.Make
    (struct
      type t = int
      (* Keys are [int]s, so pin the comparison to [int -> int -> int]. The
         ordering is the same as with the polymorphic [compare]; the
         annotation only allows the compiler to pick the specialized integer
         comparison instead of the generic structural one. *)
      let compare (a : int) (b : int) = compare a b
    end)

(* The type of the storage itself: a map from [int] keys to thunks of type
   [unit -> unit]. *)
type storage = (unit -> unit) Storage_map.t



module Main_internal_types =
struct
  (* Phantom types for use with types [promise] and [state]. These are never
constructed; the purpose of the constructors is to prove to the type
checker that these types are distinct from each other. Warning 37, "unused
constructor," therefore has to be temporarily suppressed. *)

  [@@@ocaml.warning "-37"]

  type underlying = private Underlying_and_this_constructor_is_not_used
  type proxy = private Proxy_and_this_constructor_is_not_used

  type resolved = private Resolved_and_this_constructor_is_not_used
  type pending = private Pending_and_this_constructor_is_not_used

  [@@@ocaml.warning "+37"]



  (* Promises proper. *)

  type ('a, 'u, 'c) promise = {
    mutable state : ('a, 'u, 'c) state;
  }

  and (_, _, _) state =
    | Fulfilled : 'a -> ('a, underlying, resolved) state
    | Rejected : exn -> (_, underlying, resolved) state
    | Pending : 'a callbacks -> ('a, underlying, pending) state
    | Proxy : ('a, _, 'c) promise -> ('a, proxy, 'c) state

  (* Note:

     A promise whose state is [Proxy _] is a "proxy" promise. A promise whose
     state is *not* [Proxy _] is an "underlying" promise.

     The "underlying promise of [p]" is:

     - [p], if [p] is itself underlying.
     - Otherwise, [p] is a proxy and has state [Proxy p']. The underlying
       promise of [p] is the underlying promise of [p'].

     In other words, to find the underlying promise of a proxy, Lwt follows the
     [Proxy _] links to the end. *)

  (* Note:

     When a promise is resolved, or becomes a proxy, its state field is
     mutated. This invalidates the type invariants on the promise. See internal
     function [set_promise_state] for details about that.

     When an Lwt function has a reference to a promise, and also registers a
     callback that has a reference to the same promise, the invariants on the
     reference may become invalid by the time the callback is called. All such
     callbacks have comments explaining what the valid invariants are at that
     point, and/or casts to (1) get the correct typing and (2) document the
     potential state change for readers of the code. *)



  (* Callback information for pending promises. *)

  and 'a callbacks = {
    mutable regular_callbacks : 'a regular_callback_list;
    mutable cancel_callbacks : 'a cancel_callback_list;
    mutable how_to_cancel : how_to_cancel;
    mutable cleanups_deferred : int;
  }

  and 'a regular_callback = 'a resolved_state -> unit

  and cancel_callback = unit -> unit

  and 'a resolved_state = ('a, underlying, resolved) state

  and how_to_cancel =
    | Not_cancelable : how_to_cancel
    | Cancel_this_promise : how_to_cancel
    | Propagate_cancel_to_one : (_, _, _) promise -> how_to_cancel
    | Propagate_cancel_to_several : (_, _, _) promise list -> how_to_cancel

  and 'a regular_callback_list =
    | Regular_callback_list_empty
    | Regular_callback_list_concat of
      'a regular_callback_list * 'a regular_callback_list
    | Regular_callback_list_implicitly_removed_callback of
      'a regular_callback
    | Regular_callback_list_explicitly_removable_callback of
      'a regular_callback option ref

  and _ cancel_callback_list =
    | Cancel_callback_list_empty :
      _ cancel_callback_list
    | Cancel_callback_list_concat :
      'a cancel_callback_list * 'a cancel_callback_list ->
        'a cancel_callback_list
    | Cancel_callback_list_callback :
      storage * cancel_callback ->
        _ cancel_callback_list
    | Cancel_callback_list_remove_sequence_node :
      ('a, _, _) promise Lwt_sequence.node ->
        'a cancel_callback_list

  (* Notes:
These type definitions are guilty of performing several optimizations,
without which they would be much easier to understand.
- The type parameters of ['a resolved_state] guarantee that it is either
[Fulfilled _] or [Rejected _]. So, it is equivalent to
[('a, exn) Pervasives.result], and, indeed, should have an identical
memory representation.
- As per the Overview, there are regular callbacks and cancel callbacks.
Cancel callbacks are called only on cancellation, and, then, before any
regular callbacks are called.
Despite the different types for the two kinds of callbacks, they are
otherwise the same. Cancel callbacks just don't need a result state
argument, because it is known to be [Rejected Canceled].
- Regular callbacks are not allowed to raise exceptions. All regular
callbacks are created in this file, so this can be checked.
Cancel callbacks can raise exceptions, but if they do so, the exceptions
are passed to [async_exception_hook].
- [how_to_cancel] implements the dependency graph mentioned in the
Overview. It is traversed backwards during [Lwt.cancel]. It is a GADT
because we don't care about the actual types of the promise references
stored, or their invariants. The constructors correspond to pending
promise kinds as follows:
- [Not_cancelable]: initial, [Lwt.wait].
- [Cancel_this_promise]: initial, [Lwt.task].
- [Propagate_cancel_to_one]: sequential composition, e.g. [Lwt.bind].
- [Propagate_cancel_to_several]: concurrent composition, e.g.
[Lwt.join].
- The two callback list types are ordinary append-friendly lists, with two
optimizations inlined:
- ['a regular_callback_list] apparently has two "kinds" of regular
callbacks, implicitly removed and explicitly removable. All callbacks
are removable. It's just that, for some callbacks, they will only be
removed at the same time that the promise they are attached to becomes
resolved. When that happens, the entire state of that promise changes
to [Fulfilled _] or [Rejected _], and the reference to the whole
callback list is simply lost. This "removes" the callback. For these
callbacks, ['a regular_callback_list] attempts to trim an option and a
reference cell with the
[Regular_callback_list_implicitly_removed_callback] constructor.
- ['a cancel_callback_list] has
[Cancel_callback_list_remove_sequence_node node], which is the same as
[Cancel_callback_list_callback (_, (fun _ ->
Lwt_sequence.remove node))].
This was probably done to avoid a closure allocation.
- The [cleanups_deferred] field is explained in module
[Pending_callbacks]. *)
end
open Main_internal_types



module Public_types =
struct
  (* Abstract public promise ([t], covariant) and resolver ([u],
     contravariant) types. *)
  type +'a t
  type -'a u
  (* The contravariance of resolvers is, technically, unsound due to the
     existence of [Lwt.waiter_of_wakener]. That is why that function is
     deprecated. See

       https://github.com/ocsigen/lwt/issues/458 *)

  (* Casts from an internal promise to the public promise and resolver types.
     Both are [Obj.magic], i.e. unchecked type casts of the same value. *)
  let to_public_promise : ('a, _, _) promise -> 'a t = Obj.magic
  let to_public_resolver : ('a, _, _) promise -> 'a u = Obj.magic

  (* Existential wrapper: hides the second and third type parameters of an
     internal promise behind the single constructor [Internal]. *)
  type _ packed_promise =
    | Internal : ('a, _, _) promise -> 'a packed_promise
    [@@ocaml.unboxed]

  (* Casts from the public types back to a (packed) internal promise. *)
  let to_internal_promise (p : 'a t) : 'a packed_promise =
    Internal (Obj.magic p)
  let to_internal_resolver (r : 'a u) : 'a packed_promise =
    Internal (Obj.magic r)

  (* Most functions that take a public promise (['a t]) convert it to an
internal promise as follows:
(* p : 'a t *)
let Internal p = to_internal_promise p in
(* p : ('a, u, c) promise, where u and c are fresh types, i.e. the
invariants on p are unknown. *)
This cast is a no-op cast. It only produces a reference with a different
type. The introduction and immediate elimination of [Internal _] seems to
be optimized away even on older versions of OCaml that don't have Flambda
and don't support [[@@ocaml.unboxed]]. *)(* Internal name of the public [+'a Lwt.result]. The public name is defined
later in the module. This is to avoid potential confusion with
[Pervasives.result]/[Result.result], as the public name would not be
prefixed with [Lwt.] inside this file. *)
  type +'a lwt_result = ('a, exn) Result.result

  (* Converts a result into the corresponding resolved promise state: [Ok x]
     becomes [Fulfilled x], and [Error exn] becomes [Rejected exn]. This could
     probably save an allocation by using [Obj.magic]. *)
  let state_of_result result =
    match result with
    | Result.Error exn -> Rejected exn
    | Result.Ok x -> Fulfilled x
end
include Public_types

module Basic_helpers :
sig
  val identical : ('a, _, _) promise -> ('a, _, _) promise -> bool
  val underlying : ('a, 'u, 'c) promise -> ('a, underlying, 'c) promise

  type ('a, 'u, 'c) state_changed =
    | State_may_have_changed of ('a, 'u, 'c) promise
    [@@ocaml.unboxed]
  val set_promise_state :
    ('a, _, _) promise -> ('a, 'u, 'c) state -> ('a, 'u, 'c) state_changed

  type 'a may_now_be_proxy =
    | State_may_now_be_pending_proxy :
      ('a, _, pending) promise -> 'a may_now_be_proxy
    [@@ocaml.unboxed]
  val may_now_be_proxy :
    ('a, underlying, pending) promise -> 'a may_now_be_proxy
end =
struct
  (* Checks physical equality ([==]) of two internal promises. Unlike [==], does
not force unification of their invariants. *)
  let identical p1 p2 =
    (* Both promises are first viewed at the public type, which erases the
       invariant parameters, and only then compared physically. *)
    let public_p1 = to_public_promise p1 in
    let public_p2 = to_public_promise p2 in
    public_p1 == public_p2

  (* [underlying p] evaluates to the underlying promise of [p].
If multiple [Proxy _] links are traversed, [underlying] updates all the
proxies to point immediately to their final underlying promise. *)
  let rec underlying
      : 'u 'c. ('a, 'u, 'c) promise -> ('a, underlying, 'c) promise =
    fun
      (type u)
      (type c)
      (p : ('a, u, c) promise) ->

    match p.state with
    (* A promise whose state is not [Proxy _] is its own underlying promise. *)
    | Fulfilled _ -> (p : (_, underlying, _) promise)
    | Rejected _ -> p
    | Pending _ -> p
    | Proxy p' ->
      (* Find the end of the chain, then shorten this link if [p'] was not
         already the end. *)
      let p'' = underlying p' in
      if not (identical p'' p') then
        p.state <- Proxy p'';
      p''

  (* Wrapper returned by [set_promise_state]; it carries the promise reference
     at the type that matches the newly assigned state. *)
  type ('a, 'u, 'c) state_changed =
    | State_may_have_changed of ('a, 'u, 'c) promise
    [@@ocaml.unboxed]

  let set_promise_state p state =
    (* The [Obj.magic] cast forgets the invariants of the incoming reference,
       so that [state] can be assigned whatever its own invariants are. *)
    let p : (_, _, _) promise = Obj.magic p in
    p.state <- state;
    State_may_have_changed p

  (* [set_promise_state p state] mutates the state of [p], and evaluates to a
(wrapped) reference to [p] with the same invariants as on [state]. The
original reference [p] should be shadowed when calling this function:
let State_may_have_changed p = set_promise_state p (Fulfilled 42) in ...
This is a kind of cheap imitation of linear typing, which is good enough
for the needs of [lwt.ml].
Internal functions that transitively call [set_promise_state] likewise
return the new reference. This ends at some top-level function, typically
either a callback or a function in the public API. There, the new reference
is still bound, but is then explicitly ignored.
The state of a promise is never updated directly outside this module
[Basic_helpers]. All updates elsewhere are done through
[set_promise_state].
To avoid problems with type-level invariants not matching reality, data
structures do not store promises with concrete invariants -- except
resolved promises, which are immutable. Indeed, if one looks at
definitions of data structures that can store pending promises, e.g. the
[how_to_cancel] graph, the invariants are existentially quantified.
Note: it's possible to statically disallow the setting of the [state] field
by making type [promise] private. However, that seems to require writing a
signature that is a near-duplicate of [Main_internal_types], or some abuse
of functors. *)

  (* Wrapper that hides the second type parameter of a pending promise, so the
     unwrapped reference no longer claims to be an underlying promise. *)
  type 'a may_now_be_proxy =
    | State_may_now_be_pending_proxy :
      ('a, _, pending) promise -> 'a may_now_be_proxy
    [@@ocaml.unboxed]

  (* Wraps [p] without touching it; only the static type changes. *)
  let may_now_be_proxy p = State_may_now_be_pending_proxy p

  (* Many functions, for example [Lwt.bind] and [Lwt.join], create a fresh
pending promise [p] and return it to the user.
They do not return a corresponding resolver. That means that only the
function itself (typically, a callback registered by it) can resolve [p].
The only thing the user can do directly is try to cancel [p], but, since
[p] is not an initial promise, the cancellation attempt simply propagates
past [p] to [p]'s predecessors. If that eventually results in canceling
[p], it will be through the normal mechanisms of the function (e.g.
[Lwt.bind]'s callback).
As a result, the only possible state change, before the callback, is that
[p] may have become a proxy. Now,
- If [p] does not undergo this state change and become a proxy, it remains
an underlying, pending promise.
- If [p] does become a proxy, it will be a proxy for another promise [p']
created fresh by [Lwt.bind], to which this same argument applies. See
[make_into_proxy].
So, by induction on the length of the proxy ([Proxy _]) chain, at the time
the callback is called, [p] is either an underlying, pending promise, or a
proxy for a pending promise.
The cast
let State_may_now_be_pending_proxy p = may_now_be_proxy p in ...
encodes the possibility of this state change. It replaces a reference
p : ('a, underlying, pending)
with
p : ('a, $Unknown, pending)
and is typically seen at the beginning of callbacks registered by
[Lwt.bind] and similar functions.
The cast is a no-op cast. The introduction and immediate elimination of
[State_may_now_be_pending_proxy _] seems to be optimized away even on old versions
of OCaml. *)
end
open Basic_helpers

module Sequence_associated_storage :
sig
  (* Public interface *)
  (* Abstract key for a value of type ['v]. *)
  type 'v key
  val new_key : unit -> _ key
  val get : 'v key -> 'v option
  val with_value : 'v key -> 'v option -> (unit -> 'b) -> 'b

  (* Internal interface *)
  (* The global storage map; other parts of this file snapshot and restore
     it. *)
  val current_storage : storage ref
end =
struct
  (* The idea behind sequence-associated storage is to preserve some values
during a call to [bind] or other sequential composition operation, and
restore those values in the callback function:
Lwt.with_value my_key (Some "foo") (fun () ->
p >|= fun () ->
assert (Lwt.get my_key = Some "foo"))
(* Will succeed even if this callback is called later. *)
Note that it does not matter that the callback is defined within an
argument of [with_value], i.e., this does the same:
let f = fun () -> assert (Lwt.get my_key = Some "foo") in
Lwt.with_value my_key (Some "foo") (fun () -> p >|= f)
All that matters is that the top-most sequencing operation (in this case,
map) is executed by that argument.
This is implemented using a single global heterogeneous key-value map.
Sequential composition functions snapshot this map when they are called,
and restore the snapshot right before calling the user's callback. The same
happens for cancel triggers added by [on_cancel].
Maintainer's note: I think using this mechanism should be discouraged in
new code. *)
  (* A key carries a unique integer [id], which indexes the storage map, and a
     mutable [value] slot through which [get] reads the stored value back. *)
  type 'v key = {
    id : int;
    mutable value : 'v option;
  }

  let next_key_id = ref 0

  (* Allocates a key with the next unused id and an empty value slot. *)
  let new_key () =
    let id = !next_key_id in
    incr next_key_id;
    {id; value = None}

  let current_storage = ref Storage_map.empty

  (* Reads the value associated with [key] in the current storage. The map
     holds a [refresh] closure per key id; running it writes the value into
     [key.value], from where it is taken and the slot cleared again. Evaluates
     to [None] when the key id is not in the current storage. *)
  let get key =
    if not (Storage_map.mem key.id !current_storage) then
      None
    else begin
      let refresh = Storage_map.find key.id !current_storage in
      refresh ();
      let found = key.value in
      key.value <- None;
      found
    end

  (* Runs [f ()] with [key] bound to [value] ([None] unbinds the key), and
     restores the previous storage afterwards, whether [f] returns normally or
     raises. *)
  let with_value key value f =
    let storage_for_f =
      match value with
      | None ->
        Storage_map.remove key.id !current_storage
      | Some _ ->
        let refresh () = key.value <- value in
        Storage_map.add key.id refresh !current_storage
    in

    let storage_before = !current_storage in
    current_storage := storage_for_f;

    try
      let result = f () in
      current_storage := storage_before;
      result
    with exn ->
      current_storage := storage_before;
      raise exn
end
include Sequence_associated_storage

module Pending_callbacks :
sig
  (* Mutating callback lists attached to pending promises *)
  val add_implicitly_removed_callback :
    'a callbacks -> 'a regular_callback -> unit
  val add_explicitly_removable_callback_to_each_of :
    'a t list -> 'a regular_callback -> unit
  val add_explicitly_removable_callback_and_give_remove_function :
    'a t list -> 'a regular_callback -> (unit -> unit)
  val add_cancel_callback : 'a callbacks -> (unit -> unit) -> unit
  val merge_callbacks : from:'a callbacks -> into:'a callbacks -> unit
end =
struct
  (* Concatenates two regular callback lists, returning the other list
     unchanged when one of them is empty. *)
  let concat_regular_callbacks l1 l2 =
    (match l1, l2 with
    | Regular_callback_list_empty, _ -> l2
    | _, Regular_callback_list_empty -> l1
    | _, _ -> Regular_callback_list_concat (l1, l2))
    [@ocaml.warning "-4"]

  (* Same as [concat_regular_callbacks], for cancel callback lists. *)
  let concat_cancel_callbacks l1 l2 =
    (match l1, l2 with
    | Cancel_callback_list_empty, _ -> l2
    | _, Cancel_callback_list_empty -> l1
    | _, _ -> Cancel_callback_list_concat (l1, l2))
    [@ocaml.warning "-4"]

  (* In a callback list, filters out cells of explicitly removable callbacks
that have been removed. *)
  let rec clean_up_callback_cells callbacks =
    match callbacks with
    | Regular_callback_list_concat (l1, l2) ->
      (* Clean both halves, then re-join them; the join drops a half that has
         become empty. *)
      let cleaned_l1 = clean_up_callback_cells l1 in
      let cleaned_l2 = clean_up_callback_cells l2 in
      concat_regular_callbacks cleaned_l1 cleaned_l2

    | Regular_callback_list_explicitly_removable_callback {contents = None} ->
      (* A cleared cell: this is what gets filtered out. *)
      Regular_callback_list_empty

    | Regular_callback_list_explicitly_removable_callback {contents = Some _}
    | Regular_callback_list_implicitly_removed_callback _
    | Regular_callback_list_empty ->
      callbacks

  (* See [clear_explicitly_removable_callback_cell] and [merge_callbacks]. *)
  let cleanup_throttle = 42

  (* Explicitly removable callbacks are added (mainly) by [Lwt.choose] and its
similar functions. In [Lwt.choose [p; p']], if [p'] resolves first, the
callback added by [Lwt.choose] to [p] is removed.
The removal itself is accomplished when this function clears the reference
cell [cell], which contains the reference to that callback.
If [p] is a long-pending promise that repeatedly participates in
[Lwt.choose], perhaps in a loop, it will accumulate a large number of
cleared reference cells in this fashion. To avoid a memory leak, they must
be cleaned up. However, the cells are not cleaned up on *every* removal,
presumably because scanning the callback list that often, and rebuilding
it, can get expensive.
Cleanup is throttled by maintaining a counter, [cleanups_deferred], on each
pending promise. The counter is incremented each time this function wants
to clean the callback list (right after clearing a cell). When the counter
reaches [cleanup_throttle], the callback list is actually scanned and
cleared callback cells are removed. *)
  let clear_explicitly_removable_callback_cell cell ~originally_added_to:ps =
    (* Records one more deferred cleanup on [callbacks]. Once the count exceeds
       [cleanup_throttle], the count is reset and the regular callback list is
       actually scanned for cleared cells. *)
    let defer_or_run_cleanup callbacks =
      let deferred = callbacks.cleanups_deferred + 1 in
      if deferred <= cleanup_throttle then
        callbacks.cleanups_deferred <- deferred
      else begin
        callbacks.cleanups_deferred <- 0;
        callbacks.regular_callbacks <-
          clean_up_callback_cells callbacks.regular_callbacks
      end
    in

    (* Handles one of the promises the cell had originally been added to:
       either defers a cleanup, or actually cleans up its callback list. *)
    let visit p =
      let Internal p = to_internal_promise p in
      match (underlying p).state with
      (* Some of the promises may already have been resolved at the time this
         function is called. *)
      | Fulfilled _ -> ()
      | Rejected _ -> ()

      | Pending callbacks ->
        match callbacks.regular_callbacks with
        (* If the promise has only one regular callback, and it is removable,
           it must have been the cell cleared in this function. In that case,
           just set its callback list to empty. *)
        | Regular_callback_list_explicitly_removable_callback _ ->
          callbacks.regular_callbacks <- Regular_callback_list_empty

        (* Maintainer's note: I think this function shouldn't try to trigger a
           cleanup in the first two cases, but I am preserving them for now,
           as this is how the code was written in the past. *)
        | Regular_callback_list_empty
        | Regular_callback_list_implicitly_removed_callback _
        | Regular_callback_list_concat _ ->
          defer_or_run_cleanup callbacks
    in

    cell := None;
    List.iter visit ps

  (* Concatenates both kinds of callbacks on [~from] to the corresponding lists
of [~into]. The callback lists on [~from] are *not* then cleared, because
this function is called only by [Sequential_composition.make_into_proxy],
which immediately changes the state of [~from] and loses references to the
original callback lists.
The [cleanups_deferred] fields of both promises are summed, and if the sum
exceeds [cleanup_throttle], a cleanup of regular callbacks is triggered.
This is to prevent memory leaks; see
[clear_explicitly_removable_callback_cell]. *)
  let merge_callbacks ~from ~into =
    let merged_regular =
      concat_regular_callbacks into.regular_callbacks from.regular_callbacks in
    let merged_cancel =
      concat_cancel_callbacks into.cancel_callbacks from.cancel_callbacks in
    let total_deferred = into.cleanups_deferred + from.cleanups_deferred in

    (* Past the throttle, store the cleaned list and restart the count;
       otherwise store the plain concatenation and the summed count. *)
    if total_deferred > cleanup_throttle then begin
      into.regular_callbacks <- clean_up_callback_cells merged_regular;
      into.cleanups_deferred <- 0
    end
    else begin
      into.regular_callbacks <- merged_regular;
      into.cleanups_deferred <- total_deferred
    end;
    into.cancel_callbacks <- merged_cancel

  (* General, internal, function for adding a regular callback. The new node
     is placed in front of any callbacks that are already present. *)
  let add_regular_callback_list_node callbacks node =
    let updated =
      match callbacks.regular_callbacks with
      | Regular_callback_list_empty ->
        node
      | Regular_callback_list_implicitly_removed_callback _
      | Regular_callback_list_explicitly_removable_callback _
      | Regular_callback_list_concat _ as existing ->
        Regular_callback_list_concat (node, existing)
    in
    callbacks.regular_callbacks <- updated

  (* Adds [f] to [callbacks] as a callback with no removal cell of its own. *)
  let add_implicitly_removed_callback callbacks f =
    let node = Regular_callback_list_implicitly_removed_callback f in
    add_regular_callback_list_node callbacks node

  (* Adds [callback] as removable to each promise in [ps]. The first promise in
[ps] to trigger [callback] removes [callback] from the other promises; this
guarantees that [callback] is called at most once. All the promises in [ps]
must be pending.
This is an internal function, indirectly used by the implementations of
[Lwt.choose] and related functions. *)
  let add_explicitly_removable_callback_and_give_cell ps f =
    (* The cell and the wrapper refer to each other: the wrapper clears the
       cell that holds it before running the user callback [f]. *)
    let rec cell = ref (Some self_removing_callback_wrapper)
    and self_removing_callback_wrapper result =
      clear_explicitly_removable_callback_cell cell ~originally_added_to:ps;
      f result
    in

    (* The same node, and so the same cell, is shared by every promise. *)
    let node = Regular_callback_list_explicitly_removable_callback cell in
    let attach p =
      let Internal p = to_internal_promise p in
      match (underlying p).state with
      | Fulfilled _ -> assert false
      | Rejected _ -> assert false
      | Pending callbacks -> add_regular_callback_list_node callbacks node
    in
    List.iter attach ps;

    cell

  (* Same as above, for callers that have no use for the cell. *)
  let add_explicitly_removable_callback_to_each_of ps f =
    let _cell = add_explicitly_removable_callback_and_give_cell ps f in
    ()

  (* This is basically just to support [Lwt.protected], which needs to remove
the callback in circumstances other than the callback being called. *)
  let add_explicitly_removable_callback_and_give_remove_function ps f =
    let cell = add_explicitly_removable_callback_and_give_cell ps f in
    let remove () =
      clear_explicitly_removable_callback_cell cell ~originally_added_to:ps in
    remove

  (* Adds [f] to the cancel callbacks of [callbacks], in front of any existing
     ones. The current sequence-associated storage is stored together with
     [f]. *)
  let add_cancel_callback callbacks f =
    (* Ugly cast :( *)
    let cast_cancel_callback : (unit -> unit) -> cancel_callback = Obj.magic in
    let f = cast_cancel_callback f in

    let node = Cancel_callback_list_callback (!current_storage, f) in

    let updated =
      match callbacks.cancel_callbacks with
      | Cancel_callback_list_empty ->
        node
      | Cancel_callback_list_callback _
      | Cancel_callback_list_remove_sequence_node _
      | Cancel_callback_list_concat _ as existing ->
        Cancel_callback_list_concat (node, existing)
    in
    callbacks.cancel_callbacks <- updated
end
open Pending_callbacks

module Resolution_loop :
sig
  (* All user-provided callbacks are called by Lwt only through this module. It
tracks the current callback stack depth, and decides whether each callback
call should be deferred or not. *)

  (* Internal interface used only in this module Lwt *)
  (* Both optional arguments of [resolve] are passed through to
     [run_callbacks_or_defer_them]. *)
  val resolve :
    ?allow_deferring:bool ->
    ?maximum_callback_nesting_depth:int ->
    ('a, underlying, pending) promise ->
    'a resolved_state ->
      ('a, underlying, resolved) state_changed

  val run_callbacks_or_defer_them :
    ?allow_deferring:bool ->
    ?maximum_callback_nesting_depth:int ->
    ('a callbacks) ->
    'a resolved_state ->
      unit

  (* [~if_deferred] is only called when the callback is deferred; it supplies
     the value to return immediately, together with the callback and resolved
     state to place in the deferred queue. *)
  val run_callback_or_defer_it :
    ?run_immediately_and_ensure_tail_call:bool ->
    callback:(unit -> 'a) ->
    if_deferred:(unit -> 'a * 'b regular_callback * 'b resolved_state) ->
      'a

  val handle_with_async_exception_hook : ('a -> unit) -> 'a -> unit

  (* Internal interface exposed to other modules in Lwt *)
  val abandon_wakeups : unit -> unit

  (* Public interface *)
  exception Canceled

  val async_exception_hook : (exn -> unit) ref
end =
struct
  (* When Lwt needs to call a callback, it enters the resolution loop. This
typically happens when Lwt sets the state of one promise to [Fulfilled _]
or [Rejected _]. The callbacks that were attached to the promise when it
was pending must then be called.
This also happens in a few other situations. For example, when [Lwt.bind]
is called on a promise, but that promise is already resolved, the callback
passed to [bind] must be called.
The callbacks triggered during the resolution loop might resolve more
promises, triggering more callbacks, and so on. This is what makes the
resolution loop a {e loop}.
Lwt generally tries to call each callback immediately. However, this can
lead to a progressive deepening of the call stack, until there is a stack
overflow. This can't be avoided by doing tail calls, because Lwt always
needs to do exception handling around callback calls: each callback call
is followed by an exception handler. Instead, what Lwt does is track the
current callback call depth. Once that depth reaches a certain number,
[default_maximum_callback_nesting_depth], defined below, further callbacks
are deferred into a queue instead. That queue is drained when Lwt exits
from the top-most callback call that triggered the resolution loop in the
first place.
To ensure that this deferral mechanism is always properly invoked, all
callbacks called by Lwt are called through one of three functions provided
by this module:
- [resolve], which calls all the callbacks associated to a pending promise
(and resolves it, changing its state).
- [run_callbacks_or_defer_them], which is internally used by [resolve] to
call callbacks that are in a record of type ['a callbacks], which records
are associated with pending promises. This function is exposed because
the current implementation of [Lwt.cancel] needs to call it directly.
Promise resolution and callback calling are separated in a unique way in
[cancel].
- [run_callback_or_defer_it], which is used by [Lwt.bind] and similar
functions to call single callbacks when the promises passed to
[Lwt.bind], etc., are already resolved.
Current Lwt actually has a messy mix of callback-calling behaviors. For
example, [Lwt.bind] is expected to always call its callback immediately,
while [Lwt.wakeup_later] is expected to defer all callbacks of the promise
resolved, {e unless} Lwt is not already inside the resolution loop.
We planned to make these behaviors uniform in Lwt 4.0.0, but decided
against it due to the risk of breaking users. See
- https://github.com/ocsigen/lwt/pull/500
- https://github.com/ocsigen/lwt/pull/519
As part of the preparation for the change, the above callback-invoking
functions support several optional arguments to emulate the various
behaviors. We decided not to remove this machinery, because we might want
to expose different APIs to Lwt in the future.
- [~allow_deferring:false] allows ignoring the callback stack depth, and
calling the callbacks immediately. This emulates the old resolution
behavior.
- [~maximum_callback_nesting_depth:1] allows limiting the depth which
triggers deferral on a per-call-site basis. This is used by
[Lwt.wakeup_later].
- [~run_immediately_and_ensure_tail_call:true] is like
[~allow_deferring:false], which ignores the callback stack depth.
However, to ensure that the callback is tail-called, Lwt doesn't even
update the callback stack depth for the benefit of *other* callback
calls. It just blindly calls the callback.
See discussion of callback-calling semantics in:
https://github.com/ocsigen/lwt/issues/329
* Context
The resolution loop effectively handles all promises that can be resolved
immediately, without blocking on I/O. A complete program that does I/O
calls [Lwt_main.run]. See "No I/O" in the Overview. *)

  (* The initial hook prints the exception and the backtrace to standard error,
     then terminates the process with exit code 2. *)
  let async_exception_hook =
    let print_exception_and_exit exn =
      prerr_string "Fatal error: exception ";
      prerr_string (Printexc.to_string exn);
      prerr_char '\n';
      Printexc.print_backtrace stderr;
      flush stderr;
      exit 2
    in
    ref print_exception_and_exit

  (* Calls [f v], and hands any exception raised by the call itself to the
     current [!async_exception_hook]. *)
  let handle_with_async_exception_hook f v =
    (* Note that this function does not care if [f] evaluates to a promise. In
       particular, if [f v] evaluates to [p] and [p] is already rejected or
       will be rejected later, it is not the responsibility of this function
       to pass the exception to [!async_exception_hook]. *)
    try
      f v
    with exn ->
      !async_exception_hook exn

  exception Canceled

  (* Runs the callbacks (formerly) associated to a promise. Cancel callbacks are
run first, if the promise was canceled. These are followed by regular
callbacks.
The reason for the "formerly" is that the promise's state has already been
set to [Fulfilled _] or [Rejected _], so the callbacks are no longer
reachable through the promise reference. This is why the direct [callbacks]
record must be given to this function. *)
  let run_callbacks
      (callbacks : 'a callbacks)
      (result : 'a resolved_state) : unit =

    (* Both traversals below walk the tree-shaped callback list with an
       explicit work list [rest] of subtrees still to be visited. *)
    let run_cancel_callbacks fs =
      let rec iter_callback_list fs rest =
        match fs with
        | Cancel_callback_list_empty ->
          iter_list rest
        | Cancel_callback_list_callback (storage, f) ->
          (* Restore the storage that was saved with the callback, then run
             it under the async exception hook. *)
          current_storage := storage;
          handle_with_async_exception_hook f ();
          iter_list rest
        | Cancel_callback_list_remove_sequence_node node ->
          Lwt_sequence.remove node;
          iter_list rest
        | Cancel_callback_list_concat (fs, fs') ->
          iter_callback_list fs (fs'::rest)

      and iter_list rest =
        match rest with
        | [] -> ()
        | fs::rest -> iter_callback_list fs rest

      in

      iter_callback_list fs []
    in

    let run_regular_callbacks fs =
      let rec iter_callback_list fs rest =
        match fs with
        | Regular_callback_list_empty ->
          iter_list rest
        | Regular_callback_list_implicitly_removed_callback f ->
          f result;
          iter_list rest
        (* A cleared cell holds no callback, so it is skipped. *)
        | Regular_callback_list_explicitly_removable_callback
            {contents = None} ->
          iter_list rest
        | Regular_callback_list_explicitly_removable_callback
            {contents = Some f} ->
          f result;
          iter_list rest
        | Regular_callback_list_concat (fs, fs') ->
          iter_callback_list fs (fs'::rest)

      and iter_list rest =
        match rest with
        | [] -> ()
        | fs::rest -> iter_callback_list fs rest

      in

      iter_callback_list fs []
    in

    (* Pattern matching is much faster than polymorphic comparison. *)
    let is_canceled =
      match result with
      | Rejected Canceled -> true
      | Rejected _ -> false
      | Fulfilled _ -> false
    in
    if is_canceled then
      run_cancel_callbacks callbacks.cancel_callbacks;
    run_regular_callbacks callbacks.regular_callbacks

  (* Nesting depth at which callbacks start being deferred, when a call site
     does not give its own limit. *)
  let default_maximum_callback_nesting_depth = 42

  (* Number of entries into the resolution loop that are currently active. *)
  let current_callback_nesting_depth = ref 0

  (* A deferred call: a callbacks record paired with the resolved state to
     pass to it. The value type is hidden so that one queue can hold all of
     them. *)
  type deferred_callbacks =
    Deferred : ('a callbacks * 'a resolved_state) -> deferred_callbacks
    [@@ocaml.unboxed]

  let deferred_callbacks : deferred_callbacks Queue.t = Queue.create ()

  (* Before entering a resolution loop, it is necessary to take a snapshot of
the current state of sequence-associated storage. This is because many of
the callbacks that will be run will modify the storage. The storage is
restored to the snapshot when the resolution loop is exited. *)
  let enter_resolution_loop () =
    incr current_callback_nesting_depth;
    !current_storage

  (* Leaves one level of the resolution loop. When this is the outermost
     level, the queue of deferred callbacks is drained first. The nesting depth
     is then decremented, and the storage snapshot is restored. *)
  let leave_resolution_loop (storage_snapshot : storage) : unit =
    let rec run_deferred_callbacks () =
      if not (Queue.is_empty deferred_callbacks) then begin
        let Deferred (callbacks, result) = Queue.pop deferred_callbacks in
        run_callbacks callbacks result;
        run_deferred_callbacks ()
      end
    in

    if !current_callback_nesting_depth = 1 then
      run_deferred_callbacks ();

    decr current_callback_nesting_depth;
    current_storage := storage_snapshot

  (* Runs [f ()] between a matching enter and leave of the resolution loop. *)
  let run_in_resolution_loop f =
    let snapshot = enter_resolution_loop () in
    let value = f () in
    leave_resolution_loop snapshot;
    value

  (* This is basically a hack to fix https://github.com/ocsigen/lwt/issues/48.
If currently resolving promises, it immediately exits all recursive
entries of the resolution loop, goes to the top level, runs any deferred
callbacks, and exits the top-level resolution loop.
The name should probably be [abandon_resolution_loop]. *)
  let abandon_wakeups () =
    if !current_callback_nesting_depth = 0 then
      ()
    else
      leave_resolution_loop Storage_map.empty

  (* Either queues [callbacks] to be run later with [result], or runs them now
     inside the resolution loop. They are queued only if deferring is allowed
     and the current nesting depth has reached the given maximum. *)
  let run_callbacks_or_defer_them
      ?(allow_deferring = true)
      ?(maximum_callback_nesting_depth = default_maximum_callback_nesting_depth)
      callbacks result =

    let depth_limit_reached =
      !current_callback_nesting_depth >= maximum_callback_nesting_depth in

    if allow_deferring && depth_limit_reached then
      Queue.push (Deferred (callbacks, result)) deferred_callbacks
    else
      run_in_resolution_loop (fun () ->
        run_callbacks callbacks result)

  (* Sets the state of the pending promise [p] to [result], then runs or defers
     the callbacks that were attached to it. Evaluates to the wrapped, updated
     reference to [p]. *)
  let resolve ?allow_deferring ?maximum_callback_nesting_depth p result =
    (* The callbacks must be read before the state is overwritten. *)
    let Pending callbacks = p.state in
    let p = set_promise_state p result in

    run_callbacks_or_defer_them
      ?allow_deferring ?maximum_callback_nesting_depth callbacks result;

    p

  (* Runs the single callback [f], or defers it. When deferring, [if_deferred]
     supplies the value to return now, together with a callback and a resolved
     state, which are queued as a one-callback record. *)
  let run_callback_or_defer_it
      ?(run_immediately_and_ensure_tail_call = false)
      ~callback:f
      ~if_deferred =

    if run_immediately_and_ensure_tail_call then
      f ()

    else if
      !current_callback_nesting_depth < default_maximum_callback_nesting_depth
    then
      run_in_resolution_loop f

    else begin
      let immediate_result, deferred_callback, deferred_result =
        if_deferred () in
      let deferred_record =
        {
          regular_callbacks =
            Regular_callback_list_implicitly_removed_callback
              deferred_callback;
          cancel_callbacks = Cancel_callback_list_empty;
          how_to_cancel = Not_cancelable;
          cleanups_deferred = 0
        }
      in
      Queue.push
        (Deferred (deferred_record, deferred_result)) deferred_callbacks;
      immediate_result
    end
end
include Resolution_loop

module Resolving :
sig
  val wakeup_later_result : 'a u -> 'a lwt_result -> unit
  val wakeup_later : 'a u -> 'a -> unit
  val wakeup_later_exn : _ u -> exn -> unit

  val wakeup_result : 'a u -> 'a lwt_result -> unit
  val wakeup : 'a u -> 'a -> unit
  val wakeup_exn : _ u -> exn -> unit

  val cancel : 'a t -> unit
end =
struct
  (* Note that this function deviates from the "ideal" callback deferral
behavior: it runs callbacks directly on the current stack. It should
therefore be possible to cause a stack overflow using this function. *)
  let wakeup_general api_function_name r result =
    let Internal p = to_internal_resolver r in
    let p = underlying p in

    match p.state with
    (* A canceled promise silently ignores the wakeup. *)
    | Rejected Canceled ->
      ()

    (* Any other resolved promise is a caller error. *)
    | Fulfilled _ ->
      invalid_arg (Printf.sprintf "Lwt.%s" api_function_name)
    | Rejected _ ->
      invalid_arg (Printf.sprintf "Lwt.%s" api_function_name)

    | Pending _ ->
      let new_state = state_of_result result in
      let State_may_have_changed p =
        resolve ~allow_deferring:false p new_state in
      ignore p

  let wakeup_result r result =
    wakeup_general "wakeup_result" r result

  let wakeup r v =
    wakeup_general "wakeup" r (Result.Ok v)

  let wakeup_exn r exn =
    wakeup_general "wakeup_exn" r (Result.Error exn)

  (* Same as [wakeup_general], except that the promise is resolved with
     [~maximum_callback_nesting_depth:1] instead of [~allow_deferring:false]. *)
  let wakeup_later_general api_function_name r result =
    let Internal p = to_internal_resolver r in
    let p = underlying p in

    match p.state with
    | Rejected Canceled ->
      ()

    | Fulfilled _ ->
      invalid_arg (Printf.sprintf "Lwt.%s" api_function_name)
    | Rejected _ ->
      invalid_arg (Printf.sprintf "Lwt.%s" api_function_name)

    | Pending _ ->
      let new_state = state_of_result result in
      let State_may_have_changed p =
        resolve ~maximum_callback_nesting_depth:1 p new_state in
      ignore p

  let wakeup_later_result r result =
    wakeup_later_general "wakeup_later_result" r result

  let wakeup_later r v =
    wakeup_later_general "wakeup_later" r (Result.Ok v)

  let wakeup_later_exn r exn =
    wakeup_later_general "wakeup_later_exn" r (Result.Error exn)

  (* A callbacks record with its value type hidden, so that records of
     promises of different types can share one list. *)
  type packed_callbacks =
    | Packed : _ callbacks -> packed_callbacks
    [@@ocaml.unboxed]

  (* Note that this function deviates from the "ideal" callback deferral
behavior: it runs callbacks directly on the current stack. It should
therefore be possible to cause a stack overflow using this function. *)
  let cancel p =
    let canceled_result = Rejected Canceled in

    (* Walks the promise dependency graph backwards, looking for cancelable
       initial promises, and cancels (only) them.

       Found initial promises are canceled immediately, as they are found, by
       setting their state to [Rejected Canceled]. This is to prevent them from
       being "found twice" if they are reachable by two or more distinct paths
       through the promise dependency graph.

       The callbacks of these initial promises are then run, in a separate
       phase. These callbacks propagate cancellation forwards to any dependent
       promises. See "Cancellation" in the Overview. *)
    let propagate_cancel : (_, _, _) promise -> packed_callbacks list =
        fun p ->
      let rec cancel_and_collect_callbacks :
          'a 'u 'c. packed_callbacks list -> ('a, 'u, 'c) promise ->
            packed_callbacks list = fun (type c)
                callbacks_accumulator (p : (_, _, c) promise) ->

        let p = underlying p in
        match p.state with
        (* If the promise is not still pending, it can't be canceled. *)
        | Fulfilled _ ->
          callbacks_accumulator
        | Rejected _ ->
          callbacks_accumulator

        | Pending callbacks ->
          match callbacks.how_to_cancel with
          | Not_cancelable ->
            callbacks_accumulator
          | Cancel_this_promise ->
            let State_may_have_changed p =
              set_promise_state p canceled_result in
            ignore p;
            (Packed callbacks)::callbacks_accumulator
          | Propagate_cancel_to_one p' ->
            cancel_and_collect_callbacks callbacks_accumulator p'
          | Propagate_cancel_to_several ps ->
            List.fold_left
              cancel_and_collect_callbacks callbacks_accumulator ps
      in
      cancel_and_collect_callbacks [] p
    in

    let Internal p = to_internal_promise p in
    let collected = propagate_cancel p in

    (* Second phase: run the callbacks of every promise canceled above. *)
    let run_collected (Packed callbacks) =
      run_callbacks_or_defer_them
        ~allow_deferring:false callbacks canceled_result
    in
    List.iter run_collected collected
end
include Resolving

module Trivial_promises :
sig
  val return : 'a -> 'a t
  val fail : exn -> _ t
  val of_result : 'a lwt_result -> 'a t

  val return_unit : unit t
  val return_true : bool t
  val return_false : bool t
  val return_none : _ option t
  val return_some : 'a -> 'a option t
  val return_ok : 'a -> ('a, _) Result.result t
  val return_error : 'e -> (_, 'e) Result.result t
  val return_nil : _ list t

  val fail_with : string -> _ t
  val fail_invalid_arg : string -> _ t
end =
struct
  (* Each function below builds a promise record that is already in a resolved
     state, and exposes it at the public type. *)
  let return v =
    to_public_promise {state = Fulfilled v}

  let of_result result =
    to_public_promise {state = state_of_result result}

  let fail exn =
    to_public_promise {state = Rejected exn}

  let return_unit = return ()
  let return_none = return None
  let return_some x = return (Some x)
  let return_nil = return []
  let return_true = return true
  let return_false = return false
  let return_ok x = return (Result.Ok x)
  let return_error x = return (Result.Error x)

  let fail_with msg =
    fail (Failure msg)

  let fail_invalid_arg msg =
    fail (Invalid_argument msg)
end
include Trivial_promises

module Pending_promises :
sig
  (* 
Internal *)
  val new_pending :
    how_to_cancel:how_to_cancel -> ('a, underlying, pending) promise
  val propagate_cancel_to_several : _ t list -> how_to_cancel

  (* Initial pending promises (public) *)
  val wait : unit -> 'a t * 'a u
  val task : unit -> 'a t * 'a u
  val waiter_of_wakener : 'a u -> 'a t
  val add_task_r : 'a u Lwt_sequence.t -> 'a t
  val add_task_l : 'a u Lwt_sequence.t -> 'a t
  val protected : 'a t -> 'a t
  val no_cancel : 'a t -> 'a t
end =
struct
  (* Creates a pending promise with empty callback lists, the given
     [how_to_cancel], and no deferred cleanups. *)
  let new_pending ~how_to_cancel =
    let state =
      Pending {
        regular_callbacks = Regular_callback_list_empty;
        cancel_callbacks = Cancel_callback_list_empty;
        how_to_cancel;
        cleanups_deferred = 0;
      }
    in
    {state}

  let propagate_cancel_to_several ps =
    (* Using a dirty cast here to avoid rebuilding the list :( Not bothering
       with the invariants, because [Propagate_cancel_to_several] packs them,
       and code that matches on [Propagate_cancel_to_several] doesn't care about
       them anyway. *)
    let cast_promise_list : 'a t list -> ('a, _, _) promise list = Obj.magic in
    Propagate_cancel_to_several (cast_promise_list ps)

  (* [wait] and [task] differ only in [how_to_cancel]: [Not_cancelable] for
     [wait], [Cancel_this_promise] for [task]. Both return the same promise
     viewed as a public promise and as a public resolver. *)
  let wait () =
    let p = new_pending ~how_to_cancel:Not_cancelable in
    to_public_promise p, to_public_resolver p

  let task () =
    let p = new_pending ~how_to_cancel:Cancel_this_promise in
    to_public_promise p, to_public_resolver p

  (* Re-exposes the promise behind resolver [r] at the public promise type. *)
  let waiter_of_wakener r =
    let Internal r = to_internal_resolver r in
    let p = r in
    to_public_promise p

  (* Cast of a sequence node from holding a public resolver to holding an
     internal promise. The second argument is not used at run time; it only
     fixes the type parameters of the result. *)
  let cast_sequence_node
      (node : 'a u Lwt_sequence.node)
      (_actual_content:('a, 'u, 'c) promise)
        : ('a, 'u, 'c) promise Lwt_sequence.node =
    Obj.magic node

  (* Creates a promise with [Cancel_this_promise], adds its resolver to
     [sequence] with [Lwt_sequence.add_r], and sets the promise's cancel
     callbacks to remove the resulting node. *)
  let add_task_r sequence =
    let p = new_pending ~how_to_cancel:Cancel_this_promise in
    let node = Lwt_sequence.add_r (to_public_resolver p) sequence in
    let node = cast_sequence_node node p in

    let Pending callbacks = p.state in
    callbacks.cancel_callbacks <-
      Cancel_callback_list_remove_sequence_node node;

    to_public_promise p

  (* Same as [add_task_r], but uses [Lwt_sequence.add_l]. *)
  let add_task_l sequence =
    let p = new_pending ~how_to_cancel:Cancel_this_promise in
    let node = Lwt_sequence.add_l (to_public_resolver p) sequence in
    let node = cast_sequence_node node p in

    let Pending callbacks = p.state in
    callbacks.cancel_callbacks <-
      Cancel_callback_list_remove_sequence_node node;

    to_public_promise p

  (* If [p] is already resolved, evaluates to [p] itself. Otherwise, evaluates
     to a fresh promise [p'] with [Cancel_this_promise], which a removable
     callback on [p] resolves with the result of [p]. A cancel callback on [p']
     removes that callback from [p]. *)
  let protected p =
    let Internal p_internal = to_internal_promise p in
    match (underlying p_internal).state with
    | Fulfilled _ -> p
    | Rejected _ -> p

    | Pending _ ->
      let p' = new_pending ~how_to_cancel:Cancel_this_promise in

      let callback p_result =
        let State_may_now_be_pending_proxy p' = may_now_be_proxy p' in
        let p' = underlying p' in
        (* In this callback, [p'] will either still itself be pending, or it
           will have become a proxy for a pending promise. The reasoning for
           this is almost the same as in the comment at [may_now_be_proxy]. The
           differences are:

           - [p'] *is* an initial promise, so it *can* get canceled. However, if
             it does, the [on_cancel] handler installed below will remove this
             callback.
           - [p'] never gets passed to [make_into_proxy], the only effect of
             which is that it cannot be the underlying promise of another
             (proxy) promise. So, [p'] can only appear at the head of a chain of
             [Proxy _] links, and it's not necessary to worry about whether the
             inductive reasoning at [may_now_be_proxy] applies. *)

        let State_may_have_changed p' =
          resolve ~allow_deferring:false p' p_result in
        ignore p'
      in

      let remove_the_callback =
        add_explicitly_removable_callback_and_give_remove_function
          [p] callback
      in

      let Pending p'_callbacks = p'.state in
      add_cancel_callback p'_callbacks remove_the_callback;

      to_public_promise p'

  (* If [p] is already resolved, evaluates to [p] itself. Otherwise, evaluates
     to a fresh promise [p'] with [Not_cancelable], which a callback on [p]
     resolves with the result of [p]. *)
  let no_cancel p =
    let Internal p_internal = to_internal_promise p in
    match (underlying p_internal).state with
    | Fulfilled _ -> p
    | Rejected _ -> p

    | Pending p_callbacks ->
      let p' = new_pending ~how_to_cancel:Not_cancelable in

      let callback p_result =
        let State_may_now_be_pending_proxy p' = may_now_be_proxy p' in
        let p' = underlying p' in
        (* In this callback, [p'] will either still itself be pending, or it
           will have become a proxy for a pending promise. The reasoning for
           this is as in [protected] and [may_now_be_proxy], but even simpler,
           because [p'] is not cancelable. *)

        let State_may_have_changed p' =
          resolve ~allow_deferring:false p' p_result in
        ignore p'
      in
      add_implicitly_removed_callback p_callbacks callback;

      to_public_promise p'
end
include Pending_promises

module Sequential_composition :
sig
  (* Main interface (public) *)
  val bind : 'a t -> ('a -> 'b t) -> 'b t
  val map : ('a -> 'b) -> 'a t -> 'b t
  val catch : (unit -> 'a t) -> (exn -> 'a t) -> 'a t
  val finalize : (unit -> 'a t) -> (unit -> unit t) -> 'a t
  val try_bind : (unit -> 'a t) -> ('a -> 'b t) -> (exn -> 'b t) -> 'b t

  (* Cancel callbacks (public). *)
  val on_cancel : 'a t -> (unit -> unit) -> unit

  (* Non-promise callbacks (public) *)
  val on_success : 'a t -> ('a -> unit) -> unit
  val on_failure : _ t -> (exn -> unit) -> unit
  val on_termination : _ t -> (unit -> unit) -> unit
  val on_any : 'a t -> ('a -> unit) -> (exn -> unit) -> unit

  (* Backtrace support (internal; for use by the PPX) *)
  val backtrace_bind :
    (exn -> exn) -> 'a t -> ('a -> 'b t) -> 'b t
  val backtrace_catch :
    (exn -> exn) -> (unit -> 'a t) -> (exn -> 'a t) -> 'a t
  val backtrace_finalize :
    (exn -> exn) -> (unit -> 'a t) -> (unit -> unit t) -> 'a t
  val backtrace_try_bind :
    (exn -> exn) -> (unit -> 'a t) -> ('a -> 'b t) -> (exn -> 'b t) -> 'b t
end =
struct
  (* There are five primary sequential composition functions: [bind], [map],
[catch], [finalize], and [try_bind]. Of these, [try_bind] is the most
general -- all the others can be implemented in terms of it.
Lwt conflates concurrency with error propagation. If Lwt did not do this,
there would be only two primary functions: [bind] and [map], and, of these
two, [bind] is the most general. Since [bind] is the most relevant
specifically to concurrency, and is also the most familiar function in Lwt,
its implementation serves as a kind of "model" for the rest. It is the most
commented, and all the other functions follow a similar pattern to [bind].
Four of the primary functions have [backtrace_*] versions, which are not
truly public, and exist to support the PPX. [backtrace_map] does not exist
because the PPX does not need it.
The remaining four functions in this section attach "lower-level-ish"
non-promise-producing callbacks to promises: these are the [on_*]
functions. Of these, [on_any] is the most general. If Lwt did not conflate
concurrency with error handling, there would only be one: [on_success]. *)(* Makes [~user_provided_promise] into a proxy of [~outer_promise]. After
[make_into_proxy], these two promise references "behave identically."
Note that this is not symmetric: [user_provided_promise] always becomes the
proxy. [make_into_proxy] is called only by [bind] and similar functions in
this module. This means that:
- the only way for a promise to become a proxy is by being returned from
the callback given by the user to [bind], or a similar function, and
- the only way for a promise to become underlying for a promise other than
itself is to be the outer promise originally returned to the user from
[bind], or a similar function.
These two facts are important for reasoning about how and which promises
can become proxies, underlying, etc.; in particular, it is used in the
argument in [may_now_be_proxy] for correct predictions about state changes.
[~outer_promise] is always a pending promise when [make_into_proxy] is
called; for the explanation, see [may_now_be_proxy] (though the caller of
[make_into_proxy] always calls [underlying] first to pass the underlying
pending promise to [make_into_proxy]).
Proxying is probably used, instead of adding a callback to
[~user_provided_promise] that resolves [~outer_promise] when the former
becomes resolved, for these reasons:
- Promises have more behaviors than resolution. One would have to add a
cancellation handler to [~outer_promise] to propagate the cancellation
back to [~user_provided_promise], for example. It may be easier to just
think of them as the same promise.
- If using callbacks, resolving [~user_provided_promise] would not
immediately resolve [~outer_promise]. Another callback added to
[~user_provided_promise] might see [~user_provided_promise] resolved,
but [~outer_promise] still pending, depending on the order in which
callbacks are run. *)
  let make_into_proxy
      (type c)
      ~(outer_promise : ('a, underlying, pending) promise)
      ~(user_provided_promise : ('a, _, c) promise)
        : ('a, underlying, c) state_changed =

    (* Using [p'] as it's the name used inside [bind], etc., for promises with
       this role -- [p'] is the promise returned by the user's function. *)
    let p' = underlying user_provided_promise in

    if identical p' outer_promise then
      State_may_have_changed p'
      (* We really want to return [State_may_have_changed outer_promise], but
         the reference through [p'] has the right type. *)

    else
      match p'.state with
      (* [p'] is already resolved: there is nothing to proxy, so
         [outer_promise] is resolved with the same state. The two arms are
         written separately, each passing [p'.state] as refined by its own
         constructor. *)
      | Fulfilled _ ->
        resolve ~allow_deferring:false outer_promise p'.state
      | Rejected _ ->
        resolve ~allow_deferring:false outer_promise p'.state

      | Pending p'_callbacks ->
        let Pending outer_callbacks = outer_promise.state in

        (* The callbacks of [p'] are merged into those of [outer_promise],
           and [outer_promise] takes over the cancel behavior of [p']. Only
           after that is [p'] turned into a proxy. *)
        merge_callbacks ~from:p'_callbacks ~into:outer_callbacks;
        outer_callbacks.how_to_cancel <- p'_callbacks.how_to_cancel;

        let State_may_have_changed p' =
          set_promise_state p' (Proxy outer_promise) in
        ignore p';

        State_may_have_changed outer_promise
        (* The state hasn't actually changed, but we still have to wrap
[outer_promise] for type checking. *)(* The state of [p'] may instead have changed -- it may have become a
proxy. However, callers of [make_into_proxy] don't know if
[user_provided_promise] was a proxy or not (that's why we call
underlying on it at the top of this function, to get [p']). We can
therefore take a dangerous shortcut and not bother returning a new
reference to [user_provided_promise] for shadowing. *)(* Maintainer's note: a lot of the code below can probably be deduplicated in
some way, especially if assuming Flambda. *)

  (* [bind p f]: if [p] is rejected, evaluates to a promise rejected with the
     same state. If [p] is fulfilled with [v], [f v] is handed to
     [run_callback_or_defer_it]. If [p] is pending, a fresh promise [p''] is
     returned, and a callback is attached to [p] that later runs [f] and makes
     the promise returned by [f] into a proxy of [p''].
     NOTE(review): in the already-fulfilled case, [f v] is passed without a
     [try]; in the deferred callback, exceptions raised by [f] are converted
     with [fail]. Whether an exception from the immediate call reaches the
     caller of [bind] depends on [run_callback_or_defer_it] -- confirm. *)
  let bind p f =
    let Internal p = to_internal_promise p in
    let p = underlying p in

    (* In case [Lwt.bind] needs to defer the call to [f], this function will be
       called to create:

       1. The promise, [p''], that must be returned to the caller immediately.
       2. The callback that resolves [p''].

       [Lwt.bind] defers the call to [f] in two circumstances:

       1. The promise [p] is pending.
       2. The promise [p] is fulfilled, but the current callback call nesting
          depth is such that the call to [f] must go into the callback queue, in
          order to avoid stack overflow.

       Mechanism (2) is currently disabled. It may be used in an alternative Lwt
       API.

       Functions other than [Lwt.bind] have analogous deferral behavior. *)
    let create_result_promise_and_callback_if_deferred () =
      let p'' = new_pending ~how_to_cancel:(Propagate_cancel_to_one p) in
      (* The result promise is a fresh pending promise.

         Initially, trying to cancel this fresh pending promise [p''] will
         propagate the cancellation attempt to [p] (backwards through the
         promise dependency graph). If/when [p] is fulfilled, Lwt will call the
         user's callback [f] below, which will provide a new promise [p'], and
         [p'] will become a proxy of [p'']. At that point, trying to cancel
         [p''] will be equivalent to trying to cancel [p'], so the behavior will
         depend on how the user obtained [p']. *)

      (* The value of [current_storage] at the time the callback is created;
         it is written back to [current_storage] right before [f] runs. *)
      let saved_storage = !current_storage in

      let callback p_result =
        match p_result with
        | Fulfilled v ->
          current_storage := saved_storage;

          let p' = try f v with exn -> fail exn in
          let Internal p' = to_internal_promise p' in
          (* Run the user's function [f]. *)

          let State_may_now_be_pending_proxy p'' = may_now_be_proxy p'' in
          let p'' = underlying p'' in
          (* [p''] was an underlying promise when it was created above, but it
             may have become a proxy by the time this code is being executed.
             However, it is still either an underlying pending promise, or a
             proxy for a pending promise. Therefore, [may_now_be_proxy] produces
             a reference with the right type variables. We immediately get
             [p'']'s current underlying promise. *)

          let State_may_have_changed p'' =
            make_into_proxy ~outer_promise:p'' ~user_provided_promise:p' in
          ignore p''
          (* Make the outer promise [p''] behaviorally identical to the promise
             [p'] returned by [f] by making [p'] into a proxy of [p'']. *)

        | Rejected _ as p_result ->
          (* [f] is not called; the rejection of [p] is copied to [p'']. *)
          let State_may_now_be_pending_proxy p'' = may_now_be_proxy p'' in
          let p'' = underlying p'' in

          let State_may_have_changed p'' =
            resolve ~allow_deferring:false p'' p_result in
          ignore p''
      in

      (to_public_promise p'', callback)
    in

    match p.state with
    | Fulfilled v ->
      run_callback_or_defer_it
        ~run_immediately_and_ensure_tail_call:true
        ~callback:(fun () -> f v)
        ~if_deferred:(fun () ->
          let (p'', callback) =
            create_result_promise_and_callback_if_deferred () in
          (p'', callback, p.state))

    | Rejected _ as result ->
      to_public_promise {state = result}

    | Pending p_callbacks ->
      let (p'', callback) = create_result_promise_and_callback_if_deferred () in
      add_implicitly_removed_callback p_callbacks callback;
      p''

  (* Same structure as [bind]. Differences visible here: in the deferred
     callback, an exception raised by [f] is passed through [add_loc] before
     [fail], and a rejection of [p] is passed through [add_loc] before being
     given to the result promise (both when [p] is already rejected and in
     the deferred callback). *)
  let backtrace_bind add_loc p f =
    let Internal p = to_internal_promise p in
    let p = underlying p in

    let create_result_promise_and_callback_if_deferred () =
      let p'' = new_pending ~how_to_cancel:(Propagate_cancel_to_one p) in

      let saved_storage = !current_storage in

      let callback p_result =
        match p_result with
        | Fulfilled v ->
          current_storage := saved_storage;

          let p' = try f v with exn -> fail (add_loc exn) in
          let Internal p' = to_internal_promise p' in

          let State_may_now_be_pending_proxy p'' = may_now_be_proxy p'' in
          let p'' = underlying p'' in

          let State_may_have_changed p'' =
            make_into_proxy ~outer_promise:p'' ~user_provided_promise:p' in
          ignore p''

        | Rejected exn ->
          let State_may_now_be_pending_proxy p'' = may_now_be_proxy p'' in
          let p'' = underlying p'' in

          let State_may_have_changed p'' =
            resolve ~allow_deferring:false p'' (Rejected (add_loc exn)) in
          ignore p''
      in

      (to_public_promise p'', callback)
    in

    match p.state with
    | Fulfilled v ->
      run_callback_or_defer_it
        ~run_immediately_and_ensure_tail_call:true
        ~callback:(fun () -> f v)
        ~if_deferred:(fun () ->
          let (p'', callback) =
            create_result_promise_and_callback_if_deferred () in
          (p'', callback, p.state))

    | Rejected exn ->
      to_public_promise {state = Rejected (add_loc exn)}

    | Pending p_callbacks ->
      let (p'', callback) = create_result_promise_and_callback_if_deferred () in
      add_implicitly_removed_callback p_callbacks callback;
      p''

  (* [map f p]: like [bind], but [f] returns a plain value rather than a
     promise. In both the immediate and the deferred path, the result is
     [Fulfilled (f v)], or [Rejected exn] if [f v] raises [exn]. A rejection
     of [p] is copied to the result. *)
  let map f p =
    let Internal p = to_internal_promise p in
    let p = underlying p in

    let create_result_promise_and_callback_if_deferred () =
      let p'' = new_pending ~how_to_cancel:(Propagate_cancel_to_one p) in

      let saved_storage = !current_storage in

      let callback p_result =
        match p_result with
        | Fulfilled v ->
          current_storage := saved_storage;

          let p''_result = try Fulfilled (f v) with exn -> Rejected exn in

          let State_may_now_be_pending_proxy p'' = may_now_be_proxy p'' in
          let p'' = underlying p'' in

          let State_may_have_changed p'' =
            resolve ~allow_deferring:false p'' p''_result in
          ignore p''

        | Rejected _ as p_result ->
          let State_may_now_be_pending_proxy p'' = may_now_be_proxy p'' in
          let p'' = underlying p'' in

          let State_may_have_changed p'' =
            resolve ~allow_deferring:false p'' p_result in
          ignore p''
      in

      (to_public_promise p'', callback)
    in

    match p.state with
    | Fulfilled v ->
      run_callback_or_defer_it
        ~run_immediately_and_ensure_tail_call:true
        ~callback:(fun () ->
          to_public_promise
            {state = try Fulfilled (f v) with exn -> Rejected exn})
        ~if_deferred:(fun () ->
          let (p'', callback) =
            create_result_promise_and_callback_if_deferred () in
          (p'', callback, p.state))

    | Rejected _ as result ->
      to_public_promise {state = result}

    | Pending p_callbacks ->
      let (p'', callback) = create_result_promise_and_callback_if_deferred () in
      add_implicitly_removed_callback p_callbacks callback;
      p''

  (* [catch f h] runs [f ()] (an exception raised by [f] itself is converted
     with [fail]) to obtain [p]. If [p] is fulfilled, the result is [p]. If
     [p] is rejected with [exn], [h exn] is handed to
     [run_callback_or_defer_it]. If [p] is pending, a fresh promise is
     returned, and a callback on [p] either copies the fulfillment, or runs
     [h] and makes the promise it returns into a proxy of the fresh promise. *)
  let catch f h =
    let p = try f () with exn -> fail exn in
    let Internal p = to_internal_promise p in
    let p = underlying p in

    let create_result_promise_and_callback_if_deferred () =
      let p'' = new_pending ~how_to_cancel:(Propagate_cancel_to_one p) in

      let saved_storage = !current_storage in

      let callback p_result =
        match p_result with
        | Fulfilled _ as p_result ->
          let State_may_now_be_pending_proxy p'' = may_now_be_proxy p'' in
          let p'' = underlying p'' in

          let State_may_have_changed p'' =
            resolve ~allow_deferring:false p'' p_result in
          ignore p''

        | Rejected exn ->
          current_storage := saved_storage;

          let p' = try h exn with exn -> fail exn in
          let Internal p' = to_internal_promise p' in

          let State_may_now_be_pending_proxy p'' = may_now_be_proxy p'' in
          let p'' = underlying p'' in

          let State_may_have_changed p'' =
            make_into_proxy ~outer_promise:p'' ~user_provided_promise:p' in
          ignore p''
      in

      (to_public_promise p'', callback)
    in

    match p.state with
    | Fulfilled _ ->
      to_public_promise p

    | Rejected exn ->
      run_callback_or_defer_it
        ~run_immediately_and_ensure_tail_call:true
        ~callback:(fun () -> h exn)
        ~if_deferred:(fun () ->
          let (p'', callback) =
            create_result_promise_and_callback_if_deferred () in
          (p'', callback, p.state))

    | Pending p_callbacks ->
      let (p'', callback) = create_result_promise_and_callback_if_deferred () in
      add_implicitly_removed_callback p_callbacks callback;
      p''

  (* Same structure as [catch], with [add_loc] applied to exceptions.
     NOTE(review): the two paths apply [add_loc] at different points. When [p]
     is already rejected, [h] receives [add_loc exn]. In the deferred
     callback, [h] receives [exn] unchanged, and [add_loc] is applied to an
     exception raised by [h] instead. Confirm that this is intended. *)
  let backtrace_catch add_loc f h =
    let p = try f () with exn -> fail exn in
    let Internal p = to_internal_promise p in
    let p = underlying p in

    let create_result_promise_and_callback_if_deferred () =
      let p'' = new_pending ~how_to_cancel:(Propagate_cancel_to_one p) in

      let saved_storage = !current_storage in

      let callback p_result =
        match p_result with
        | Fulfilled _ as p_result ->
          let State_may_now_be_pending_proxy p'' = may_now_be_proxy p'' in
          let p'' = underlying p'' in

          let State_may_have_changed p'' =
            resolve ~allow_deferring:false p'' p_result in
          ignore p''

        | Rejected exn ->
          current_storage := saved_storage;

          let p' = try h exn with exn -> fail (add_loc exn) in
          let Internal p' = to_internal_promise p' in

          let State_may_now_be_pending_proxy p'' = may_now_be_proxy p'' in
          let p'' = underlying p'' in

          let State_may_have_changed p'' =
            make_into_proxy ~outer_promise:p'' ~user_provided_promise:p' in
          ignore p''
      in

      (to_public_promise p'', callback)
    in

    match p.state with
    | Fulfilled _ ->
      to_public_promise p

    | Rejected exn ->
      run_callback_or_defer_it
        ~run_immediately_and_ensure_tail_call:true
        ~callback:(fun () -> h (add_loc exn))
        ~if_deferred:(fun () ->
          let (p'', callback) =
            create_result_promise_and_callback_if_deferred () in
          (p'', callback, p.state))

    | Pending p_callbacks ->
      let (p'', callback) = create_result_promise_and_callback_if_deferred () in
      add_implicitly_removed_callback p_callbacks callback;
      p''

  (* [try_bind f f' h] runs [f ()] (an exception raised by [f] itself is
     converted with [fail]) to obtain [p], then continues with [f' v] if [p]
     is fulfilled with [v], or with [h exn] if [p] is rejected with [exn].
     When [p] is pending, a fresh promise is returned, and the promise
     produced by [f'] or [h] in the deferred callback is made into a proxy of
     it. *)
  let try_bind f f' h =
    let p = try f () with exn -> fail exn in
    let Internal p = to_internal_promise p in
    let p = underlying p in

    let create_result_promise_and_callback_if_deferred () =
      let p'' = new_pending ~how_to_cancel:(Propagate_cancel_to_one p) in

      let saved_storage = !current_storage in

      let callback p_result =
        match p_result with
        | Fulfilled v ->
          current_storage := saved_storage;

          let p' = try f' v with exn -> fail exn in
          let Internal p' = to_internal_promise p' in

          let State_may_now_be_pending_proxy p'' = may_now_be_proxy p'' in
          let p'' = underlying p'' in

          let State_may_have_changed p'' =
            make_into_proxy ~outer_promise:p'' ~user_provided_promise:p' in
          ignore p''

        | Rejected exn ->
          current_storage := saved_storage;

          let p' = try h exn with exn -> fail exn in
          let Internal p' = to_internal_promise p' in

          let State_may_now_be_pending_proxy p'' = may_now_be_proxy p'' in
          let p'' = underlying p'' in

          let State_may_have_changed p'' =
            make_into_proxy ~outer_promise:p'' ~user_provided_promise:p' in
          ignore p''
      in

      (to_public_promise p'', callback)
    in

    match p.state with
    | Fulfilled v ->
      run_callback_or_defer_it
        ~run_immediately_and_ensure_tail_call:true
        ~callback:(fun () -> f' v)
        ~if_deferred:(fun () ->
          let (p'', callback) =
            create_result_promise_and_callback_if_deferred () in
          (p'', callback, p.state))

    | Rejected exn ->
      run_callback_or_defer_it
        ~run_immediately_and_ensure_tail_call:true
        ~callback:(fun () -> h exn)
        ~if_deferred:(fun () ->
          let (p'', callback) =
            create_result_promise_and_callback_if_deferred () in
          (p'', callback, p.state))

    | Pending p_callbacks ->
      let (p'', callback) = create_result_promise_and_callback_if_deferred () in
      add_implicitly_removed_callback p_callbacks callback;
      p''

  (* Same structure as [try_bind], with [add_loc] applied to exceptions.
     NOTE(review): as in [backtrace_catch], the already-rejected path passes
     [add_loc exn] to [h], while the deferred callback passes [exn] unchanged
     and applies [add_loc] to exceptions raised by [f'] or [h]. Confirm that
     this is intended. *)
  let backtrace_try_bind add_loc f f' h =
    let p = try f () with exn -> fail exn in
    let Internal p = to_internal_promise p in
    let p = underlying p in

    let create_result_promise_and_callback_if_deferred () =
      let p'' = new_pending ~how_to_cancel:(Propagate_cancel_to_one p) in

      let saved_storage = !current_storage in

      let callback p_result =
        match p_result with
        | Fulfilled v ->
          current_storage := saved_storage;

          let p' = try f' v with exn -> fail (add_loc exn) in
          let Internal p' = to_internal_promise p' in

          let State_may_now_be_pending_proxy p'' = may_now_be_proxy p'' in
          let p'' = underlying p'' in

          let State_may_have_changed p'' =
            make_into_proxy ~outer_promise:p'' ~user_provided_promise:p' in
          ignore p''

        | Rejected exn ->
          current_storage := saved_storage;

          let p' = try h exn with exn -> fail (add_loc exn) in
          let Internal p' = to_internal_promise p' in

          let State_may_now_be_pending_proxy p'' = may_now_be_proxy p'' in
          let p'' = underlying p'' in

          let State_may_have_changed p'' =
            make_into_proxy ~outer_promise:p'' ~user_provided_promise:p' in
          ignore p''
      in

      (to_public_promise p'', callback)
    in

    match p.state with
    | Fulfilled v ->
      run_callback_or_defer_it
        ~run_immediately_and_ensure_tail_call:true
        ~callback:(fun () -> f' v)
        ~if_deferred:(fun () ->
          let (p'', callback) =
            create_result_promise_and_callback_if_deferred () in
          (p'', callback, p.state))

    | Rejected exn ->
      run_callback_or_defer_it
        ~run_immediately_and_ensure_tail_call:true
        ~callback:(fun () -> h (add_loc exn))
        ~if_deferred:(fun () ->
          let (p'', callback) =
            create_result_promise_and_callback_if_deferred () in
          (p'', callback, p.state))

    | Pending p_callbacks ->
      let (p'', callback) = create_result_promise_and_callback_if_deferred () in
      add_implicitly_removed_callback p_callbacks callback;
      p''

  (* [finalize f f'] is [try_bind f] with two continuations that both run
     [f' ()] first, and then reproduce the original outcome of [f]: the value
     [x] via [return], or the exception [e] via [fail]. *)
  let finalize f f' =
    try_bind f
      (fun x -> bind (f' ()) (fun () -> return x))
      (fun e -> bind (f' ()) (fun () -> fail e))

  (* As [finalize], built on [backtrace_try_bind]; the exception [e] is passed
     through [add_loc] before being given to [fail]. *)
  let backtrace_finalize add_loc f f' =
    backtrace_try_bind add_loc f
      (fun x -> bind (f' ()) (fun () -> return x))
      (fun e -> bind (f' ()) (fun () -> fail (add_loc e)))

  (* [on_cancel p f]: if [p] is already rejected with [Canceled], [f ()] is
     handed to [run_callback_or_defer_it], wrapped in
     [handle_with_async_exception_hook]. If [p] is resolved in any other way,
     nothing happens. If [p] is pending, [f] is added to its cancel
     callbacks. *)
  let on_cancel p f =
    let Internal p = to_internal_promise p in
    let p = underlying p in

    match p.state with
    | Rejected Canceled ->
      run_callback_or_defer_it
        ~run_immediately_and_ensure_tail_call:true
        ~callback:(fun () -> handle_with_async_exception_hook f ())
        ~if_deferred:(fun () ->
          ((), (fun _ -> handle_with_async_exception_hook f ()), Fulfilled ()))

    | Rejected _ -> ()
    | Fulfilled _ -> ()
    | Pending callbacks -> add_cancel_callback callbacks f

  (* [on_success p f] arranges for [f v] to be called, wrapped in
     [handle_with_async_exception_hook], if [p] is or becomes fulfilled with
     [v]. Nothing is done for a rejected [p]. *)
  let on_success p f =
    let Internal p = to_internal_promise p in
    let p = underlying p in

    (* Builds the callback used when the call to [f] does not happen right
       away. [current_storage] is captured when the callback is built, and
       written back just before [f] runs. *)
    let callback_if_deferred () =
      let saved_storage = !current_storage in

      fun result ->
        match result with
        | Fulfilled v ->
          current_storage := saved_storage;
          handle_with_async_exception_hook f v

        | Rejected _ ->
          ()
    in

    match p.state with
    | Fulfilled v ->
      run_callback_or_defer_it
        ~run_immediately_and_ensure_tail_call:true
        ~callback:(fun () -> handle_with_async_exception_hook f v)
        ~if_deferred:(fun () ->
          let callback = callback_if_deferred () in
          ((), callback, p.state))

    | Rejected _ ->
      ()

    | Pending p_callbacks ->
      let callback = callback_if_deferred () in
      add_implicitly_removed_callback p_callbacks callback

  (* [on_failure p f] arranges for [f exn] to be called, wrapped in
     [handle_with_async_exception_hook], if [p] is or becomes rejected with
     [exn]. Nothing is done for a fulfilled [p]. *)
  let on_failure p f =
    let Internal p = to_internal_promise p in
    let p = underlying p in

    let callback_if_deferred () =
      let saved_storage = !current_storage in

      fun result ->
        match result with
        | Fulfilled _ ->
          ()

        | Rejected exn ->
          current_storage := saved_storage;
          handle_with_async_exception_hook f exn
    in

    match p.state with
    | Fulfilled _ ->
      ()

    | Rejected exn ->
      run_callback_or_defer_it
        ~run_immediately_and_ensure_tail_call:true
        ~callback:(fun () -> handle_with_async_exception_hook f exn)
        ~if_deferred:(fun () ->
          let callback = callback_if_deferred () in
          ((), callback, p.state))

    | Pending p_callbacks ->
      let callback = callback_if_deferred () in
      add_implicitly_removed_callback p_callbacks callback

  (* [on_termination p f] arranges for [f ()] to be called, wrapped in
     [handle_with_async_exception_hook], once [p] is resolved, whether it is
     fulfilled or rejected. *)
  let on_termination p f =
    let Internal p = to_internal_promise p in
    let p = underlying p in

    let callback_if_deferred () =
      let saved_storage = !current_storage in

      fun _result ->
        current_storage := saved_storage;
        handle_with_async_exception_hook f ()
    in

    match p.state with
    | Fulfilled _ ->
      run_callback_or_defer_it
        ~run_immediately_and_ensure_tail_call:true
        ~callback:(fun () -> handle_with_async_exception_hook f ())
        ~if_deferred:(fun () ->
          let callback = callback_if_deferred () in
          ((), callback, p.state))

    | Rejected _ ->
      run_callback_or_defer_it
        ~run_immediately_and_ensure_tail_call:true
        ~callback:(fun () -> handle_with_async_exception_hook f ())
        ~if_deferred:(fun () ->
          let callback = callback_if_deferred () in
          ((), callback, p.state))

    | Pending p_callbacks ->
      let callback = callback_if_deferred () in
      add_implicitly_removed_callback p_callbacks callback

  (* [on_any p f g] arranges for [f v] to be called if [p] is or becomes
     fulfilled with [v], and for [g exn] to be called if [p] is or becomes
     rejected with [exn]. Both calls are wrapped in
     [handle_with_async_exception_hook]. *)
  let on_any p f g =
    let Internal p = to_internal_promise p in
    let p = underlying p in

    let callback_if_deferred () =
      let saved_storage = !current_storage in

      fun result ->
        match result with
        | Fulfilled v ->
          current_storage := saved_storage;
          handle_with_async_exception_hook f v

        | Rejected exn ->
          current_storage := saved_storage;
          handle_with_async_exception_hook g exn
    in

    match p.state with
    | Fulfilled v ->
      run_callback_or_defer_it
        ~run_immediately_and_ensure_tail_call:true
        ~callback:(fun () -> handle_with_async_exception_hook f v)
        ~if_deferred:(fun () ->
          let callback = callback_if_deferred () in
          ((), callback, p.state))

    | Rejected exn ->
      run_callback_or_defer_it
        ~run_immediately_and_ensure_tail_call:true
        ~callback:(fun () -> handle_with_async_exception_hook g exn)
        ~if_deferred:(fun () ->
          let callback = callback_if_deferred () in
          ((), callback, p.state))

    | Pending p_callbacks ->
      let callback = callback_if_deferred () in
      add_implicitly_removed_callback p_callbacks callback
end
include Sequential_composition



module Concurrent_composition :
sig
  val async : (unit -> _ t) -> unit
  val ignore_result : _ t -> unit

  val both : 'a t -> 'b t -> ('a * 'b) t
  val join : unit t list -> unit t

  val choose : 'a t list -> 'a t
  val pick : 'a t list -> 'a t

  val nchoose : 'a t list -> 'a list t
  val npick : 'a t list -> 'a list t

  val nchoose_split : 'a t list -> ('a list * 'a t list) t
end =
struct
  external reraise : exn -> 'a = "%reraise"

  (* [async f] runs [f ()] (an exception raised by [f] itself is converted
     with [fail]) and discards a fulfilled result. If the promise is, or
     later becomes, rejected with [exn], [!async_exception_hook] is applied to
     [exn]. *)
  let async f =
    let p = try f () with exn -> fail exn in
    let Internal p = to_internal_promise p in

    match (underlying p).state with
    | Fulfilled _ ->
      ()
    | Rejected exn ->
      !async_exception_hook exn

    | Pending p_callbacks ->
      let callback result =
        match result with
        | Fulfilled _ ->
          ()
        | Rejected exn ->
          !async_exception_hook exn
      in
      add_implicitly_removed_callback p_callbacks callback

  (* [ignore_result p] discards a fulfilled result. If [p] is already rejected
     with [exn], [exn] is re-raised in the caller with [reraise]. If [p] is
     pending and later becomes rejected, [!async_exception_hook] is applied to
     the exception instead. *)
  let ignore_result p =
    let Internal p = to_internal_promise p in

    match (underlying p).state with
    | Fulfilled _ ->
      ()
    | Rejected exn ->
      reraise exn

    | Pending p_callbacks ->
      let callback result =
        match result with
        | Fulfilled _ ->
          ()
        | Rejected exn ->
          !async_exception_hook exn
      in
      add_implicitly_removed_callback p_callbacks callback

  (* [join ps] evaluates to a promise that resolves once every promise in
     [ps] has resolved. The result is [Fulfilled ()] unless some promise in
     [ps] is rejected, in which case it is the first rejection recorded in
     [join_result]. *)
  let join ps =
    let p' = new_pending ~how_to_cancel:(propagate_cancel_to_several ps) in

    let number_pending_in_ps = ref 0 in
    let join_result = ref (Fulfilled ()) in

    (* Callback attached to each promise in [ps] that is still pending at the
       time [join] is called. *)
    let callback new_result =
      let State_may_now_be_pending_proxy p' = may_now_be_proxy p' in

      begin match new_result with
      | Fulfilled () -> ()
      | Rejected _ ->
        (* For the first promise in [ps] to be rejected, set the result of the
           [join] to rejected with the same exception. *)
        match !join_result with
        | Fulfilled () -> join_result := new_result
        | Rejected _ -> ()
      end;

      (* In all cases, decrement the number of promises still pending, and
         resolve the [join] once all promises resolve. *)
      number_pending_in_ps := !number_pending_in_ps - 1;
      if !number_pending_in_ps = 0 then begin
        let p' = underlying p' in
        (* NOTE(review): [underlying] is applied a second time here, to a
           value that was just obtained from [underlying]. *)
        let State_may_have_changed p' =
          resolve ~allow_deferring:false (underlying p') !join_result in
        ignore p'
      end
    in

    (* Attach the above callback. Simultaneously count how many pending promises
       there are in [ps] (initially). If that number is zero, the [join] must
       resolve immediately. *)
    let rec attach_callback_or_resolve_immediately ps =
      match ps with
      | [] ->
        if !number_pending_in_ps = 0 then
          to_public_promise {state = !join_result}
        else
          to_public_promise p'

      | p::ps ->
        let Internal p = to_internal_promise p in

        match (underlying p).state with
        | Pending p_callbacks ->
          number_pending_in_ps := !number_pending_in_ps + 1;
          add_implicitly_removed_callback p_callbacks callback;
          attach_callback_or_resolve_immediately ps

        | Rejected _ as p_result ->
          (* As in the callback above, but for already-resolved promises in
             [ps]: reject the [join] with the same exception as in the first
             rejected promise found. [join] still waits for any pending promises
             before actually resolving, though. *)
          begin match !join_result with
          | Fulfilled () -> join_result := p_result;
          | Rejected _ -> ()
          end;
          attach_callback_or_resolve_immediately ps

        | Fulfilled () ->
          attach_callback_or_resolve_immediately ps
    in

    attach_callback_or_resolve_immediately ps

  (* [both p1 p2] stores the values of [p1] and [p2] in two references as they
     become available, waits for both with [join], and then pairs the stored
     values. The [assert false] arm corresponds to [join] being fulfilled
     while one of the references is still [None]. *)
  let both p1 p2 =
    let v1 = ref None in
    let v2 = ref None in
    let p1' = bind p1 (fun v -> v1 := Some v; return_unit) in
    let p2' = bind p2 (fun v -> v2 := Some v; return_unit) in
    join [p1'; p2'] |> map (fun () ->
      match !v1, !v2 with
      | Some v1, Some v2 -> v1, v2
      | _ -> assert false)

  (* Maintainer's note: the next few functions are helpers for [choose] and
[pick]. Perhaps they should be factored into some kind of generic
[choose]/[pick] implementation, which may actually be optimal anyway with
Flambda. *)

  (* Evaluates to the number of promises in [ps] whose underlying state is
     [Fulfilled _] or [Rejected _]. Promises that are still pending do not
     contribute to the count. *)
  let count_resolved_promises_in (ps : _ t list) =
    List.fold_left
      (fun resolved_so_far promise ->
        let Internal internal = to_internal_promise promise in
        match (underlying internal).state with
        | Pending _ -> resolved_so_far
        | Fulfilled _ -> resolved_so_far + 1
        | Rejected _ -> resolved_so_far + 1)
      0
      ps

  (* Evaluates to the [n]th promise in [ps], among only those promises in [ps]
that are resolved. The caller is expected to ensure that there are at
least [n] resolved promises in [ps]. *)
  let rec nth_resolved (ps : 'a t list) (n : int) : 'a t =
    match ps with
    | [] ->
      (* Reached only if the caller asked for more resolved promises than
         [ps] contains. *)
      assert false

    | candidate :: rest ->
      (* What to do with a resolved [candidate]: take it when the countdown
         has reached zero, otherwise skip it and count down. *)
      let take_or_skip () =
        if n > 0 then nth_resolved rest (n - 1) else candidate
      in

      let Internal candidate_internal = to_internal_promise candidate in
      match (underlying candidate_internal).state with
      | Fulfilled _ -> take_or_skip ()
      | Rejected _ -> take_or_skip ()
      | Pending _ -> nth_resolved rest n

  (* Like [nth_resolved], but cancels all pending promises found while
traversing [ps]. *)
  let rec nth_resolved_and_cancel_pending (ps : 'a t list) (n : int) : 'a t =
    match ps with
    | [] ->
      (* Reached only if the caller asked for more resolved promises than
         [ps] contains. *)
      assert false

    | candidate :: rest ->
      (* What to do with a resolved [candidate]: when the countdown has
         reached zero, cancel everything after it and take it; otherwise skip
         it and count down. *)
      let take_or_skip () =
        if n > 0 then
          nth_resolved_and_cancel_pending rest (n - 1)
        else begin
          List.iter cancel rest;
          candidate
        end
      in

      let Internal candidate_internal = to_internal_promise candidate in
      match (underlying candidate_internal).state with
      | Fulfilled _ -> take_or_skip ()
      | Rejected _ -> take_or_skip ()
      | Pending _ ->
        cancel candidate;
        nth_resolved_and_cancel_pending rest n

  (* The PRNG state is initialized with a constant to make non-IO-based programs
deterministic. *)
  (* Maintainer's note: is this necessary? *)
  let prng = lazy (Random.State.make [||])

  (* [choose ps] raises [Invalid_argument] on the empty list. If no promise in
     [ps] is resolved yet, the result is a fresh pending promise that is
     resolved with the result passed to a callback attached to every promise
     in [ps]. If exactly one promise is already resolved, that promise is the
     result. If several are, one of them is selected using [prng]. *)
  let choose ps =
    begin match ps with
    | [] ->
      invalid_arg
        "Lwt.choose [] would return a promise that is pending forever"
    | _ :: _ -> ()
    end;

    let already_resolved = count_resolved_promises_in ps in

    if already_resolved = 0 then begin
      let chosen =
        new_pending ~how_to_cancel:(propagate_cancel_to_several ps) in

      let resolve_chosen first_result =
        let State_may_now_be_pending_proxy chosen = may_now_be_proxy chosen in
        let chosen = underlying chosen in
        let State_may_have_changed chosen =
          resolve ~allow_deferring:false chosen first_result in
        ignore chosen
      in
      add_explicitly_removable_callback_to_each_of ps resolve_chosen;

      to_public_promise chosen
    end
    else if already_resolved = 1 then
      nth_resolved ps 0
    else
      nth_resolved ps (Random.State.int (Lazy.force prng) already_resolved)

  (* [pick ps] follows the same pattern as [choose ps], with additional calls
     to [cancel]: the deferred callback cancels every promise in [ps] before
     resolving the result, and the already-resolved cases go through
     [nth_resolved_and_cancel_pending]. *)
  let pick ps =
    begin match ps with
    | [] ->
      invalid_arg
        "Lwt.pick [] would return a promise that is pending forever"
    | _ :: _ -> ()
    end;

    let already_resolved = count_resolved_promises_in ps in

    if already_resolved = 0 then begin
      let picked =
        new_pending ~how_to_cancel:(propagate_cancel_to_several ps) in

      let resolve_picked first_result =
        let State_may_now_be_pending_proxy picked = may_now_be_proxy picked in
        (* The cancellations run before [underlying] is taken, as in the
           order of the original statements. *)
        List.iter cancel ps;
        let picked = underlying picked in
        let State_may_have_changed picked =
          resolve ~allow_deferring:false picked first_result in
        ignore picked
      in
      add_explicitly_removable_callback_to_each_of ps resolve_picked;

      to_public_promise picked
    end
    else if already_resolved = 1 then
      nth_resolved_and_cancel_pending ps 0
    else
      nth_resolved_and_cancel_pending ps
        (Random.State.int (Lazy.force prng) already_resolved)

  (* If [nchoose ps] or [npick ps] found all promises in [ps] pending, the
callback added to each promise in [ps] eventually calls this function. The
function collects promises in [ps] that have become fulfilled, or finds one
promise in [ps] that has been rejected. It then returns the desired state
of the final promise: either the list of results collected, or the
exception found. *)
  let rec collect_fulfilled_promises_after_pending
      (results : 'a list)
      (ps : 'a t list) :
        ('a list resolved_state) =

    match ps with
    | [] ->
      Fulfilled (List.rev results)

    | first :: rest ->
      let Internal first = to_internal_promise first in

      match (underlying first).state with
      | Pending _ ->
        collect_fulfilled_promises_after_pending results rest

      | Rejected _ as rejection ->
        rejection

      | Fulfilled value ->
        collect_fulfilled_promises_after_pending (value :: results) rest

  (* [nchoose ps] raises [Invalid_argument] on the empty list. If some promise
     in [ps] is already rejected, and no fulfilled promise precedes a
     different outcome, the scan below yields that rejection; if some promise
     is already fulfilled, the result holds the values of all promises in [ps]
     that are fulfilled at the time of the call, in list order. If every
     promise is pending, the result is a fresh pending promise, resolved by a
     callback attached to each promise in [ps]. *)
  let nchoose ps =
    if ps = [] then
      invalid_arg
        "Lwt.nchoose [] would return a promise that is pending forever";

    (* If at least one promise in [ps] is found fulfilled, this function is
       called to find all such promises. *)
    let rec gather_fulfilled_or_find_rejected values remaining =
      match remaining with
      | [] ->
        return (List.rev values)

      | first :: rest ->
        let Internal first = to_internal_promise first in

        match (underlying first).state with
        | Pending _ ->
          gather_fulfilled_or_find_rejected values rest

        | Rejected _ as rejection ->
          to_public_promise {state = rejection}

        | Fulfilled value ->
          gather_fulfilled_or_find_rejected (value :: values) rest
    in

    (* Looks for already-resolved promises in [ps]. If none are fulfilled or
       rejected, adds a callback to all promises in [ps] (all of which are
       pending). *)
    let rec scan_for_resolved remaining =
      match remaining with
      | [] ->
        let outcome =
          new_pending ~how_to_cancel:(propagate_cancel_to_several ps) in

        let resolve_outcome _result =
          let State_may_now_be_pending_proxy outcome =
            may_now_be_proxy outcome in
          let outcome = underlying outcome in
          let collected = collect_fulfilled_promises_after_pending [] ps in
          let State_may_have_changed outcome =
            resolve ~allow_deferring:false outcome collected in
          ignore outcome
        in
        add_explicitly_removable_callback_to_each_of ps resolve_outcome;

        to_public_promise outcome

      | first :: rest ->
        let Internal first = to_internal_promise first in

        match (underlying first).state with
        | Pending _ ->
          scan_for_resolved rest

        | Rejected _ as rejection ->
          to_public_promise {state = rejection}

        | Fulfilled value ->
          gather_fulfilled_or_find_rejected [value] rest
    in

    scan_for_resolved ps

  (* See [nchoose]. This function differs only in having additional calls to
[cancel]. *)
  let npick ps =
    if ps = [] then
      invalid_arg
        "Lwt.npick [] would return a promise that is pending forever";

    (* Called once a fulfilled promise has been found: gathers the values of
       all fulfilled promises in the rest of the list, or stops at the first
       rejected one. In both cases every promise in [ps] is canceled before
       the result is produced. *)
    let rec gather_fulfilled_or_find_rejected values remaining =
      match remaining with
      | [] ->
        List.iter cancel ps;
        return (List.rev values)

      | first :: rest ->
        let Internal first = to_internal_promise first in

        match (underlying first).state with
        | Pending _ ->
          gather_fulfilled_or_find_rejected values rest

        | Rejected _ as rejection ->
          List.iter cancel ps;
          to_public_promise {state = rejection}

        | Fulfilled value ->
          gather_fulfilled_or_find_rejected (value :: values) rest
    in

    (* Looks for an already-resolved promise in [ps]. If every promise is
       pending, creates the result promise and attaches to each promise in
       [ps] a callback that collects the fulfilled values, cancels every
       promise in [ps], and then resolves the result. *)
    let rec scan_for_resolved remaining =
      match remaining with
      | [] ->
        let outcome =
          new_pending ~how_to_cancel:(propagate_cancel_to_several ps) in

        let resolve_outcome _result =
          let State_may_now_be_pending_proxy outcome =
            may_now_be_proxy outcome in
          let outcome = underlying outcome in
          let collected = collect_fulfilled_promises_after_pending [] ps in
          List.iter cancel ps;
          let State_may_have_changed outcome =
            resolve ~allow_deferring:false outcome collected in
          ignore outcome
        in
        add_explicitly_removable_callback_to_each_of ps resolve_outcome;

        to_public_promise outcome

      | first :: rest ->
        let Internal first = to_internal_promise first in

        match (underlying first).state with
        | Pending _ ->
          scan_for_resolved rest

        | Rejected _ as rejection ->
          List.iter cancel ps;
          to_public_promise {state = rejection}

        | Fulfilled value ->
          gather_fulfilled_or_find_rejected [value] rest
    in

    scan_for_resolved ps

  (* Same general pattern as [npick] and [nchoose].
*)

  (* [nchoose_split ps] raises [Invalid_argument] on the empty list.
     Otherwise, it evaluates to a promise for a pair
     [(fulfilled_values, still_pending)]:

     - If some promise in [ps] is already fulfilled when [nchoose_split] is
       called, the pair is computed immediately from the current states.
     - If a rejected promise is encountered first, the result is rejected with
       the same exception.
     - If all promises are pending, the result is a fresh pending promise; a
       callback attached to every promise in [ps] computes the pair (or the
       rejection) with [finish].

     Both components of the pair are in the same relative order as in [ps],
     on the immediate path and on the deferred path alike. *)
  let nchoose_split ps =
    if ps = [] then
      invalid_arg
        "Lwt.nchoose_split [] would return a promise that is pending forever";

    (* Deferred path: walks [ps], accumulating fulfilled values and pending
       promises in reverse, and resolves [to_resolve] with both lists put back
       into input order, or with the first rejection found. *)
    let rec finish
        (to_resolve : ('a list * 'a t list, underlying, pending) promise)
        (fulfilled : 'a list)
        (pending : 'a t list)
        (ps : 'a t list)
          : ('a list * 'a t list, underlying, resolved) state_changed =

      match ps with
      | [] ->
        resolve ~allow_deferring:false
          to_resolve (Fulfilled (List.rev fulfilled, List.rev pending))

      | p::ps ->
        let Internal p_internal = to_internal_promise p in
        match (underlying p_internal).state with
        | Fulfilled v ->
          finish to_resolve (v::fulfilled) pending ps

        | Rejected _ as result ->
          resolve ~allow_deferring:false to_resolve result

        | Pending _ ->
          finish to_resolve fulfilled (p::pending) ps
    in

    (* Immediate path, entered once a fulfilled promise has been found. Both
       accumulators are built in reverse. *)
    let rec collect_already_resolved_promises results pending ps =
      match ps with
      | [] ->
        (* Both lists are reversed, so that the pending promises come back in
           input order, exactly as they do from [finish]. Previously, only
           [results] was reversed here, so the order of the pending list
           depended on which path produced it. *)
        return (List.rev results, List.rev pending)

      | p::ps ->
        let Internal p_internal = to_internal_promise p in
        match (underlying p_internal).state with
        | Fulfilled v ->
          collect_already_resolved_promises (v::results) pending ps

        | Rejected _ as result ->
          to_public_promise {state = result}

        | Pending _ ->
          collect_already_resolved_promises results (p::pending) ps
    in

    (* Scans [ps] for the first resolved promise, remembering (in reverse) the
       pending promises skipped on the way. If there is none, sets up the
       deferred path. *)
    let rec check_for_already_resolved_promises pending_acc ps' =
      match ps' with
      | [] ->
        let p = new_pending ~how_to_cancel:(propagate_cancel_to_several ps) in

        let callback _result =
          let State_may_now_be_pending_proxy p = may_now_be_proxy p in
          let p = underlying p in
          let State_may_have_changed p = finish p [] [] ps in
          ignore p
        in
        add_explicitly_removable_callback_to_each_of ps callback;

        to_public_promise p

      | p::ps' ->
        let Internal p_internal = to_internal_promise p in
        match (underlying p_internal).state with
        | Fulfilled v ->
          collect_already_resolved_promises [v] pending_acc ps'

        | Rejected _ as result ->
          to_public_promise {state = result}

        | Pending _ ->
          check_for_already_resolved_promises (p::pending_acc) ps'
    in

    let p = check_for_already_resolved_promises [] ps in
    p
end
include Concurrent_composition



module Miscellaneous :
sig
  (* Promise state query *)
  type 'a state =
    | Return of 'a
    | Fail of exn
    | Sleep

  val state : 'a t -> 'a state
  val is_sleeping : 'a t -> bool
  val debug_state_is : 'a state -> 'a t -> bool t

  (* Function lifters *)
  val apply : ('a -> 'b t) -> 'a -> 'b t

  val wrap :
    (unit -> 'b) ->
    'b t
  val wrap1 :
    ('a1 -> 'b) ->
    ('a1 -> 'b t)
  val wrap2 :
    ('a1 -> 'a2 -> 'b) ->
    ('a1 -> 'a2 -> 'b t)
  val wrap3 :
    ('a1 -> 'a2 -> 'a3 -> 'b) ->
    ('a1 -> 'a2 -> 'a3 -> 'b t)
  val wrap4 :
    ('a1 -> 'a2 -> 'a3 -> 'a4 -> 'b) ->
    ('a1 -> 'a2 -> 'a3 -> 'a4 -> 'b t)
  val wrap5 :
    ('a1 -> 'a2 -> 'a3 -> 'a4 -> 'a5 -> 'b) ->
    ('a1 -> 'a2 -> 'a3 -> 'a4 -> 'a5 -> 'b t)
  val wrap6 :
    ('a1 -> 'a2 -> 'a3 -> 'a4 -> 'a5 -> 'a6 -> 'b) ->
    ('a1 -> 'a2 -> 'a3 -> 'a4 -> 'a5 -> 'a6 -> 'b t)
  val wrap7 :
    ('a1 -> 'a2 -> 'a3 -> 'a4 -> 'a5 -> 'a6 -> 'a7 -> 'b) ->
    ('a1 -> 'a2 -> 'a3 -> 'a4 -> 'a5 -> 'a6 -> 'a7 -> 'b t)

  (* Paused promises *)
  val pause : unit -> unit t
  val wakeup_paused : unit -> unit
  val paused_count : unit -> int
  val register_pause_notifier : (int -> unit) -> unit

  (* Internal interface for other modules in Lwt
*)
  val poll : 'a t -> 'a option
end =
struct
  type 'a state =
    | Return of 'a
    | Fail of exn
    | Sleep

  external reraise : exn -> 'a = "%reraise"

  (* Translates the underlying state of [p] into the public [state] type. *)
  let state p =
    let Internal internal = to_internal_promise p in
    match (underlying internal).state with
    | Pending _ -> Sleep
    | Fulfilled value -> Return value
    | Rejected exn -> Fail exn

  (* Evaluates to a promise, made with [return], holding whether the current
     state of [p] is structurally equal to [expected_state]. *)
  let debug_state_is expected_state p =
    let matches = (state p = expected_state) in
    return matches

  (* [true] exactly when the underlying state of [p] is [Pending _]. *)
  let is_sleeping p =
    let Internal internal = to_internal_promise p in
    match (underlying internal).state with
    | Pending _ -> true
    | Fulfilled _ -> false
    | Rejected _ -> false

  (* [Some v] if [p] is fulfilled with [v], [None] if [p] is pending. If [p]
     is rejected, the exception is re-raised with [reraise]. *)
  let poll p =
    let Internal internal = to_internal_promise p in
    match (underlying internal).state with
    | Pending _ -> None
    | Fulfilled value -> Some value
    | Rejected exn -> reraise exn

  (* [apply f x] is [f x], with an exception raised by [f] converted into a
     promise made by [fail]. *)
  let apply f x =
    try f x
    with e -> fail e

  (* [wrap], [wrap1] ... [wrap7] call a function of zero to seven arguments
     that returns a plain value, and pass the value to [return]. An exception
     raised during the call is converted into a promise made by [fail]. *)
  let wrap f =
    try return (f ())
    with e -> fail e

  let wrap1 f a =
    try return (f a)
    with e -> fail e

  let wrap2 f a b =
    try return (f a b)
    with e -> fail e

  let wrap3 f a b c =
    try return (f a b c)
    with e -> fail e

  let wrap4 f a b c d =
    try return (f a b c d)
    with e -> fail e

  let wrap5 f a b c d e5 =
    try return (f a b c d e5)
    with e -> fail e

  let wrap6 f a b c d e5 e6 =
    try return (f a b c d e5 e6)
    with e -> fail e

  let wrap7 f a b c d e5 e6 e7 =
    try return (f a b c d e5 e6 e7)
    with e -> fail e

  (* Function called by [pause] with the updated count; replaced by
     [register_pause_notifier]. Initially [ignore]. *)
  let pause_hook = ref ignore

  (* Sequence to which [pause] adds its promises, and a counter maintained
     alongside it. *)
  let paused = Lwt_sequence.create ()
  let paused_count = ref 0

  (* Adds a new promise to [paused] with [add_task_r], increments the
     counter, passes the new count to the pause hook, and evaluates to the
     promise. *)
  let pause () =
    let promise = add_task_r paused in
    paused_count := !paused_count + 1;
    let notify = !pause_hook in
    notify !paused_count;
    promise

  (* Resets the counter to zero. If [paused] is not empty, its contents are
     first moved to a temporary sequence; after the counter is reset, [wakeup]
     is applied to each moved element with [()]. *)
  let wakeup_paused () =
    if not (Lwt_sequence.is_empty paused) then begin
      let to_wake = Lwt_sequence.create () in
      Lwt_sequence.transfer_r paused to_wake;
      paused_count := 0;
      Lwt_sequence.iter_l (fun resolver -> wakeup resolver ()) to_wake
    end
    else
      paused_count := 0

  let register_pause_notifier f =
    pause_hook := f

  (* From here on, [paused_count] names this accessor function rather than
     the counter itself. *)
  let paused_count () =
    !paused_count
end
include Miscellaneous



module Infix =
struct
  let (>>=) p f = bind p f
  let (=<<) f p = bind p f
  let (>|=) p f = map f p
  let (=|<) f p = map f p
  let (<&>) left right = join [left; right]
  let (<?>) left right = choose [left; right]

  (* Names expected by syntax extensions that look for a [Let_syntax]
     module; each is defined in terms of the function of the same name
     outside this module. *)
  module Let_syntax =
  struct
    let return v = return v
    let map t ~f = map f t
    let bind t ~f = bind t f
    let both left right = both left right

    module Open_on_rhs =
    struct
    end
  end
end
include Infix



module Lwt_result_type =
struct
  type +'a result = 'a lwt_result

  (* Deprecated. *)
  let make_value value = Result.Ok value
  let make_error error = Result.Error error
end
include Lwt_result_type