Header menu logo FSharp.Control.AsyncSeq

Binder

Combining Asynchronous Sequences

This document covers the operations that combine two or more AsyncSeq<'T> values into one. The operations differ in when they wait for input and how they interleave elements:

Operation

Timing

Result

append

sequential

all of seq1 then all of seq2

zip / zipWith

lock-step

waits for one element from each source

interleave

lock-step

alternates strictly between sources

merge

non-deterministic

emits whenever any source produces

combineLatestWith

non-deterministic

emits the latest pair whenever any source advances

open System
open FSharp.Control

Append

AsyncSeq.append produces all elements of the first sequence followed by all elements of the second. The second sequence does not start until the first is exhausted — it is strictly sequential:

let first  = asyncSeq { yield! [ 1; 2; 3 ] }
let second = asyncSeq { yield! [ 4; 5; 6 ] }

let appended : AsyncSeq<int> = AsyncSeq.append first second
// emits: 1, 2, 3, 4, 5, 6

yield! inside asyncSeq { ... } is equivalent to append and is the idiomatic choice when concatenating inline:

let combined = asyncSeq {
    yield! first
    yield! second
}

Zip

AsyncSeq.zip combines two sequences element-wise into a sequence of pairs, consuming one element from each source before producing output. The result terminates as soon as either source is exhausted.

---------------------------------------------
| source1  |    a1    |    a2    |           |
| source2  |    b1    |    b2    |    b3     |
| result   |  a1 * b1 |  a2 * b2 |           |
---------------------------------------------
let letters = asyncSeq { yield! [ 'a'; 'b'; 'c' ] }
let numbers = asyncSeq { yield! [ 1; 2; 3; 4 ] }

let pairs : AsyncSeq<char * int> = AsyncSeq.zip letters numbers
// emits: ('a',1), ('b',2), ('c',3)  — stops when letters runs out

zipWith and zipWithAsync

AsyncSeq.zipWith applies a combining function instead of producing tuples:

let sums : AsyncSeq<int> =
    AsyncSeq.zipWith (fun a b -> a + b) numbers numbers
// emits: 2, 4, 6, 8

AsyncSeq.zipWithAsync is the same but the combining function returns Async<'U>, useful when the combination itself requires IO:

let combine (a: int) (b: int) : Async<string> =
    async { return sprintf "%d+%d=%d" a b (a+b) }

let asyncSums : AsyncSeq<string> =
    AsyncSeq.zipWithAsync combine numbers numbers

zipParallel

AsyncSeq.zipParallel is identical to zip but fetches the next element from both sources simultaneously rather than sequentially. Use this when the two sources are independent and fetching each element involves latency:

let parallelPairs : AsyncSeq<char * int> = AsyncSeq.zipParallel letters numbers

zip3 and zipWith3

For three sources at once:

let triples =
    AsyncSeq.zip3
        (asyncSeq { yield! [ 1; 2; 3 ] })
        (asyncSeq { yield! [ 'a'; 'b'; 'c' ] })
        (asyncSeq { yield! [ true; false; true ] })
// emits: (1,'a',true), (2,'b',false), (3,'c',true)

Rate-limiting with zipWith

A practical pattern for zipWith is rate-limiting: pair each element of a fast source with a minimum delay so that the consumer never runs faster than one element per interval:

type Event = { entityId: int64; data: string }

let events : AsyncSeq<Event> = failwith "TODO: connect to event source"

/// Emit at most one event per second.
let eventsAtLeastOneSec : AsyncSeq<Event> =
    AsyncSeq.zipWith
        (fun evt _ -> evt)
        events
        (AsyncSeq.replicateInfiniteAsync (Async.Sleep 1000))

The delay sequence is infinite so it never causes zipWith to terminate early; the resulting sequence ends only when events runs out.


Interleave

AsyncSeq.interleave alternates strictly between two sources in lock-step: one element from source1, one from source2, one from source1, and so on. It terminates when either source is exhausted. Unlike merge, the ordering is deterministic.

----------------------------------------------
| source1 | a1 |    | a2 |    | a3 |         |
| source2 |    | b1 |    | b2 |    |         |
| result  | a1 | b1 | a2 | b2 | a3 |         |
----------------------------------------------
let odds  = asyncSeq { yield! [ 1; 3; 5 ] }
let evens = asyncSeq { yield! [ 2; 4; 6 ] }

