(* Ocsigen
* http://www.ocsigen.org
* Module ocsipersist.ml
* Copyright (C) 2007 Vincent Balat
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, with linking exception;
* either version 2.1 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*)

(* FIX: the log file is never reopened *)

(** Module Ocsipersist: persistent data *)

open Ocsidbmtypes
open Lwt.Infix

(* Log section passed as [~section] to every Lwt_log call of this module. *)
let section =
  Lwt_log.Section.make "ocsigen:ocsipersist:dbm"

(** Data are divided into stores.
Create one store for your project, where you will save all your data. *)
type store = string

(* Failure used when an answer does not match any of the answers a request
   expects. *)
exception Ocsipersist_error

(* File name of the Unix socket, looked up inside [!directory]. *)
let socketname = "socket"

(*****************************************************************************)
(** Internal functions: storage directory *)

(** getting the directory from config file *)
(* Folds the XML configuration into a triple
   [(store directory option, ocsidbm path option, delayloading flag)].
   Duplicate <store>/<ocsidbm> tags, unknown tags and any other content are
   reported through [Ocsigen_extensions.badconfig]. *)
let rec parse_global_config (store, ocsidbm, delayloading as d) = function
  | [] -> d
  | Xml.Element ("delayloading", [("val", ("true" | "1"))], []) :: ll ->
    parse_global_config (store, ocsidbm, true) ll
  | Xml.Element ("store", [("dir", s)], []) :: ll ->
    (match store with
     | None -> parse_global_config (Some s, ocsidbm, delayloading) ll
     | Some _ ->
       Ocsigen_extensions.badconfig "Ocsipersist: Duplicate <store> tag")
  | Xml.Element ("ocsidbm", [("name", s)], []) :: ll ->
    (match ocsidbm with
     | None -> parse_global_config (store, Some s, delayloading) ll
     | Some _ ->
       Ocsigen_extensions.badconfig "Ocsipersist: Duplicate <ocsidbm> tag")
  | Xml.Element (s, _, _) :: _ll ->
    Ocsigen_extensions.badconfig "Bad tag %s" s
  | _ ->
    Ocsigen_extensions.badconfig "Unexpected content inside Ocsipersist config"

(* Mutable settings, overridden by [init_fun] from the configuration:
   the storage directory and the path of the ocsidbm executable. *)
let (directory, ocsidbm) =
  ( ref (Ocsigen_config.get_datadir () ^ "/ocsipersist")
  , ref (Ocsigen_config.get_extdir () ^ "/ocsidbm" ^ Ocsigen_config.native_ext)
  )

(*****************************************************************************)
(** Communication with the DB server *)

external sys_exit : int -> 'a = "caml_sys_exit"

(* [try_connect sname] connects a Unix stream socket to the path [sname].
   If that fails for any reason, it launches [!ocsidbm] on [!directory]
   through a double fork, waits 1.1 s and tries to connect once more.
   The returned promise fails if the second attempt fails too.

   A socket whose connection attempt failed is closed before the error is
   propagated, so that failed attempts do not leak file descriptors. *)
let try_connect sname =
  let connect () =
    let socket = Lwt_unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in
    Lwt.catch
      (fun () ->
         Lwt_unix.connect socket (Unix.ADDR_UNIX sname) >>= fun () ->
         Lwt.return socket)
      (fun e ->
         (* Best-effort close: the connection error is the one to report. *)
         Lwt.catch
           (fun () -> Lwt_unix.close socket)
           (fun _ -> Lwt.return ())
         >>= fun () ->
         Lwt.fail e)
  in
  Lwt.catch
    connect
    (fun _ ->
       Lwt_log.ign_warning_f ~section
         "Launching a new Ocsidbm process: %s on directory %s."
         !ocsidbm !directory;
       let param = [| !ocsidbm; !directory |] in
       (* Runs in the grand-child: stderr goes to the error log, stdout to
          /dev/null, stdin is closed, then the process image is replaced. *)
       let child () =
         let log =
           Unix.openfile
             (Ocsigen_messages.error_log_path ())
             [Unix.O_WRONLY; Unix.O_CREAT; Unix.O_APPEND]
             0o640
         in
         Unix.dup2 log Unix.stderr;
         Unix.close log;
         let devnull = Unix.openfile "/dev/null" [Unix.O_WRONLY] 0 in
         Unix.dup2 devnull Unix.stdout;
         Unix.close devnull;
         Unix.close Unix.stdin;
         Unix.execv !ocsidbm param
       in
       let pid = Lwt_unix.fork () in
       if pid = 0 then begin
         (* double fork *)
         if Lwt_unix.fork () = 0 then child () else sys_exit 0
       end
       else
         Lwt_unix.waitpid [] pid >>= fun _ ->
         Lwt_unix.sleep 1.1 >>= fun () ->
         connect ())

