Skip to content

Commit f1c2ce3

Browse files
authored
Add Pipeline::buffer() (#21)
1 parent c26e73e commit f1c2ce3

4 files changed

+42
-8
lines changed

src/Internal/ConcurrentFlatMapIterator.php

+8-3
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,14 @@ final class ConcurrentFlatMapIterator implements ConcurrentIterator
2424
* @param ConcurrentIterator<T> $iterator
2525
* @param \Closure(T, int):iterable<R> $flatMap
2626
*/
27-
public function __construct(ConcurrentIterator $iterator, int $concurrency, bool $ordered, \Closure $flatMap)
28-
{
29-
$queue = new QueueState;
27+
public function __construct(
28+
ConcurrentIterator $iterator,
29+
int $bufferSize,
30+
int $concurrency,
31+
bool $ordered,
32+
\Closure $flatMap,
33+
) {
34+
$queue = new QueueState($bufferSize);
3035
$this->iterator = new ConcurrentQueueIterator($queue);
3136
$order = $ordered ? new Sequence : null;
3237

src/Internal/ConcurrentIterableIterator.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ final class ConcurrentIterableIterator implements ConcurrentIterator
2020
/**
2121
* @param iterable<T> $iterable
2222
*/
23-
public function __construct(iterable $iterable)
23+
public function __construct(iterable $iterable, int $bufferSize = 0)
2424
{
2525
if (\is_array($iterable)) {
2626
$this->iterator = new ConcurrentArrayIterator($iterable);
@@ -36,7 +36,7 @@ public function __construct(iterable $iterable)
3636
$iterable = $iterable->getIterator();
3737
}
3838

39-
$queue = new QueueState();
39+
$queue = new QueueState($bufferSize);
4040
$this->iterator = new ConcurrentQueueIterator($queue);
4141

4242
async(static function () use ($queue, $iterable): void {

src/Internal/FlatMapOperation.php

+9-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public static function getStopMarker(): object
2323
* @param \Closure(T, int):iterable<R> $flatMap
2424
*/
2525
public function __construct(
26+
private readonly int $bufferSize,
2627
private readonly int $concurrency,
2728
private readonly bool $ordered,
2829
private readonly \Closure $flatMap
@@ -45,9 +46,15 @@ public function __invoke(ConcurrentIterator $source): ConcurrentIterator
4546
yield $item;
4647
}
4748
}
48-
})());
49+
})(), $this->bufferSize);
4950
}
5051

51-
return new ConcurrentFlatMapIterator($source, $this->concurrency, $this->ordered, $this->flatMap);
52+
return new ConcurrentFlatMapIterator(
53+
$source,
54+
$this->bufferSize,
55+
$this->concurrency,
56+
$this->ordered,
57+
$this->flatMap,
58+
);
5259
}
5360
}

src/Pipeline.php

+23-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Amp\Pipeline\Internal\ConcurrentIterableIterator;
1010
use Amp\Pipeline\Internal\ConcurrentMergedIterator;
1111
use Amp\Pipeline\Internal\FlatMapOperation;
12+
use Amp\Pipeline\Internal\IntermediateOperation;
1213
use Amp\Pipeline\Internal\Sequence;
1314
use Amp\Pipeline\Internal\SortOperation;
1415
use function Amp\delay;
@@ -123,10 +124,15 @@ private static function mapToConcurrentIterators(array $iterables): array
123124
return \array_map(static fn (iterable $pipeline) => self::fromIterable($pipeline)->getIterator(), $iterables);
124125
}
125126

127+
/** @var non-negative-int */
128+
private int $bufferSize = 0;
129+
130+
/** @var positive-int */
126131
private int $concurrency = 1;
127132

128133
private bool $ordered = true;
129134

135+
/** @var list<IntermediateOperation> */
130136
private array $intermediateOperations = [];
131137

132138
private bool $used = false;
@@ -146,6 +152,17 @@ public function __destruct()
146152
}
147153
}
148154

155+
public function buffer(int $bufferSize): self
156+
{
157+
if ($bufferSize < 0) {
158+
throw new \ValueError('Argument #1 ($bufferSize) must be non-negative, got ' . $bufferSize);
159+
}
160+
161+
$this->bufferSize = $bufferSize;
162+
163+
return $this;
164+
}
165+
149166
public function concurrent(int $concurrency): self
150167
{
151168
if ($concurrency < 1) {
@@ -345,7 +362,12 @@ public function flatMap(\Closure $flatMap): self
345362
throw new \Error('Pipeline consumption has already been started');
346363
}
347364

348-
$this->intermediateOperations[] = new FlatMapOperation($this->concurrency, $this->ordered, $flatMap);
365+
$this->intermediateOperations[] = new FlatMapOperation(
366+
$this->bufferSize,
367+
$this->concurrency,
368+
$this->ordered,
369+
$flatMap,
370+
);
349371

350372
/** @var self<R> */
351373
return $this;

0 commit comments

Comments
 (0)