Skip to content

Conversation

@norberttech
Copy link
Member

Change Log

Added

  • NativeEntryFactory structures detection
  • Metadata to StructureEntry Definition

Fixed

  • JsonLoader closing streams other than JSON streams (it should close only JSON streams)

Changed

  • Schema Formatter - support for structures
  • Parquet Loader - simplified with support for Structure Entry
  • Avro Loader - simplified with support for Structure Entry
  • Parquet Extractor - default options

Removed

Deprecated

Security


Description

This turned out to be way bigger than I initially expected, but it is all connected: touching one thing affected a few others...

All of this is in order to make these two scenarios possible:

<?php

use Flow\ETL\DSL\CSV;
use Flow\ETL\DSL\From;
use Flow\ETL\DSL\Json;
use Flow\ETL\DSL\Parquet;
use Flow\ETL\Filesystem\SaveMode;
use Flow\ETL\Flow;
use Flow\ETL\DSL\Avro;
use function Flow\ETL\DSL\ref;

include __DIR__ . '/../../vendor/autoload.php';

// Faker generator used to fabricate the sample orders. It is not seeded here,
// so every run produces different data.
$faker = Faker\Factory::create();

// Build 10 sample orders. Each order mixes scalar values with two nested
// associative arrays (customer, address) and a list of strings (notes).
//
// Faker formatters are invoked as methods: reading them as magic properties
// (`$faker->uuid`) is deprecated in fakerphp/faker and emits deprecation notices.
$orders = \array_map(
    // The range value is only a counter; it does not influence the generated data.
    static fn (int $i) : array => [
        'order_id' => $faker->uuid(),
        'created_at' => $faker->dateTimeThisYear()->format(\DateTimeInterface::RFC3339),
        'updated_at' => $faker->dateTimeThisMonth()->format(\DateTimeInterface::RFC3339),
        'total_price' => $faker->randomFloat(2, 0, 500),
        'discount' => $faker->randomFloat(2, 0, 50),
        'customer' => [
            'name' => $faker->firstName(),
            'last_name' => $faker->lastName(),
            'email' => $faker->email(),
        ],
        'address' => [
            'street' => $faker->streetAddress(),
            'city' => $faker->city(),
            'zip' => $faker->postcode(),
            'country' => $faker->country(),
        ],
        // Between 1 and 5 random sentences per order.
        'notes' => \array_map(
            static fn ($i) => $faker->sentence(),
            \range(1, $faker->numberBetween(1, 5))
        ),
    ],
    \range(1, 10)
);

// Write the generated orders to Parquet, JSON and Avro as-is, then cast the
// non-scalar columns to strings and write the result to CSV.
(new Flow())
    ->read(From::array($orders))
    ->mode(SaveMode::Overwrite)
    ->write(Parquet::to(__DIR__ . '/orders_flow.parquet', rows_in_group: 20_000))
    ->write(Json::to(__DIR__ . '/orders_flow.json'))
    ->write(Avro::to(__DIR__ . '/orders_flow.avro'))
    // The casts are placed after the three writers above and before the CSV
    // writer. Each entry must be cast from its own column: the previous version
    // built 'address' and 'notes' from ref('customer'), which duplicated the
    // customer data into all three CSV columns.
    ->withEntry('order_id', ref('order_id')->cast('string'))
    ->withEntry('customer', ref('customer')->cast('string'))
    ->withEntry('address', ref('address')->cast('string'))
    ->withEntry('notes', ref('notes')->cast('string'))
    ->write(CSV::to(__DIR__ . '/orders_flow.csv'))
    ->run()
;
<?php

use Flow\ETL\DSL\Avro;
use Flow\ETL\DSL\CSV;
use Flow\ETL\DSL\Json;
use Flow\ETL\DSL\Parquet;
use Flow\ETL\DSL\To;
use Flow\ETL\Flow;
use Flow\ETL\Loader\StreamLoader\Output;

