F# Async: FSharp.Control.AsyncSeq

NOTE: There is also the option to use FSharp.Control.TaskSeq which has a very similar usage model.

An AsyncSeq is a sequence in which individual elements are retrieved using an Async computation. It is similar to seq<'a> in that subsequent elements are pulled on-demand. AsyncSeq also bears similarity to IObservable<'a> with the former being based on an "asynchronous pull" and the latter based on a "synchronous push". Analogs for most operations defined for Seq, List and IObservable are also defined for AsyncSeq. The power of AsyncSeq lies in that many of these operations also have analogs based on Async allowing composition of complex asynchronous workflows.

The AsyncSeq type is located in the FSharp.Control.AsyncSeq.dll assembly which can be loaded in F# Interactive as follows:

#r "../../../bin/FSharp.Control.AsyncSeq.dll"
open FSharp.Control

Generating asynchronous sequences

An AsyncSeq<'T> can be generated using computation expression syntax much like seq<'T>:

let async12 = asyncSeq {
  yield 1
  yield 2
}
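Defining an asynchronous sequence does not run anything; elements are produced only when the sequence is consumed. The following is a minimal sketch of consuming async12 in an F# script. It assumes the library is referenced from NuGet with `#r "nuget: ..."` (available in recent versions of `dotnet fsi`) rather than from the local path shown above, and it uses AsyncSeq.toListAsync to collect the elements.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq" // assumption: NuGet reference instead of a local DLL path
open FSharp.Control

let async12 = asyncSeq {
  yield 1
  yield 2
}

// Collect every element into a list; the sequence only runs here
let items : int list =
  async12
  |> AsyncSeq.toListAsync
  |> Async.RunSynchronously

printfn "%A" items // prints [1; 2]
```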

Another way to generate an asynchronous sequence is the AsyncSeq.unfoldAsync function. It takes a generator function which, given the current state, asynchronously produces either the next element together with the next state, or a signal that the sequence is complete.

For example, suppose that you're writing a program which consumes the Twitter API and stores tweets which satisfy some criteria into a database. There are several asynchronous request-reply interactions at play - one to retrieve a batch of tweets from the Twitter API, another to determine whether a tweet satisfies some criteria and finally an operation to write the desired tweet to a database.

Given the type Tweet to represent an individual tweet, the operation to retrieve a batch of tweets can be modeled with type int -> Async<(Tweet[] * int) option> where the incoming int represents the offset into the tweet stream. The asynchronous result is an Option which when None indicates the end of the stream, and otherwise contains the batch of retrieved tweets as well as the next offset.

The above function to retrieve a batch of tweets can be used to generate an asynchronous sequence of tweet batches as follows:

type Tweet = {
  user : string
  message : string
}

let getTweetBatch (offset: int) : Async<(Tweet[] * int) option> = 
  failwith "TODO: call Twitter API"

let tweetBatches : AsyncSeq<Tweet[]> = 
  AsyncSeq.unfoldAsync getTweetBatch 0

When iterated, the asynchronous sequence tweetBatches will incrementally consume the entire tweet stream.
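To see how the offset is threaded through AsyncSeq.unfoldAsync, the sketch below replaces the Twitter call with a hypothetical in-memory source. The names allTweets and pageSize, the sample data, and the NuGet reference are all assumptions made for illustration; a real implementation would perform an HTTP request instead.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq" // assumption: NuGet reference instead of a local DLL path
open FSharp.Control

type Tweet = { user : string; message : string }

// Hypothetical in-memory "API": five tweets served in pages of two
let allTweets : Tweet[] =
  [| for i in 1 .. 5 -> { user = sprintf "user%d" i; message = sprintf "message %d" i } |]

let pageSize = 2

let getTweetBatch (offset: int) : Async<(Tweet[] * int) option> = async {
  if offset >= allTweets.Length then
    return None // end of the stream
  else
    let last = min (offset + pageSize - 1) (allTweets.Length - 1)
    let batch = allTweets.[offset .. last]
    return Some (batch, offset + batch.Length) // batch plus the next offset
}

// Sizes of the batches produced by unfolding from offset 0
let batchSizes : int list =
  AsyncSeq.unfoldAsync getTweetBatch 0
  |> AsyncSeq.map Array.length
  |> AsyncSeq.toListAsync
  |> Async.RunSynchronously

printfn "%A" batchSizes // prints [2; 2; 1]
```

The generator is called with offsets 0, 2 and 4, and once more with 5, at which point it returns None and the sequence ends.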

Next, suppose that the tweet filtering function makes a call to a web service which determines whether a particular tweet is of interest and should be stored in the database. This function can be modeled with type Tweet -> Async<bool>. We can flatten the tweetBatches sequence and then filter it as follows:

let filterTweet (t: Tweet) : Async<bool> =
  failwith "TODO: call web service"

let filteredTweets : AsyncSeq<Tweet> = 
  tweetBatches
  |> AsyncSeq.concatSeq // flatten
  |> AsyncSeq.filterAsync filterTweet // filter

When the resulting sequence filteredTweets is consumed, it will lazily consume the underlying sequence tweetBatches, select individual tweets and filter them using the function filterTweet.

Finally, the function which stores a tweet in the database can be modeled by type Tweet -> Async<unit>. We can store all filtered tweets as follows:

let storeTweet (t: Tweet) : Async<unit> =
  failwith "TODO: call database"

let storeFilteredTweets : Async<unit> =
  filteredTweets
  |> AsyncSeq.iterAsync storeTweet

Note that the value storeFilteredTweets is an asynchronous computation of type Async<unit>. At this point, it is a representation of the workflow which consists of reading batches of tweets, filtering them and storing them in the database. When executed, the workflow will consume the entire tweet stream. The entire workflow can be succinctly declared and executed as follows:

AsyncSeq.unfoldAsync getTweetBatch 0
|> AsyncSeq.concatSeq
|> AsyncSeq.filterAsync filterTweet
|> AsyncSeq.iterAsync storeTweet
|> Async.RunSynchronously

The above snippet effectively orchestrates several asynchronous request-reply interactions into a cohesive unit composed using familiar operations on sequences. Furthermore, it will be executed efficiently in a non-blocking manner.
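The same pipeline can be exercised end to end without any external services. In this sketch, the three remote calls are replaced by hypothetical stand-ins: batches is hard-coded sample data, filterTweet keeps tweets that mention "F#", and storeTweet appends to an in-memory list named stored. None of these stand-ins come from the library; only the AsyncSeq functions do.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq" // assumption: NuGet reference instead of a local DLL path
open FSharp.Control

type Tweet = { user : string; message : string }

// Hypothetical sample data: two batches, addressed by batch index
let batches : Tweet[][] =
  [|
    [| { user = "ann"; message = "learning F#" }; { user = "bob"; message = "lunch" } |]
    [| { user = "cat"; message = "F# async is neat" } |]
  |]

let getTweetBatch (offset: int) : Async<(Tweet[] * int) option> = async {
  return
    if offset < batches.Length then Some (batches.[offset], offset + 1)
    else None
}

// Stand-in for the web service: keep tweets that mention F#
let filterTweet (t: Tweet) : Async<bool> = async {
  return t.message.Contains("F#")
}

// Stand-in for the database
let stored = ResizeArray<Tweet>()

let storeTweet (t: Tweet) : Async<unit> = async {
  stored.Add(t)
}

AsyncSeq.unfoldAsync getTweetBatch 0
|> AsyncSeq.concatSeq
|> AsyncSeq.filterAsync filterTweet
|> AsyncSeq.iterAsync storeTweet
|> Async.RunSynchronously

let storedUsers = stored |> Seq.map (fun t -> t.user) |> List.ofSeq
printfn "%A" storedUsers // prints ["ann"; "cat"]
```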

Comparison with seq<'T>

The central difference between seq<'T> and AsyncSeq<'T> can be illustrated by introducing the notion of time. Suppose that generating subsequent elements of a sequence requires an IO-bound operation. Invoking long running IO-bound operations from within a seq<'T> will block the thread which calls MoveNext on the corresponding IEnumerator. An AsyncSeq on the other hand can use facilities provided by the F# Async type to make more efficient use of system resources.

open System.Threading

let withTime = seq {
  Thread.Sleep(1000) // calling thread will block
  yield 1
  Thread.Sleep(1000) // calling thread will block
  yield 2
}

let withTime' = asyncSeq {
  do! Async.Sleep 1000 // non-blocking sleep
  yield 1
  do! Async.Sleep 1000 // non-blocking sleep
  yield 2
}

When the asynchronous sequence withTime' is iterated, the calls to Async.Sleep won't block threads. Instead, the continuation of the sequence will be scheduled by Async while the calling thread will be free to perform other work. Overall, a seq<'a> can be viewed as a special case of an AsyncSeq<'a> where subsequent elements are retrieved in a blocking manner.
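One consequence is that several slow asynchronous sequences can be consumed concurrently without dedicating a blocked thread to each. The sketch below is an illustration under assumptions: slowItems is a hypothetical helper, the 100 ms delay is arbitrary, and the two sequences are combined with Async.Parallel from the F# core library.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq" // assumption: NuGet reference instead of a local DLL path
open FSharp.Control

// Hypothetical helper: three labelled items, each preceded by a non-blocking delay
let slowItems (label: string) = asyncSeq {
  for i in 1 .. 3 do
    do! Async.Sleep 100 // non-blocking sleep
    yield sprintf "%s%d" label i
}

// Consume both sequences concurrently; results keep the order of the input list
let results : string list [] =
  [ slowItems "a"; slowItems "b" ]
  |> List.map AsyncSeq.toListAsync
  |> Async.Parallel
  |> Async.RunSynchronously

printfn "%A" results
```

Each sequence still yields its own elements in order; only the waiting overlaps.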

Performance Considerations

While an asynchronous computation obviates the need to block an OS thread for the duration of an operation, this does not always improve the overall performance of an application. Note also that an async computation does not require a non-blocking operation; it simply allows for one. In addition, unlike calling IEnumerator.MoveNext(), consuming an item from an asynchronous sequence requires several allocations. This cost is usually greatly outweighed by the benefits, but it can make a difference in some scenarios.
