Header menu logo FSharp.Control.TaskSeq

Advanced Task Sequence Operations

This page covers advanced TaskSeq<'T> operations: grouping, stateful transformation with mapFold, deduplication, set-difference, partitioning, counting by key, lexicographic comparison, cancellation, and positional editing.

open System.Threading
open System.Threading.Tasks
open FSharp.Control

groupBy and groupByAsync

TaskSeq.groupBy partitions a sequence into groups by a key-projection function. The result is an array of (key, elements[]) pairs, one per distinct key, in order of first occurrence.

Note: groupBy consumes the entire source before returning. Do not use it on potentially infinite sequences.

type Event = { EntityId: int; Payload: string }

let events : TaskSeq<Event> =
    TaskSeq.ofList
        [ { EntityId = 1; Payload = "A" }
          { EntityId = 2; Payload = "B" }
          { EntityId = 1; Payload = "C" }
          { EntityId = 3; Payload = "D" }
          { EntityId = 2; Payload = "E" } ]

// groups: (1, [A;C]), (2, [B;E]), (3, [D])
let grouped : Task<(int * Event[])[]> =
    events |> TaskSeq.groupBy (fun e -> e.EntityId)

TaskSeq.groupByAsync accepts an async key projection:

let groupedAsync : Task<(int * Event[])[]> =
    events |> TaskSeq.groupByAsync (fun e -> task { return e.EntityId })

countBy and countByAsync

TaskSeq.countBy counts how many elements map to each key, returning (key, count)[]:

let counts : Task<(int * int)[]> =
    events |> TaskSeq.countBy (fun e -> e.EntityId)
// (1,2), (2,2), (3,1)

mapFold and mapFoldAsync

TaskSeq.mapFold threads a state accumulator through a sequence while simultaneously mapping each element to a result value. The output is a task returning a pair of (result[], finalState):

// Number each word sequentially while building a running concatenation
let words : TaskSeq<string> =
    TaskSeq.ofList [ "hello"; "world"; "foo" ]

let numbered : Task<string[] * int> =
    words
    |> TaskSeq.mapFold (fun count w -> $"{count}: {w}", count + 1) 0

// result: ([| "0: hello"; "1: world"; "2: foo" |], 3)

TaskSeq.mapFoldAsync is the same but the mapping function returns Task<'Result * 'State>.


scan and scanAsync

TaskSeq.scan is the streaming sibling of fold: it emits each intermediate state as a new element, starting with the initial state:

let numbers : TaskSeq<int> = TaskSeq.ofSeq (seq { 1..5 })

let runningTotals : TaskSeq<int> =
    numbers |> TaskSeq.scan (fun acc n -> acc + n) 0

// yields: 0, 1, 3, 6, 10, 15

distinct and distinctBy

TaskSeq.distinct removes duplicates (keeps first occurrence), using generic equality:

let withDups : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 2; 3; 1; 4 ]

let deduped : TaskSeq<int> = withDups |> TaskSeq.distinct // 1, 2, 3, 4

TaskSeq.distinctBy deduplicates by a key projection:

let strings : TaskSeq<string> =
    TaskSeq.ofList [ "hello"; "HELLO"; "world"; "WORLD" ]

let caseInsensitiveDistinct : TaskSeq<string> =
    strings |> TaskSeq.distinctBy (fun s -> s.ToLowerInvariant())
// "hello", "world"

Note: both distinct and distinctBy buffer all unique keys in a hash set. Do not use them on potentially infinite sequences.

TaskSeq.distinctByAsync accepts an async key projection.


distinctUntilChanged

TaskSeq.distinctUntilChanged removes consecutive duplicates only — it does not buffer the whole sequence, so it is safe on infinite streams:

let run : TaskSeq<int> = TaskSeq.ofList [ 1; 1; 2; 2; 2; 3; 1; 1 ]

let noConsecDups : TaskSeq<int> = run |> TaskSeq.distinctUntilChanged
// 1, 2, 3, 1

except and exceptOfSeq

TaskSeq.except itemsToExclude source returns elements of source that do not appear in itemsToExclude. The exclusion set is materialised eagerly before iteration:

let exclusions : TaskSeq<int> = TaskSeq.ofList [ 2; 4 ]
let source : TaskSeq<int> = TaskSeq.ofSeq (seq { 1..5 })

let filtered : TaskSeq<int> = TaskSeq.except exclusions source // 1, 3, 5

TaskSeq.exceptOfSeq accepts a plain seq<'T> as the exclusion set.


partition and partitionAsync

TaskSeq.partition splits the sequence into two arrays in a single pass. Elements for which the predicate returns true go into the first array; the rest into the second:

let partitioned : Task<int[] * int[]> =
    source |> TaskSeq.partition (fun n -> n % 2 = 0)
// trueItems: [|2;4|]   falseItems: [|1;3;5|]

TaskSeq.partitionAsync accepts an async predicate.


compareWith and compareWithAsync

TaskSeq.compareWith performs a lexicographic comparison of two sequences using a custom comparer. It returns the first non-zero comparison result, or 0 if the sequences are element-wise equal and have the same length:

let a : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 3 ]
let b : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 4 ]

let cmp : Task<int> =
    TaskSeq.compareWith (fun x y -> compare x y) a b
// negative (a < b)

TaskSeq.compareWithAsync accepts an async comparer.


withCancellation

TaskSeq.withCancellation token source injects a CancellationToken into the underlying IAsyncEnumerable<'T>. This is equivalent to calling .WithCancellation(token) in C# and is useful when consuming sequences from libraries (e.g. Entity Framework Core) that require a token at the enumeration site:

let cts = new CancellationTokenSource()

let cancellable : TaskSeq<int> =
    source |> TaskSeq.withCancellation cts.Token

Positional editing

TaskSeq.insertAt, TaskSeq.insertManyAt, TaskSeq.removeAt, TaskSeq.removeManyAt, and TaskSeq.updateAt produce new sequences with an element inserted, removed, or replaced at a given zero-based index:

let original : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 4; 5 ]

let inserted : TaskSeq<int> = original |> TaskSeq.insertAt 2 3 // 1,2,3,4,5

let removed : TaskSeq<int> = original |> TaskSeq.removeAt 1 // 1,4,5

let updated : TaskSeq<int> = original |> TaskSeq.updateAt 0 99 // 99,2,4,5

let manyInserted : TaskSeq<int> =
    original
    |> TaskSeq.insertManyAt 2 (TaskSeq.ofList [ 10; 11 ])
// 1, 2, 10, 11, 4, 5

let manyRemoved : TaskSeq<int> = original |> TaskSeq.removeManyAt 1 2 // 1, 5
namespace System
namespace System.Threading
namespace System.Threading.Tasks
Multiple items
namespace FSharp

--------------------
namespace Microsoft.FSharp
Multiple items
namespace FSharp.Control

--------------------
namespace Microsoft.FSharp.Control
Multiple items
module Event from Microsoft.FSharp.Control

--------------------
type Event = { EntityId: int Payload: string }

--------------------
type Event<'T> = new: unit -> Event<'T> member Trigger: arg: 'T -> unit member Publish: IEvent<'T> with get

--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate and reference type)> = new: unit -> Event<'Delegate,'Args> member Trigger: sender: objnull * args: 'Args -> unit member Publish: IEvent<'Delegate,'Args> with get

--------------------
new: unit -> Event<'T>

--------------------
new: unit -> Event<'Delegate,'Args>
Multiple items
val int: value: 'T -> int (requires member op_Explicit)

--------------------
type int = int32

--------------------
type int<'Measure> = int
Multiple items
val string: value: 'T -> string

--------------------
type string = System.String
val events: TaskSeq<Event>
Multiple items
module TaskSeq from FSharp.Control.TaskSeqExtensions

--------------------
type TaskSeq = static member append: source1: TaskSeq<'T> -> source2: TaskSeq<'T> -> TaskSeq<'T> static member appendSeq: source1: TaskSeq<'T> -> source2: 'T seq -> TaskSeq<'T> static member box: source: TaskSeq<'T> -> TaskSeq<obj> static member cast: source: TaskSeq<obj> -> TaskSeq<'U> static member choose: chooser: ('T -> 'U option) -> source: TaskSeq<'T> -> TaskSeq<'U> static member chooseAsync: chooser: ('T -> #Task<'U option>) -> source: TaskSeq<'T> -> TaskSeq<'U> static member chunkBySize: chunkSize: int -> source: TaskSeq<'T> -> TaskSeq<'T array> static member collect: binder: ('T -> #TaskSeq<'U>) -> source: TaskSeq<'T> -> TaskSeq<'U> static member collectAsync: binder: ('T -> #Task<'TSeqU>) -> source: TaskSeq<'T> -> TaskSeq<'U> (requires 'TSeqU :> TaskSeq<'U>) static member collectSeq: binder: ('T -> #('U seq)) -> source: TaskSeq<'T> -> TaskSeq<'U> ...

--------------------
type TaskSeq<'T> = System.Collections.Generic.IAsyncEnumerable<'T>
<summary> Represents a task sequence and is the output of using the <paramref name="taskSeq{...}" /> computation expression from this library. It is an alias for <see cref="T:System.IAsyncEnumerable&lt;_&gt;" />. </summary>

--------------------
type TaskSeq<'Machine,'T (requires 'Machine :> IAsyncStateMachine and 'Machine :> IResumableStateMachine<TaskSeqStateMachineData<'T>>)> = inherit TaskSeqBase<'T> interface IValueTaskSource interface IValueTaskSource<bool> interface IAsyncStateMachine interface IAsyncEnumerable<'T> interface IAsyncEnumerator<'T> new: unit -> TaskSeq<'Machine,'T> member InitMachineData: ct: CancellationToken * machine: byref<'Machine> -> unit override MoveNextAsyncResult: unit -> ValueTask<bool>
<summary> Main implementation of generic <see cref="T:System.IAsyncEnumerable&lt;'T&gt;" /> and related interfaces, which forms the meat of the logic behind <see cref="taskSeq" /> computation expresssions. For use by this library only, should not be used directly in user code. Its operation depends highly on resumable state. </summary>

--------------------
new: unit -> TaskSeq<'Machine,'T>
static member TaskSeq.ofList: source: 'T list -> TaskSeq<'T>
val grouped: Task<(int * Event array) array>
Multiple items
type Task = interface IAsyncResult interface IDisposable new: action: Action -> unit + 7 overloads member ConfigureAwait: continueOnCapturedContext: bool -> ConfiguredTaskAwaitable + 1 overload member ContinueWith: continuationAction: Action<Task,obj> * state: obj -> Task + 19 overloads member Dispose: unit -> unit member GetAwaiter: unit -> TaskAwaiter member RunSynchronously: unit -> unit + 1 overload member Start: unit -> unit + 1 overload member Wait: unit -> unit + 5 overloads ...
<summary>Represents an asynchronous operation.</summary>

--------------------
type Task<'TResult> = inherit Task new: ``function`` : Func<obj,'TResult> * state: obj -> unit + 7 overloads member ConfigureAwait: continueOnCapturedContext: bool -> ConfiguredTaskAwaitable<'TResult> + 1 overload member ContinueWith: continuationAction: Action<Task<'TResult>,obj> * state: obj -> Task + 19 overloads member GetAwaiter: unit -> TaskAwaiter<'TResult> member WaitAsync: cancellationToken: CancellationToken -> Task<'TResult> + 4 overloads member Result: 'TResult static member Factory: TaskFactory<'TResult>
<summary>Represents an asynchronous operation that can return a value.</summary>
<typeparam name="TResult">The type of the result produced by this <see cref="T:System.Threading.Tasks.Task`1" />.</typeparam>


--------------------
Task(action: System.Action) : Task
Task(action: System.Action, cancellationToken: CancellationToken) : Task
Task(action: System.Action, creationOptions: TaskCreationOptions) : Task
Task(action: System.Action<obj>, state: obj) : Task
Task(action: System.Action, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task
Task(action: System.Action<obj>, state: obj, cancellationToken: CancellationToken) : Task
Task(action: System.Action<obj>, state: obj, creationOptions: TaskCreationOptions) : Task
Task(action: System.Action<obj>, state: obj, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task

--------------------
Task(``function`` : System.Func<'TResult>) : Task<'TResult>
Task(``function`` : System.Func<obj,'TResult>, state: obj) : Task<'TResult>
Task(``function`` : System.Func<'TResult>, cancellationToken: CancellationToken) : Task<'TResult>
Task(``function`` : System.Func<'TResult>, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(``function`` : System.Func<obj,'TResult>, state: obj, cancellationToken: CancellationToken) : Task<'TResult>
Task(``function`` : System.Func<obj,'TResult>, state: obj, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(``function`` : System.Func<'TResult>, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(``function`` : System.Func<obj,'TResult>, state: obj, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task<'TResult>
static member TaskSeq.groupBy: projection: ('T -> 'Key) -> source: TaskSeq<'T> -> Task<('Key * 'T array) array> (requires equality)
val e: Event
Event.EntityId: int
val groupedAsync: Task<(int * Event array) array>
static member TaskSeq.groupByAsync: projection: ('T -> #Task<'Key>) -> source: TaskSeq<'T> -> Task<('Key * 'T array) array> (requires equality)
val task: TaskBuilder
val counts: Task<(int * int) array>
static member TaskSeq.countBy: projection: ('T -> 'Key) -> source: TaskSeq<'T> -> Task<('Key * int) array> (requires equality)
val words: TaskSeq<string>
val numbered: Task<string array * int>
static member TaskSeq.mapFold: mapping: ('State -> 'T -> 'Result * 'State) -> state: 'State -> source: TaskSeq<'T> -> Task<'Result array * 'State>
val count: int
val w: string
val numbers: TaskSeq<int>
static member TaskSeq.ofSeq: source: 'T seq -> TaskSeq<'T>
Multiple items
val seq: sequence: 'T seq -> 'T seq

--------------------
type 'T seq = System.Collections.Generic.IEnumerable<'T>
val runningTotals: TaskSeq<int>
static member TaskSeq.scan: folder: ('State -> 'T -> 'State) -> state: 'State -> source: TaskSeq<'T> -> TaskSeq<'State>
val acc: int
val n: int
val withDups: TaskSeq<int>
val deduped: TaskSeq<int>
static member TaskSeq.distinct: source: TaskSeq<'T> -> TaskSeq<'T> (requires equality)
val strings: TaskSeq<string>
val caseInsensitiveDistinct: TaskSeq<string>
static member TaskSeq.distinctBy: projection: ('T -> 'Key) -> source: TaskSeq<'T> -> TaskSeq<'T> (requires equality)
val s: string
System.String.ToLowerInvariant() : string
val run: TaskSeq<int>
val noConsecDups: TaskSeq<int>
static member TaskSeq.distinctUntilChanged: source: TaskSeq<'T> -> TaskSeq<'T> (requires equality)
val exclusions: TaskSeq<int>
val source: TaskSeq<int>
val filtered: TaskSeq<int>
static member TaskSeq.except: itemsToExclude: TaskSeq<'T> -> source: TaskSeq<'T> -> TaskSeq<'T> (requires equality)
val partitioned: Task<int array * int array>
static member TaskSeq.partition: predicate: ('T -> bool) -> source: TaskSeq<'T> -> Task<'T array * 'T array>
val a: TaskSeq<int>
val b: TaskSeq<int>
val cmp: Task<int>
static member TaskSeq.compareWith: comparer: ('T -> 'T -> int) -> source1: TaskSeq<'T> -> source2: TaskSeq<'T> -> Task<int>
val x: int
val y: int
val compare: e1: 'T -> e2: 'T -> int (requires comparison)
val cts: CancellationTokenSource
Multiple items
type CancellationTokenSource = interface IDisposable new: unit -> unit + 3 overloads member Cancel: unit -> unit + 1 overload member CancelAfter: millisecondsDelay: int -> unit + 1 overload member CancelAsync: unit -> Task member Dispose: unit -> unit member TryReset: unit -> bool static member CreateLinkedTokenSource: token: CancellationToken -> CancellationTokenSource + 3 overloads member IsCancellationRequested: bool member Token: CancellationToken
<summary>Signals to a <see cref="T:System.Threading.CancellationToken" /> that it should be canceled.</summary>

--------------------
CancellationTokenSource() : CancellationTokenSource
CancellationTokenSource(millisecondsDelay: int) : CancellationTokenSource
CancellationTokenSource(delay: System.TimeSpan) : CancellationTokenSource
CancellationTokenSource(delay: System.TimeSpan, timeProvider: System.TimeProvider) : CancellationTokenSource
val cancellable: TaskSeq<int>
static member TaskSeq.withCancellation: cancellationToken: CancellationToken -> source: TaskSeq<'T> -> TaskSeq<'T>
property CancellationTokenSource.Token: CancellationToken with get
<summary>Gets the <see cref="T:System.Threading.CancellationToken" /> associated with this <see cref="T:System.Threading.CancellationTokenSource" />.</summary>
<exception cref="T:System.ObjectDisposedException">The token source has been disposed.</exception>
<returns>The <see cref="T:System.Threading.CancellationToken" /> associated with this <see cref="T:System.Threading.CancellationTokenSource" />.</returns>
val original: TaskSeq<int>
val inserted: TaskSeq<int>
static member TaskSeq.insertAt: index: int -> value: 'T -> source: TaskSeq<'T> -> TaskSeq<'T>
val removed: TaskSeq<int>
static member TaskSeq.removeAt: index: int -> source: TaskSeq<'T> -> TaskSeq<'T>
val updated: TaskSeq<int>
static member TaskSeq.updateAt: index: int -> value: 'T -> source: TaskSeq<'T> -> TaskSeq<'T>
val manyInserted: TaskSeq<int>
static member TaskSeq.insertManyAt: index: int -> values: TaskSeq<'T> -> source: TaskSeq<'T> -> TaskSeq<'T>
val manyRemoved: TaskSeq<int>
static member TaskSeq.removeManyAt: index: int -> count: int -> source: TaskSeq<'T> -> TaskSeq<'T>

Type something to start searching.