(** WebSocket connection management with TLS and reconnection support.

    Uses tls-eio for pure-OCaml TLS, avoiding OpenSSL dependencies. *)

let src = Logs.Src.create "polymarket.wss" ~doc:"WebSocket connection"

module Log = (val Logs.src_log src : Logs.LOG)

(** Connection state. *)
type state = Disconnected | Connecting | Connected | Closing | Closed

(** Connection configuration. Backoff and ping values are passed directly to
    [Eio.Time.sleep]. *)
type config = {
  host : string;
  port : int;
  resource : string;
  initial_backoff : float;
  max_backoff : float;
  ping_interval : float;
}

(** [default_config ~host ~resource] is a configuration for [host] and
    [resource] on port 443, with a backoff starting at 1.0 and capped at
    60.0, and a ping interval of 30.0. *)
let default_config ~host ~resource =
  {
    host;
    port = 443;
    resource;
    initial_backoff = 1.0;
    max_backoff = 60.0;
    ping_interval = 30.0;
  }

(** Network wrapper to hide type parameter. *)
type net_t = Net : 'a Eio.Net.t -> net_t

(** Internal connection type. *)
type t = {
  config : config;
  sw : Eio.Switch.t;
  net : net_t;
  clock : float Eio.Time.clock_ty Eio.Resource.t;
  mutable state : state;
  mutable flow : Tls_eio.t option;
  message_stream : string Eio.Stream.t;
  mutable subscription_msg : string option;
  mutable closed : bool;
  mutable current_backoff : float;
}

(** Create TLS configuration.
    @raise Failure if the CA certificates cannot be loaded or the client
    configuration is rejected. *)
let make_tls_config () =
  let authenticator =
    match Ca_certs.authenticator () with
    | Ok auth -> auth
    | Error (`Msg msg) -> failwith ("CA certs error: " ^ msg)
  in
  match Tls.Config.client ~authenticator () with
  | Ok cfg -> cfg
  | Error (`Msg msg) -> failwith ("TLS config error: " ^ msg)

(** Create a new connection in the [Disconnected] state. Nothing is opened
    until {!start} (or {!connect_with_retry}) runs.
    @param ping_interval delay between pings (default 30.0)
    @param buffer_size capacity of the raw message stream (default 1000) *)
let create ~sw ~net ~clock ~host ~resource ?(ping_interval = 30.0)
    ?(buffer_size = 1000) () =
  let config = { (default_config ~host ~resource) with ping_interval } in
  {
    config;
    sw;
    net = Net net;
    clock;
    state = Disconnected;
    flow = None;
    message_stream = Eio.Stream.create buffer_size;
    subscription_msg = None;
    closed = false;
    current_backoff = config.initial_backoff;
  }

(* Close [flow], logging (rather than raising) any error other than
   cancellation. *)
let release_flow flow =
  try Eio.Flow.close flow with
  | Eio.Cancel.Cancelled _ as exn -> raise exn
  | exn ->
      Log.debug (fun m -> m "Error closing flow: %s" (Printexc.to_string exn))

