Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument rejected tasks in ThreadPoolExecutor #4481

Merged
merged 1 commit into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -30,6 +31,7 @@ public class InstrumentedExecutorService implements ExecutorService {
private final Meter submitted;
private final Counter running;
private final Meter completed;
private final Counter rejected;
private final Timer idle;
private final Timer duration;

Expand Down Expand Up @@ -57,6 +59,7 @@ public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry regi
this.submitted = registry.meter(MetricRegistry.name(name, "submitted"));
this.running = registry.counter(MetricRegistry.name(name, "running"));
this.completed = registry.meter(MetricRegistry.name(name, "completed"));
this.rejected = registry.counter(MetricRegistry.name(name, "rejected"));
this.idle = registry.timer(MetricRegistry.name(name, "idle"));
this.duration = registry.timer(MetricRegistry.name(name, "duration"));

Expand All @@ -81,6 +84,8 @@ private void registerInternalMetrics() {
queue::size);
registry.registerGauge(MetricRegistry.name(name, "tasks.capacity"),
queue::remainingCapacity);
RejectedExecutionHandler delegateHandler = executor.getRejectedExecutionHandler();
executor.setRejectedExecutionHandler(new InstrumentedRejectedExecutionHandler(delegateHandler));
} else if (delegate instanceof ForkJoinPool) {
ForkJoinPool forkJoinPool = (ForkJoinPool) delegate;
registry.registerGauge(MetricRegistry.name(name, "tasks.stolen"),
Expand Down Expand Up @@ -223,6 +228,20 @@ public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedExc
return delegate.awaitTermination(l, timeUnit);
}

private class InstrumentedRejectedExecutionHandler implements RejectedExecutionHandler {
private final RejectedExecutionHandler delegateHandler;

public InstrumentedRejectedExecutionHandler(RejectedExecutionHandler delegateHandler) {
this.delegateHandler = delegateHandler;
}

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
rejected.inc();
this.delegateHandler.rejectedExecution(r, executor);
}
}

private class InstrumentedRunnable implements Runnable {
private final Runnable task;
private final Timer.Context idleContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class InstrumentedExecutorServiceTest {

Expand Down Expand Up @@ -166,6 +169,32 @@ void reportsTasksInformationForThreadPoolExecutor() throws Exception {
assertThat(poolSize.getValue()).isEqualTo(1);
}

@Test
public void reportsRejectedTasksForThreadPoolExecutor() throws Exception {
executor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1));
instrumentedExecutorService = new InstrumentedExecutorService(executor, registry, "tp");
final Counter rejected = registry.counter("tp.rejected");
assertThat(rejected.getCount()).isEqualTo(0);

final CountDownLatch latch = new CountDownLatch(1);

Runnable runnable = () -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};

Future<?> executingFuture = instrumentedExecutorService.submit(runnable);
Future<?> queuedFuture = instrumentedExecutorService.submit(runnable);
assertThatThrownBy(() -> instrumentedExecutorService.submit(runnable))
.isInstanceOf(RejectedExecutionException.class);
latch.countDown();
assertThat(rejected.getCount()).isEqualTo(1);
}

@Test
public void removesMetricsAfterShutdownForThreadPoolExecutor() {
executor = new ThreadPoolExecutor(4, 16,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ void testCreate() throws Exception {

assertThat(registry.getMetrics()).containsOnlyKeys("test-instrumented.completed",
"test-instrumented.submitted", "test-instrumented.duration", "test-instrumented.idle",
"test-instrumented.running");
"test-instrumented.running", "test-instrumented.rejected");
}
}