FSharp.Control.AsyncSeq



Comparison with IObservable

Both IObservable<'T> and AsyncSeq<'T> represent collections of items, and both provide similar operations for transformation and composition. The central difference is that the former uses a synchronous push to a subscriber, while the latter uses an asynchronous pull by a consumer. Consumers of an IObservable<'T> subscribe to receive notifications about new items or the end of the sequence. By contrast, consumers of an AsyncSeq<'T> asynchronously retrieve subsequent items on their own terms. Some domains are more naturally modeled with one or the other, but for a specific task it is often less clear which is the more suitable tool. In many cases a combination of the two provides the best solution; restricting yourself to one simplifies the programming model, but can lead you to treat every problem as a nail.

A more specific difference is that IObservable<'T> subscribers have the basic type 'T -> unit and are therefore inherently synchronous and imperative. The observer can certainly make a blocking call, but this can defeat the purpose of the observable sequence altogether. Alternatively, the observer can spawn an asynchronous operation, but this can break composition, because one can no longer rely on the observer returning to determine that it has completed. With the observable model, however, we can model blocking operations through composition on sequences rather than on observers.
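A minimal sketch of the "spawned operation" problem, using only core library types. The names source, gate, finished and log are hypothetical, and the gate merely simulates IO latency. The observer returns as soon as it has started the work, so its return says nothing about whether the work has finished:

```fsharp
open System
open System.Collections.Concurrent
open System.Threading.Tasks

// Hypothetical push-based source of tweet ids.
let source = Event<int>()
let log = ConcurrentQueue<string>()

// Stand-ins for a slow IO call and for "the work is done".
let gate = TaskCompletionSource<unit>()
let finished = TaskCompletionSource<unit>()

// The observer has type int -> unit, so it can only *start* the async work.
let subscription =
  source.Publish
  |> Observable.subscribe (fun tweetId ->
      async {
        do! Async.AwaitTask gate.Task          // simulated IO latency
        log.Enqueue (sprintf "stored %d" tweetId)
        finished.SetResult ()
      }
      |> Async.Start)

source.Trigger 1
log.Enqueue "observer returned"   // logged before the store has happened

gate.SetResult ()                 // let the simulated IO complete
finished.Task.Wait ()
subscription.Dispose ()

printfn "%A" (List.ofSeq log)
```

The caller of Trigger has no handle on the spawned computation, which is why later stages cannot simply be chained onto the observer.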

To illustrate, let's try to implement the above tweet retrieval, filtering and storage workflow using observable sequences. Suppose we already have an observable sequence of tweets, of type IObservable<Tweet>, and we simply wish to filter it and store the resulting tweets. The function Observable.filter filters an observable sequence based on a predicate, but in this case it doesn't quite cut it, because the predicate passed to it must be synchronous, of type 'T -> bool:

open System

let tweetsObs : IObservable<Tweet> =
  failwith "TODO: create observable"

let filteredTweetsObs =
  tweetsObs
  |> Observable.filter (filterTweet >> Async.RunSynchronously) // blocking IO-call!

To remedy the blocking IO call, we can adapt the filtering function to the IObservable<'T> model. A value of type Async<'T> can be modeled as an IObservable<'T> with one element, so an asynchronous predicate Tweet -> Async<bool> can be turned into a function Tweet -> IObservable<bool>. We can define a few helper operators on observables to allow filtering using an asynchronous predicate as follows:

module Observable =
  
  /// With Rx (System.Reactive), one option is:
  /// a |> Async.StartAsTask |> (fun t -> t.ToObservable())
  let ofAsync (a:Async<'a>) : IObservable<'a> =
    failwith "TODO"

  /// With Rx (System.Reactive), this corresponds to Observable.SelectMany
  let bind (f:'a -> IObservable<'b>) (o:IObservable<'a>) : IObservable<'b> =
    failwith "TODO"

  /// Filter an observable sequence using a predicate producing an observable
  /// which emits a single boolean value.
  let filterObs (f:'a -> IObservable<bool>) : IObservable<'a> -> IObservable<'a> =
    bind <| fun a -> 
      f a
      |> Observable.choose (function
        | true -> Some a
        | false -> None
      )
  
  /// Filter an observable sequence using a predicate which returns an async
  /// computation producing a boolean value.
  let filterAsync (f:'a -> Async<bool>) : IObservable<'a> -> IObservable<'a> =
    filterObs (f >> ofAsync)

  /// Maps over an observable sequence using an async-returning function.
  let mapAsync (f:'a -> Async<'b>) : IObservable<'a> -> IObservable<'b> =
    bind (f >> ofAsync)

let filteredTweetsObs' : IObservable<Tweet> =
  tweetsObs // start from the unfiltered tweets, not the blocking version above
  |> Observable.filterAsync filterTweet
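The helpers ofAsync and bind are left as TODOs above. One way they could be filled in without a dependency on Rx is sketched below. This is a simplified illustration and not the library's implementation: the module name ObservableSketch, the create helper and the isEven predicate are hypothetical, exceptions thrown by the binder function are not handled, and disposal is only best-effort.

```fsharp
open System
open System.Threading

module ObservableSketch =

  /// Build an IObservable from a subscribe function.
  let create (subscribe: IObserver<'a> -> IDisposable) : IObservable<'a> =
    { new IObservable<'a> with
        member this.Subscribe observer = subscribe observer }

  /// Runs the async computation on each subscription and emits its single result.
  let ofAsync (a: Async<'a>) : IObservable<'a> =
    create (fun observer ->
      let cts = new CancellationTokenSource()
      let onValue x =
        observer.OnNext x
        observer.OnCompleted()
      Async.StartWithContinuations(a, onValue, (fun e -> observer.OnError e), ignore, cts.Token)
      { new IDisposable with member this.Dispose() = cts.Cancel() })

  /// Subscribes to (f x) for every outer item x and merges the inner items.
  let bind (f: 'a -> IObservable<'b>) (o: IObservable<'a>) : IObservable<'b> =
    create (fun observer ->
      let gate = obj ()
      let pending = ref 1            // the outer sequence plus each live inner one
      let stopped = ref false
      let subscriptions = ResizeArray<IDisposable>()
      let finishOne () =
        lock gate (fun () ->
          pending.Value <- pending.Value - 1
          if pending.Value = 0 && not stopped.Value then
            stopped.Value <- true
            observer.OnCompleted())
      let fail (e: exn) =
        lock gate (fun () ->
          if not stopped.Value then
            stopped.Value <- true
            observer.OnError e)
      let inner =
        { new IObserver<'b> with
            member this.OnNext x =
              lock gate (fun () -> if not stopped.Value then observer.OnNext x)
            member this.OnError e = fail e
            member this.OnCompleted() = finishOne () }
      let outer =
        { new IObserver<'a> with
            member this.OnNext x =
              lock gate (fun () -> pending.Value <- pending.Value + 1)
              let d = (f x).Subscribe(inner)
              lock gate (fun () -> subscriptions.Add d)
            member this.OnError e = fail e
            member this.OnCompleted() = finishOne () }
      let d = o.Subscribe(outer)
      { new IDisposable with
          member this.Dispose() =
            d.Dispose()
            lock gate (fun () -> for s in subscriptions do s.Dispose()) })

// Hypothetical async predicate standing in for filterTweet.
let isEven (n: int) : Async<bool> = async { return n % 2 = 0 }

let filterAsyncSketch (f: 'a -> Async<bool>) : IObservable<'a> -> IObservable<'a> =
  ObservableSketch.bind (fun a ->
    ObservableSketch.ofAsync (f a)
    |> Observable.choose (fun keep -> if keep then Some a else None))

let numbers = Event<int>()
let kept = ResizeArray<int>()
let subscription =
  numbers.Publish
  |> filterAsyncSketch isEven
  |> Observable.subscribe (fun n -> kept.Add n)

for n in 1 .. 6 do numbers.Trigger n
subscription.Dispose()
printfn "%A" (List.ofSeq kept)
```

Because isEven completes without any real asynchronous step, Async.StartWithContinuations runs it inline on the triggering thread. With genuinely asynchronous predicates, the inner results may arrive in a different order from the source items, which is the usual behavior of a merge-style bind.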

With a little effort, we were able to adapt IObservable<'a> to our needs. Next let's try implementing the storage of filtered tweets. Again, we can adapt the function storeTweet defined above to the observable model and bind the observable of filtered tweets to it:

let storedTweetsObs : IObservable<unit> =
  filteredTweetsObs'
  |> Observable.mapAsync storeTweet

The observable sequence storedTweetsObs produces a value each time a filtered tweet is stored. The entire workflow can be expressed as follows:

let storedTweetsObs' : IObservable<unit> =
  tweetsObs
  |> Observable.filterAsync filterTweet
  |> Observable.mapAsync storeTweet

Overall, both solutions are succinct and composable, and deciding which one to use can ultimately be a matter of preference. Some things to consider are the "synchronous push" vs. "asynchronous pull" semantics. On the one hand, tweets are push-based: the consumer has no control over their generation. On the other hand, the program at hand will process the tweets on its own terms, regardless of how quickly they are being generated. Moreover, the underlying Twitter API will likely use a request-reply protocol to retrieve batches of tweets from persistent storage. As such, the distinction between "synchronous push" and "asynchronous pull" becomes less interesting. If the underlying source is truly push-based, one can buffer its output and consume it using an asynchronous sequence. If the underlying source is pull-based, one can turn it into an observable sequence by first pulling, then pushing. Note, however, that in a true real-time reactive system, notifications must be pushed immediately, without delay.
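The buffering idea can be sketched with core library types only. The library's AsyncSeq.ofObservableBuffered is intended for this case. The code below is just an illustration of the principle under the assumption of a single consumer, and the names pushSource, buffer and consume are hypothetical:

```fsharp
open System

// Hypothetical push-based source standing in for a real feed of tweets.
let pushSource = Event<string>()

// The agent body returns immediately, which leaves the mailbox usable as an
// unbounded queue with an asynchronous receive.
let buffer = MailboxProcessor<string>.Start(fun _ -> async.Return ())

// Push side: the observer only enqueues, so it never blocks the producer.
let subscription =
  pushSource.Publish
  |> Observable.subscribe (fun item -> buffer.Post item)

// Pull side: the consumer asks for the next item when it is ready for it.
let consume (count: int) : Async<string list> =
  async {
    let items = ResizeArray<string>()
    for i in 1 .. count do
      let! item = buffer.Receive()
      items.Add item
    return List.ofSeq items
  }

pushSource.Trigger "tweet 1"
pushSource.Trigger "tweet 2"
pushSource.Trigger "tweet 3"

let received = consume 3 |> Async.RunSynchronously
subscription.Dispose()
printfn "%A" received
```

An unbounded buffer trades memory for decoupling: if the producer is persistently faster than the consumer, the queue grows without limit, so a real system would need a bound or a dropping policy.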

Upon closer inspection, the ways in which the two models are consumed aren't all that different. While AsyncSeq is based on an asynchronous pull operation, it is usually consumed using an operator such as AsyncSeq.iterAsync, as shown above. This is a function of type ('T -> Async<unit>) -> AsyncSeq<'T> -> Async<unit>, where the first argument is a function 'T -> Async<unit> which performs some work on an item of the sequence and is applied repeatedly to subsequent items. In a sense, iterAsync pushes values to this function. The primary difference from observers of observable sequences is the return type: Async<unit> rather than simply unit.
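To make the role of the Async<unit> return type concrete, here is a small stand-in with the same shape as iterAsync. It is defined over an ordinary seq so that it needs no external package; iterAsyncSketch and store are hypothetical names and this is not the library's implementation. Because the handler returns Async<unit>, the loop can wait for each item's work to finish before moving on to the next item:

```fsharp
// Hypothetical stand-in shaped like AsyncSeq.iterAsync, over a plain seq.
let iterAsyncSketch (f: 'T -> Async<unit>) (source: seq<'T>) : Async<unit> =
  async {
    for item in source do
      do! f item          // the next item is not touched until f completes
  }

let log = ResizeArray<string>()

// Hypothetical storage function; the sleep simulates IO.
let store (tweet: string) : Async<unit> =
  async {
    log.Add (sprintf "start %s" tweet)
    do! Async.Sleep 10
    log.Add (sprintf "done %s" tweet)
  }

iterAsyncSketch store [ "a"; "b" ] |> Async.RunSynchronously
printfn "%A" (List.ofSeq log)
```

An observer of type 'T -> unit cannot express this: it either blocks inside the callback or returns before the work is done.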
