Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ private function sanitizeConfig(): void
'cert_password' => null,
],
'options' => [
'track_total_hits' => null, // null -> skips - max 10k by default; true -> full values; false -> no hit tracking, returns -1; int -> max hit tracking val, ex 20000
'bypass_map_validation' => false, // This skips the safety checks for Elastic Specific queries.
'logging' => false,
'ssl_verification' => true,
Expand Down Expand Up @@ -162,6 +163,8 @@ public function setOptions(): void
{
$this->allowIdSort = $this->config['options']['allow_id_sort'] ?? false;

$this->options()->add('track_total_hits', $this->config['options']['track_total_hits'] ?? null);

$this->options()->add('bypass_map_validation', $this->config['options']['bypass_map_validation'] ?? null);

if (isset($this->config['options']['ssl_verification'])) {
Expand Down
46 changes: 42 additions & 4 deletions src/Eloquent/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class Builder extends BaseEloquentBuilder
'bucketaggregation',
'openpit',
'bulkinsert',
'createonly',
];

/**
Expand All @@ -95,7 +96,7 @@ public function setModel($model): static
public function newModelInstance($attributes = [])
{
$model = $this->model->newInstance($attributes)->setConnection(
$this->query->getConnection()->getName()
$this->query->connection->getName()
);

// Merge in our options.
Expand Down Expand Up @@ -216,7 +217,7 @@ public function hydrate(array $items)
$instance = $this->newModelInstance();

return $instance->newCollection(array_map(function ($item) use ($instance) {
return $instance->newFromBuilder($item, $this->getConnection()->getName());
return $instance->newFromBuilder($item, $this->query->connection->getName());
}, $items));
}

Expand Down Expand Up @@ -387,9 +388,25 @@ public function findOrNew($id, $columns = ['*']): Model
return $model;
}

public function withoutRefresh()
public function withoutRefresh(): Model
{
$this->model->options()->add('refresh', false);
return $this->withRefresh(false);
}

/**
* Explicitly control the Elasticsearch refresh behavior for write ops.
* Accepts: true, false, or 'wait_for'.
*/
public function withRefresh(bool|string $refresh): Model
{
$this->model->options()->add('refresh', $refresh);

return $this->model;
}

public function withOpType(string $value)
{
$this->model->options()->add('op_type', $value);

return $this->model;
}
Expand Down Expand Up @@ -644,6 +661,27 @@ public function rawDsl($dsl): array
return $this->query->raw($dsl)->asArray();
}

/**
* Force insert operations to use op_type=create for dedupe semantics.
* When set, attempts to create an existing _id will fail with a 409 from Elasticsearch.
*/
public function createOnly(): Model
{
// mark insert op type on the underlying query options
$this->withOpType('create');

return $this->model;
}

/**
* Convenience method to perform a create-only insert and surface 409s as exceptions.
* Accepts single document attributes or an array of documents.
*/
public function createOrFail(array $attributes)
{
return $this->createOnly()->create($attributes);
}

// ----------------------------------------------------------------------
// Protected
// ----------------------------------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions src/Eloquent/Docs/ModelDocs.php
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@
* @method static array getModels($columns = ['*'])
* @method static ElasticCollection get($columns = ['*'])
* @method static ElasticCollection insert($values, $returnData = null)
* @method static self createOnly()
* @method static self createOrFail(array $attributes)
*-----------------------------------
* @method static array toDsl($columns = ['*'])
* @method static array toSql($columns = ['*'])
Expand Down
32 changes: 27 additions & 5 deletions src/Exceptions/BulkInsertQueryException.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ class BulkInsertQueryException extends LaravelElasticsearchException
*/
public function __construct(Elasticsearch $queryResult)
{
parent::__construct($this->formatMessage($queryResult->asArray()), 400);
$result = $queryResult->asArray();
parent::__construct($this->formatMessage($result), $this->inferStatusCode($result));
}

/**
Expand All @@ -30,12 +31,16 @@ private function formatMessage(array $result): string
// Clean that ish up.
$items = collect($result['items'] ?? [])
->filter(function (array $item) {
return $item['index'] && ! empty($item['index']['error']);
$action = array_key_first($item) ?? 'index';

return isset($item[$action]) && ! empty($item[$action]['error']);
})
->map(function (array $item) {
return $item['index'];
$action = array_key_first($item) ?? 'index';

return $item[$action];
})
// reduce to max limit
// reduce to max limit
->slice(0, $this->errorLimit)
->values();

Expand All @@ -44,11 +49,28 @@ private function formatMessage(array $result): string
$message->push('Bulk Insert Errors ('.'Showing '.$items->count().' of '.$totalErrors->count().'):');

$items = $items->map(function (array $item) {
return "{$item['_id']}: {$item['error']['reason']}";
$id = $item['_id'] ?? 'unknown';
$reason = $item['error']['reason'] ?? 'unknown error';
$type = $item['error']['type'] ?? 'error';

return "$id: [$type] $reason";
})->values()->toArray();

$message->push(...$items);

return $message->implode(PHP_EOL);
}

private function inferStatusCode(array $result): int
{
foreach ($result['items'] ?? [] as $item) {
$action = array_key_first($item) ?? 'index';
$error = $item[$action]['error'] ?? null;
if (is_array($error) && ($error['type'] ?? '') === 'version_conflict_engine_exception') {
return 409;
}
}

return 400;
}
}
35 changes: 34 additions & 1 deletion src/Query/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
use Illuminate\Contracts\Support\Arrayable;
use Illuminate\Database\Query\Builder as BaseBuilder;
use Illuminate\Database\Query\Expression;
use Illuminate\Database\Query\Grammars\Grammar;
use Illuminate\Database\Query\Processors\Processor;
use Illuminate\Support\Arr;
use Illuminate\Support\Collection;
use Illuminate\Support\Str;
Expand Down Expand Up @@ -108,6 +110,12 @@ class Builder extends BaseBuilder

protected ?MetaDTO $metaTransfer = null;

public function __construct(...$args)
{
parent::__construct(...$args);
$this->applyConnectionOptions();
}

public function __call($method, $parameters)
{
if (Str::startsWith($method, 'filterWhere')) {
Expand All @@ -117,7 +125,7 @@ public function __call($method, $parameters)
return parent::__call($method, $parameters);
}

public function toDsl(): array
public function toDsl(): array|string
{
$this->applyBeforeQueryCallbacks();

Expand All @@ -129,6 +137,14 @@ public function toSql(): array|string
return $this->toDsl();
}

private function applyConnectionOptions()
{
$trackTotalHits = $this->connection->options()->get('track_total_hits');
if ($trackTotalHits !== null) {
$this->bodyParameters['track_total_hits'] = $trackTotalHits;
}
}

// ======================================================================
// Inherited Methods
// ======================================================================
Expand Down Expand Up @@ -2414,6 +2430,23 @@ public function withAnalyzer(string $analyzer): self
return $this;
}

public function withTrackTotalHits(bool|int|null $val = true): self
{
if ($val === null) {
return $this->withoutTrackTotalHits();
}
$this->bodyParameters['track_total_hits'] = $val;

return $this;
}

public function withoutTrackTotalHits(): self
{
unset($this->bodyParameters['track_total_hits']);

return $this;
}

// ----------------------------------------------------------------------
// Internal Operations
// ----------------------------------------------------------------------
Expand Down
8 changes: 7 additions & 1 deletion src/Query/DSL/DslBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ public function setFields(array $fields): self

/**
* Set a refresh parameter
* Accepts: true, false, or 'wait_for'
*/
public function setRefresh(bool $refresh = true): self
public function setRefresh(bool|string $refresh = true): self
{
return $this->set(['refresh'], $refresh);
}
Expand Down Expand Up @@ -137,6 +138,11 @@ public function setPostFilter(array $filter): self
return $this->set(['post_filter'], $filter);
}

