Advanced Task Sequence Operations
This page covers advanced TaskSeq<'T> operations: grouping, stateful transformation with
mapFold, deduplication, set-difference, partitioning, counting by key, lexicographic
comparison, cancellation, and positional editing.
open System.Threading
open System.Threading.Tasks
open FSharp.Control
groupBy and groupByAsync
TaskSeq.groupBy partitions a sequence into groups by a key-projection function. The result
is an array of (key, elements[]) pairs, one per distinct key, in order of first occurrence.
Note:
groupByconsumes the entire source before returning. Do not use it on potentially infinite sequences.
type Event = { EntityId: int; Payload: string }
let events : TaskSeq<Event> =
TaskSeq.ofList
[ { EntityId = 1; Payload = "A" }
{ EntityId = 2; Payload = "B" }
{ EntityId = 1; Payload = "C" }
{ EntityId = 3; Payload = "D" }
{ EntityId = 2; Payload = "E" } ]
// groups: (1, [A;C]), (2, [B;E]), (3, [D])
let grouped : Task<(int * Event[])[]> =
events |> TaskSeq.groupBy (fun e -> e.EntityId)
TaskSeq.groupByAsync accepts an async key projection:
let groupedAsync : Task<(int * Event[])[]> =
events |> TaskSeq.groupByAsync (fun e -> task { return e.EntityId })
countBy and countByAsync
TaskSeq.countBy counts how many elements map to each key, returning (key, count)[]:
let counts : Task<(int * int)[]> =
events |> TaskSeq.countBy (fun e -> e.EntityId)
// (1,2), (2,2), (3,1)
mapFold and mapFoldAsync
TaskSeq.mapFold threads a state accumulator through a sequence while simultaneously mapping
each element to a result value. The output is a task returning a pair of (result[], finalState):
// Number each word sequentially while building a running concatenation
let words : TaskSeq<string> =
TaskSeq.ofList [ "hello"; "world"; "foo" ]
let numbered : Task<string[] * int> =
words
|> TaskSeq.mapFold (fun count w -> $"{count}: {w}", count + 1) 0
// result: ([| "0: hello"; "1: world"; "2: foo" |], 3)
TaskSeq.mapFoldAsync is the same but the mapping function returns Task<'Result * 'State>.
scan and scanAsync
TaskSeq.scan is the streaming sibling of fold: it emits each intermediate state as a new
element, starting with the initial state:
let numbers : TaskSeq<int> = TaskSeq.ofSeq (seq { 1..5 })
let runningTotals : TaskSeq<int> =
numbers |> TaskSeq.scan (fun acc n -> acc + n) 0
// yields: 0, 1, 3, 6, 10, 15
distinct and distinctBy
TaskSeq.distinct removes duplicates (keeps first occurrence), using generic equality:
let withDups : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 2; 3; 1; 4 ]
let deduped : TaskSeq<int> = withDups |> TaskSeq.distinct // 1, 2, 3, 4
TaskSeq.distinctBy deduplicates by a key projection:
let strings : TaskSeq<string> =
TaskSeq.ofList [ "hello"; "HELLO"; "world"; "WORLD" ]
let caseInsensitiveDistinct : TaskSeq<string> =
strings |> TaskSeq.distinctBy (fun s -> s.ToLowerInvariant())
// "hello", "world"
Note: both
distinctanddistinctBybuffer all unique keys in a hash set. Do not use them on potentially infinite sequences.
TaskSeq.distinctByAsync accepts an async key projection.
distinctUntilChanged
TaskSeq.distinctUntilChanged removes consecutive duplicates only — it does not buffer the
whole sequence, so it is safe on infinite streams:
let run : TaskSeq<int> = TaskSeq.ofList [ 1; 1; 2; 2; 2; 3; 1; 1 ]
let noConsecDups : TaskSeq<int> = run |> TaskSeq.distinctUntilChanged
// 1, 2, 3, 1
except and exceptOfSeq
TaskSeq.except itemsToExclude source returns elements of source that do not appear in
itemsToExclude. The exclusion set is materialised eagerly before iteration:
let exclusions : TaskSeq<int> = TaskSeq.ofList [ 2; 4 ]
let source : TaskSeq<int> = TaskSeq.ofSeq (seq { 1..5 })
let filtered : TaskSeq<int> = TaskSeq.except exclusions source // 1, 3, 5
TaskSeq.exceptOfSeq accepts a plain seq<'T> as the exclusion set.
partition and partitionAsync
TaskSeq.partition splits the sequence into two arrays in a single pass. Elements for which
the predicate returns true go into the first array; the rest into the second:
let partitioned : Task<int[] * int[]> =
source |> TaskSeq.partition (fun n -> n % 2 = 0)
// trueItems: [|2;4|] falseItems: [|1;3;5|]
TaskSeq.partitionAsync accepts an async predicate.
compareWith and compareWithAsync
TaskSeq.compareWith performs a lexicographic comparison of two sequences using a custom
comparer. It returns the first non-zero comparison result, or 0 if the sequences are
element-wise equal and have the same length:
let a : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 3 ]
let b : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 4 ]
let cmp : Task<int> =
TaskSeq.compareWith (fun x y -> compare x y) a b
// negative (a < b)
TaskSeq.compareWithAsync accepts an async comparer.
withCancellation
TaskSeq.withCancellation token source injects a CancellationToken into the underlying
IAsyncEnumerable<'T>. This is equivalent to calling .WithCancellation(token) in C# and
is useful when consuming sequences from libraries (e.g. Entity Framework Core) that require a
token at the enumeration site:
let cts = new CancellationTokenSource()
let cancellable : TaskSeq<int> =
source |> TaskSeq.withCancellation cts.Token
Positional editing
TaskSeq.insertAt, TaskSeq.insertManyAt, TaskSeq.removeAt, TaskSeq.removeManyAt, and
TaskSeq.updateAt produce new sequences with an element inserted, removed, or replaced at a
given zero-based index:
let original : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 4; 5 ]
let inserted : TaskSeq<int> = original |> TaskSeq.insertAt 2 3 // 1,2,3,4,5
let removed : TaskSeq<int> = original |> TaskSeq.removeAt 1 // 1,4,5
let updated : TaskSeq<int> = original |> TaskSeq.updateAt 0 99 // 99,2,4,5
let manyInserted : TaskSeq<int> =
original
|> TaskSeq.insertManyAt 2 (TaskSeq.ofList [ 10; 11 ])
// 1, 2, 10, 11, 4, 5
let manyRemoved : TaskSeq<int> = original |> TaskSeq.removeManyAt 1 2 // 1, 5
namespace FSharp
--------------------
namespace Microsoft.FSharp
namespace FSharp.Control
--------------------
namespace Microsoft.FSharp.Control
module Event from Microsoft.FSharp.Control
--------------------
type Event = { EntityId: int Payload: string }
--------------------
type Event<'T> = new: unit -> Event<'T> member Trigger: arg: 'T -> unit member Publish: IEvent<'T> with get
--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate and reference type)> = new: unit -> Event<'Delegate,'Args> member Trigger: sender: objnull * args: 'Args -> unit member Publish: IEvent<'Delegate,'Args> with get
--------------------
new: unit -> Event<'T>
--------------------
new: unit -> Event<'Delegate,'Args>
val int: value: 'T -> int (requires member op_Explicit)
--------------------
type int = int32
--------------------
type int<'Measure> = int
val string: value: 'T -> string
--------------------
type string = System.String
module TaskSeq from FSharp.Control.TaskSeqExtensions
--------------------
type TaskSeq = static member append: source1: TaskSeq<'T> -> source2: TaskSeq<'T> -> TaskSeq<'T> static member appendSeq: source1: TaskSeq<'T> -> source2: 'T seq -> TaskSeq<'T> static member box: source: TaskSeq<'T> -> TaskSeq<obj> static member cast: source: TaskSeq<obj> -> TaskSeq<'U> static member choose: chooser: ('T -> 'U option) -> source: TaskSeq<'T> -> TaskSeq<'U> static member chooseAsync: chooser: ('T -> #Task<'U option>) -> source: TaskSeq<'T> -> TaskSeq<'U> static member chunkBySize: chunkSize: int -> source: TaskSeq<'T> -> TaskSeq<'T array> static member collect: binder: ('T -> #TaskSeq<'U>) -> source: TaskSeq<'T> -> TaskSeq<'U> static member collectAsync: binder: ('T -> #Task<'TSeqU>) -> source: TaskSeq<'T> -> TaskSeq<'U> (requires 'TSeqU :> TaskSeq<'U>) static member collectSeq: binder: ('T -> #('U seq)) -> source: TaskSeq<'T> -> TaskSeq<'U> ...
--------------------
type TaskSeq<'T> = System.Collections.Generic.IAsyncEnumerable<'T>
<summary> Represents a task sequence and is the output of using the <paramref name="taskSeq{...}" /> computation expression from this library. It is an alias for <see cref="T:System.IAsyncEnumerable<_>" />. </summary>
--------------------
type TaskSeq<'Machine,'T (requires 'Machine :> IAsyncStateMachine and 'Machine :> IResumableStateMachine<TaskSeqStateMachineData<'T>>)> = inherit TaskSeqBase<'T> interface IValueTaskSource interface IValueTaskSource<bool> interface IAsyncStateMachine interface IAsyncEnumerable<'T> interface IAsyncEnumerator<'T> new: unit -> TaskSeq<'Machine,'T> member InitMachineData: ct: CancellationToken * machine: byref<'Machine> -> unit override MoveNextAsyncResult: unit -> ValueTask<bool>
<summary> Main implementation of generic <see cref="T:System.IAsyncEnumerable<'T>" /> and related interfaces, which forms the meat of the logic behind <see cref="taskSeq" /> computation expresssions. For use by this library only, should not be used directly in user code. Its operation depends highly on resumable state. </summary>
--------------------
new: unit -> TaskSeq<'Machine,'T>
type Task = interface IAsyncResult interface IDisposable new: action: Action -> unit + 7 overloads member ConfigureAwait: continueOnCapturedContext: bool -> ConfiguredTaskAwaitable + 1 overload member ContinueWith: continuationAction: Action<Task,obj> * state: obj -> Task + 19 overloads member Dispose: unit -> unit member GetAwaiter: unit -> TaskAwaiter member RunSynchronously: unit -> unit + 1 overload member Start: unit -> unit + 1 overload member Wait: unit -> unit + 5 overloads ...
<summary>Represents an asynchronous operation.</summary>
--------------------
type Task<'TResult> = inherit Task new: ``function`` : Func<obj,'TResult> * state: obj -> unit + 7 overloads member ConfigureAwait: continueOnCapturedContext: bool -> ConfiguredTaskAwaitable<'TResult> + 1 overload member ContinueWith: continuationAction: Action<Task<'TResult>,obj> * state: obj -> Task + 19 overloads member GetAwaiter: unit -> TaskAwaiter<'TResult> member WaitAsync: cancellationToken: CancellationToken -> Task<'TResult> + 4 overloads member Result: 'TResult static member Factory: TaskFactory<'TResult>
<summary>Represents an asynchronous operation that can return a value.</summary>
<typeparam name="TResult">The type of the result produced by this <see cref="T:System.Threading.Tasks.Task`1" />.</typeparam>
--------------------
Task(action: System.Action) : Task
Task(action: System.Action, cancellationToken: CancellationToken) : Task
Task(action: System.Action, creationOptions: TaskCreationOptions) : Task
Task(action: System.Action<obj>, state: obj) : Task
Task(action: System.Action, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task
Task(action: System.Action<obj>, state: obj, cancellationToken: CancellationToken) : Task
Task(action: System.Action<obj>, state: obj, creationOptions: TaskCreationOptions) : Task
Task(action: System.Action<obj>, state: obj, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task
--------------------
Task(``function`` : System.Func<'TResult>) : Task<'TResult>
Task(``function`` : System.Func<obj,'TResult>, state: obj) : Task<'TResult>
Task(``function`` : System.Func<'TResult>, cancellationToken: CancellationToken) : Task<'TResult>
Task(``function`` : System.Func<'TResult>, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(``function`` : System.Func<obj,'TResult>, state: obj, cancellationToken: CancellationToken) : Task<'TResult>
Task(``function`` : System.Func<obj,'TResult>, state: obj, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(``function`` : System.Func<'TResult>, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task<'TResult>
Task(``function`` : System.Func<obj,'TResult>, state: obj, cancellationToken: CancellationToken, creationOptions: TaskCreationOptions) : Task<'TResult>
val seq: sequence: 'T seq -> 'T seq
--------------------
type 'T seq = System.Collections.Generic.IEnumerable<'T>
type CancellationTokenSource = interface IDisposable new: unit -> unit + 3 overloads member Cancel: unit -> unit + 1 overload member CancelAfter: millisecondsDelay: int -> unit + 1 overload member CancelAsync: unit -> Task member Dispose: unit -> unit member TryReset: unit -> bool static member CreateLinkedTokenSource: token: CancellationToken -> CancellationTokenSource + 3 overloads member IsCancellationRequested: bool member Token: CancellationToken
<summary>Signals to a <see cref="T:System.Threading.CancellationToken" /> that it should be canceled.</summary>
--------------------
CancellationTokenSource() : CancellationTokenSource
CancellationTokenSource(millisecondsDelay: int) : CancellationTokenSource
CancellationTokenSource(delay: System.TimeSpan) : CancellationTokenSource
CancellationTokenSource(delay: System.TimeSpan, timeProvider: System.TimeProvider) : CancellationTokenSource
<summary>Gets the <see cref="T:System.Threading.CancellationToken" /> associated with this <see cref="T:System.Threading.CancellationTokenSource" />.</summary>
<exception cref="T:System.ObjectDisposedException">The token source has been disposed.</exception>
<returns>The <see cref="T:System.Threading.CancellationToken" /> associated with this <see cref="T:System.Threading.CancellationTokenSource" />.</returns>
FSharp.Control.TaskSeq