open Core
open Core_profiler_disabled

(* When true, the chosen output file name is printed at start-up. *)
let debug = false

(* File name used when the OUTPUT_FILE variable is not provided. *)
let default_output_filename = "profiler.dat"

(* Mutable output file name.  Initialised from the OUTPUT_FILE variable as
   returned by [Check_environment.get_var], falling back to
   [default_output_filename] when that lookup returns [None]. *)
let current_output_filename =
  let env = "OUTPUT_FILE" in
  let v =
    match Check_environment.get_var env with
    | Some v -> v
    | None -> default_output_filename
  in
  if debug then printf "file = %s\n" v;
  ref v

(* Overwrites the output file name stored in [current_output_filename]. *)
let set_current_output_filename v =
  current_output_filename := v

(* Packs a probe id and a time into a single [int]: the low [time_bits] bits
   hold the time as a nanosecond offset from the epoch, and the bits above
   them hold the id. *)
module Short_header : sig
  val id_bits : int
  val time_bits : int

  val max_id : int
  val max_time_diff : Time_ns.Span.t

  (* Raises [Invalid_argument] when the id or the time offset is out of range. *)
  val pack_exn : Profiler_epoch.t -> Probe_id.t -> Time_ns.t -> int

  (* Same layout as [pack_exn] but performs no range checks. *)
  val pack_unsafe : Profiler_epoch.t -> Probe_id.t -> Time_ns.t -> int

  val unpack_id : int -> Probe_id.t
  val unpack_time : Profiler_epoch.t -> int -> Time_ns.t
  val unpack : Profiler_epoch.t -> int -> Probe_id.t * Time_ns.t
end = struct
  let time_bits = 54 (* ~208 days *)

  let%test _ = time_bits > 1
  let%test _ = time_bits < 63

  let id_bits = 63 - time_bits
  (* you may need to change the size of header_chunk *)

  (* Largest values that fit in the id field and in the time field. *)
  let max_id = (1 lsl id_bits) - 1
  let max_time_diff_int = (1 lsl time_bits) - 1
  let max_time_diff = Time_ns.Span.of_int_ns max_time_diff_int

  let pack_exn epoch id time =
    let id = Probe_id.to_int_exn id in
    let time =
      Profiler_epoch.diff epoch time
      |> Time_ns.Span.to_int_ns
    in
    if time < 0 || time > max_time_diff_int || id < 0 || id > max_id
    then raise (Invalid_argument "parameter out of range")
    else time lor (id lsl time_bits)

  let pack_unsafe epoch id time =
    let id = Probe_id.to_int_exn id in
    let time =
      Profiler_epoch.diff epoch time
      |> Time_ns.Span.to_int_ns
    in
    (* The time is masked to [time_bits] bits; the id is shifted in unchecked. *)
    (time land ((1 lsl time_bits) - 1)) lor (id lsl time_bits)

  let unpack_id header =
    header lsr time_bits
    |> Probe_id.of_int_exn

  let unpack_time epoch header =
    header land ((1 lsl time_bits) - 1)
    |> Time_ns.Span.of_int_ns
    |> Profiler_epoch.add epoch

  let unpack epoch header =
    (unpack_id header, unpack_time epoch header)

  let%test_module "unpack_pack" = (module struct
    let epoch =
      Profiler_epoch.of_time
        (Time_ns.of_int_ns_since_epoch (Int64.to_int_exn 1405085600000000000L))

    (* Checks that both packers agree and that unpacking round-trips. *)
    let test id time =
      let id = Probe_id.of_int_exn id in
      let time = Time_ns.of_int_ns_since_epoch (Int64.to_int_exn time) in
      let packed = pack_exn epoch id time in
      let packed_unsafe = pack_unsafe epoch id time in
      let unpacked = unpack epoch packed_unsafe in
      [%test_eq: int] packed packed_unsafe;
      [%test_eq: Probe_id.t * Time_ns.t] unpacked (id, time)

    let%test_unit "0 0" = test 0 1405085600000000000L
    let%test_unit "max max" = test 511 1423099998509481983L
    let%test_unit "1 1" = test 1 1405085600000000001L
    let%test_unit "256 100_000" = test 256 1405085600000100000L
  end)

  let%bench_module "Short message header packing" = (module struct
    let epoch =
      Profiler_epoch.of_time
        (Time_ns.of_int_ns_since_epoch (Int64.to_int_exn 1405085600000000000L))

    let id = Probe_id.of_int_exn 123

    let time =
      Time_ns.of_int_ns_since_epoch (Int64.to_int_exn 1405085600123123000L)

    let%bench "pack_exn" = ignore (pack_exn epoch id time : int)
    let%bench "pack_unsafe" = ignore (pack_unsafe epoch id time : int)
  end)
