(*
* Copyright (C) 2011-2013 Citrix Inc
* Copyright (C) 2016 Docker Inc
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
* REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
* INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
* LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
* OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
* PERFORMANCE OF THIS SOFTWARE.
*)letsrc=letsrc=Logs.Src.create"mirage-block-unix"~doc:"Mirage BLOCK interface for Unix"inLogs.Src.set_levelsrc(SomeLogs.Info);src(* samoht: `Msg should be the list of all possible exceptions *)typeerror=[Mirage_block.error|`Msgofstring](* samoht: `Msg should be the list of all possible exceptions *)typewrite_error=[Mirage_block.write_error|`Msgofstring]letpp_errorppf=function|#Mirage_block.errorase->Mirage_block.pp_errorppfe|`Msgs->Fmt.stringppfsletpp_write_errorppf=function|#Mirage_block.write_errorase->Mirage_block.pp_write_errorppfe|`Msgs->Fmt.stringppfsmoduleLog=(valLogs.src_logsrc:Logs.LOG)letis_win32=Sys.os_type="Win32"moduleRaw=structexternalopenfile_unbuffered:string->bool->int->Unix.file_descr="stub_openfile_direct"letopenfile_bufferednamerwperm=Unix.openfilename[ifrwthenUnix.O_RDWRelseUnix.O_RDONLY]permexternalblkgetsize:Unix.file_descr->int64="stub_blkgetsize"externalblkgetsectorsize:Unix.file_descr->int="stub_blkgetsectorsize"externallseek_data:Unix.file_descr->int64->int64="stub_lseek_data_64"externallseek_hole:Unix.file_descr->int64->int64="stub_lseek_hole_64"typebuffer=(char,Bigarray.int8_unsigned_elt,Bigarray.c_layout)Bigarray.Array1.texternalwritev_job:Unix.file_descr->(buffer*int*int)list->intLwt_unix.job="mirage_block_unix_writev_job"externalreadv_job:Unix.file_descr->(buffer*int*int)list->intLwt_unix.job="mirage_block_unix_readv_job"externaliov_len:unit->int="mirage_block_unix_get_iov_len"externalchsize_job:Unix.file_descr->int64->unitLwt_unix.job="mirage_block_unix_chsize_job"externalflock:Unix.file_descr->bool(* ex *)->bool(* nb 
*)->unit="stub_flock"endletftruncatefdsize=ifis_win32thenLwt_unix.run_job(Raw.chsize_job(Lwt_unix.unix_file_descrfd)size)elseLwt_unix.LargeFile.ftruncatefdsizeopenLwtmoduleConfig=structtypesync_behaviour=[|`ToOS|`ToDrive]letsync_behaviour_of_string=function|"0"|"none"->None|"1"|"drive"->Some`ToDrive|"os"->Some`ToOS|_->Noneletstring_of_sync=function|None->"none"|Some`ToDrive->"drive"|Some`ToOS->"os"typet={buffered:bool;sync:sync_behaviouroption;path:string;lock:bool;}letcreate?(buffered=true)?(sync=Some`ToOS)?(lock=false)path={buffered;sync;path;lock}letto_stringt=letquery=["buffered",[ift.bufferedthen"1"else"0"];"sync",[string_of_synct.sync];"lock",[ift.lockthen"1"else"0"];]inletu=Uri.make~scheme:"file"~path:t.path~query()inUri.to_stringuletof_stringx=letu=Uri.of_stringxinmatchUri.schemeuwith|Some"file"->letquery=Uri.queryuinletbuffered=tryList.assoc"buffered"query=["1"]withNot_found->falseinletsync=trysync_behaviour_of_string@@List.hd@@List.assoc"sync"querywithNot_found->Noneinletlock=tryList.assoc"lock"query=["1"]withNot_found->falseinletpath=Uri.(pct_decode@@pathu)inOk{buffered;sync;path;lock}|_->Error(`Msg"Config.to_string expected a string of the form file://<path>?sync=(none|os|drive)&buffered=(0|1)&lock=(0|1)")endtypet={mutablefd:Lwt_unix.file_descroption;mutableseek_offset:int64;(* a shadow copy of the fd's seek offset which avoids calling `lseek`
unnecessarily, speeding up sequential read and write *)m:Lwt_mutex.t;mutableinfo:Mirage_block.info;size_bytes:int64;(* used to handle the last sector, if the file isn't a multiple *)config:Config.t;use_fsync_after_write:bool;}letto_configx=x.configlet(>>*=)mf=m>>=function|Okx->fx|Errorx->Lwt.return(Errorx)letstat_filenamefd=Rresult.R.trap_exnUnix.LargeFile.fstatfd|>Rresult.R.error_exn_trap_to_msg|>Lwt.returnletblkgetsize_filenamefd=Rresult.R.trap_exnRaw.blkgetsizefd|>Rresult.R.error_exn_trap_to_msgletblkgetsectorsizefd=Rresult.R.trap_exnRaw.blkgetsectorsizefd|>Rresult.R.error_exn_trap_to_msgletget_file_sizefilenamefd=statfilenamefd>>*=funst->matchst.Unix.LargeFile.st_kindwith|Unix.S_REG->Lwt.return@@Okst.Unix.LargeFile.st_size|Unix.S_BLK->Lwt.return@@blkgetsize()fd|_->Log.err(funf->f"get_file_size %s: entity is neither a file nor a block device"filename);Lwt.return@@Error(`Msg(Printf.sprintf"get_file_size %s: neither a file nor a block device"filename))letget_sector_sizefilenamefd=statfilenamefd>>*=funst->matchst.Unix.LargeFile.st_kindwith|Unix.S_REG->Lwt.return@@Ok512(* FIXME: no easy way to determine this *)|Unix.S_BLK->Lwt.return@@blkgetsectorsizefd|_->Log.err(funf->f"get_sector_size %s: entity is neither a file nor a block device"filename);Lwt.return@@Error(`Msg(Printf.sprintf"get_sector_size %s: neither a file nor a block device"filename))letof_config({Config.buffered;path;lock;sync=_}asconfig)=letopenfile,use_fsync_after_write=matchbuffered,is_win32with|true,_->Raw.openfile_buffered,false|false,false->Raw.openfile_unbuffered,false|false,true->(* We can't use O_DIRECT or F_NOCACHE on Win32, so for now
we will use `fsync` after every write. *)Raw.openfile_buffered,truein(* first try read/write and then fall back to read/only *)tryletfd,read_write=tryopenfilepathtrue0o0,truewith_->openfilepathfalse0o0,falsein(* Acquire an exclusive lock if in read/write mode, otherwise a shared lock *)iflockthenRaw.flockfdread_writetrue;get_file_sizepathfd>>=function|Error(`Msge)->Unix.closefd;fail_withe|Error_->fail_with"mirage-block-unix:of_config: unknown error"|Oksize_bytes->get_sector_sizepathfd>>=function|Error(`Msge)->Unix.closefd;fail_withe|Error_->fail_with"mirage-block-unix:of_config: unknown error"|Oksector_size->(* If the file length is not sector-aligned, we would like to represent the
last bytes as a sector with zero-padding. Unfortunately on Linux with
O_DIRECT `read` will fail with EINVAL. *)letsize_sectors=Int64.(div(addsize_bytes(of_int(sector_size-1)))(of_intsector_size))inifInt64.(mulsize_sectors(of_intsector_size))>size_bytes&¬(buffered)thenLog.warn(funf->f"Length not sector aligned: O_DIRECT will fail with EINVAL on some platforms");letfd=Lwt_unix.of_unix_file_descrfdinletm=Lwt_mutex.create()inletseek_offset=0Linreturn({fd=Somefd;seek_offset;m;info={Mirage_block.sector_size;size_sectors;read_write};size_bytes;config;use_fsync_after_write})with_->Log.err(funf->f"connect %s: failed to open file"path);fail_with(Printf.sprintf"connect %s: failed to open file"path)(* prefix which signals we want to use buffered I/O *)letbuffered_prefix="buffered:"letis_prefix~prefixx=letprefix'=String.lengthprefixandx'=String.lengthxinx'>=prefix'&&(String.subx0prefix'=prefix)letconnect?buffered?sync?lockname=letlegacy_buffered=is_prefix~prefix:buffered_prefixnamein(* Keep support for the legacy buffered: prefix until version 3.x.y *)letbuffered=iflegacy_bufferedthenSometrueelsebufferedinletconfig=Config.create?buffered?sync?locknameinof_configconfigletdisconnectt=matcht.fdwith|Somefd->Lwt_unix.closefd>>=fun()->t.fd<-None;return()|None->return()letget_infox=returnx.infoletreally_readfd=Lwt_cstruct.complete(Lwt_cstruct.readfd)letreally_writefd=Lwt_cstruct.complete(Lwt_cstruct.writefd)openMirage_blockletlwt_wrap_exntopoffset?(buffers=[])f=letfatalffmt=Printf.ksprintf(funs->Log.err(funf->f"%s"s);return(Error(`Msgs)))fmtinletdescribe_buffersbuffers=ifbuffers=[]then""else"with buffers of length [ "^(String.concat", "(List.map(funb->string_of_int@@Cstruct.lenb)buffers))^" ]"in(* Buffer must be a multiple of sectors in length *)Lwt_list.fold_left_s(funaccb->matchaccwith|Errore->Lwt.return(Errore)|Ok()->letlen=Cstruct.lenbiniflenmodt.info.sector_size<>0thenfatalf"%s: buffer length (%d) is not a multiple of sector_size (%d) for file 
%s"oplent.info.sector_sizet.config.Config.pathelseLwt.return(Ok()))(Ok())buffers>>*=fun()->Lwt.catchf(function|End_of_file->fatalf"%s: End_of_file at file %s offset %Ld %s"opt.config.Config.pathoffset(describe_buffersbuffers)|Unix.Unix_error(code,fn,arg)->fatalf"%s: %s in %s '%s' at file %s offset %Ld %s"op(Unix.error_messagecode)fnargt.config.Config.pathoffset(describe_buffersbuffers)|e->fatalf"%s: %s at file %s offset %Ld %s"op(Printexc.to_stringe)t.config.Config.pathoffset(describe_buffersbuffers))letseek_already_lockedxfdoffset=ifx.seek_offset<>offsetthenbeginx.seek_offset<-offset;Lwt_unix.LargeFile.lseekfdoffsetUnix.SEEK_SETendelseLwt.returnoffsetmoduleCstructs=struct(** A list of buffers, like a Unix iovec *)letpp_tppft=List.iter(funt->Format.fprintfppf"[%d,%d](%d)"t.Cstruct.offt.Cstruct.len(Bigarray.Array1.dimt.Cstruct.buffer))tletlen=List.fold_left(funaccc->Cstruct.lenc+acc)0leterrfmt=letb=Buffer.create20in(* for thread safety. *)letppf=Format.formatter_of_bufferbinletkppf=Format.pp_print_flushppf();invalid_arg(Buffer.contentsb)inFormat.kfprintfkppffmtletrecshifttx=ifx=0thentelsematchtwith|[]->err"Cstructs.shift %a %d"pp_ttx|y::ys->lety'=Cstruct.lenyinify'>xthenCstruct.shiftyx::yselseshiftys(x-y')letto_iovects=List.map(funt->t.Cstruct.buffer,t.Cstruct.off,t.Cstruct.len)tsendletiov_len=Raw.iov_len()letsplit_listxsmaxlen=letrecloop(acc,l,n)xs=ifn=maxlenthenloop((List.revl)::acc,[],0)xselsematchxswith|[]->List.rev((List.revl)::acc)|x::xs->loop(acc,x::l,n+1)xsinloop([],[],0)xsletreadxsector_startbuffers=letoffset=Int64.(mulsector_start(of_intx.info.sector_size))inlwt_wrap_exnx"read"offset~buffers(fun()->matchx.fdwith|None->return(Error`Disconnected)|Somefd->letlen=Cstructs.lenbuffersinletlen_sectors=(len+x.info.sector_size-1)/x.info.sector_sizeinifInt64.(addsector_start(of_intlen_sectors)>x.info.size_sectors)thenbeginLog.err(funf->f"read beyond end of file: sector_start (%Ld) + len (%d) > size_sectors 
(%Ld)"sector_startlen_sectorsx.info.size_sectors);failEnd_of_fileendelsebeginLwt_mutex.with_lockx.m(fun()->seek_already_lockedxfdoffset>>=fun_->Lwt.catch(fun()->(ifis_win32||List.lengthbuffers=1thenbeginletrecloop=function|[]->Lwt.return_unit|b::bs->letvirtual_zeroes=Int64.(sub(addoffset(of_int(Cstruct.lenb)))x.size_bytes)in(ifvirtual_zeroes<=0Lthenreally_readfdbelsebegin(* we've had to round up size_sectors to include all the data.
We expect End_of_file but ensure that the data missing from the
file is full of zeroes. *)Cstruct.memsetb0;Lwt.catch(fun()->really_readfdb)(function|End_of_file->Lwt.return_unit|e->Lwt.faile)end)>>=fun()->x.seek_offset<-Int64.(addx.seek_offset(of_int(Cstruct.lenb)));loopbsinloopbuffersendelsebeginLwt_list.iter_s(funbuffers->letrecloopremaining=ifCstructs.lenremaining=0thenLwt.return_unitelsebeginletiovec=Cstructs.to_iovecremaininginLwt_unix.run_job(Raw.readv_job(Lwt_unix.unix_file_descrfd)iovec)>>=funn->loop(Cstructs.shiftremainingn)endinloopbuffers)(split_listbuffersiov_len)>>=fun()->x.seek_offset<-Int64.addx.seek_offset(Int64.of_intlen);Lwt.return_unitend))(fune->x.seek_offset<--1L;(* actual file pointer is undefined now *)Lwt.faile)>>=fun()->Lwt.return(Ok()))end)letwritexsector_startbuffers=letoffset=Int64.(mulsector_start(of_intx.info.sector_size))inlwt_wrap_exnx"write"offset~buffers(fun()->matchxwith|{fd=None;_}->return(Error`Disconnected)|{info={read_write=false;_};_}->return(Error`Is_read_only)|{fd=Somefd;_}->letlen=Cstructs.lenbuffersinletlen_sectors=(len+x.info.sector_size-1)/x.info.sector_sizeinifInt64.(addsector_start(of_intlen_sectors)>x.info.size_sectors)thenbeginLog.err(funf->f"write beyond end of file: sector_start (%Ld) + len (%d) > size_sectors (%Ld)"sector_startlen_sectorsx.info.size_sectors);failEnd_of_fileendelsebeginLwt_mutex.with_lockx.m(fun()->seek_already_lockedxfdoffset>>=fun_->Lwt.catch(fun()->(ifis_win32||List.lengthbuffers=1thenbeginletrecloop=function|[]->Lwt.return_unit|b::bs->really_writefdb>>=fun()->x.seek_offset<-Int64.(addx.seek_offset(of_int(Cstruct.lenb)));loopbsinloopbuffersendelsebeginLwt_list.iter_s(funbuffers->letrecloopremaining=ifCstructs.lenremaining=0thenLwt.return_unitelsebeginletiovec=Cstructs.to_iovecremaininginLwt_unix.run_job(Raw.writev_job(Lwt_unix.unix_file_descrfd)iovec)>>=funn->loop(Cstructs.shiftremainingn)endinloopbuffers)(split_listbuffersiov_len)>>=fun()->x.seek_offset<-Int64.addx.seek_offset(Int64.of_intlen);Lwt.return_unitend))(fune->x.seek_offset<--1L;(* actual file 
pointer is undefined now *)Lwt.faile;)>>=fun()->(ifx.use_fsync_after_writethenLwt_unix.fsyncfdelseLwt.return())>>=fun()->Lwt.return(Ok()))end)letresizetnew_size_sectors=letnew_size_bytes=Int64.(mulnew_size_sectors(of_intt.info.sector_size))inmatcht.fdwith|None->return(Error`Disconnected)|Somefd->lwt_wrap_exnt"ftruncate"new_size_bytes(fun()->Lwt_mutex.with_lockt.m(fun()->ftruncatefdnew_size_bytes>>=fun()->t.info<-{t.infowithsize_sectors=new_size_sectors};return(Ok())))externalflush_job:Unix.file_descr->bool->unitLwt_unix.job="mirage_block_unix_flush_job"letflusht=matcht.fdwith|None->return(Error`Disconnected)|Somefd->lwt_wrap_exnt"fsync"0L(fun()->(matcht.config.Config.syncwith|None->Lwt.return_unit|Some`ToOS->Lwt_unix.run_job(flush_job(Lwt_unix.unix_file_descrfd)false)|Some`ToDrive->Lwt_unix.run_job(flush_job(Lwt_unix.unix_file_descrfd)true))>>=fun()->return(Ok()))letseek_mappedtfrom=matcht.fdwith|None->return(Error`Disconnected)|Somefd->letoffset=Int64.(mulfrom(of_intt.info.sector_size))inlwt_wrap_exnt"seek_mapped"offset(fun()->Lwt_mutex.with_lockt.m(fun()->letfd=Lwt_unix.unix_file_descrfdinletoffset=Raw.lseek_datafdoffsetint.seek_offset<-offset;return(OkInt64.(divoffset(of_intt.info.sector_size)))))letseek_unmappedtfrom=matcht.fdwith|None->return(Error`Disconnected)|Somefd->letoffset=Int64.(mulfrom(of_intt.info.sector_size))inlwt_wrap_exnt"seek_unmapped"offset(fun()->Lwt_mutex.with_lockt.m(fun()->letfd=Lwt_unix.unix_file_descrfdinletoffset=Raw.lseek_holefdoffsetint.seek_offset<-offset;return(OkInt64.(divoffset(of_intt.info.sector_size)))))externaldiscard_job:Unix.file_descr->int64->int64->unitLwt_unix.job="mirage_block_unix_discard_job"letdiscardtsectorn=matchtwith|{fd=None;_}->return(Error`Disconnected)|{info={read_write=false;_};_}->return(Error`Is_read_only)|{fd=Somefd;_}->ifis_win32thenreturn(Error`Unimplemented)elseifn=0LthenLwt.return(Ok())elselwt_wrap_exnt"discard"sector(fun()->letfd=Lwt_unix.unix_file_descrfdinletoffset=Int64.(mulsector(of_intt.info.sector_
size))inletn=Int64.(muln(of_intt.info.sector_size))inLwt_unix.run_job(discard_jobfdoffsetn)>>=fun()->Lwt.return(Ok()))