Advanced AsyncSeq Operations
This document covers advanced AsyncSeq<'T> operations: partitioning a sequence into keyed
sub-streams with groupBy, deduplication with distinctUntilChanged, and accumulating
elements into time-or-count-bounded batches with bufferByCountAndTime.
open System
open FSharp.Control
Group By
AsyncSeq.groupBy partitions a sequence into sub-sequences based on a key, analogous to
Seq.groupBy. Each key appears at most once in the output, paired with an AsyncSeq of the
elements that share that key.
Important: the sub-sequences must be consumed in parallel, for example with AsyncSeq.mapAsyncParallel. Consuming them one after another deadlocks, because the completion of each sub-sequence depends on the completion of the others.
--------------------------------------------------
| source | e1 | e2 | e3 | e4 | |
| key | k1 | k2 | k1 | k2 | |
| result | k1 * [e1,e3] | k2 * [e2,e4] |
--------------------------------------------------
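As a small, self-contained sketch of the parallel-consumption requirement, the script below groups the integers 1 to 6 by parity and drains both groups concurrently. The input data and the name `groups` are illustrative assumptions, and the `#r` line assumes the code is run as a script in F# Interactive with NuGet access.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq" // assumption: run as an .fsx script in F# Interactive
open FSharp.Control

// Group 1..6 by parity, then collect every sub-sequence into a list.
// mapAsyncParallel starts draining each group as soon as its key appears,
// so the groups are consumed concurrently, as groupBy requires.
let groups : (int * int list) list =
    AsyncSeq.ofSeq [ 1 .. 6 ]
    |> AsyncSeq.groupBy (fun n -> n % 2)
    |> AsyncSeq.mapAsyncParallel (fun (key, items) ->
        async {
            let! collected = AsyncSeq.toListAsync items
            return key, collected
        })
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

for key, items in groups do
    printfn "key=%d items=%A" key items
```

Replacing `AsyncSeq.mapAsyncParallel` with a sequential combinator such as `AsyncSeq.mapAsync` in this sketch would be expected to hang, which is the deadlock described above.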
A common use case is processing a stream of domain events where events for the same entity must be handled in order, but events for different entities are independent and can be handled in parallel:
type Event = {
    entityId : int64
    data : string
}
let stream : AsyncSeq<Event> = failwith "TODO: connect to message bus"
let action (e: Event) : Async<unit> = failwith "TODO: process event"
// Events that hash to the same bucket (including all events for one entity) are
// processed sequentially; the buckets themselves are processed in parallel.
stream
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4) // hash into 4 buckets
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.iterAsync action)
|> AsyncSeq.iter ignore
We can combine this with batching for higher throughput. For example, when writing events to a
full-text search index, batching writes improves performance while the groupBy ensures ordering
within each entity:
let batchStream : AsyncSeq<Event[]> = failwith "TODO: connect to batched source"
let batchAction (es: Event[]) : Async<unit> = failwith "TODO: bulk index"
batchStream
|> AsyncSeq.concatSeq // flatten batches to individual events
|> AsyncSeq.groupBy (fun e -> int e.entityId % 4) // partition into 4 groups
|> AsyncSeq.mapAsyncParallel (snd
    >> AsyncSeq.bufferByCountAndTime 500 1000 // re-batch per sub-sequence
    >> AsyncSeq.iterAsync batchAction) // bulk index each batch
|> AsyncSeq.iter ignore
The workflow above (1) reads events in batches, (2) flattens them into individual events, (3) partitions them by entity ID into disjoint sub-sequences, (4) re-batches each sub-sequence by size and time, and (5) processes the sub-sequences in parallel while preserving per-entity ordering.
Distinct Until Changed
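AsyncSeq.distinctUntilChanged passes through the elements of the source sequence but drops
any element equal to the one immediately before it, so downstream consumers see a value
only when it differs from the previous one. A value may still reappear later, as in the diagram below.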
-----------------------------------
| source | a | a | b | b | b | a |
| result | a | | b | | | a |
-----------------------------------
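The diagram can be reproduced with a short script. This is a minimal sketch on an in-memory sequence; the name `deduplicated` is an illustrative assumption, and the `#r` line assumes F# Interactive with NuGet access.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq" // assumption: run as an .fsx script in F# Interactive
open FSharp.Control

// Same input as the diagram: a, a, b, b, b, a.
// Only consecutive repeats are dropped, so the trailing "a" is kept.
let deduplicated : string list =
    AsyncSeq.ofSeq [ "a"; "a"; "b"; "b"; "b"; "a" ]
    |> AsyncSeq.distinctUntilChanged
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

printfn "%A" deduplicated // prints ["a"; "b"; "a"]
```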
A natural use case is polling a resource on a fixed schedule and reacting only when its state
actually changes. Consider a background job whose progress is exposed via a getStatus call:
type Status = {
    completed : int
    finished : bool
    result : string
}
let getStatus : Async<Status> = failwith "TODO: call job API"
/// Poll every second and emit each status reading.
let statuses : AsyncSeq<Status> = asyncSeq {
    while true do
        let! s = getStatus
        yield s
        do! Async.Sleep 1000
}
/// Only emit when the status has actually changed.
let distinctStatuses : AsyncSeq<Status> =
    statuses |> AsyncSeq.distinctUntilChanged
We can now build a workflow that logs every status change and stops as soon as the job finishes:
let jobResult : Async<string> =
    distinctStatuses
    |> AsyncSeq.pick (fun st ->
        printfn "status=%A" st
        if st.finished then Some st.result else None)
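To try this workflow without a real job API, the polling loop can be replaced by a fixed list of readings. The sketch below is an assumption-laden stand-in: `simulatedStatuses`, `seen`, and `outcome` are hypothetical names, the status values are made up, and the `Status` record is redeclared only so that the script runs on its own.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq" // assumption: run as an .fsx script in F# Interactive
open FSharp.Control

// Same shape as the Status record used in this section.
type Status = { completed : int; finished : bool; result : string }

// Hypothetical readings standing in for repeated getStatus calls.
let simulatedStatuses : AsyncSeq<Status> =
    AsyncSeq.ofSeq [
        { completed = 0; finished = false; result = "" }
        { completed = 0; finished = false; result = "" }   // unchanged, dropped
        { completed = 50; finished = false; result = "" }
        { completed = 100; finished = true; result = "done" }
        { completed = 100; finished = true; result = "done" } // never reached
    ]

// Records every status that the chooser function is called with.
let seen = ResizeArray<Status>()

let outcome : string =
    simulatedStatuses
    |> AsyncSeq.distinctUntilChanged
    |> AsyncSeq.pick (fun st ->
        seen.Add st
        if st.finished then Some st.result else None)
    |> Async.RunSynchronously

printfn "outcome=%s, statuses seen=%d" outcome seen.Count
```

Records compare structurally in F#, which is why two readings with identical fields count as duplicates here. Note that AsyncSeq.pick raises KeyNotFoundException if the sequence ends without a match, so on a finite sequence AsyncSeq.tryPick may be the safer choice.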
Buffer by Count and Time
AsyncSeq.bufferByCountAndTime accumulates incoming elements and emits a batch whenever
the buffer reaches a given size or a timeout (in milliseconds) elapses, whichever comes first.
If the buffer is empty when the timeout fires, nothing is emitted.
-------------------------------------------------------
| source | a1 | a2 | a3 | a4 | |
| result | | | [a1,a2,a3] | | [a4] |
-------------------------------------------------------
This is useful for services that write events to a bulk API (e.g. a search index). Fixed-size
batching with AsyncSeq.bufferByCount can stall when the source slows down and a partial buffer
never fills. bufferByCountAndTime avoids that by guaranteeing forward progress:
let events : AsyncSeq<Event> = failwith "TODO: connect to event source"
let bufferSize = 100
let bufferTimeout = 1000 // milliseconds
let bufferedEvents : AsyncSeq<Event[]> =
    events |> AsyncSeq.bufferByCountAndTime bufferSize bufferTimeout
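The count bound can be observed on an in-memory source. The following is a minimal sketch with made-up sizes (seven elements, batches of at most three, a 10-second timeout); the name `batches` is an illustrative assumption, and the `#r` line assumes F# Interactive with NuGet access.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq" // assumption: run as an .fsx script in F# Interactive
open FSharp.Control

// Seven in-memory elements, batches of at most 3, and a 10-second timeout.
// The source is fast, so the timeout is not expected to fire here; the final,
// partial batch should be emitted when the source ends.
let batches : int[] list =
    AsyncSeq.ofSeq [ 1 .. 7 ]
    |> AsyncSeq.bufferByCountAndTime 3 10000
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously

for batch in batches do
    printfn "%A" batch
```

With a slow source, such as the polling sequence from the previous section, the timeout would be expected to flush smaller batches instead of waiting for the buffer to fill.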