(* Client side of the connection to the dune cache daemon: starts the daemon
   through the current executable, connects to the endpoint it prints, and
   exchanges messages with it over a TCP socket. *)

open Stdune
open Dune_util
open Result.O
open Cache_intf

(** State of a connected client.

    - [fd] is the TCP socket connected to the daemon.
    - [socket] and [input] are the output and input channels created on top
      of [fd].
    - [cache] is the local cache handle created by [Local.make].
    - [thread] is the thread that reads incoming commands from [input].
    - [finally] is an optional callback invoked by [thread] when reading a
      command fails.
    - [version] is the protocol version negotiated with the daemon. *)
type t =
  { socket : out_channel
  ; fd : Unix.file_descr
  ; input : in_channel
  ; cache : Local.t
  ; thread : Thread.t
  ; finally : (unit -> unit) option
  ; version : Messages.version
  }

(** Protocol versions offered to the daemon during version negotiation. *)
let versions_supported_by_dune : Messages.version list =
  [ { major = 1; minor = 1 } ]

(** [err msg] wraps the plain string [msg] into a [User_error.E] exception
    value. *)
let err msg = User_error.E (User_error.make [ Pp.text msg ])

(** [errf msg] is like [err] but takes the already built list of [Pp]
    paragraphs. *)
let errf msg = User_error.E (User_error.make msg)

(** [read version input] reads one canonical S-expression from [input] and
    decodes it with [Messages.incoming_message_of_sexp]. Any error from either
    step is returned as-is. *)
let read version input =
  let* sexp = Csexp.input input in
  (* NOTE(review): destructuring and rebuilding [Dedup v] presumably exists to
     adjust the type of the decoded message - confirm against [Messages]. *)
  let+ (Dedup v) = Messages.incoming_message_of_sexp version sexp in
  Dedup v

(** [make ?finally ?duplication_mode ~command_handler ()] starts the cache
    daemon, connects to it and returns the resulting client.

    Steps, in order:
    - ignore [SIGPIPE] for the whole process;
    - create the local cache with [Local.make];
    - run ["<executable> cache start --display progress --exit-no-client"] and
      read its standard output, which must consist of exactly one line of the
      form ["ADDR:PORT"];
    - connect a TCP socket to that endpoint and negotiate the protocol
      version;
    - spawn a thread that passes every incoming command to [command_handler]
      until a read fails, at which point the error is logged and [finally],
      if provided, is called.

    Returns [Error exn] if any of the steps above fails. *)
let make ?finally ?duplication_mode ~command_handler () =
  (* This is a bit ugly as it is global, but flushing a closed socket will nuke
     the program if we don't. *)
  let () = Sys.set_signal Sys.sigpipe Sys.Signal_ignore in
  let* cache =
    Result.map_error ~f:err
      (Local.make ?duplication_mode ~command_handler:ignore ())
  in
  let* port =
    let cmd =
      Format.sprintf "%s cache start --display progress --exit-no-client"
        Sys.executable_name
    and f stdout =
      match Io.input_lines stdout with
      | [] -> Result.Error (err "empty output starting cache")
      | [ line ] -> Result.Ok line
      | _ -> Result.Error (err "unrecognized output starting cache")
    and finally stdout = ignore (Unix.close_process_in stdout) (* FIXME *) in
    Exn.protectx (Unix.open_process_in cmd) ~finally ~f
  in
  (* At this point [port] holds the whole "ADDR:PORT" line printed by the
     daemon; it is split into its two components below. *)
  let* addr, port =
    match String.split_on_char ~sep:':' port with
    | [ addr; port ] -> (
      match Int.of_string port with
      | Some i -> (
        try Result.Ok (Unix.inet_addr_of_string addr, i)
        with Failure _ ->
          Result.Error (errf [ Pp.textf "invalid address: %s" addr ]) )
      | None -> Result.Error (errf [ Pp.textf "invalid port: %s" port ]) )
    | _ -> Result.Error (errf [ Pp.textf "invalid endpoint: %s" port ])
  in
  let fd = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
  let* () =
    match
      Result.try_with (fun () -> Unix.connect fd (Unix.ADDR_INET (addr, port)))
    with
    | Result.Ok () as connected -> connected
    | Result.Error _ as failure ->
      (* Nothing else references [fd] yet, so close it here instead of leaking
         the descriptor. The connection error is what gets reported, hence a
         failure to close is ignored. *)
      (try Unix.close fd with Unix.Unix_error _ -> ());
      failure
  in
  let socket = Unix.out_channel_of_descr fd in
  let input = Unix.in_channel_of_descr fd in
  (* NOTE(review): when negotiation fails, [fd] and [cache] are not released
     on the error path - consider cleaning them up as well. *)
  let+ version =
    Result.map_error ~f:err
      (Messages.negotiate_version ~versions_supported_by_dune fd input socket)
  in
  Log.info
    [ Pp.textf "negotiated version: %s" (Messages.string_of_version version) ];
  (* Body of the reader thread: handle commands one at a time and stop at the
     first read error. *)
  let rec thread input =
    match
      let+ command = read version input in
      Log.info
        [ (let open Pp.O in
          Pp.text "dune-cache command: " ++ Dyn.pp (command_to_dyn command))
        ];
      command_handler command
    with
    | Result.Error e ->
      (* NOTE(review): the message has a line break before [%s] in the text
         this was restored from - confirm that it is intended. *)
      Log.info [ Pp.textf "dune-cache read error: \n%s" e ];
      Option.iter ~f:(fun f -> f ()) finally
    | Result.Ok () -> (thread [@tailcall]) input
  in
  let thread = Thread.create thread input in
  { socket; fd; input; cache; thread; finally; version }

(** [with_repositories client repositories] sends a [SetRepos] message to the
    daemon and returns [client] unchanged. *)
let with_repositories client repositories =
  Messages.send client.version client.socket (SetRepos repositories);
  client

(** [promote client files key metadata ~repository ~duplication] sends a
    [Promote] message to the daemon. When [duplication] is [None], the
    duplication mode of the client's local cache is used instead.

    Returns [Error "lost connection to cache daemon"] if sending raises
    [Sys_error]. *)
let promote (client : t) files key metadata ~repository ~duplication =
  let duplication =
    Some
      (Option.value ~default:(Local.duplication_mode client.cache) duplication)
  in
  try
    Messages.send client.version client.socket
      (Promote { key; files; metadata; repository; duplication });
    Result.Ok ()
  with Sys_error (* "Broken_pipe" *) _ ->
    Result.Error "lost connection to cache daemon"

(** [set_build_dir client path] sends a [SetBuildRoot] message to the daemon
    and returns [client] unchanged. *)
let set_build_dir client path =
  Messages.send client.version client.socket (SetBuildRoot path);
  client

(** [search client key] forwards to [Local.search] on the client's local
    cache; the daemon is not involved. *)
let search client key = Local.search client.cache key

(** [retrieve client file] forwards to [Local.retrieve] on the client's local
    cache. *)
let retrieve client file = Local.retrieve client.cache file

(** [deduplicate client file] forwards to [Local.deduplicate] on the client's
    local cache. *)
let deduplicate client file = Local.deduplicate client.cache file

(** [teardown client] shuts down the sending side of the socket, waits for the
    reader thread to finish and then tears down the local cache. A socket that
    is already disconnected ([ENOTCONN]) is not treated as an error. *)
let teardown client =
  ( try Unix.shutdown client.fd Unix.SHUTDOWN_SEND
    with Unix.Unix_error (Unix.ENOTCONN, _, _) -> () );
  Thread.join client.thread;
  Local.teardown client.cache