123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112(*
* Copyright (C) 2006-2009 Citrix Systems Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)moduleM=MutexmoduleMutex=struct(** execute the function f with the mutex hold *)letexecutelockf=Mutex.locklock;Xapi_stdext_pervasives.Pervasiveext.finallyf(fun()->Mutex.unlocklock)end(** Parallel List.iter. Remembers all exceptions and returns an association list mapping input x to an exception.
Applications of x which succeed will be missing from the returned list. *)letthread_iter_all_exnsfxs=letexns=ref[]inletm=M.create()inList.iterThread.join(List.map(funx->Thread.create(fun()->tryfxwithe->Mutex.executem(fun()->exns:=(x,e)::!exns))())xs);!exns(** Parallel List.iter. Remembers one exception (at random) and throws it in the
error case. *)letthread_iterfxs=matchthread_iter_all_exnsfxswith|[]->()|(_,e)::_->raiseemoduleDelay=struct(* Concrete type is the ends of a pipe *)typet={(* A pipe is used to wake up a thread blocked in wait: *)mutablepipe_out:Unix.file_descroption;mutablepipe_in:Unix.file_descroption;(* Indicates that a signal arrived before a wait: *)mutablesignalled:bool;m:M.t}letmake()={pipe_out=None;pipe_in=None;signalled=false;m=M.create()}exceptionPre_signalledletwait(x:t)(seconds:float)=letfinally=Xapi_stdext_pervasives.Pervasiveext.finallyinletto_close=ref[]inletclose'fd=ifList.memfd!to_closethenUnix.closefd;to_close:=List.filter(funx->fd<>x)!to_closeinfinally(fun()->tryletpipe_out=Mutex.executex.m(fun()->ifx.signalledthenbeginx.signalled<-false;raisePre_signalled;end;letpipe_out,pipe_in=Unix.pipe()in(* these will be unconditionally closed on exit *)to_close:=[pipe_out;pipe_in];x.pipe_out<-Somepipe_out;x.pipe_in<-Somepipe_in;x.signalled<-false;pipe_out)inletr,_,_=Unix.select[pipe_out][][]secondsin(* flush the single byte from the pipe *)ifr<>[]thenignore(Unix.readpipe_out(Bytes.create1)01);(* return true if we waited the full length of time, false if we were woken *)r=[]withPre_signalled->false)(fun()->Mutex.executex.m(fun()->x.pipe_out<-None;x.pipe_in<-None;List.iterclose'!to_close))letsignal(x:t)=Mutex.executex.m(fun()->matchx.pipe_inwith|Somefd->ignore(Unix.writefd(Bytes.of_string"X")01)|None->x.signalled<-true(* If the wait hasn't happened yet then store up the signal *))end