FSharp.Control.AsyncSeq


F# AsyncSeq Examples

#r "../../../bin/FSharp.Control.AsyncSeq.dll"
open System
open FSharp.Control

Group By

AsyncSeq.groupBy partitions an input sequence into sub-sequences with respect to the specified projection function. This operation is the asynchronous analog to Seq.groupBy.

Example Execution

An example execution can be depicted visually as follows:

--------------------------------------------------
| source  | a1 | a2 | a3 | a4 |                  |
| key     | k1 | k2 | k1 | k3 |                  |
| result  | k1 * [a1,a3] | k2 * [a2] | k3 * [a4] |
--------------------------------------------------
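
The diagram can be reproduced with a small in-memory sequence. The following is a minimal sketch, assuming the library is referenced from NuGet rather than from the local build path used on this page, and assuming `AsyncSeq.toListAsync` is available in the installed version. The helper `collect` is hypothetical and exists only for this illustration.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Hypothetical helper: drains one sub-sequence into a list, keeping its key.
let collect (key : int, group : AsyncSeq<int>) : Async<int * int list> = async {
  let! items = AsyncSeq.toListAsync group
  return (key, items) }

// Group the numbers 1..6 by parity. The sub-sequences are drained in parallel
// because none of them can complete before the source is exhausted.
let groups : (int * int list) list =
  AsyncSeq.ofSeq [ 1; 2; 3; 4; 5; 6 ]
  |> AsyncSeq.groupBy (fun n -> n % 2)
  |> AsyncSeq.mapAsyncParallel collect
  |> AsyncSeq.toListAsync
  |> Async.RunSynchronously

// Look the groups up by key so that the printout does not depend on group order.
let byKey = Map.ofList groups
printfn "odd=%A even=%A" byKey.[1] byKey.[0]
```

Within each group the elements keep the order in which the source emitted them.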

Use Case

Suppose we would like to consume a stream of events AsyncSeq<Event> and perform an operation on each event. The operation on each event is of type Event -> Async<unit>. This can be done as follows:

type Event = {
  entityId : int64
  data : string 
}

let stream : AsyncSeq<Event> =
  failwith "undefined"

let action (e:Event) : Async<unit> =
  failwith "undefined"

stream 
|> AsyncSeq.iterAsync action

The above workflow will read an event from the stream, perform an operation and then read the next event. While the read operation and the operation on the event are asynchronous, the stream is processed sequentially. It may be desirable to parallelize the processing of the stream. Suppose that events correspond to some entity, such as a shopping cart. Events belonging to a given shopping cart must be processed sequentially; however, they are independent of events belonging to other shopping carts. Therefore, events belonging to distinct shopping carts can be processed in parallel. Using AsyncSeq.groupBy, we can partition the stream into a fixed set of sub-streams and then process the sub-streams in parallel using AsyncSeq.mapAsyncParallel:

stream
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4)
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.iterAsync action)
|> AsyncSeq.iter ignore

AsyncSeq.groupBy partitions the input sequence into sub-sequences based on a key returned by a projection function. Each sub-sequence emits an element whenever the source sequence emits an element whose key matches the key of that sub-sequence. Elements of the resulting sequence are pairs of keys and sub-sequences, in this case int * AsyncSeq<Event>. Since these sub-sequences are, by definition, independent, they can be processed in parallel. In fact, the sub-sequences must be processed in parallel, because a sub-sequence cannot complete until the source sequence is exhausted.
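
To make the ordering guarantee concrete, here is a runnable sketch of the same pipeline with in-memory stand-ins. The event source, the `handled` dictionary and the body of `action` are assumptions made for this illustration; a real `stream` and `action` would talk to external systems. The NuGet reference is also an assumption, used in place of the local build path.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open System.Collections.Concurrent
open FSharp.Control

type Event = { entityId : int64; data : string }

// Hypothetical in-memory source: 12 events spread over 3 entities.
let stream : AsyncSeq<Event> =
  [ for i in 1L .. 12L -> { entityId = i % 3L; data = sprintf "event-%d" i } ]
  |> AsyncSeq.ofSeq

// Records, per entity, the order in which events were handled.
let handled = ConcurrentDictionary<int64, ConcurrentQueue<string>>()

let action (e : Event) : Async<unit> = async {
  do! Async.Sleep 10 // simulate asynchronous work
  let queue = handled.GetOrAdd(e.entityId, (fun _ -> ConcurrentQueue<string>()))
  queue.Enqueue(e.data) }

stream
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4)
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.iterAsync action)
|> AsyncSeq.iter ignore
|> Async.RunSynchronously

// Within each entity, events appear in the order in which they were emitted.
for KeyValue (entityId, queue) in handled do
  printfn "entity %d: %A" entityId (List.ofSeq queue)
```

Different entities may be handled concurrently, but each queue preserves the source order because every sub-sequence is consumed by a single sequential `iterAsync`.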

To continue improving the efficiency of our workflow, we can make use of batching. Specifically, we can read the incoming events in batches and we can perform actions on entire batches of events.

let batchStream : AsyncSeq<Event[]> =
  failwith "undefined"

let batchAction (es:Event[]) : Async<unit> =
  failwith "undefined"

Ordering is still important. For example, the batch action could write events into a full-text search index. We would like the full-text search index to be sequentially consistent. As such, the events need to be applied in the order they were emitted. The following workflow has the desired properties:

batchStream
|> AsyncSeq.concatSeq // flatten the sequence of event arrays
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4) // partition into 4 groups
|> AsyncSeq.mapAsyncParallel (snd 
  >> AsyncSeq.bufferByCountAndTime 500 1000 // buffer sub-sequences
  >> AsyncSeq.iterAsync batchAction) // perform the batch operation
|> AsyncSeq.iter ignore

The above workflow:

  1. Reads events in batches.
  2. Flattens the batches.
  3. Partitions the events into mutually exclusive sub-sequences.
  4. Buffers elements of each sub-sequence by count and time.
  5. Processes the sub-sequences in parallel, but individual sub-sequences sequentially.

Merge

AsyncSeq.merge non-deterministically merges two async sequences into one. It is non-deterministic in the sense that the resulting sequence emits an element whenever either input sequence emits one. Since it isn't always known which input will emit a value first, if at all, the order of the result is not fixed. This operation is in contrast to AsyncSeq.zip, which also takes two async sequences and returns a single async sequence, but emits an element only when both input sequences have produced a value. It is also in contrast to AsyncSeq.append, which concatenates two async sequences, emitting all elements of one followed by all elements of the other.

Example Execution

An example execution can be depicted visually as follows:

-----------------------------------------
| source1 | t0 |    | t1 |    |    | t2 |
| source2 |    | u0 |    |    | u1 |    |
| result  | t0 | u0 | t1 |    | u1 | t2 |
-----------------------------------------
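
The difference between merge, zip and append can be seen side by side on two small in-memory sequences. This is a sketch under the assumption that the library is referenced from NuGet and that `AsyncSeq.toListAsync` is available; the helper `run` is hypothetical.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

let source1 = AsyncSeq.ofSeq [ "t0"; "t1"; "t2" ]
let source2 = AsyncSeq.ofSeq [ "u0"; "u1" ]

// Hypothetical helper: runs a finite async sequence to completion.
let run (s : AsyncSeq<'T>) : 'T list =
  s |> AsyncSeq.toListAsync |> Async.RunSynchronously

// merge: all five elements appear, but the interleaving is not fixed.
let merged = run (AsyncSeq.merge source1 source2)

// zip: pairs are formed element-wise and the result stops with the shorter input.
let zipped = run (AsyncSeq.zip source1 source2)

// append: every element of source1, then every element of source2.
let appended = run (AsyncSeq.append source1 source2)

printfn "merged   = %A" merged
printfn "zipped   = %A" zipped
printfn "appended = %A" appended
```

Only the merged result depends on timing; the zipped and appended results are the same on every run.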

Use Case

Suppose you wish to perform an operation when either of two async sequences emits an element. One way to do this is to start consuming both async sequences in parallel. If we would like to perform only one operation at a time, we can use AsyncSeq.merge as follows:

/// Represents a stream emitting elements on a specified interval.
let intervalMs (periodMs:int) = asyncSeq {
  yield DateTime.UtcNow
  while true do
    do! Async.Sleep periodMs
    yield DateTime.UtcNow }

let either : AsyncSeq<DateTime> =
  AsyncSeq.merge (intervalMs 20) (intervalMs 30)

The sequence either emits an element whenever one of the two interval sequences does, that is, on every 20ms tick and on every 30ms tick.

Combine Latest

AsyncSeq.combineLatestWith non-deterministically merges two async sequences much like AsyncSeq.merge, combining their elements using the specified combine function. The resulting async sequence will only contain elements if both of the source sequences produce at least one element. After combining the first elements of the source sequences, this operation emits an element whenever either source sequence emits one, passing the newly emitted element as one argument to the combine function and the most recently emitted element of the other sequence as the other argument.

Example Execution

An example execution can be depicted visually as follows:

----------------------------------------
| source1 | a0 |    |    | a1 |   | a2 |
| source2 |    | b0 | b1 |    |   |    |
| result  |    | c0 | c1 | c2 |   | c3 |
----------------------------------------

where

c0 = f a0 b0
c1 = f a0 b1
c2 = f a1 b1
c3 = f a2 b1
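
A small timed sketch shows the same behaviour in code. The two sources and their delays are assumptions chosen for illustration, and the NuGet reference stands in for the local build path. Because the interleaving depends on timing, and because we have not verified here what the operation does once one source has finished, only the first combined element is stated with confidence.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Two hypothetical sources that emit at different times.
let source1 = asyncSeq {
  yield "a0"
  do! Async.Sleep 150
  yield "a1" }

let source2 = asyncSeq {
  do! Async.Sleep 50
  yield "b0"
  do! Async.Sleep 50
  yield "b1" }

// The combine function simply concatenates the latest element of each source.
let combined : string list =
  AsyncSeq.combineLatestWith (fun a b -> a + b) source1 source2
  |> AsyncSeq.toListAsync
  |> Async.RunSynchronously

// The first element is always the combination of the first elements: "a0b0".
// Later elements pair a newly emitted element with the latest one from the other source.
printfn "%A" combined
```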

Use Case

Suppose we would like to trigger an operation whenever a change occurs. We can represent changes as an AsyncSeq. To gain intuition for this, consider the Consul configuration management system. It stores configuration information in a tree-like structure. For the purpose of this discussion, it can be thought of as a key-value store exposed via HTTP. In addition, Consul supports change notifications using HTTP long-polling: when an HTTP GET request is made to retrieve the value of a key and the request specifies a modify-index, Consul won't respond until a change has occurred after that modify-index. We can represent this operation using the type Key * ModifyIndex -> Async<Value * ModifyIndex>. Next, we can take this operation and turn it into an AsyncSeq of changes as follows:

type Key = string

type Value = string

type ModifyIndex = int64

let longPollKey (key:Key, mi:ModifyIndex) : Async<Value * ModifyIndex> =
  failwith "undefined"

let changes (key:Key, mi:ModifyIndex) : AsyncSeq<Value> =
  AsyncSeq.unfoldAsync 
    (fun (mi:ModifyIndex) -> async {
      let! value,mi = longPollKey (key, mi)
      return Some (value,mi) })
    mi
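
To see how AsyncSeq.unfoldAsync threads the modify-index from one poll to the next, consider a sketch with a fake long poll. The function `fakeLongPoll` and the name `fakeChanges` are hypothetical stand-ins, not part of Consul or of this library. Unlike the definition above, the fake returns None after three changes so that the sequence terminates and can be collected into a list.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

type Value = string
type ModifyIndex = int64

// Hypothetical stand-in for a long poll: it reports three changes and
// then signals that there is nothing more to observe.
let fakeLongPoll (mi : ModifyIndex) : Async<(Value * ModifyIndex) option> = async {
  do! Async.Sleep 10 // simulate waiting for a change
  if mi < 3L then return Some (sprintf "value-%d" mi, mi + 1L)
  else return None }

// The state passed between iterations is the modify-index returned by the previous poll.
let fakeChanges : AsyncSeq<Value> = AsyncSeq.unfoldAsync fakeLongPoll 0L

let observed : Value list =
  fakeChanges |> AsyncSeq.toListAsync |> Async.RunSynchronously

printfn "%A" observed // ["value-0"; "value-1"; "value-2"]
```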

The function changes produces an async sequence which emits an element whenever the value corresponding to the key changes. Suppose also that we would like to trigger an operation whenever the key changes or when a fixed interval elapses. We can represent a fixed interval as an async sequence as follows:

let intervalMs (periodMs:int) = asyncSeq {
  yield DateTime.UtcNow
  while true do
    do! Async.Sleep periodMs
    yield DateTime.UtcNow }

Putting it all together:

let changesOrInterval : AsyncSeq<Value> =
  AsyncSeq.combineLatestWith (fun v _ -> v) (changes ("myKey", 0L)) (intervalMs (1000 * 60))

We can now consume this async sequence and use it to trigger downstream operations, such as updating the configuration of a running program, in flight.

Distinct Until Changed

AsyncSeq.distinctUntilChanged returns an async sequence which emits every element of the source sequence, skipping each element that is equal to its predecessor.

Example Execution

An example execution can be visualized as follows:

-----------------------------------
| source  | a | a | b | b | b | a |
| result  | a |   | b |   |   | a |
-----------------------------------
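
The execution in the diagram corresponds to the following sketch, which assumes the library is referenced from NuGet and that `AsyncSeq.toListAsync` is available in the installed version.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Only contiguous duplicates are dropped; the final "a" is kept because
// its predecessor is "b".
let distinct : string list =
  AsyncSeq.ofSeq [ "a"; "a"; "b"; "b"; "b"; "a" ]
  |> AsyncSeq.distinctUntilChanged
  |> AsyncSeq.toListAsync
  |> Async.RunSynchronously

printfn "%A" distinct // ["a"; "b"; "a"]
```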

Use Case

Suppose you're polling a resource which returns status information of a background job.

type Status = {
  completed : int
  finished : bool
  result : string
}

/// Gets the status of a job.
let status : Async<Status> =
  failwith ""

let statuses : AsyncSeq<Status> =
  asyncSeq {
    while true do
      // poll, emit the status, then wait one second before polling again
      let! s = status
      yield s
      do! Async.Sleep 1000 }

The async sequence statuses will return a status every second. It will return a status regardless of whether the status changed. Assuming the status changes monotonically, we can use AsyncSeq.distinctUntilChanged to transform statuses into an async sequence of distinct statuses:

let distinctStatuses : AsyncSeq<Status> =
  statuses |> AsyncSeq.distinctUntilChanged

Finally, we can create a workflow which prints the status every time a change is detected and terminates when the underlying job reaches the finished state:

let result : Async<string> =
  distinctStatuses
  |> AsyncSeq.pick (fun st -> 
    printfn "status=%A" st
    if st.finished then Some st.result
    else None)
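
The whole workflow can be exercised without a real background job. In this sketch the list `polled` is a hypothetical set of poll results, `seen` records which statuses reach the chooser, and the NuGet reference replaces the local build path.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

type Status = { completed : int; finished : bool; result : string }

// Hypothetical poll results, including repeats where nothing changed.
let polled : Status list =
  [ { completed = 0;   finished = false; result = "" }
    { completed = 0;   finished = false; result = "" }
    { completed = 50;  finished = false; result = "" }
    { completed = 50;  finished = false; result = "" }
    { completed = 100; finished = true;  result = "done" } ]

// Records the progress values that actually reach the chooser.
let seen = System.Collections.Generic.List<int>()

let chooser (st : Status) : string option =
  seen.Add(st.completed)
  if st.finished then Some st.result else None

let finalResult : string =
  AsyncSeq.ofSeq polled
  |> AsyncSeq.distinctUntilChanged // records compare structurally, so repeats are dropped
  |> AsyncSeq.pick chooser
  |> Async.RunSynchronously

printfn "result=%s seen=%A" finalResult (List.ofSeq seen)
```

The chooser runs three times rather than five, once per distinct status.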

Zip

AsyncSeq.zip : AsyncSeq<'a> -> AsyncSeq<'b> -> AsyncSeq<'a * 'b> takes a pair of sequences and combines them element-wise into a sequence of pairs: the first element of one sequence is paired with the first element of the other, and so on. It can be used to pair sequences of related elements into a single sequence. It can also be used to combine a sequence of elements with a sequence of effects.

Example Execution

An example execution can be visually depicted as follows:

---------------------------------------------
| source1  |    a1    |    a2    |          |
| source2  |    b1    |    b2    |    b3    |
| result   |  a1 * b1 |  a2 * b2 |          | 
---------------------------------------------

Note that the resulting sequence terminates when either input sequence terminates.
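
A minimal sketch of the execution above, assuming the library is referenced from NuGet and that `AsyncSeq.toListAsync` is available:

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

let source1 = AsyncSeq.ofSeq [ "a1"; "a2" ]
let source2 = AsyncSeq.ofSeq [ "b1"; "b2"; "b3" ]

// source1 ends after two elements, so "b3" is never paired.
let pairs : (string * string) list =
  AsyncSeq.zip source1 source2
  |> AsyncSeq.toListAsync
  |> Async.RunSynchronously

printfn "%A" pairs // [("a1", "b1"); ("a2", "b2")]
```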

Use Case

Suppose that we have an async sequence of events consumed from a message bus. We would like to process this sequence, but we want to ensure that we're not processing too fast. We can pair the sequence of events with a sequence of delays corresponding to the minimum consumption time. We can do this as follows:

let events : AsyncSeq<Event> =
  failwith "TODO"

let eventsAtLeastOneSec =
  AsyncSeq.zipWith 
    (fun a _ -> a) 
    events 
    (AsyncSeq.replicateInfiniteAsync (Async.Sleep 1000))

The resulting async sequence eventsAtLeastOneSec will emit at most one element per second. Note that the input sequence of timeouts is infinite. This allows the other sequence to have any length, since AsyncSeq.zipWith terminates when either input sequence terminates.

Buffer by Time and Count

AsyncSeq.bufferByCountAndTime consumes the input sequence until a specified number of elements have been consumed or a timeout expires, at which point the resulting sequence emits the buffered elements, unless no elements have been buffered. It is similar to AsyncSeq.bufferByCount but allows a buffer to be emitted based on a timeout in addition to buffer size. Both are useful for batching inputs before performing an operation. AsyncSeq.bufferByCountAndTime allows an async workflow to proceed with a partial buffer when no further inputs arrive during a certain time period.

Example Execution

An example execution can be visually depicted as follows:

-------------------------------------------------------
| source   |  a1 | a2 | a3         | a4      |        |
| result   |     |    | [a1,a2,a3] |         |  [a4]  |
-------------------------------------------------------

The last buffer, [a4], is emitted after a timeout.
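
The following sketch contrasts count-only buffering with count-and-time buffering. The source `slowSource` and its delay are assumptions made for illustration, and the NuGet reference replaces the local build path. The count-only result is deterministic; the count-and-time result depends on timing, so its exact buffer boundaries are not stated.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Count-only buffering: the final buffer may be smaller than the requested size.
let byCount : int [] list =
  AsyncSeq.ofSeq [ 1 .. 7 ]
  |> AsyncSeq.bufferByCount 3
  |> AsyncSeq.toListAsync
  |> Async.RunSynchronously
// byCount = [ [|1; 2; 3|]; [|4; 5; 6|]; [|7|] ]

// A hypothetical source that pauses before its last element.
let slowSource = asyncSeq {
  yield 1
  yield 2
  do! Async.Sleep 500
  yield 3 }

// Buffers hold at most 10 elements and wait at most 100 ms. The pause in the
// source is longer than the timeout, so the first two elements are expected
// to be released without waiting for the third.
let byCountAndTime : int [] list =
  slowSource
  |> AsyncSeq.bufferByCountAndTime 10 100
  |> AsyncSeq.toListAsync
  |> Async.RunSynchronously

printfn "byCount        = %A" byCount
printfn "byCountAndTime = %A" byCountAndTime
```

In both cases, concatenating the buffers yields the original elements in their original order.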

Use Case

Suppose we're writing a service which consumes a stream of events and indexes them into a full-text search index. We can index events one by one; however, we get a performance improvement if we buffer events into small batches. We can buffer into fixed-size batches using AsyncSeq.bufferByCount. However, the source event stream may stop emitting events halfway through a batch, which would leave those events in the buffer until more events arrive. AsyncSeq.bufferByCountAndTime allows the async workflow to make progress by imposing a bound on how long a non-empty but incomplete buffer can wait for additional items.

let individualEvents : AsyncSeq<Event> =
  failwith ""

let bufferSize = 100
let bufferTimeout = 1000

let bufferedEvents : AsyncSeq<Event[]> =
  individualEvents |> AsyncSeq.bufferByCountAndTime bufferSize bufferTimeout