Header menu logo FSharp.Control.AsyncSeq

Binder

Generating Asynchronous Sequences

This document covers the two main ways to create AsyncSeq<'T> values: the asyncSeq computation expression and the AsyncSeq.unfoldAsync factory function.

open System
open FSharp.Control

Computation Expression Syntax

asyncSeq { ... } is a computation expression that lets you write asynchronous sequences using familiar F# constructs. The following sections show each supported form.

Conditionals

Use if/then/else to emit elements conditionally:

let evenNumbers = asyncSeq {
    for i in 1 .. 10 do
        if i % 2 = 0 then
            yield i
}

You can also yield different values in each branch:

let labelledNumbers = asyncSeq {
    for i in 1 .. 5 do
        if i % 2 = 0 then
            yield sprintf "%d is even" i
        else
            yield sprintf "%d is odd" i
}

Match Expressions

match works exactly as it does in ordinary F# code:

type Shape =
    | Circle of radius: float
    | Rectangle of width: float * height: float

let areas = asyncSeq {
    for shape in [ Circle 3.0; Rectangle (4.0, 5.0); Circle 1.5 ] do
        match shape with
        | Circle r       -> yield Math.PI * r * r
        | Rectangle(w,h) -> yield w * h
}

For Loops

for iterates over any seq<'T> or IEnumerable<'T> synchronously, or over an AsyncSeq<'T> asynchronously:

// Iterating a plain sequence
let squaresOfList = asyncSeq {
    for n in [ 1; 2; 3; 4; 5 ] do
        yield n * n
}

// Iterating another AsyncSeq
let doubled = asyncSeq {
    for n in squaresOfList do
        yield n * 2
}

While Loops

while lets you emit elements until a condition becomes false. Async operations can appear inside the loop body:

let countdown = asyncSeq {
    let mutable i = 5
    while i > 0 do
        yield i
        i <- i - 1
}

// Polling a resource until it is ready
let pollUntilReady (checkReady: unit -> Async<bool>) = asyncSeq {
    let mutable ready = false
    while not ready do
        do! Async.Sleep 500
        let! r = checkReady ()
        ready <- r
        if r then yield "ready"
}

Use Bindings

use disposes the resource when the sequence is no longer consumed, making it easy to work with IDisposable objects such as file streams or database connections:

open System.IO

let readLines (path: string) = asyncSeq {
    use reader = new StreamReader(path)
    let mutable line = reader.ReadLine()
    while line <> null do
        yield line
        line <- reader.ReadLine()
}

Try / With

try/with lets you catch exceptions that occur while producing elements and decide how to proceed:

let safeParseInts (inputs: string list) = asyncSeq {
    for s in inputs do
        try
            yield int s
        with
        | :? FormatException -> () // skip values that can't be parsed
}

try/finally is also supported and guarantees clean-up even if the consumer cancels early:

let withCleanup = asyncSeq {
    try
        for i in 1 .. 5 do
            yield i
    finally
        printfn "sequence finished or cancelled"
}

Generating with unfoldAsync

AsyncSeq.unfoldAsync builds a sequence from a seed state. The supplied function receives the current state and returns an Async of Some (element, nextState) to emit an element and continue, or None to end the sequence.

As a concrete example, suppose you are writing a program that reads batches of tweets from an API and stores those that pass a filter into a database. Each step is asynchronous: fetching a batch, deciding whether a tweet is relevant, and writing to the database.

Given a getTweetBatch function of type int -> Async<(Tweet[] * int) option> — where the int is a stream offset — the sequence of all batches is:

type Tweet = {
    user    : string
    message : string
}

let getTweetBatch (offset: int) : Async<(Tweet[] * int) option> =
    async { return failwith "TODO: call Twitter API" }

let tweetBatches : AsyncSeq<Tweet[]> =
    AsyncSeq.unfoldAsync getTweetBatch 0

When iterated, tweetBatches incrementally pages through the entire tweet stream.

Next, a filtering function that calls a web service has type Tweet -> Async<bool>. We flatten the batches and filter the individual tweets:

let filterTweet (t: Tweet) : Async<bool> =
    failwith "TODO: call web service"

let filteredTweets : AsyncSeq<Tweet> =
    tweetBatches
    |> AsyncSeq.concatSeq       // flatten batches to individual tweets
    |> AsyncSeq.filterAsync filterTweet

