Async_kernel.Stream
An immutable sequence of values, with a possibly incomplete tail that may be extended asynchronously.
For most applications one should use Pipe instead of Stream. One justifiable usage of Stream rather than Pipe is in single-writer, multi-consumer (multicast) scenarios where pushback is not required.
The basic primitive operation for getting the next element out of a stream is Stream.next, which (asynchronously) returns the element and the rest of the stream.
sexp_of_t t f returns a sexp of all of the elements currently available in the stream. It is just for display purposes. There is no t_of_sexp.
create f returns a stream t and calls f tail, where the elements of the stream are determined as the tail is extended, and the end of the stream is reached when the tail is closed.
next t returns a deferred that will become determined when the next part of the stream is determined. The result is Cons (v, t'), where v is the next element of the stream and t' is the rest of the stream, or Nil at the end of the stream.
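As a minimal sketch of how create and next fit together, the following builds a small stream by extending and closing its tail, then walks it with next. It assumes the module is reachable as Async_kernel.Stream (as in the title) and that Tail.extend and Tail.close_exn are the tail operations; the names three_ints and collect are hypothetical helpers, not part of the library.

```ocaml
open Async_kernel

(* Hypothetical helper: a stream of three integers, produced by extending
   the tail handed to [Stream.create] and then closing it. *)
let three_ints () : int Stream.t =
  Stream.create (fun tail ->
    Stdlib.List.iter (fun x -> Tail.extend tail x) [ 1; 2; 3 ];
    Tail.close_exn tail)

(* Hypothetical helper: walk a stream with [Stream.next], accumulating the
   elements seen so far, and return them in order once [Nil] is reached. *)
let rec collect (s : int Stream.t) (acc : int list) : int list Deferred.t =
  Deferred.bind (Stream.next s) ~f:(function
    | Stream.Nil -> Deferred.return (Stdlib.List.rev acc)
    | Stream.Cons (v, rest) -> collect rest (v :: acc))
```

The deferred returned by collect only becomes determined once the Async scheduler runs, and only if the tail is eventually closed.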
first_exn t returns a deferred that becomes determined with the first element of t.
Streams can be converted to and from lists, although conversion to a list returns a deferred, because the stream is determined asynchronously.
to_list t returns a deferred that will become determined with the list of elements in t, if the end of t is reached.
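A short sketch of the list round trip, assuming of_list is the list-to-stream conversion referred to above. The name round_trip is a hypothetical helper.

```ocaml
open Async_kernel

(* Hypothetical helper: convert a list to a stream and back. The result is
   a deferred because [Stream.to_list] must wait for the end of the stream. *)
let round_trip (xs : 'a list) : 'a list Deferred.t =
  Stream.to_list (Stream.of_list xs)
```

Because to_list waits for the end of the stream, applying it to a stream that never ends (for example one built with of_fun) yields a deferred that never becomes determined.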
of_fun f returns a stream whose elements are determined by calling f forever.
copy_to_tail t tail reads elements from t and puts them in tail, until the end of t is reached.
Sequence operations
----------------------------------------------------------------------
There are the usual sequence operations:
append, fold, iter, map, filter_map, take
There are also deferred variants:
iter', map', filter_map'
These take anonymous functions that return deferreds, generalizing the usual sequence operations and allowing the client to control the rate at which the sequence is processed.
append t1 t2 returns a stream with all the values of t1, in order, and if t1 ends, these values are followed by all the values of t2.
concat t takes a stream of streams and produces a stream that is the concatenation of each stream in order (you see all of stream 1, then all of stream 2... etc.)
available_now t returns the prefix of t that is available now, along with the rest of the stream.
filter_deprecated s ~f returns a stream with one element, v, for each v in s such that f v = true.
Using filter_deprecated can easily lead to space leaks. It is better to use Async.Pipe than Async.Stream.
filter_map_deprecated s ~f returns a stream with one element, v', for each v in s such that f v = Some v'.
Using filter_map_deprecated can easily lead to space leaks. It is better to use Async.Pipe than Async.Stream.
fold' t ~init ~f is like list fold, walking over the elements of the stream in order, as they become available. fold' returns a deferred that will yield the final value of the accumulator, if the end of the stream is reached.
fold t ~init ~f is a variant of fold' in which f does not return a deferred.
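To illustrate the relationship between fold and fold', here is a sketch that sums a stream of integers both ways. The names sum and sum' are hypothetical; the only difference between them is whether the folding function returns a plain value or a deferred.

```ocaml
open Async_kernel

(* Hypothetical helper: sum with [fold], whose [f] returns a plain value. *)
let sum (s : int Stream.t) : int Deferred.t =
  Stream.fold s ~init:0 ~f:(fun acc x -> acc + x)

(* Hypothetical helper: the same sum with [fold'], whose [f] returns a
   deferred; the fold waits for it before taking the next element. *)
let sum' (s : int Stream.t) : int Deferred.t =
  Stream.fold' s ~init:0 ~f:(fun acc x -> Deferred.return (acc + x))
```

In both cases the result is determined only if the end of the stream is reached.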
iter' t ~f applies f to each element of the stream in turn, as they become available. It continues onto the next element only after the deferred returned by f becomes determined.
closed t returns a deferred that becomes determined when the end of t is reached.
iter t ~f = don't_wait_for (iter' t ~f:(fun a -> f a; return ()))
take_until t d returns a stream t' that has the same elements as t up until d becomes determined.
iter_durably' t ~f is like iter' t ~f, except if f raises an exception it continues with the next element of the stream *and* reraises the exception (to the monitor in scope when iter_durably' was called).
iter_durably t ~f is like iter t ~f, except if f raises an exception it continues with the next element of the stream *and* reraises the exception (to the monitor in scope when iter_durably was called).
iter_durably_report_end t ~f is equivalent to iter_durably' t ~f:(fun x -> return (f x)) but it is more efficient.
length s returns a deferred that becomes determined when the end of s is reached, with the number of elements in s as its value.
map' t ~f creates a new stream with one element, (f v), for each element v of t.
map t ~f creates a new stream with one element, (f v), for each element v of t. map t ~f = map' t ~f:(fun a -> return (f a)).
first_n t n returns a stream with the first n elements of t if t has n or more elements; otherwise it returns t.
Stream generation
----------------------------------------------------------------------
unfold b f returns a stream a1; a2; ...; an whose elements are determined by the equations:
b0 = b
Some (a1, b1) = f b0
Some (a2, b2) = f b1
...
None = f bn
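The recurrence above can be sketched with plain lists. This is a synchronous model of the equations only, not the library's unfold (which produces a stream); the names unfold_list and countdown are hypothetical.

```ocaml
(* Hypothetical list-based model of the unfold equations: keep applying [f]
   to the current state, emitting one element per [Some], until [None]. *)
let rec unfold_list (b : 'b) (f : 'b -> ('a * 'b) option) : 'a list =
  match f b with
  | None -> []
  | Some (a, b') -> a :: unfold_list b' f

(* Starting from state 3, emit the state and decrement until it reaches 0. *)
let countdown : int list =
  unfold_list 3 (fun n -> if n = 0 then None else Some (n, n - 1))
(* countdown = [3; 2; 1] *)
```

Here b0 = 3, f b0 = Some (3, 2), f b1 = Some (2, 1), f b2 = Some (1, 0), and f b3 = None, so the elements are 3; 2; 1.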
Miscellaneous operations
----------------------------------------------------------------------
val split :
?stop:unit Deferred.t ->
?f:('a -> [ `Continue | `Found of 'b ]) ->
'a t ->
'a t * [ `End_of_stream | `Stopped of 'a t | `Found of 'b * 'a t ] Deferred.t
split ~stop ~f t returns a pair (p, d), where p is a prefix of t that ends for one of three reasons:
1. t ends
2. stop becomes determined
3. f returns `Found
The deferred d describes why the prefix ended, and returns the suffix of the stream in case (2) or (3).
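As a hedged sketch of how the pair returned by split might be consumed, the following splits a stream of integers at the first negative element and turns the reason into a string. It relies only on the signature shown above; split_at_negative and describe are hypothetical helper names, and ?stop is left at its default.

```ocaml
open Async_kernel

(* Hypothetical helper: split [s] where [f] first returns [`Found], here at
   the first negative element. Returns the prefix and the reason deferred. *)
let split_at_negative (s : int Stream.t) =
  Stream.split s ~f:(fun x -> if x < 0 then `Found x else `Continue)

(* Hypothetical helper: describe why the prefix ended. The suffix carried by
   [`Stopped] and [`Found] is ignored in this sketch. *)
let describe reason =
  Deferred.map reason ~f:(function
    | `End_of_stream -> "end of stream"
    | `Stopped _rest -> "stopped"
    | `Found (x, _rest) -> "found " ^ string_of_int x)
```

A caller that wants to keep processing after the split would use the suffix carried by `Stopped or `Found instead of discarding it.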
find ~f t returns a deferred that becomes determined when f x is true for some element x of t, or when the end of the stream is reached.
ungroup t takes a stream of lists and unpacks the items from each list into a single stream.