FSharp.Control.AsyncSeq


F# AsyncSeq Examples

1: 
2: 
3: 
#r "../../../bin/FSharp.Control.AsyncSeq.dll"
open System
open FSharp.Control

Group By

AsyncSeq.groupBy partitions an input sequence into sub-sequences with respect to the specified projection function. This operation is the asynchronous analog to Seq.groupBy.

Example Execution

An example execution can be depicted visually as follows:


| source | a0 | a2 | a3 | a4 | a5 | | | key | k1 | k2 | k1 | k3 | | |

| result | k1 [a1,a3] | k2 [a2] | k3 * [a4] |

a0 k0

Use Case

Suppose we would like to consume a stream of events AsyncSeq<Event> and perform an operation on each event. The operation on each event is of type Event -> Async<unit>. This can be done as follows:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
type Event = {
  entityId : int64
  data : string 
}

let stream : AsyncSeq<Event> =
  failwith "undefined"

let action (e:Event) : Async<unit> =
  failwith "undefined"

stream 
|> AsyncSeq.iterAsync action

The above workflow will read an event from the stream, perform an operation and then read the next event. While the read operation and the operation on the event are asynchronous, the stream is processed sequentially. It may be desirable to parallelize the processing of the stream. Suppose that events correspond to some entity, such as a shopping cart. Events belonging to some shopping cart must be processed in a sequential order, however they are independent from events belonging to other shopping carts. Therefore, events belonging to distinct shopping carts can be processed in parallel. Using AsyncSeq.groupBy, we can partition the stream into a fixed set of sub-streams and then process the sub-streams in parallel using AsyncSeq.mapAsyncParallel:

1: 
2: 
3: 
4: 
stream
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4)
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.iterAsync action)
|> AsyncSeq.iter ignore

AsyncSeq.groupBy partitions the input sequence into sub-sequences based on a key returned by a projection function. The resulting sub-sequences emit elements when the source sequence emits an element corresponding to the key of the sub-sequence. Elements of the resulting sequence are pairs of keys and sub-sequences, in this case int * AsyncSeq<Event>. Since by definition, these sub-sequences are independent, they can be processed in parallel. In fact, the sub-sequences must be processed in parallel, because it isn't possible to complete the processing of a sub-sequence until all elements of the source sequence are exhausted.

To continue improving the efficiency of our workflow, we can make use of batching. Specifically, we can read the incoming events in batches and we can perform actions on entire batches of events.

1: 
2: 
3: 
4: 
5: 
let batchStream : AsyncSeq<Event[]> =
  failwith "undefined"

let batchAction (es:Event[]) : Async<unit> =
  failwith "undefined"

Ordering is still important. For example, the batch action could write events into a full-text search index. We would like the full-text search index to be sequentially consistent. As such, the events need to be applied in the order they were emitted. The following workflow has the desired properties:

1: 
2: 
3: 
4: 
5: 
6: 
7: 
batchStream
|> AsyncSeq.concatSeq // flatten the sequence of event arrays
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4) // partition into 4 groups
|> AsyncSeq.mapAsyncParallel (snd 
  >> AsyncSeq.bufferByCountAndTime 500 1000 // buffer sub-sequences
  >> AsyncSeq.iterAsync batchAction) // perform the batch operation
|> AsyncSeq.iter ignore

The above workflow:

  1. Reads events in batches.
  2. Flattens the batches.
  3. Partitions the events into mutually exclusive sub-sequences.
  4. Buffers elements of each sub-sequence by time and space.
  5. Processes the sub-sequences in parallel, but individual sub-sequences sequentially.

Merge

AsyncSeq.merge non-deterministically merges two async sequences into one. It is non-deterministic in the sense that the resulting sequence emits elements whenever either input sequence emits a value. Since it isn't always known which will emit a value first, if at all, the operation is non-deterministic. This operation is in contrast to AsyncSeq.zip which also takes two async sequences and returns a single async sequence, but as opposed to emitting an element when either input sequence produces a value, it emits an element when both sequences emit a value. This operation is also in contrast to AsyncSeq.append which concatenates two async sequences, emitting all element of one, followed by all elements of the another.

