FSharp.CloudAgent


Tutorial

Getting up and running

Creating a distributable agent takes three steps: create a regular F# agent, wrap it in a generator function, and bind that function to an Azure Service Bus queue.

open FSharp.CloudAgent
open FSharp.CloudAgent.Messaging
open FSharp.CloudAgent.Connections

// Standard Azure Service Bus connection string
let serviceBusConnection = ServiceBusConnection "servicebusconnectionstringgoeshere"

// A DTO
type Person = { Name : string; Age : int }

// A function which creates an Agent on demand.
let createASimpleAgent agentId =
    MailboxProcessor.Start(fun inbox ->
        async {
            while true do
                let! message = inbox.Receive()
                printfn "%s is %d years old." message.Name message.Age
        })

// Create a worker cloud connection to the Service Bus Queue "myMessageQueue"
let cloudConnection = WorkerCloudConnection(serviceBusConnection, Queue "myMessageQueue")

// Start listening! A local pool of agents will be created that will receive messages.
// Service bus messages will be automatically deserialised into the required message type.
ConnectionFactory.StartListening(cloudConnection, createASimpleAgent >> BasicCloudAgent)

Posting messages to agents

You can send messages to the Service Bus directly yourself, as long as the serialisation format matches (i.e. Json.NET). However, FSharp.CloudAgent includes helper functions that do this for you.

let sendToMyMessageQueue = ConnectionFactory.SendToWorkerPool cloudConnection

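// These messages will be processed in parallel across the worker pool.
// Each send returns an Async<unit>, so it has to be run for the message to be posted.
sendToMyMessageQueue { Name = "Isaac"; Age = 34 } |> Async.RunSynchronously
sendToMyMessageQueue { Name = "Michael"; Age = 32 } |> Async.RunSynchronously
sendToMyMessageQueue { Name = "Sam"; Age = 27 } |> Async.RunSynchronously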

Creating Resilient Agents

We can also create "Resilient" agents. These are standard F# agents that take advantage of the reply channel functionality inherent in F# agents to signal back to Azure whether a message was processed correctly. This in turn gives us the automatic retry and dead-letter functionality included with Azure Service Bus.

// A function which creates a Resilient Agent on demand.
let createAResilientAgent agentId =
    MailboxProcessor.Start(fun inbox ->
        async {
            while true do
                let! message, replyChannel = inbox.Receive()
                printfn "%s is %d years old." message.Name message.Age
                
                match message with
                | { Name = "Isaac" } -> replyChannel Completed // all good, message was processed
                | { Name = "Richard" } -> replyChannel Failed // error occurred, try again
                | _ -> replyChannel Abandoned // give up with this message.
        })

// Start listening! A local pool of agents will be created that will receive messages.
ConnectionFactory.StartListening(cloudConnection, createAResilientAgent >> ResilientCloudAgent)
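
Because a resilient agent is just a MailboxProcessor whose messages carry a callback, its logic can be exercised locally without a Service Bus connection. The following is a minimal sketch under stated assumptions: MessageProcessedStatus is redeclared here as a local stand-in with the three cases used above, and processLocally is a hypothetical helper that is not part of FSharp.CloudAgent.

```fsharp
// Local stand-in for FSharp.CloudAgent.Messaging.MessageProcessedStatus
// (assumption: only the three case names used in this tutorial are mirrored).
type MessageProcessedStatus = Completed | Failed | Abandoned

type Person = { Name : string; Age : int }

// Same shape as the resilient agent above: each message arrives with a callback.
let createAResilientAgent () =
    MailboxProcessor<Person * (MessageProcessedStatus -> unit)>.Start(fun inbox ->
        async {
            while true do
                let! message, replyChannel = inbox.Receive()
                match message with
                | { Name = "Isaac" } -> replyChannel Completed
                | { Name = "Richard" } -> replyChannel Failed
                | _ -> replyChannel Abandoned
        })

// Hypothetical helper: posts one message and blocks until the agent reports a status.
let processLocally (agent : MailboxProcessor<Person * (MessageProcessedStatus -> unit)>) person =
    agent.PostAndReply(fun channel -> (person, (fun status -> channel.Reply status)))

let agent = createAResilientAgent ()

// Pair each name with the status the agent reported for it.
let results =
    [ "Isaac"; "Richard"; "Sam" ]
    |> List.map (fun name -> name, processLocally agent { Name = name; Age = 30 })

results |> List.iter (fun (name, status) -> printfn "%s -> %A" name status)
```

When the agent is hosted by FSharp.CloudAgent, the callback is supplied by the library rather than by a local reply channel, so this sketch only checks the agent's own decision logic.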

Creating distributed Actors

In addition to the massively distributable message processing seen above, we can also create agents that are thread-safe not only within a single process but also across multiple processes on different machines. This is made possible by Azure Service Bus session support. To turn an agent into an actor, we simply change the connection and the way in which we post messages.

// Create an actor cloud connection to the Service Bus Queue "myActorQueue". This queue must have
// Session support turned on.
let actorConnection = ActorCloudConnection(serviceBusConnection, Queue "myActorQueue")

// Start listening! Agents will be created as required for messages sent to new actors.
ConnectionFactory.StartListening(actorConnection, createASimpleAgent >> BasicCloudAgent)

// Send some messages to the actor pool
let sendToMyActorQueue = ConnectionFactory.SendToActorPool actorConnection

let sendToFred = sendToMyActorQueue (ActorKey "Fred")
let sendToMax = sendToMyActorQueue (ActorKey "Max")

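// The first two messages will be sent in sequence to the same agent in the same process.
// The last message will be sent to a different agent, potentially in another process.
// Each send returns an Async<unit>, so it has to be run for the message to be posted.
sendToFred { Name = "Isaac"; Age = 34 } |> Async.RunSynchronously
sendToFred { Name = "Michael"; Age = 32 } |> Async.RunSynchronously
sendToMax { Name = "Sam"; Age = 27 } |> Async.RunSynchronously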
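
The routing behaviour described in the comments above can be illustrated with a purely local sketch: one agent per key, created on first use, with messages for the same key handled in order by the same agent. This is not how FSharp.CloudAgent is implemented (there, Service Bus sessions do the routing across processes). The ActorKey type is a local stand-in, and createLoggingAgent, sendLocally, log and agents are hypothetical names used only for illustration.

```fsharp
open System.Collections.Generic

// Local stand-in for FSharp.CloudAgent.ActorKey (assumption, not the library type).
type ActorKey = ActorKey of string
type Person = { Name : string; Age : int }

// Shared log so the routing can be inspected afterwards.
let log = ResizeArray<string>()

// Generator function in the style of the tutorial: builds an agent for a given key.
let createLoggingAgent (ActorKey key) =
    MailboxProcessor<Person * AsyncReplyChannel<unit>>.Start(fun inbox ->
        async {
            while true do
                let! person, channel = inbox.Receive()
                lock log (fun () -> log.Add(sprintf "%s handled %s" key person.Name))
                channel.Reply()
        })

// Hypothetical local router: one agent per key, created on demand.
let agents = Dictionary<ActorKey, MailboxProcessor<Person * AsyncReplyChannel<unit>>>()

let sendLocally key person =
    let agent =
        match agents.TryGetValue key with
        | true, existing -> existing
        | false, _ ->
            let created = createLoggingAgent key
            agents.[key] <- created
            created
    // Block until the agent has handled the message, which keeps the order deterministic.
    agent.PostAndReply(fun channel -> (person, channel))

sendLocally (ActorKey "Fred") { Name = "Isaac"; Age = 34 }
sendLocally (ActorKey "Fred") { Name = "Michael"; Age = 32 }
sendLocally (ActorKey "Max") { Name = "Sam"; Age = 27 }

log |> Seq.iter (printfn "%s")
```

Two agents are created here, one for "Fred" and one for "Max", which mirrors the statement that messages for the same actor key reach the same agent.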