
BulkIngester hangs #532

Closed · frank-montyne opened this issue Mar 14, 2023 · 5 comments · Fixed by #602
Labels
Category: Bug Something isn't working

Comments


frank-montyne commented Mar 14, 2023

Elasticsearch Version

7.17.9

Installed Plugins

No response

Java Version

bundled

OS Version

macOS Monterey version 12.6.2

Problem Description

BulkIngester hangs:

I add 100 documents to the Elasticsearch index with the BulkIngester.

When using maxConcurrentRequests(0) on the bulk ingester builder and invoking bulkIngester.flush() after adding the 100 documents to the bulk ingester, the ingester hangs in the method T whenReadyIf(BooleanSupplier canRun, Supplier<T> fn) of the class FnCondition, at line 82: condition.awaitUninterruptibly();

When using maxConcurrentRequests(1) the documents are ingested.
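
For reference, a minimal, self-contained sketch of the configuration that triggers the hang, separate from the application code further below; the client variable (esClient), index name and document payload are placeholders, not taken from the real setup:

import java.util.Map;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;

public class BulkIngesterHangRepro {

   // Hypothetical reproduction: with maxConcurrentRequests(0) the flush() call
   // blocks forever in FnCondition.whenReadyIf(); with 1 or more it completes.
   static void reproduce(ElasticsearchClient esClient) {
      BulkIngester<Void> ingester = BulkIngester.of(b -> b
         .client(esClient)
         .maxConcurrentRequests(0)   // 0 reproduces the hang; 1 ingests normally
         .maxOperations(-1));        // disable the operation-count flush trigger

      for (int i = 0; i < 100; i++) {
         String id = String.valueOf(i);
         ingester.add(op -> op.index(idx -> idx
            .index("test-index")              // placeholder index name
            .id(id)
            .document(Map.of("value", id)))); // placeholder document
      }

      ingester.flush();   // never returns when maxConcurrentRequests is 0
      ingester.close();
   }
}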

Steps to Reproduce

This is the code using the BulkIngester:

private void handleBulkPerIndexSet(Collection<E> indexSetEvents) {
   // Create bulk listener.
   BulkListener<String> bulkListener = new BulkListener<String>() {
      @Override
      public void beforeBulk(long executionId, BulkRequest bulkRequest, List<String> contexts) {
      }

      @Override
      public void afterBulk(long executionId, BulkRequest bulkRequest, List<String> contexts, BulkResponse bulkResponse) {
      }

      @Override
      public void afterBulk(long executionId, BulkRequest bulkRequest, List<String> contexts, Throwable failure) {
      }
   };

   // Create bulk ingester.
   BulkIngester<String> bulkIngester = BulkIngester.of(ingesterBuilder -> 
      ingesterBuilder
         .client(elasticSearch.getESClient())
         .maxConcurrentRequests(1)
         .maxOperations(-1)
         .maxSize(5 * 1024 * 1024)
         .globalSettings(gsBuilder ->
            gsBuilder
               .waitForActiveShards(asBuilder -> asBuilder.count(1))
               // Refresh belongs inside the global settings builder.
               .refresh(Refresh.True))
         .listener(bulkListener)
      );
   
   try {
      // Add events to bulk ingester.
      for (E event : indexSetEvents) {
         // Add request to bulk processor.
         switch (event.action()) {
            case create:
               bulkIngester.add(new CreateOperation.Builder<BinaryData>()
                     .index(event.esIndex())
                     .id(event.id())
                     .document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8)))
                     .build()
                     ._toBulkOperation());
               break;

            case update:
               bulkIngester.add(new IndexOperation.Builder<BinaryData>()
                     .index(event.esIndex())
                     .id(event.id())
                     .document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8)))
                     .build()
                     ._toBulkOperation());					
               break;

            case delete:
               // Soft delete event.
               event.deleted(true);
               bulkIngester.add(new IndexOperation.Builder<BinaryData>()
                     .index(event.esIndex())
                     .id(event.id())
                     .document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8)))
                     .build()
                     ._toBulkOperation());					
               break;

            case undelete:
               // Soft undelete event.
               event.deleted(false);
               bulkIngester.add(new IndexOperation.Builder<BinaryData>()
                     .index(event.esIndex())
                     .id(event.id())
                     .document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8)))
                     .build()
                     ._toBulkOperation());					
               break;

            case purge:
               // Real physical delete.
               bulkIngester.add(new DeleteOperation.Builder()
                     .index(event.esIndex())
                     .id(event.id())
                     .build()
                     ._toBulkOperation());					
               break;

            default:
               // Should not get here. Log anyway.
               logger.error(String.format("Skipped event with unsupported action '%s' -> %s", event.action().name(), event.toJson()));
               break;
         }
      }
      bulkIngester.flush();
      bulkIngester.close();
   }
   catch (Exception e) {
      bulkException = new EventBulkProcessorException("Failed to process %d events. %s".formatted(indexSetEvents.size(), e.getMessage()));
      logger.error(bulkException.getMessage());
   }
}

Logs (if relevant)

No response

@elasticsearchmachine

Pinging @elastic/es-data-management (Team:Data Management)

dakrone transferred this issue from elastic/elasticsearch Mar 14, 2023
dakrone (Member) commented Mar 14, 2023

The BulkIngester is not part of Elasticsearch, it's part of the elasticsearch-java client. I've transferred this issue to that repository.

@aliariff

Maybe this is already solved in #502.

frank-montyne (Author) commented Apr 5, 2023 via email

swallez (Member) commented Jun 21, 2023

Thanks for reporting this issue. The minimum number of concurrent requests is 1, which allows requests to be sent one at a time.

PR #602 adds validation to the BulkIngester.Builder setters; setting the number of concurrent requests to zero will now throw an IllegalArgumentException.
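
For illustration, with that validation in place a configuration like the one that hung should fail fast when the setter is called, roughly as in this sketch (esClient stands for an assumed, already-configured ElasticsearchClient):

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;

public class BulkIngesterValidationSketch {

   // Sketch of the post-#602 behavior: values below 1 are rejected by the
   // builder instead of producing an ingester whose flush() can hang.
   static void rejectZeroConcurrentRequests(ElasticsearchClient esClient) {
      try {
         BulkIngester<Void> ingester = BulkIngester.of(b -> b
            .client(esClient)
            .maxConcurrentRequests(0));  // rejected after #602
      } catch (IllegalArgumentException e) {
         // Expected after the fix: the invalid setting is reported immediately.
         System.err.println("Rejected: " + e.getMessage());
      }
   }
}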