let interleaved : AsyncSeq<int> = AsyncSeq.interleave odds evens
// emits: 1, 2, 3, 4, 5, 6

interleaveMany

AsyncSeq.interleaveMany generalises this to an arbitrary number of sequences, cycling through them in order:

let a = asyncSeq { yield! [ 1; 4; 7 ] }
let b = asyncSeq { yield! [ 2; 5; 8 ] }
let c = asyncSeq { yield! [ 3; 6; 9 ] }

let roundRobin : AsyncSeq<int> = AsyncSeq.interleaveMany [ a; b; c ]
// emits: 1, 2, 3, 4, 5, 6, 7, 8, 9

interleaveChoice

AsyncSeq.interleaveChoice interleaves two sequences of different types, wrapping each element in Choice<'T1,'T2> so you can tell which source it came from:

let intSeq  = asyncSeq { yield! [ 1; 2; 3 ] }
let strSeq  = asyncSeq { yield! [ "a"; "b"; "c" ] }

let tagged : AsyncSeq<Choice<int,string>> =
    AsyncSeq.interleaveChoice intSeq strSeq
// Choice1Of2 1, Choice2Of2 "a", Choice1Of2 2, Choice2Of2 "b", ...

Merge

AsyncSeq.merge interleaves two sequences non-deterministically, emitting values as soon as either source produces one. There is no lock-step coordination — whichever source is ready first wins. The result continues until both sources are exhausted.

-----------------------------------------
| source1 | t0 |    | t1 |    |    | t2 |
| source2 |    | u0 |    |    | u1 |    |
| result  | t0 | u0 | t1 |    | u1 | t2 |
-----------------------------------------

This is different from append (sequential) and interleave (strict lock-step).

A handy building block is an infinite ticker:

/// Emits `DateTime.UtcNow` immediately, then every `periodMs` milliseconds.
let intervalMs (periodMs: int) = asyncSeq {
    yield DateTime.UtcNow
    while true do
        do! Async.Sleep periodMs
        yield DateTime.UtcNow
}

// React to whichever of the two timers fires first.
let either : AsyncSeq<DateTime> =
    AsyncSeq.merge (intervalMs 20) (intervalMs 30)

mergeAll

AsyncSeq.mergeAll merges an arbitrary number of sequences non-deterministically:

let sources : AsyncSeq<DateTime> =
    AsyncSeq.mergeAll [ intervalMs 100; intervalMs 250; intervalMs 500 ]

mergeChoice

AsyncSeq.mergeChoice merges two sequences of different types non-deterministically, tagging each element with Choice<'T1,'T2> so you can distinguish the source:

type Heartbeat = Heartbeat
type ConfigChange = ConfigChange of string

let heartbeats : AsyncSeq<Heartbeat> = AsyncSeq.replicateInfiniteAsync (async { 
    do! Async.Sleep 5000
    return Heartbeat 
})
let configChanges : AsyncSeq<ConfigChange>  = failwith "TODO: long-poll config store"

let merged : AsyncSeq<Choice<Heartbeat, ConfigChange>> =
    AsyncSeq.mergeChoice heartbeats configChanges

Combine Latest

AsyncSeq.combineLatestWith merges two sequences non-deterministically and applies a combining function to the latest value from each side whenever either source advances. The output only starts once both sources have each produced at least one element.

----------------------------------------
| source1 | a0 |    |    | a1 |   | a2 |
| source2 |    | b0 | b1 |    |   |    |
| result  |    | c0 | c1 | c2 |   | c3 |
----------------------------------------

where  c0 = f a0 b0 | c1 = f a0 b1 | c2 = f a1 b1 | c3 = f a2 b1

Unlike zip, the two sources do not have to produce elements at the same rate. A fast source will simply keep updating the "latest" value used in future combinations.

Consider a service that watches a configuration key in Consul using HTTP long-polling. Each response carries the new value and a modify-index used in the next request. We can model the stream of changes as an AsyncSeq:

type Key         = string
type Value       = string
type ModifyIndex = int64

let longPollKey (key: Key, mi: ModifyIndex) : Async<Value * ModifyIndex> =
    failwith "TODO: call Consul HTTP API"