Example Execution

An example execution can be depicted visually as follows:


| source1 | t0 | | t1 | | | t2 | | source2 | | u0 | | | u1 | |

| result | t0 | u0 | t1 | | u1 | t2 |

Use Case

Suppose you wish to perform an operation when either of two async sequences emits an element. One way to do this is two start consuming both async sequences in parallel. If we would like to perform only one operation at a time, we can use AsyncSeq.merge as follows:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
/// Represents an stream emitting elements on a specified interval.
let intervalMs (periodMs:int) = asyncSeq {
  yield DateTime.UtcNow
  while true do
    do! Async.Sleep periodMs
    yield DateTime.UtcNow }

let either : AsyncSeq<DateTime> =
  AsyncSeq.merge (intervalMs 20) (intervalMs 30)

The sequence `either` emits an element every 20ms and every 30ms.

Combine Latest

AsyncSeq.combineLatest non-deterministically merges two async sequences much like AsyncSeq.merge, combining their elements using the specified combine function. The resulting async sequence will only contain elements if both of the source sequences produce at least one element. After combining the first elements the source sequences, this operation emits elements when either source sequence emits an element, passing the newly emitted element as one of the arguments to the combine function, the other being the previously emitted element of that type.

Example Execution

An example execution can be depicted visually as follows:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
----------------------------------------
| source1 | a0 |    |    | a1 |   | a2 |
| source2 |    | b0 | b1 |    |   |    |
| result  |    | c0 | c1 | c2 |   | c3 |
----------------------------------------

where

c0 = f a0 b0
c1 = f a0 b1
c2 = f a1 b1
c3 = f a2 b1

Use Case

Suppose we would like to trigger an operation whenever a change occurs. We can represent changes as an AsyncSeq. To gain intuition for this, consider the Consul configuration management system. It stores configuration information in a tree-like structure. For this purpose of this discussion, it can be thought of as a key-value store exposed via HTTP. In addition, Consul supports change notifications using HTTP long-polling - when an HTTP GET request is made to retrieve the value of a key, if the request specified a modify-index, Consul won't respond to the request until a change has occurred since the modify-index. We can represent this operation using the type Key * ModifyIndex -> Async<Value * ModifyIndex>. Next, we can take this operation and turn it into an AsyncSeq of changes as follows:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
type Key = string

type Value = string

type ModifyIndex = int64

let longPollKey (key:Key, mi:ModifyIndex) : Async<Value * ModifyIndex> =
  failwith "undefined"

let changes (key:Key, mi:ModifyIndex) : AsyncSeq<Value> =
  AsyncSeq.unfoldAsync 
    (fun (mi:ModifyIndex) -> async {
      let! value,mi = longPollKey (key, mi)
      return Some (value,mi) })
    mi

The function changes produces an async sequence which emits elements whenever the value corresponding to the key changes. Suppose also that we would like to trigger an operation whenever the key changes or based on a fixed interval. We can represent a fixed interval as an async sequence as follows:

1: 
2: 
3: 
4: 
5: 
let intervalMs (periodMs:int) = asyncSeq {
  yield DateTime.UtcNow
  while true do
    do! Async.Sleep periodMs
    yield DateTime.UtcNow }

Putting it all together:

1: 
2: 
let changesOrInterval : AsyncSeq<Value> =
  AsyncSeq.combineLatestWith (fun v _ -> v) (changes ("myKey", 0L)) (intervalMs (1000 * 60))

We can now consume this async sequence and use it to trigger downstream operations, such as updating the configuration of a running program, in flight.


Distinct Until Changed

AsyncSeq.distinctUntilChanged returns an async sequence which returns every element of the source sequence, skipping elements which equal its predecessor.

Example Execution

An example execution can be visualized as follows:


| source | a | a | b | b | b | a |

| result | a | | b | | | a |

