123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899(* Copyright (C) 2019--2020 Petter A. Urkedal <paurkedal@gmail.com>
*
* This library is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version, with the OCaml static compilation exception.
*
* This library is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this library. If not, see <http://www.gnu.org/licenses/>.
*)openCaqti_common_privopenPrintfmoduleMake_helpers(System:Caqti_driver_sig.System_common)=structopenSystemletassert_single_use~whatin_usef=if!in_usethenfailwith("Invalid concurrent usage of "^what^" detected.");assert(not!in_use);in_use:=true;f()>|=funy->assert!in_use;in_use:=false;yendmoduleMake_convenience(System:Caqti_driver_sig.System_common)(C:Caqti_connection_sig.Basewithtype'afuture:='aSystem.futureandtype('a,'err)stream:=('a,'err)System.Stream.t)=structopenSystemmoduleResponse=C.Responseletexecqp=C.call~f:Response.execqpletfindqp=C.call~f:Response.findqpletfind_optqp=C.call~f:Response.find_optqpletfoldqfpacc=C.call~f:(funresp->Response.foldfrespacc)qpletfold_sqfpacc=C.call~f:(funresp->Response.fold_sfrespacc)qpletiter_sqfp=C.call~f:(funresp->Response.iter_sfresp)qpletcollect_listqp=letfresp=Response.foldList.consresp[]>|=Result.mapList.revinC.call~fqpletrev_collect_listqp=letfresp=Response.foldList.consresp[]inC.call~fqpletexec_with_affected_countqp=letfresponse=Response.execresponse>>=funexecResult->matchexecResultwith|Ok()->Response.affected_countresponse|Errorx->return(Errorx)inC.call~fqpendmoduleMake_populate(System:Caqti_driver_sig.System_common)(C:Caqti_connection_sig.Basewithtype'afuture:='aSystem.futureandtype('a,'e)stream:=('a,'e)System.Stream.t)=structopenSystemlet(>>=?)mf=m>>=functionOkx->fx|Error_asr->returnrletpopulate~table~columnsrow_typedata=letrequest=letcolumns_tuple=String.concat", "columnsinletq=letopenCaqti_queryinS[L(sprintf"INSERT INTO %s (%s) VALUES ("tablecolumns_tuple);concat", "(List.mapi(funi_->Pi)columns);L")"]inCaqti_request.createrow_typeCaqti_type.unitCaqti_mult.zero(fun_->q)inC.start()>>=?fun()->Stream.iter_s~f:(C.call~f:C.Response.execrequest)data>>=funres->C.deallocaterequest>>=fun_->(matchreswith|Ok()->C.commit()|Error(`Congestederr)->C.rollback()>>=?fun()->return(Error(`Congestederr))|Errorerr->return(Errorerr))end