/// Emits a new value every time the Consul key changes.
let changes (key: Key, mi: ModifyIndex) : AsyncSeq<Value> =
    AsyncSeq.unfoldAsync
        (fun (mi: ModifyIndex) -> async {
            let! value, mi' = longPollKey (key, mi)
            return Some (value, mi') })
        mi

We combine the change stream with a periodic heartbeat so that downstream logic is re-triggered at least once per minute even when the key is stable:

let changesOrInterval : AsyncSeq<Value> =
    AsyncSeq.combineLatestWith
        (fun v _ -> v)
        (changes ("myKey", 0L))
        (intervalMs (1000 * 60))

Consumers of changesOrInterval see the latest config value whenever the key changes or at least once per minute.

namespace System
Multiple items
namespace FSharp

--------------------
namespace Microsoft.FSharp
Multiple items
namespace FSharp.Control

--------------------
namespace Microsoft.FSharp.Control
val first: AsyncSeq<int>
val asyncSeq: AsyncSeq.AsyncSeqBuilder
<summary> Builds an asynchronous sequence using the computation builder syntax </summary>
val second: AsyncSeq<int>
val appended: AsyncSeq<int>
Multiple items
module AsyncSeq from FSharp.Control

--------------------
type AsyncSeq<'T> = Collections.Generic.IAsyncEnumerable<'T>
<summary> An asynchronous sequence; equivalent to System.Collections.Generic.IAsyncEnumerable&lt;'T&gt;. Use the asyncSeq { ... } computation expression to create values, and the AsyncSeq module for combinators. </summary>
Multiple items
val int: value: 'T -> int (requires member op_Explicit)

--------------------
type int = int32

--------------------
type int<'Measure> = int
val append: seq1: AsyncSeq<'T> -> seq2: AsyncSeq<'T> -> AsyncSeq<'T>
<summary> Yields all elements of the first asynchronous sequence and then all elements of the second asynchronous sequence. </summary>
val combined: AsyncSeq<int>
val letters: AsyncSeq<char>
val numbers: AsyncSeq<int>
val pairs: AsyncSeq<char * int>
Multiple items
val char: value: 'T -> char (requires member op_Explicit)

--------------------
type char = Char
val zip: input1: AsyncSeq<'T1> -> input2: AsyncSeq<'T2> -> AsyncSeq<'T1 * 'T2>
<summary> Combines two asynchronous sequences into a sequence of pairs. The resulting sequence stops when either of the argument sequences stop. </summary>
val sums: AsyncSeq<int>
val zipWith: mapping: ('T1 -> 'T2 -> 'U) -> source1: AsyncSeq<'T1> -> source2: AsyncSeq<'T2> -> AsyncSeq<'U>
<summary> Combines two asynchronous sequences using the specified function. The resulting sequence stops when either of the argument sequences stop. </summary>
val a: int
val b: int
val combine: a: int -> b: int -> Async<string>
Multiple items
type Async = static member AsBeginEnd: computation: ('Arg -> Async<'T>) -> ('Arg * AsyncCallback * objnull -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit) static member AwaitEvent: event: IEvent<'Del,'T> * ?cancelAction: (unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate) static member AwaitIAsyncResult: iar: IAsyncResult * ?millisecondsTimeout: int -> Async<bool> static member AwaitTask: task: Task<'T> -> Async<'T> + 1 overload static member AwaitWaitHandle: waitHandle: WaitHandle * ?millisecondsTimeout: int -> Async<bool> static member CancelDefaultToken: unit -> unit static member Catch: computation: Async<'T> -> Async<Choice<'T,exn>> static member Choice: computations: Async<'T option> seq -> Async<'T option> static member FromBeginEnd: beginAction: (AsyncCallback * objnull -> IAsyncResult) * endAction: (IAsyncResult -> 'T) * ?cancelAction: (unit -> unit) -> Async<'T> + 3 overloads static member FromContinuations: callback: (('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T> ...

--------------------
type Async<'T>
Multiple items
val string: value: 'T -> string

--------------------
type string = String
val async: AsyncBuilder
val sprintf: format: Printf.StringFormat<'T> -> 'T
val asyncSums: AsyncSeq<string>
val zipWithAsync: mapping: ('T1 -> 'T2 -> Async<'U>) -> source1: AsyncSeq<'T1> -> source2: AsyncSeq<'T2> -> AsyncSeq<'U>
<summary> Combines two asynchronous sequences using the specified function. The resulting sequence stops when either of the argument sequences stop. </summary>
val parallelPairs: AsyncSeq<char * int>
val zipParallel: input1: AsyncSeq<'T1> -> input2: AsyncSeq<'T2> -> AsyncSeq<'T1 * 'T2>
<summary> Combines two asynchronous sequences into a sequence of pairs. The values from sequences are retrieved in parallel. The resulting sequence stops when either of the argument sequences stop. </summary>
val triples: AsyncSeq<int * char * bool>
val zip3: source1: AsyncSeq<'T1> -> source2: AsyncSeq<'T2> -> source3: AsyncSeq<'T3> -> AsyncSeq<'T1 * 'T2 * 'T3>
<summary> Combines three asynchronous sequences into a sequence of triples. The resulting sequence stops when any of the argument sequences stop. </summary>
Multiple items
module Event from Microsoft.FSharp.Control

--------------------
type Event = { entityId: int64 data: string }

--------------------
type Event<'T> = new: unit -> Event<'T> member Trigger: arg: 'T -> unit member Publish: IEvent<'T> with get

--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate and reference type)> = new: unit -> Event<'Delegate,'Args> member Trigger: sender: objnull * args: 'Args -> unit member Publish: IEvent<'Delegate,'Args> with get

--------------------
new: unit -> Event<'T>

--------------------
new: unit -> Event<'Delegate,'Args>
Multiple items
val int64: value: 'T -> int64 (requires member op_Explicit)

--------------------
type int64 = Int64

--------------------
type int64<'Measure> = int64
val events: AsyncSeq<Event>
val failwith: message: string -> 'T
val eventsAtLeastOneSec: AsyncSeq<Event>
 Emit at most one event per second.
val evt: Event
val replicateInfiniteAsync: v: Async<'T> -> AsyncSeq<'T>
<summary> Creates an infinite async sequence which repeatedly evaluates and emits the specified async value. </summary>
static member Async.Sleep: dueTime: TimeSpan -> Async<unit>
static member Async.Sleep: millisecondsDueTime: int -> Async<unit>
val odds: AsyncSeq<int>
val evens: AsyncSeq<int>
val interleaved: AsyncSeq<int>
val interleave: source1: AsyncSeq<'T> -> source2: AsyncSeq<'T> -> AsyncSeq<'T>
<summary> Interleaves two async sequences of the same type into a resulting sequence. The provided sequences are consumed in lock-step. </summary>
val a: AsyncSeq<int>
val b: AsyncSeq<int>
val c: AsyncSeq<int>
val roundRobin: AsyncSeq<int>
val interleaveMany: source: #(AsyncSeq<'T> seq) -> AsyncSeq<'T>
<summary> Interleaves a sequence of async sequences into a resulting async sequence. The provided sequences are consumed in lock-step. </summary>
val intSeq: AsyncSeq<int>
val strSeq: AsyncSeq<string>
val tagged: AsyncSeq<Choice<int,string>>
Multiple items
type Choice<'T1,'T2> = | Choice1Of2 of 'T1 | Choice2Of2 of 'T2

--------------------
type Choice<'T1,'T2,'T3> = | Choice1Of3 of 'T1 | Choice2Of3 of 'T2 | Choice3Of3 of 'T3

--------------------
type Choice<'T1,'T2,'T3,'T4> = | Choice1Of4 of 'T1 | Choice2Of4 of 'T2 | Choice3Of4 of 'T3 | Choice4Of4 of 'T4

--------------------
type Choice<'T1,'T2,'T3,'T4,'T5> = | Choice1Of5 of 'T1 | Choice2Of5 of 'T2 | Choice3Of5 of 'T3 | Choice4Of5 of 'T4 | Choice5Of5 of 'T5

--------------------
type Choice<'T1,'T2,'T3,'T4,'T5,'T6> = | Choice1Of6 of 'T1 | Choice2Of6 of 'T2 | Choice3Of6 of 'T3 | Choice4Of6 of 'T4 | Choice5Of6 of 'T5 | Choice6Of6 of 'T6

--------------------
type Choice<'T1,'T2,'T3,'T4,'T5,'T6,'T7> = | Choice1Of7 of 'T1 | Choice2Of7 of 'T2 | Choice3Of7 of 'T3 | Choice4Of7 of 'T4 | Choice5Of7 of 'T5 | Choice6Of7 of 'T6 | Choice7Of7 of 'T7
val interleaveChoice: source1: AsyncSeq<'T1> -> source2: AsyncSeq<'T2> -> AsyncSeq<Choice<'T1,'T2>>
<summary> Interleaves two async sequences into a resulting sequence. The provided sequences are consumed in lock-step. </summary>
val intervalMs: periodMs: int -> AsyncSeq<DateTime>
 Emits `DateTime.UtcNow` immediately, then every `periodMs` milliseconds.
val periodMs: int
Multiple items
[<Struct>] type DateTime = new: year: int * month: int * day: int -> unit + 16 overloads member Add: value: TimeSpan -> DateTime member AddDays: value: float -> DateTime member AddHours: value: float -> DateTime member AddMicroseconds: value: float -> DateTime member AddMilliseconds: value: float -> DateTime member AddMinutes: value: float -> DateTime member AddMonths: months: int -> DateTime member AddSeconds: value: float -> DateTime member AddTicks: value: int64 -> DateTime ...
<summary>Represents an instant in time, typically expressed as a date and time of day.</summary>

--------------------
DateTime ()
   (+0 other overloads)
DateTime(ticks: int64) : DateTime
   (+0 other overloads)
DateTime(ticks: int64, kind: DateTimeKind) : DateTime
   (+0 other overloads)
DateTime(date: DateOnly, time: TimeOnly) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int) : DateTime
   (+0 other overloads)
DateTime(date: DateOnly, time: TimeOnly, kind: DateTimeKind) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int, calendar: Globalization.Calendar) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, kind: DateTimeKind) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, calendar: Globalization.Calendar) : DateTime
   (+0 other overloads)