Finally, a storeTweet function of type Tweet -> Async<unit> writes each tweet to the database. We can compose the entire pipeline and run it:

let storeTweet (t: Tweet) : Async<unit> =
    failwith "TODO: call database"

AsyncSeq.unfoldAsync getTweetBatch 0
|> AsyncSeq.concatSeq
|> AsyncSeq.filterAsync filterTweet
|> AsyncSeq.iterAsync storeTweet
|> Async.RunSynchronously

This pipeline is a representation of the workflow — nothing executes until Async.RunSynchronously is called. When it runs, it pages through tweets, filters them asynchronously, and stores matches, all without blocking any threads.

namespace System
Multiple items
namespace FSharp

--------------------
namespace Microsoft.FSharp
Multiple items
namespace FSharp.Control

--------------------
namespace Microsoft.FSharp.Control
val evenNumbers: AsyncSeq<int>
val asyncSeq: AsyncSeq.AsyncSeqBuilder
<summary> Builds an asynchronous sequence using the computation builder syntax </summary>
val i: int
val labelledNumbers: AsyncSeq<string>
val sprintf: format: Printf.StringFormat<'T> -> 'T
type Shape = | Circle of radius: float | Rectangle of width: float * height: float
Multiple items
val float: value: 'T -> float (requires member op_Explicit)

--------------------
type float = Double

--------------------
type float<'Measure> = float
val areas: AsyncSeq<float>
val shape: Shape
union case Shape.Circle: radius: float -> Shape
union case Shape.Rectangle: width: float * height: float -> Shape
val r: float
type Math = static member Abs: value: decimal -> decimal + 7 overloads static member Acos: d: float -> float static member Acosh: d: float -> float static member Asin: d: float -> float static member Asinh: d: float -> float static member Atan: d: float -> float static member Atan2: y: float * x: float -> float static member Atanh: d: float -> float static member BigMul: a: int * b: int -> int64 + 2 overloads static member BitDecrement: x: float -> float ...
<summary>Provides constants and static methods for trigonometric, logarithmic, and other common mathematical functions.</summary>
field Math.PI: float = 3.14159265359
val w: float
val h: float
val squaresOfList: AsyncSeq<int>
val n: int
val doubled: AsyncSeq<int>
val countdown: AsyncSeq<int>
val mutable i: int
val pollUntilReady: checkReady: (unit -> Async<bool>) -> AsyncSeq<string>
val checkReady: (unit -> Async<bool>)
type unit = Unit
Multiple items
type Async = static member AsBeginEnd: computation: ('Arg -> Async<'T>) -> ('Arg * AsyncCallback * objnull -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit) static member AwaitEvent: event: IEvent<'Del,'T> * ?cancelAction: (unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate) static member AwaitIAsyncResult: iar: IAsyncResult * ?millisecondsTimeout: int -> Async<bool> static member AwaitTask: task: Task<'T> -> Async<'T> + 1 overload static member AwaitWaitHandle: waitHandle: WaitHandle * ?millisecondsTimeout: int -> Async<bool> static member CancelDefaultToken: unit -> unit static member Catch: computation: Async<'T> -> Async<Choice<'T,exn>> static member Choice: computations: Async<'T option> seq -> Async<'T option> static member FromBeginEnd: beginAction: (AsyncCallback * objnull -> IAsyncResult) * endAction: (IAsyncResult -> 'T) * ?cancelAction: (unit -> unit) -> Async<'T> + 3 overloads static member FromContinuations: callback: (('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T> ...

--------------------
type Async<'T>
type bool = Boolean
val mutable ready: bool
static member Async.Sleep: dueTime: TimeSpan -> Async<unit>
static member Async.Sleep: millisecondsDueTime: int -> Async<unit>
val r: bool
namespace System.IO
val readLines: path: string -> AsyncSeq<string>
val path: string
Multiple items
val string: value: 'T -> string

--------------------
type string = String
val reader: StreamReader
Multiple items
type StreamReader = inherit TextReader new: stream: Stream -> unit + 12 overloads member Close: unit -> unit member DiscardBufferedData: unit -> unit member Peek: unit -> int member Read: unit -> int + 2 overloads member ReadAsync: buffer: char array * index: int * count: int -> Task<int> + 1 overload member ReadBlock: buffer: char array * index: int * count: int -> int + 1 overload member ReadBlockAsync: buffer: char array * index: int * count: int -> Task<int> + 1 overload member ReadLine: unit -> string ...
<summary>Implements a <see cref="T:System.IO.TextReader" /> that reads characters from a byte stream in a particular encoding.</summary>

