12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559openCoreopenAsyncopenInt.Replace_polymorphic_comparemoduleNotification_channel=Types.Notification_channelmoduleColumn_metadata=Column_metadata(* https://www.postgresql.org/docs/current/protocol.html *)modulePgasync_error=struct(* The MLI introduces the [Pgasync_error] type; it's our place to store the generic
error, and the error code _if_ we know it.
Now, on the subject of error handling more generally, there are a few axes on which
one must make a decision here.
API
---
First and foremost, we need to decide whether or not we should expose a single api,
where all the result types are [Or_pgasync_error.t], or should we expose a more
convenient API that uses [Or_error.t], and an expert API that uses
[Or_pgasync_error.t]? We do the latter, because the vast majority of our users don't
care for the [postgres_error_code] and do use [Error.t] everywhere else.
Internal storage of connection-killing errors
---------------------------------------------
But more subtly, once you've agreed to provide [postgres_error_code], you need to be
very careful about when you provide it. If you return an error from [query] with
[postgres_error_code = Some _] in it, then the user will quite reasonably assume that
the query failed with that error. But that might not be true: if the connection
previously asynchronously failed due to some error (say, the connection closed),
we're going to return that error to the user.
This is a problem, because the error that closed the connection might imply that
a specific thing is wrong with their query, when actually it is not. I don't have
a great example, but suppose that there existed an error code relating to replication
that might cause the backend to die.
If the backend asynchronously dies, we'll close [t] and stash the reason it closed
inside [t.state]. If someone then tries to use [query t], we'll fetch the reason
out of [t.state], and return that as an error. But if we did this naively, then it
would look to the user like their specific query conflicted with replication, and
they might retry it, when actually that was unrelated.
The important thing here is: we should only return a [postgres_error_code] that has
semantic meaning to the user _if_ it relates to the operation they just performed,
not some operation that happened previously.
You could imagine two different ways of trying to achieve this:
1) Only ever stash [Error.t]s inside [state] (or similar). This ensures we'll never
fetch a [postgres_error_code] out of storage and give it to the user.
But this is tricky, because now we have a mixture of different error types in the
library, which is super annoying and messy, and you have to be careful about when you
use one vs. the other, and you have to keep converting back and forth.
Furthermore, you need to be careful that a refactor cause an error relating to
a specific query to be stashed and immediately retrieved, as that might erase the
error code, or passed through some generic function that erases
2) Use [Pgasync_error.t] everywhere within the library, but before every operation
like [query], examine [t.state] to see if the connection is already dead, and make
sure that when you return the stashed [Pgasync_error.t], you erase the code from it
(and tag it with a message like "query issued on closed connection; original error
was foo" too).
We do #2.
This is precisely achieved by the first line of [parse_and_start_executing_query].
We're using the simplifying argument that even if an error gets stashed in [t.state]
and then returned to the user at the end of [query], that error _wasn't_ there when
the query was started, so it's reasonable to associate it with the [query], which
sounds fine. *)modulePostgres_field=Protocol.Backend.Error_or_notice_fieldtypeserver_error=Protocol.Backend.ErrorResponse.t={error_code:string;all_fields:(Postgres_field.t*string)list}typet={error:Error.t;server_error:server_erroroption}letsexp_of_tt=[%sexp(t.error:Error.t)]letof_errore={error=e;server_error=None}letof_exne=of_error(Error.of_exne)letof_strings=of_error(Error.of_strings)letcreate_ss=of_error(Error.create_ss)letof_error_response(error_response:Protocol.Backend.ErrorResponse.t)=leterror=(* We omit some of the particularly noisy and uninteresting fields from the error
message that will be displayed to users.
Note that as-per [ErrorResponse.t]'s docstring, [Code] is included in this
list. *)letinteresting_fields=List.filtererror_response.all_fields~f:(fun(field,value)->matchfieldwith|File|Line|Routine|Severity_non_localised->false|Severity->(* ERROR is the normal case for an error message, so just omit it *)String.(<>)value"ERROR"|_->true)inError.create_s[%sexp(interesting_fields:(Postgres_field.t*string)list)]in{error;server_error=Someerror_response};;lettagt~tag={twitherror=Error.tagt.error~tag}letto_errort=t.errorletpostgres_error_codet=matcht.server_errorwith|None->None|Some{error_code;_}->Someerror_code;;letpostgres_fieldtfield=matcht.server_errorwith|None->None|Some{all_fields;_}->List.Assoc.findall_fieldsfield~equal:[%equal:Postgres_field.t];;letraiset=Error.raiset.errorendmoduleOr_pgasync_error=structtype'at=('a,Pgasync_error.t)Result.t[@@derivingsexp_of]letto_or_error=Result.map_error~f:Pgasync_error.to_errorletok_exn=function|Okx->x|Errore->Pgasync_error.raisee;;leterror_ss=Error(Pgasync_error.create_ss)leterror_strings=Error(Pgasync_error.of_strings)leterrorffmt=ksprintferror_stringfmtletof_exne=Error(Pgasync_error.of_exne)endmoduleExpert=struct(* Unlike the [Closing]...[Closed_gracefully] sequence, when an error occurs we must
immediately transition to the [Failed] state, to prevent any other operations being
attempted while we are releasing resources.
So that [closed_finished] can wait on all resources to be released in the [Failed]
case, we have this bool [resources_released].
The reason we don't model graceful close this way is because [Closing] can transition
to either of [Failed _] or [Closed_gracefully] states; putting a bool in that
constructor makes it sound like we never transition away from some hypothetical
[Closed _] constructor. *)typestate=|Open|Closing|Failedof{error:Pgasync_error.t;resources_released:bool}|Closed_gracefully[@@derivingsexp_of](* There are a couple of invariants we need to be careful of:
- changes to [t.state] need simultaneous ivar filling and/or reader & writer closing.
- [t.state = Open _] must imply [t.reader] is not closed
Further, we need to be careful to catch synchronous writer exceptions and write flush
messages to postgres.
We try and make sure these invariants hold by making some of the fields in [t] opaque
to stuff outside of this small module, so that by reading the short [module T] we can
be confident the whole module handles this correctly.
(It's not perfect, e.g. [t.reader] is exposed, but this isn't awful.) *)moduleT:sig(* [opaque] makes [writer] and [state_changed] inaccessible outside of [T]. *)type'aopaquetypet=private{writer:Writer.topaque;reader:Reader.t;mutablestate:state;state_changed:(unit,read_write)Bvar.topaque;sequencer:Query_sequencer.t;runtime_parameters:stringString.Table.t;notification_buses:(Pid.t->string->unit,read_write)Bus.tNotification_channel.Table.t;backend_key:Types.backend_keySet_once.t}[@@derivingsexp_of]valcreate_internal:Reader.t->Writer.t->tvalfailed:t->Pgasync_error.t->unittypeto_flush_or_not=|Not_required|Write_afterwards(** [flush_message] is a bit of a weird argument for this function/weird feature
to give this part of the code, but it reminds us to always consider the need
to flush. *)valcatch_write_errors:t->f:(Writer.t->unit)->flush_message:to_flush_or_not->unitvalbytes_pending_in_writer_buffer:t->intvalwait_for_writer_buffer_to_be_empty:t->unitDeferred.tvalstatus:t->statevalclose_finished:t->unitOr_pgasync_error.tDeferred.tvalclose:t->unitOr_pgasync_error.tDeferred.tend=structtype'aopaque='atypet={writer:(Writer.t[@sexp.opaque]);reader:(Reader.t[@sexp.opaque]);mutablestate:state;state_changed:(unit,read_write)Bvar.t;sequencer:Query_sequencer.t;runtime_parameters:stringString.Table.t;notification_buses:(Pid.t->string->unit,read_write)Bus.tNotification_channel.Table.t;backend_key:Types.backend_keySet_once.t}[@@derivingsexp_of]letset_statetstate=t.state<-state;Bvar.broadcastt.state_changed();;letcleanup_resourcest=let%bind()=Writer.closet.writer~force_close:Deferred.unitinlet%bind()=Reader.closet.readerinHashtbl.cleart.notification_buses;Hashtbl.cleart.runtime_parameters;return();;letfailedterror=matcht.statewith|Failed_|Closed_gracefully->()|Open|Closing->set_statet(Failed{error;resources_released=false});don't_wait_for(let%bind()=cleanup_resourcestinset_statet(Failed{error;resources_released=true});return());;letcreate_internalreaderwriter={reader;writer;state=Open;state_changed=Bvar.create();sequencer=Query_sequencer.create();runtime_parameters=String.Table.create();notification_buses=Notification_channel.Table.create~size:1();backend_key=Set_once.create()};;typeto_flush_or_not=|Not_required|Write_afterwardsletcatch_write_errorst~f~flush_message:unit=matchft.writerwith|exceptionexn->failedt(Pgasync_error.of_exnexn)|()->(matchflush_messagewith|Not_required->()|Write_afterwards->(matchProtocol.Frontend.Writer.flusht.writerwith|exceptionexn->failedt(Pgasync_error.of_exnexn)|()->()));;letbytes_pending_in_writer_buffert=Writer.bytes_to_writet.writerletwait_for_writer_buffer_to_be_emptyt=Writer.flushedt.writerletdo_close_gracefully_if_possiblet=matcht.statewith|Failed_|Closed_gracefully|Closing->return()|Open->set_statetClosing;letclosed_gracefully_deferred=Query_sequencer.enqueuet.sequencer(fun()->catch_write_errorst~flush_message:Not_required~f:(funwriter->Protocol.Frontend.Writer.terminatewriter);(* we'll get an exception if the reader is already closed. *)match%bindMonitor.try_with~run:`Now~rest:`Log(fun()->Reader.read_chart.reader)with|Ok`Eof->return(Ok())|Ok(`Okc)->return(Or_pgasync_error.errorf"EOF expected, but got '%c'"c)|Errorexn->return(Or_pgasync_error.of_exnexn))in(match%bindClock.with_timeout(sec1.)closed_gracefully_deferredwith|`Timeout->failedt(Pgasync_error.of_string"EOF expected, but instead stuck");return()|`Result(Errorerr)->failedterr;return()|`Result(Ok())->let%bind()=cleanup_resourcestin(matcht.statewith|Open|Closed_gracefully->assertfalse|Failed_->(* e.g., if the Writer had an asynchronous exn while the reader was being
closed, we might have called [failed t] and filled this ivar already. *)return()|Closing->set_statetClosed_gracefully;return()));;letstatust=t.stateletrecclose_finishedt=matcht.statewith|Failed{error;resources_released=true}->return(Errorerror)|Closed_gracefully->return(Ok())|Closing|Open|Failed{resources_released=false;_}->let%bind()=Bvar.waitt.state_changedinclose_finishedt;;letcloset=don't_wait_for(do_close_gracefully_if_possiblet);close_finishedt;;endincludeTletnotification_bustchannel=Hashtbl.find_or_addt.notification_buseschannel~default:(fun()->Bus.create[%here]Arity2~on_subscription_after_first_write:Allow~on_callback_raise:Error.raise);;(* [Message_reading] hides the helper functions of [read_messages] from the below. *)moduleMessage_reading:sigtype'ahandle_message_result=|Stopof'a|Continue|Protocol_errorofPgasync_error.t(** [read_messages] and will handle and dispatch the three asynchronous message types
for you; you should never see them.
[handle_message] is given a message type constructor, and an iobuf windowed on the
payload of the message (that is, the message-type-specific bytes; the window does
not include the type or message length header, they have already been consumed).
[handle_message] must consume (as in [Iobuf.Consume]) all of the bytes of the
message. *)type'ahandle_message=Protocol.Backend.constructor->(read,Iobuf.seek)Iobuf.t->'ahandle_message_resulttype'aread_messages_result=|Connection_closedofPgasync_error.t|Doneof'a(** NoticeResponse, and ParameterStatus and NotificationResponse are 'asynchronous
messages' and are not associated with a specific request-response conversation.
They can happen at any time.
[read_messages] handles these for you, and does not show them to your
[handle_message] callback. *)valread_messages:?pushback:(unit->unitDeferred.t)->t->handle_message:'ahandle_message->'aread_messages_resultDeferred.t(** If a message arrives while no request-response conversation (query or otherwise)
is going on, use [consume_one_asynchronous_message] to eat it.
If the message is one of those asynchronous messages, it will be handled. If it is
some other message type, that is a protocol error and the connection will be
closed. If the reader is actually at EOF, the connection will be closed with an
error. *)valconsume_one_asynchronous_message:t->unitread_messages_resultDeferred.tend=structtype'ahandle_message_result=|Stopof'a|Continue|Protocol_errorofPgasync_error.tletmax_message_length=letdefault=50*1024*1024inlazy(letoverride=(* for emergencies, in case 50M is a terrible default (seems very unlikely) *)letopenOption.Let_syntaxinlet%bindv=Unix.getenv"POSTGRES_ASYNC_OVERRIDE_MAX_MESSAGE_LENGTH"inlet%bindv=Option.try_with(fun()->Int.of_stringv)inlet%bindv=Option.some_if(v>0)vinSomevinOption.valueoverride~default);;type'ahandle_message=Protocol.Backend.constructor->(read,Iobuf.seek)Iobuf.t->'ahandle_message_result(* [Reader.read_one_iobuf_at_a_time] hands us 'chunks' to [handle_chunk]. A chunk is
an iobuf, where the window of the iobuf is set to the data that has been pulled
from the OS into the [Reader.t] but not yet consumed by the application. Said
window 'chunk' contains messages, potentially a partial message at the end in case
e.g. the bytes of a message is split across two calls to the [read] syscall.
Suppose the server has send us three 5-byte messages, "AAAAA", "BBBBB" and "CCCCC".
Each message has a 5-byte header (containing its type and length), represented by
'h'. Suppose further that the OS handed us 23 bytes when [Reader.t] called [read].
{v
0 bytes 10 bytes 20 bytes
| | |
hhhhhAAAAAhhhhhBBBBBhhh
/--------window---------\
v}
We want to give each message to [handle_message], one at a time. So, first we must
move the window on the iobuf so that it contains precisely the first message's
payload, i.e., the bytes 'AAAAA', i.e., this iobuf but with the window set to
[5,10). [Protocol.Backend.focus_on_message] does this by consuming the header
'hhhhh' to learn the type and length (thereby moving the window lo-bound up) and
then resizing it (moving the hi-bound down) like so:
{v
hhhhhAAAAAhhhhhBBBBBhhh
/-wdw-\
v}
Now we can call [handle_message] with this iobuf.
The contract we have with [handle_message] is that it will use calls to functions
in [Iobuf.Consume] to consume the bytes inside the window we set, that is, consume
the entire message. As it consumes bytes of the message, the window lo-bound is
moved up, so when it's done consuming the window is [10,10)
{v
hhhhhAAAAAhhhhhBBBBBhhh
/\
v}
and then [bounded_flip_hi] sets the window to [10,23) (since we remembered '23' in
[chunk_hi_bound]).
{v
hhhhhAAAAAhhhhhBBBBBhhh
/-----window--\
v}
then, we recurse (focus on [15,20), call [handle_message], etc.), and recurse
again.
At this point, reading the 'message length' field tells us that we only have part
of 'C' in our buffer (three out of 5+5 bytes). So, we return from [handle_chunk].
The contract we have with [Reader.t] is that we will leave in the iobuf the data
that we did not consume. It looks at the iobuf window to understand what's left,
keeps only those three bytes of 'C' in its internal buffer, and uses [read] to
refill the buffer before calling us again; hopefully at that point we'll have the
whole of message 'C'.
For this to work it is important that [handle_message] actually consumed the whole
message, and did not move the window hi-bound. We _could_ save those bounds and
forcibly restore them before doing the flip, however we prefer to instead just
check that [handle_message] did this because the functions that parse messages are
written to 'consume' them anyway, and checking that they actually did so is a good
validation that we correctly understand and are thinking about all fields in the
postgres protocol; not consuming all the bytes of the message indicates a potential
parsing bug. That's what [check_window_after_handle_message] does.
Note the check against [max_message_length]. Without it, if postgres told us that
it was sending us an extremely long message, we'd return from [handle_chunk]
constantly asking for refills, always thinking we needed more data and consuming
all the RAM. Instead, we will just bail out rather than asking for refills if
a message gets too long. The limit is arbitrary, but ought to be enough for anybody
(with an environment variable escape hatch if we turn out to be wrong). *)letcheck_window_after_handle_messagemessage_typeiobuf~message_hi_bound=match[%compare.equal:Iobuf.Hi_bound.t](Iobuf.Hi_bound.windowiobuf)message_hi_boundwith|false->Or_pgasync_error.error_s[%message"handle_message moved the hi-bound!"(message_type:Protocol.Backend.constructor)]|true->(matchIobuf.is_emptyiobufwith|false->Or_pgasync_error.error_s[%message"handle_message did not consume entire iobuf"(message_type:Protocol.Backend.constructor)~bytes_remaining:(Iobuf.lengthiobuf:int)]|true->Ok());;lethandle_chunk~handle_message~pushback=letstop_errorsexp=return(`Stop(`Protocol_error(Pgasync_error.create_ssexp)))inletrecloopiobuf=letchunk_hi_bound=Iobuf.Hi_bound.windowiobufinmatchProtocol.Backend.focus_on_messageiobufwith|ErrorIobuf_too_short_for_header->let%bind()=pushback()inreturn`Continue|Error(Iobuf_too_short_for_message{message_length})->(matchmessage_length>forcemax_message_lengthwith|true->stop_error[%message"Message too long"(message_length:int)]|false->let%bind()=pushback()inreturn`Continue)|Error(Nonsense_message_lengthv)->stop_error[%message"Nonsense message length in header"~_:(v:int)]|Error(Unknown_message_typeother)->stop_error[%message"Unrecognised message type character"(other:char)]|Okmessage_type->letmessage_hi_bound=Iobuf.Hi_bound.windowiobufinletres=letiobuf=Iobuf.read_onlyiobufinhandle_messagemessage_typeiobufinletres=matchreswith|Protocol_error_asres->res|(Stop_|Continue)asres->(matchcheck_window_after_handle_messagemessage_typeiobuf~message_hi_boundwith|Errorerr->Protocol_errorerr|Ok()->res)inIobuf.bounded_flip_hiiobufchunk_hi_bound;(matchreswith|Protocol_errorerr->return(`Stop(`Protocol_errorerr))|Continue->loopiobuf|Stops->return(`Stop(`Dones)))inStaged.stageloop;;letno_pushback()=return()type'aread_messages_result=|Connection_closedofPgasync_error.t|Doneof'aletcan't_read_because_closed=lazy(return(Connection_closed(Pgasync_error.of_string"can't read: closed or closing")));;letrun_reader?(pushback=no_pushback)t~handle_message=lethandle_chunk=Staged.unstage(handle_chunk~handle_message~pushback)inmatcht.statewith|Failed{error;_}->return(Connection_closederror)|Closed_gracefully|Closing->forcecan't_read_because_closed|Open->(* [t.state] = [Open _] implies [t.reader] is not closed, and so this function will
not raise. Note further that if the reader is closed _while_ we are reading, it
does not raise. *)let%bindres=Reader.read_one_iobuf_at_a_timet.reader~handle_chunkin(* In case of some failure (writer failure, including asynchronous) the
reader will be closed and reads will return Eof. So, after the read
result is determined, we check [t.state], so that we can give a better
error message than "unexpected EOF". *)(matcht.statewith|Failed{error;_}->return(Connection_closederror)|Closing|Closed_gracefully->forcecan't_read_because_closed|Open->letres=matchreswith|`Stoppeds->s|`Eof_with_unconsumed_datadata->`Protocol_error(Pgasync_error.create_s[%message"Unexpected EOF"~unconsumed_bytes:(String.lengthdata:int)])|`Eof->`Protocol_error(Pgasync_error.of_string"Unexpected EOF (no unconsumed messages)")in(matchreswith|`Protocol_errorerr->failedterr;return(Connection_closederr)|`Doneres->return(Doneres)));;lethandle_notice_responseiobuf=matchProtocol.Backend.NoticeResponse.consumeiobufwith|Errorerr->Error(Pgasync_error.of_errorerr)|Ok{error_code=_;all_fields}->Log.Global.sexp~level:`Info[%message"Postgres NoticeResponse"~_:(all_fields:(Protocol.Backend.Error_or_notice_field.t*string)list)];Ok();;lethandle_parameter_statustiobuf=matchProtocol.Backend.ParameterStatus.consumeiobufwith|Errorerr->Error(Pgasync_error.of_errorerr)|Ok{key;data}->Hashtbl.sett.runtime_parameters~key~data;Ok();;lethandle_notification_responsetiobuf=matchProtocol.Backend.NotificationResponse.consumeiobufwith|Errorerr->Error(Pgasync_error.of_errorerr)|Ok{pid;channel;payload}->letbus=notification_bustchannelin(matchBus.num_subscribersbuswith|0->Log.Global.sexp~level:`Error[%message"Postgres NotificationResponse on channel that no callbacks are listening \
to"(channel:Notification_channel.t)]|_->Bus.write2buspidpayload);Ok();;letread_messages?pushbackt~handle_message=letcontinue_if_ok=function|Errorerr->Protocol_errorerr|Ok()->Continueinlethandle_messagemessage_typeiobuf=match(message_type:Protocol.Backend.constructor)with|NoticeResponse->continue_if_ok(handle_notice_responseiobuf)|ParameterStatus->continue_if_ok(handle_parameter_statustiobuf)|NotificationResponse->continue_if_ok(handle_notification_responsetiobuf)|_->handle_messagemessage_typeiobufinrun_reader?pushbackt~handle_message;;letconsume_one_asynchronous_messaget=letstop_if_ok=function|Errorerr->Protocol_errorerr|Ok()->Stop()inlethandle_messagemessage_typeiobuf=match(message_type:Protocol.Backend.constructor)with|NoticeResponse->stop_if_ok(handle_notice_responseiobuf)|ParameterStatus->stop_if_ok(handle_parameter_statustiobuf)|NotificationResponse->stop_if_ok(handle_notification_responsetiobuf)|ErrorResponse->(* ErrorResponse is a weird one, because it's very much a message that's part of
the request-response part of the protocol, but secretly you will actually get
one if e.g. your backend is terminated by [pg_terminate_backend]. The fact
that this asynchronous error-response possibility isn't mentioned in the
"protocol-flow" docs kinda doesn't matter as the connection is being
destroyed.
Ultimately handling it specially here only changes the contents of the
[Protocol_error _] we return (i.e., behaviour is unch vs. the catch-all
case). Think of it as giving people better error messages, not a semantic
thing. *)lettag="ErrorResponse received asynchronously, assuming connection is dead"inleterror=matchProtocol.Backend.ErrorResponse.consumeiobufwith|Errorerr->Pgasync_error.of_errorerr|Okerr->Pgasync_error.of_error_responseerr|>Pgasync_error.tag~taginProtocol_errorerror|other->Protocol_error(Pgasync_error.create_s[%message"Unsolicited message from server outside of query conversation"(other:Protocol.Backend.constructor)])inrun_readert~handle_message;;endincludeMessage_readingletprotocol_error_of_errorerror=Protocol_error(Pgasync_error.of_errorerror)letprotocol_error_ssexp=Protocol_error(Pgasync_error.create_ssexp)letunexpected_msg_typemsg_typestate=protocol_error_s[%message"Unexpected message type"(msg_type:Protocol.Backend.constructor)(state:Sexp.t)];;letlogint~user~password~gss_krb_token~database=catch_write_errorst~flush_message:Not_required~f:(funwriter->Protocol.Frontend.Writer.startup_messagewriter{user;database});read_messagest~handle_message:(funmsg_typeiobuf->matchmsg_typewith|AuthenticationRequest->letmoduleQ=Protocol.Backend.AuthenticationRequestin(matchQ.consumeiobufwith|Errorerr->protocol_error_of_errorerr|OkOk->Continue|Ok(MD5Password{salt})->(matchpasswordwith|Somepassword->letds=Md5.to_hex(Md5.digest_strings)inletmd5_hex="md5"^d(d(password^user)^salt)incatch_write_errorst~flush_message:Not_required~f:(funwriter->Protocol.Frontend.Writer.password_messagewriter(Cleartext_or_md5_hexmd5_hex));Continue|None->lets="Server requested (md5) password, but no password was provided"inStop(Or_pgasync_error.error_strings))|OkGSS->(matchgss_krb_tokenwith|Sometoken->catch_write_errorst~flush_message:Not_required~f:(funwriter->Protocol.Frontend.Writer.password_messagewriter(Gss_binary_blobtoken));Continue|None->lets="Server requested GSS auth, but no gss_krb_token was provided"inStop(Or_pgasync_error.error_strings))|Okother->lets=sprintf!"Server wants unimplemented auth subtype: %{sexp:Q.t}"otherinStop(Or_pgasync_error.error_strings))|BackendKeyData->(matchProtocol.Backend.BackendKeyData.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Okdata->(matchSet_once.sett.backend_key[%here]datawith|Ok()->Continue|Error_->protocol_error_s[%sexp"duplicate BackendKeyData messages"]))|ReadyForQuery->letmoduleQ=Protocol.Backend.ReadyForQueryin(matchQ.consumeiobufwith|Errorerr->protocol_error_of_errorerr|OkIdle->Stop(Ok())|Okst->protocol_error_s[%message"Unexpected initial transaction status"(st:Q.t)])|ErrorResponse->(matchProtocol.Backend.ErrorResponse.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Okerr->Stop(Error(Pgasync_error.of_error_responseerr)))|msg_type->unexpected_msg_typemsg_type[%sexp"logging in"]);;(* So there are a few different ways one could imagine handling asynchronous messages,
and it basically breaks down to whether or not you have a single background job
pulling messages out of the reader, filtering away asynchronous messages, and putting
the remainder somewhere that request-response functions can pull them out of, or you
have some way of synchronising access to the reader and noticing if it lights up
while no request-response function is running.
We've gone for the latter, basically because efficiently handing messages from the
background to request-response functions is painful, and it puts a lot of complicated
state into [t] (the background job can't be separate, because it needs to know
whether or not [t.sequencer] is in use in order to classify an [ErrorResponse] as
part of request-response or as asynchronous).
The tradeoff is that we need a custom [Query_sequencer] module for the [when_idle]
function (though at least that can be a separate, isolated module), and we need to
use the fairly low level [interruptible_ready_to] on the [Reader]'s fd (which is
skethcy). All in, I think this turns out to be simpler, and it also isolates the
features that add async message support from the rest of the library. *)lethandle_asynchronous_messagest=Query_sequencer.when_idlet.sequencer(fun()->letmoduleQ=Query_sequencerinmatcht.statewith|Failed_|Closing|Closed_gracefully->returnQ.Finished|Open->let%bindres=matchReader.bytes_availablet.reader>0with|true->return`Ready|false->Fd.interruptible_ready_to(Reader.fdt.reader)`Read~interrupt:(Query_sequencer.other_jobs_are_waitingt.sequencer)in(matcht.statewith|Closed_gracefully|Closing|Failed_->returnQ.Finished|Open->(matchreswith|`Interrupted->returnQ.Call_me_when_idle_again|(`Bad_fd|`Closed)asres->(* [t.state = Open _] implies the reader is open. *)raise_s[%message"handle_asynchronous_messages"(res:[`Bad_fd|`Closed])]|`Ready->(match%bindconsume_one_asynchronous_messagetwith|Connection_closed_->returnQ.Finished|Done()->returnQ.Call_me_when_idle_again))));;letdefault_user=Memo.unit(fun()->Monitor.try_with_or_error~rest:`Log(fun()->Unix.getlogin()));;typepacked_where_to_connect=|Where_to_connect:'aTcp.Where_to_connect.t->packed_where_to_connectletdefault_where_to_connect=lazy(Where_to_connect(Tcp.Where_to_connect.of_file"/run/postgresql/.s.PGSQL.5432"));;letconnect?(interrupt=Deferred.never())?server?user?password?gss_krb_token~database()=let(Where_to_connectserver)=matchserverwith|Somex->Where_to_connectx|None->forcedefault_where_to_connectinlet%bind(user:stringOr_error.t)=matchuserwith|Someu->return(Oku)|None->default_user()inmatchuserwith|Errorerr->return(Error(Pgasync_error.of_errorerr))|Okuser->(match%bindMonitor.try_with_or_error~rest:`Log(fun()->Tcp.connect~interruptserver)with|Errorerr->return(Error(Pgasync_error.of_errorerr))|Ok(_sock,reader,writer)->lett=create_internalreaderwriterinletwriter_failed=Monitor.detach_and_get_next_error(Writer.monitorwriter)inuponwriter_failed(funexn->failedt(Pgasync_error.create_s[%message"Writer failed asynchronously"(exn:Exn.t)]));letlogin_failederr=failedterr;return(Errorerr)in(match%bindchoose[choiceinterrupt(fun()->`Interrupt);choice(logint~user~password~gss_krb_token~database)(funl->`Resultl)]with|`Interrupt->login_failed(Pgasync_error.of_string"login interrupted")|`Result(Connection_closederr)->return(Errorerr)|`Result(Done(Errorerr))->login_failederr|`Result(Done(Ok()))->handle_asynchronous_messagest;return(Okt)));;letwith_connection?interrupt?server?user?password?gss_krb_token~database~on_handler_exception:`Raisefunc=match%bindconnect?interrupt?server?user?password?gss_krb_token~database()with|Error_ase->returne|Okt->(match%bindMonitor.try_with~run:`Now~rest:`Raise(fun()->funct)with|Ok_asok_res->let%bind(_:unitOr_pgasync_error.t)=closetinreturnok_res|Errorexn->don't_wait_for(let%bind(_:unitOr_pgasync_error.t)=closetinreturn());raiseexn);;(* We use the extended query protocol rather than the simple query protocol because it
provides support for [parameters] (via the 'bind' message), and because it guarantees
that only a SQL statement is executed per query, which simplifies the state machine
we need to use to handle the responses significantly.
We don't currently take advantage of named statements or portals, we just use the
'unnamed' ones. We don't take advantage of the ability to bind a statement twice or
partially execute a portal; we just send the full parse-bind-describe-execute
sequence unconditionally in one go.
Ultimately, one needs to read the "message flow/extended query" section of the
documentation:
https://www.postgresql.org/docs/10/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY
What follows is a brief summary, and then a description of how the different
functions fit in.
When any error occurs, the server will report the error and tehen discard messages
until a 'sync' message. This means that we can safely send
parse-bind-describe-execute in one go and then read all the responses, because if one
step fails the remainder will be ignored. So, we do this because it improves latency.
[parse_and_start_executing_query] instructs the server to parse the query, bind the
parameters, describe the type of the rows that will be produced. It then reads
response messages, walking through a state machine. The first three states are easy,
because we expect either the relevant response messages (ParseComplete, BindComplete,
RowDescription|NoData), or an ErrorResponse.
After that, things are trickier. We don't know what string was in the query and
whether or not the query emits no data (e.g., a DELETE), or if server is going to
send us rows, or if it's going to try and COPY IN/OUT, etc.
[parse_and_start_executing_query] detects which case we're in and returns a value of
type [setup_loop_result] indicating this. It's a little weird because we don't want
to consume any of the actual data (if applicable) in that funciton, just detect the
mode.
If the response to the [Describe] message was a [RowDescription], we know that the
response to the [Execute] will be some [DataRow]s, so we don't look at any messages
after that (i.e., in that case we don't look at the response to [Execute] in this
function).
However if the response to [Describe] was [NoData], then we _have_ to look at the
first message in response to the [Execute] message to know which case we're in.
_Fortunately_, the first message in all the other cases does not actually contain any
data, so we can do what we want (detect mode and return).
Here they are:
- [About_to_deliver_rows] e.g., a SELECT. We know we're in this case iff the response
to our [Describe] message was a [RowDescription], and we don't look at the response
to the [Execute] message; the caller is responsible for consuming the [DataRow]s.
- [About_to_copy_out] i.e., COPY ... TO STDOUT. We're here iff we get
a [CopyOutResponse] in response to our [Execute].
- [Ready_to_copy_in] i.e., COPY ... FROM STDIN. We're here iff we get
a [CopyInResponse]. We return, and the caller is responsible for sending a load of
[CopyData] messages etc.
- [EmptyQuery] i.e., the query was the empty string. We're here if we get
a [EmptyQueryResponse], at which point we're done (and can sync); we do not expect
a [CommandComplete].
- [Command_complete_without_output], e.g. a DELETE. We're here if we just get
[CommandComplete] straight away.
Note that it's also possible for the [About_to_deliver_rows] case to complete without
output. The difference is that one corresponds to receiving [RowDescription],
[CommandComplete] and the other [NoData], [CommandComplete]. *)typesetup_loop_state=|Parsing|Binding|Describing|Executing[@@derivingsexp_of]typesetup_loop_result=|About_to_deliver_rowsofProtocol.Backend.RowDescription.t|About_to_copy_out|Ready_to_copy_inofProtocol.Backend.CopyInResponse.t|Empty_query|Command_complete_without_output|Remote_reported_errorofPgasync_error.tletparse_and_start_executing_querytquery_string~parameters=matcht.statewith|Failed{error;_}->(* See comment at top of file regarding erasing error codes from this error. *)return(Connection_closed(Pgasync_error.create_s[%message"query issued against previously-failed connection"~original_error:(error:Pgasync_error.t)]))|Closing|Closed_gracefully->return(Connection_closed(Pgasync_error.of_string"query issued against connection closed by user"))|Open->catch_write_errorst~flush_message:Write_afterwards~f:(funwriter->Protocol.Frontend.Writer.parsewriter{destination=Types.Statement_name.unnamed;query=query_string};Protocol.Frontend.Writer.bindwriter{destination=Types.Portal_name.unnamed;statement=Types.Statement_name.unnamed;parameters};Protocol.Frontend.Writer.describewriter(PortalTypes.Portal_name.unnamed);Protocol.Frontend.Writer.executewriter{portal=Types.Portal_name.unnamed;limit=Unlimited});letstate=refParsinginletunexpected_msg_typemsg_type=unexpected_msg_typemsg_type[%sexp(state:setup_loop_stateref)]inread_messagest~handle_message:(funmsg_typeiobuf->match!state,msg_typewith|state,ErrorResponse->(matchProtocol.Backend.ErrorResponse.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Okerr->lettag=sprintf!"Postgres Server Error (state=%{sexp:setup_loop_state})"stateinleterr=Pgasync_error.of_error_responseerr|>Pgasync_error.tag~taginStop(Remote_reported_errorerr))|Parsing,ParseComplete->let()=Protocol.Backend.ParseComplete.consumeiobufinstate:=Binding;Continue|Parsing,msg_type->unexpected_msg_typemsg_type|Binding,BindComplete->let()=Protocol.Backend.BindComplete.consumeiobufinstate:=Describing;Continue|Binding,msg_type->unexpected_msg_typemsg_type|Describing,RowDescription->(matchProtocol.Backend.RowDescription.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Okdescription->Stop(About_to_deliver_rowsdescription))|Describing,NoData->let()=Protocol.Backend.NoData.consumeiobufinstate:=Executing;Continue|Describing,msg_type->unexpected_msg_typemsg_type|Executing,EmptyQueryResponse->let()=Protocol.Backend.EmptyQueryResponse.consumeiobufinStopEmpty_query|Executing,CommandComplete->(matchProtocol.Backend.CommandComplete.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Ok(_:string)->StopCommand_complete_without_output)|Executing,CopyInResponse->(matchProtocol.Backend.CopyInResponse.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Okdetails->Stop(Ready_to_copy_indetails))|Executing,CopyOutResponse->(matchProtocol.Backend.CopyOutResponse.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Ok_->StopAbout_to_copy_out)|Executing,CopyBothResponse->(* CopyBothResponse is only used for streaming replication, which we do not
initiate. *)unexpected_msg_typemsg_type|Executing,msg_type->unexpected_msg_typemsg_type);;(* really the return type of [f] should be [Protocol_error _ | Continue], but it's
convenient to re-use [handle_message_result] and use [Nothing.t] to 'delete' the
third constructor... *)letread_datarowst~pushback~f=read_messagest?pushback~handle_message:(funmsg_typeiobuf->matchmsg_typewith|DataRow->(matchf~datarow_iobuf:iobufwith|(Protocol_error_|Continue)asx->x|Stop(_:Nothing.t)->.)|ErrorResponse->(matchProtocol.Backend.ErrorResponse.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Okerr->leterr=Pgasync_error.of_error_responseerr|>Pgasync_error.tag~tag:"Error during query execution (despite parsing ok)"inStop(Errorerr))|CommandComplete->(matchProtocol.Backend.CommandComplete.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Ok(_:string)->Stop(Ok()))|msg_type->unexpected_msg_typemsg_type[%sexp"reading DataRows"]);;letdrain_datarowst=letcount=ref0inletf~datarow_iobuf=incrcount;Protocol.Backend.DataRow.skipdatarow_iobuf;Continueinmatch%bindread_datarowst~pushback:None~fwith|Connection_closed_ase->returne|Done(Ok()|Error_)->return(Done!count);;letdrain_copy_outt=letseen_copy_done=reffalseinread_messagest~handle_message:(funmsg_typeiobuf->matchmsg_typewith|ErrorResponse->(* [ErrorResponse] terminates copy-out mode; no separate [CopyDone] is required. *)(matchProtocol.Backend.ErrorResponse.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Ok_->Stop())|CopyData->(match!seen_copy_donewith|true->protocol_error_s[%sexp"CopyData message after CopyDone?"]|false->Protocol.Backend.CopyData.skipiobuf;Continue)|CopyDone->Protocol.Backend.CopyDone.consumeiobuf;seen_copy_done:=true;Continue|CommandComplete->(matchProtocol.Backend.CommandComplete.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Ok(_:string)->(match!seen_copy_donewith|false->protocol_error_s[%sexp"CommandComplete before CopyDone?"]|true->Stop()))|msg_type->unexpected_msg_typemsg_type[%sexp"draining copy-out mode"]);;letabort_copy_int~reason=catch_write_errorst~flush_message:Write_afterwards~f:(funwriter->Protocol.Frontend.Writer.copy_failwriter{reason});read_messagest~handle_message:(funmsg_typeiobuf->matchmsg_typewith|ErrorResponse->(matchProtocol.Backend.ErrorResponse.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Ok_->Stop())|msg_type->unexpected_msg_typemsg_type[%sexp"aborting copy-in mode"]);;letclose_unnamed_portal_and_statement_and_sync_after_queryt=catch_write_errorst~flush_message:Not_required~f:(funwriter->(* Closing the portal & statement is not strictly necessary. The Portal is closed
anyway when the current transaction ends (be it due to the sync message, or the
COMMIT if we're inside a BEGIN/END block), and they're implicitly closed (before
being re-opened) when the unnamed portal/statement is next used (e.g., by the
next query). However, doing so frees resources more eagerly, which is good. *)Protocol.Frontend.Writer.closewriter(PortalTypes.Portal_name.unnamed);(* Note that closing a statement implicitly closes any open portals that were
constructed from it. *)Protocol.Frontend.Writer.closewriter(StatementTypes.Statement_name.unnamed);(* The sync message obviates the need for a flush message.
Note that if we're not within a BEGIN/END block, this commits the transaction. *)Protocol.Frontend.Writer.syncwriter);read_messagest~handle_message:(funmsg_typeiobuf->matchmsg_typewith|CloseComplete->Protocol.Backend.CloseComplete.consumeiobuf;Continue|ReadyForQuery->(matchProtocol.Backend.ReadyForQuery.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Ok(Idle|In_transaction|In_failed_transaction)->Stop())|ErrorResponse->(matchProtocol.Backend.ErrorResponse.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Okerr->leterr=Pgasync_error.of_error_responseerr|>Pgasync_error.tag~tag:"response to close/sync was an error"in(* The "message flow" part of the postgres protocol docs says:
It is not an error to issue Close against a nonexistent statement or portal
name. The response is normally CloseComplete, but could be ErrorResponse if
some difficulty is encountered while releasing resources.
We therefore treat ErrorResponse here as "the backend is hosed, kill the
connection". *)Protocol_errorerr)|msg_type->unexpected_msg_typemsg_type[%sexp"synchronising after query"]);;letinternal_queryt?(parameters=[||])?pushback~handle_columnsquery_string~handle_row=let%bindresult=match%bindparse_and_start_executing_querytquery_string~parameterswith|Connection_closed_aserr->returnerr|DoneAbout_to_copy_out->let%bind(Connection_closed_|Done())=drain_copy_outtinreturn(Done(Or_pgasync_error.error_s[%message"COPY TO STDOUT is not appropriate for [Postgres_async.query]"]))|Done(Ready_to_copy_in_)->letreason="COPY FROM STDIN is not appropriate for [Postgres_async.query]"inlet%bind(Connection_closed_|Done())=abort_copy_int~reasoninreturn(Done(Or_pgasync_error.error_stringreason))|Done(Remote_reported_errorerror)->return(Done(Errorerror))|Done(Empty_query|Command_complete_without_output)->return(Done(Ok()))|Done(About_to_deliver_rowsdescription)->handle_columnsdescription;letcolumn_names=Array.mapdescription~f:Column_metadata.nameinread_datarowst~pushback~f:(fun~datarow_iobuf:iobuf->matchProtocol.Backend.DataRow.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Okvalues->(matchArray.lengthvalues=Array.lengthcolumn_nameswith|false->protocol_error_s[%message"number of columns in DataRow message did not match RowDescription"(column_names:stringarray)(values:stringoptionarray)]|true->handle_row~column_names~values;Continue))in(* Note that if we're not within a BEGIN/END block, this commits the transaction. *)let%bindsync_result=close_unnamed_portal_and_statement_and_sync_after_querytinmatchresult,sync_resultwith|Connection_closederr,_|Done(Errorerr),_|_,Connection_closederr->return(Errorerr)|Done(Ok()),Done()->return(Ok());;(* [query] wraps [internal_query], acquiring the sequencer lock and keeping the user's
exceptions away from trashing our state. *)letqueryt?parameters?pushback?handle_columnsquery_string~handle_row=letcallback_raised=reffalseinletwrap_callback~f=match!callback_raisedwith|true->()|false->(matchf()with|()->()|exceptionexn->(* it's important that we drain (and discard) the remaining rows. *)Monitor.send_exn(Monitor.current())exn;callback_raised:=true)inlethandle_columnsdescription=matchhandle_columnswith|None->()|Somehandler->wrap_callback~f:(fun()->handlerdescription)inlethandle_row~column_names~values=wrap_callback~f:(fun()->handle_row~column_names~values)inlet%bindresult=Query_sequencer.enqueuet.sequencer(fun()->internal_queryt?parameters?pushback~handle_columnsquery_string~handle_row)inmatch!callback_raisedwith|true->Deferred.never()|false->returnresult;;(* [query_expect_no_data] doesn't need the separation that [internal_query] and [query]
have because there's no callback to wrap and it's easy enough to just throw the
sequencer around the whole function. *)letquery_expect_no_datat?(parameters=[||])query_string=Query_sequencer.enqueuet.sequencer(fun()->let%bindresult=match%bindparse_and_start_executing_querytquery_string~parameterswith|Connection_closed_aserr->returnerr|DoneAbout_to_copy_out->let%bind(Connection_closed_|Done())=drain_copy_outtinreturn(Done(Or_pgasync_error.error_s[%message"[Postgres_async.query_expect_no_data]: query attempted COPY OUT"]))|Done(Ready_to_copy_in_)->letreason="[Postgres_async.query_expect_no_data]: query attempted COPY IN"inlet%bind(Connection_closed_|Done())=abort_copy_int~reasoninreturn(Done(Or_pgasync_error.error_stringreason))|Done(About_to_deliver_rows_)->(match%binddrain_datarowstwith|Connection_closed_aserr->returnerr|Done0->return(Done(Ok()))|Done_->return(Done(Or_pgasync_error.error_s[%message"query unexpectedly produced rows"])))|Done(Remote_reported_errorerror)->return(Done(Errorerror))|Done(Empty_query|Command_complete_without_output)->return(Done(Ok()))inlet%bindsync_result=close_unnamed_portal_and_statement_and_sync_after_querytinmatchresult,sync_resultwith|Connection_closederr,_|Done(Errorerr),_|_,Connection_closederr->return(Errorerr)|Done(Ok()),Done()->return(Ok()));;type'afeed_data_result=|Abortof{reason:string}|WaitofunitDeferred.t|Dataof'a|Finishedletquery_did_not_initiate_copy_in=lazy(return(Done(Or_pgasync_error.error_s[%message"Query did not initiate copy-in mode"])));;(* [internal_copy_in_raw] is to [copy_in_raw] as [internal_query] is to [query].
Sequencer and exception handling. *)letinternal_copy_in_rawt?(parameters=[||])query_string~feed_data=let%bindresult=match%bindparse_and_start_executing_querytquery_string~parameterswith|Connection_closed_aserr->returnerr|DoneAbout_to_copy_out->let%bind(Connection_closed_|Done())=drain_copy_outtinreturn(Done(Or_pgasync_error.error_s[%message"COPY TO STDOUT is not appropriate for [Postgres_async.query]"]))|Done(Empty_query|Command_complete_without_output)->forcequery_did_not_initiate_copy_in|Done(About_to_deliver_rows_)->let%bind(Connection_closed_|Done(_:int))=drain_datarowstinforcequery_did_not_initiate_copy_in|Done(Remote_reported_errorerror)->return(Done(Errorerror))|Done(Ready_to_copy_in_)->letsent_copy_done=reffalseinlet(response_deferred:unitOr_pgasync_error.tread_messages_resultDeferred.t)=read_messagest~handle_message:(funmsg_typeiobuf->matchmsg_typewith|ErrorResponse->(matchProtocol.Backend.ErrorResponse.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Okerr->Stop(Error(Pgasync_error.of_error_responseerr)))|CommandComplete->(matchProtocol.Backend.CommandComplete.consumeiobufwith|Errorerr->protocol_error_of_errorerr|Ok(_res:string)->(match!sent_copy_donewith|false->protocol_error_s[%sexp"CommandComplete response before we sent CopyDone?"]|true->Stop(Ok())))|msg_type->unexpected_msg_typemsg_type[%sexp"copying in"])inlet%bind()=letrecloop()=matchfeed_data()with|Abort{reason}->catch_write_errorst~flush_message:Write_afterwards~f:(funwriter->Protocol.Frontend.Writer.copy_failwriter{reason});return()|Waitdeferred->continue~after:deferred|Datapayload->catch_write_errorst~flush_message:Not_required~f:(funwriter->Protocol.Frontend.Writer.copy_datawriterpayload);(matchbytes_pending_in_writer_buffert>128*1024*1024with|false->loop()|true->continue~after:(wait_for_writer_buffer_to_be_emptyt))|Finished->sent_copy_done:=true;catch_write_errorst~flush_message:Write_afterwards~f:(funwriter->Protocol.Frontend.Writer.copy_donewriter);return()andcontinue~after=let%bind()=afterin(* check for early termination; not necessary for correctness, just nice. *)matchDeferred.peekresponse_deferredwith|None->loop()|Some(Connection_closed_|Done(Error_))->return()|Some(Done(Ok()))->failwith"BUG: response_deferred is (Done (Ok ())), but we never set \
!sent_copy_done?"inloop()inresponse_deferredinlet%bindsync_result=close_unnamed_portal_and_statement_and_sync_after_querytinmatchresult,sync_resultwith|Connection_closederr,_|Done(Errorerr),_|_,Connection_closederr->return(Errorerr)|Done(Ok()),Done()->return(Ok());;letcopy_in_rawt?parametersquery_string~feed_data=letcallback_raised=reffalseinletfeed_data()=(* We shouldn't be called again after [Abort] *)assert(not!callback_raised);matchfeed_data()with|(Abort_|Wait_|Data_|Finished)asx->x|exceptionexn->Monitor.send_exn(Monitor.current())exn;callback_raised:=true;Abort{reason="feed_data callback raised"}inlet%bindresult=Query_sequencer.enqueuet.sequencer(fun()->internal_copy_in_rawt?parametersquery_string~feed_data)inmatch!callback_raisedwith|true->Deferred.never()|false->returnresult;;(* [copy_in_rows] builds on [copy_in_raw] and clearly doesn't raise exceptions in the
callback wrapper, so we can just rely on [copy_in_raw] to handle sequencing and
exceptions here. *)letcopy_in_rowst~table_name~column_names~feed_data=letquery_string=String_escaping.Copy_in.query~table_name~column_namesincopy_in_rawtquery_string~feed_data:(fun()->matchfeed_data()with|(Abort_|Wait_|Finished)asx->x|Datarow->Data(String_escaping.Copy_in.row_to_stringrow));;letlisten_to_notificationst~channel~f=letquery=String_escaping.Listen.query~channelinletchannel=Notification_channel.of_stringchannelinletbus=notification_bustchannelinletmonitor=Monitor.current()inletsubscriber=Bus.subscribe_exnbus[%here]~f:(funpidpayload->f~pid~payload)~on_callback_raise:(funerror->Monitor.send_exnmonitor(Error.to_exnerror))inignore(subscriber:_Bus.Subscriber.t);query_expect_no_datatquery;;endtypet=Expert.t[@@derivingsexp_of]letconnect?interrupt?server?user?password?gss_krb_token~database()=Expert.connect?interrupt?server?user?password?gss_krb_token~database()>>|Or_pgasync_error.to_or_error;;letcloset=Expert.closet>>|Or_pgasync_error.to_or_errorletclose_finishedt=Expert.close_finishedt>>|Or_pgasync_error.to_or_errortypestate=|Open|Closing|Failedof{error:Error.t;resources_released:bool}|Closed_gracefully[@@derivingsexp_of]letstatust=matchExpert.statustwith|Failed{error;resources_released}->Failed{error=Pgasync_error.to_errorerror;resources_released}|Open->Open|Closing->Closing|Closed_gracefully->Closed_gracefully;;letwith_connection?interrupt?server?user?password?gss_krb_token~database~on_handler_exception:`Raisefunc=Expert.with_connection?interrupt?server?user?password?gss_krb_token~database~on_handler_exception:`Raisefunc>>|Or_pgasync_error.to_or_error;;type'afeed_data_result='aExpert.feed_data_result=|Abortof{reason:string}|WaitofunitDeferred.t|Dataof'a|Finishedletcopy_in_rawt?parametersquery_string~feed_data=Expert.copy_in_rawt?parametersquery_string~feed_data>>|Or_pgasync_error.to_or_error;;letcopy_in_rowst~table_name~column_names~feed_data=Expert.copy_in_rowst~table_name~column_names~feed_data>>|Or_pgasync_error.to_or_error;;letlisten_to_notificationst~channel~f=Expert.listen_to_notificationst~channel~f>>|Or_pgasync_error.to_or_error;;letquery_expect_no_datat?parametersquery_string=Expert.query_expect_no_datat?parametersquery_string>>|Or_pgasync_error.to_or_error;;letqueryt?parameters?pushback?handle_columnsquery_string~handle_row=Expert.queryt?parameters?pushback?handle_columnsquery_string~handle_row>>|Or_pgasync_error.to_or_error;;modulePrivate=structletpgasync_error_of_error=Pgasync_error.of_errorend