property DateTime.UtcNow: DateTime with get
<summary>Gets a <see cref="T:System.DateTime" /> object that is set to the current date and time on this computer, expressed as the Coordinated Universal Time (UTC).</summary>
<returns>An object whose value is the current UTC date and time.</returns>
val either: AsyncSeq<DateTime>
val merge: source1: AsyncSeq<'T> -> source2: AsyncSeq<'T> -> AsyncSeq<'T>
<summary> Merges two async sequences of the same type into an async sequence non-deterministically. The resulting async sequence produces elements when any argument sequence produces an element. </summary>
val sources: AsyncSeq<DateTime>
val mergeAll: sources: AsyncSeq<'T> seq -> AsyncSeq<'T>
<summary> Merges all specified async sequences into an async sequence non-deterministically. The resulting async sequence produces elements when any argument sequence produces an element. </summary>
Multiple items
union case Heartbeat.Heartbeat: Heartbeat

--------------------
type Heartbeat = | Heartbeat
Multiple items
union case ConfigChange.ConfigChange: string -> ConfigChange

--------------------
type ConfigChange = | ConfigChange of string
type ConfigChange = | ConfigChange of string
val heartbeats: AsyncSeq<Heartbeat>
val configChanges: AsyncSeq<ConfigChange>
val merged: AsyncSeq<Choice<Heartbeat,ConfigChange>>
val mergeChoice: source1: AsyncSeq<'T1> -> source2: AsyncSeq<'T2> -> AsyncSeq<Choice<'T1,'T2>>
<summary> Merges two async sequences into an async sequence non-deterministically. The resulting async sequence produces elements when any argument sequence produces an element. </summary>
type Key = string
type Value = string
type ModifyIndex = int64
val longPollKey: key: Key * mi: ModifyIndex -> Async<Value * ModifyIndex>
val key: Key
val mi: ModifyIndex
val changes: key: Key * mi: ModifyIndex -> AsyncSeq<Value>
 Emits a new value every time the Consul key changes.
val unfoldAsync: generator: ('State -> Async<('T * 'State) option>) -> state: 'State -> AsyncSeq<'T>
<summary> Generates an async sequence using the specified asynchronous generator function. </summary>
val value: Value
val mi': ModifyIndex
union case Option.Some: Value: 'T -> Option<'T>
val changesOrInterval: AsyncSeq<Value>
val combineLatestWith: combine: ('T -> 'U -> 'V) -> source1: AsyncSeq<'T> -> source2: AsyncSeq<'U> -> AsyncSeq<'V>
<summary> Merges two async sequences using the specified combine function. The resulting async sequence produces an element when either input sequence produces an element, passing the new element from the emitting sequence and the previously emitted element from the other sequence. If either of the input sequences is empty, the resulting sequence is empty. </summary>
val v: Value

Type something to start searching.