--------------------
StreamReader(stream: Stream) : StreamReader
   (+0 other overloads)
StreamReader(path: string) : StreamReader
   (+0 other overloads)
StreamReader(stream: Stream, detectEncodingFromByteOrderMarks: bool) : StreamReader
   (+0 other overloads)
StreamReader(stream: Stream, encoding: Text.Encoding) : StreamReader
   (+0 other overloads)
StreamReader(path: string, detectEncodingFromByteOrderMarks: bool) : StreamReader
   (+0 other overloads)
StreamReader(path: string, options: FileStreamOptions) : StreamReader
   (+0 other overloads)
StreamReader(path: string, encoding: Text.Encoding) : StreamReader
   (+0 other overloads)
StreamReader(stream: Stream, encoding: Text.Encoding, detectEncodingFromByteOrderMarks: bool) : StreamReader
   (+0 other overloads)
StreamReader(path: string, encoding: Text.Encoding, detectEncodingFromByteOrderMarks: bool) : StreamReader
   (+0 other overloads)
StreamReader(stream: Stream, encoding: Text.Encoding, detectEncodingFromByteOrderMarks: bool, bufferSize: int) : StreamReader
   (+0 other overloads)
val mutable line: string
StreamReader.ReadLine() : string
val safeParseInts: inputs: string list -> AsyncSeq<int>
val inputs: string list
type 'T list = List<'T>
val s: string
Multiple items
val int: value: 'T -> int (requires member op_Explicit)

--------------------
type int = int32

--------------------
type int<'Measure> = int
Multiple items
type FormatException = inherit SystemException new: unit -> unit + 2 overloads
<summary>The exception that is thrown when the format of an argument is invalid, or when a composite format string is not well formed.</summary>

--------------------
FormatException() : FormatException
FormatException(message: string) : FormatException
FormatException(message: string, innerException: exn) : FormatException
val withCleanup: AsyncSeq<int>
val printfn: format: Printf.TextWriterFormat<'T> -> 'T
type Tweet = { user: string message: string }
val getTweetBatch: offset: int -> Async<(Tweet array * int) option>
val offset: int
type 'T option = Option<'T>
val async: AsyncBuilder
val failwith: message: string -> 'T
val tweetBatches: AsyncSeq<Tweet array>
Multiple items
module AsyncSeq from FSharp.Control

--------------------
type AsyncSeq<'T> = Collections.Generic.IAsyncEnumerable<'T>
<summary> An asynchronous sequence; equivalent to System.Collections.Generic.IAsyncEnumerable&lt;'T&gt;. Use the asyncSeq { ... } computation expression to create values, and the AsyncSeq module for combinators. </summary>
val unfoldAsync: generator: ('State -> Async<('T * 'State) option>) -> state: 'State -> AsyncSeq<'T>
<summary> Generates an async sequence using the specified asynchronous generator function. </summary>
val filterTweet: t: Tweet -> Async<bool>
val t: Tweet
val filteredTweets: AsyncSeq<Tweet>
val concatSeq: source: AsyncSeq<#('T seq)> -> AsyncSeq<'T>
<summary> Flattens an AsyncSeq of synchronous sequences. </summary>
val filterAsync: predicate: ('T -> Async<bool>) -> source: AsyncSeq<'T> -> AsyncSeq<'T>
<summary> Builds a new asynchronous sequence whose elements are those from the input sequence for which the specified function returned true. The specified function is asynchronous (and the input sequence will be asked for the next element after the processing of an element completes). </summary>
val storeTweet: t: Tweet -> Async<unit>
val iterAsync: action: ('T -> Async<unit>) -> source: AsyncSeq<'T> -> Async<unit>
<summary> Iterates over the input sequence and calls the specified asynchronous function for every value. The input sequence will be asked for the next element after the processing of an element completes. </summary>
static member Async.RunSynchronously: computation: Async<'T> * ?timeout: int * ?cancellationToken: Threading.CancellationToken -> 'T

Type something to start searching.