(* [get_indescr i] calls [try_connect] on the socket inside [!directory].
   On failure it sleeps 2.1 s and retries, at most [i] more times; when no
   retry is left it logs the error and fails with it. *)
let rec get_indescr i =
  Lwt.catch
    (fun () -> try_connect (!directory ^ "/" ^ socketname))
    (fun e ->
       if i = 0 then begin
         let reason =
           match e with
           | Unix.Unix_error (a, b, c) ->
             Printf.sprintf "%s in %s(%s)" (Unix.error_message a) b c
           | _ -> Printexc.to_string e
         in
         Lwt_log.ign_error_f ~section
           "Cannot connect to Ocsidbm. Will continue \
            without persistent session support. \
            Error message is: %s .\
            Have a look at the logs to see if there is an \
            error message from the Ocsidbm process."
           reason;
         Lwt.fail e
       end
       else Lwt_unix.sleep 2.1 >>= fun () -> get_indescr (i - 1))

(* Input and output channels to the database process. Until [init_fun] has
   run, both are failed promises. *)
let inch = ref (Lwt.fail (Failure "Ocsipersist not initialised"))
let outch = ref (Lwt.fail (Failure "Ocsipersist not initialised"))

(* Extension initialisation: applies the configuration to [directory] and
   [ocsidbm], then connects (with up to 2 retries). With delayed loading
   the channels are set to promises resolved later; otherwise the
   connection is awaited with [Lwt_main.run] before returning. *)
let init_fun config =
  let (store, ocsidbmconf, delay_loading) =
    parse_global_config (None, None, false) config
  in
  (match store with
   | None -> ()
   | Some d -> directory := d);
  (match ocsidbmconf with
   | None -> ()
   | Some d -> ocsidbm := d);
  (if delay_loading then
     Lwt_log.ign_warning ~section
       "Asynchronuous initialization (may fail later)"
   else Lwt_log.ign_warning ~section "Initializing ...");
  let indescr = get_indescr 2 in
  if delay_loading then begin
    inch := Lwt.map (Lwt_io.of_fd ~mode:Lwt_io.input) indescr;
    outch := Lwt.map (Lwt_io.of_fd ~mode:Lwt_io.output) indescr
  end
  else begin
    let r = Lwt_main.run indescr in
    inch := Lwt.return (Lwt_io.of_fd ~mode:Lwt_io.input r);
    outch := Lwt.return (Lwt_io.of_fd ~mode:Lwt_io.output r);
    Lwt_log.ign_warning ~section "...Initialization complete"
  end

(* [send v] writes the request [v] on the output channel, flushes it and
   reads one answer from the input channel. Each call first waits for the
   previous exchange to finish (ignoring its failure), so requests and
   answers are not interleaved. *)
let send =
  let previous = ref (Lwt.return Ok) in
  fun v ->
    Lwt.catch (fun () -> !previous) (fun _ -> Lwt.return Ok) >>= fun _ ->
    !inch >>= fun inch ->
    !outch >>= fun outch ->
    previous :=
      (Lwt_io.write_value outch v >>= fun () ->
       Lwt_io.flush outch >>= fun () ->
       Lwt_io.read_value inch);
    !previous

