FSharpx.Extras


  1: 
  2: 
  3: 
  4: 
  5: 
  6: 
  7: 
  8: 
  9: 
 10: 
 11: 
 12: 
 13: 
 14: 
 15: 
 16: 
 17: 
 18: 
 19: 
 20: 
 21: 
 22: 
 23: 
 24: 
 25: 
 26: 
 27: 
 28: 
 29: 
 30: 
 31: 
 32: 
 33: 
 34: 
 35: 
 36: 
 37: 
 38: 
 39: 
 40: 
 41: 
 42: 
 43: 
 44: 
 45: 
 46: 
 47: 
 48: 
 49: 
 50: 
 51: 
 52: 
 53: 
 54: 
 55: 
 56: 
 57: 
 58: 
 59: 
 60: 
 61: 
 62: 
 63: 
 64: 
 65: 
 66: 
 67: 
 68: 
 69: 
 70: 
 71: 
 72: 
 73: 
 74: 
 75: 
 76: 
 77: 
 78: 
 79: 
 80: 
 81: 
 82: 
 83: 
 84: 
 85: 
 86: 
 87: 
 88: 
 89: 
 90: 
 91: 
 92: 
 93: 
 94: 
 95: 
 96: 
 97: 
 98: 
 99: 
100: 
101: 
102: 
103: 
104: 
105: 
106: 
107: 
108: 
109: 
110: 
111: 
112: 
113: 
114: 
115: 
116: 
117: 
118: 
119: 
120: 
121: 
122: 
123: 
124: 
125: 
126: 
127: 
128: 
129: 
130: 
131: 
132: 
133: 
134: 
135: 
136: 
137: 
138: 
139: 
140: 
141: 
142: 
143: 
144: 
145: 
146: 
147: 
148: 
149: 
150: 
151: 
152: 
153: 
154: 
155: 
156: 
157: 
158: 
159: 
160: 
161: 
162: 
163: 
164: 
165: 
166: 
167: 
168: 
169: 
170: 
171: 
172: 
173: 
174: 
175: 
176: 
177: 
178: 
179: 
180: 
181: 
182: 
183: 
184: 
185: 
186: 
187: 
188: 
189: 
190: 
191: 
192: 
193: 
194: 
195: 
196: 
197: 
198: 
199: 
200: 
201: 
202: 
203: 
204: 
205: 
206: 
207: 
208: 
209: 
210: 
211: 
212: 
213: 
214: 
215: 
216: 
217: 
218: 
219: 
220: 
221: 
222: 
223: 
224: 
225: 
226: 
227: 
228: 
229: 
230: 
231: 
232: 
233: 
234: 
235: 
236: 
237: 
238: 
239: 
240: 
241: 
242: 
243: 
244: 
245: 
246: 
247: 
248: 
249: 
250: 
251: 
252: 
253: 
254: 
255: 
256: 
257: 
258: 
259: 
260: 
261: 
262: 
263: 
264: 
265: 
266: 
267: 
// -------------------------------------------------------------------------------
// F# async extensions (CircularBuffer.fsx)
// (c) Tomas Petricek & Ryan Riley, 2011-2012, Available under Apache 2.0 license.
// -------------------------------------------------------------------------------

// This example demonstrates how to use `CircularBuffer`, `CircularQueueAgent`, and `CircularStream`.
// The agent implements producer/consumer concurrent pattern.

#r "System.dll"
#I @"../../bin/v4.0"
#r @"FSharpx.Extras.dll"
#r @"FSharpx.Collections.dll"

open System
open System.Diagnostics
open FSharpx.Control
open FSharpx.IO
open FSharpx.Collections.Mutable

let queue = CircularBuffer(5)

let stopwatch = Stopwatch.StartNew()

// Printing from a queue 1..5
queue.Enqueue(1)
queue.Enqueue(2)
queue.Enqueue(3)
queue.Enqueue(4)
queue.Enqueue(5)
assert ([|1;2;3;4;5|] = queue.Dequeue(5))

