#r "../../../bin/FSharp.Control.AsyncSeq.dll"
open System
open FSharp.Control
AsyncSeq.groupBy partitions an input sequence into sub-sequences with respect to the specified projection
function. This operation is the asynchronous analog of Seq.groupBy.
An example execution can be depicted visually as follows:
--------------------------------------------------
| source | a1 | a2 | a3 | a4 |
| key | k1 | k2 | k1 | k3 |
| result | k1 * [a1,a3] | k2 * [a2] | k3 * [a4] |
--------------------------------------------------
Suppose we would like to consume a stream of events AsyncSeq<Event> and perform an operation on each event. The operation on each event is of type Event -> Async<unit>. This can be done as follows:
type Event = {
  entityId : int64
  data : string
}
let stream : AsyncSeq<Event> =
  failwith "undefined"
let action (e:Event) : Async<unit> =
  failwith "undefined"
stream
|> AsyncSeq.iterAsync action
The above workflow will read an event from the stream, perform an operation and then read the next event.
While the read operation and the operation on the event are asynchronous, the stream is processed sequentially.
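To make the sequential behaviour concrete, here is a minimal, self-contained sketch. It assumes the script is run with dotnet fsi and the FSharp.Control.AsyncSeq NuGet package; sampleStream, processed and recordEvent are hypothetical stand-ins for a real event source and operation.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

type Event = { entityId : int64; data : string }

// Hypothetical in-memory stream standing in for a real event source.
let sampleStream : AsyncSeq<Event> =
  AsyncSeq.ofSeq [
    { entityId = 1L; data = "created" }
    { entityId = 2L; data = "created" }
    { entityId = 1L; data = "updated" } ]

// Records the order in which events are handled.
let processed = System.Collections.Generic.List<string>()

// Hypothetical action: simulate some asynchronous work, then log the event.
let recordEvent (e:Event) : Async<unit> = async {
  do! Async.Sleep 10
  processed.Add (sprintf "%d:%s" e.entityId e.data) }

sampleStream
|> AsyncSeq.iterAsync recordEvent
|> Async.RunSynchronously

printfn "%A" (List.ofSeq processed)
```

Because AsyncSeq.iterAsync only requests the next element once the action for the current one has completed, the log preserves the order of the source.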
It may be desirable to parallelize the processing of the stream. Suppose that events correspond to some entity,
such as a shopping cart. Events belonging to the same shopping cart must be processed in sequential order, but they
are independent of events belonging to other shopping carts. Therefore, events belonging to distinct shopping carts
can be processed in parallel. Using AsyncSeq.groupBy, we can partition the stream into a fixed set of sub-streams
and then process the sub-streams in parallel using AsyncSeq.mapAsyncParallel:
stream
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4)
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.iterAsync action)
|> AsyncSeq.iter ignore
AsyncSeq.groupBy partitions the input sequence into sub-sequences based on a key returned by a projection function.
The resulting sub-sequences emit elements when the source sequence emits an element corresponding to the key of the
sub-sequence. Elements of the resulting sequence are pairs of keys and sub-sequences, in this case int * AsyncSeq<Event>.
Since these sub-sequences are independent by definition, they can be processed in parallel. In fact, the sub-sequences must be processed in parallel, because it isn't possible to complete the processing of a sub-sequence until all elements of the source sequence are exhausted.
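The following is a small sketch of this pattern on a finite input, assuming the script is run with dotnet fsi and the FSharp.Control.AsyncSeq NuGet package. The collect helper and the parity key are hypothetical choices made for illustration only.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Hypothetical helper: drains one sub-sequence into a list, keeping its key.
let collect (key:int, sub:AsyncSeq<int>) : Async<int * int list> = async {
  let! items = AsyncSeq.toListAsync sub
  return key, items }

// Partition the numbers 1..6 by parity. The sub-sequences are consumed
// in parallel via mapAsyncParallel, as groupBy requires.
let groups : (int * int list) list =
  AsyncSeq.ofSeq [ 1 .. 6 ]
  |> AsyncSeq.groupBy (fun n -> n % 2)
  |> AsyncSeq.mapAsyncParallel collect
  |> AsyncSeq.toListSynchronously
  |> List.sortBy fst   // sort by key so the printed order is stable

groups |> List.iter (fun (key, items) -> printfn "%d -> %A" key items)
```

Within each group the elements keep their source order, which is the property the shopping cart scenario relies on.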
To continue improving the efficiency of our workflow, we can make use of batching. Specifically, we can read the incoming
events in batches and we can perform actions on entire batches of events.
let batchStream : AsyncSeq<Event[]> =
  failwith "undefined"
let batchAction (es:Event[]) : Async<unit> =
  failwith "undefined"
Ordering is still important. For example, the batch action could write events into a full-text search index. We would like the full-text search index to be sequentially consistent. As such, the events need to be applied in the order they were emitted. The following workflow has the desired properties:
batchStream
|> AsyncSeq.concatSeq // flatten the sequence of event arrays
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4) // partition into 4 groups
|> AsyncSeq.mapAsyncParallel (snd
  >> AsyncSeq.bufferByCountAndTime 500 1000 // buffer sub-sequences
  >> AsyncSeq.iterAsync batchAction) // perform the batch operation
|> AsyncSeq.iter ignore
The above workflow:
- Reads events in batches.
- Flattens the batches.
- Partitions the events into mutually exclusive sub-sequences.
- Buffers elements of each sub-sequence by count and time.
- Processes the sub-sequences in parallel, but individual sub-sequences sequentially.
AsyncSeq.merge non-deterministically merges two async sequences into one. It is non-deterministic in the sense that the resulting sequence emits elements whenever either input sequence emits a value. Since it isn't always known which will emit a value first, if at all, the operation is non-deterministic. This operation is in contrast to AsyncSeq.zip, which also takes two async sequences and returns a single async sequence, but instead of emitting an element when either input sequence produces a value, it emits an element only when both sequences have produced a value. This operation is also in contrast to AsyncSeq.append, which concatenates two async sequences, emitting all elements of one, followed by all elements of the other.
An example execution can be depicted visually as follows:
-----------------------------------------
| source1 | t0 | | t1 | | | t2 |
| source2 | | u0 | | | u1 | |
| result | t0 | u0 | t1 | | u1 | t2 |
-----------------------------------------
Suppose you wish to perform an operation when either of two async sequences emits an element. One way to do this is to start consuming both async sequences in parallel. If we would like to perform only one operation at a time, we can use AsyncSeq.merge as follows:
/// Represents a stream emitting elements on a specified interval.
let intervalMs (periodMs:int) = asyncSeq {
  yield DateTime.UtcNow
  while true do
    do! Async.Sleep periodMs
    yield DateTime.UtcNow }
let either : AsyncSeq<DateTime> =
  AsyncSeq.merge (intervalMs 20) (intervalMs 30)
The sequence either emits an element whenever one of the two intervals elapses, that is, every 20ms from the first source and every 30ms from the second.
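Since the interval sequences above never terminate, a finite variant is easier to experiment with. The sketch below assumes dotnet fsi with the FSharp.Control.AsyncSeq NuGet package; the ticks helper is hypothetical and exists only for this illustration.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Hypothetical helper: emits `count` labelled elements, one every `periodMs` milliseconds.
let ticks (label:string) (periodMs:int) (count:int) : AsyncSeq<string> = asyncSeq {
  for i in 1 .. count do
    do! Async.Sleep periodMs
    yield sprintf "%s%d" label i }

let merged : string list =
  AsyncSeq.merge (ticks "t" 20 3) (ticks "u" 30 2)
  |> AsyncSeq.toListSynchronously

// All five elements appear. The interleaving of the two sources may vary
// between runs, but each source's own elements stay in their original order.
printfn "%A" merged
```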
AsyncSeq.combineLatestWith non-deterministically merges two async sequences much like AsyncSeq.merge, combining their elements using the specified combine function. The resulting async sequence will only contain elements if both of the source sequences produce at least one element. After combining the first elements of the source sequences, this operation emits an element whenever either source sequence emits an element, passing the newly emitted element as one of the arguments to the combine function, the other being the most recently emitted element of the other sequence.
An example execution can be depicted visually as follows:
----------------------------------------
| source1 | a0 | | | a1 | | a2 |
| source2 | | b0 | b1 | | | |
| result | | c0 | c1 | c2 | | c3 |
----------------------------------------
where
c0 = f a0 b0
c1 = f a0 b1
c2 = f a1 b1
c3 = f a2 b1
Suppose we would like to trigger an operation whenever a change occurs. We can represent changes as an AsyncSeq. To gain intuition for this, consider the Consul
configuration management system. It stores configuration information in a tree-like structure. For the purpose of this discussion, it can be thought of as a key-value store
exposed via HTTP. In addition, Consul supports change notifications using HTTP long-polling - when an HTTP GET request is made to retrieve the value of a key,
if the request specifies a modify-index, Consul won't respond to the request until a change has occurred since the modify-index. We can represent this operation using
the type Key * ModifyIndex -> Async<Value * ModifyIndex>. Next, we can take this operation and turn it into an AsyncSeq of changes as follows:
type Key = string
type Value = string
type ModifyIndex = int64
let longPollKey (key:Key, mi:ModifyIndex) : Async<Value * ModifyIndex> =
  failwith "undefined"
let changes (key:Key, mi:ModifyIndex) : AsyncSeq<Value> =
  AsyncSeq.unfoldAsync
    (fun (mi:ModifyIndex) -> async {
      let! value,mi = longPollKey (key, mi)
      return Some (value,mi) })
    mi
The function changes produces an async sequence which emits elements whenever the value corresponding to the key changes. Suppose also that we would like to trigger an operation
whenever the key changes or based on a fixed interval. We can represent a fixed interval as an async sequence as follows:
let intervalMs (periodMs:int) = asyncSeq {
  yield DateTime.UtcNow
  while true do
    do! Async.Sleep periodMs
    yield DateTime.UtcNow }
Putting it all together:
let changesOrInterval : AsyncSeq<Value> =
  AsyncSeq.combineLatestWith (fun v _ -> v) (changes ("myKey", 0L)) (intervalMs (1000 * 60))
We can now consume this async sequence and use it to trigger downstream operations, such as updating the configuration of a running program, in flight.
AsyncSeq.distinctUntilChanged returns an async sequence which emits every element of the source sequence, skipping any element that is equal to its predecessor.
An example execution can be visualized as follows:
-----------------------------------
| source | a | a | b | b | b | a |
| result | a | | b | | | a |
-----------------------------------
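The diagram above can be reproduced with a short script. This is a minimal sketch that assumes dotnet fsi with the FSharp.Control.AsyncSeq NuGet package; the string elements are placeholders.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Same input as in the diagram: a, a, b, b, b, a.
let deduplicated : string list =
  AsyncSeq.ofSeq [ "a"; "a"; "b"; "b"; "b"; "a" ]
  |> AsyncSeq.distinctUntilChanged
  |> AsyncSeq.toListSynchronously

printfn "%A" deduplicated  // ["a"; "b"; "a"]
```

Note that the trailing "a" is kept: only contiguous duplicates are removed, not every repeated value.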
Suppose you're polling a resource which returns status information of a background job.
type Status = {
  completed : int
  finished : bool
  result : string
}
/// Gets the status of a job.
let status : Async<Status> =
  failwith ""
let statuses : AsyncSeq<Status> =
  asyncSeq {
    // emit the initial status immediately, then poll once per second
    let! s = status
    yield s
    while true do
      do! Async.Sleep 1000
      let! s = status
      yield s }
The async sequence statuses will return a status every second. It will return a status regardless of whether the status changed. Assuming the status changes monotonically, we can use AsyncSeq.distinctUntilChanged to transform statuses into an async sequence of distinct statuses:
let distinctStatuses : AsyncSeq<Status> =
  statuses |> AsyncSeq.distinctUntilChanged
Finally, we can create a workflow which prints the status every time a change is detected and terminates when the underlying job reaches the finished state:
let result : Async<string> =
  distinctStatuses
  |> AsyncSeq.pick (fun st ->
    printfn "status=%A" st
    if st.finished then Some st.result
    else None)
AsyncSeq.zip : AsyncSeq<'a> -> AsyncSeq<'b> -> AsyncSeq<'a * 'b> takes a pair of sequences and combines them into a sequence of pairs element-wise - the first element of one sequence is paired with the first element of the other, and so on. It can be used to pair sequences of related elements into a single sequence. It can also be used to combine a sequence of elements with a sequence of effects.
An example execution can be visually depicted as follows:
---------------------------------------------
| source1 | a1 | a2 | |
| source2 | b1 | b2 | b3 |
| result | a1 * b1 | a2 * b2 | |
---------------------------------------------
Note that the resulting sequence terminates when either input sequence terminates.
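The termination rule can be checked with a minimal sketch mirroring the diagram, assuming dotnet fsi with the FSharp.Control.AsyncSeq NuGet package; the string elements are placeholders.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// source1 has two elements, source2 has three.
let pairs : (string * string) list =
  AsyncSeq.zip (AsyncSeq.ofSeq [ "a1"; "a2" ]) (AsyncSeq.ofSeq [ "b1"; "b2"; "b3" ])
  |> AsyncSeq.toListSynchronously

printfn "%A" pairs  // [("a1", "b1"); ("a2", "b2")]
```

The unmatched element b3 is dropped because the shorter sequence has already terminated.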
Suppose that we have an async sequence of events consumed from a message bus. We would like to process this sequence, but we want to ensure that we're not processing too fast. We can pair the sequence of events with a sequence of durations corresponding to the minimum consumption time. We can do this as follows:
let events : AsyncSeq<Event> =
  failwith "TODO"
let eventsAtLeastOneSec =
  AsyncSeq.zipWith
    (fun a _ -> a)
    events
    (AsyncSeq.replicateInfiniteAsync (Async.Sleep 1000))
The resulting async sequence eventsAtLeastOneSec will emit at most one element per second. Note that the input sequence of timeouts is infinite - this allows the other sequence to have any length, since AsyncSeq.zipWith terminates when either input sequence terminates.
AsyncSeq.bufferByCountAndTime consumes the input sequence until a specified number of elements has been consumed or a timeout expires, at which point the resulting sequence emits the buffered elements, unless no elements have been buffered. It is similar to AsyncSeq.bufferByCount but allows a buffer to be emitted based on a timeout in addition to buffer size. Both are useful for batching inputs before performing an operation. AsyncSeq.bufferByCountAndTime allows an async workflow to proceed even if no inputs are received during a certain time period.
An example execution can be visually depicted as follows:
-------------------------------------------------------
| source | a1 | a2 | a3 | a4 | |
| result | | | [a1,a2,a3] | | [a4] |
-------------------------------------------------------
The last event a4 is emitted after a timeout.
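A rough sketch of the scenario in the diagram follows, assuming dotnet fsi with the FSharp.Control.AsyncSeq NuGet package. The pausingSource sequence, the buffer size of 3 and the 100 ms timeout are hypothetical values chosen only to make the two flush conditions visible.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Hypothetical source: four elements arrive immediately, then the stream
// stays open but silent for half a second before completing.
let pausingSource : AsyncSeq<int> = asyncSeq {
  for i in 1 .. 4 do
    yield i
  do! Async.Sleep 500 }

let batches : int[] list =
  pausingSource
  |> AsyncSeq.bufferByCountAndTime 3 100   // flush at 3 elements or after 100 ms
  |> AsyncSeq.toListSynchronously

printfn "%A" batches
```

With these inputs the first batch should be flushed because it reached three elements, and the fourth element should be flushed on its own once the timeout expires, without waiting for the source to produce more.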
Suppose we're writing a service which consumes a stream of events and indexes them into a full-text search index. We can index each event one by one, however we get a performance improvement if we buffer events into small batches. We can buffer into fixed size batches using AsyncSeq.bufferByCount. However, the source event stream may stop emitting events halfway through a batch, which would leave those events in the buffer until more events arrive. AsyncSeq.bufferByCountAndTime allows the async workflow to make progress by imposing a bound on how long a non-empty but incomplete buffer can wait for additional items.
let individualEvents : AsyncSeq<Event> =
  failwith ""
let bufferSize = 100
let bufferTimeout = 1000
let bufferedEvents : AsyncSeq<Event[]> =
  individualEvents |> AsyncSeq.bufferByCountAndTime bufferSize bufferTimeout