Use Case

Suppose you're polling a resource which returns status information of a background job.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
type Status = {
  completed : int
  finished : bool
  result : string
}

/// Gets the status of a job.
let status : Async<Status> =
  failwith ""

let statuses : AsyncSeq<Status> =
  asyncSeq {
    let! s = status
    while true do
      do! Async.Sleep 1000
      let! s = status
      yield s }

The async sequence statuses will return a status every second. It will return a status regardless of whether the status changed. Assuming the status changes monotonically, we can use AsyncSeq.distinctUntilChanged to transform statuses into an async sequence of distinct statuses:

1: 
2: 
let distinctStatuses : AsyncSeq<Status> =
  statuses |> AsyncSeq.distinctUntilChanged

Finally, we can create a workflow which prints the status every time a change is detected and terminates when the underlying job reaches the finished state:

1: 
2: 
3: 
4: 
5: 
6: 
let result : Async<string> =
  distinctStatuses
  |> AsyncSeq.pick (fun st -> 
    printfn "status=%A" st
    if st.finished then Some st.result
    else None)

Zip

AsyncSeq.zip : AsyncSeq<'a> -> AsyncSeq<'b> -> AsyncSeq<'a * 'b> takes a pair of sequences and combines them into a sequence of pairs element wise - the first element of one sequence is paired with the first element of the other, and so on. It can be used to pair sequences of related elements into a single sequence. It can also be used to combine a sequence of elements with a sequence of effects.

Example Execution

An example execution can be visually depicted as follows:

1: 
2: 
3: 
4: 
5: 
---------------------------------------------
| source1  |    a1    |    a2    |          |
| source2  |    b1    |    b2    |    b3    |
| result   |  a1 * b1 |  a2 * b2 |          | 
---------------------------------------------

Note that the resulting sequence terminates when either input sequence terminates.

Use Case

Suppose that we have an async sequence of events consumed from a message bus. We would like to process this sequence but we want to ensure that we're not processing to fast. We can pair the sequence of events with a sequence of durations corresponding to the minimum consumption time. We can do this as follows:

1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
let events : AsyncSeq<Event> =
  failwith "TODO"

let eventsAtLeastOneSec =
  AsyncSeq.zipWith 
    (fun a _ -> a) 
    events 
    (AsyncSeq.replicateInfiniteAsync (Async.Sleep 1000))

The resulting async sequence eventsAtLeastOneSec will emit an element at-most every second. Note that the input sequence of timeouts is infinite - this is to allow the other sequence to have any length since AsyncSeq.zipWith will terminate when either input sequence terminates.

Buffer by Time and Count

AsyncSeq.bufferByTimeAndCount consumes the input sequence until a specified number of elements are consumed or a timeout expires at which point the resulting sequence emits the buffered of elements, unless no elements have been buffered. It is similar to AsyncSeq.bufferByCount but allows a buffer to be emitted base on a timeout in addition to buffer size. Both are useful for batching inputs before performing an operation. AsyncSeq.bufferByTimeAndCount allows an async workflow to proceed even if there are no inputs received during a certain time period.

Example Execution

An example execution can be visually depicted as follows:

1: 
2: 
3: 
4: 
-------------------------------------------------------
| source   |  a1 | a2 | a3         | a4      |        |
| result   |     |    | [a1,a2,a3] |         |  [a4]  |
-------------------------------------------------------

The last event a4 is emitted after a timeout.

Use Case

Suppose we're writing a service which consumes a stream of events and indexes them into full-text search index. We can index each event one by one, however we get a performance improvement if we buffer events into small batches. We can buffer into fixed size batches using AsyncSeq.bufferByCount. However, the source event stream may stop emitting events half way through a batch which would leave those events in the buffer until more events arrive. AsyncSeq.bufferByTimeAndCount allows the async workflow to make progress by imposing a bound on how long a non-empty but incomplete buffer can wait more additional items.

1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
let individualEvents : AsyncSeq<Event> =
  failwith ""