(* Forget and close the connection's current flow, if any. *)
let drop_flow t =
  match t.flow with
  | None -> ()
  | Some flow ->
      t.flow <- None;
      release_flow flow

(** Establish TCP + TLS connection.
    @raise Failure if the host cannot be resolved or the TLS configuration
    cannot be built. Network and TLS errors from the underlying libraries
    propagate as well. *)
let connect_tls t =
  let host = t.config.host in
  let port = t.config.port in
  let (Net net) = t.net in
  Log.debug (fun m -> m "Connecting to %s:%d" host port);
  (* Resolve address *)
  let addr =
    match Eio.Net.getaddrinfo_stream net host ~service:(string_of_int port) with
    | [] -> failwith ("Failed to resolve host: " ^ host)
    | addr :: _ -> addr
  in
  (* Connect TCP *)
  let socket = Eio.Net.connect ~sw:t.sw net addr in
  (* Upgrade to TLS *)
  match
    let tls_config = make_tls_config () in
    let host_name = Domain_name.of_string_exn host |> Domain_name.host_exn in
    Log.debug (fun m -> m "TLS handshake");
    Tls_eio.client_of_flow tls_config ~host:host_name socket
  with
  | tls_flow ->
      Log.debug (fun m -> m "TLS connected");
      tls_flow
  | exception exn ->
      (* The socket is attached to [t.sw], which may live for the whole
         program; close it now so failed attempts do not pile up open
         sockets. *)
      let bt = Printexc.get_raw_backtrace () in
      Eio.Flow.close socket;
      Printexc.raise_with_backtrace exn bt

(** Connect and perform WebSocket handshake.

    Returns [true] once the connection is established, and [false] if the
    attempt failed for any reason (resolution, TCP, TLS or handshake) or if
    the connection was closed while the attempt was in progress. Failures are
    logged, never raised, so that {!connect_with_retry} can back off and try
    again; only cancellation propagates. *)
let connect_internal t =
  t.state <- Connecting;
  (* Release whatever a previous session left behind before opening a new
     flow. *)
  drop_flow t;
  let fail () =
    t.state <- (if t.closed then Closed else Disconnected);
    false
  in
  match connect_tls t with
  | exception (Eio.Cancel.Cancelled _ as exn) ->
      t.state <- Disconnected;
      raise exn
  | exception exn ->
      Log.err (fun m -> m "Connection failed: %s" (Printexc.to_string exn));
      fail ()
  | flow -> (
      (* Perform WebSocket handshake *)
      match
        Handshake.perform ~flow ~host:t.config.host ~port:t.config.port
          ~resource:t.config.resource
      with
      | Handshake.Success ->
          if t.closed then begin
            (* [close] ran while we were connecting: do not resurrect. *)
            release_flow flow;
            fail ()
          end
          else begin
            t.flow <- Some flow;
            t.state <- Connected;
            t.current_backoff <- t.config.initial_backoff;
            Log.debug (fun m -> m "Connected");
            true
          end
      | Handshake.Failed msg ->
          Log.err (fun m -> m "Handshake failed: %s" msg);
          release_flow flow;
          fail ()
      | exception (Eio.Cancel.Cancelled _ as exn) ->
          release_flow flow;
          t.state <- Disconnected;
          raise exn
      | exception exn ->
          Log.err (fun m -> m "Handshake error: %s" (Printexc.to_string exn));
          release_flow flow;
          fail ())

(** Send a frame (masked). Logs a warning and does nothing when there is no
    current flow. Write errors from the flow propagate to the caller. *)
let send_frame t frame =
  match t.flow with
  | Some flow ->
      let data = Frame.encode ~mask:true frame in
      Eio.Flow.copy_string data flow;
      Log.debug (fun m ->
          m "Frame sent (opcode %d)" (Frame.Opcode.to_int frame.opcode))
  | None -> Log.warn (fun m -> m "Send failed: not connected")

(** Send a text message. *)
let send t msg =
  send_frame t (Frame.text msg);
  Log.debug (fun m -> m "Message sent")

(** Send a ping. *)
let send_ping t = send_frame t (Frame.ping ())

(** Receive loop - reads frames and dispatches to message stream.

    Runs until the connection stops being [Connected]. On EOF, on a read
    error, or on a Close frame from the peer, the flow is released and the
    state becomes [Disconnected] (unless {!close} was called), so that the
    supervisor started by {!start} reconnects. Cancellation is re-raised. *)
let receive_loop t =
  let mark_disconnected () =
    drop_flow t;
    (* Do not overwrite the terminal state set by [close]. *)
    if not t.closed then t.state <- Disconnected
  in
  match t.flow with
  | None -> mark_disconnected ()
  | Some flow -> (
      try
        while t.state = Connected do
          let frame = Frame.decode flow in
          match frame.opcode with
          | Frame.Opcode.Text | Frame.Opcode.Binary ->
              Eio.Stream.add t.message_stream frame.payload
          | Frame.Opcode.Ping ->
              (* Respond with pong *)
              send_frame t (Frame.pong ~payload:frame.payload ());
              Log.debug (fun m -> m "Ping received")
          | Frame.Opcode.Pong -> Log.debug (fun m -> m "Pong received")
          | Frame.Opcode.Close ->
              Log.debug (fun m -> m "Close received");
              (* A peer-initiated close is a disconnect, not a user close.
                 Leaving the state at [Closed] with [t.closed = false] would
                 stall the supervisor loop. *)
              mark_disconnected ()
          | _ -> ()
        done
      with
      | Eio.Cancel.Cancelled _ as exn -> raise exn
      | End_of_file ->
          Log.debug (fun m -> m "EOF");
          mark_disconnected ()
      | exn ->
          Log.err (fun m -> m "Receive error: %s" (Printexc.to_string exn));
          mark_disconnected ())

(** Ping loop - sends periodic pings.

    Runs until the connection is closed. A ping is sent every
    [ping_interval] while the state is [Connected]; while disconnected or
    reconnecting the loop keeps ticking, so pings resume after a reconnect.
    A failed ping is logged and does not stop the loop. *)
let ping_loop t =
  try
    while not t.closed do
      Eio.Time.sleep t.clock t.config.ping_interval;
      if t.state = Connected && not t.closed then begin
        match send_ping t with
        | () -> Log.debug (fun m -> m "Ping sent")
        | exception (Eio.Cancel.Cancelled _ as exn) -> raise exn
        | exception exn ->
            Log.warn (fun m -> m "Ping failed: %s" (Printexc.to_string exn))
      end
    done
  with
  | Eio.Cancel.Cancelled _ -> Log.debug (fun m -> m "Ping cancelled")
  | exn -> Log.err (fun m -> m "Ping loop error: %s" (Printexc.to_string exn))

(** Connect with exponential backoff retry.

    Retries until a connection is established or the connection is closed.
    After each failure it sleeps for the current backoff, which then doubles
    up to [max_backoff]. On success the stored subscription message, if any,
    is re-sent; if that send fails the connection is marked [Disconnected]
    again. *)
let rec connect_with_retry t =
  if t.closed then ()
  else if connect_internal t then begin
    (* Send subscription message if set *)
    match t.subscription_msg with
    | None -> ()
    | Some msg -> (
        match send t msg with
        | () -> Log.debug (fun m -> m "Resubscribed")
        | exception (Eio.Cancel.Cancelled _ as exn) -> raise exn
        | exception exn ->
            Log.err (fun m ->
                m "Resubscribe failed: %s" (Printexc.to_string exn));
            drop_flow t;
            if not t.closed then t.state <- Disconnected)
  end
  else if not t.closed then begin
    Log.warn (fun m -> m "Retrying in %.1fs" t.current_backoff);
    Eio.Time.sleep t.clock t.current_backoff;
    t.current_backoff <-
      Float.min (t.current_backoff *. 2.0) t.config.max_backoff;
    connect_with_retry t
  end

(** Set subscription message for reconnection. *)
let set_subscription t msg = t.subscription_msg <- Some msg

(** Get message stream. *)
let message_stream t = t.message_stream

(** Check if connected. *)
let is_connected t = t.state = Connected

(** Check if closed. *)
let is_closed t = t.closed

(** Close connection. Idempotent.

    Sending the Close frame is best effort: any error while sending it is
    logged and ignored, and the flow is closed regardless. This function does
    not raise. *)
let close t =
  if not t.closed then begin
    t.closed <- true;
    (match t.flow with
    | None -> ()
    | Some flow ->
        (* Deliberately catches everything, as the original did: a failed
           farewell frame must not prevent the flow from being closed.
           NOTE(review): this also absorbs cancellation, presumably because
           close is used on teardown paths - confirm with callers. *)
        (try send_frame t (Frame.close ())
         with exn ->
           Log.debug (fun m ->
               m "Close frame not sent: %s" (Printexc.to_string exn)));
        t.flow <- None;
        (try Eio.Flow.close flow
         with exn ->
           Log.debug (fun m ->
               m "Error closing flow: %s" (Printexc.to_string exn))));
    t.state <- Closed;
    Log.debug (fun m -> m "Closed")
  end

(** Start the connection with receive loop.

    Forks a supervisor fiber on the connection's switch that connects (with
    retry), runs the receive loop, and reconnects after a disconnect until
    the connection is closed or the fiber is cancelled. *)
