diff --git a/src/Connection.php b/src/Connection.php index 9a581fe..4c5fabf 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -123,6 +123,7 @@ private function sanitizeConfig(): void 'cert_password' => null, ], 'options' => [ + 'track_total_hits' => null, // null -> skips - max 10k by default; true -> full values; false -> no hit tracking, returns -1; int -> max hit tracking val, ex 20000 'bypass_map_validation' => false, // This skips the safety checks for Elastic Specific queries. 'logging' => false, 'ssl_verification' => true, @@ -162,6 +163,8 @@ public function setOptions(): void { $this->allowIdSort = $this->config['options']['allow_id_sort'] ?? false; + $this->options()->add('track_total_hits', $this->config['options']['track_total_hits'] ?? null); + $this->options()->add('bypass_map_validation', $this->config['options']['bypass_map_validation'] ?? null); if (isset($this->config['options']['ssl_verification'])) { diff --git a/src/Eloquent/Builder.php b/src/Eloquent/Builder.php index 29b230d..aa3865b 100644 --- a/src/Eloquent/Builder.php +++ b/src/Eloquent/Builder.php @@ -71,6 +71,7 @@ class Builder extends BaseEloquentBuilder 'bucketaggregation', 'openpit', 'bulkinsert', + 'createonly', ]; /** @@ -95,7 +96,7 @@ public function setModel($model): static public function newModelInstance($attributes = []) { $model = $this->model->newInstance($attributes)->setConnection( - $this->query->getConnection()->getName() + $this->query->connection->getName() ); // Merge in our options. @@ -216,7 +217,7 @@ public function hydrate(array $items) $instance = $this->newModelInstance(); return $instance->newCollection(array_map(function ($item) use ($instance) { - return $instance->newFromBuilder($item, $this->getConnection()->getName()); + return $instance->newFromBuilder($item, $this->query->connection->getName()); }, $items)); } @@ -387,9 +388,25 @@ public function findOrNew($id, $columns = ['*']): Model return $model; } - public function withoutRefresh() + public function withoutRefresh(): Model { - $this->model->options()->add('refresh', false); + return $this->withRefresh(false); + } + + /** + * Explicitly control the Elasticsearch refresh behavior for write ops. + * Accepts: true, false, or 'wait_for'. + */ + public function withRefresh(bool|string $refresh): Model + { + $this->model->options()->add('refresh', $refresh); + + return $this->model; + } + + public function withOpType(string $value) + { + $this->model->options()->add('op_type', $value); return $this->model; } @@ -644,6 +661,27 @@ public function rawDsl($dsl): array return $this->query->raw($dsl)->asArray(); } + /** + * Force insert operations to use op_type=create for dedupe semantics. + * When set, attempts to create an existing _id will fail with a 409 from Elasticsearch. + */ + public function createOnly(): Model + { + // mark insert op type on the underlying query options + $this->withOpType('create'); + + return $this->model; + } + + /** + * Convenience method to perform a create-only insert and surface 409s as exceptions. + * Accepts single document attributes or an array of documents. + */ + public function createOrFail(array $attributes) + { + return $this->createOnly()->create($attributes); + } + // ---------------------------------------------------------------------- // Protected // ---------------------------------------------------------------------- diff --git a/src/Eloquent/Docs/ModelDocs.php b/src/Eloquent/Docs/ModelDocs.php index 48c45db..36be5ad 100644 --- a/src/Eloquent/Docs/ModelDocs.php +++ b/src/Eloquent/Docs/ModelDocs.php @@ -181,6 +181,8 @@ * @method static array getModels($columns = ['*']) * @method static ElasticCollection get($columns = ['*']) * @method static ElasticCollection insert($values, $returnData = null) + * @method static self createOnly() + * @method static self createOrFail(array $attributes) *----------------------------------- * @method static array toDsl($columns = ['*']) * @method static array toSql($columns = ['*']) diff --git a/src/Exceptions/BulkInsertQueryException.php b/src/Exceptions/BulkInsertQueryException.php index a03190f..b2bfba2 100644 --- a/src/Exceptions/BulkInsertQueryException.php +++ b/src/Exceptions/BulkInsertQueryException.php @@ -15,7 +15,8 @@ class BulkInsertQueryException extends LaravelElasticsearchException */ public function __construct(Elasticsearch $queryResult) { - parent::__construct($this->formatMessage($queryResult->asArray()), 400); + $result = $queryResult->asArray(); + parent::__construct($this->formatMessage($result), $this->inferStatusCode($result)); } /** @@ -30,12 +31,16 @@ private function formatMessage(array $result): string // Clean that ish up. $items = collect($result['items'] ?? []) ->filter(function (array $item) { - return $item['index'] && ! empty($item['index']['error']); + $action = array_key_first($item) ?? 'index'; + + return isset($item[$action]) && ! empty($item[$action]['error']); }) ->map(function (array $item) { - return $item['index']; + $action = array_key_first($item) ?? 'index'; + + return $item[$action]; }) - // reduce to max limit + // reduce to max limit ->slice(0, $this->errorLimit) ->values(); @@ -44,11 +49,28 @@ private function formatMessage(array $result): string $message->push('Bulk Insert Errors ('.'Showing '.$items->count().' of '.$totalErrors->count().'):'); $items = $items->map(function (array $item) { - return "{$item['_id']}: {$item['error']['reason']}"; + $id = $item['_id'] ?? 'unknown'; + $reason = $item['error']['reason'] ?? 'unknown error'; + $type = $item['error']['type'] ?? 'error'; + + return "$id: [$type] $reason"; })->values()->toArray(); $message->push(...$items); return $message->implode(PHP_EOL); } + + private function inferStatusCode(array $result): int + { + foreach ($result['items'] ?? [] as $item) { + $action = array_key_first($item) ?? 'index'; + $error = $item[$action]['error'] ?? null; + if (is_array($error) && ($error['type'] ?? '') === 'version_conflict_engine_exception') { + return 409; + } + } + + return 400; + } } diff --git a/src/Query/Builder.php b/src/Query/Builder.php index 7b07981..01ac2ec 100644 --- a/src/Query/Builder.php +++ b/src/Query/Builder.php @@ -13,6 +13,8 @@ use Illuminate\Contracts\Support\Arrayable; use Illuminate\Database\Query\Builder as BaseBuilder; use Illuminate\Database\Query\Expression; +use Illuminate\Database\Query\Grammars\Grammar; +use Illuminate\Database\Query\Processors\Processor; use Illuminate\Support\Arr; use Illuminate\Support\Collection; use Illuminate\Support\Str; @@ -108,6 +110,12 @@ class Builder extends BaseBuilder protected ?MetaDTO $metaTransfer = null; + public function __construct(...$args) + { + parent::__construct(...$args); + $this->applyConnectionOptions(); + } + public function __call($method, $parameters) { if (Str::startsWith($method, 'filterWhere')) { @@ -117,7 +125,7 @@ public function __call($method, $parameters) return parent::__call($method, $parameters); } - public function toDsl(): array + public function toDsl(): array|string { $this->applyBeforeQueryCallbacks(); @@ -129,6 +137,14 @@ public function toSql(): array|string return $this->toDsl(); } + private function applyConnectionOptions() + { + $trackTotalHits = $this->connection->options()->get('track_total_hits'); + if ($trackTotalHits !== null) { + $this->bodyParameters['track_total_hits'] = $trackTotalHits; + } + } + // ====================================================================== // Inherited Methods // ====================================================================== @@ -2414,6 +2430,23 @@ public function withAnalyzer(string $analyzer): self return $this; } + public function withTrackTotalHits(bool|int|null $val = true): self + { + if ($val === null) { + return $this->withoutTrackTotalHits(); + } + $this->bodyParameters['track_total_hits'] = $val; + + return $this; + } + + public function withoutTrackTotalHits(): self + { + unset($this->bodyParameters['track_total_hits']); + + return $this; + } + // ---------------------------------------------------------------------- // Internal Operations // ---------------------------------------------------------------------- diff --git a/src/Query/DSL/DslBuilder.php b/src/Query/DSL/DslBuilder.php index 2d2d8c4..d87a552 100644 --- a/src/Query/DSL/DslBuilder.php +++ b/src/Query/DSL/DslBuilder.php @@ -107,8 +107,9 @@ public function setFields(array $fields): self /** * Set a refresh parameter + * Accepts: true, false, or 'wait_for' */ - public function setRefresh(bool $refresh = true): self + public function setRefresh(bool|string $refresh = true): self { return $this->set(['refresh'], $refresh); } @@ -137,6 +138,11 @@ public function setPostFilter(array $filter): self return $this->set(['post_filter'], $filter); } + public function setOpType(string $type): self + { + return $this->set(['op_type'], $type); + } + public function setOption(array $keys, $value): self { return $this->set($keys, $value); diff --git a/src/Query/Grammar.php b/src/Query/Grammar.php index 796ed7e..9512a5c 100644 --- a/src/Query/Grammar.php +++ b/src/Query/Grammar.php @@ -91,8 +91,14 @@ public function compileInsert($query, array $values): array } else { unset($doc['id'], $doc['_id']); } + if (! empty($doc['_op_type'])) { + $options['op_type'] = $doc['_op_type']; + unset($doc['_op_type']); + } elseif ($optType = $query->getOption('op_type')) { + $options['op_type'] = $optType; + } - // Add the document index operation + // Add the document operation $index = DslFactory::indexOperation( index: $query->getFrom(), id: $docId, diff --git a/src/Query/Processor.php b/src/Query/Processor.php index 6abebbd..a3bae02 100644 --- a/src/Query/Processor.php +++ b/src/Query/Processor.php @@ -448,9 +448,9 @@ public function processBulkInsert(Builder $query, Elasticsearch $result): array foreach ($process['items'] as $item) { if (! empty($item['index']['error'])) { $outcome['errors'][] = [ - 'id' => $item['index']['_id'], - 'type' => $item['index']['error']['type'], - 'reason' => $item['index']['error']['reason'], + 'id' => $item['index']['_id'] ?? null, + 'type' => $item['index']['error']['type'] ?? null, + 'reason' => $item['index']['error']['reason'] ?? null, ]; $outcome['failed']++; } else { diff --git a/tests/CreateOpTypeTest.php b/tests/CreateOpTypeTest.php new file mode 100644 index 0000000..b4e19fe --- /dev/null +++ b/tests/CreateOpTypeTest.php @@ -0,0 +1,96 @@ +createOnly() + ->withRefresh('wait_for') + ->create([ + 'id' => $id, + 'name' => 'First Insert', + 'title' => 'admin', + 'age' => 30, + ]); + + $found = User::find($id); + expect($found)->not()->toBeNull(); + expect($found->id)->toBe($id); + + // Second create with same _id must fail with 409 (bulk error) + expect(function () use ($id) { + User::query() + ->createOnly() + ->create([ + 'id' => $id, + 'name' => 'Second Insert', + 'title' => 'user', + 'age' => 31, + ]); + })->toThrow(BulkInsertQueryException::class); +}); + +it('creates a document with createOrFail and rejects duplicates', function () { + $id = 'dataset:check-1:2025-01-01T00:00:00Z'; + + // First create should succeed + User::query() + ->withRefresh('wait_for') + ->createOrFail([ + 'id' => $id, + 'name' => 'First Insert', + 'title' => 'admin', + 'age' => 30, + ]); + + $found = User::find($id); + expect($found)->not()->toBeNull(); + expect($found->id)->toBe($id); + + // Second create with same _id must fail with 409 (bulk error) + expect(function () use ($id) { + User::query() + ->createOrFail([ + 'id' => $id, + 'name' => 'Second Insert', + 'title' => 'user', + 'age' => 31, + ]); + })->toThrow(BulkInsertQueryException::class); +}); + +it('supports per-document op_type via attribute', function () { + $id = 'dataset:check-2:2025-01-01T00:00:00Z'; + + // Create with per-document op_type + User::create([ + 'id' => $id, + '_op_type' => 'create', + 'name' => 'Doc Create', + 'title' => 'admin', + 'age' => 42, + ]); + + $found = User::find($id); + expect($found)->not()->toBeNull(); + expect($found->id)->toBe($id); + + // Duplicate should raise conflict + expect(function () use ($id) { + User::create([ + 'id' => $id, + '_op_type' => 'create', + 'name' => 'Doc Create Duplicate', + ]); + })->toThrow(BulkInsertQueryException::class); +}); diff --git a/tests/LargeRecordsTest.php b/tests/LargeRecordsTest.php new file mode 100644 index 0000000..a3c39d6 --- /dev/null +++ b/tests/LargeRecordsTest.php @@ -0,0 +1,42 @@ +get(); + expect($products->getQueryMeta()->getTotalHits())->toBe(10000); + + $products = Product::limit(1)->withTrackTotalHits()->get(); + expect($products->getQueryMeta()->getTotalHits())->toBe(11000); + + $products = Product::limit(1)->withTrackTotalHits(false)->get(); + expect($products->getQueryMeta()->getTotalHits())->toBe(-1); + + $products = Product::limit(1)->withTrackTotalHits(300)->get(); + expect($products->getQueryMeta()->getTotalHits())->toBe(300); + + $products = ProductWithDefaultTrackTotalHits::limit(1)->get(); + expect($products->getQueryMeta()->getTotalHits())->toBe(11000); + + $products = ProductWithDefaultTrackTotalHits::limit(1)->withoutTrackTotalHits()->get(); + expect($products->getQueryMeta()->getTotalHits())->toBe(10000); + + $products = ProductWithDefaultTrackTotalHits::limit(1)->withTrackTotalHits(300)->get(); + expect($products->getQueryMeta()->getTotalHits())->toBe(300); + +}); diff --git a/tests/Models/Product.php b/tests/Models/Product.php index 4eb79c9..3579558 100644 --- a/tests/Models/Product.php +++ b/tests/Models/Product.php @@ -29,4 +29,17 @@ public static function executeSchema() $table->date('updated_at'); }); } + + public static function buildRecords($limit = 100) + { + $records = []; + while ($limit) { + $records[] = [ + 'state' => rand(1, 100), + ]; + $limit--; + } + Product::insert($records); + // Product::withoutRefresh()->insert($records); + } } diff --git a/tests/TestCase.php b/tests/TestCase.php index 6727b0f..974a911 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -56,5 +56,15 @@ protected function getEnvironmentSetUp($app): void 'logging' => true, ], ]); + + $app['config']->set('database.connections.elasticsearch_with_default_track_total_hits', [ + 'driver' => 'elasticsearch', + 'auth_type' => 'http', + 'hosts' => ['http://localhost:9200'], + 'options' => [ + 'track_total_hits' => true, + 'logging' => true, + ], + ]); } }