let bufferSize = 100
let bufferTimeout = 1000

let bufferedEvents : AsyncSeq<Event[]> =
  events |> AsyncSeq.bufferByCountAndTime bufferSize bufferTimeout   
namespace System
Multiple items
namespace FSharp

--------------------
namespace Microsoft.FSharp
Multiple items
namespace FSharp.Control

--------------------
namespace Microsoft.FSharp.Control
Multiple items
module Event

from Microsoft.FSharp.Control

--------------------
type Event =
  {entityId: int64;
   data: string;}

Full name: AsyncSeqExamples.Event

--------------------
type Event<'T> =
  new : unit -> Event<'T>
  member Trigger : arg:'T -> unit
  member Publish : IEvent<'T>

Full name: Microsoft.FSharp.Control.Event<_>

--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
  new : unit -> Event<'Delegate,'Args>
  member Trigger : sender:obj * args:'Args -> unit
  member Publish : IEvent<'Delegate,'Args>

Full name: Microsoft.FSharp.Control.Event<_,_>

--------------------
new : unit -> Event<'T>

--------------------
new : unit -> Event<'Delegate,'Args>
Event.entityId: int64
Multiple items
val int64 : value:'T -> int64 (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int64

--------------------
type int64 = Int64

Full name: Microsoft.FSharp.Core.int64

--------------------
type int64<'Measure> = int64

Full name: Microsoft.FSharp.Core.int64<_>
Event.data: string
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = String

Full name: Microsoft.FSharp.Core.string
val stream : AsyncSeq<Event>

Full name: AsyncSeqExamples.stream
Multiple items
module AsyncSeq

from FSharp.Control

--------------------
type AsyncSeq<'T> = IAsyncEnumerable<'T>

Full name: FSharp.Control.AsyncSeq<_>
val failwith : message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.failwith
val action : e:Event -> Async<unit>

Full name: AsyncSeqExamples.action
val e : Event
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
val iterAsync : action:('T -> Async<unit>) -> source:AsyncSeq<'T> -> Async<unit>

Full name: FSharp.Control.AsyncSeq.iterAsync
val groupBy : projection:('T -> 'Key) -> source:AsyncSeq<'T> -> AsyncSeq<'Key * AsyncSeq<'T>> (requires equality)

Full name: FSharp.Control.AsyncSeq.groupBy
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
val mapAsyncParallel : mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>

Full name: FSharp.Control.AsyncSeq.mapAsyncParallel
val snd : tuple:('T1 * 'T2) -> 'T2

Full name: Microsoft.FSharp.Core.Operators.snd
val iter : action:('T -> unit) -> source:AsyncSeq<'T> -> Async<unit>

Full name: FSharp.Control.AsyncSeq.iter
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
val batchStream : AsyncSeq<Event []>

Full name: AsyncSeqExamples.batchStream
val batchAction : es:Event [] -> Async<unit>

Full name: AsyncSeqExamples.batchAction
val es : Event []
val concatSeq : source:AsyncSeq<#seq<'T>> -> AsyncSeq<'T>

Full name: FSharp.Control.AsyncSeq.concatSeq
val bufferByCountAndTime : bufferSize:int -> timeoutMs:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []>

Full name: FSharp.Control.AsyncSeq.bufferByCountAndTime
val intervalMs : periodMs:int -> 'a

Full name: AsyncSeqExamples.intervalMs


 Represents an stream emitting elements on a specified interval.
val periodMs : int
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
val either : obj

Full name: AsyncSeqExamples.either
type Key = string

Full name: AsyncSeqExamples.Key
type Value = string

Full name: AsyncSeqExamples.Value
type ModifyIndex = int64

Full name: AsyncSeqExamples.ModifyIndex
val longPollKey : key:Key * mi:ModifyIndex -> Async<Value * ModifyIndex>

Full name: AsyncSeqExamples.longPollKey
val key : Key
val mi : ModifyIndex
val changes : key:Key * mi:ModifyIndex -> AsyncSeq<Value>

Full name: AsyncSeqExamples.changes
val unfoldAsync : generator:('State -> Async<('T * 'State) option>) -> state:'State -> AsyncSeq<'T>

Full name: FSharp.Control.AsyncSeq.unfoldAsync
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val value : Value
union case Option.Some: Value: 'T -> Option<'T>
val intervalMs : periodMs:int -> AsyncSeq<DateTime>

Full name: AsyncSeqExamples.intervalMs
val asyncSeq : AsyncSeq.AsyncSeqBuilder

Full name: FSharp.Control.AsyncSeqExtensions.asyncSeq
Multiple items
type DateTime =
  struct
    new : ticks:int64 -> DateTime + 10 overloads
    member Add : value:TimeSpan -> DateTime
    member AddDays : value:float -> DateTime
    member AddHours : value:float -> DateTime
    member AddMilliseconds : value:float -> DateTime
    member AddMinutes : value:float -> DateTime
    member AddMonths : months:int -> DateTime
    member AddSeconds : value:float -> DateTime
    member AddTicks : value:int64 -> DateTime
    member AddYears : value:int -> DateTime
    ...
  end

Full name: System.DateTime

--------------------
DateTime()
   (+0 other overloads)
DateTime(ticks: int64) : unit
   (+0 other overloads)
DateTime(ticks: int64, kind: DateTimeKind) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, calendar: Globalization.Calendar) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, kind: DateTimeKind) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, calendar: Globalization.Calendar) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int, kind: DateTimeKind) : unit
   (+0 other overloads)