include __DIR__ . '/../../vendor/autoload.php';

// Every source is a pair: the header echoed before the dump, and a factory
// that builds the extractor only when its turn comes, in the listed order.
$sources = [
    ["JSON: \n", static fn () => Json::from(__DIR__ . '/orders_flow.json', rows_in_batch: 10)],
    ["Avro: \n", static fn () => Avro::from(__DIR__ . '/orders_flow.avro', rows_in_batch: 10)],
    ["Parquet: \n", static fn () => Parquet::from(__DIR__ . '/orders_flow.parquet', rows_in_batch: 10)],
    ["CSV: \n", static fn () => CSV::from(__DIR__ . '/orders_flow.csv', rows_in_batch: 10)],
];

foreach ($sources as $position => [$header, $extractor]) {
    // A separator goes between two dumps, never before the first or after the last.
    if ($position > 0) {
        echo "\n------------------\n";
    }

    echo $header;

    // Read at most 10 rows from the source and print them together with the schema.
    (new Flow())
        ->read($extractor())
        ->limit(10)
        ->write(To::output(true, output: Output::rows_and_schema))
        ->run();
}

Output:

JSON: 
+----------+----------------------+----------------------+-------------+----------+----------------------+----------------------+----------------------+
| order_id |           created_at |           updated_at | total_price | discount |             customer |              address |                notes |
+----------+----------------------+----------------------+-------------+----------+----------------------+----------------------+----------------------+
|       [] | 2023-08-18T01:43:56+ | 2023-09-19T00:17:01+ |      357.32 |    17.88 | {"name":"Kylie","las | {"street":"8964 Joan | ["Nihil nihil reicie |
|       [] | 2023-03-01T07:11:43+ | 2023-10-08T19:45:52+ |      366.12 |    16.77 | {"name":"Josefina"," | {"street":"840 Lang  | ["Nobis laudantium n |
|       [] | 2023-09-05T03:52:54+ | 2023-09-17T21:16:37+ |      263.06 |    31.22 | {"name":"Casimir","l | {"street":"6134 Melv | ["Ab eligendi molest |
|       [] | 2023-10-13T17:33:07+ | 2023-09-23T03:08:11+ |       81.18 |    29.39 | {"name":"Jeanette"," | {"street":"984 Cayla | ["Modi iure atque vo |
|       [] | 2023-10-06T02:10:35+ | 2023-09-24T15:16:01+ |      364.05 |    18.78 | {"name":"Grayson","l | {"street":"43591 Lit | ["Qui aut voluptatem |
|       [] | 2023-03-07T16:08:08+ | 2023-09-25T02:27:29+ |      389.21 |    34.77 | {"name":"Garfield"," | {"street":"17268 Wil | ["Iure dolorem quas  |
|       [] | 2023-05-31T11:37:15+ | 2023-09-27T17:00:01+ |       93.31 |    25.44 | {"name":"Marques","l | {"street":"6963 Gay  | ["Laboriosam dolorem |
|       [] | 2023-03-24T16:24:06+ | 2023-09-20T21:10:58+ |       47.78 |     43.6 | {"name":"Twila","las | {"street":"9071 Era  | ["Cum quia dolor rer |
|       [] | 2023-02-21T06:56:54+ | 2023-09-24T23:46:32+ |       256.6 |    34.86 | {"name":"Ollie","las | {"street":"831 Nicol | ["Recusandae consect |
|       [] | 2023-10-13T19:49:59+ | 2023-09-18T14:45:44+ |      171.01 |     4.88 | {"name":"King","last | {"street":"760 Ferry | ["Voluptates eveniet |
+----------+----------------------+----------------------+-------------+----------+----------------------+----------------------+----------------------+
10 rows

schema
|-- order_id: Flow\ETL\Row\Entry\ArrayEntry (nullable = false)
|-- created_at: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|-- updated_at: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|-- total_price: Flow\ETL\Row\Entry\FloatEntry (nullable = false)
|-- discount: Flow\ETL\Row\Entry\FloatEntry (nullable = false)
|-- customer: Flow\ETL\Row\Entry\StructureEntry (nullable = false)
|    |-- name: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|    |-- last_name: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|    |-- email: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|-- address: Flow\ETL\Row\Entry\StructureEntry (nullable = false)
|    |-- street: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|    |-- city: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|    |-- zip: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|    |-- country: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|-- notes: Flow\ETL\Row\Entry\ListEntry (nullable = false)

------------------
Avro: 
+----------------------+----------------------+----------------------+-----------------+-----------------+----------------------+----------------------+----------------------+
|             order_id |           created_at |           updated_at |     total_price |        discount |             customer |              address |                notes |
+----------------------+----------------------+----------------------+-----------------+-----------------+----------------------+----------------------+----------------------+
| 127dd8e7-6b54-36c5-9 | 2023-08-18T01:43:56+ | 2023-09-19T00:17:01+ | 357.32000732422 | 17.879999160767 | {"name":"Kylie","las | {"street":"8964 Joan | ["Nihil nihil reicie |
| 0ca30307-9a5b-30fb-a | 2023-03-01T07:11:43+ | 2023-10-08T19:45:52+ | 366.11999511719 | 16.770000457764 | {"name":"Josefina"," | {"street":"840 Lang  | ["Nobis laudantium n |
| fe7dda3b-0026-3955-8 | 2023-09-05T03:52:54+ | 2023-09-17T21:16:37+ | 263.05999755859 | 31.219999313354 | {"name":"Casimir","l | {"street":"6134 Melv | ["Ab eligendi molest |
| 9b6deceb-f28e-3d5f-9 | 2023-10-13T17:33:07+ | 2023-09-23T03:08:11+ | 81.180000305176 | 29.389999389648 | {"name":"Jeanette"," | {"street":"984 Cayla | ["Modi iure atque vo |
| 82da96a2-440a-3075-a | 2023-10-06T02:10:35+ | 2023-09-24T15:16:01+ | 364.04998779297 | 18.780000686646 | {"name":"Grayson","l | {"street":"43591 Lit | ["Qui aut voluptatem |
| e16bf5d1-b403-3b01-8 | 2023-03-07T16:08:08+ | 2023-09-25T02:27:29+ | 389.20999145508 | 34.770000457764 | {"name":"Garfield"," | {"street":"17268 Wil | ["Iure dolorem quas  |
| 737f39a9-5d30-33b5-8 | 2023-05-31T11:37:15+ | 2023-09-27T17:00:01+ | 93.309997558594 | 25.440000534058 | {"name":"Marques","l | {"street":"6963 Gay  | ["Laboriosam dolorem |
| ccb5e191-9e10-3d46-a | 2023-03-24T16:24:06+ | 2023-09-20T21:10:58+ | 47.779998779297 | 43.599998474121 | {"name":"Twila","las | {"street":"9071 Era  | ["Cum quia dolor rer |
| 19a158ad-10e3-39a1-8 | 2023-02-21T06:56:54+ | 2023-09-24T23:46:32+ | 256.60000610352 | 34.860000610352 | {"name":"Ollie","las | {"street":"831 Nicol | ["Recusandae consect |
| e8b5468e-0efc-3d3e-b | 2023-10-13T19:49:59+ | 2023-09-18T14:45:44+ | 171.00999450684 | 4.8800001144409 | {"name":"King","last | {"street":"760 Ferry | ["Voluptates eveniet |
+----------------------+----------------------+----------------------+-----------------+-----------------+----------------------+----------------------+----------------------+
10 rows

schema
|-- order_id: Flow\ETL\Row\Entry\UuidEntry (nullable = false)
|-- created_at: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|-- updated_at: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|-- total_price: Flow\ETL\Row\Entry\FloatEntry (nullable = false)
|-- discount: Flow\ETL\Row\Entry\FloatEntry (nullable = false)
|-- customer: Flow\ETL\Row\Entry\StructureEntry (nullable = false)
|    |-- name: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|    |-- last_name: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|    |-- email: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|-- address: Flow\ETL\Row\Entry\StructureEntry (nullable = false)
|    |-- street: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|    |-- city: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|    |-- zip: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|    |-- country: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|-- notes: Flow\ETL\Row\Entry\ListEntry (nullable = false)

------------------
Parquet: 
+----------------------+----------------------+----------------------+-----------------+-----------------+----------------------+----------------------+----------------------+
|             order_id |           created_at |           updated_at |     total_price |        discount |             customer |              address |                notes |
+----------------------+----------------------+----------------------+-----------------+-----------------+----------------------+----------------------+----------------------+
| 127dd8e7-6b54-36c5-9 | 2023-08-18T01:43:56+ | 2023-09-19T00:17:01+ | 357.32000732422 | 17.879999160767 | {"name":"Kylie","las | {"street":"8964 Joan | ["Nihil nihil reicie |
| 0ca30307-9a5b-30fb-a | 2023-03-01T07:11:43+ | 2023-10-08T19:45:52+ | 366.11999511719 | 16.770000457764 | {"name":"Josefina"," | {"street":"840 Lang  | ["Nobis laudantium n |
| fe7dda3b-0026-3955-8 | 2023-09-05T03:52:54+ | 2023-09-17T21:16:37+ | 263.05999755859 | 31.219999313354 | {"name":"Casimir","l | {"street":"6134 Melv | ["Ab eligendi molest |
| 9b6deceb-f28e-3d5f-9 | 2023-10-13T17:33:07+ | 2023-09-23T03:08:11+ | 81.180000305176 | 29.389999389648 | {"name":"Jeanette"," | {"street":"984 Cayla | ["Modi iure atque vo |
| 82da96a2-440a-3075-a | 2023-10-06T02:10:35+ | 2023-09-24T15:16:01+ | 364.04998779297 | 18.780000686646 | {"name":"Grayson","l | {"street":"43591 Lit | ["Qui aut voluptatem |
| e16bf5d1-b403-3b01-8 | 2023-03-07T16:08:08+ | 2023-09-25T02:27:29+ | 389.20999145508 | 34.770000457764 | {"name":"Garfield"," | {"street":"17268 Wil | ["Iure dolorem quas  |
| 737f39a9-5d30-33b5-8 | 2023-05-31T11:37:15+ | 2023-09-27T17:00:01+ | 93.309997558594 | 25.440000534058 | {"name":"Marques","l | {"street":"6963 Gay  | ["Laboriosam dolorem |
| ccb5e191-9e10-3d46-a | 2023-03-24T16:24:06+ | 2023-09-20T21:10:58+ | 47.779998779297 | 43.599998474121 | {"name":"Twila","las | {"street":"9071 Era  | ["Cum quia dolor rer |
| 19a158ad-10e3-39a1-8 | 2023-02-21T06:56:54+ | 2023-09-24T23:46:32+ | 256.60000610352 | 34.860000610352 | {"name":"Ollie","las | {"street":"831 Nicol | ["Recusandae consect |
| e8b5468e-0efc-3d3e-b | 2023-10-13T19:49:59+ | 2023-09-18T14:45:44+ | 171.00999450684 | 4.8800001144409 | {"name":"King","last | {"street":"760 Ferry | ["Voluptates eveniet |
+----------------------+----------------------+----------------------+-----------------+-----------------+----------------------+----------------------+----------------------+
10 rows

schema
|-- order_id: Flow\ETL\Row\Entry\UuidEntry (nullable = false)
|-- created_at: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|-- updated_at: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|-- total_price: Flow\ETL\Row\Entry\FloatEntry (nullable = false)
|-- discount: Flow\ETL\Row\Entry\FloatEntry (nullable = false)
|-- customer: Flow\ETL\Row\Entry\StructureEntry (nullable = false)
|    |-- name: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|    |-- last_name: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|    |-- email: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|-- address: Flow\ETL\Row\Entry\StructureEntry (nullable = false)
|    |-- street: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|    |-- city: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|    |-- zip: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|    |-- country: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|-- notes: Flow\ETL\Row\Entry\ListEntry (nullable = false)

------------------
CSV: 
+----------------------+----------------------+----------------------+-------------+----------+----------------------+----------------------+----------------------+
|             order_id |           created_at |           updated_at | total_price | discount |             customer |              address |                notes |
+----------------------+----------------------+----------------------+-------------+----------+----------------------+----------------------+----------------------+
| 127dd8e7-6b54-36c5-9 | 2023-08-18T01:43:56+ | 2023-09-19T00:17:01+ |      357.32 |    17.88 | {"name":"Kylie","las | {"name":"Kylie","las | {"name":"Kylie","las |
| 0ca30307-9a5b-30fb-a | 2023-03-01T07:11:43+ | 2023-10-08T19:45:52+ |      366.12 |    16.77 | {"name":"Josefina"," | {"name":"Josefina"," | {"name":"Josefina"," |
| fe7dda3b-0026-3955-8 | 2023-09-05T03:52:54+ | 2023-09-17T21:16:37+ |      263.06 |    31.22 | {"name":"Casimir","l | {"name":"Casimir","l | {"name":"Casimir","l |
| 9b6deceb-f28e-3d5f-9 | 2023-10-13T17:33:07+ | 2023-09-23T03:08:11+ |       81.18 |    29.39 | {"name":"Jeanette"," | {"name":"Jeanette"," | {"name":"Jeanette"," |
| 82da96a2-440a-3075-a | 2023-10-06T02:10:35+ | 2023-09-24T15:16:01+ |      364.05 |    18.78 | {"name":"Grayson","l | {"name":"Grayson","l | {"name":"Grayson","l |
| e16bf5d1-b403-3b01-8 | 2023-03-07T16:08:08+ | 2023-09-25T02:27:29+ |      389.21 |    34.77 | {"name":"Garfield"," | {"name":"Garfield"," | {"name":"Garfield"," |
| 737f39a9-5d30-33b5-8 | 2023-05-31T11:37:15+ | 2023-09-27T17:00:01+ |       93.31 |    25.44 | {"name":"Marques","l | {"name":"Marques","l | {"name":"Marques","l |
| ccb5e191-9e10-3d46-a | 2023-03-24T16:24:06+ | 2023-09-20T21:10:58+ |       47.78 |     43.6 | {"name":"Twila","las | {"name":"Twila","las | {"name":"Twila","las |
| 19a158ad-10e3-39a1-8 | 2023-02-21T06:56:54+ | 2023-09-24T23:46:32+ |       256.6 |    34.86 | {"name":"Ollie","las | {"name":"Ollie","las | {"name":"Ollie","las |
| e8b5468e-0efc-3d3e-b | 2023-10-13T19:49:59+ | 2023-09-18T14:45:44+ |      171.01 |     4.88 | {"name":"King","last | {"name":"King","last | {"name":"King","last |
+----------------------+----------------------+----------------------+-------------+----------+----------------------+----------------------+----------------------+
10 rows

schema
|-- order_id: Flow\ETL\Row\Entry\UuidEntry (nullable = false)
|-- created_at: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|-- updated_at: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|-- total_price: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|-- discount: Flow\ETL\Row\Entry\StringEntry (nullable = false)
|-- customer: Flow\ETL\Row\Entry\JsonEntry (nullable = false)
|-- address: Flow\ETL\Row\Entry\JsonEntry (nullable = false)
|-- notes: Flow\ETL\Row\Entry\JsonEntry (nullable = false)

@norberttech norberttech merged commit 724b6aa into flow-php:1.x Oct 16, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant