FSharp.Control.TaskSeq

Advanced Task Sequence Operations

This page covers advanced TaskSeq<'T> operations: grouping, stateful transformation with mapFold and threadState, deduplication, set-difference, partitioning, counting by key, lexicographic comparison, cancellation, and positional editing.

open System.Threading
open System.Threading.Tasks
open FSharp.Control

groupBy and groupByAsync

TaskSeq.groupBy partitions a sequence into groups by a key-projection function. The result is a task that yields an array of (key, elements[]) pairs, one per distinct key, in order of first occurrence.

Note: groupBy consumes the entire source before returning. Do not use it on potentially infinite sequences.

type Event = { EntityId: int; Payload: string }

let events : TaskSeq<Event> =
    TaskSeq.ofList
        [ { EntityId = 1; Payload = "A" }
          { EntityId = 2; Payload = "B" }
          { EntityId = 1; Payload = "C" }
          { EntityId = 3; Payload = "D" }
          { EntityId = 2; Payload = "E" } ]

// groups: (1, [A;C]), (2, [B;E]), (3, [D])
let grouped : Task<(int * Event[])[]> =
    events |> TaskSeq.groupBy (fun e -> e.EntityId)

TaskSeq.groupByAsync accepts a task-returning key projection:

let groupedAsync : Task<(int * Event[])[]> =
    events |> TaskSeq.groupByAsync (fun e -> task { return e.EntityId })
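
Because groupBy returns a task rather than a sequence, the groups are typically awaited and then processed with ordinary Array functions. The following is a minimal sketch under that assumption; the readings data and the per-key sum are hypothetical and not part of the library, and the script directive assumes a package version that ships groupBy.

```fsharp
// Script directive; assumes a package version that includes TaskSeq.groupBy.
#r "nuget: FSharp.Control.TaskSeq"
open FSharp.Control

// Hypothetical sample data: (sensor, reading) pairs.
let readings : TaskSeq<string * int> =
    TaskSeq.ofList [ ("a", 1); ("b", 2); ("a", 3) ]

// Await the grouped array, then reduce each group with plain Array functions.
let sums =
    task {
        let! groups = readings |> TaskSeq.groupBy fst
        return groups |> Array.map (fun (key, items) -> (key, items |> Array.sumBy snd))
    }

// With first-occurrence ordering this prints [|("a", 4); ("b", 2)|]
printfn "%A" sums.Result
```

Blocking on .Result is acceptable in a script; inside application code the task would normally be awaited with let! instead.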

countBy and countByAsync

TaskSeq.countBy counts how many elements map to each key. It returns a task that yields an array of (key, count) pairs:

let counts : Task<(int * int)[]> =
    events |> TaskSeq.countBy (fun e -> e.EntityId)
// (1,2), (2,2), (3,1)
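
As an illustration of consuming the counts, the sketch below picks the most frequent key. The word list is hypothetical sample data, and the script directive assumes a package version that ships countBy.

```fsharp
// Script directive; assumes a package version that includes TaskSeq.countBy.
#r "nuget: FSharp.Control.TaskSeq"
open FSharp.Control

// Hypothetical sample data.
let tokens : TaskSeq<string> =
    TaskSeq.ofList [ "to"; "be"; "or"; "not"; "to"; "be"; "to" ]

// Count occurrences per word, then pick the pair with the highest count.
let mostFrequent =
    task {
        let! counts = tokens |> TaskSeq.countBy id
        return counts |> Array.maxBy snd
    }

// prints ("to", 3)
printfn "%A" mostFrequent.Result
```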

mapFold and mapFoldAsync

TaskSeq.mapFold threads a state accumulator through a sequence while simultaneously mapping each element to a result value. The output is a task returning a pair of (result[], finalState):

// Number each word sequentially; the state is the running word count
let words : TaskSeq<string> =
    TaskSeq.ofList [ "hello"; "world"; "foo" ]

let numbered : Task<string[] * int> =
    words
    |> TaskSeq.mapFold (fun count w -> $"{count}: {w}", count + 1) 0

// result: ([| "0: hello"; "1: world"; "2: foo" |], 3)

TaskSeq.mapFoldAsync is the same but the mapping function returns Task<'Result * 'State>.
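
A minimal sketch of the asynchronous variant follows. It assumes mapFoldAsync takes its arguments in the same order as mapFold (mapping function, initial state, source), which the text above implies but does not spell out; the input values are arbitrary.

```fsharp
// Script directive; assumes a package version that includes TaskSeq.mapFoldAsync.
#r "nuget: FSharp.Control.TaskSeq"
open System.Threading.Tasks
open FSharp.Control

// Map each number to its square while accumulating the sum of the inputs.
// The mapping function returns a task of (result, newState).
let squaresAndTotal : Task<int[] * int> =
    TaskSeq.ofList [ 3; 4; 5 ]
    |> TaskSeq.mapFoldAsync (fun total n -> task { return (n * n, total + n) }) 0

// prints ([|9; 16; 25|], 12)
printfn "%A" squaresAndTotal.Result
```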


threadState and threadStateAsync

TaskSeq.threadState is the lazy, streaming counterpart to mapFold. It threads a state accumulator through the sequence while yielding each mapped result — but unlike mapFold it never materialises the results into an array, and it discards the final state. This makes it suitable for infinite sequences and pipelines where intermediate results should be streamed rather than buffered:

let numbers : TaskSeq<int> = TaskSeq.ofSeq (seq { 1..5 })

// Produce a running total without collecting the whole sequence first
let runningSum : TaskSeq<int> =
    numbers
    |> TaskSeq.threadState (fun acc x -> acc + x, acc + x) 0

// yields lazily: 1, 3, 6, 10, 15

Compare with scan, which also emits a running result but prepends the initial state:

let viaScan = numbers |> TaskSeq.scan (fun acc x -> acc + x) 0
// yields: 0, 1, 3, 6, 10, 15  (one extra initial element)

let viaThreadState = numbers |> TaskSeq.threadState (fun acc x -> acc + x, acc + x) 0
// yields: 1, 3, 6, 10, 15  (no initial element; result == new state here)

TaskSeq.threadStateAsync accepts a task-returning folder:

let asyncRunningSum : TaskSeq<int> =
    numbers
    |> TaskSeq.threadStateAsync (fun acc x -> Task.fromResult (acc + x, acc + x)) 0
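
The running-sum examples return the same value as both result and state, which hides the distinction between the two. The sketch below keeps them separate: the state is the previous element, and the emitted result is the difference from it. The sample values are hypothetical, and toArrayAsync is used only to collect the output for display.

```fsharp
// Script directive; assumes a package version that includes TaskSeq.threadState.
#r "nuget: FSharp.Control.TaskSeq"
open FSharp.Control

// Hypothetical sample data.
let samples : TaskSeq<int> = TaskSeq.ofList [ 10; 13; 19 ]

// State = previous element; result = difference from the previous element.
// The folder returns (result, newState), in that order.
let deltas : TaskSeq<int> =
    samples |> TaskSeq.threadState (fun prev x -> (x - prev, x)) 0

// Collect only for display; the pipeline itself stays lazy.
let collected = (deltas |> TaskSeq.toArrayAsync).Result

// prints [|10; 3; 6|]
printfn "%A" collected
```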

TaskSeq.scan is the streaming sibling of fold: it emits each intermediate state as a new element, starting with the initial state:

let runningTotals : TaskSeq<int> =
    numbers |> TaskSeq.scan (fun acc n -> acc + n) 0

// yields: 0, 1, 3, 6, 10, 15

distinct and distinctBy

TaskSeq.distinct removes duplicates (keeps first occurrence), using generic equality:

let withDups : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 2; 3; 1; 4 ]

let deduped : TaskSeq<int> = withDups |> TaskSeq.distinct // 1, 2, 3, 4

TaskSeq.distinctBy deduplicates by a key projection:

let strings : TaskSeq<string> =
    TaskSeq.ofList [ "hello"; "HELLO"; "world"; "WORLD" ]

let caseInsensitiveDistinct : TaskSeq<string> =
    strings |> TaskSeq.distinctBy (fun s -> s.ToLowerInvariant())
// "hello", "world"

Note: both distinct and distinctBy buffer all unique keys in a hash set. Do not use them on potentially infinite sequences.

TaskSeq.distinctByAsync accepts an async key projection.


distinctUntilChanged

TaskSeq.distinctUntilChanged removes consecutive duplicates only — it does not buffer the whole sequence, so it is safe on infinite streams:

let run : TaskSeq<int> = TaskSeq.ofList [ 1; 1; 2; 2; 2; 3; 1; 1 ]

let noConsecDups : TaskSeq<int> = run |> TaskSeq.distinctUntilChanged
// 1, 2, 3, 1
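
To make the difference from distinct concrete, the sketch below runs both functions over the same input. It is only an illustration; the input list is the one used above.

```fsharp
// Script directive; assumes a package version that includes both functions.
#r "nuget: FSharp.Control.TaskSeq"
open FSharp.Control

let input = [ 1; 1; 2; 2; 2; 3; 1; 1 ]

// distinct remembers every value seen so far, so the trailing 1s are dropped.
let globallyDistinct =
    (TaskSeq.ofList input |> TaskSeq.distinct |> TaskSeq.toArrayAsync).Result

// distinctUntilChanged only compares with the previous element, so 1 reappears.
let locallyDistinct =
    (TaskSeq.ofList input |> TaskSeq.distinctUntilChanged |> TaskSeq.toArrayAsync).Result

printfn "%A" globallyDistinct // prints [|1; 2; 3|]
printfn "%A" locallyDistinct  // prints [|1; 2; 3; 1|]
```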

except and exceptOfSeq

TaskSeq.except itemsToExclude source returns elements of source that do not appear in itemsToExclude. The exclusion set is materialised eagerly before iteration:

let exclusions : TaskSeq<int> = TaskSeq.ofList [ 2; 4 ]
let source : TaskSeq<int> = TaskSeq.ofSeq (seq { 1..5 })

let filtered : TaskSeq<int> = TaskSeq.except exclusions source // 1, 3, 5

TaskSeq.exceptOfSeq accepts a plain seq<'T> as the exclusion set.
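
A minimal sketch of exceptOfSeq, assuming it takes the exclusion set first and the source second, mirroring except. The blocked values are hypothetical.

```fsharp
// Script directive; assumes a package version that includes TaskSeq.exceptOfSeq.
#r "nuget: FSharp.Control.TaskSeq"
open FSharp.Control

// Hypothetical exclusion set, held as a plain in-memory sequence.
let blocked : seq<int> = Seq.ofList [ 2; 4 ]

let allowed : TaskSeq<int> =
    TaskSeq.ofSeq (seq { 1..5 }) |> TaskSeq.exceptOfSeq blocked

let allowedArray = (allowed |> TaskSeq.toArrayAsync).Result

// prints [|1; 3; 5|]
printfn "%A" allowedArray
```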


partition and partitionAsync

TaskSeq.partition splits the sequence into two arrays in a single pass. Elements for which the predicate returns true go into the first array; the rest into the second:

let partitioned : Task<int[] * int[]> =
    source |> TaskSeq.partition (fun n -> n % 2 = 0)
// trueItems: [|2;4|]   falseItems: [|1;3;5|]

TaskSeq.partitionAsync accepts an async predicate.
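
Since partition returns a task of a pair, the two arrays can be destructured directly in a let! binding. The sketch below does that and sums each side; the summing step is only an illustration.

```fsharp
// Script directive; assumes a package version that includes TaskSeq.partition.
#r "nuget: FSharp.Control.TaskSeq"
open FSharp.Control

let evenAndOddSums =
    task {
        // Destructure the (trueItems, falseItems) pair as soon as it is awaited.
        let! (evens, odds) =
            TaskSeq.ofSeq (seq { 1..5 })
            |> TaskSeq.partition (fun n -> n % 2 = 0)

        return (Array.sum evens, Array.sum odds)
    }

// prints (6, 9)
printfn "%A" evenAndOddSums.Result
```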


compareWith and compareWithAsync

TaskSeq.compareWith performs a lexicographic comparison of two sequences using a custom comparer. It returns the first non-zero comparison result, or 0 if the sequences are element-wise equal and have the same length:

let a : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 3 ]
let b : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 4 ]

let cmp : Task<int> =
    TaskSeq.compareWith (fun x y -> compare x y) a b
// negative (a < b)

TaskSeq.compareWithAsync accepts an async comparer.
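
The comparer need not be the built-in compare. As a sketch, the example below compares two string sequences case-insensitively; the ignoreCase helper and the sample lists are hypothetical. Only equal-length inputs are used, because the behaviour for inputs of different lengths is not described above.

```fsharp
// Script directive; assumes a package version that includes TaskSeq.compareWith.
#r "nuget: FSharp.Control.TaskSeq"
open System
open FSharp.Control

// Hypothetical comparer: ordinal, case-insensitive string comparison.
let ignoreCase (x: string) (y: string) =
    String.Compare(x, y, StringComparison.OrdinalIgnoreCase)

// Element-wise equal under the comparer, so the result is 0.
let same =
    (TaskSeq.compareWith ignoreCase
        (TaskSeq.ofList [ "alpha"; "Beta" ])
        (TaskSeq.ofList [ "ALPHA"; "beta" ])).Result

// The second elements differ ("Beta" sorts before "gamma"), so the result is negative.
let ordered =
    (TaskSeq.compareWith ignoreCase
        (TaskSeq.ofList [ "alpha"; "Beta" ])
        (TaskSeq.ofList [ "alpha"; "gamma" ])).Result

printfn "%d %b" same (ordered < 0) // prints 0 true
```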


withCancellation

TaskSeq.withCancellation token source injects a CancellationToken into the underlying IAsyncEnumerable<'T>. This is equivalent to calling .WithCancellation(token) in C# and is useful when consuming sequences from libraries (e.g. Entity Framework Core) that require a token at the enumeration site:

let cts = new CancellationTokenSource()

let cancellable : TaskSeq<int> =
    source |> TaskSeq.withCancellation cts.Token
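
The sketch below shows one way to combine withCancellation with a timeout. The tryConsume helper is hypothetical. Whether cancellation is actually observed depends on the producer honouring the token, so the handler for OperationCanceledException is an assumption about how such a producer would signal it.

```fsharp
// Script directive; assumes a package version that includes TaskSeq.withCancellation.
#r "nuget: FSharp.Control.TaskSeq"
open System
open System.Threading
open FSharp.Control

// Hypothetical helper: true if the source was fully consumed,
// false if enumeration stopped with an OperationCanceledException.
let tryConsume (timeout: TimeSpan) (source: TaskSeq<int>) =
    task {
        // The token source cancels itself once the timeout elapses.
        use cts = new CancellationTokenSource(timeout)

        try
            do!
                source
                |> TaskSeq.withCancellation cts.Token
                |> TaskSeq.iter (fun n -> printfn "got %d" n)

            return true
        with :? OperationCanceledException ->
            return false
    }

// A short in-memory sequence finishes long before the timeout.
let completed = (tryConsume (TimeSpan.FromSeconds 5.0) (TaskSeq.ofList [ 1; 2; 3 ])).Result
printfn "%b" completed // prints true
```

Declaring the CancellationTokenSource with use ensures it is disposed when the task completes, whichever branch is taken.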

Positional editing

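TaskSeq.insertAt, TaskSeq.insertManyAt, TaskSeq.removeAt, TaskSeq.removeManyAt, and TaskSeq.updateAt produce new sequences with elements inserted, removed, or replaced at a given zero-based index. insertAt, removeAt, and updateAt act on a single element; insertManyAt splices in a whole sequence, and removeManyAt drops a run of count elements starting at the index: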

let original : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 4; 5 ]

let inserted : TaskSeq<int> = original |> TaskSeq.insertAt 2 3 // 1,2,3,4,5

let removed : TaskSeq<int> = original |> TaskSeq.removeAt 1 // 1,4,5

let updated : TaskSeq<int> = original |> TaskSeq.updateAt 0 99 // 99,2,4,5

let manyInserted : TaskSeq<int> =
    original
    |> TaskSeq.insertManyAt 2 (TaskSeq.ofList [ 10; 11 ])
// 1, 2, 10, 11, 4, 5

let manyRemoved : TaskSeq<int> = original |> TaskSeq.removeManyAt 1 2 // 1, 5
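
These functions compose in a pipeline. The sketch below chains three edits; the point to note is that each index refers to the sequence as it stands at that stage of the pipeline, not to the original. The values are arbitrary.

```fsharp
// Script directive; assumes a package version that includes the positional functions.
#r "nuget: FSharp.Control.TaskSeq"
open FSharp.Control

let start : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 4; 5 ]

let edited : TaskSeq<int> =
    start
    |> TaskSeq.insertAt 2 3   // 1, 2, 3, 4, 5
    |> TaskSeq.removeAt 0     // 2, 3, 4, 5
    |> TaskSeq.updateAt 3 50  // 2, 3, 4, 50

let editedArray = (edited |> TaskSeq.toArrayAsync).Result

// prints [|2; 3; 4; 50|]
printfn "%A" editedArray
```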