(* Sends [Get]; fails with [Not_found] on [Dbm_not_found], with [e] on
   [Error e], and with [Ocsipersist_error] on any other answer. *)
let db_get (store, name) =
  send (Get (store, name)) >>= function
  | Value v -> Lwt.return v
  | Dbm_not_found -> Lwt.fail Not_found
  | Error e -> Lwt.fail e
  | _ -> Lwt.fail Ocsipersist_error

(* Sends [Remove]; succeeds on [Ok]. *)
let db_remove (store, name) =
  send (Remove (store, name)) >>= function
  | Ok -> Lwt.return ()
  | Error e -> Lwt.fail e
  | _ -> Lwt.fail Ocsipersist_error

(* Sends [Replace]; succeeds on [Ok]. *)
let db_replace (store, name) value =
  send (Replace (store, name, value)) >>= function
  | Ok -> Lwt.return ()
  | Error e -> Lwt.fail e
  | _ -> Lwt.fail Ocsipersist_error

(* Sends [Replace_if_exists]; fails with [Not_found] on [Dbm_not_found]. *)
let db_replace_if_exists (store, name) value =
  send (Replace_if_exists (store, name, value)) >>= function
  | Ok -> Lwt.return ()
  | Dbm_not_found -> Lwt.fail Not_found
  | Error e -> Lwt.fail e
  | _ -> Lwt.fail Ocsipersist_error

(* Sends [Firstkey]; any answer other than [Key _] or [Error _] yields
   [None]. *)
let db_firstkey store =
  send (Firstkey store) >>= function
  | Key k -> Lwt.return (Some k)
  | Error e -> Lwt.fail e
  | _ -> Lwt.return None

(* Sends [Nextkey]; any answer other than [Key _] or [Error _] yields
   [None]. *)
let db_nextkey store =
  send (Nextkey store) >>= function
  | Key k -> Lwt.return (Some k)
  | Error e -> Lwt.fail e
  | _ -> Lwt.return None

(* Sends [Length]; the answer is unmarshalled, and [Dbm_not_found] counts
   as length 0. *)
let db_length store =
  send (Length store) >>= function
  | Value v -> Lwt.return (Marshal.from_string v 0)
  | Dbm_not_found -> Lwt.return 0
  | Error e -> Lwt.fail e
  | _ -> Lwt.fail Ocsipersist_error

(*****************************************************************************)
(** Public functions: *)

(** Type of persistent data
*)
type 'a t = store * string

(* A store is identified by its name; nothing is opened here. *)
let open_store name = Lwt.return name

(* Returns the persistent value [(store, name)]. If no value is stored
   under that name yet, [default ()] is computed, marshalled and stored
   first. Errors other than [Not_found] are propagated. *)
let make_persistent_lazy_lwt ~store ~name ~default =
  let pvname = (store, name) in
  Lwt.catch
    (fun () -> db_get pvname >>= fun _ -> Lwt.return ())
    (function
      | Not_found ->
        default () >>= fun def ->
        db_replace pvname (Marshal.to_string def [])
      | e -> Lwt.fail e)
  >>= fun () ->
  Lwt.return pvname

(* Same as [make_persistent_lazy_lwt] with a default computed by a plain
   function; an exception it raises becomes a failed promise. *)
let make_persistent_lazy ~store ~name ~default =
  let default () = Lwt.wrap default in
  make_persistent_lazy_lwt ~store ~name ~default

(* Same as [make_persistent_lazy] with an already computed default. *)
let make_persistent ~store ~name ~default =
  make_persistent_lazy ~store ~name ~default:(fun () -> default)

