Skip to content

Commit ad748aa

Browse files
committed
Update multiple rows ar once.
1 parent 6baf115 commit ad748aa

File tree

11 files changed

+372
-30
lines changed

11 files changed

+372
-30
lines changed
Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
namespace Flow\Doctrine\Bulk;
66

77
use Doctrine\DBAL\Connection;
8+
use Doctrine\DBAL\Exception;
9+
use Flow\Doctrine\Bulk\Exception\RuntimeException;
810
use Flow\Doctrine\Bulk\QueryFactory\DbalQueryFactory;
911

10-
final class BulkInsert
12+
final class Bulk
1113
{
1214
private QueryFactory $queryFactory;
1315

@@ -32,7 +34,7 @@ public static function create() : self
3234
* update_columns?: array<string>
3335
* } $insertOptions $insertOptions
3436
*
35-
* @throws \Doctrine\DBAL\Exception
37+
* @throws Exception|RuntimeException
3638
* @psalm-suppress DeprecatedMethod
3739
*/
3840
public function insert(Connection $connection, string $table, BulkData $bulkData, array $insertOptions = []) : void
@@ -46,14 +48,37 @@ public function insert(Connection $connection, string $table, BulkData $bulkData
4648
);
4749
}
4850

51+
/**
52+
* @param Connection $connection
53+
* @param string $table
54+
* @param BulkData $bulkData
55+
* @param array{
56+
* primary_key_columns?: array<string>,
57+
* update_columns?: array<string>
58+
* } $updateOptions $updateOptions
59+
*
60+
* @throws Exception|RuntimeException
61+
* @psalm-suppress DeprecatedMethod
62+
*/
63+
public function update(Connection $connection, string $table, BulkData $bulkData, array $updateOptions = []) : void
64+
{
65+
$tableDefinition = new TableDefinition($table, ...\array_values($connection->getSchemaManager()->listTableColumns($table)));
66+
67+
$connection->executeQuery(
68+
$this->queryFactory->update($connection->getDatabasePlatform(), $tableDefinition, $bulkData, $updateOptions),
69+
$bulkData->toSqlParameters(),
70+
$tableDefinition->dbalTypes($bulkData)
71+
);
72+
}
73+
4974
/**
5075
* @param Connection $connection
5176
* @param string $table
5277
* @param BulkData $bulkData
5378
*
54-
* @throws \Doctrine\DBAL\Exception
79+
* @throws Exception|RuntimeException
5580
*
56-
*@deprecated
81+
* @deprecated
5782
*/
5883
public function insertOrSkipOnConflict(Connection $connection, string $table, BulkData $bulkData) : void
5984
{
@@ -68,9 +93,9 @@ public function insertOrSkipOnConflict(Connection $connection, string $table, Bu
6893
* @param string $constraint
6994
* @param BulkData $bulkData
7095
*
71-
* @throws \Doctrine\DBAL\Exception
96+
* @throws Exception|RuntimeException
7297
*
73-
*@deprecated
98+
* @deprecated
7499
*/
75100
public function insertOrUpdateOnConstraintConflict(Connection $connection, string $table, string $constraint, BulkData $bulkData) : void
76101
{
@@ -86,9 +111,9 @@ public function insertOrUpdateOnConstraintConflict(Connection $connection, strin
86111
* @param BulkData $bulkData
87112
* @param array<string> $updateColumns
88113
*
89-
* @throws \Doctrine\DBAL\Exception
114+
* @throws Exception|RuntimeException
90115
*
91-
*@deprecated
116+
* @deprecated
92117
*/
93118
public function insertOrUpdateOnConflict(Connection $connection, string $table, array $conflictColumns, BulkData $bulkData, array $updateColumns = []) : void
94119
{

‎src/Flow/Doctrine/Bulk/Columns.php‎

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,30 @@ public function map(callable $callable) : array
7373

7474
return $columns;
7575
}
76+
77+
/**
78+
* @param string ...$columnNames
79+
*
80+
* @return bool
81+
*/
82+
public function has(string ...$columnNames) : bool
83+
{
84+
return \count(\array_unique(\array_merge($this->columns, $columnNames))) === \count($this->columns);
85+
}
86+
87+
/**
88+
* @throws RuntimeException
89+
*/
90+
public function without(string ...$columnNames) : self
91+
{
92+
$columns = [];
93+
94+
foreach ($this->columns as $column) {
95+
if (false === \in_array($column, $columnNames, true)) {
96+
$columns[] = $column;
97+
}
98+
}
99+
100+
return new self(...$columns);
101+
}
76102
}

‎src/Flow/Doctrine/Bulk/Dialect/Dialect.php‎

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,16 @@ interface Dialect
2020
* @return string
2121
*/
2222
public function prepareInsert(TableDefinition $table, BulkData $bulkData, array $insertOptions = []) : string;
23+
24+
/**
25+
* @param TableDefinition $table
26+
* @param BulkData $bulkData
27+
* @param array{
28+
* primary_key_columns?: array<string>,
29+
* update_columns?: array<string>
30+
* } $updateOptions $updateOptions
31+
*
32+
* @return string
33+
*/
34+
public function prepareUpdate(TableDefinition $table, BulkData $bulkData, array $updateOptions = []) : string;
2335
}

‎src/Flow/Doctrine/Bulk/Dialect/PostgreSQLDialect.php‎

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44

55
namespace Flow\Doctrine\Bulk\Dialect;
66

7+
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
78
use Flow\Doctrine\Bulk\BulkData;
9+
use Flow\Doctrine\Bulk\Columns;
10+
use Flow\Doctrine\Bulk\Exception\RuntimeException;
811
use Flow\Doctrine\Bulk\TableDefinition;
912

1013
final class PostgreSQLDialect implements Dialect
@@ -31,8 +34,8 @@ public function prepareInsert(TableDefinition $table, BulkData $bulkData, array
3134
$bulkData->toSqlPlaceholders(),
3235
\implode(',', $insertOptions['conflict_columns']),
3336
(\array_key_exists('update_columns', $insertOptions) && \count($insertOptions['update_columns']))
34-
? $this->updatedSelectedColumns($insertOptions['update_columns'], $bulkData)
35-
: $this->updateAllColumns($bulkData)
37+
? $this->updatedSelectedColumns($insertOptions['update_columns'], $bulkData->columns())
38+
: $this->updateAllColumns($bulkData->columns())
3639
);
3740
}
3841

@@ -44,8 +47,8 @@ public function prepareInsert(TableDefinition $table, BulkData $bulkData, array
4447
$bulkData->toSqlPlaceholders(),
4548
$insertOptions['constraint'],
4649
(\array_key_exists('update_columns', $insertOptions) && \count($insertOptions['update_columns']))
47-
? $this->updatedSelectedColumns($insertOptions['update_columns'], $bulkData)
48-
: $this->updateAllColumns($bulkData)
50+
? $this->updatedSelectedColumns($insertOptions['update_columns'], $bulkData->columns())
51+
: $this->updateAllColumns($bulkData->columns())
4952
);
5053
}
5154

