module P = Catapult
module Db = Sqlite3
module Atomic = P.Atomic_shim_
open Catapult_utils

(** Binding operator for continuation-style helpers:
    [let@ x = e in body] is [e (fun x -> body)]. *)
let ( let@ ) x f = x f

(** Turn a SQLite return code into an exception.
    [DONE] and [OK] are accepted; any other code raises [Failure] with the
    textual form of the code. *)
let check_ret_ (rc : Db.Rc.t) : unit =
  match rc with
  | Db.Rc.DONE | Db.Rc.OK -> ()
  | err -> failwith ("db error: " ^ Db.Rc.to_string err)

(** Writer state: one database handle plus one prepared insert statement. *)
type t = {
  mutable stmt_insert: Db.stmt;  (* guard: db_lock *)
  db: Db.db;  (* guard: db_lock *)
  db_lock: Mutex.t;
  closed: bool Atomic.t;  (* set to [true] by the first call to [close] *)
}

(** [with_lock lock f] runs [f ()] while holding [lock]. The lock is released
    whether [f] returns or raises; an exception is re-raised with its
    original backtrace. *)
let[@inline] with_lock lock f =
  Mutex.lock lock;
  match f () with
  | x ->
    Mutex.unlock lock;
    x
  | exception e ->
    let bt = Printexc.get_raw_backtrace () in
    Mutex.unlock lock;
    Printexc.raise_with_backtrace e bt

(* main schema. *)
let schema = {|
CREATE TABLE IF NOT EXISTS events (ev TEXT NOT NULL);
|}

(** Close the writer. Only the first call does anything: the [closed] flag is
    swapped atomically, so later calls are no-ops.

    Under [db_lock]: finalizes the insert statement, asks SQLite to leave WAL
    journaling, then retries [Db.db_close] until it reports success.

    @raise Failure if finalizing the insert statement reports an error. *)
let close self =
  if not (Atomic.exchange self.closed true) then (
    (* close DB itself *)
    let@ () = with_lock self.db_lock in
    Db.finalize self.stmt_insert |> check_ret_;
    (* Remove the WAL now. The pragma is [journal_mode]; an unknown pragma
       name is silently ignored by SQLite, so a misspelt one does nothing.
       This step is best effort: a failure to switch the journal mode must
       not prevent the handle from being closed. *)
    (try ignore (Db.exec self.db "PRAGMA journal_mode=DELETE;" : Db.Rc.t)
     with _ -> ());
    while not (Db.db_close self.db) do
      ()
    done
  )

(** Open (creating it if needed) the events database and return a writer.

    @param sync value used for [PRAGMA synchronous] (default [`NORMAL]).
    @param append if [false] (the default), all rows of [events] are deleted
      first.
    @param file path of the database file. When absent, [dir] is created with
      [mkdir -p] (best effort, failures are ignored) and the file is
      [dir/<trace_id>.db].
    @raise Failure if a setup statement returns an error code. Any exception
      raised during setup is propagated after the freshly opened handle has
      been closed.

    The returned writer has [close] registered as a GC finaliser. *)
let create ?(sync = `NORMAL) ?(append = false) ?file ~trace_id ~dir () : t =
  let file =
    match file with
    | Some f -> f
    | None ->
      (try
         ignore
           (Sys.command (Printf.sprintf "mkdir -p %s" (Filename.quote dir))
             : int)
       with _ -> ());
      Filename.concat dir (trace_id ^ ".db")
  in
  let db = Db.db_open file in
  (* Everything that can fail between opening the handle and building the
     record lives here, so that the handle can be released on failure. *)
  let configure () : Db.stmt =
    Db.busy_timeout db 3_000;
    (* TODO: is this worth it?
       Db.exec db "PRAGMA journal_mode=MEMORY;" |> check_ret_;
    *)
    Db.exec db "PRAGMA journal_mode=WAL;" |> check_ret_;
    let sync_pragma =
      match sync with
      | `OFF -> "PRAGMA synchronous=OFF;"
      | `FULL -> "PRAGMA synchronous=FULL;"
      | `NORMAL -> "PRAGMA synchronous=NORMAL;"
    in
    Db.exec db sync_pragma |> check_ret_;
    Db.exec db schema |> check_ret_;
    if not append then (
      (* tabula rasa *)
      Db.exec db "DELETE FROM events; " |> check_ret_
    );
    Db.prepare db "insert into events values (?);"
  in
  let stmt_insert =
    match configure () with
    | stmt -> stmt
    | exception e ->
      let bt = Printexc.get_raw_backtrace () in
      (* do not leak the open handle when setup fails *)
      (try ignore (Db.db_close db : bool) with _ -> ());
      Printexc.raise_with_backtrace e bt
  in
  let self =
    { stmt_insert; db; db_lock = Mutex.create (); closed = Atomic.make false }
  in
  Gc.finalise close self;
  self

(** Replace the prepared insert statement with a freshly prepared one.

    The new statement is prepared and installed before the old one is
    finalized, so [self.stmt_insert] never refers to a finalized statement
    even if one of the two steps fails.

    NOTE(review): this function does not take [db_lock] although
    [stmt_insert] is annotated as guarded by it; presumably callers
    serialise access themselves. Confirm against call sites.

    @raise Failure if finalizing the previous statement reports an error. *)
let cycle_stmt (self : t) =
  let old_stmt = self.stmt_insert in
  let new_stmt = Db.prepare self.db "insert into events values (?);" in
  self.stmt_insert <- new_stmt;
  Db.finalize old_stmt |> check_ret_

(** Insert one row holding [s], using the prepared statement. Takes no lock.

    The statement is reset even when the step fails, so that a single failed
    insert does not leave the statement unusable for subsequent writes. The
    error of the step, if any, is the one reported.

    NOTE(review): the payload is bound as a blob although the column is
    declared TEXT; presumably intentional. Confirm against readers.

    @raise Failure if binding, stepping or resetting reports an error. *)
let[@inline] write_str_ self s =
  Db.bind_blob self.stmt_insert 1 s |> check_ret_;
  let rc_step = Db.step self.stmt_insert in
  let rc_reset = Db.reset self.stmt_insert in
  check_ret_ rc_step;
  check_ret_ rc_reset

(** [transactionally_ self f] runs [f ()] between [begin transaction] and
    [commit transaction]. Takes no lock.

    If [f] raises, or the commit fails, a rollback is attempted (its outcome
    is ignored) and the original exception is re-raised. Without this the
    connection would remain inside an open transaction and every later
    [begin transaction] would fail.

    @raise Failure if beginning or committing reports an error. *)
let[@inline] transactionally_ self f =
  Db.exec self.db "begin transaction;" |> check_ret_;
  match
    f ();
    Db.exec self.db "commit transaction;" |> check_ret_
  with
  | () -> ()
  | exception e ->
    let bt = Printexc.get_raw_backtrace () in
    (try ignore (Db.exec self.db "rollback transaction;" : Db.Rc.t)
     with _ -> ());
    Printexc.raise_with_backtrace e bt

(** Insert the single event [j] while holding [db_lock].
    @raise Failure if the insert reports an error. *)
let write_string (self : t) (j : string) =
  let@ () = with_lock self.db_lock in
  write_str_ self j

(** Insert every event of [l], in order, inside one transaction, while
    holding [db_lock].
    @raise Failure if the transaction or any insert reports an error. *)
let write_string_l (self : t) (l : string list) =
  let@ () = with_lock self.db_lock in
  let@ () = transactionally_ self in
  List.iter (write_str_ self) l