end

(* Storage for profiler output: one lazily created header chunk plus a
   sequence of fixed-size chunks for the short messages. *)
module Buffer : sig
  (* read_write buffers are exposed to Writer *)
  val header_chunk : (read_write, _) Iobuf.t Lazy.t
  val current_chunk : (read_write, _) Iobuf.t

  (* These are public: *)

  (** Is the main (short message) buffer empty? *)
  val is_empty : unit -> bool

  val get_chunks : unit -> (read_write, Iobuf.no_seek) Iobuf.t list
  val get_header_chunk : unit -> (read, _) Iobuf.t

  val ensure_free : int -> unit

  module Unsafe_internals : sig
    val reset : unit -> unit
  end
end = struct
  (* If we create 512 group points with every other point as a source,
     this buffer _still_ won't fill up (512 * (72 + 512 * 2)) *)
  let header_chunk = lazy (Iobuf.create ~len:561152)

  (* Returns a fresh [Iobuf.t].  If [header_chunk] has already been forced, the
     result shares its buffer and is flipped with [Iobuf.flip_lo]; otherwise a
     zero-length buffer is returned and [header_chunk] is left unforced. *)
  let get_header_chunk () =
    let copy = Iobuf.create ~len:0 in
    if Lazy.is_val header_chunk
    then begin
      Iobuf.set_bounds_and_buffer ~src:(Lazy.force header_chunk) ~dst:copy;
      Iobuf.flip_lo copy
    end;
    (copy :> (read, _) Iobuf.t)

  (* Iobufs are mutable to the extent that you can swap the pointer to the underlying
     memory with another Iobuf. I use this to avoid a [ref] / another indirection:
     When we want to swap the buffer, we copy its pointer & limits into a new [Iobuf.t]
     structure, and then overwrite it with the pointer & limits from a freshly created
     [Iobuf.t] *)
  let current_chunk = Iobuf.create ~len:0

  (* Size, in bytes, of every chunk allocated by [ensure_free]. *)
  let chunk_size = 10_000_000

  (* Completed chunks, most recent first. *)
  let previous_chunks = ref []

  (* Retires [current_chunk] into [previous_chunks] (unless it is empty after
     flipping) and points [current_chunk] at [len] bytes of fresh memory. *)
  let allocate_new_chunk len =
    Iobuf.flip_lo current_chunk;
    if not (Iobuf.is_empty current_chunk)
    then begin
      (* Use sub to copy the Iobuf.t structure (and narrow the chunk in the process). *)
      let copy = Iobuf.sub_shared current_chunk in
      previous_chunks := copy :: !previous_chunks
    end;
    let new_memory = Iobuf.create ~len in
    Iobuf.set_bounds_and_buffer ~src:new_memory ~dst:current_chunk;
    (* We need to force the kernel to actually give us the memory, or we're liable to
       get spikes in poke times. *)
    if len > 0
    then
      for i = 0 to (len - 1) / 512 do
        Iobuf.Unsafe.Poke.uint8_trunc current_chunk ~pos:(i * 512) 0
      done

  (* Guarantees that [current_chunk] has at least [len] bytes available,
     starting a new chunk of [chunk_size] bytes when it does not.
     [len] must not exceed [chunk_size]. *)
  let ensure_free len =
    assert (len <= chunk_size);
    if Iobuf.length current_chunk < len
    then allocate_new_chunk chunk_size

  (* Returns every chunk in the order it was filled.  As a side effect,
     [current_chunk] is left with zero length. *)
  let get_chunks () =
    (* ... thereby moving the current chunk into [previous_chunks] *)
    allocate_new_chunk 0;
    List.rev !previous_chunks

  let is_empty () =
    List.is_empty (get_chunks ())

  module Unsafe_internals = struct
    (* Resets the header chunk (only if it was already forced) and discards
       all short message chunks. *)
    let reset () =
      if Lazy.is_val header_chunk
      then Iobuf.reset (Lazy.force header_chunk);
      allocate_new_chunk 0;
      previous_chunks := []
  end

  let%test_unit "allocate_new_chunk" =
    protect
      ~f:(fun () ->
        allocate_new_chunk 1000;
        [%test_eq: int] (Iobuf.length current_chunk) 1000;
        Iobuf.Fill.stringo current_chunk "the first chunk\n";
        Iobuf.Fill.stringo current_chunk "still the first chunk\n";
        allocate_new_chunk 500;
        allocate_new_chunk 500; (* empty, should be ignored *)
        Iobuf.Fill.stringo current_chunk "the second chunk\n";
        allocate_new_chunk 0;
        [%test_eq: int] (Iobuf.length current_chunk) 0;
        [%test_eq: int] (List.length !previous_chunks) 2;
        begin
          match !previous_chunks with
          | [ second; first ] ->
            [%test_eq: string]
              (Iobuf.to_string first)
              "the first chunk\nstill the first chunk\n";
            [%test_eq: string] (Iobuf.to_string second) "the second chunk\n"
          | _ -> assert false
        end)
      ~finally:Unsafe_internals.reset

  let%test_unit "ensure_free" =
    protect
      ~f:(fun () ->
        ensure_free 100;
        [%test_eq: int] (Iobuf.length current_chunk) chunk_size;
        [%test_eq: int] (List.length !previous_chunks) 0;
        Iobuf.advance current_chunk (chunk_size - 50);
        ensure_free 500;
        [%test_eq: int] (Iobuf.length current_chunk) chunk_size;
        [%test_eq: int] (List.length !previous_chunks) 1)
      ~finally:Unsafe_internals.reset

  let%test_unit "get_header_chunk" =
    protect
      ~f:(fun () ->
        let header_chunk = Lazy.force header_chunk in
        Iobuf.Fill.stringo header_chunk "some data";
        let contents = get_header_chunk () |> Iobuf.to_string in
        [%test_eq: string] contents "some data")
      ~finally:Unsafe_internals.reset

  let%test_unit "get_chunks" =
    protect
      ~f:(fun () ->
        allocate_new_chunk 1000;
        Iobuf.Fill.stringo current_chunk "the first chunk";
        allocate_new_chunk 1000;
        Iobuf.Fill.stringo current_chunk "the second chunk";
        let contents = get_chunks () |> List.map ~f:Iobuf.to_string in
        [%test_eq: string list]
          contents
          [ "the first chunk"; "the second chunk" ])
      ~finally:Unsafe_internals.reset
end

(* Serialises header messages into [Buffer.header_chunk] and short messages
   into [Buffer.current_chunk], and hands the result to a dump handler. *)
module Writer = struct
  (* Reference point for packed times: one minute before module initialisation. *)
  let epoch =
    Time_ns.now ()
    |> Fn.flip Time_ns.sub (Time_ns.Span.of_min 1.)
    |> Profiler_epoch.of_time

  (* [epoch] plus the largest time offset a short header can hold. *)
  let max_time = Profiler_epoch.add epoch Short_header.max_time_diff

  let write_epoch () =
    let header_chunk = Lazy.force Buffer.header_chunk in
    let written = Header_protocol.Epoch.write ~epoch header_chunk in
    Iobuf.advance header_chunk written

  let write_end_of_header () =
    let header_chunk = Lazy.force Buffer.header_chunk in
    let written = Header_protocol.End_of_header.write header_chunk in
    Iobuf.advance header_chunk written

  let write_new_single id name spec =
    let header_chunk = Lazy.force Buffer.header_chunk in
    let written = Header_protocol.New_single.write ~id ~spec ~name header_chunk in
    Iobuf.advance header_chunk written

  let write_new_group id name spec =
    let header_chunk = Lazy.force Buffer.header_chunk in
    let written = Header_protocol.New_group.write ~id ~spec ~name header_chunk in
    Iobuf.advance header_chunk written

  let write_new_group_point ~group_id ~id name sources =
    let header_chunk = Lazy.force Buffer.header_chunk in
    let module NPP = Header_protocol.New_group_point in
    let sources_count = Array.length sources in
    let len = NPP.write ~group_id ~id ~name ~sources_count header_chunk in
    (* The sources are written before the window is advanced past the message. *)
    Array.iteri sources ~f:(fun index id ->
      NPP.write_sources header_chunk ~count:sources_count ~index ~source_id:id);
    Iobuf.advance header_chunk len

  (* Appends an 8 byte short message: the packed header only. *)
  let write_timer_at id time =
    Buffer.ensure_free 8;
    Iobuf.Unsafe.Fill.int64_le
      Buffer.current_chunk
      (Short_header.pack_unsafe epoch id time)

  (* Appends a 16 byte short message: the packed header followed by [value]. *)
  let write_probe_at id time value =
    let current_chunk = Buffer.current_chunk in
    Buffer.ensure_free 16;
    Iobuf.Unsafe.Poke.int64_le
      current_chunk
      ~pos:0
      (Short_header.pack_unsafe epoch id time);
    Iobuf.Unsafe.Poke.int64_le current_chunk ~pos:8 value;
    Iobuf.unsafe_advance current_chunk 16

  let write_group_reset = write_timer_at

  let%test_module "write header messages" = (module struct
    let unpack_one () =
      let chunk = Lazy.force Buffer.header_chunk in
      Iobuf.flip_lo chunk;
      match Header_protocol.to_unpacked chunk with
      | Ok (unpacked, length) ->
        [%test_eq: int] (Iobuf.length chunk) length;
        unpacked
      | _ -> failwith "to_unpacked failed"

    let%test_unit "write_new_single" =
      protect
        ~finally:Buffer.Unsafe_internals.reset
        ~f:(fun () ->
          write_new_single (Probe_id.of_int_exn 100) "unittest" Probe_type.Timer;
          match unpack_one () with
          | New_single { id; spec; name; message_length = _; message_type = _ } ->
            [%test_eq: Probe_id.t] id (Probe_id.of_int_exn 100);
            [%test_eq: Probe_type.t] spec Probe_type.Timer;
            [%test_eq: string] name "unittest"
          | _ -> failwith "Incorrect message type")

    let%test_unit "write_new_group" =
      protect
        ~finally:Buffer.Unsafe_internals.reset
        ~f:(fun () ->
          write_new_group
            (Probe_id.of_int_exn 100)
            "unittest"
            (Probe_type.Probe Profiler_units.Seconds);
          match unpack_one () with
          | New_group { id; spec; name; message_length = _; message_type = _ } ->
            [%test_eq: Probe_id.t] id (Probe_id.of_int_exn 100);
            [%test_eq: Probe_type.t] spec (Probe_type.Probe Profiler_units.Seconds);
            [%test_eq: string] name "unittest"
          | _ -> failwith "Incorrect message type")

    let%test_unit "write_new_group_point" =
      protect
        ~finally:Buffer.Unsafe_internals.reset
        ~f:(fun () ->
          write_new_group_point
            ~group_id:(Probe_id.of_int_exn 100)
            ~id:(Probe_id.of_int_exn 300)
            "unittest"
            (Array.map ~f:Probe_id.of_int_exn [| 500; 700 |]);
          match unpack_one () with
          | New_group_point
              { group_id
              ; id
              ; name
              ; sources_grp
              ; message_length = _
              ; message_type = _
              } ->
            [%test_eq: int] (Probe_id.to_int_exn group_id) 100;
            [%test_eq: int] (Probe_id.to_int_exn id) 300;
            [%test_eq: string] name "unittest";
            [%test_eq: int array]
              (Array.map sources_grp ~f:(fun r ->
                 let r : Header_protocol.New_group_point.Unpacked.t_sources = r in
                 Probe_id.to_int_exn r.source_id))
              [| 500; 700 |]
          | _ -> assert false)
  end)

  (* Writes the header chunk followed by every chunk to [fd].  Each buffer's
     window and bounds are protected while it is being written. *)
  let write_to_fd fd header_chunk chunks =
    List.iter (header_chunk :: chunks) ~f:(fun chunk ->
      Iobuf.protect_window_and_bounds chunk ~f:(fun chunk ->
        Bigstring.really_write fd (Iobuf.Peek.bigstringo ~pos:0 chunk)))

  let%test_unit "write_to_fd" =
    let (filename, fd) = Unix.mkstemp "/tmp/core-profiler-tests" in
    protect
      ~f:(fun () ->
        let header_chunk = Iobuf.of_string "the header chunk\n" in
        let chunks =
          [ Iobuf.of_string "the first chunk\n"
          ; Iobuf.of_string "the second chunk\n"
          ]
        in
        write_to_fd fd header_chunk chunks;
        Unix.close fd;
        [%test_eq: string]
          (In_channel.read_all filename)
          "the header chunk\nthe first chunk\nthe second chunk\n")
      ~finally:(fun () ->
        (* The descriptor is normally closed already; ignore a second close. *)
        begin
          try Unix.close fd with
          | _ -> ()
        end;
        Unix.unlink filename)

  (* Writes the buffers to the file named by [!name_ref].  A file already
     present under that name is first renamed with an added .old suffix. *)
  let write_to_file name_ref header_chunk chunks =
    let name = !name_ref in
    begin
      match Sys.file_exists name with
      | `Yes -> Unix.rename ~src:name ~dst:(name ^ ".old")
      | `No | `Unknown -> ()
    end;
    Unix.with_file
      name
      ~mode:[ Unix.O_CREAT; Unix.O_WRONLY; Unix.O_TRUNC ]
      ~f:(fun fd -> write_to_fd fd header_chunk chunks)

  (* Handler invoked by [dump_stats] and at exit; [None] disables dumping.
     The default writes to the file named by [current_output_filename]. *)
  let at_exit_handler = ref (Some (write_to_file current_output_filename))

  let set_at_exit_handler = function
    | `Write_file name -> at_exit_handler := Some (write_to_file (ref name))
    | `Function f -> at_exit_handler := Some f
    | `Disable -> at_exit_handler := None

  (* Appends the epoch and end-of-header messages to the header chunk, then
     passes the header and the data chunks to [handler].  The handler is not
     called when there are no data chunks. *)
  let dump_stats_internal handler =
    write_epoch ();
    write_end_of_header ();
    let chunks = Buffer.get_chunks () in
    if not (List.is_empty chunks)
    then
      handler
        (Buffer.get_header_chunk ())
        (chunks :> (read, Iobuf.no_seek) Iobuf.t list)

  (* Runs the current handler, if any, and then puts the header chunk's window
     back to where it was before the dump.  The restore runs under [protect] so
     that it also happens when the handler raises. *)
  let dump_stats () =
    Option.iter !at_exit_handler ~f:(fun handler ->
      let header_chunk = Lazy.force Buffer.header_chunk in
      let lo_bound = Iobuf.Lo_bound.window header_chunk in
      let hi_bound = Iobuf.Hi_bound.window header_chunk in
      protect
        ~f:(fun () -> dump_stats_internal handler)
        ~finally:(fun () ->
          Iobuf.Lo_bound.restore lo_bound header_chunk;
          Iobuf.Hi_bound.restore hi_bound header_chunk))

  (* At exit the header window is not restored. *)
  let () =
    at_exit (fun () -> Option.iter !at_exit_handler ~f:dump_stats_internal)

  module Unsafe_internals = struct
    let write_epoch = write_epoch
    let write_end_of_header = write_end_of_header
  end
end