// Printing from a queue 1..8, twice
queue.Enqueue(1)
queue.Enqueue(2)
queue.Enqueue(3)
queue.Enqueue(4)
queue.Enqueue(5) // <---
queue.Enqueue(6)
queue.Enqueue(7)
queue.Enqueue(8)
queue.Enqueue(1)
queue.Enqueue(2) // <---
queue.Enqueue(3)
queue.Enqueue(4)
queue.Enqueue(5)
queue.Enqueue(6)
queue.Enqueue(7) // <---
queue.Enqueue(8)
assert ([|4;5;6;7;8|] = queue.Dequeue(5))

// Printing from a queue 1..5
queue.Enqueue(1)
queue.Enqueue(2)
queue.Enqueue(3)
queue.Enqueue(4)
queue.Enqueue(5)
assert ([|1;2;3|] = queue.Dequeue(3))

// Clear out the rest
queue.Dequeue(2)

// Printing from a queue 1..3
queue.Enqueue(1)
queue.Enqueue(2)
queue.Enqueue(3)
assert ([|1;2;3|] = queue.Dequeue(3))

// Printing from a queue 1..8 and dequeue 5, then enqueue 1..3 and dequeue 3
queue.Enqueue(1)
queue.Enqueue(2)
queue.Enqueue(3)
queue.Enqueue(4)
queue.Enqueue(5) // <---
queue.Enqueue(6)
queue.Enqueue(7)
queue.Enqueue(8)
assert ([|4;5;6;7;8|] = queue.Dequeue(5))
queue.Enqueue(1)
queue.Enqueue(2)
queue.Enqueue(3)
assert ([|1;2;3|] = queue.Dequeue(3))

printfn "Enqueue(value) tests passed in %d ms" stopwatch.ElapsedMilliseconds

stopwatch.Reset()
stopwatch.Start()

// Printing from a queue 1..5
queue.Enqueue([|1;2;3;4;5|])
assert ([|1;2;3;4;5|] = queue.Dequeue(5))

// Printing from a queue 1..8, twice
let error = ref Unchecked.defaultof<Exception>
try
  try
    queue.Enqueue([|1;2;3;4;5;6;7;8;1;2;3;4;5;6;7;8|])
  with e -> error := e
finally
  assert (!error <> null)

queue.Enqueue([|1;2;3;4;5|])
queue.Enqueue([|6;7;8|])
queue.Enqueue([|1;2;3;4;5|])
queue.Enqueue([|6;7;8|])
assert ([|4;5;6;7;8|] = queue.Dequeue(5))

// Printing from a queue 1..5
queue.Enqueue([|1;2;3;4;5|])
assert ([|1;2;3|] = queue.Dequeue(3))

// Clear out the rest
queue.Dequeue(2)

// Printing from a queue 1..3
queue.Enqueue([|1;2;3|])
assert ([|1;2;3|] = queue.Dequeue(3))

// Printing from a queue 1..8 and dequeue 5, then enqueue 1..3 and dequeue 3
queue.Enqueue([|1;2;3;4;5|])
queue.Enqueue([|6;7;8|])
assert ([|4;5;6;7;8|] = queue.Dequeue(5))
queue.Enqueue([|1;2;3|])
assert ([|1;2;3|] = queue.Dequeue(3))

printfn "Enqueue(array) tests passed in %d ms" stopwatch.ElapsedMilliseconds

stopwatch.Reset()
stopwatch.Start()

// Consider a large array with various, incoming array segments.
let source =
    [| 1;2;3;4;5
       1;2;3;4;5;6;7;8;1;2;3;4;5;6;7;8
       1;2;3;4;5
       1;2;3
       1;2;3;4;5;6;7;8
       1;2;3 |]

let incoming =
    let generator =
        seq { yield ArraySegment<_>(source,0,5)
              yield ArraySegment<_>(source,5,5)
              yield ArraySegment<_>(source,10,3)
              yield ArraySegment<_>(source,13,5)
              yield ArraySegment<_>(source,18,3)
              yield ArraySegment<_>(source,21,5)
              yield ArraySegment<_>(source,26,3)
              yield ArraySegment<_>(source,29,5)
              yield ArraySegment<_>(source,34,3)
              yield ArraySegment<_>(source,37,3) } 
    in generator.GetEnumerator()

let enqueueNext() =
    incoming.MoveNext() |> ignore
    queue.Enqueue(incoming.Current)

