123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339(*
Inputs:
- Loader.db (for creating probes, etc.)
- Names of unique RRs
- Received packets
- Event: application initiates first probe
- Event: delay complete
- Event: send complete
- (To Do: send fail or network down? trie changed?)
Outputs (actions):
- Packets to be sent
- Delay time
- Idle/finished indications
*)letmulticast_dns_ip=Ipaddr.V4.of_string_exn"224.0.0.251"typedatagram=Packet.t*Ipaddr.V4.t*int(* RFC 6762 section 10.2 implies that uniqueness is based on name/rrtype/rrclass,
but section 8.1 implies that a domain name is enough. *)moduleUniqueSet=Name.Settypeprobe_number=FirstProbe|SecondProbe|ThirdProbetypeprobing_state={datagram:datagram;num:probe_number;rrs:Packet.rrlist;}typerestart_after=AfterSend|AfterDelay|DoProbetypeprobe_stage=|ProbeIdle|SendingProbeofprobing_state|DelayAfterSendingProbeofprobing_state|NeedRestartofrestart_after*float|DelayBeforeRestart|ProbeStoppedtypestate={stage:probe_stage;first_done:bool;db:Loader.db;(* mutable *)(* Three lists of unique names *)names_pending:UniqueSet.t;names_probing:UniqueSet.t;names_confirmed:UniqueSet.t;}typeaction=|Nothing|ToSendofdatagram|Delayoffloat|Continue|NotReady|Stopletnew_statedb={stage=ProbeIdle;first_done=false;db;names_pending=UniqueSet.empty;names_probing=UniqueSet.empty;names_confirmed=UniqueSet.empty;}(* May call do_probe after this *)letadd_namestatename={statewithnames_pending=UniqueSet.addnamestate.names_pending}(* This predicate controls the cache-flush bit *)letis_confirmedstatename=UniqueSet.memnamestate.names_confirmedletstopstate={statewithstage=ProbeStopped}letis_first_completestate=state.first_doneletprepare_probenamesdb=(* Build a list of questions *)letquestions=List.map(funname->Packet.({q_name=name;q_type=Q_ANY_TYP;q_class=Q_IN;q_unicast=Q_mDNS_Unicast;(* request unicast response as per RFC 6762 section 8.1 para 6 *)}))namesin(* Reuse Query.answer_multiple to get the records that we need for the authority section *)letanswer=Query.answer_multiple~dnssec:false~mdns:truequestionsdb.Loader.trieinletrrs=List.filter(funanswer->List.memanswer.Packet.namenames)answer.Query.answerinifrrs=[]then(* There are no unique records to probe for *)Noneelse(* I don't know whether the cache flush bit needs to be set in the authority RRs, but seems logical *)letauthorities=List.map(funrr->{rrwithPacket.flush=true})rrsinletdetail=Packet.({qr=Query;opcode=Standard;aa=false;tc=false;rd=false;ra=false;rcode=NoError;})inletquery=Packet.({id=0;detail;questions;answers=[];authorities;additionals=[];})inSome(query,rrs)(* Initiates the first probe *)letdo_probestate=letbegin_probe()=matchprepare_probe(UniqueSet.elementsstate.names_pending)state.dbwith|None->(* Nothing to do right now *)({statewithstage=ProbeIdle},Nothing)|Some(packet,rrs)->(* Send the probe *)(* TODO: probes should be per-link if there are multiple NICs *)letdatagram=(packet,multicast_dns_ip,5353)in({statewithstage=SendingProbe{datagram;num=FirstProbe;rrs};names_pending=UniqueSet.empty;names_probing=state.names_pending;},ToSenddatagram)inmatchstate.stagewith|ProbeIdle->begin_probe()|NeedRestart(DoProbe,delay)->ifdelay=0.0thenbegin_probe()else({statewithstage=DelayBeforeRestart},Delaydelay)|SendingProbe_|DelayAfterSendingProbe_|NeedRestart(AfterSend,_)|NeedRestart(AfterDelay,_)|DelayBeforeRestart->(state,NotReady)|ProbeStopped->(state,Stop)letrestart_laterstatedelay=matchstate.stagewith|SendingProbe_->{statewithstage=NeedRestart(AfterSend,delay)}|DelayAfterSendingProbe_->(* Delays a bit longer than needed *){statewithstage=NeedRestart(AfterDelay,delay)}|ProbeIdle->{statewithstage=NeedRestart(DoProbe,delay)}|NeedRestart_|DelayBeforeRestart|ProbeStopped->(* No change *)stateleton_send_completestate=matchstate.stagewith|SendingProbeprobing->(* Fixed delay of 250 ms *)({statewithstage=DelayAfterSendingProbeprobing},Delay0.25)(* Continues in on_delay_complete *)|NeedRestart(AfterSend,delay)->({statewithstage=NeedRestart(DoProbe,delay)},Continue)|DelayAfterSendingProbe_|ProbeIdle|NeedRestart(AfterDelay,_)|NeedRestart(DoProbe,_)|DelayBeforeRestart->(* Unexpected event *)(state,NotReady)|ProbeStopped->(state,Stop)leton_delay_completestate=letafter_delaystateprobing=matchprobing.numwith|FirstProbe->({statewithstage=SendingProbe{probingwithnum=SecondProbe}},ToSendprobing.datagram)(* Wait for on_send_complete *)|SecondProbe->({statewithstage=SendingProbe{probingwithnum=ThirdProbe}},ToSendprobing.datagram)(* Wait for on_send_complete *)|ThirdProbe->({statewithstage=ProbeIdle;first_done=true;names_probing=UniqueSet.empty;names_confirmed=UniqueSet.unionstate.names_confirmedstate.names_probing;},Continue)(* Call do_probe in case state.names_pending is not empty. *)inmatchstate.stagewith|DelayAfterSendingProbeprobing->after_delaystateprobing|NeedRestart(AfterDelay,delay)->({statewithstage=NeedRestart(DoProbe,delay)},Continue)|DelayBeforeRestart->do_probe{statewithstage=ProbeIdle}|ProbeIdle|SendingProbe_|NeedRestart(AfterSend,_)|NeedRestart(DoProbe,_)->(* Unexpected event *)(state,NotReady)|ProbeStopped->(state,Stop)(* FIXME: db is mutable *)letrename_uniquestateold_name=letincrement_namename=matchName.to_string_listnamewith|head::tail->letre=Re.Str.regexp"\\(.*\\)\\([0-9]+\\)"inletnew_head=ifRe.Str.string_matchrehead0thenbeginletnum=int_of_string(Re.Str.matched_group2head)in(Re.Str.matched_group1head)^(string_of_int(num+1))endelsehead^"2"inName.of_string_list(new_head::tail)|[]->failwith"can't offer the DNS root"in(* Find the old RR from the trie *)letrrsets=matchTrie.simple_lookup(Name.to_keyold_name)state.db.Loader.triewith|None->failwith"rename_unique: old name not found"|Somenode->letrrsets=node.RR.rrsetsin(* Remove the rrsets from the old node *)(* TODO: remove the node itself *)node.RR.rrsets<-[];rrsetsin(* Create a new name *)letnew_name=increment_nameold_namein(* Add the new RR to the trie *)(* TODO: Loader doesn't support a simple rename operation *)List.iter(funrrset->matchrrset.RR.rdatawith|RR.Al->List.iter(funip->Loader.add_a_rriprrset.RR.ttlnew_namestate.db)l|_->failwith"Only A records are supported")rrsets;new_nametypeconflict=NoConflict|ConflictRestartleton_response_receivedstateresponse=(* Check for conflicts *)letprobing_rrs=matchstate.stagewith|SendingProbeprobing->probing.rrs|DelayAfterSendingProbeprobing->probing.rrs|ProbeIdle|NeedRestart_|DelayBeforeRestart|ProbeStopped->[]inletset_of_listl=List.fold_left(funse->UniqueSet.addes)UniqueSet.emptylin(* RFC 6762 section 9 - need to check all sections *)letresponse_rrs=List.flatten[response.Packet.answers;response.Packet.authorities;response.Packet.additionals]in(* Identical records do not count as conflicts, so ignore those *)letnon_identical=List.filter(funrr->not(List.exists(funour->our.Packet.name=rr.Packet.name&&Packet.compare_rdatarr.Packet.rdataour.Packet.rdata=0)probing_rrs))response_rrsinletresponse_names=List.map(funrr->rr.Packet.name)non_identical|>set_of_listin(* There was a probe conflict: defer to the existing host *)letrenamed=UniqueSet.interresponse_namesstate.names_probinginletnot_renamed=UniqueSet.diffstate.names_probingrenamedin(* Rename the conflicting records *)letnew_names=UniqueSet.fold(funnameset->(* Modifies the trie! *)UniqueSet.add(rename_uniquestatename)set)renamedUniqueSet.emptyin(* There could also be conflicts with names that we already confirmed as unique,
in which case we also have to re-probe. *)letother_conflicts=UniqueSet.interresponse_namesstate.names_confirmedinifUniqueSet.is_emptyrenamed&&UniqueSet.is_emptyother_conflictsthen(* No conflicts *)(state,NoConflict)elsebegin(* At least one conflict *)letnow_pending=UniqueSet.unionnot_renamednew_namesin(restart_later{statewith(* Reset probing names back to pending *)names_pending=UniqueSet.unionstate.names_pendingnow_pending;names_probing=UniqueSet.empty;}0.0,ConflictRestart)endleton_query_receivedstatequeryresponse=(* A "simultaneous probe conflict" occurs if we see a (probe) request
that contains a question matching one of our unique records,
and the authority section contains different data. *)lettheirs=List.filter(funrr->UniqueSet.memrr.Packet.namestate.names_probing)query.Packet.authoritiesinletresult=List.fold_left(funresultauth->matchresultwith|ConflictRestart->result|NoConflict->try(* For this step we only care about records that are part of the current probe cycle. *)letour_rr=List.find(funrr->UniqueSet.memrr.Packet.namestate.names_probing)response.Packet.answersin(* TODO: proper lexicographical comparison *)letcompare=Packet.compare_rdataour_rr.Packet.rdataauth.Packet.rdatainifcompare<0then(* Our data is less than the requester's data, so restart the probe sequence *)ConflictRestartelseNoConflict(* else if compare > 0 then the requester will restart its own probe sequence *)(* else if compare = 0 then there is no conflict *)(* TODO: if compare = 0 and the peer is sending a TTL less than half of our record
then we are supposed to announce our record to avoid premature expiry *)with|Not_found->NoConflict)NoConflicttheirsin(* Now filter out answers that are unique but unconfirmed *)letanswers=List.filter(funrr->not(UniqueSet.memrr.Packet.namestate.names_pending)&¬(UniqueSet.memrr.Packet.namestate.names_probing))response.Packet.answersinletresponse={responsewithPacket.answers=answers}inifresult=ConflictRestartthen(* If we lose a simultaneous probe tie-break then we have to delay 1 second *)(* TODO: if there are more than 15 conflicts in 10 seconds then we are
supposed to wait 5 seconds *)(response,restart_later{statewith(* Reset probing names back to pending *)names_pending=UniqueSet.unionstate.names_pendingstate.names_probing;names_probing=UniqueSet.empty;}1.0,ConflictRestart)else(response,state,NoConflict)