Skip to content

Commit 849f780

Browse files
committed
Bulk Insert bulkInsert()
Return ES data
1 parent a4011ba commit 849f780

File tree

4 files changed

+51
-2
lines changed

4 files changed

+51
-2
lines changed

src/Connection.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -535,15 +535,15 @@ public function delete($query, $bindings = [])
535535
*
536536
* @throws BulkInsertQueryException
537537
*/
538-
public function insert($query, $bindings = []): Elasticsearch
538+
public function insert($query, $bindings = [], $continueWithErrors = false): Elasticsearch
539539
{
540540
$result = $this->run(
541541
$this->addClientParams($query),
542542
$bindings,
543543
$this->connection->bulk(...)
544544
);
545545

546-
if (! empty($result['errors'])) {
546+
if (! $continueWithErrors && ! empty($result['errors'])) {
547547
throw new BulkInsertQueryException($result);
548548
}
549549

src/Eloquent/Builder.php

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class Builder extends BaseEloquentBuilder
7070
'bucket',
7171
'bucketaggregation',
7272
'openpit',
73+
'bulkinsert',
7374
];
7475

7576
/**

src/Query/Builder.php

+8
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,14 @@ public function insert(array $values): MetaDTO|bool
686686
return $this->processor->processInsert($this, $this->connection->insert($this->grammar->compileInsert($this, $values)));
687687
}
688688

689+
/**
690+
* Same as insert, but returns the ES data of the operation.
691+
*/
692+
public function bulkInsert(array $values): array
693+
{
694+
return $this->processor->processBulkInsert($this, $this->connection->insert($this->grammar->compileInsert($this, $values), [], true));
695+
}
696+
689697
/**
690698
* {@inheritdoc}
691699
*/

src/Query/Processor.php

+40
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,46 @@ public function processInsert(Builder $query, Elasticsearch $result): bool
427427
return ! $this->getRawResponse()['errors'];
428428
}
429429

430+
public function processBulkInsert(Builder $query, Elasticsearch $result): array
431+
{
432+
$this->rawResponse = $result;
433+
$this->query = $query;
434+
435+
$process = $result->asArray();
436+
437+
$outcome = [
438+
'hasErrors' => $process['errors'],
439+
'total' => count($process['items']),
440+
'took' => $process['took'],
441+
'success' => 0,
442+
'created' => 0,
443+
'modified' => 0,
444+
'failed' => 0,
445+
'errors' => [],
446+
];
447+
if (! empty($process['items'])) {
448+
foreach ($process['items'] as $item) {
449+
if (! empty($item['index']['error'])) {
450+
$outcome['errors'][] = [
451+
'id' => $item['index']['_id'],
452+
'type' => $item['index']['error']['type'],
453+
'reason' => $item['index']['error']['reason'],
454+
];
455+
$outcome['failed']++;
456+
} else {
457+
$outcome['success']++;
458+
if ($item['index']['status'] == 201) {
459+
$outcome['created']++;
460+
} else {
461+
$outcome['modified']++;
462+
}
463+
}
464+
}
465+
}
466+
467+
return $outcome;
468+
}
469+
430470
public function processRaw($query, $response)
431471
{
432472
$this->rawResponse = $response;

0 commit comments

Comments
 (0)