property DateTime.UtcNow: DateTime
val changesOrInterval : AsyncSeq<Value>

Full name: AsyncSeqExamples.changesOrInterval
val combineLatestWith : combine:('T -> 'U -> 'V) -> source1:AsyncSeq<'T> -> source2:AsyncSeq<'U> -> AsyncSeq<'V>

Full name: FSharp.Control.AsyncSeq.combineLatestWith
val v : Value
type Status =
  {completed: int;
   finished: bool;
   result: string;}

Full name: AsyncSeqExamples.Status
Status.completed: int
Status.finished: bool
type bool = Boolean

Full name: Microsoft.FSharp.Core.bool
Status.result: string
val status : Async<Status>

Full name: AsyncSeqExamples.status


 Gets the status of a job.
val statuses : AsyncSeq<Status>

Full name: AsyncSeqExamples.statuses
val s : Status
val distinctStatuses : AsyncSeq<Status>

Full name: AsyncSeqExamples.distinctStatuses
val distinctUntilChanged : source:AsyncSeq<'T> -> AsyncSeq<'T> (requires equality)

Full name: FSharp.Control.AsyncSeq.distinctUntilChanged
val result : Async<string>

Full name: AsyncSeqExamples.result
val pick : chooser:('T -> 'TResult option) -> source:AsyncSeq<'T> -> Async<'TResult>

Full name: FSharp.Control.AsyncSeq.pick
val st : Status
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
union case Option.None: Option<'T>
val events : AsyncSeq<Event>

Full name: AsyncSeqExamples.events
val eventsAtLeastOneSec : AsyncSeq<Event>

Full name: AsyncSeqExamples.eventsAtLeastOneSec
val zipWith : mapping:('T1 -> 'T2 -> 'U) -> source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> AsyncSeq<'U>

Full name: FSharp.Control.AsyncSeq.zipWith
val a : Event
val replicateInfiniteAsync : v:Async<'T> -> AsyncSeq<'T>

Full name: FSharp.Control.AsyncSeq.replicateInfiniteAsync
val individualEvents : AsyncSeq<Event>

Full name: AsyncSeqExamples.individualEvents
val bufferSize : int

Full name: AsyncSeqExamples.bufferSize
val bufferTimeout : int

Full name: AsyncSeqExamples.bufferTimeout
val bufferedEvents : AsyncSeq<Event []>

Full name: AsyncSeqExamples.bufferedEvents
Fork me on GitHub