// Printing from a queue 1..5
enqueueNext()
assert ([|1;2;3;4;5|] = queue.Dequeue(5))

// Printing from a queue 1..8, twice
enqueueNext()
enqueueNext()
enqueueNext()
enqueueNext()
assert ([|4;5;6;7;8|] = queue.Dequeue(5))

// Printing from a queue 1..5
enqueueNext()
assert ([|1;2;3|] = queue.Dequeue(3))

// Clear out the rest
queue.Dequeue(2)

// Printing from a queue 1..3
enqueueNext()
assert ([|1;2;3|] = queue.Dequeue(3))

// Printing from a queue 1..8 and dequeue 5, then enqueue 1..3 and dequeue 3
enqueueNext()
enqueueNext()
assert ([|4;5;6;7;8|] = queue.Dequeue(5))
enqueueNext()
assert ([|1;2;3|] = queue.Dequeue(3))

printfn "Enqueue(array) tests passed in %d ms" stopwatch.ElapsedMilliseconds

let data = [|1;2;3;4;5|]
queue.Enqueue(data)
assert ((data |> Array.toList) = (queue |> Seq.toList))    

printfn "Seq.toList matches enqueued data."


stopwatch.Reset()
stopwatch.Start()

let buffer = new CircularQueueAgent<int>(3)

// The sample uses two workflows that add/take elements
// from the buffer with the following timeouts. When the producer
// timout is larger, consumer will be blocked. Otherwise, producer
// will be blocked.
let producerTimeout = 500
let consumerTimeout = 1000

async { 
  for i in 0 .. 10 do 
    // Sleep for some time and then add value
    do! Async.Sleep(producerTimeout)
    buffer.Enqueue([|i|])
    printfn "Added %d" i }
|> Async.Start

async { 
  while true do
    // Sleep for some time and then get value
    do! Async.Sleep(consumerTimeout)
    let! v = buffer.AsyncDequeue(1)
    printfn "Got %d" v.[0] }
|> Async.Start

printfn "CircularQueueAgent.Enqueue(array) tests passed in %d ms" stopwatch.ElapsedMilliseconds

stopwatch.Reset()
stopwatch.Start()

async { 
  for i in 0 .. 10 do 
    // Sleep for some time and then add value
    do! Async.Sleep(producerTimeout)
    do! buffer.AsyncEnqueue([|i|])
    printfn "Added %d" i }
|> Async.Start

async { 
  while true do
    // Sleep for some time and then get value
    do! Async.Sleep(consumerTimeout)
    let! v = buffer.AsyncDequeue(1)
    printfn "Got %d" v.[0] }
|> Async.Start

printfn "CircularQueueAgent.AsyncEnqueue(array) tests passed in %d ms" stopwatch.ElapsedMilliseconds

stopwatch.Reset()
stopwatch.Start()

let stream = new CircularStream(3)

async { 
  for i in 0uy .. 10uy do 
    // Sleep for some time and then add value
    do! Async.Sleep(producerTimeout)
    do! stream.AsyncWrite([|i|], 0, 1)
    printfn "Wrote %d" i }
|> Async.Start

async { 
  let buffer = Array.zeroCreate<byte> 1
  while true do
    // Sleep for some time and then get value
    do! Async.Sleep(consumerTimeout)
    let! v = stream.AsyncRead(buffer, 0, 1)
    printfn "Read %d bytes with value %A" v buffer.[0] }
|> Async.Start
printfn "CircularStream.AsyncWrite(array) tests passed in %d ms" stopwatch.ElapsedMilliseconds
namespace System
namespace System.Diagnostics
namespace Microsoft.FSharp.Control
namespace System.IO
Multiple items
namespace System.Collections

--------------------
namespace Microsoft.FSharp.Collections
val queue : seq<int>

Full name: CircularBuffer.queue
val stopwatch : Stopwatch

Full name: CircularBuffer.stopwatch
Multiple items
type Stopwatch =
  new : unit -> Stopwatch
  member Elapsed : TimeSpan
  member ElapsedMilliseconds : int64
  member ElapsedTicks : int64
  member IsRunning : bool
  member Reset : unit -> unit
  member Restart : unit -> unit
  member Start : unit -> unit
  member Stop : unit -> unit
  static val Frequency : int64
  ...

