WIP Background heartbeating and activity management utility #652

Open
wants to merge 5 commits into
base: main

mnichols

When an activity implementation makes a call that might "hang" or take a very long time to complete, we want to be able to cancel it when our activityContext.heartbeat(null) invocation receives an ActivityCompletionException.

This utility manages two threads, one for the heartbeat and the other for the Callable<T> which is the "actual" activity work that might take a while.

If the heartbeat detects that the activity should be canceled, a user-supplied Predicate<Callable<T>> decides whether to honor or ignore the cancellation. If the cancellation proceeds, a non-retryable ApplicationFailure is thrown, so the main Activity body can catch it and either swallow it or rethrow it to fail the activity.
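
A minimal JDK-only sketch of the pattern described above, not the PR's actual code. Everything named here is hypothetical: `BackgroundHeartbeatSketch` and `run` are illustrative names, the `Runnable heartbeat` stands in for activityContext.heartbeat(null), any `RuntimeException` it throws stands in for ActivityCompletionException, and the `CancellationException` raised by `Future.get()` stands in for the non-retryable ApplicationFailure.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

final class BackgroundHeartbeatSketch {
  // Runs `work` on one thread and `heartbeat` on another.
  // Assumption: `heartbeat` throws a RuntimeException once cancellation is requested.
  static <T> T run(
      Callable<T> work,
      Runnable heartbeat,
      Predicate<Callable<T>> shouldCancel,
      long intervalMillis)
      throws Exception {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
    try {
      Future<T> result = pool.submit(work);
      ScheduledFuture<?> ticker =
          pool.scheduleAtFixedRate(
              () -> {
                try {
                  heartbeat.run();
                } catch (RuntimeException e) {
                  // The predicate decides whether the cancellation is honored.
                  if (shouldCancel.test(work)) {
                    result.cancel(true); // interrupts the thread running `work`
                  }
                  // Swallowing the exception keeps the heartbeat scheduled.
                }
              },
              0,
              intervalMillis,
              TimeUnit.MILLISECONDS);
      try {
        // Throws java.util.concurrent.CancellationException if `result` was cancelled.
        return result.get();
      } catch (ExecutionException e) {
        // Rethrow the work's own exception instead of the wrapper.
        Throwable cause = e.getCause();
        if (cause instanceof Exception) {
          throw (Exception) cause;
        }
        throw e;
      } finally {
        ticker.cancel(false);
      }
    } finally {
      pool.shutdownNow();
    }
  }
}
```

Creating a pool per call keeps the sketch short; it is also one of the costs raised in review below.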

@mnichols mnichols requested review from tsurdilo, antmendoza and a team as code owners July 17, 2024 21:24
Member

I think this should be a static inner class if only used in the one place. Every other class in this package is a standalone sample.

// activity Callable is cancelled and a Cancellation failure is thrown.
// Callers should handle `ApplicationFailure` if you are allowing cancellation and determine
// if you want to exit the Activity with or without the failure bubbling up to the Workflow.
public static <T> T withBackgroundHeartbeatAndActivity(
Member

@cretz cretz Jul 17, 2024

I think there may be a few issues here, like:

  • Not reusing the executor service
  • Starting a second thread unnecessarily
  • Not cancelling heartbeater when activity done and maybe sending extra RPC unnecessarily
  • Not sure there is value in heartbeating after server sent a cancellation, would have to check
  • Could use the heartbeat timeout by default to derive interval
  • Force wrapping user exception in a runtime exception
  • Maybe a few more, didn't check
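
For the point about deriving the interval from the heartbeat timeout, one possible sketch follows. `HeartbeatIntervals`, `fromTimeout`, and the `fallback` parameter are hypothetical names. It assumes an unset heartbeat timeout is reported as a zero Duration, which matters because `scheduleAtFixedRate` rejects a period of zero or less with an `IllegalArgumentException`.

```java
import java.time.Duration;

final class HeartbeatIntervals {
  // Returns half the heartbeat timeout, or `fallback` when no usable timeout is set.
  // Assumption: an unset timeout shows up as null, zero, or negative.
  static Duration fromTimeout(Duration heartbeatTimeout, Duration fallback) {
    if (heartbeatTimeout == null || heartbeatTimeout.isZero() || heartbeatTimeout.isNegative()) {
      return fallback;
    }
    Duration half = heartbeatTimeout.dividedBy(2);
    // Clamp to 1 ms so interval.toMillis() never becomes 0 for very small timeouts.
    return half.toMillis() < 1 ? Duration.ofMillis(1) : half;
  }
}
```

Halving the timeout leaves room for one late or failed heartbeat before the timeout elapses; a different ratio may suit other workloads.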

Here's an untested (so probably doesn't even compile) alternative approach:

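import io.temporal.activity.Activity;
import io.temporal.client.ActivityCompletionException;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class AutoHeartbeater implements AutoCloseable {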
  private final ScheduledExecutorService scheduler;

  public AutoHeartbeater(ScheduledExecutorService scheduler) {
    this.scheduler = scheduler;
  }

  public void close() {
    scheduler.shutdown();
  }

  public class Run implements AutoCloseable {
    private final ScheduledFuture<?> ticker;
    private final Object cancelLock = new Object();
    private ActivityCompletionException cancel;
    private Consumer<ActivityCompletionException> cancelConsumer;

    public Run() {
      this((Consumer<ActivityCompletionException>)null);
    }

    public Run(Consumer<ActivityCompletionException> onCancel) {
      this(Activity.getExecutionContext().getInfo().getHeartbeatTimeout().dividedBy(2), onCancel);
    }

    public Run(Duration interval) {
      this(interval, null);
    }

    public Run(Duration interval, Consumer<ActivityCompletionException> onCancel) {
      if (onCancel == null) {
        var thread = Thread.currentThread();
        onCancel = ignore -> thread.interrupt();
      }
      this.cancelConsumer = onCancel;
      var context = Activity.getExecutionContext();
      ticker = scheduler.scheduleAtFixedRate(() -> {
        try {
          context.heartbeat(null);
        } catch (ActivityCompletionException e) {
          Consumer<ActivityCompletionException> localCancelConsumer;
          synchronized (cancelLock) {
            localCancelConsumer = cancelConsumer;
            cancel = e;
          }
          // There's technically a race here I'd have to think about where close's setOnCancel(null) ran
          // but this has the old one so it could interrupt a thread too late. This is bad and can be
          // avoided by synchronizing the cancel call itself, just didn't have time to change the sample.
          if (localCancelConsumer != null) {
            localCancelConsumer.accept(e);
          }
          throw e;
        }
      }, 0, interval.toMillis(), TimeUnit.MILLISECONDS);
    }

    public ActivityCompletionException getCancel() {
      synchronized (cancelLock) {
        return cancel;
      }
    }

    // May be invoked immediately if a cancel has occurred
    public void setOnCancel(Consumer<ActivityCompletionException> onCancel) {
      ActivityCompletionException localCancel;
      synchronized (cancelLock) {
        localCancel = cancel;
        this.cancelConsumer = onCancel;
      }
      if (localCancel != null && onCancel != null) {
        onCancel.accept(localCancel);
      }
    }

    public void close() {
      setOnCancel(null);
      ticker.cancel(false);
      // The shared scheduler stays running for later runs; AutoHeartbeater.close() shuts it down.
    }
  }
}

I haven't tested it so it may not work, but here's how you'd use it:

public static class GreetingActivitiesImpl implements GreetingActivities {
  private final AutoHeartbeater heartbeater;

  public GreetingActivitiesImpl(AutoHeartbeater heartbeater) {
    this.heartbeater = heartbeater;
  }

  @Override
  public String composeGreeting(String greeting, String name) {
    try (var run = heartbeater.new Run()) {
      // doStuff() is a placeholder for the long-running work that produces the greeting.
      return doStuff();
    }
  }
}

Of course it can be adapted, this is just an idea.

Author

oooo...I was looking for something like try-with-resources and didn't know Java had that. Your solution looks like it is headed in a much more elegant direction.
If an activity rejects a cancellation, I'd need to keep heartbeating; otherwise the server will fail the activity task due to heartbeat timeout. Not sure this implementation allows that..I need to digest it.

Member

> Not sure this implementation allows that..I need to digest it.

It doesn't, but a simple option to run (or a heartbeater.runWithoutCancel() or something) can be added.
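
A rough JDK-only sketch of what such an option might look like. `HeartbeatLoopSketch`, `Handle`, `start`, and the `keepBeatingAfterCancel` flag are all hypothetical, and the `Runnable heartbeat` stands in for context.heartbeat(null), with a `RuntimeException` standing in for ActivityCompletionException. The relevant JDK behavior is that a task scheduled with `scheduleAtFixedRate` stops being rescheduled once it throws, so continuing to heartbeat means catching the exception rather than rethrowing it.

```java
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class HeartbeatLoopSketch {
  // Hypothetical handle: close() stops this ticker but leaves the shared scheduler running.
  interface Handle extends AutoCloseable {
    @Override
    void close();
  }

  static Handle start(
      ScheduledExecutorService scheduler,
      Runnable heartbeat,
      Duration interval,
      boolean keepBeatingAfterCancel,
      Consumer<RuntimeException> onCancel) {
    ScheduledFuture<?> ticker =
        scheduler.scheduleAtFixedRate(
            () -> {
              try {
                heartbeat.run();
              } catch (RuntimeException e) {
                onCancel.accept(e);
                if (!keepBeatingAfterCancel) {
                  // An exception escaping the task suppresses all later executions.
                  throw e;
                }
                // Otherwise swallow it so the next heartbeat is still sent.
              }
            },
            0,
            interval.toMillis(),
            TimeUnit.MILLISECONDS);
    return () -> ticker.cancel(false);
  }
}
```

Because `Handle` is `AutoCloseable`, it fits the same try-with-resources shape as the usage above. Note that with the flag set, `onCancel` is invoked on every heartbeat that reports cancellation, so a caller may want to make it idempotent.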

2 participants