Skip to content

Adding TaskSeq.chunkBySize #265

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ The `TaskSeq` project already has a wide array of functions and functionalities,
- [ ] `average` / `averageBy`, `sum` and related
- [x] `forall` / `forallAsync` (see [#240])
- [x] `skip` / `drop` / `truncate` / `take` (see [#209])
- [ ] `chunkBySize` / `windowed`
- [x] `chunkBySize` (see [#265])
- [ ] `windowed`
- [ ] `compareWith`
- [ ] `distinct`
- [ ] `exists2` / `map2` / `fold2` / `iter2` and related '2'-functions
Expand Down Expand Up @@ -263,7 +264,7 @@ This is what has been implemented so far, is planned or skipped:
| ✅ [#67][] | | | `box` | |
| ✅ [#67][] | | | `unbox` | |
| ✅ [#23][] | `choose` | `choose` | `chooseAsync` | |
| | `chunkBySize` | `chunkBySize` | | |
| ✅ [#265][]| `chunkBySize` | `chunkBySize` | | |
| ✅ [#11][] | `collect` | `collect` | `collectAsync` | |
| ✅ [#11][] | | `collectSeq` | `collectSeqAsync` | |
| | `compareWith` | `compareWith` | `compareWithAsync` | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
<Compile Include="TaskSeq.Do.Tests.fs" />
<Compile Include="TaskSeq.Let.Tests.fs" />
<Compile Include="TaskSeq.Using.Tests.fs" />
<Compile Include="TaskSeq.ChunkBySize.Tests.fs" />
</ItemGroup>

<ItemGroup>
Expand Down
219 changes: 219 additions & 0 deletions src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBySize.Tests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
module TaskSeq.Tests.ChunkBySize

open System

open FsUnitTyped
open Xunit
open FsUnit.Xunit

open FSharp.Control

//
// TaskSeq.chunkBySize
//

exception SideEffectPastEnd of string

module EmptySeq =
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-chunkBySize(0) on empty input should throw InvalidOperation`` variant =
fun () ->
Gen.getEmptyVariant variant
|> TaskSeq.chunkBySize 0
|> consumeTaskSeq

|> should throwAsyncExact typeof<ArgumentException>

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-chunkBySize(1) has no effect on empty input`` variant =
// no `task` block needed
Gen.getEmptyVariant variant
|> TaskSeq.chunkBySize 1
|> verifyEmpty

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-chunkBySize(99) has no effect on empty input`` variant =
// no `task` block needed
Gen.getEmptyVariant variant
|> TaskSeq.chunkBySize 99
|> verifyEmpty

[<Fact>]
let ``TaskSeq-chunkBySize(-1) should throw ArgumentException on any input`` () =
fun () ->
TaskSeq.empty<int>
|> TaskSeq.chunkBySize -1
|> consumeTaskSeq
|> should throwAsyncExact typeof<ArgumentException>

fun () ->
TaskSeq.init 10 id
|> TaskSeq.chunkBySize -1
|> consumeTaskSeq
|> should throwAsyncExact typeof<ArgumentException>

[<Fact>]
let ``TaskSeq-chunkBySize(-1) should throw ArgumentException before awaiting`` () =
fun () ->
taskSeq {
do! longDelay ()

if false then
yield 0 // type inference
}
|> TaskSeq.chunkBySize -1
|> ignore // throws even without running the async. Bad coding, don't ignore a task!

|> should throw typeof<ArgumentException>

module Immutable =
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-chunkBySize returns all items from source in order`` variant = task {
do!
Gen.getSeqImmutable variant
|> TaskSeq.chunkBySize 3
|> TaskSeq.collect TaskSeq.ofArray
|> verify1To10
}

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-chunkBySize returns chunks with items in order`` variant = task {
do!
Gen.getSeqImmutable variant
|> TaskSeq.chunkBySize 2
|> TaskSeq.toArrayAsync
|> Task.map (shouldEqual [| [| 1; 2 |]; [| 3; 4 |]; [| 5; 6 |]; [| 7; 8 |]; [| 9; 10 |] |])
}

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-chunkBySize returns exactly 'chunkSize' items per chunk`` variant = task {
do!
Gen.getSeqImmutable variant
|> TaskSeq.chunkBySize 1
|> TaskSeq.iter (shouldHaveLength 1)

do!
Gen.getSeqImmutable variant
|> TaskSeq.chunkBySize 2
|> TaskSeq.iter (shouldHaveLength 2)

do!
Gen.getSeqImmutable variant
|> TaskSeq.chunkBySize 5
|> TaskSeq.iter (shouldHaveLength 5)
}

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-chunkBySize returns remaining items in last chunk`` variant = task {
let verifyChunk chunkSize lastChunkSize =
Gen.getSeqImmutable variant
|> TaskSeq.chunkBySize chunkSize
|> TaskSeq.toArrayAsync
|> Task.map (Array.last >> shouldHaveLength lastChunkSize)

do! verifyChunk 1 1
do! verifyChunk 3 1
do! verifyChunk 4 2
do! verifyChunk 6 4
do! verifyChunk 7 3
do! verifyChunk 8 2
do! verifyChunk 9 1
}

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-chunkBySize returns all elements when 'chunkSize' > number of items`` variant =
Gen.getSeqImmutable variant
|> TaskSeq.chunkBySize 11
|> TaskSeq.toArrayAsync
|> Task.map (Array.exactlyOne >> shouldHaveLength 10)

module SideEffects =
[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-chunkBySize gets all items`` variant =
Gen.getSeqWithSideEffect variant
|> TaskSeq.chunkBySize 5
|> TaskSeq.toArrayAsync
|> Task.map (shouldEqual [| [| 1..5 |]; [| 6..10 |] |])

[<Fact>]
let ``TaskSeq-chunkBySize prove we execute empty-seq side-effects`` () = task {
let mutable i = 0

let ts = taskSeq {
i <- i + 1
i <- i + 1
i <- i + 1 // we should get here
}

do! ts |> TaskSeq.chunkBySize 1 |> consumeTaskSeq
do! ts |> TaskSeq.chunkBySize 2 |> consumeTaskSeq
do! ts |> TaskSeq.chunkBySize 3 |> consumeTaskSeq
i |> should equal 9
}

[<Fact>]
let ``TaskSeq-chunkBySize prove we execute after-effects`` () = task {
let mutable i = 0

let ts = taskSeq {
i <- i + 1
i <- i + 1
yield 42
i <- i + 1 // we should get here
}

do! ts |> TaskSeq.chunkBySize 1 |> consumeTaskSeq
do! ts |> TaskSeq.chunkBySize 2 |> consumeTaskSeq
do! ts |> TaskSeq.chunkBySize 3 |> consumeTaskSeq
i |> should equal 9
}

[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-chunkBySize should go over all items`` variant = task {
let ts = Gen.getSeqWithSideEffect variant
do! ts |> TaskSeq.chunkBySize 1 |> consumeTaskSeq
do! ts |> TaskSeq.chunkBySize 2 |> consumeTaskSeq
do! ts |> TaskSeq.chunkBySize 3 |> consumeTaskSeq
// incl. the iteration of 'last', we reach 40
do! ts |> TaskSeq.last |> Task.map (should equal 40)
}

[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-chunkBySize multiple iterations over same sequence`` variant = task {
let ts = Gen.getSeqWithSideEffect variant
let mutable sum = 0

do!
TaskSeq.chunkBySize 1 ts
|> TaskSeq.collect TaskSeq.ofArray
|> TaskSeq.iter (fun item -> sum <- sum + item)

do!
TaskSeq.chunkBySize 2 ts
|> TaskSeq.collect TaskSeq.ofArray
|> TaskSeq.iter (fun item -> sum <- sum + item)

do!
TaskSeq.chunkBySize 3 ts
|> TaskSeq.collect TaskSeq.ofArray
|> TaskSeq.iter (fun item -> sum <- sum + item)

do!
TaskSeq.chunkBySize 4 ts
|> TaskSeq.collect TaskSeq.ofArray
|> TaskSeq.iter (fun item -> sum <- sum + item)

sum |> should equal 820 // side-effected tasks, so 'item' DOES CHANGE, each next iteration starts 10 higher
}

[<Fact>]
let ``TaskSeq-chunkBySize prove that an exception from the taskSeq is thrown`` () =
let items = taskSeq {
yield 42
yield! [ 1; 2 ]
do SideEffectPastEnd "at the end" |> raise
yield 43
}

fun () -> items |> TaskSeq.chunkBySize 2 |> consumeTaskSeq
|> should throwAsyncExact typeof<SideEffectPastEnd>
2 changes: 2 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ type TaskSeq private () =
yield! source2
}

static member chunkBySize (chunkSize: int) (source: TaskSeq<'T>) = Internal.chunkBySize chunkSize source

//
// iter/map/collect functions
//
Expand Down
9 changes: 9 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,15 @@ type TaskSeq =
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
static member chooseAsync: chooser: ('T -> #Task<'U option>) -> source: TaskSeq<'T> -> TaskSeq<'U>

/// <summary>Divides the input sequence into chunks of size at most <c>chunkSize</c>.</summary>
///
/// <param name="chunkSize">The maximum size of each chunk.</param>
/// <param name="source">The input task sequence.</param>
/// <returns>The task sequence divided into chunks.</returns>
/// <exception cref="T:System.ArgumentNullException">Thrown when the input task sequence is null.</exception>
/// <exception cref="T:System.ArgumentException">Thrown when <c>chunkSize</c> is not positive.</exception>
static member chunkBySize: chunkSize: int -> source: TaskSeq<'T> -> TaskSeq<'T[]>

/// <summary>
/// Returns a new task sequence containing only the elements of the collection
/// for which the given function <paramref name="predicate" /> returns <see cref="true" />.
Expand Down
29 changes: 29 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeqInternal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1097,3 +1097,32 @@ module internal TaskSeqInternal =
go <- step

}

let chunkBySize chunkSize (source: TaskSeq<'T>) : TaskSeq<'T[]> =
if chunkSize < 1 then
invalidArg (nameof chunkSize) $"The value must be positive, but was %i{chunkSize}."

checkNonNull (nameof source) source

taskSeq {
use e = source.GetAsyncEnumerator CancellationToken.None
let mutable go = true
let! step = e.MoveNextAsync()
go <- step

if step then
let buffer = ResizeArray<_>()

while go do
buffer.Add e.Current

if buffer.Count = chunkSize then
yield buffer.ToArray()
buffer.Clear()

let! step = e.MoveNextAsync()
go <- step

if buffer.Count > 0 then
yield buffer.ToArray()
}
Loading