Skip to content

Commit

Permalink
Feat: Enable ReactProcessor to process regular iterables (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
bpolaszek authored Nov 29, 2023
1 parent 30d45fd commit 2924577
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 5 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Table of Contents
- [Chaining extractors / transformers / loaders](doc/advanced_usage.md#chaining-extractors--transformers--loaders)
- [Reading from STDIN / Writing to STDOUT](doc/advanced_usage.md#reading-from-stdin--writing-to-stdout)
- [Instantiators](doc/advanced_usage.md#instantiators)
- [Using React Streams (ReactPHP support)](doc/advanced_usage.md#using-react-streams-experimental)
- [Using ReactPHP](doc/advanced_usage.md#using-reactphp-experimental)
- [Recipes](doc/recipes.md)
- [Contributing](#contribute)
- [License](#license)
Expand Down
15 changes: 13 additions & 2 deletions doc/advanced_usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,21 @@ $report = withRecipe(new LoggerRecipe($logger))
->process(['foo', 'bar']);
```

Using React streams (experimental)
Using ReactPHP (experimental)
----------------------------------

You can plug your ETL dataflows to any [React Stream](https://github.com/reactphp/stream).
By using the `ReactStreamProcessor` recipe, you can use ReactPHP as the processor of your data.

> [!IMPORTANT]
> `react/stream` and `react/event-loop` are required for this to work.
With this processor, you can extract data from an `iterable` or a [React Stream](https://github.com/reactphp/stream):
each item is processed in its own [Loop tick](https://github.com/reactphp/event-loop#futuretick) instead of inside a blocking `while` loop.

This allows you, for example, to:
- Run a task [periodically](https://github.com/reactphp/event-loop#addperiodictimer) (with `Loop::addPeriodicTimer()`)
- Handle [POSIX signals](https://github.com/reactphp/event-loop#addsignal) (with `Loop::addSignal()`)
- Use [React streams](https://github.com/reactphp/stream), like a TCP / HTTP server, a Redis / MySQL connection, or a file stream, for an event-oriented approach.

Example with a TCP server:

Expand Down
16 changes: 14 additions & 2 deletions src/Extractor/ReactStreamExtractor.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,29 @@
namespace BenTools\ETL\Extractor;

use BenTools\ETL\EtlState;
use BenTools\ETL\Iterator\IteratorStream;
use React\Stream\ReadableStreamInterface;

/**
 * Extractor that always returns a React readable stream.
 *
 * The source is taken from `EtlState::$source` when it is set, otherwise from
 * the value given to the constructor. A plain iterable is wrapped into an
 * `IteratorStream`; an existing stream is returned untouched.
 */
final readonly class ReactStreamExtractor implements ExtractorInterface
{
    /**
     * @param iterable<mixed>|ReadableStreamInterface|null $stream fallback source, used when the state carries none
     */
    public function __construct(
        public ReadableStreamInterface|iterable|null $stream = null,
    ) {
    }

    /**
     * Resolves the source (state first, constructor value second) and returns it as a stream.
     *
     * @throws \TypeError when the resolved source is neither an iterable nor a readable stream
     *                    (this includes the case where both the state and the constructor provide null)
     */
    public function extract(EtlState $state): ReadableStreamInterface
    {
        return $this->ensureStream($state->source ?? $this->stream);
    }

    /**
     * Wraps a plain iterable into an `IteratorStream`; streams pass through as-is.
     *
     * @param iterable<mixed>|ReadableStreamInterface $items
     */
    private function ensureStream(iterable|ReadableStreamInterface $items): ReadableStreamInterface
    {
        if ($items instanceof ReadableStreamInterface) {
            return $items;
        }

        return new IteratorStream($items);
    }
}
56 changes: 56 additions & 0 deletions src/Iterator/ConsumableIterator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Iterator;

use Iterator;
use OutOfRangeException;

use function BenTools\ETL\iterable_to_iterator;

/**
 * Pull-style wrapper around an iterable: items are taken one at a time with
 * `consume()`, and `isComplete()` tells whether anything is left to take.
 *
 * The underlying iterator is rewound lazily, on the first call to either
 * `consume()` or `isComplete()`.
 *
 * @internal
 *
 * @template T
 */
final class ConsumableIterator
{
    private readonly Iterator $iterator;
    private bool $started = false;
    private bool $ended = false;

    /**
     * @param iterable<T> $items
     */
    public function __construct(iterable $items)
    {
        $this->iterator = iterable_to_iterator($items);
    }

    /**
     * Returns the current item and advances to the next one.
     *
     * @return T
     *
     * @throws OutOfRangeException when there is no item left (including an empty iterable)
     */
    public function consume(): mixed
    {
        $this->start();

        if ($this->ended) {
            throw new OutOfRangeException('This iterator has no more items.');
        }

        $value = $this->iterator->current();
        $this->iterator->next();
        $this->ended = !$this->iterator->valid();

        return $value;
    }

    /**
     * Tells whether every item has been consumed.
     *
     * An empty iterable is complete right away: without that check, callers
     * guarding `consume()` with `isComplete()` would receive one phantom
     * `null` item before the end is detected.
     */
    public function isComplete(): bool
    {
        $this->start();

        return $this->ended;
    }

    /**
     * Rewinds the underlying iterator once and records whether it holds any item at all.
     */
    private function start(): void
    {
        if ($this->started) {
            return;
        }

        $this->iterator->rewind();
        $this->started = true;
        $this->ended = !$this->iterator->valid();
    }
}
82 changes: 82 additions & 0 deletions src/Iterator/IteratorStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Iterator;

use Evenement\EventEmitterTrait;
use React\EventLoop\Loop;
use React\Stream\ReadableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;

/**
 * Readable stream which emits the items of an iterable, one item per event-loop tick.
 *
 * Events: `data` (one per item), then `end` once the iterable is exhausted,
 * then `close`. Closing the stream early stops the iteration.
 *
 * @internal
 *
 * @template T
 */
final class IteratorStream implements ReadableStreamInterface
{
    use EventEmitterTrait;

    /**
     * @var ConsumableIterator<T>
     */
    public readonly ConsumableIterator $iterator;
    public bool $paused = false;

    /** Set by close(); a closed stream never emits again. */
    private bool $closed = false;

    /** True while a tick is pending, so that at most one tick chain is ever running. */
    private bool $tickScheduled = false;

    /**
     * @param iterable<T> $items
     */
    public function __construct(iterable $items)
    {
        $this->iterator = new ConsumableIterator($items);
        $this->resume();
    }

    public function isReadable(): bool
    {
        return !$this->closed && !$this->iterator->isComplete();
    }

    public function pause(): void
    {
        $this->paused = true;
    }

    public function resume(): void
    {
        $this->paused = false;
        $this->process();
    }

    /**
     * Schedules the next tick, unless the stream is closed, paused, or a tick is already pending.
     *
     * Everything is emitted from within a tick (never synchronously), so listeners
     * registered right after construction cannot miss an event.
     */
    private function process(): void
    {
        if ($this->closed || $this->paused || $this->tickScheduled) {
            return;
        }

        $this->tickScheduled = true;
        Loop::futureTick(function (): void {
            $this->tickScheduled = false;

            // The stream may have been paused or closed between scheduling and execution.
            if ($this->closed || $this->paused) {
                return;
            }

            if ($this->iterator->isComplete()) {
                $this->emit('end');
                $this->close();

                return;
            }

            $this->emit('data', [$this->iterator->consume()]);

            // A listener may have paused or closed the stream: process() checks both.
            $this->process();
        });
    }

    /**
     * @param array<string, mixed> $options
     */
    public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface
    {
        Util::pipe($this, $dest, $options);

        return $dest;
    }

    /**
     * Closes the stream: emits `close` once, stops the iteration and drops all listeners.
     * Calling it again is a no-op.
     */
    public function close(): void
    {
        if ($this->closed) {
            return;
        }

        $this->closed = true;
        $this->emit('close');
        $this->removeAllListeners();
    }
}
15 changes: 15 additions & 0 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use BenTools\ETL\Recipe\Recipe;
use BenTools\ETL\Transformer\ChainTransformer;
use BenTools\ETL\Transformer\TransformerInterface;
use Iterator;

use function array_fill_keys;
use function array_intersect_key;
Expand All @@ -40,6 +41,20 @@ function array_fill_from(array $keys, array $values, array ...$extraValues): arr
return array_intersect_key($values, $defaults);
}

/**
 * Normalizes any iterable to an `Iterator`.
 *
 * An `Iterator` instance is returned as-is; anything else (array,
 * `IteratorAggregate`, ...) is wrapped into a generator which yields
 * its items, keys included.
 *
 * @internal
 *
 * @template T
 *
 * @param iterable<T> $items
 *
 * @return Iterator<T>
 */
function iterable_to_iterator(iterable $items): Iterator
{
    if ($items instanceof Iterator) {
        return $items;
    }

    $generate = function () use ($items) {
        return yield from $items;
    };

    return $generate();
}

function extractFrom(ExtractorInterface|callable $extractor, ExtractorInterface|callable ...$extractors): EtlExecutor
{
return (new EtlExecutor())->extractFrom(...func_get_args());
Expand Down
19 changes: 19 additions & 0 deletions tests/Behavior/ReactStreamProcessorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,25 @@
expect($state->output)->toBe(['banana', 'strawberry', 'raspberry']);
});

it('allows iterables, which will be converted to readable streams', function () {
// Given: a plain array as the source, not a React stream
$fruits = ['banana', 'apple', 'strawberry', 'raspberry', 'peach'];
$executor = useReact()
->onExtract(function (ExtractEvent $event) {
// 'apple' triggers skip() and 'peach' triggers stop() on the state;
// the expected output below contains neither of them.
match ($event->item) {
'apple' => $event->state->skip(),
'peach' => $event->state->stop(),
default => null,
};
})
;

// When
$state = $executor->process($fruits);

// Then
expect($state->output)->toBe(['banana', 'strawberry', 'raspberry']);
});

it('throws ExtractExceptions', function () {
// Given
$stream = new ReadableResourceStream(fopen('php://temp', 'rb'));
Expand Down
38 changes: 38 additions & 0 deletions tests/Stubs/WritableStreamStub.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Tests\Stubs;

use Evenement\EventEmitterTrait;
use React\Stream\WritableStreamInterface;

/**
 * In-memory writable stream for tests: it records every chunk passed to write().
 */
final class WritableStreamStub implements WritableStreamInterface
{
    use EventEmitterTrait;

    /**
     * Chunks received through write(), in arrival order.
     *
     * @var list<mixed>
     */
    public array $data = [];

    /**
     * Appends the chunk to $data and always reports that more data is accepted.
     */
    public function write($data): bool
    {
        array_push($this->data, $data);

        return true;
    }

    /**
     * The stub reports itself as writable at all times.
     */
    public function isWritable(): bool
    {
        return true;
    }

    /**
     * No-op: the optional final chunk is not recorded.
     */
    public function end($data = null): void
    {
    }

    /**
     * No-op: nothing is released and no event is emitted.
     */
    public function close(): void
    {
    }
}
58 changes: 58 additions & 0 deletions tests/Unit/Iterator/IteratorStreamTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Tests\Unit\Iterator;

use BenTools\ETL\Iterator\IteratorStream;
use BenTools\ETL\Tests\Stubs\WritableStreamStub;
use React\EventLoop\Factory;
use React\EventLoop\Loop;

use function beforeEach;
use function expect;

// Install a newly created event loop before each test.
beforeEach(fn () => Loop::set(Factory::create()));

it('is readable during iteration', function () {
// Given
$items = ['foo', 'bar'];
$stream = new IteratorStream($items);

// When / Then: the items are consumed directly on the inner iterator (the loop is not run);
// the stream reports itself readable before each of the two items is taken.
for ($i = 0; $i < 2; ++$i) {
expect($stream->isReadable())->toBeTrue();
$stream->iterator->consume();
}

// Then: once both items have been consumed, the stream is no longer readable.
expect($stream->isReadable())->toBeFalse();
// NOTE(review): Loop::run() is never called in this test; confirm whether this stop() is needed.
Loop::stop();
});

it('can be paused and resumed', function () {
// Given: a new stream starts unpaused
$stream = new IteratorStream([]);
expect($stream->paused)->toBeFalse();

// When
$stream->pause();

// Then
expect($stream->paused)->toBeTrue();

// When
$stream->resume();

// Then
expect($stream->paused)->toBeFalse();
});

it('can pipe data', function () {
// Given: a stream piped into a stub which records every written chunk
$items = ['foo', 'bar', 'baz'];
$stream = new IteratorStream($items);
$dest = new WritableStreamStub();
$stream->pipe($dest);

// When
Loop::run();

// Then: the destination received every item, in order
expect($dest->data)->toBe($items);
});

0 comments on commit 2924577

Please sign in to comment.