Advanced Task Sequence Operations
This page covers advanced TaskSeq<'T> operations: grouping, stateful transformation with
mapFold and threadState, deduplication, set-difference, partitioning, counting by key, lexicographic
comparison, cancellation, and positional editing.
open System.Threading
open System.Threading.Tasks
open FSharp.Control
groupBy and groupByAsync
TaskSeq.groupBy partitions a sequence into groups by a key-projection function. The result
is a task that yields an array of (key, elements[]) pairs, one per distinct key, in order of first occurrence.
Note: groupBy consumes the entire source before returning. Do not use it on potentially infinite sequences.
type Event = { EntityId: int; Payload: string }
let events : TaskSeq<Event> =
    TaskSeq.ofList
        [ { EntityId = 1; Payload = "A" }
          { EntityId = 2; Payload = "B" }
          { EntityId = 1; Payload = "C" }
          { EntityId = 3; Payload = "D" }
          { EntityId = 2; Payload = "E" } ]
// groups: (1, [A;C]), (2, [B;E]), (3, [D])
let grouped : Task<(int * Event[])[]> =
    events |> TaskSeq.groupBy (fun e -> e.EntityId)
TaskSeq.groupByAsync accepts an async key projection:
let groupedAsync : Task<(int * Event[])[]> =
    events |> TaskSeq.groupByAsync (fun e -> task { return e.EntityId })
countBy and countByAsync
TaskSeq.countBy counts how many elements map to each key. The result is a task that yields an array of (key, count) pairs:
let counts : Task<(int * int)[]> =
    events |> TaskSeq.countBy (fun e -> e.EntityId)
// (1,2), (2,2), (3,1)
TaskSeq.countByAsync accepts an async key projection.
mapFold and mapFoldAsync
TaskSeq.mapFold threads a state accumulator through a sequence while simultaneously mapping
each element to a result value. The output is a task returning a pair of (result[], finalState):
// Number each word sequentially; the state is the count of words seen so far
let words : TaskSeq<string> =
    TaskSeq.ofList [ "hello"; "world"; "foo" ]
let numbered : Task<string[] * int> =
    words
    |> TaskSeq.mapFold (fun count w -> $"{count}: {w}", count + 1) 0
// result: ([| "0: hello"; "1: world"; "2: foo" |], 3)
TaskSeq.mapFoldAsync is the same but the mapping function returns Task<'Result * 'State>.
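As a minimal sketch of the async variant, the word-numbering example can be rewritten with a folder that returns a task. The helper labelAsync and its Task.Delay call are hypothetical stand-ins for real asynchronous work, and the script assumes an .fsx context with a package version that includes mapFoldAsync:

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open System.Threading.Tasks
open FSharp.Control

let words : TaskSeq<string> =
    TaskSeq.ofList [ "hello"; "world"; "foo" ]

// Hypothetical async folder: the delay stands in for real I/O such as a lookup.
// It returns the mapped result together with the next state.
let labelAsync (count: int) (w: string) : Task<string * int> =
    task {
        do! Task.Delay 1
        return $"{count}: {w}", count + 1
    }

let numberedAsync : Task<string[] * int> =
    words |> TaskSeq.mapFoldAsync labelAsync 0
// expected to mirror the synchronous example:
// ([| "0: hello"; "1: world"; "2: foo" |], 3)
```

Like mapFold, this consumes the whole source before the task completes, so the same caution about infinite sequences should apply.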
threadState and threadStateAsync
TaskSeq.threadState is the lazy, streaming counterpart to mapFold. It threads a state
accumulator through the sequence while yielding each mapped result — but unlike mapFold it
never materialises the results into an array, and it discards the final state. This makes it
suitable for infinite sequences and pipelines where intermediate results should be streamed rather
than buffered:
let numbers : TaskSeq<int> = TaskSeq.ofSeq (seq { 1..5 })
// Produce a running total without collecting the whole sequence first
let runningSum : TaskSeq<int> =
    numbers
    |> TaskSeq.threadState (fun acc x -> acc + x, acc + x) 0
// yields lazily: 1, 3, 6, 10, 15
Compare with scan, which also emits a running result but prepends the initial state:
let viaScan = numbers |> TaskSeq.scan (fun acc x -> acc + x) 0
// yields: 0, 1, 3, 6, 10, 15 (one extra initial element)
let viaThreadState = numbers |> TaskSeq.threadState (fun acc x -> acc + x, acc + x) 0
// yields: 1, 3, 6, 10, 15 (no initial element; result == new state here)
TaskSeq.threadStateAsync accepts an asynchronous folder:
let asyncRunningSum : TaskSeq<int> =
    numbers
    |> TaskSeq.threadStateAsync (fun acc x -> Task.fromResult (acc + x, acc + x)) 0
TaskSeq.scan is the streaming sibling of fold: it emits each intermediate state as a new
element, starting with the initial state:
let runningTotals : TaskSeq<int> =
    numbers |> TaskSeq.scan (fun acc n -> acc + n) 0
// yields: 0, 1, 3, 6, 10, 15
distinct and distinctBy
TaskSeq.distinct removes duplicates (keeps first occurrence), using generic equality:
let withDups : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 2; 3; 1; 4 ]
let deduped : TaskSeq<int> = withDups |> TaskSeq.distinct // 1, 2, 3, 4
TaskSeq.distinctBy deduplicates by a key projection:
let strings : TaskSeq<string> =
    TaskSeq.ofList [ "hello"; "HELLO"; "world"; "WORLD" ]
let caseInsensitiveDistinct : TaskSeq<string> =
    strings |> TaskSeq.distinctBy (fun s -> s.ToLowerInvariant())
// "hello", "world"
Note: both distinct and distinctBy buffer all unique keys in a hash set. Do not use them on potentially infinite sequences.
TaskSeq.distinctByAsync accepts an async key projection.
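A short sketch of distinctByAsync, assuming the key projection returns a Task<'Key>. The helper normalizeAsync is hypothetical; in practice the projection would do real asynchronous work, such as resolving a canonical identifier:

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open System.Threading.Tasks
open FSharp.Control

let strings : TaskSeq<string> =
    TaskSeq.ofList [ "hello"; "HELLO"; "world"; "WORLD" ]

// Hypothetical async key projection: lower-cases the string inside a task.
let normalizeAsync (s: string) : Task<string> =
    task { return s.ToLowerInvariant() }

let caseInsensitiveDistinctAsync : TaskSeq<string> =
    strings |> TaskSeq.distinctByAsync normalizeAsync

// Materialise the result to inspect it; the first occurrence of each key is kept.
let distinctItems : Task<string[]> =
    caseInsensitiveDistinctAsync |> TaskSeq.toArrayAsync
// expected to mirror the synchronous example: [| "hello"; "world" |]
```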
distinctUntilChanged
TaskSeq.distinctUntilChanged removes consecutive duplicates only — it does not buffer the
whole sequence, so it is safe on infinite streams:
let run : TaskSeq<int> = TaskSeq.ofList [ 1; 1; 2; 2; 2; 3; 1; 1 ]
let noConsecDups : TaskSeq<int> = run |> TaskSeq.distinctUntilChanged
// 1, 2, 3, 1
except and exceptOfSeq
TaskSeq.except itemsToExclude source returns elements of source that do not appear in
itemsToExclude. The exclusion set is materialised eagerly before iteration:
let exclusions : TaskSeq<int> = TaskSeq.ofList [ 2; 4 ]
let source : TaskSeq<int> = TaskSeq.ofSeq (seq { 1..5 })
let filtered : TaskSeq<int> = TaskSeq.except exclusions source // 1, 3, 5
TaskSeq.exceptOfSeq accepts a plain seq<'T> as the exclusion set.
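The following sketch shows exceptOfSeq with an in-memory exclusion list, assuming the same argument order as except (exclusions first, source second):

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open System.Threading.Tasks
open FSharp.Control

// A plain, synchronous collection of values to exclude.
let exclusionSeq : seq<int> = Seq.ofList [ 2; 4 ]

let source : TaskSeq<int> = TaskSeq.ofSeq (seq { 1..5 })

// Assumed argument order: itemsToExclude first, then the source sequence.
let filteredOfSeq : TaskSeq<int> =
    source |> TaskSeq.exceptOfSeq exclusionSeq

let remaining : Task<int[]> = filteredOfSeq |> TaskSeq.toArrayAsync
// expected to mirror the except example: [| 1; 3; 5 |]
```

This form avoids wrapping an existing list or set in a task sequence only to pass it as the exclusion set.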
partition and partitionAsync
TaskSeq.partition splits the sequence into two arrays in a single pass. Elements for which
the predicate returns true go into the first array; the rest into the second:
let partitioned : Task<int[] * int[]> =
    source |> TaskSeq.partition (fun n -> n % 2 = 0)
// trueItems: [|2;4|] falseItems: [|1;3;5|]
TaskSeq.partitionAsync accepts an async predicate.
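A minimal sketch of partitionAsync, assuming the predicate returns a Task<bool>. The helper isEvenAsync is hypothetical and stands in for a check that needs asynchronous work:

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open System.Threading.Tasks
open FSharp.Control

let source : TaskSeq<int> = TaskSeq.ofSeq (seq { 1..5 })

// Hypothetical async predicate: true for even numbers.
let isEvenAsync (n: int) : Task<bool> =
    task { return n % 2 = 0 }

let partitionedAsync : Task<int[] * int[]> =
    source |> TaskSeq.partitionAsync isEvenAsync
// expected to mirror the synchronous example: ([| 2; 4 |], [| 1; 3; 5 |])
```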
compareWith and compareWithAsync
TaskSeq.compareWith performs a lexicographic comparison of two sequences using a custom
comparer. It returns the first non-zero comparison result, or 0 if the sequences are
element-wise equal and have the same length:
let a : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 3 ]
let b : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 4 ]
let cmp : Task<int> =
    TaskSeq.compareWith (fun x y -> compare x y) a b
// negative (a < b)
TaskSeq.compareWithAsync accepts an async comparer.
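As a sketch under the assumption that the async comparer has the shape 'T -> 'T -> Task<int>, the comparison above could be written as follows. The helper compareAsync is hypothetical:

```fsharp
#r "nuget: FSharp.Control.TaskSeq"

open System.Threading.Tasks
open FSharp.Control

let a : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 3 ]
let b : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 4 ]

// Hypothetical async comparer: wraps the built-in compare in a task.
let compareAsync (x: int) (y: int) : Task<int> =
    task { return compare x y }

let cmpAsync : Task<int> =
    TaskSeq.compareWithAsync compareAsync a b
// expected: a negative number, since the first differing pair is (3, 4)
```

Only the sign of the result is meaningful, so callers should test for negative, zero, or positive rather than a specific value.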
withCancellation
TaskSeq.withCancellation token source injects a CancellationToken into the underlying
IAsyncEnumerable<'T>. This is equivalent to calling .WithCancellation(token) in C# and
is useful when consuming sequences from libraries (e.g. Entity Framework Core) that require a
token at the enumeration site:
let cts = new CancellationTokenSource()
let cancellable : TaskSeq<int> =
    source |> TaskSeq.withCancellation cts.Token
Positional editing
TaskSeq.insertAt, TaskSeq.insertManyAt, TaskSeq.removeAt, TaskSeq.removeManyAt, and
TaskSeq.updateAt produce new sequences with one or more elements inserted or removed, or a
single element replaced, at a given zero-based index:
let original : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 4; 5 ]
let inserted : TaskSeq<int> = original |> TaskSeq.insertAt 2 3 // 1,2,3,4,5
let removed : TaskSeq<int> = original |> TaskSeq.removeAt 1 // 1,4,5
let updated : TaskSeq<int> = original |> TaskSeq.updateAt 0 99 // 99,2,4,5
let manyInserted : TaskSeq<int> =
    original
    |> TaskSeq.insertManyAt 2 (TaskSeq.ofList [ 10; 11 ])
// 1, 2, 10, 11, 4, 5
let manyRemoved : TaskSeq<int> = original |> TaskSeq.removeManyAt 1 2 // 1, 5