Full name: System.Diagnostics.Stopwatch

--------------------
Stopwatch() : unit
Stopwatch.StartNew() : Stopwatch
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
property Stopwatch.ElapsedMilliseconds: int64
Stopwatch.Reset() : unit
Stopwatch.Start() : unit
val error : Exception ref

Full name: CircularBuffer.error
Multiple items
val ref : value:'T -> 'T ref

Full name: Microsoft.FSharp.Core.Operators.ref

--------------------
type 'T ref = Ref<'T>

Full name: Microsoft.FSharp.Core.ref<_>
module Unchecked

from Microsoft.FSharp.Core.Operators
val defaultof<'T> : 'T

Full name: Microsoft.FSharp.Core.Operators.Unchecked.defaultof
Multiple items
type Exception =
  new : unit -> Exception + 2 overloads
  member Data : IDictionary
  member GetBaseException : unit -> Exception
  member GetObjectData : info:SerializationInfo * context:StreamingContext -> unit
  member GetType : unit -> Type
  member HelpLink : string with get, set
  member InnerException : Exception
  member Message : string
  member Source : string with get, set
  member StackTrace : string
  ...

Full name: System.Exception

--------------------
Exception() : unit
Exception(message: string) : unit
Exception(message: string, innerException: exn) : unit
val e : exn
val source : int []

Full name: CircularBuffer.source
val incoming : Collections.Generic.IEnumerator<ArraySegment<int>>

Full name: CircularBuffer.incoming
val generator : seq<ArraySegment<int>>
Multiple items
val seq : sequence:seq<'T> -> seq<'T>

Full name: Microsoft.FSharp.Core.Operators.seq

--------------------
type seq<'T> = Collections.Generic.IEnumerable<'T>

Full name: Microsoft.FSharp.Collections.seq<_>
Multiple items
type ArraySegment<'T> =
  struct
    new : array:'T[] -> ArraySegment<'T> + 1 overload
    member Array : 'T[]
    member Count : int
    member Equals : obj:obj -> bool + 1 overload
    member GetHashCode : unit -> int
    member Offset : int
  end

Full name: System.ArraySegment<_>

--------------------
ArraySegment()
ArraySegment(array: 'T []) : unit
ArraySegment(array: 'T [], offset: int, count: int) : unit
Collections.Generic.IEnumerable.GetEnumerator() : Collections.Generic.IEnumerator<ArraySegment<int>>
val enqueueNext : unit -> 'a

Full name: CircularBuffer.enqueueNext
Collections.IEnumerator.MoveNext() : bool
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
property Collections.Generic.IEnumerator.Current: ArraySegment<int>
val data : int []

Full name: CircularBuffer.data
type Array =
  member Clone : unit -> obj
  member CopyTo : array:Array * index:int -> unit + 1 overload
  member GetEnumerator : unit -> IEnumerator
  member GetLength : dimension:int -> int
  member GetLongLength : dimension:int -> int64
  member GetLowerBound : dimension:int -> int
  member GetUpperBound : dimension:int -> int
  member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
  member Initialize : unit -> unit
  member IsFixedSize : bool
  ...

Full name: System.Array
val toList : array:'T [] -> 'T list

Full name: Microsoft.FSharp.Collections.Array.toList
module Seq

from Microsoft.FSharp.Collections
val toList : source:seq<'T> -> 'T list

Full name: Microsoft.FSharp.Collections.Seq.toList
val buffer : obj

Full name: CircularBuffer.buffer
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
val producerTimeout : int

Full name: CircularBuffer.producerTimeout
val consumerTimeout : int

Full name: CircularBuffer.consumerTimeout
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val i : int
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
static member Async.Start : computation:Async<unit> * ?cancellationToken:Threading.CancellationToken -> unit
val v : obj
val stream : obj

Full name: CircularBuffer.stream
val i : byte
val buffer : byte []
val zeroCreate : count:int -> 'T []

Full name: Microsoft.FSharp.Collections.Array.zeroCreate
Multiple items
val byte : value:'T -> byte (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.byte

--------------------
type byte = Byte

Full name: Microsoft.FSharp.Core.byte
val v : int
Fork me on GitHub