(** Exception carrying a string message. *)
exception Exception of string

(** Status of a job instance. [should_run] only ever selects instances whose
    status is [Pending]. *)
type instance_status =
  | Pending
  | Succeeded
  | Failed
  | Cancelled
[@@deriving show]

(** A job instance as tracked by the queue.

    - [id], [name]: identify the instance and the job it belongs to (both are
      reported by [default_error_handler]).
    - [input]: the job input as a string. NOTE(review): presumably the output
      of the job's [encode] function; confirm against the queue backend.
    - [tries] / [max_tries]: [should_run] requires [tries < max_tries].
    - [next_run_at]: [should_run] requires this not to be later than the
      current time.
    - [status]: see {!instance_status}.
    - [last_error], [last_error_at]: optional error message and timestamp. *)
type instance =
  { id : string
  ; name : string
  ; input : string
  ; tries : int
  ; next_run_at : Ptime.t
  ; max_tries : int
  ; status : instance_status
  ; last_error : string option
  ; last_error_at : Ptime.t option
  }
[@@deriving show]

(** A job definition whose input has type ['a].

    - [encode] / [decode]: convert the input to and from its string form;
      [decode] reports failure as [Error msg].
    - [handle]: processes a decoded input and resolves to [Ok ()] or
      [Error msg].
    - [failed]: callback receiving an error message and the job instance.
    - [max_tries], [retry_delay]: retry configuration. *)
type 'a job =
  { name : string
  ; encode : 'a -> string
  ; decode : string -> ('a, string) Result.t
  ; handle : 'a -> (unit, string) Result.t Lwt.t
  ; failed : string -> instance -> unit Lwt.t
  ; max_tries : int
  ; retry_delay : Ptime.Span.t
  }
[@@deriving show]

(** A job definition without the input type parameter: [handle] takes the
    input in its string form. Values of this type are produced by [hide]. *)
type job' =
  { name : string
  ; handle : string -> (unit, string) Result.t Lwt.t
  ; failed : string -> instance -> unit Lwt.t
  ; max_tries : int
  ; retry_delay : Ptime.Span.t
  }
[@@deriving show]

(** [hide job] turns a typed ['a job] into a [job'].

    The resulting [handle] first runs [job.decode] on the string input. On
    [Ok decoded] it delegates to [job.handle decoded]; on [Error msg] it
    resolves immediately to [Error msg] without calling [job.handle]. All
    other fields are copied unchanged. *)
let hide (job : 'a job) : job' =
  let handle input =
    match job.decode input with
    | Ok decoded -> job.handle decoded
    | Error msg -> Lwt.return @@ Error msg
  in
  { name = job.name
  ; handle
  ; failed = job.failed
  ; max_tries = job.max_tries
  ; retry_delay = job.retry_delay
  }
;;

(** [should_run job_instance now] is [true] if and only if all of the
    following hold:

    - the status of [job_instance] is [Pending],
    - [job_instance.tries] is strictly less than [job_instance.max_tries],
    - [job_instance.next_run_at] is not later than [now]. *)
let should_run (job_instance : instance) now =
  let tries = job_instance.tries in
  let max_tries = job_instance.max_tries in
  let next_run_at = job_instance.next_run_at in
  let has_tries_left = tries < max_tries in
  let is_after_delay = not (Ptime.is_later next_run_at ~than:now) in
  let is_pending =
    (* Constructors are listed explicitly so that adding a new status forces
       a decision here instead of silently falling into a wildcard. *)
    match job_instance.status with
    | Pending -> true
    | Succeeded | Failed | Cancelled -> false
  in
  is_pending && has_tries_left && is_after_delay
;;

(** Default for the [max_tries] argument of [create_job]. *)
let default_tries = 5

(** Default for the [retry_delay] argument of [create_job]. *)
let default_retry_delay = Core_time.Span.minutes 1

(** [default_error_handler msg instance] logs an error containing the id, name
    and input of [instance] followed by [msg]. It is the default for the
    [failed] argument of [create_job]. *)
let default_error_handler msg (instance : instance) =
  Lwt.return
  @@ Logs.err (fun m ->
         m
           "Job with id '%s' and name '%s' failed for input '%s': \n%s"
           instance.id
           instance.name
           instance.input
           msg)
;;

(** [create_job handle ?max_tries ?retry_delay ?failed encode decode name]
    builds an ['a job] from its components.

    [max_tries] defaults to [default_tries], [retry_delay] to
    [default_retry_delay] and [failed] to [default_error_handler]. *)
let create_job
    handle
    ?(max_tries = default_tries)
    ?(retry_delay = default_retry_delay)
    ?(failed = default_error_handler)
    encode
    decode
    name
  =
  { name; handle; failed; max_tries; retry_delay; encode; decode }
;;

(* Service signature *)

let name = "queue"

module type Sig = sig
  (** [router ?back ?theme scope] returns a router that can be passed to the
      web server to serve the job queue dashboard.

      [back] is an optional URL which renders a back button on the dashboard.
      Use this to provide your admin user a way to easily exit the dashboard.
      By default, no URL is provided and no back button is shown.

      [theme] is an optional theme, one of [`Light], [`Dark] or [`Custom]
      carrying a string. NOTE(review): the meaning of the [`Custom] string and
      the default theme are decided by the implementation; confirm there.

      [scope] is the URL path under which the dashboard can be accessed. It is
      common to have some admin UI under [/admin], the job queue dashboard
      could be available under [/admin/queue].

      You can use HTMX by setting [HTMX_SCRIPT_URL] to the URL of the HTMX
      JavaScript file that is then embedded into the dashboard using the
      <script> tag in the page body. HTMX is used to add dynamic features such
      as auto-refresh. The dashboard is perfectly usable without it. By
      default, HTMX is not used. *)
  val router
    :  ?back:string
    -> ?theme:[ `Custom of string | `Light | `Dark ]
    -> string
    -> Web.router

  (** [dispatch ?ctx ?delay input job] queues [job] for later processing and
      returns [unit Lwt.t] once the job has been queued.

      [ctx] is an optional list of string pairs. NOTE(review): its
      interpretation is left to the implementation; confirm there.

      An optional [delay] determines the amount of time from now (when
      dispatch is called) up until the job can be run. If no delay is
      specified, the job is processed as soon as possible.

      [input] is the input of the [handle] function which is used for job
      processing. *)
  val dispatch
    :  ?ctx:(string * string) list
    -> ?delay:Ptime.span
    -> 'a
    -> 'a job
    -> unit Lwt.t

  (** [dispatch_all ?ctx ?delay inputs job] queues one instance of [job] per
      element of [inputs] for later processing and returns [unit Lwt.t] once
      all the instances have been queued. The instances are put onto the queue
      in reverse order. The instance for the first element of [inputs] is put
      onto the queue last, which means it gets processed first.

      If the queue backend supports transactions, [dispatch_all] guarantees
      that either none or all instances are queued.

      [ctx] is an optional list of string pairs. NOTE(review): its
      interpretation is left to the implementation; confirm there.

      An optional [delay] determines the amount of time from now (when
      dispatch is called) up until the instances can be run. If no delay is
      specified, they are processed as soon as possible.

      [inputs] is the list of inputs of the [handle] function. It is a list
      of ['a], one for each queued instance of the ['a job]. *)
  val dispatch_all
    :  ?ctx:(string * string) list
    -> ?delay:Ptime.span
    -> 'a list
    -> 'a job
    -> unit Lwt.t

  (** [register_jobs jobs] registers jobs that can be dispatched later on.

      Only registered jobs can be dispatched. Dispatching a job that was not
      registered does nothing. *)
  val register_jobs : job' list -> unit Lwt.t

  (** [register ?jobs ()] returns the [Core_container.Service.t] value for the
      queue service. NOTE(review): [jobs] is presumably the initial set of
      jobs to register, equivalent to calling [register_jobs]; confirm against
      the implementation. *)
  val register : ?jobs:job' list -> unit -> Core_container.Service.t

  include Core_container.Service.Sig
end