Combining Asynchronous Sequences
This document covers the operations that combine two or more AsyncSeq<'T> values into one.
The operations differ in when they wait for input and how they interleave elements:
Operation |
Timing |
Result |
|---|---|---|
|
sequential |
all of seq1 then all of seq2 |
|
lock-step |
waits for one element from each source |
|
lock-step |
alternates strictly between sources |
|
non-deterministic |
emits whenever any source produces |
|
non-deterministic |
emits the latest pair whenever any source advances |
open System
open FSharp.Control
Append
AsyncSeq.append produces all elements of the first sequence followed by all elements of the
second. The second sequence does not start until the first is exhausted — it is strictly
sequential:
let first = asyncSeq { yield! [ 1; 2; 3 ] }
let second = asyncSeq { yield! [ 4; 5; 6 ] }
let appended : AsyncSeq<int> = AsyncSeq.append first second
// emits: 1, 2, 3, 4, 5, 6
yield! inside asyncSeq { ... } is equivalent to append and is the idiomatic choice when
concatenating inline:
let combined = asyncSeq {
yield! first
yield! second
}
Zip
AsyncSeq.zip combines two sequences element-wise into a sequence of pairs, consuming one
element from each source before producing output. The result terminates as soon as either source
is exhausted.
---------------------------------------------
| source1 | a1 | a2 | |
| source2 | b1 | b2 | b3 |
| result | a1 * b1 | a2 * b2 | |
---------------------------------------------
let letters = asyncSeq { yield! [ 'a'; 'b'; 'c' ] }
let numbers = asyncSeq { yield! [ 1; 2; 3; 4 ] }
let pairs : AsyncSeq<char * int> = AsyncSeq.zip letters numbers
// emits: ('a',1), ('b',2), ('c',3) — stops when letters runs out
zipWith and zipWithAsync
AsyncSeq.zipWith applies a combining function instead of producing tuples:
let sums : AsyncSeq<int> =
AsyncSeq.zipWith (fun a b -> a + b) numbers numbers
// emits: 2, 4, 6, 8
AsyncSeq.zipWithAsync is the same but the combining function returns Async<'U>, useful when
the combination itself requires IO:
let combine (a: int) (b: int) : Async<string> =
async { return sprintf "%d+%d=%d" a b (a+b) }
let asyncSums : AsyncSeq<string> =
AsyncSeq.zipWithAsync combine numbers numbers
zipParallel
AsyncSeq.zipParallel is identical to zip but fetches the next element from both sources
simultaneously rather than sequentially. Use this when the two sources are independent and
fetching each element involves latency:
let parallelPairs : AsyncSeq<char * int> = AsyncSeq.zipParallel letters numbers
zip3 and zipWith3
For three sources at once:
let triples =
AsyncSeq.zip3
(asyncSeq { yield! [ 1; 2; 3 ] })
(asyncSeq { yield! [ 'a'; 'b'; 'c' ] })
(asyncSeq { yield! [ true; false; true ] })
// emits: (1,'a',true), (2,'b',false), (3,'c',true)
Rate-limiting with zipWith
A practical pattern for zipWith is rate-limiting: pair each element of a fast source with a
minimum delay so that the consumer never runs faster than one element per interval:
type Event = { entityId: int64; data: string }
let events : AsyncSeq<Event> = failwith "TODO: connect to event source"
/// Emit at most one event per second.
let eventsAtLeastOneSec : AsyncSeq<Event> =
AsyncSeq.zipWith
(fun evt _ -> evt)
events
(AsyncSeq.replicateInfiniteAsync (Async.Sleep 1000))
The delay sequence is infinite so it never causes zipWith to terminate early; the resulting
sequence ends only when events runs out.
Interleave
AsyncSeq.interleave alternates strictly between two sources in lock-step: one element from
source1, one from source2, one from source1, and so on. It terminates when either source is
exhausted. Unlike merge, the ordering is deterministic.
----------------------------------------------
| source1 | a1 | | a2 | | a3 | |
| source2 | | b1 | | b2 | | |
| result | a1 | b1 | a2 | b2 | a3 | |
----------------------------------------------
let odds = asyncSeq { yield! [ 1; 3; 5 ] }
let evens = asyncSeq { yield! [ 2; 4; 6 ] }
let interleaved : AsyncSeq<int> = AsyncSeq.interleave odds evens
// emits: 1, 2, 3, 4, 5, 6
interleaveMany
AsyncSeq.interleaveMany generalises this to an arbitrary number of sequences, cycling through
them in order:
let a = asyncSeq { yield! [ 1; 4; 7 ] }
let b = asyncSeq { yield! [ 2; 5; 8 ] }
let c = asyncSeq { yield! [ 3; 6; 9 ] }
let roundRobin : AsyncSeq<int> = AsyncSeq.interleaveMany [ a; b; c ]
// emits: 1, 2, 3, 4, 5, 6, 7, 8, 9
interleaveChoice
AsyncSeq.interleaveChoice interleaves two sequences of different types, wrapping each
element in Choice<'T1,'T2> so you can tell which source it came from:
let intSeq = asyncSeq { yield! [ 1; 2; 3 ] }
let strSeq = asyncSeq { yield! [ "a"; "b"; "c" ] }
let tagged : AsyncSeq<Choice<int,string>> =
AsyncSeq.interleaveChoice intSeq strSeq
// Choice1Of2 1, Choice2Of2 "a", Choice1Of2 2, Choice2Of2 "b", ...
Merge
AsyncSeq.merge interleaves two sequences non-deterministically, emitting values as soon as
either source produces one. There is no lock-step coordination — whichever source is ready first
wins. The result continues until both sources are exhausted.
-----------------------------------------
| source1 | t0 | | t1 | | | t2 |
| source2 | | u0 | | | u1 | |
| result | t0 | u0 | t1 | | u1 | t2 |
-----------------------------------------
This is different from append (sequential) and interleave (strict lock-step).
A handy building block is an infinite ticker:
/// Emits `DateTime.UtcNow` immediately, then every `periodMs` milliseconds.
let intervalMs (periodMs: int) = asyncSeq {
yield DateTime.UtcNow
while true do
do! Async.Sleep periodMs
yield DateTime.UtcNow
}
// React to whichever of the two timers fires first.
let either : AsyncSeq<DateTime> =
AsyncSeq.merge (intervalMs 20) (intervalMs 30)
mergeAll
AsyncSeq.mergeAll merges an arbitrary number of sequences non-deterministically:
let sources : AsyncSeq<DateTime> =
AsyncSeq.mergeAll [ intervalMs 100; intervalMs 250; intervalMs 500 ]
mergeChoice
AsyncSeq.mergeChoice merges two sequences of different types non-deterministically, tagging
each element with Choice<'T1,'T2> so you can distinguish the source:
type Heartbeat = Heartbeat
type ConfigChange = ConfigChange of string
let heartbeats : AsyncSeq<Heartbeat> = AsyncSeq.replicateInfiniteAsync (async {
do! Async.Sleep 5000
return Heartbeat
})
let configChanges : AsyncSeq<ConfigChange> = failwith "TODO: long-poll config store"
let merged : AsyncSeq<Choice<Heartbeat, ConfigChange>> =
AsyncSeq.mergeChoice heartbeats configChanges
Combine Latest
AsyncSeq.combineLatestWith merges two sequences non-deterministically and applies a combining
function to the latest value from each side whenever either source advances. The output only
starts once both sources have each produced at least one element.
----------------------------------------
| source1 | a0 | | | a1 | | a2 |
| source2 | | b0 | b1 | | | |
| result | | c0 | c1 | c2 | | c3 |
----------------------------------------
where c0 = f a0 b0 | c1 = f a0 b1 | c2 = f a1 b1 | c3 = f a2 b1
Unlike zip, the two sources do not have to produce elements at the same rate. A fast source
will simply keep updating the "latest" value used in future combinations.
Consider a service that watches a configuration key in Consul using
HTTP long-polling. Each response carries the new value and a modify-index used in the next
request. We can model the stream of changes as an AsyncSeq:
type Key = string
type Value = string
type ModifyIndex = int64
let longPollKey (key: Key, mi: ModifyIndex) : Async<Value * ModifyIndex> =
failwith "TODO: call Consul HTTP API"
/// Emits a new value every time the Consul key changes.
let changes (key: Key, mi: ModifyIndex) : AsyncSeq<Value> =
AsyncSeq.unfoldAsync
(fun (mi: ModifyIndex) -> async {
let! value, mi' = longPollKey (key, mi)
return Some (value, mi') })
mi
We combine the change stream with a periodic heartbeat so that downstream logic is re-triggered at least once per minute even when the key is stable:
let changesOrInterval : AsyncSeq<Value> =
AsyncSeq.combineLatestWith
(fun v _ -> v)
(changes ("myKey", 0L))
(intervalMs (1000 * 60))
Consumers of changesOrInterval see the latest config value whenever the key changes or at
least once per minute.
namespace FSharp
--------------------
namespace Microsoft.FSharp
namespace FSharp.Control
--------------------
namespace Microsoft.FSharp.Control
<summary> Builds an asynchronous sequence using the computation builder syntax </summary>
module AsyncSeq from FSharp.Control
--------------------
type AsyncSeq<'T> = Collections.Generic.IAsyncEnumerable<'T>
<summary> An asynchronous sequence; equivalent to System.Collections.Generic.IAsyncEnumerable<'T>. Use the asyncSeq { ... } computation expression to create values, and the AsyncSeq module for combinators. </summary>
val int: value: 'T -> int (requires member op_Explicit)
--------------------
type int = int32
--------------------
type int<'Measure> = int
<summary> Yields all elements of the first asynchronous sequence and then all elements of the second asynchronous sequence. </summary>
val char: value: 'T -> char (requires member op_Explicit)
--------------------
type char = Char
<summary> Combines two asynchronous sequences into a sequence of pairs. The resulting sequence stops when either of the argument sequences stop. </summary>
<summary> Combines two asynchronous sequences using the specified function. The resulting sequence stops when either of the argument sequences stop. </summary>
type Async = static member AsBeginEnd: computation: ('Arg -> Async<'T>) -> ('Arg * AsyncCallback * objnull -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit) static member AwaitEvent: event: IEvent<'Del,'T> * ?cancelAction: (unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate) static member AwaitIAsyncResult: iar: IAsyncResult * ?millisecondsTimeout: int -> Async<bool> static member AwaitTask: task: Task<'T> -> Async<'T> + 1 overload static member AwaitWaitHandle: waitHandle: WaitHandle * ?millisecondsTimeout: int -> Async<bool> static member CancelDefaultToken: unit -> unit static member Catch: computation: Async<'T> -> Async<Choice<'T,exn>> static member Choice: computations: Async<'T option> seq -> Async<'T option> static member FromBeginEnd: beginAction: (AsyncCallback * objnull -> IAsyncResult) * endAction: (IAsyncResult -> 'T) * ?cancelAction: (unit -> unit) -> Async<'T> + 3 overloads static member FromContinuations: callback: (('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T> ...
--------------------
type Async<'T>
val string: value: 'T -> string
--------------------
type string = String
<summary> Combines two asynchronous sequences using the specified function. The resulting sequence stops when either of the argument sequences stop. </summary>
<summary> Combines two asynchronous sequences into a sequence of pairs. The values from sequences are retrieved in parallel. The resulting sequence stops when either of the argument sequences stop. </summary>
<summary> Combines three asynchronous sequences into a sequence of triples. The resulting sequence stops when any of the argument sequences stop. </summary>
module Event from Microsoft.FSharp.Control
--------------------
type Event = { entityId: int64 data: string }
--------------------
type Event<'T> = new: unit -> Event<'T> member Trigger: arg: 'T -> unit member Publish: IEvent<'T> with get
--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate and reference type)> = new: unit -> Event<'Delegate,'Args> member Trigger: sender: objnull * args: 'Args -> unit member Publish: IEvent<'Delegate,'Args> with get
--------------------
new: unit -> Event<'T>
--------------------
new: unit -> Event<'Delegate,'Args>
val int64: value: 'T -> int64 (requires member op_Explicit)
--------------------
type int64 = Int64
--------------------
type int64<'Measure> = int64
Emit at most one event per second.
<summary> Creates an infinite async sequence which repeatedly evaluates and emits the specified async value. </summary>
static member Async.Sleep: millisecondsDueTime: int -> Async<unit>
<summary> Interleaves two async sequences of the same type into a resulting sequence. The provided sequences are consumed in lock-step. </summary>
<summary> Interleaves a sequence of async sequences into a resulting async sequence. The provided sequences are consumed in lock-step. </summary>
type Choice<'T1,'T2> = | Choice1Of2 of 'T1 | Choice2Of2 of 'T2
--------------------
type Choice<'T1,'T2,'T3> = | Choice1Of3 of 'T1 | Choice2Of3 of 'T2 | Choice3Of3 of 'T3
--------------------
type Choice<'T1,'T2,'T3,'T4> = | Choice1Of4 of 'T1 | Choice2Of4 of 'T2 | Choice3Of4 of 'T3 | Choice4Of4 of 'T4
--------------------
type Choice<'T1,'T2,'T3,'T4,'T5> = | Choice1Of5 of 'T1 | Choice2Of5 of 'T2 | Choice3Of5 of 'T3 | Choice4Of5 of 'T4 | Choice5Of5 of 'T5
--------------------
type Choice<'T1,'T2,'T3,'T4,'T5,'T6> = | Choice1Of6 of 'T1 | Choice2Of6 of 'T2 | Choice3Of6 of 'T3 | Choice4Of6 of 'T4 | Choice5Of6 of 'T5 | Choice6Of6 of 'T6
--------------------
type Choice<'T1,'T2,'T3,'T4,'T5,'T6,'T7> = | Choice1Of7 of 'T1 | Choice2Of7 of 'T2 | Choice3Of7 of 'T3 | Choice4Of7 of 'T4 | Choice5Of7 of 'T5 | Choice6Of7 of 'T6 | Choice7Of7 of 'T7
<summary> Interleaves two async sequences into a resulting sequence. The provided sequences are consumed in lock-step. </summary>
Emits `DateTime.UtcNow` immediately, then every `periodMs` milliseconds.
[<Struct>] type DateTime = new: year: int * month: int * day: int -> unit + 16 overloads member Add: value: TimeSpan -> DateTime member AddDays: value: float -> DateTime member AddHours: value: float -> DateTime member AddMicroseconds: value: float -> DateTime member AddMilliseconds: value: float -> DateTime member AddMinutes: value: float -> DateTime member AddMonths: months: int -> DateTime member AddSeconds: value: float -> DateTime member AddTicks: value: int64 -> DateTime ...
<summary>Represents an instant in time, typically expressed as a date and time of day.</summary>
--------------------
DateTime ()
(+0 other overloads)
DateTime(ticks: int64) : DateTime
(+0 other overloads)
DateTime(ticks: int64, kind: DateTimeKind) : DateTime
(+0 other overloads)
DateTime(date: DateOnly, time: TimeOnly) : DateTime
(+0 other overloads)
DateTime(year: int, month: int, day: int) : DateTime
(+0 other overloads)
DateTime(date: DateOnly, time: TimeOnly, kind: DateTimeKind) : DateTime
(+0 other overloads)
DateTime(year: int, month: int, day: int, calendar: Globalization.Calendar) : DateTime
(+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int) : DateTime
(+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, kind: DateTimeKind) : DateTime
(+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, calendar: Globalization.Calendar) : DateTime
(+0 other overloads)
<summary>Gets a <see cref="T:System.DateTime" /> object that is set to the current date and time on this computer, expressed as the Coordinated Universal Time (UTC).</summary>
<returns>An object whose value is the current UTC date and time.</returns>
<summary> Merges two async sequences of the same type into an async sequence non-deterministically. The resulting async sequence produces elements when any argument sequence produces an element. </summary>
<summary> Merges all specified async sequences into an async sequence non-deterministically. The resulting async sequence produces elements when any argument sequence produces an element. </summary>
union case Heartbeat.Heartbeat: Heartbeat
--------------------
type Heartbeat = | Heartbeat
union case ConfigChange.ConfigChange: string -> ConfigChange
--------------------
type ConfigChange = | ConfigChange of string
<summary> Merges two async sequences into an async sequence non-deterministically. The resulting async sequence produces elements when any argument sequence produces an element. </summary>
Emits a new value every time the Consul key changes.
<summary> Generates an async sequence using the specified asynchronous generator function. </summary>
<summary> Merges two async sequences using the specified combine function. The resulting async sequence produces an element when either input sequence produces an element, passing the new element from the emitting sequence and the previously emitted element from the other sequence. If either of the input sequences is empty, the resulting sequence is empty. </summary>
FSharp.Control.AsyncSeq