d$ *)

(* Security of named pipes:

   - http://www.blakewatts.com/namedpipepaper.html
   - impersonation: http://msdn.microsoft.com/en-us/library/aa378832(VS.85).aspx
 *)

open Netsys_types
open Printf

(* C stub. [unpredictable_pipe_name] applies it to a 16-byte buffer to obtain
   the random part of generated pipe names.
 *)
external fill_random : Bytes.t -> unit = "netsys_fill_random"

(* NOTE(review): presumably thin wrappers around the Win32 calls of the same
   name - confirm against the C part of this module.
 *)
external get_full_path_name : string -> string = "netsys_get_full_path_name"
external get_long_path_name : string -> string = "netsys_get_long_path_name"

(* Abstract types. No OCaml representation is given here; values of these
   types are only produced and consumed by the [external] functions of this
   module.
 *)
type c_event
type c_pipe_helper

(* How the proxy table works:
The i_* records are the values in the proxy table. The keys are the
proxy descriptors (which must be open all the time). When the last
reference to the proxy descriptor is released, the GC will call the
finalizer, and (with some trickery) the entry is removed from the
proxy table. Of course, the i_* records must not contain the proxy
descriptor - otherwise there would be a self-reference in the table,
and the entry is never released.
Because of this we define w_* types as a pair of the i_* records and
the proxy descriptors. The w_* types are the public types. As no
i_* can escape from its w_* value outside this module, it is ensured
that all public references of i_* also imply public references of the
proxy descriptor. So as long as there are w_* values the i_* values
cannot be collected.
When the proxy descriptor is accessed from outside the module, the
caller becomes responsible for closing it. Therefore we track whether
this is the case. The proxy descriptor is also stored in the c_* values
(i.e. in the values handled by the C part of this module), and so
the C-written finalizer can close the proxy descriptor if required.
There is a flag whether to do so (auto_close_*_proxy), and this flag
is cleared when the caller takes over the ownership of the descriptor.
*)typei_event=c_eventtypew32_event=i_event*Unix.file_descr(* The descriptor is the proxy descriptor *)typepipe_mode=Pipe_in|Pipe_out|Pipe_duplextypei_pipe ={pipe_name:string;pipe_mode:pipe_mode;pipe_helper:c_pipe_helper;(* mutable pipe_signal : w32_event option; *)pipe_rd_event:w32_event;pipe_wr_event:w32_event;}typew32_pipe=i_pipe*Unix.file_descr(* The descriptor is the proxy descriptor *)typei_pipe_server={psrv_name:string;psrv_mode:pipe_mode;psrv_max:int;mutable psrv_first:bool;mutablepsrv_queue:c_pipe_helper list;(* The queue of pipes waiting for an incoming connection *)mutablepsrv_listen:int;(* The backlog parameter of [listen] (target length of psrv_queue) *)psrv_ready:c_pipe_helperQueue.t;(* The already accepted but not yet reported connections *)psrv_cn_event:w32_event;psrv_proxy_handle:c_event;psrv_mutex:Netsys_oothr.mutex;}(* As there is no C counterpart for the pipe server (no c_pipe_server),
the question is how to ensure that the proxy descriptor is closed.
For that reason we allocate an event (psrv_proxy_handle) and use
this event as proxy descriptor. For events there is the possibility
to let the C part of the module close the descriptor.
*)typew32_pipe_server=i_pipe_server*Unix.file_descr(* The descriptor is the proxy descriptor *)typepipe_conn_state=Pipe_deaf|Pipe_listening|Pipe_connected|Pipe_downtype c_processtypei_process=c_processtypew32_process=i_process*Unix.file_descr(* The descriptor is the proxy descriptor *)typei_input_thread={ithr_descr:Unix.file_descr;(* One can send command to the thread by setting ithr_cmd, and signaling
ithr_cmd_cond:
*)ithr_cmd_cond:Netsys_oothr.condition;ithr_cmd_mutex:Netsys_oothr.mutex;mutableithr_cmd:[`Read|`Cancel]option;mutableithr_cancel_cmd:bool;(* similar to ithr_cmd = Some `Cancel *)ithr_event:w32_event;(* The event is set when there is something to read *)ithr_buffer:Bytes.t;mutableithr_buffer_start:int;mutableithr_buffer_len:int;mutableithr_buffer_cond:[`Cancelled|`EOF|`Exceptionofexn|`Data];mutableithr_thread:int32;(* The Win32 thread ID *)ithr_read_mutex:Netsys_oothr.mutex;(* to serialize user read accesses *)mutableithr_running:bool;ithr_proxy_handle:c_event;(* the proxy - same pattern as in pipe servers *)}typew32_input_thread=i_input_thread*Unix.file_descr*<>(* The descriptor is the proxy descriptor *)typei_output_thread={othr_descr:Unix.file_descr;othr_cmd_cond:Netsys_oothr.condition;othr_cmd_mutex:Netsys_oothr.mutex;mutableothr_cmd:[`Write|`Close|`Cancel]option;mutableothr_cancel_cmd:bool;othr_event:w32_event;othr_buffer:Bytes.t;mutableothr_buffer_len:int;mutableothr_write_cond:[`Cancelled|`Exceptionofexn]option;mutableothr_thread:int32;(* The Win32 thread ID *)othr_write_mutex:Netsys_oothr.mutex;mutableothr_running:bool;othr_proxy_handle:c_event;}typew32_output_thread=i_output_thread *Unix.file_descr*<>(* The descriptor is the proxy descriptor *)typei_object=|I_eventofi_event|I_pipeofi_pipe|I_pipe_serverofi_pipe_server|I_processofi_process|I_input_thread ofi_input_thread *<>|I_output_threadofi_output_thread *<>typew32_object=|W32_eventofw32_event|W32_pipeofw32_pipe|W32_pipe_serverofw32_pipe_server|W32_processofw32_process|W32_input_threadofw32_input_thread|W32_output_threadofw32_output_threadtypecreate_process_option =|CP_change_directoryofstring|CP_set_envofstring|CP_std_handles ofUnix.file_descr 
*Unix.file_descr*Unix.file_descr|CP_create_console|CP_detach_from_console|CP_inherit_console|CP_inherit_or_create_console|CP_unicode_environment|CP_ansi_environment|CP_new_process_group|CP_inherit_process_groupmoduleInt64Map=Map.Make(Int64)externalint64_of_file_descr:Unix.file_descr->int64="netsys_int64_of_file_descr"(* Also occurs in netsys.ml! *)externalnetsys_win32_set_debug:bool->unit="netsys_win32_set_debug"module Debug=structletenable =reffalseletdebug_c_wrapper=netsys_win32_set_debugendlet dlog=Netlog.Debug.mk_dlog"Netsys_win32"Debug.enableletdlogr=Netlog.Debug.mk_dlogr"Netsys_win32"Debug.enablelet()=Netlog.Debug.register_module"Netsys_win32"Debug.enablemoduleFD=structtypet=int64letequal(fd1)(fd2)=fd1=fd2lethashfd=Hashtbl.hashfdendmodule H=Hashtbl.Make(FD)(* Hash table mapping
proxy file descriptors to the w32_object referenced by the descriptors.
The keys are the handle values contained in the fd values. As we allow
that proxies are [Unix.close]d it can happen that several fd's exist
that have the same handle values. In this case, the address of the
fd itself is used to distinguish between these same-looking fd's.
*)letproxies=ref(H.create41)letproxies_mutex=!Netsys_oothr.provider#create_mutex()letproxies_gc_flag=reffalseletproxies_unreg_count=ref0letmk_weakx=letw=Weak.create1inWeak.setw0(Somex);wletget_weakw=Weak.getw0letfinalise_proxycell_=(* the GC finaliser *)proxies_unreg_count :=!proxies_unreg_count+1;cell:=Noneletgc_proxy()=(* Walk through the table and check. We have to take care that the
order of the bindings for the same key is preserved, i.e. the most
recent use of a descriptor number needs to be the visible binding
in the table.
*)letproxies'=H.create41inletn_old=ref0inletn_new=ref0inH.iter(funfd_numentries ->letm=ref[]inList.iter(funentry ->incrn_old;let(_,cell)=entryinif!cell<>None then(incrn_new;m:=entry::!m))entries;H.addproxies'fd_num(List.rev!m))!proxies;proxies:=proxies';proxies_unreg_count:=0;proxies_gc_flag:=false;dlogr(fun()->sprintf"register_proxy: keeping %d/%d entries in proxy tbl"!n_new!n_old;);dlogr(fun()->letb=Buffer.create500inBuffer.add_stringb"\n";H.iter(funfd_num entries->List.iter(fun(_,cell)->bprintf b" - proxy tbl %Ld -> %s\n"fd_num(match!cellwith|None->"<free>"|Some(I_event_)->"I_event"|Some(I_pipe_)->"I_pipe"|Some(I_pipe_server_)->"I_pipe_server"|Some(I_process_)->"I_process"|Some(I_input_thread_)->"I_input_thread"|Some(I_output_thread_)->"I_output_thread"))entries)proxies';Buffer.contentsb)letregister_proxyfdi_obj=letfd_num=int64_of_file_descrfdin(* Note that it is possible that we register several i_obj for the same
fd_num. This can happen when fd is first closed, and then collected
by the GC. So after the close the OS can reuse the fd_num for something
else, but the old fd_num is still in the table.
*)Netsys_oothr.serializeproxies_mutex(fun()->if(!proxies_gc_flag&&2*!proxies_unreg_count>H.length!proxies)then((* do a GC pass *)gc_proxy());letcell=ref(Somei_obj)inletfd_weak=mk_weakfdinletl=tryH.find!proxiesfd_numwithNot_found->[]inH.replace!proxiesfd_num((fd_weak,cell)::l);Gc.finalise(finalise_proxycell)fd)()letunregisterfd=(* called from user code *)letfd_num=int64_of_file_descrfdinNetsys_oothr.serializeproxies_mutex(fun()->letl=tryH.find!proxiesfd_numwithNot_found->[]inlet l'=List.filter(fun(fd'_weak,cell)->matchget_weakfd'_weakwith|None->false|Somefd'->!cell<>None&&fd!=fd'(* phys. cmp! *))linH.replace!proxiesfd_numl')()let_=Gc.create_alarm(fun()->proxies_gc_flag :=true)letlookupfd=let fd_num=int64_of_file_descrfdinNetsys_oothr.serializeproxies_mutex(fun()->letl=H.find!proxiesfd_numinlet(_,cell_opt)=List.find(fun(fd'_weak,cell)->matchget_weakfd'_weakwith|None->false|Somefd'->!cell<>None&&fd==fd'(* phys. cmp! *))linmatch!cell_optwith|None->assertfalse|Somei_obj->(matchi_objwith|I_eventi_ev->W32_event(i_ev,fd)|I_pipei_pipe->W32_pipe(i_pipe,fd)|I_pipe_serveri_psrv->W32_pipe_server(i_psrv,fd)|I_processi_proc->W32_process(i_proc,fd)|I_input_thread(i_thr,keep_alive)->W32_input_thread(i_thr,fd,keep_alive)|I_output_thread(o_thr,keep_alive)->W32_output_thread(o_thr,fd,keep_alive)))()letlookup_pipe fd=trymatchlookupfdwith|W32_pipeph->ph|_->raiseNot_foundwith Not_found->failwith"Netsys_win32.lookup_pipe: not found"letlookup_pipe_serverfd=trymatchlookupfdwith|W32_pipe_serverph->ph|_->raiseNot_foundwith Not_found->failwith"Netsys_win32.lookup_pipe_server: not found"letlookup_eventfd=trymatchlookupfdwith|W32_evente->e|_->raiseNot_foundwithNot_found->failwith"Netsys_win32.lookup_event: not found"letlookup_processfd=trymatchlookupfdwith|W32_processe->e|_->raiseNot_foundwithNot_found->failwith"Netsys_win32.lookup_process: not found"letlookup_input_threadfd=trymatchlookupfdwith|W32_input_threade->e|_->raiseNot_foundwithNot_found->failwith"Netsys_win32.lookup_input_thread: not 
found"letlookup_output_threadfd=trymatchlookupfdwith|W32_output_threade->e|_->raiseNot_foundwithNot_found->failwith"Netsys_win32.lookup_output_thread: not found"externalget_active_code_page:unit->int="netsys_getacp"external netsys_real_select :Unix.file_descrlist->Unix.file_descr list ->Unix.file_descr list ->float->(Unix.file_descrlist*Unix.file_descrlist*Unix.file_descrlist)="netsys_real_select"letreal_select=netsys_real_selectexternalnetsys_test_close_on_exec:Unix.file_descr->bool="netsys_test_close_on_exec"lettest_close_on_exec=netsys_test_close_on_execexternalnetsys_modify_close_on_exec:Unix.file_descr->bool->unit="netsys_modify_close_on_exec"letmodify_close_on_exec=netsys_modify_close_on_execexternalnetsys_is_crt_fd:Unix.file_descr->int->bool="netsys_is_crt_fd"letis_crt_fd=netsys_is_crt_fdexternalnetsys_create_event:unit->c_event="netsys_create_event"externalnetsys_event_descr:c_event->Unix.file_descr="netsys_event_descr"externalnetsys_close_event:c_event->unit="netsys_close_event"externalnetsys_set_auto_close_event_proxy:c_event->bool->unit="netsys_set_auto_close_event_proxy"letdecorate_evente=lete_proxy=netsys_event_descr einletev=(e,e_proxy)inGc.finalisenetsys_close_evente;register_proxy e_proxy(I_evente);evletcreate_event()=letev=decorate_event(netsys_create_event())indlogr(fun()->sprintf"create_event: descr=%Ld"(int64_of_file_descr(sndev)));evletevent_descr(e,e_proxy)=netsys_set_auto_close_event_proxyefalse;e_proxyexternalnetsys_set_event:c_event->unit="netsys_set_event"externalnetsys_reset_event:c_event->unit="netsys_reset_event"externalnetsys_test_event:c_event->bool="netsys_test_event"externalnetsys_event_wait:c_event->int->bool="netsys_event_wait"letset_event (e,e_proxy)=dlogr(fun()->sprintf "set_event: descr=%Ld"(int64_of_file_descr e_proxy));netsys_set_eventeletreset_event (e,e_proxy)=dlogr(fun()->sprintf "reset_event: descr=%Ld"(int64_of_file_descre_proxy));netsys_reset_eventelettest_event 
(e,_)=netsys_test_eventeletevent_wait(e,e_proxy)tmo=dlogr(fun()->sprintf "event_wait: descr=%Ld tmo=%f"(int64_of_file_descre_proxy)tmo);letflag=Netsys_impl_util.slice_time_ms(funtmo_ms->ifnetsys_event_wait etmo_ms thenSome()elseNone)tmo<>Noneindlogr(fun()-> sprintf"event_wait: descr=%Ld returning %b"(int64_of_file_descre_proxy)flag);flagexternalnetsys_wsa_event_select:c_event->Unix.file_descr->Netsys_posix.poll_req_events->unit="netsys_wsa_event_select"externalwsa_maximum_wait_events :unit->int="netsys_wsa_maximum_wait_events"externalnetsys_wsa_wait_for_multiple_events :c_eventarray->int->intoption="netsys_wsa_wait_for_multiple_events"externalnetsys_wsa_enum_network_events :Unix.file_descr->c_event->Netsys_posix.poll_act_events="netsys_wsa_enum_network_events"letwsa_event_select(e,e_proxy)fdpie=dlogr(fun()->sprintf"wsa_event_select: evdescr=%Ld sockdescr=%Ld bits=%d"(int64_of_file_descre_proxy)(int64_of_file_descrfd)(Netsys_posix.int_of_req_eventspie));netsys_wsa_event_selectefdpieletwsa_wait_for_multiple_eventsean=dlogr(fun()->sprintf"wsa_wait_for_multiple_events: descrs=%s tmo=%d"(String.concat","(Array.to_list(Array.map(fun(_,e_proxy)->Int64.to_string(int64_of_file_descre_proxy))ea)))n);let r=netsys_wsa_wait_for_multiple_events(Array.mapfstea)nindlogr(fun()->sprintf"wsa_wait_for_multiple_events: returning %s"(matchrwith|None->"None"|Somek->lete_proxy=snd(ea.(k))insprintf"Some %d (descr %Ld)"k(int64_of_file_descre_proxy)));rletwsa_enum_network_eventsfd(e,e_proxy)=letr=netsys_wsa_enum_network_eventsfdeindlogr(fun()->sprintf"wsa_enum_network_events: sockdescr=%Ld evdescr=%Ld bits=%d"(int64_of_file_descrfd)(int64_of_file_descre_proxy)(Netsys_posix.int_of_act_eventsr));rexternalnetsys_pipe_free 
:c_pipe_helper->unit="netsys_pipe_free"externalnetsys_create_local_named_pipe:string->pipe_mode->int->c_event->bool->c_pipe_helper="netsys_create_local_named_pipe"externalnetsys_pipe_listen:c_pipe_helper->unit="netsys_pipe_listen"externalnetsys_pipe_deafen:c_pipe_helper->unit="netsys_pipe_deafen"externalnetsys_pipe_connect:string->pipe_mode->c_pipe_helper="netsys_pipe_connect"externalnetsys_pipe_read:c_pipe_helper ->Bytes.t->int->int->int="netsys_pipe_read"external netsys_pipe_write:c_pipe_helper->Bytes.t->int->int->int="netsys_pipe_write"externalnetsys_pipe_shutdown:c_pipe_helper->unit="netsys_pipe_shutdown"externalnetsys_pipe_rd_event:c_pipe_helper->c_event="netsys_pipe_rd_event"externalnetsys_pipe_wr_event:c_pipe_helper->c_event="netsys_pipe_wr_event"externalnetsys_pipe_descr:c_pipe_helper->Unix.file_descr="netsys_pipe_descr"external netsys_pipe_conn_state:c_pipe_helper->pipe_conn_state="netsys_pipe_conn_state"external netsys_pipe_signal:c_pipe_helper->c_event->unit="netsys_pipe_signal"externalnetsys_set_auto_close_pipe_proxy:c_pipe_helper->bool->unit="netsys_set_auto_close_pipe_proxy"letrev_mode=function|Pipe_in->Pipe_out|Pipe_out->Pipe_in|Pipe_duplex->Pipe_duplexletcreate_local_pipe_servernamemoden=letcn_event=create_event()inletp_event=netsys_create_event()inletproxy=netsys_event_descrp_eventinletpsrv={psrv_name=name;psrv_mode=mode;psrv_max =n;psrv_first=true;psrv_queue=[];psrv_listen=0;psrv_ready=Queue.create();psrv_cn_event=cn_event;psrv_proxy_handle=p_event;psrv_mutex=!Netsys_oothr.provider#create_mutex();}inGc.finalisenetsys_close_eventp_event;register_proxyproxy(I_pipe_serverpsrv);dlogr(fun()->sprintf"create_local_pipe_server: \
name=%s proxydescr=%Ld cnevdescr=%Ld"name(int64_of_file_descrproxy)(int64_of_file_descr(sndcn_event)));(psrv,proxy)letdecorate_pipe_nogc phname mode=letfd=netsys_pipe_descr ph inletpipe={pipe_name=name;pipe_mode=mode;pipe_helper =ph;(* pipe_signal = None; *)pipe_rd_event=decorate_event(netsys_pipe_rd_eventph);pipe_wr_event=decorate_event(netsys_pipe_wr_eventph);}inregister_proxyfd(I_pipepipe);(pipe,fd)letdecorate_pipephnamemode=Gc.finalisenetsys_pipe_freeph;decorate_pipe_nogc phnamemodeletprefix="\\\\.\\pipe\\"letprefix_len=String.lengthprefixletpipe_connect namemode =(* Check that namestarts with the right prefix, to prevent security
vulnerabilities:
*)ifString.lengthname<prefix_len||(String.subname0prefix_len<>prefix)thenraise(Unix.Unix_error(Unix.EPERM,"pipe_connect",name));dlogr(fun()->sprintf"pipe_connect: name=%s"name);letpipe=decorate_pipe(netsys_pipe_connect namemode)namemodeindlogr(fun()->sprintf"pipe_connect: name=%s returning %Ld"name(int64_of_file_descr(sndpipe)));pipeletpipe_server_descr(psrv,psrv_proxy)=netsys_set_auto_close_event_proxy psrv.psrv_proxy_handlefalse;psrv_proxyletpipe_descr(pipe,pipe_proxy)=netsys_set_auto_close_pipe_proxy pipe.pipe_helperfalse;pipe_proxyletpipe_server_endpointpsrv=letph=netsys_create_local_named_pipepsrv.psrv_namepsrv.psrv_modepsrv.psrv_max(fstpsrv.psrv_cn_event)psrv.psrv_firstinGc.finalisenetsys_pipe_freeph;netsys_pipe_listen ph;psrv.psrv_first<-false;phletpipe_listen_lckpsrvn=ifpsrv.psrv_listen<nthen(letd=n-psrv.psrv_listeninfork=1toddoletph=pipe_server_endpointpsrvinpsrv.psrv_queue<-ph:: psrv.psrv_queuedone);(* else: we do nothing. You may consider this as a bug, but it is simply
too risky to shut down server pipes because of race conditions
*)psrv.psrv_listen<-nletpipe_listen(psrv,psrv_proxy)n=dlogr(fun()->sprintf"pipe_listen: name=%s proxydescr=%Ld n=%d"psrv.psrv_name(int64_of_file_descrpsrv_proxy)n);Netsys_oothr.serializepsrv.psrv_mutex(fun()->pipe_listen_lckpsrvn)()letcheck_for_connectionspsrv=letrecfind_deletel=matchlwith|[]->[]|ph::l'->lets=netsys_pipe_conn_statephinifs=Pipe_connectedthen(Queue.pushphpsrv.psrv_ready;find_deletel')elseph::find_delete l'inletqueue'=find_deletepsrv.psrv_queueinletold_listen =psrv.psrv_listeninpsrv.psrv_listen <-List.lengthqueue';psrv.psrv_queue<-queue';pipe_listen_lckpsrvold_listen(* In rare casesit may happen that cn_event is reset for a short
period of time, and then set again.
*)letempty_buf=Bytes.create0letrecpipe_accept_1psrv=matchQueue.lengthpsrv.psrv_readywith|0->ignore(event_waitpsrv.psrv_cn_event(-1.0));reset_eventpsrv.psrv_cn_event;check_for_connectionspsrv;ifnot(Queue.is_emptypsrv.psrv_ready)thenset_eventpsrv.psrv_cn_event;pipe_accept_1psrv|1->letph=Queue.takepsrv.psrv_readyinreset_eventpsrv.psrv_cn_event;check_for_connectionspsrv;ifnot(Queue.is_emptypsrv.psrv_ready)thenset_eventpsrv.psrv_cn_event;ignore(netsys_pipe_readphempty_buf00);(* check for errors *)decorate_pipe_nogcphpsrv.psrv_namepsrv.psrv_mode|_->letph=Queue.takepsrv.psrv_readyinignore(netsys_pipe_readphempty_buf00);(* check for errors *)decorate_pipe_nogcphpsrv.psrv_namepsrv.psrv_modeletpipe_accept(psrv,psrv_proxy)=dlogr(fun ()-> sprintf"pipe_accept: name=%s proxydescr=%Ld" psrv.psrv_name(int64_of_file_descrpsrv_proxy));letpipe=Netsys_oothr.serializepsrv.psrv_mutex(fun()->pipe_accept_1psrv)()indlogr(fun()->sprintf"pipe_accept: name=%s proxydescr=%Ld returning %Ld"psrv.psrv_name(int64_of_file_descrpsrv_proxy)(int64_of_file_descr(sndpipe)));pipeletpipe_rd_event(pipe,_)=pipe.pipe_rd_eventletpipe_wr_event(pipe,_)=pipe.pipe_wr_eventletpipe_connect_event(psrv,_)=psrv.psrv_cn_eventletpipe_read(pipe,pipe_proxy)sposlen=ifpos<0||len<0||pos>Bytes.length s-lentheninvalid_arg"Netsys_win32.pipe_read";dlogr(fun()->sprintf"pipe_read: name=%s proxydescr=%Ld len=%d"pipe.pipe_name(int64_of_file_descrpipe_proxy)len);tryletn=netsys_pipe_read pipe.pipe_helpersposlenindlogr (fun()->sprintf"pipe_read: name=%s proxydescr=%Ld returning %d"pipe.pipe_name(int64_of_file_descrpipe_proxy)n);nwith|errorwhen!Debug.enable->dlogr(fun()->sprintf"pipe_read: name=%s proxydescr=%Ld exception %s"pipe.pipe_name(int64_of_file_descrpipe_proxy)(Netexn.to_stringerror));raiseerrorletpipe_write(pipe,pipe_proxy)sposlen=ifpos<0||len<0||pos>Bytes.length s-lentheninvalid_arg"Netsys_win32.pipe_write";dlogr(fun()->sprintf"pipe_write: name=%s proxydescr=%Ld 
len=%d"pipe.pipe_name(int64_of_file_descrpipe_proxy)len);tryletn=netsys_pipe_write pipe.pipe_helpersposlenindlogr (fun()->sprintf"pipe_write: name=%s proxydescr=%Ld returning %d"pipe.pipe_name(int64_of_file_descrpipe_proxy)n);nwith|errorwhen!Debug.enable->dlogr(fun()->sprintf"pipe_write: name=%s proxydescr=%Ld exception %s"pipe.pipe_name(int64_of_file_descrpipe_proxy)(Netexn.to_stringerror));raiseerrorlet pipe_write_string (pipe,pipe_proxy)sposlen=pipe_write(pipe,pipe_proxy)(Bytes.unsafe_of_strings)poslenletpipe_shutdown(pipe,pipe_proxy)=dlogr(fun ()->sprintf"pipe_shutdown: name=%s proxydescr=%Ld" pipe.pipe_name(int64_of_file_descrpipe_proxy));netsys_pipe_shutdownpipe.pipe_helperletpipe_shutdown_server(psrv,psrv_proxy)=dlogr(fun()->sprintf"pipe_shutdown_server: name=%s proxydescr=%Ld" psrv.psrv_name(int64_of_file_descrpsrv_proxy));Netsys_oothr.serializepsrv.psrv_mutex(fun()->List.iter(funph->netsys_pipe_shutdownph)psrv.psrv_queue;psrv.psrv_queue<-[];psrv.psrv_listen<-0)()letpipe_wait_rd(pipe,pipe_proxy)tmo=dlogr(fun()->sprintf "pipe_wait_rd: name=%s proxydescr=%Ld"pipe.pipe_name(int64_of_file_descrpipe_proxy));event_waitpipe.pipe_rd_eventtmoletpipe_wait_wr(pipe,pipe_proxy)tmo=dlogr(fun()->sprintf "pipe_wait_wr: name=%s proxydescr=%Ld"pipe.pipe_name(int64_of_file_descrpipe_proxy));event_waitpipe.pipe_wr_eventtmoletpipe_wait_connect(psrv,psrv_proxy)tmo=dlogr(fun()->sprintf "pipe_wait_connect: name=%s proxydescr=%Ld"psrv.psrv_name(int64_of_file_descrpsrv_proxy));event_waitpsrv.psrv_cn_eventtmoletpipe_name(pipe,_)=pipe.pipe_nameletpipe_server_name(psrv,_)=psrv.psrv_nameletpipe_mode(pipe,_)=pipe.pipe_modeletpipe_server_mode(psrv,_)=psrv.psrv_modeletcounter=ref0letcounter_mutex=!Netsys_oothr.provider#create_mutex()letunpredictable_pipe_name()=letn=(counter_mutex #lock();letn=!counterinincrcounter;counter_mutex#unlock();n)inletrandom=Bytes.make16' 
'infill_randomrandom;letname="\\\\.\\pipe\\ocamlnet"^string_of_int(Unix.getpid())^"_"^string_of_intn^"_"^Digest.to_hex(Bytes.to_stringrandom)innameletpipe_pairmode=(* FIXME: If somebody guesses the pipe name (which is hard),
it is possible to connect from the outside to lph. We detect
this problem, and give up on the pipe pair, but external code can
make our programs unreliable.
*)dlog"pipe_pair";letmode'=matchmodewith|Pipe_in ->Pipe_out|Pipe_out->Pipe_in|Pipe_duplex->Pipe_duplexinletname=unpredictable_pipe_name()inletpsrv=create_local_pipe_servernamemode1inpipe_listenpsrv1;letrph=pipe_connectnamemode'in(trypipe_listenpsrv0;letlph=pipe_acceptpsrvin(trylets=Bytes.create0inignore(pipe_writelphs00);dlogr(fun()->sprintf"pipe_pair: returning \
name=%s proxydescr1=%Ld proxydescr2=%Ld"name(int64_of_file_descr(sndlph))(int64_of_file_descr(sndrph)));(lph,rph)with e->pipe_shutdownlph;raisee)withe->pipe_shutdown rph;raisee)externalnetsys_create_process:string->string->create_process_optionlist->c_process="netsys_create_process"externalnetsys_close_process:c_process->unit="netsys_close_process"externalnetsys_get_process_status:c_process->int="netsys_get_process_status"externalnetsys_as_process_event:c_process->c_event="netsys_as_process_event"externalnetsys_emulated_pid:c_process->int="netsys_emulated_pid"externalnetsys_win_pid:c_process->int="netsys_win_pid"externalnetsys_process_free:c_process->unit="netsys_process_free"externalnetsys_process_descr:c_process->Unix.file_descr="netsys_process_descr"externalnetsys_set_auto_close_process_proxy:c_process->bool->unit="netsys_set_auto_close_process_proxy"externalnetsys_terminate_process:c_process->unit="netsys_terminate_process"letclose_process(c_proc,_)=netsys_process_freec_procletget_process_status(c_proc,_)=tryletcode=netsys_get_process_statusc_procinSome(Unix.WEXITEDcode)with|Not_found->Noneletdefault_opts=[CP_inherit_or_create_console;CP_ansi_environment;CP_inherit_process_group]letcreate_processcmdcmdlineopts=letopts=(* prepend defaults: *)default_opts @optsinletc_proc=netsys_create_processcmdcmdlineoptsinletproxy=netsys_process_descr c_procinregister_proxyproxy(I_process c_proc);Gc.finalisenetsys_process_freec_proc;ignore(get_process_status(c_proc,proxy));(* The new process seems to remain suspended until the caller waits
for the process handle. So we do this here.
*)(c_proc,proxy)letterminate_process(c_proc,_)=netsys_terminate_processc_procletas_process_event(c_proc,_)=letev=netsys_as_process_eventc_procindecorate_eventevletemulated_pid(c_proc,_)=netsys_emulated_pidc_procletwin_pid(c_proc,_)=netsys_win_pid c_proclet process_descr(c_proc,fd)=netsys_set_auto_close_process_proxyc_procfalse;fdletcp_set_envenv=CP_set_env(String.concat"\000"(Array.to_listenv)^"\000")(* another null byte is implicitly added by the ocaml runtime! *)externalsearch_path:stringoption->string->stringoption ->string="netsys_search_path"typew32_console_attr={mutablecursor_x:int;mutablecursor_y:int;mutablecursor_size:int;mutablecursor_visible:bool;mutabletext_attr:int;}typew32_console_info={mutablewidth:int;mutableheight:int;}typew32_console_mode={mutableenable_echo_input:bool;mutableenable_insert_mode:bool;mutableenable_line_input:bool;mutableenable_processed_input :bool;mutableenable_quick_edit_mode :bool;mutableenable_processed_output :bool;mutableenable_wrap_at_eol_output:bool;}externalhas_console:unit ->bool="netsys_has_console"externalis_console:Unix.file_descr->bool="netsys_is_console"externalalloc_console:unit->unit="netsys_alloc_console"letget_console_input()=ifnot(has_console())thenalloc_console();Unix.openfile"CONIN$"[Unix.O_RDWR]0(* O_RDONLY is insufficient for certain console ops *)letget_console_output()=ifnot(has_console())thenalloc_console();Unix.openfile"CONOUT$"[Unix.O_RDWR]0(* O_WRONLY is insufficient for certain console ops *)externalget_console_attr:unit->w32_console_attr="netsys_get_console_attr"externalset_console_attr:w32_console_attr ->unit="netsys_set_console_attr"externalget_console_info:unit ->w32_console_info="netsys_get_console_info"letfg_blue=1letfg_green=2letfg_red=4letfg_intensity=8letbg_blue=16letbg_green =32letbg_red=64letbg_intensity =128externalget_console_mode:unit->w32_console_mode="netsys_get_console_mode"externalset_console_mode:w32_console_mode 
->unit="netsys_set_console_mode"externalinit_console_codepage:unit->unit="netsys_init_console_codepage"typeclear_mode=|EOL|EOS|Allexternalclear_console:clear_mode ->unit="netsys_clear_console"letclear_until_end_of_line()=clear_consoleEOLletclear_until_end_of_screen()=clear_consoleEOSletclear_console()=clear_consoleAllexternalget_current_thread_id:unit->int32="netsys_get_current_thread_id"externalcancel_synchronous_io:int32->unit="netsys_cancel_synchronous_io"(* Only implemented on Vista (and newer). *)moduleInputThread=structletrecthread_body(ithr:i_input_thread)=(* Check for new commands: *)dlogr(fun()->sprintf"input_thread_body: descr=%Ld waiting"(int64_of_file_descrithr.ithr_descr));ithr.ithr_cmd_mutex#lock();while ithr.ithr_cmd=None&¬ithr.ithr_cancel_cmddoithr.ithr_cmd_cond #waitithr.ithr_cmd_mutexdone;letnext_cmd=ifithr.ithr_cancel_cmdthen`Cancelelsematchithr.ithr_cmdwith|None->assertfalse|Somec->ithr.ithr_cmd<-None;cinletcontinue=matchnext_cmdwith|`Cancel->dlogr(fun ()->sprintf"input_thread_body: descr=%Ld got `Cancel"(int64_of_file_descrithr.ithr_descr));ithr.ithr_buffer_cond<-`Cancelled;false|`Read->dlogr(fun()->sprintf"input_thread_body: descr=%Ld got `Read"(int64_of_file_descrithr.ithr_descr));(tryletn=Unix.readithr.ithr_descrithr.ithr_buffer0(Bytes.lengthithr.ithr_buffer)inifn=0then(ithr.ithr_buffer_cond<-`EOF;ithr.ithr_buffer_start<-0;ithr.ithr_buffer_len<-0;false)else(ithr.ithr_buffer_cond<-`Data;ithr.ithr_buffer_start<-0;ithr.ithr_buffer_len<-n;true)with|Unix.Unix_error(Unix.EPIPE,_,_)->(* same as EOF *)ithr.ithr_buffer_cond<-`EOF;ithr.ithr_buffer_start<-0;ithr.ithr_buffer_len<-0;false|error->ithr.ithr_buffer_cond<-`Exceptionerror;ithr.ithr_buffer_start<-0;ithr.ithr_buffer_len<-0;false)indlogr(fun()->sprintf"input_thread_body: descr=%Ld unblocking"(int64_of_file_descrithr.ithr_descr));set_eventithr.ithr_event;ithr.ithr_cmd_mutex#unlock();if continuethenthread_bodyithrelse((* clean-up: *)dlogr(fun()->sprintf"input_thread_body: descr=%Ld 
terminating"(int64_of_file_descrithr.ithr_descr));Unix.closeithr.ithr_descr;ithr.ithr_running <-false)leti_cancel_input_threadithr=dlogr(fun()->sprintf "cancel_input_thread: descr=%Ld"(int64_of_file_descrithr.ithr_descr));ithr.ithr_cancel_cmd<-true;(* don't mess with locks here *)ithr.ithr_cmd_cond#signal();(* This is clearly a race condition... The thread may terminate
right now, and cancel_io_thread is called with an invalid thread
ID.
*)ifithr.ithr_runningthen(trycancel_synchronous_ioithr.ithr_threadwith_->())letf_cancel_input_threadithr_=i_cancel_input_thread ithrletcancel_input_thread(ithr,_,_)=i_cancel_input_threadithrletcreate_input_threadfd=letoothr=!Netsys_oothr.providerinletinit_cond=oothr#create_condition()inletinit_mutex=oothr#create_mutex()inlet p_event=netsys_create_event()inletproxy=netsys_event_descrp_eventinletithr={ithr_descr =fd;ithr_cmd_cond=oothr#create_condition();ithr_cmd_mutex=oothr#create_mutex();ithr_cmd=Some`Read;ithr_cancel_cmd=false;ithr_event=create_event();ithr_buffer=Bytes.create4096;ithr_buffer_start=0;ithr_buffer_len=0;ithr_buffer_cond=`Data;ithr_thread=0l;(* initialized below *)ithr_read_mutex=oothr#create_mutex();ithr_running=true;ithr_proxy_handle=p_event;}inlet_=oothr #create_thread(fun()->ithr.ithr_thread<-get_current_thread_id();init_cond#signal();thread_bodyithr)()ininit_cond#waitinit_mutex;letf=f_cancel_input_thread ithrinletkeep_alive=(objectend)inGc.finalise fkeep_alive;Gc.finalisenetsys_close_event p_event;register_proxyproxy(I_input_thread(ithr,keep_alive));(ithr,proxy,keep_alive)letinput_thread_read(ithr,_,_)sposlen=ifpos<0||len<0||pos>Bytes.length s-lentheninvalid_arg"Netsys_win32.input_thread_read";iflen=0then0else(Netsys_oothr.serializeithr.ithr_read_mutex(* only one reader at a time *)(fun()->letb=test_eventithr.ithr_eventinifbthen(ithr.ithr_cmd_mutex#lock();(* Look at what we have: *)matchithr.ithr_buffer_condwith|`EOF ->ithr.ithr_cmd_mutex#unlock();0|`Exceptione->ithr.ithr_cmd_mutex#unlock();raise e|`Cancelled->ithr.ithr_cmd_mutex#unlock();raise(Unix.Unix_error(Unix.EPERM,"Netsys_win32.input_thread_read",""))|`Data->letn=minlenithr.ithr_buffer_leninBytes.blitithr.ithr_bufferithr.ithr_buffer_startsposn;ithr.ithr_buffer_start<-ithr.ithr_buffer_start+n;ithr.ithr_buffer_len<-ithr.ithr_buffer_len-n;ifithr.ithr_buffer_len=0then(ithr.ithr_cmd<-Some`Read;ithr.ithr_cmd_cond#signal();reset_event 
ithr.ithr_event;);ithr.ithr_cmd_mutex#unlock();n)elseraise(Unix.Unix_error(Unix.EAGAIN,"Netsys_win32.input_thread_read","")))())letinput_thread_event(ithr,_,_)=ithr.ithr_eventlet input_thread_proxy_descr(ithr,proxy,_)=netsys_set_auto_close_event_proxyithr.ithr_proxy_handlefalse;proxyendletcreate_input_thread=InputThread.create_input_threadletinput_thread_event=InputThread.input_thread_eventletinput_thread_read=InputThread.input_thread_readletcancel_input_thread=InputThread.cancel_input_threadletinput_thread_proxy_descr=InputThread.input_thread_proxy_descrletinput_thread_descr(ithr,_,_)=ithr.ithr_descrmoduleOutputThread=structletrecwrite_stringothrposlen=iflen=0||othr.othr_cancel_cmdthen()elseletn=Unix.single_writeothr.othr_descrothr.othr_bufferposleninwrite_string othr(pos+n)(len-n)letrecthread_body(othr:i_output_thread)=(* Check for new commands: *)dlogr(fun()->sprintf"output_thread_body: descr=%Ld waiting"(int64_of_file_descrothr.othr_descr));othr.othr_cmd_mutex#lock();while othr.othr_cmd=None&¬othr.othr_cancel_cmddoothr.othr_cmd_cond #waitothr.othr_cmd_mutexdone;letnext_cmd=ifothr.othr_cancel_cmdthen`Cancelelsematchothr.othr_cmdwith|None->assertfalse|Somec->othr.othr_cmd<-None;cinletcontinue=matchnext_cmdwith|`Cancel->dlogr(fun ()->sprintf"output_thread_body: descr=%Ld got `Cancel"(int64_of_file_descrothr.othr_descr));othr.othr_buffer_len<-0;othr.othr_write_cond<-Some`Cancelled;false|`Close->dlogr(fun()->sprintf"output_thread_body: descr=%Ld got `Close"(int64_of_file_descrothr.othr_descr));othr.othr_write_cond<-Some`Cancelled;false|`Write->dlogr(fun()->sprintf"output_thread_body: descr=%Ld got `Write"(int64_of_file_descrothr.othr_descr));(trywrite_stringothr0othr.othr_buffer_len;othr.othr_buffer_len<-0;truewith|error->othr.othr_write_cond<-Some(`Exceptionerror);false)indlogr(fun()->sprintf"output_thread_body: descr=%Ld unblocking"(int64_of_file_descrothr.othr_descr));set_eventothr.othr_event;othr.othr_cmd_mutex#unlock();if continuethenthread_bodyothrelse((* 
clean-up: *)dlogr(fun()->sprintf"output_thread_body: descr=%Ld terminating"(int64_of_file_descrothr.othr_descr));Unix.closeothr.othr_descr;othr.othr_running <-false)leti_cancel_output_threadothr=dlogr(fun()->sprintf "cancel_output_thread: descr=%Ld"(int64_of_file_descrothr.othr_descr));othr.othr_cancel_cmd<-true;(* don't mess with locks here *)othr.othr_cmd_cond#signal();(* This is clearly a race condition... The thread may terminate
right now, and cancel_io_thread is called with an invalid thread
ID.
*)ifothr.othr_runningthen(trycancel_synchronous_ioothr.othr_threadwith_->())letf_cancel_output_threadothr_=i_cancel_output_thread othrletcancel_output_thread(othr,_,_)=i_cancel_output_thread othrletcreate_output_threadfd=letoothr=!Netsys_oothr.providerinletinit_cond=oothr#create_condition()inletinit_mutex=oothr#create_mutex()inlet p_event=netsys_create_event()inletproxy=netsys_event_descrp_eventinletothr={othr_descr =fd;othr_cmd_cond=oothr#create_condition();othr_cmd_mutex=oothr#create_mutex();othr_cmd=None;othr_cancel_cmd=false;othr_event=create_event();othr_buffer=Bytes.create4096;othr_buffer_len=0;othr_write_cond=None;othr_thread=0l;(* initialized below *)othr_write_mutex=oothr#create_mutex();othr_running=true;othr_proxy_handle=p_event;}inset_eventothr.othr_event;let_=oothr #create_thread(fun()->othr.othr_thread<-get_current_thread_id();init_cond#signal();thread_bodyothr)()ininit_cond#waitinit_mutex;letf=f_cancel_output_thread othrinletkeep_alive=(objectend)inGc.finalise fkeep_alive;Gc.finalisenetsys_close_event p_event;register_proxyproxy(I_output_thread(othr,keep_alive));(othr,proxy,keep_alive)letoutput_thread_write(othr,_,_)sposlen=ifpos <0||len<0||pos>Bytes.length s-lentheninvalid_arg"Netsys_win32.output_thread_write";iflen=0then0else(Netsys_oothr.serializeothr.othr_write_mutex(* only one writer at a time *)(fun()->letb=test_eventothr.othr_eventinifbthen(othr.othr_cmd_mutex#lock();(* Look at what we have: *)matchothr.othr_write_condwith|Some(`Exceptione)->othr.othr_cmd_mutex#unlock();raisee|Some`Cancelled->othr.othr_cmd_mutex#unlock();raise(Unix.Unix_error(Unix.EPERM,"Netsys_win32.output_thread_write",""))|None->assert(othr.othr_buffer_len=0);letn=minlen(Bytes.lengthothr.othr_buffer)inBytes.blitsposothr.othr_buffer0n;othr.othr_buffer_len<-n;othr.othr_cmd<-Some`Write;othr.othr_cmd_cond#signal();reset_event 
othr.othr_event;othr.othr_cmd_mutex#unlock();n)elseraise(Unix.Unix_error(Unix.EAGAIN,"Netsys_win32.output_thread_write","")))())letclose_output_thread(othr,_,_)=Netsys_oothr.serializeothr.othr_write_mutex(* only one writer at a time *)(fun()->letb=test_eventothr.othr_eventinifbthen(othr.othr_cmd_mutex#lock();(* Look at what we have: *)matchothr.othr_write_condwith|Some(`Exceptione)->othr.othr_cmd_mutex#unlock();raise e|Some`Cancelled->othr.othr_cmd_mutex#unlock();raise(Unix.Unix_error(Unix.EPERM,"Netsys_win32.close_output_thread",""))|None->assert(othr.othr_buffer_len=0);othr.othr_cmd<-Some`Close;othr.othr_cmd_cond#signal();reset_event othr.othr_event;othr.othr_cmd_mutex#unlock();)elseraise(Unix.Unix_error(Unix.EAGAIN,"Netsys_win32.close_output_thread","")))()letoutput_thread_event(othr,_,_)=othr.othr_eventlet output_thread_proxy_descr(othr,proxy,_)=netsys_set_auto_close_event_proxy othr.othr_proxy_handlefalse;proxyendletcreate_output_thread=OutputThread.create_output_threadletoutput_thread_event=OutputThread.output_thread_eventletoutput_thread_write=OutputThread.output_thread_writeletcancel_output_thread=OutputThread.cancel_output_threadletclose_output_thread=OutputThread.close_output_threadletoutput_thread_proxy_descr=OutputThread.output_thread_proxy_descrletoutput_thread_descr(othr,_,_)=othr.othr_descrlet output_thread_write_stringoutsposlen=output_thread_writeout(Bytes.unsafe_of_strings)poslen