@@ -67,12 +70,46 @@ public function prepareInsert(TableDefinition $table, BulkData $bulkData, array
6770
}
6871

6972
/**
70-
* @param array<string> $updateColumns
73+
* @param TableDefinition $table
7174
* @param BulkData $bulkData
75+
* @param array{
76+
* primary_key_columns?: array<string>,
77+
* update_columns?: array<string>
78+
* } $updateOptions $updateOptions
79+
*
80+
* @throws RuntimeException
81+
*
82+
* @return string
83+
*/
84+
public function prepareUpdate(TableDefinition $table, BulkData $bulkData, array $updateOptions = []) : string
85+
{
86+
if (false === \array_key_exists('primary_key_columns', $updateOptions)) {
87+
throw new RuntimeException('primary_key_columns option is required for update.');
88+
}
89+
90+
if (false === $bulkData->columns()->has(...$updateOptions['primary_key_columns'])) {
91+
throw new RuntimeException('All columns from primary_key_columns must be in bulk data columns.');
92+
}
93+
94+
return \sprintf(
95+
'UPDATE %s as existing_table SET %s FROM (VALUES %s) as excluded (%s) WHERE %s',
96+
$table->name(),
97+
(\array_key_exists('update_columns', $updateOptions) && \count($updateOptions['update_columns']))
98+
? $this->updatedSelectedColumns($updateOptions['update_columns'], $bulkData->columns()->without(...$updateOptions['primary_key_columns']))
99+
: $this->updateAllColumns($bulkData->columns()->without(...$updateOptions['primary_key_columns'])),
100+
$table->toSqlCastedPlaceholders($bulkData, new PostgreSQLPlatform()),
101+
$bulkData->columns()->concat(','),
102+
$this->updatedIndexColumns($updateOptions['primary_key_columns'])
103+
);
104+
}
105+
106+
/**
107+
* @param array<string> $updateColumns
108+
* @param Columns $columns
72109
*
73110
* @return string
74111
*/
75-
private function updatedSelectedColumns(array $updateColumns, BulkData $bulkData) : string
112+
private function updatedSelectedColumns(array $updateColumns, Columns $columns) : string
76113
{
77114
/**
78115
* https://www.postgresql.org/docs/9.5/sql-insert.html#SQL-ON-CONFLICT
@@ -81,15 +118,25 @@ private function updatedSelectedColumns(array $updateColumns, BulkData $bulkData
81118
*/
82119
return \count($updateColumns)
83120
? \implode(',', \array_map(fn (string $column) : string => "{$column} = excluded.{$column}", $updateColumns))
84-
: $this->updateAllColumns($bulkData);
121+
: $this->updateAllColumns($columns);
85122
}
86123

