123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131openLwt.SyntaxopenModelmoduleCore=Sihl_coreexceptionExceptionofstringletlog_src=Logs.Src.create~doc:"database""sihl.service.database"moduleLogs=(valLogs.src_loglog_src:Logs.LOG)letpool_ref:pooloptionref=refNonetypeconfig={url:string;pool_size:intoption}letconfigurlpool_size={url;pool_size}letschema=letopenConformistinmake[string"DATABASE_URL";optional(int~default:5"DATABASE_POOL_SIZE")]config;;letraise_errorerr=matcherrwith|Errorerr->raise(Exception(Caqti_error.showerr))|Okresult->result;;letprint_pool_usagepool=letn_connections=Caqti_lwt.Pool.sizepoolinletmax_connections=Option.value(Core.Configuration.readschema).pool_size~default:10inLogs.debug(funm->m"Pool usage: %i/%i"n_connectionsmax_connections);;letfetch_pool()=match!pool_refwith|Somepool->Logs.debug(funm->m"Skipping pool creation, re-using existing pool");pool|None->letpool_size=Option.value(Core.Configuration.readschema).pool_size~default:10inLogs.debug(funm->m"Create pool with size %i"pool_size);(Core.Configuration.readschema).url|>Uri.of_string|>Caqti_lwt.connect_pool~max_size:pool_size|>(function|Okpool->pool_ref:=Somepool;pool|Errorerr->letmsg="Failed to connect to DB pool"inLogs.err(funm->m"%s %s"msg(Caqti_error.showerr));raise(Exception("Failed to create pool "^msg)));;lettransactionf=letpool=fetch_pool()inprint_pool_usagepool;let*result=Caqti_lwt.Pool.use(funconnection->Logs.debug(funm->m"Fetched connection from pool");let(moduleConnection:Caqti_lwt.CONNECTION)=connectioninlet*start_result=Connection.start()inmatchstart_resultwith|Errormsg->Logs.debug(funm->m"Failed to start transaction %s"(Caqti_error.showmsg));Lwt.return@@Errormsg|Ok()->Logs.debug(funm->m"Started transaction");Lwt.catch(fun()->let*result=fconnectioninlet*commit_result=Connection.commit()inmatchcommit_resultwith|Ok()->Logs.debug(funm->m"Successfully committed transaction");Lwt.return@@Okresult|Errorerror->Logs.err(funm->m"Failed to commit transaction %s"(Caqti_error.showerror));Lwt.fail@@Exception"Failed to commit transaction")(fune->let*rollback_result=Connection.rollback()inmatchrollback_resultwith|Ok()->Logs.debug(funm->m"Successfully rolled back transaction");Lwt.faile|Errorerror->Logs.err(funm->m"Failed to rollback transaction %s"(Caqti_error.showerror));Lwt.fail@@Exception"Failed to rollback transaction"))poolinmatchresultwith|Okresult->Lwt.returnresult|Errorerror->letmsg=Caqti_error.showerrorinLogs.err(funm->m"%s"msg);Lwt.fail(Exceptionmsg);;letqueryf=letpool=fetch_pool()inprint_pool_usagepool;let*result=Caqti_lwt.Pool.use(funconnection->fconnection|>Lwt.mapResult.ok)poolinmatchresultwith|Okresult->Lwt.returnresult|Errorerror->letmsg=Caqti_error.showerrorinLogs.err(funm->m"%s"msg);Lwt.fail(Exceptionmsg);;(* Service lifecycle *)letstart()=(* Make sure that database is online when starting service. *)let_=fetch_pool()inLwt.return();;letstop_=Lwt.return()letlifecycle=Core.Container.Lifecycle.create"database"~start~stopletregister()=Core.Container.Service.createlifecycle