FSharp.Control.Reactive


Getting Started

The easiest way to get started using FSharp.Control.Reactive is to take a look at the tests. In general, you can consider the Observable module as providing a set of extensions to the built-in Observable module.

Observable Module

As a first, simple example, the built-in module does not include a zip operator, but FSharp.Control.Reactive fills in this gap:

1: 
2: 
3: 
4: 
5: 
6: 
7: 
8: 
open FSharp.Control.Reactive

let obs1 = Observable.single 1
let obs2 = Observable.single "A"

Observable.zip obs1 obs2
|> Observable.subscribe (printfn "%A")
|> ignore

Computation Expressions

The provided computation expressions open new approaches for constructing and combining Observable computations. The observe computation expression provides a simple workflow using the essential LINQ operators:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
open FSharp.Control.Reactive.Builders

let rec generate x =
    observe {
        yield x
        if x < 100000 then
            yield! generate (x + 1) }
generate 5
|> Observable.subscribeWithCallbacks ignore ignore ignore
|> ignore

You can find a few additional examples of both the observe and rxquery computation expressions on the source blog post.

Reactive Extensions

For more information on Rx, check out the Rx Workshop on Channel 9. The examples provided in the workshop should be easy to port to FSharp.Control.Reactive. The Beginner's Guide to the Reactive Extensions includes many other links to excellent content.

Reactive Testing

The Rx.NET has several OOP-minded objects to test Reactive Applications. This package contains some wrappers to make your Reactive Tests more functional.

Following test shows how the Broadcast Subject can be tested:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
open NUnit.Framework
open FsCheck
open FSharp.Control.Reactive.Testing

[<Test>]
let ``Broadcast Subject broadcast to all observers`` () =
    Check.QuickThrowOnFailure <| fun (xs : int list) ->
        TestSchedule.usage <| fun sch ->
            use s = Subject.broadcast
            let observer = TestSchedule.subscribeTestObserver sch s

            Subject.onNexts xs s 
            |> Subject.onCompleted 
            |> ignore
            
            TestObserver.nexts observer = xs
  • The TestSchedule.usage call wil make sure we have a virtual time scheduler that we can use to sync our different observables and observers.
  • The subscribeTestObserver call will subscribe the given observable to a TestObserver.
  • This TestObserver can be used to assert on the emits that the SUT sends to it by using the TesteObserver module. (TestObserver.nexts, TestObserver.all, TestObserver.errors)

The testing package also contains generated TestNotifications which we can use to simulate a stream of emits. This values has to be registered first before using.

Following example shows how the Functor Law can be testsd by creating a Hot Observable of simulated emits.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
[<SetUp>]
member __.Setup () =
    Arb.register<GenTestNotification> () |> ignore

[<Test>]
member __.``Functor Law of Observables`` () =
    Check.QuickThrowOnFailure <| 
        fun (TestNotifications ms : TestNotifications<int>) (f : int -> int) (g : int -> int) ->
            TestSchedule.usage <| fun sch ->
                TestSchedule.hotObservable sch ms
                |> Observable.retry
                |> Observable.map f
                |> Observable.map g
                |> TestSchedule.subscribeTestObserverStart sch
                |> TestObserver.nexts = TestNotification.mapNexts (f >> g) ms

NOTE: the previous test contains a Observable.retry call because the simulated emits will possible contain 'OnError' emits as well. See: https://github.com/fsprojects/FSharp.Control.Reactive/blob/master/src/FSharp.Control.Reactive.Testing/TestNotifications.fs for more info on how the simulated emits are generated.

Multiple items
namespace FSharp

--------------------
namespace Microsoft.FSharp
Multiple items
namespace FSharp.Control

--------------------
namespace Microsoft.FSharp.Control
namespace FSharp.Control.Reactive
val obs1 : System.IObservable<int>
Multiple items
module Observable

from FSharp.Control.Reactive

--------------------
module Observable

