Advanced AsyncSeq Operations

This document covers advanced AsyncSeq<'T> operations: partitioning a sequence into keyed sub-streams with groupBy, suppressing consecutive duplicates with distinctUntilChanged, and accumulating elements into batches bounded by count or time with bufferByCountAndTime.

open System
open FSharp.Control

Group By

AsyncSeq.groupBy partitions a sequence into sub-sequences based on a key, analogous to Seq.groupBy. Each key appears at most once in the output, paired with an AsyncSeq of the elements that share that key.

Important: the sub-sequences must be consumed in parallel. Sequential consumption will deadlock because no sub-sequence can complete until all others are also being consumed.

--------------------------------------------------
| source  | e1 | e2 | e3 | e4 |                  |
| key     | k1 | k2 | k1 | k2 |                  |
| result  | k1 * [e1,e3] | k2 * [e2,e4]          |
--------------------------------------------------
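
To make the diagram concrete, here is a minimal sketch that groups a small in-memory sequence by parity and collects each sub-sequence into a list. It assumes the code runs as an F# script (`dotnet fsi`) with the FSharp.Control.AsyncSeq NuGet package available; the name `grouped` and the sample data are illustrative only.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Group the numbers 1..6 by parity and materialise each sub-sequence.
// mapAsyncParallel starts draining every sub-sequence as soon as its key
// appears, which is the parallel consumption that groupBy requires.
let grouped : (int * int list) list =
    AsyncSeq.ofSeq [ 1 .. 6 ]
    |> AsyncSeq.groupBy (fun n -> n % 2)
    |> AsyncSeq.mapAsyncParallel (fun (key, values) -> async {
        let! items = AsyncSeq.toListAsync values
        return key, items })
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

// key 1 holds [1; 3; 5] and key 0 holds [2; 4; 6]
grouped |> List.iter (fun (key, items) -> printfn "%d -> %A" key items)
```

Draining each sub-sequence to completion inside a sequential combinator instead would be the pattern the warning above describes, so the sketch deliberately uses mapAsyncParallel.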

A common use case is processing a stream of domain events where events for the same entity must be handled in order, but events for different entities are independent and can be handled in parallel:

type Event = {
    entityId : int64
    data     : string
}

let stream : AsyncSeq<Event> = failwith "TODO: connect to message bus"

let action (e: Event) : Async<unit> = failwith "TODO: process event"

// Process each entity's events sequentially, but different entities in parallel.
stream
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4)  // hash into 4 buckets
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.iterAsync action)
|> AsyncSeq.iter ignore

We can combine this with batching for higher throughput. For example, when writing events to a full-text search index, batching writes improves performance while the groupBy ensures ordering within each entity:

let batchStream : AsyncSeq<Event[]> = failwith "TODO: connect to batched source"

let batchAction (es: Event[]) : Async<unit> = failwith "TODO: bulk index"

batchStream
|> AsyncSeq.concatSeq                              // flatten batches to individual events
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4) // partition into 4 groups
|> AsyncSeq.mapAsyncParallel (snd
    >> AsyncSeq.bufferByCountAndTime 500 1000      // re-batch per sub-sequence
    >> AsyncSeq.iterAsync batchAction)             // bulk index each batch
|> AsyncSeq.iter ignore

The above workflow: (1) reads events in batches, (2) flattens them, (3) partitions by entity into mutually-exclusive sub-sequences, (4) re-batches each sub-sequence by size/time, and (5) processes the batches in parallel while preserving per-entity ordering.

Distinct Until Changed

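AsyncSeq.distinctUntilChanged passes elements of the source sequence through but drops any element that equals its immediate predecessor, so downstream consumers only see a value when it differs from the previous one. A value can still reappear later, as a does in the diagram below, because only consecutive duplicates are removed.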

-----------------------------------
| source  | a | a | b | b | b | a |
| result  | a |   | b |   |   | a |
-----------------------------------
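
The diagram can be reproduced with a short script. This is a sketch that assumes an F# script environment (`dotnet fsi`) with the FSharp.Control.AsyncSeq NuGet package; the name `changes` is illustrative.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Same input as the diagram: runs of equal values collapse to one element,
// but "a" is emitted again once it follows a different value.
let changes : string list =
    AsyncSeq.ofSeq [ "a"; "a"; "b"; "b"; "b"; "a" ]
    |> AsyncSeq.distinctUntilChanged
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

printfn "%A" changes // ["a"; "b"; "a"]
```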

A natural use case is polling a resource on a fixed schedule and reacting only when its state actually changes. Consider a background job whose progress is exposed via a getStatus call:

type Status = {
    completed : int
    finished  : bool
    result    : string
}

let getStatus : Async<Status> = failwith "TODO: call job API"

/// Poll every second and emit each status reading.
let statuses : AsyncSeq<Status> = asyncSeq {
    while true do
        let! s = getStatus
        yield s
        do! Async.Sleep 1000
}

/// Only emit when the status has actually changed.
let distinctStatuses : AsyncSeq<Status> =
    statuses |> AsyncSeq.distinctUntilChanged

We can now build a workflow that logs every status change and stops as soon as the job finishes:

let jobResult : Async<string> =
    distinctStatuses
    |> AsyncSeq.pick (fun st ->
        printfn "status=%A" st
        if st.finished then Some st.result else None)
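
The polling workflow can be tried without a real job API. The sketch below swaps in a hypothetical in-memory list of readings (`readings`, `simulatedStatuses`, `JobStatus`, and the 10 ms delay are all made up for illustration) and records which readings reach the chooser. It assumes an F# script environment with the FSharp.Control.AsyncSeq NuGet package.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

type JobStatus = { completed : int; finished : bool; result : string }

// Hypothetical stand-in for the job API: every reading is reported twice.
let readings : JobStatus list =
    [ 0; 0; 50; 50; 100; 100 ]
    |> List.map (fun pct ->
        { completed = pct
          finished  = (pct = 100)
          result    = (if pct = 100 then "done" else "") })

// Emit the canned readings with a short delay in place of the 1 s poll.
let simulatedStatuses : AsyncSeq<JobStatus> = asyncSeq {
    for r in readings do
        yield r
        do! Async.Sleep 10 }

// Progress values that actually reach the chooser.
let seen = ResizeArray<int>()

let outcome : string =
    simulatedStatuses
    |> AsyncSeq.distinctUntilChanged
    |> AsyncSeq.pick (fun st ->
        seen.Add st.completed
        if st.finished then Some st.result else None)
    |> Async.RunSynchronously

printfn "seen=%A outcome=%s" (List.ofSeq seen) outcome
```

Records compare structurally in F#, which is why distinctUntilChanged can detect repeated readings here without a custom comparer.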

Buffer by Count and Time

AsyncSeq.bufferByCountAndTime bufferSize timeoutMs accumulates incoming elements and emits a batch as an array whenever the buffer reaches bufferSize elements or timeoutMs milliseconds elapse, whichever comes first. If the buffer is empty when the timeout fires, nothing is emitted.

-------------------------------------------------------
| source   |  a1 | a2 | a3         | a4      |        |
| result   |     |    | [a1,a2,a3] |         |  [a4]  |
-------------------------------------------------------
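
As a minimal sketch of the count trigger, the script below batches a small in-memory sequence. It assumes an F# script environment with the FSharp.Control.AsyncSeq NuGet package; the name `batches` and the sizes are illustrative.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Batch seven elements with a size limit of 3 and a 1000 ms timeout.
// The source is in memory, so the size limit is hit long before the timeout.
let batches : int[] list =
    AsyncSeq.ofSeq [ 1 .. 7 ]
    |> AsyncSeq.bufferByCountAndTime 3 1000
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

batches |> List.iter (printfn "%A")
```

With a source this fast the batches should be two full arrays of three followed by a final array holding the leftover element, which is flushed when the source ends. No batch can exceed the size limit, and the elements keep their original order.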
           

This is useful for services that write events to a bulk API (e.g. a search index). Fixed-size batching with AsyncSeq.bufferByCount can stall when the source slows down and a partial buffer never fills. bufferByCountAndTime avoids that by guaranteeing forward progress:

let events : AsyncSeq<Event> = failwith "TODO: connect to event source"

let bufferSize    = 100
let bufferTimeout = 1000 // milliseconds

let bufferedEvents : AsyncSeq<Event[]> =
    events |> AsyncSeq.bufferByCountAndTime bufferSize bufferTimeout
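
The forward-progress point can be illustrated with a source that pauses mid-stream. This is a sketch under assumptions: the `bursty` sequence, its 500 ms pause, and the 100 ms timeout are invented for the example, and exact batch boundaries depend on timing. It assumes an F# script environment with the FSharp.Control.AsyncSeq NuGet package.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Hypothetical source: two quick elements, a pause, then one more.
let bursty : AsyncSeq<string> = asyncSeq {
    yield "a1"
    yield "a2"
    do! Async.Sleep 500 // pause much longer than the buffer timeout
    yield "a3" }

// The size limit (10) is never reached, so only the 100 ms timeout
// and the end of the source can release a batch.
let timedBatches : string[] list =
    bursty
    |> AsyncSeq.bufferByCountAndTime 10 100
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

timedBatches |> List.iter (printfn "%A")
```

On a typical run the first two elements are released as a partial batch during the pause instead of waiting for eight more elements to arrive, and the last element is released when the source ends.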