let start t =
  Eio.Fiber.fork ~sw:t.sw (fun () ->
      try
        while not t.closed do
          if t.state = Disconnected then connect_with_retry t;
          if t.state = Connected then receive_loop t;
          (* Small delay before reconnect attempt. Sleeping whenever we are
             not connected ensures every iteration yields to other fibers. *)
          if t.state <> Connected && not t.closed then
            Eio.Time.sleep t.clock 0.1
        done
      with Eio.Cancel.Cancelled _ ->
        Log.debug (fun m -> m "Receive cancelled"))

(** Start ping loop in a fiber on the connection's switch. *)
let start_ping t = Eio.Fiber.fork ~sw:t.sw (fun () -> ping_loop t)

(** Start a message parsing fiber that reads from a connection's raw stream,
    parses messages using the provided function, and adds them to the output
    stream. Handles cancellation and errors with consistent logging.

    A message for which [parse] raises is logged and skipped; the fiber keeps
    running.

    @param sw Switch for fiber lifecycle
    @param channel_name Name for log messages (e.g., "market", "user")
    @param conn WebSocket connection to read from
    @param parse Function to parse raw messages into typed messages
    @param output_stream Output stream for parsed messages *)
let start_parsing_fiber (type a) ~sw ~channel_name ~conn
    ~(parse : string -> a list) ~(output_stream : a Eio.Stream.t) =
  Eio.Fiber.fork ~sw (fun () ->
      try
        let raw_stream = conn.message_stream in
        while not conn.closed do
          let raw = Eio.Stream.take raw_stream in
          match parse raw with
          | msgs ->
              List.iter (fun msg -> Eio.Stream.add output_stream msg) msgs
          | exception (Eio.Cancel.Cancelled _ as exn) -> raise exn
          | exception exn ->
              (* One malformed message must not stop the channel: if the
                 parser stopped, the raw stream would no longer be drained. *)
              Log.err (fun m ->
                  m "Parser error (%s): %s" channel_name
                    (Printexc.to_string exn))
        done;
        Log.debug (fun m -> m "Parser stopped (%s)" channel_name)
      with
      | Eio.Cancel.Cancelled _ ->
          Log.debug (fun m -> m "Parser cancelled (%s)" channel_name)
      | exn ->
          Log.err (fun m ->
              m "Parser error (%s): %s" channel_name (Printexc.to_string exn)))