public function setOpType(string $type): self
{
return $this->set(['op_type'], $type);
}

public function setOption(array $keys, $value): self
{
return $this->set($keys, $value);
Expand Down
8 changes: 7 additions & 1 deletion src/Query/Grammar.php
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,14 @@ public function compileInsert($query, array $values): array
} else {
unset($doc['id'], $doc['_id']);
}
if (! empty($doc['_op_type'])) {
$options['op_type'] = $doc['_op_type'];
unset($doc['_op_type']);
} elseif ($optType = $query->getOption('op_type')) {
$options['op_type'] = $optType;
}

// Add the document index operation
// Add the document operation
$index = DslFactory::indexOperation(
index: $query->getFrom(),
id: $docId,
Expand Down
6 changes: 3 additions & 3 deletions src/Query/Processor.php
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,9 @@ public function processBulkInsert(Builder $query, Elasticsearch $result): array
foreach ($process['items'] as $item) {
if (! empty($item['index']['error'])) {
$outcome['errors'][] = [
'id' => $item['index']['_id'],
'type' => $item['index']['error']['type'],
'reason' => $item['index']['error']['reason'],
'id' => $item['index']['_id'] ?? null,
'type' => $item['index']['error']['type'] ?? null,
'reason' => $item['index']['error']['reason'] ?? null,
];
$outcome['failed']++;
} else {
Expand Down
96 changes: 96 additions & 0 deletions tests/CreateOpTypeTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
<?php

declare(strict_types=1);

use PDPhilip\Elasticsearch\Exceptions\BulkInsertQueryException;
use PDPhilip\Elasticsearch\Tests\Models\User;

beforeEach(function () {
User::executeSchema();
});

it('creates a document with createOnly and rejects duplicates', function () {
$id = 'dataset:check-1:2025-01-01T00:00:00Z';

// First create should succeed
User::query()
->createOnly()
->withRefresh('wait_for')
->create([
'id' => $id,
'name' => 'First Insert',
'title' => 'admin',
'age' => 30,
]);

$found = User::find($id);
expect($found)->not()->toBeNull();
expect($found->id)->toBe($id);

// Second create with same _id must fail with 409 (bulk error)
expect(function () use ($id) {
User::query()
->createOnly()
->create([
'id' => $id,
'name' => 'Second Insert',
'title' => 'user',
'age' => 31,
]);
})->toThrow(BulkInsertQueryException::class);
});

it('creates a document with createOrFail and rejects duplicates', function () {
$id = 'dataset:check-1:2025-01-01T00:00:00Z';

// First create should succeed
User::query()
->withRefresh('wait_for')
->createOrFail([
'id' => $id,
'name' => 'First Insert',
'title' => 'admin',
'age' => 30,
]);

$found = User::find($id);
expect($found)->not()->toBeNull();
expect($found->id)->toBe($id);

// Second create with same _id must fail with 409 (bulk error)
expect(function () use ($id) {
User::query()
->createOrFail([
'id' => $id,
'name' => 'Second Insert',
'title' => 'user',
'age' => 31,
]);
})->toThrow(BulkInsertQueryException::class);
});

it('supports per-document op_type via attribute', function () {
$id = 'dataset:check-2:2025-01-01T00:00:00Z';

// Create with per-document op_type
User::create([
'id' => $id,
'_op_type' => 'create',
'name' => 'Doc Create',
'title' => 'admin',
'age' => 42,
]);

$found = User::find($id);
expect($found)->not()->toBeNull();
expect($found->id)->toBe($id);

// Duplicate should raise conflict
expect(function () use ($id) {
User::create([
'id' => $id,
'_op_type' => 'create',
'name' => 'Doc Create Duplicate',
]);
})->toThrow(BulkInsertQueryException::class);
});
Loading