87124
/**
88-
* @param BulkData $bulkData
125+
* @param array<string> $updateColumns
126+
*
127+
* @return string
128+
*/
129+
private function updatedIndexColumns(array $updateColumns) : string
130+
{
131+
return \implode(' AND ', \array_map(fn (string $column) : string => "existing_table.{$column} = excluded.{$column}", $updateColumns));
132+
}
133+
134+
/**
135+
* @param Columns $columns
89136
*
90137
* @return string
91138
*/
92-
private function updateAllColumns(BulkData $bulkData) : string
139+
private function updateAllColumns(Columns $columns) : string
93140
{
94141
/**
95142
* https://www.postgresql.org/docs/9.5/sql-insert.html#SQL-ON-CONFLICT
@@ -98,7 +145,7 @@ private function updateAllColumns(BulkData $bulkData) : string
98145
*/
99146
return \implode(
100147
',',
101-
$bulkData->columns()->map(
148+
$columns->map(
102149
fn (string $column) : string => "{$column} = excluded.{$column}"
103150
)
104151
);

‎src/Flow/Doctrine/Bulk/QueryFactory.php‎

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,17 @@ interface QueryFactory
2222
* @return string
2323
*/
2424
public function insert(AbstractPlatform $platform, TableDefinition $table, BulkData $bulkData, array $insertOptions = []) : string;
25+
26+
/**
27+
* @param AbstractPlatform $platform
28+
* @param TableDefinition $table
29+
* @param BulkData $bulkData
30+
* @param array{
31+
* primary_key_columns?: array<string>,
32+
* update_columns?: array<string>
33+
* } $updateOptions $updateOptions
34+
*
35+
* @return string
36+
*/
37+
public function update(AbstractPlatform $platform, TableDefinition $table, BulkData $bulkData, array $updateOptions = []) : string;
2538
}

‎src/Flow/Doctrine/Bulk/QueryFactory/DbalQueryFactory.php‎

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Doctrine\DBAL\Platforms\AbstractPlatform;
88
use Flow\Doctrine\Bulk\BulkData;
99
use Flow\Doctrine\Bulk\DbalPlatform;
10+
use Flow\Doctrine\Bulk\Exception\RuntimeException;
1011
use Flow\Doctrine\Bulk\QueryFactory;
1112
use Flow\Doctrine\Bulk\TableDefinition;
1213

@@ -23,12 +24,30 @@ class DbalQueryFactory implements QueryFactory
2324
* update_columns?: array<string>
2425
* } $insertOptions $insertOptions
2526
*
26-
*@throws \Flow\Doctrine\Bulk\Exception\RuntimeException
27+
*@throws RuntimeException
2728
*
2829
* @return string
2930
*/
3031
public function insert(AbstractPlatform $platform, TableDefinition $table, BulkData $bulkData, array $insertOptions = []) : string
3132
{
3233
return (new DbalPlatform($platform))->dialect()->prepareInsert($table, $bulkData, $insertOptions);
3334
}
35+
36+
/**
37+
* @param AbstractPlatform $platform
38+
* @param TableDefinition $table
39+
* @param BulkData $bulkData
40+
* @param array{
41+
* primary_key_columns?: array<string>,
42+
* update_columns?: array<string>
43+
* } $updateOptions $updateOptions
44+
*
45+
* @throws RuntimeException
46+
*
47+
* @return string
48+
*/
49+
public function update(AbstractPlatform $platform, TableDefinition $table, BulkData $bulkData, array $updateOptions = []) : string
50+
{
51+
return (new DbalPlatform($platform))->dialect()->prepareUpdate($table, $bulkData, $updateOptions);
52+
}
3453
}

‎src/Flow/Doctrine/Bulk/TableDefinition.php‎

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public function name() : string
3434
/**
3535
* @param BulkData $bulkData
3636
*
37-
*@throws RuntimeException
37+
* @throws RuntimeException
3838
*
3939
* @return array<string, string>
4040
*/
@@ -88,6 +88,9 @@ function (int $index, array $row) use ($abstractPlatform) : string {
8888
);
8989
}
9090

91+
/**
92+
* @throws RuntimeException
93+
*/
9194
private function getDbalColumn(string $columnName) : Column
9295
{
9396
$dbColumnNames = \array_filter($this->columns, fn (Column $dbColumn) : bool => $dbColumn->getName() === $columnName);

0 commit comments

Comments
 (0)