from Microsoft.FSharp.Control
val single : value:'Result -> System.IObservable<'Result>
val obs2 : System.IObservable<string>
val zip : first:System.IObservable<'Source1> -> second:System.IObservable<'Source2> -> System.IObservable<'Source1 * 'Source2>
Multiple items
val subscribe : onNext:('T -> unit) -> observable:System.IObservable<'T> -> System.IDisposable

--------------------
val subscribe : callback:('T -> unit) -> source:System.IObservable<'T> -> System.IDisposable
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
val ignore : value:'T -> unit
module Builders

from FSharp.Control.Reactive
val generate : x:int -> System.IObservable<int>
val x : int
val observe : ObservableBuilder
val subscribeWithCallbacks : onNext:('T -> unit) -> onError:(exn -> unit) -> onCompleted:(unit -> unit) -> observable:System.IObservable<'T> -> System.IDisposable
namespace NUnit
namespace NUnit.Framework
namespace FsCheck
namespace FSharp.Control.Reactive.Testing
Multiple items
type TestAttribute =
  inherit NUnitAttribute
  new : unit -> TestAttribute
  member ApplyToTest : test:Test -> unit
  member Author : string with get, set
  member BuildFrom : method:IMethodInfo * suite:Test -> TestMethod
  member Description : string with get, set
  member ExpectedResult : obj with get, set
  member TestOf : Type with get, set

--------------------
TestAttribute() : TestAttribute
val ( Broadcast Subject broadcast to all observers ) : unit -> unit
type Check =
  static member All : config:Config -> unit
  static member All : config:Config * test:Type -> unit
  static member Method : config:Config * methodInfo:MethodInfo * ?target:obj -> unit
  static member One : config:Config * property:'Testable -> unit
  static member One : name:string * config:Config * property:'Testable -> unit
  static member Quick : property:'Testable -> unit
  static member Quick : name:string * property:'Testable -> unit
  static member QuickAll : unit -> unit
  static member QuickAll : test:Type -> unit
  static member QuickThrowOnFailure : property:'Testable -> unit
  ...
static member Check.QuickThrowOnFailure : property:'Testable -> unit
val xs : int list
Multiple items
val int : value:'T -> int (requires member op_Explicit)

--------------------
type int = int32

--------------------
type int<'Measure> = int
type 'T list = List<'T>
module TestSchedule

from FSharp.Control.Reactive.Testing
val usage : f:(Reactive.Testing.TestScheduler -> 'a) -> 'a
val sch : Reactive.Testing.TestScheduler
val s : System.Reactive.Subjects.Subject<int>
Multiple items
module Subject

from FSharp.Control.Reactive

--------------------
type Subject<'a> =
  private new : unit -> Subject<'a>
  static member behavior : x:'a -> BehaviorSubject<'a>
  static member async : AsyncSubject<'a>
  static member broadcast : Subject<'a>
  static member replay : ReplaySubject<'a>
property Subject.broadcast: System.Reactive.Subjects.Subject<'a> with get
val observer : Reactive.Testing.ITestableObserver<int>
val subscribeTestObserver : sch:Reactive.Testing.TestScheduler -> source:System.IObservable<'a> -> Reactive.Testing.ITestableObserver<'a>
val onNexts : xs:seq<'a> -> s:'b -> 'b (requires 'b :> System.Reactive.Subjects.SubjectBase<'a>)
val onCompleted : s:System.Reactive.Subjects.SubjectBase<'a> -> System.Reactive.Subjects.SubjectBase<'a>
module TestObserver

from FSharp.Control.Reactive.Testing
val nexts : o:Reactive.Testing.ITestableObserver<'a> -> 'a list
Multiple items
type SetUpAttribute =
  inherit NUnitAttribute
  new : unit -> SetUpAttribute

--------------------
SetUpAttribute() : SetUpAttribute
module Arb

from FsCheck
val register<'t> : unit -> TypeClass.TypeClassComparison
type GenTestNotification =
  static member GenNotifications : unit -> Arbitrary<TestNotifications<'a>>
Multiple items
union case TestNotifications.TestNotifications: TestNotification<'a> list -> TestNotifications<'a>

--------------------
type TestNotifications<'a> = | TestNotifications of TestNotification<'a> list
val hotObservable : s:Reactive.Testing.TestScheduler -> TestNotifications<'a> -> Reactive.Testing.ITestableObservable<'a>
val retry : source:System.IObservable<'Source> -> System.IObservable<'Source>
Multiple items
val map : f:('a -> 'b) -> source:System.IObservable<'a> -> System.IObservable<'b>

--------------------
val map : mapping:('T -> 'U) -> source:System.IObservable<'T> -> System.IObservable<'U>
val subscribeTestObserverStart : sch:Reactive.Testing.TestScheduler -> source:System.IObservable<'a> -> Reactive.Testing.ITestableObserver<'a>
Multiple items
module TestNotification

from FSharp.Control.Reactive.Testing

--------------------
type TestNotification<'a> = Reactive.Testing.Recorded<System.Reactive.Notification<'a>>
val mapNexts : f:('a -> 'b) -> (TestNotifications<'a> -> 'b list)
Fork me on GitHub