(* Reads and unmarshals the stored value. *)
let get (pvname : 'a t) : 'a =
  db_get pvname >>= fun r -> Lwt.return (Marshal.from_string r 0)

(* Marshals [v] and stores it. *)
let set pvname v =
  let data = Marshal.to_string v [] in
  db_replace pvname data

(* FUNCTORIAL INTERFACE *******************************************************)

module type TABLE_CONF = sig
  val name : string
end

type internal = string

module type COLUMN = sig
  type t
  val column_type : string
  val encode : t -> string
  val decode : string -> t
end

module type TABLE = sig
  type key
  type value
  val name : string
  val find : key -> value Lwt.t
  val add : key -> value -> unit Lwt.t
  val replace_if_exists : key -> value -> unit Lwt.t
  val remove : key -> unit Lwt.t
  val modify_opt : key -> (value option -> value option) -> unit Lwt.t
  val length : unit -> int Lwt.t
  val iter :
    ?count:int64 -> ?gt:key -> ?geq:key -> ?lt:key -> ?leq:key ->
    (key -> value -> unit Lwt.t) -> unit Lwt.t
  val fold :
    ?count:int64 -> ?gt:key -> ?geq:key -> ?lt:key -> ?leq:key ->
    (key -> value -> 'a -> 'a Lwt.t) -> 'a -> 'a Lwt.t
  val iter_block :
    ?count:int64 -> ?gt:key -> ?geq:key -> ?lt:key -> ?leq:key ->
    (key -> value -> unit) -> unit Lwt.t
end

module Table (T : TABLE_CONF) (Key : COLUMN) (Value : COLUMN)
  : TABLE with type key = Key.t and type value = Value.t = struct
  type key = Key.t
  type value = Value.t

  let name = T.name

  let find key = Lwt.map Value.decode @@ db_get (name, Key.encode key)

  let add key value = db_replace (name, Key.encode key) (Value.encode value)

  let replace_if_exists key value =
    db_replace_if_exists (name, Key.encode key) (Value.encode value)

  let remove key = db_remove (name, Key.encode key)

  (* Folds [f] over the bindings whose decoded key satisfies every given
     bound ([gt]/[geq]/[lt]/[leq], compared with the polymorphic
     comparison), visiting at most [count] bindings.

     A key outside the bounds is skipped rather than ending the traversal:
     the DBM key enumeration order is unspecified, so a key above the upper
     bound may be followed by keys that are in range. *)
  let fold ?count ?gt ?geq ?lt ?leq f beg =
    let visited = ref 0L in
    let holds bound test =
      match bound with
      | None -> true
      | Some b -> test b
    in
    let in_range k =
      holds gt (fun b -> k > b)
      && holds geq (fun b -> k >= b)
      && holds lt (fun b -> k < b)
      && holds leq (fun b -> k <= b)
    in
    let rec aux nextkey beg =
      match count with
      | Some c when !visited >= c -> Lwt.return beg
      | _ ->
        nextkey name >>= function
        | None -> Lwt.return beg
        | Some k ->
          let k = Key.decode k in
          if in_range k then begin
            visited := Int64.succ !visited;
            find k >>= fun r ->
            f k r beg >>= aux db_nextkey
          end
          else aux db_nextkey beg
    in
    aux db_firstkey beg

  let iter ?count ?gt ?geq ?lt ?leq f =
    fold ?count ?gt ?geq ?lt ?leq (fun k v () -> f k v) ()

  let iter_block ?count:_ ?gt:_ ?geq:_ ?lt:_ ?leq:_ _ =
    failwith
      "iter_block not implemented for DBM. Please use Ocsipersist with sqlite"

  (* Applies [f] to the current binding of [key] ([None] when absent):
     a [None] result removes the binding, a [Some] result stores it.
     Lookup errors other than [Not_found] are propagated unchanged, and a
     value produced for a previously absent key is inserted with [add]. *)
  let modify_opt key f =
    Lwt.catch
      (fun () -> find key >>= fun v -> Lwt.return_some v)
      (function
        | Not_found -> Lwt.return_none
        | e -> Lwt.fail e)
    >>= fun old_value ->
    let new_value = f old_value in
    match old_value, new_value with
    | _, None -> remove key
    | Some _, Some v -> replace_if_exists key v
    | None, Some v -> add key v

  let length () =
    (* for DBM the result may be less than the actual length *)
    db_length name
end

module Column = struct
  module String : COLUMN with type t = string = struct
    type t = string
    let column_type = "_"
    let encode s = s
    let decode s = s
  end

  module Float : COLUMN with type t = float = struct
    type t = float
    let column_type = "_"
    let encode = string_of_float
    let decode = float_of_string
  end

  module Marshal (C : sig type t end) : COLUMN with type t = C.t = struct
    type t = C.t
    let column_type = "_"
    let encode v = Marshal.to_string v []
    let decode v = Marshal.from_string v 0
  end
end

(******************************************************************************)
(** Type of persistent tables
*)
type 'value table = string

(* A table is identified by its name; both functions just wrap it. *)
let open_table name = Lwt.return name
let table_name n = Lwt.return n

(* Fetches the binding of [key] in [table] and unmarshals it. *)
let find table key =
  db_get (table, key) >>= fun raw ->
  Lwt.return (Marshal.from_string raw 0)

(* Marshals [value] and binds it to [key] in [table]. *)
let add table key value =
  db_replace (table, key) (Marshal.to_string value [])

(* Same as [add], through [db_replace_if_exists]. *)
let replace_if_exists table key value =
  db_replace_if_exists (table, key) (Marshal.to_string value [])

let remove table key = db_remove (table, key)

(* Calls [f key value] on each binding of [table], walking the keys with
   [db_firstkey] then [db_nextkey] until no key is returned. *)
let iter_table f table =
  let rec walk fetch =
    fetch table >>= function
    | None -> Lwt.return ()
    | Some key ->
      find table key >>= f key >>= fun () ->
      walk db_nextkey
  in
  walk db_firstkey

let iter_step = iter_table

(* Same traversal as [iter_table], threading an accumulator that starts
   at [beg]. *)
let fold_table f table beg =
  let rec walk fetch acc =
    fetch table >>= function
    | None -> Lwt.return acc
    | Some key ->
      find table key >>= fun value ->
      f key value acc >>= fun acc' ->
      walk db_nextkey acc'
  in
  walk db_firstkey beg

let fold_step = fold_table

let iter_block _a _b =
  failwith
    "iter_block not implemented for DBM. Please use Ocsipersist with sqlite"

(* iterator: with a separate connexion:
exception Exn1
let iter_table f table =
let first = Marshal.to_string (Firstkey table) [] in
let firstl = String.length first in
let next = Marshal.to_string (Nextkey table) [] in
let nextl = String.length next in
(Lwt_unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 >>=
(fun socket ->
Lwt_unix.connect
(Lwt_unix.Plain socket)
(Unix.ADDR_UNIX (directory^"/"^socketname)) >>=
(fun () -> return (Lwt_unix.Plain socket)) >>=
(fun indescr ->
let inch = Lwt_unix.in_channel_of_descr indescr in
let nextkey next nextl =
Lwt_unix.write indescr next 0 nextl >>=
(fun l2 -> if l2 <> nextl
then Lwt.fail Ocsipersist_error
else (Lwt_unix.input_line inch >>=
fun answ -> return (Marshal.from_string answ 0)))
in
let rec aux n l =
nextkey n l >>=
(function
| End -> return ()
| Key k -> find table k >>= f k
| Error e -> Lwt.fail e
| _ -> Lwt.fail Ocsipersist_error) >>=
(fun () -> aux next nextl)
in
catch
(fun () ->
aux first firstl >>=
(fun () -> Unix.close socket; return ()))
(fun e -> Unix.close socket; Lwt.fail e))))
*)

(* Number of bindings of [table], as returned by [db_length]. *)
let length table = table |> db_length
(* Because of Dbm implementation, the result may be less than the expected
   result in some case (with a version of ocsipersist based on Dbm) *)

(* Registers this module as the "ocsipersist" extension, with [init_fun]
   as its initialisation function. *)
let _ =
  Ocsigen_extensions.register ~name:"ocsipersist" ~init_fun ()