NOTE: There is also the option to use FSharp.Control.TaskSeq which has a very similar usage model.
An AsyncSeq is a sequence in which individual elements are retrieved using an Async
computation.
It is similar to seq<'a>
in that subsequent elements are pulled on-demand.
AsyncSeq
also bears similarity to IObservable<'a>
with the former being based on an "asynchronous pull" and the
latter based on a "synchronous push". Analogs for most operations defined for Seq
, List
and IObservable
are also defined for
AsyncSeq
. The power of AsyncSeq
lies in that many of these operations also have analogs based on Async
allowing composition of complex asynchronous workflows.
The AsyncSeq
type is located in the FSharp.Control.AsyncSeq.dll
assembly which can be loaded in F# Interactive as follows:
#r "../../../bin/FSharp.Control.AsyncSeq.dll"
open FSharp.Control
An AsyncSeq<'T>
can be generated using computation expression syntax much like seq<'T>
:
let async12 = asyncSeq {
yield 1
yield 2
}
Another way to generate an asynchronous sequence is using the Async.unfoldAsync
function. This
function accepts as an argument a function which can generate individual elements based on a state and
signal completion of the sequence.
For example, suppose that you're writing a program which consumes the Twitter API and stores tweets
which satisfy some criteria into a database. There are several asynchronous request-reply interactions at play -
one to retrieve a batch of tweets from the Twitter API, another to determine whether a tweet satisfies some
criteria and finally an operation to write the desired tweet to a database.
Given the type Tweet
to represent an individual tweet, the operation to retrieve a batch of tweets can
be modeled with type int -> Async<(Tweet[] * int) option>
where the incoming int
represents the
offset into the tweet stream. The asynchronous result is an Option
which when None
indicates the
end of the stream, and otherwise contains the batch of retrieved tweets as well as the next offset.
The above function to retrieve a batch of tweets can be used to generate an asynchronous sequence
of tweet batches as follows:
type Tweet = {
user : string
message : string
}
let getTweetBatch (offset: int) : Async<(Tweet[] * int) option> =
failwith "TODO: call Twitter API"
let tweetBatches : AsyncSeq<Tweet[]> =
AsyncSeq.unfoldAsync getTweetBatch 0
The asynchronous sequence tweetBatches
will when iterated, incrementally consume the entire tweet stream.
Next, suppose that the tweet filtering function makes a call to a web service which determines
whether a particular tweet is of interest and should be stored in the database. This function can be modeled with
type Tweet -> Async<bool>
. We can flatten the tweetBatches
sequence and then filter it as follows:
let filterTweet (t: Tweet) : Async<bool> =
failwith "TODO: call web service"
let filteredTweets : AsyncSeq<Tweet> =
tweetBatches
|> AsyncSeq.concatSeq // flatten
|> AsyncSeq.filterAsync filterTweet // filter
When the resulting sequence filteredTweets
is consumed, it will lazily consume the underlying
sequence tweetBatches
, select individual tweets and filter them using the function filterTweets
.
Finally, the function which stores a tweet in the database can be modeled by type Tweet -> Async<unit>
.
We can store all filtered tweets as follows:
let storeTweet (t: Tweet) : Async<unit> =
failwith "TODO: call database"
let storeFilteredTweets : Async<unit> =
filteredTweets
|> AsyncSeq.iterAsync storeTweet
Note that the value storeFilteredTweets
is an asynchronous computation of type Async<unit>
. At this point,
it is a representation of the workflow which consists of reading batches of tweets, filtering them and storing them
in the database. When executed, the workflow will consume the entire tweet stream. The entire workflow can be
succinctly declared and executed as follows:
AsyncSeq.unfoldAsync getTweetBatch 0
|> AsyncSeq.concatSeq
|> AsyncSeq.filterAsync filterTweet
|> AsyncSeq.iterAsync storeTweet
|> Async.RunSynchronously
The above snippet effectively orchestrates several asynchronous request-reply interactions into a cohesive unit
composed using familiar operations on sequences. Furthermore, it will be executed efficiently in a non-blocking manner.
The central difference between seq<'T>
and AsyncSeq<'T>
can be illustrated by introducing the notion of time.
Suppose that generating subsequent elements of a sequence requires an IO-bound operation. Invoking long
running IO-bound operations from within a seq<'T>
will block the thread which calls MoveNext
on the
corresponding IEnumerator
. An AsyncSeq
on the other hand can use facilities provided by the F# Async
type to make
more efficient use of system resources.
let withTime = seq {
Thread.Sleep(1000) // calling thread will block
yield 1
Thread.Sleep(1000) // calling thread will block
yield 1
}
let withTime' = asyncSeq {
do! Async.Sleep 1000 // non-blocking sleep
yield 1
do! Async.Sleep 1000 // non-blocking sleep
yield 2
}
When the asynchronous sequence withTime'
is iterated, the calls to Async.Sleep
won't block threads. Instead,
the continuation of the sequence will be scheduled by Async
while the calling thread will be free to perform other work.
Overall, a seq<'a>
can be viewed as a special case of an AsyncSeq<'a>
where subsequent elements are retrieved
in a blocking manner.
While an asynchronous computation obviates the need to block an OS thread for the duration of an operation, it isn't always the case
that this will improve the overall performance of an application. Note however that an async computation does not require a
non-blocking operation, it simply allows for it. Also of note is that unlike calling IEnumerable.MoveNext()
, consuming
an item from an asynchronous sequence requires several allocations. Usually this is greatly outweighed by the
benefits, it can make a difference in some scenarios.
Multiple items
namespace FSharp
--------------------
namespace Microsoft.FSharp
Multiple items
namespace FSharp.Control
--------------------
namespace Microsoft.FSharp.Control
val async12 : AsyncSeq<int>
val asyncSeq : AsyncSeq.AsyncSeqBuilder
<summary>
Builds an asynchronous sequence using the computation builder syntax
</summary>
type Tweet =
{ user: string
message: string }
Tweet.user: string
Multiple items
val string : value:'T -> string
<summary>Converts the argument to a string using <c>ToString</c>.</summary>
<remarks>For standard integer and floating point values the and any type that implements <c>IFormattable</c><c>ToString</c> conversion uses <c>CultureInfo.InvariantCulture</c>. </remarks>
<param name="value">The input value.</param>
<returns>The converted string.</returns>
--------------------
type string = System.String
<summary>An abbreviation for the CLI type <see cref="T:System.String" />.</summary>
<category>Basic Types</category>
Tweet.message: string
val getTweetBatch : offset:int -> Async<(Tweet [] * int) option>
val offset : int
Multiple items
val int : value:'T -> int (requires member op_Explicit)
<summary>Converts the argument to signed 32-bit integer. This is a direct conversion for all
primitive numeric types. For strings, the input is converted using <c>Int32.Parse()</c>
with InvariantCulture settings. Otherwise the operation requires an appropriate
static conversion method on the input type.</summary>
<param name="value">The input value.</param>
<returns>The converted int</returns>
--------------------
[<Struct>]
type int = int32
<summary>An abbreviation for the CLI type <see cref="T:System.Int32" />.</summary>
<category>Basic Types</category>
--------------------
type int<'Measure> =
int
<summary>The type of 32-bit signed integer numbers, annotated with a unit of measure. The unit
of measure is erased in compiled code and when values of this type
are analyzed using reflection. The type is representationally equivalent to
<see cref="T:System.Int32" />.</summary>
<category>Basic Types with Units of Measure</category>
Multiple items
type Async =
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T> + 1 overload
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member Choice : computations:seq<Async<'T option>> -> Async<'T option>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T> + 3 overloads
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
...
<summary>Holds static members for creating and manipulating asynchronous computations.</summary>
<remarks>
See also <a href="https://docs.microsoft.com/en-us/dotnet/fsharp/language-reference/asynchronous-workflows">F# Language Guide - Async Workflows</a>.
</remarks>
<category index="1">Async Programming</category>
--------------------
type Async<'T> =
<summary>
An asynchronous computation, which, when run, will eventually produce a value of type T, or else raises an exception.
</summary>
<remarks>
This type has no members. Asynchronous computations are normally specified either by using an async expression
or the static methods in the <see cref="T:Microsoft.FSharp.Control.Async" /> type.
See also <a href="https://docs.microsoft.com/en-us/dotnet/fsharp/language-reference/asynchronous-workflows">F# Language Guide - Async Workflows</a>.
</remarks>
<namespacedoc><summary>
Library functionality for asynchronous programming, events and agents. See also
<a href="https://docs.microsoft.com/en-us/dotnet/fsharp/language-reference/asynchronous-workflows">Asynchronous Programming</a>,
<a href="https://docs.microsoft.com/en-us/dotnet/fsharp/language-reference/members/events">Events</a> and
<a href="https://docs.microsoft.com/en-us/dotnet/fsharp/language-reference/lazy-expressions">Lazy Expressions</a> in the
F# Language Guide.
</summary></namespacedoc>
<category index="1">Async Programming</category>
type 'T option = Option<'T>
<summary>The type of optional values. When used from other CLI languages the
empty option is the <c>null</c> value. </summary>
<remarks>Use the constructors <c>Some</c> and <c>None</c> to create values of this type.
Use the values in the <c>Option</c> module to manipulate values of this type,
or pattern match against the values directly.
'None' values will appear as the value <c>null</c> to other CLI languages.
Instance methods on this type will appear as static methods to other CLI languages
due to the use of <c>null</c> as a value representation.</remarks>
<category index="3">Options</category>
val failwith : message:string -> 'T
<summary>Throw a <see cref="T:System.Exception" /> exception.</summary>
<param name="message">The exception message.</param>
<returns>The result value.</returns>
val tweetBatches : AsyncSeq<Tweet []>
Multiple items
module AsyncSeq
from FSharp.Control
--------------------
type AsyncSeq<'T> = IAsyncEnumerable<'T>
<summary>
An asynchronous sequence represents a delayed computation that can be
started to give an enumerator for pulling results asynchronously
</summary>
val unfoldAsync : generator:('State -> Async<('T * 'State) option>) -> state:'State -> AsyncSeq<'T>
<summary>
Generates an async sequence using the specified asynchronous generator function.
</summary>
val filterTweet : t:Tweet -> Async<bool>
val t : Tweet
[<Struct>]
type bool = System.Boolean
<summary>An abbreviation for the CLI type <see cref="T:System.Boolean" />.</summary>
<category>Basic Types</category>
val filteredTweets : AsyncSeq<Tweet>
val concatSeq : source:AsyncSeq<#seq<'T>> -> AsyncSeq<'T>
<summary>
Flattens an AsyncSeq of synchronous sequences.
</summary>
val filterAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> AsyncSeq<'T>
<summary>
Builds a new asynchronous sequence whose elements are those from the
input sequence for which the specified function returned true.
The specified function is asynchronous (and the input sequence will
be asked for the next element after the processing of an element completes).
</summary>
val storeTweet : t:Tweet -> Async<unit>
type unit = Unit
<summary>The type 'unit', which has only one value "()". This value is special and
always uses the representation 'null'.</summary>
<category index="1">Basic Types</category>
val storeFilteredTweets : Async<unit>
val iterAsync : action:('T -> Async<unit>) -> source:AsyncSeq<'T> -> Async<unit>
<summary>
Iterates over the input sequence and calls the specified asynchronous function for
every value. The input sequence will be asked for the next element after
the processing of an element completes.
</summary>
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:System.Threading.CancellationToken -> 'T
val withTime : seq<int>
Multiple items
val seq : sequence:seq<'T> -> seq<'T>
<summary>Builds a sequence using sequence expression syntax</summary>
<param name="sequence">The input sequence.</param>
<returns>The result sequence.</returns>
--------------------
type seq<'T> = System.Collections.Generic.IEnumerable<'T>
<summary>An abbreviation for the CLI type <see cref="T:System.Collections.Generic.IEnumerable`1" /></summary>
<remarks>
See the <see cref="T:Microsoft.FSharp.Collections.SeqModule" /> module for further operations related to sequences.
See also <a href="https://docs.microsoft.com/dotnet/fsharp/language-reference/sequences">F# Language Guide - Sequences</a>.
</remarks>
val withTime' : AsyncSeq<int>
static member Async.Sleep : dueTime:System.TimeSpan -> Async<unit>
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>