From 925f8072daef188cac3ea3b50d11e0fb7ccc5592 Mon Sep 17 00:00:00 2001 From: Gal Leibovici <396845+gal-leib@users.noreply.github.com> Date: Thu, 3 Oct 2024 16:23:00 +0300 Subject: [PATCH] Instrument rejected tasks in ThreadPoolExecutor (#4272) --- .../metrics5/InstrumentedExecutorService.java | 19 ++++++++++++ .../InstrumentedExecutorServiceTest.java | 29 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/metrics-core/src/main/java/io/dropwizard/metrics5/InstrumentedExecutorService.java b/metrics-core/src/main/java/io/dropwizard/metrics5/InstrumentedExecutorService.java index 23be352ede..2d716f9854 100644 --- a/metrics-core/src/main/java/io/dropwizard/metrics5/InstrumentedExecutorService.java +++ b/metrics-core/src/main/java/io/dropwizard/metrics5/InstrumentedExecutorService.java @@ -9,6 +9,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -30,6 +31,7 @@ public class InstrumentedExecutorService implements ExecutorService { private final Meter submitted; private final Counter running; private final Meter completed; + private final Counter rejected; private final Timer idle; private final Timer duration; @@ -57,6 +59,7 @@ public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry regi this.submitted = registry.meter(MetricRegistry.name(name, "submitted")); this.running = registry.counter(MetricRegistry.name(name, "running")); this.completed = registry.meter(MetricRegistry.name(name, "completed")); + this.rejected = registry.counter(MetricRegistry.name(name, "rejected")); this.idle = registry.timer(MetricRegistry.name(name, "idle")); this.duration = registry.timer(MetricRegistry.name(name, "duration")); @@ -81,6 +84,8 @@ private void registerInternalMetrics() { queue::size); registry.registerGauge(MetricRegistry.name(name, "tasks.capacity"), queue::remainingCapacity); + RejectedExecutionHandler delegateHandler = executor.getRejectedExecutionHandler(); + executor.setRejectedExecutionHandler(new InstrumentedRejectedExecutionHandler(delegateHandler)); } else if (delegate instanceof ForkJoinPool) { ForkJoinPool forkJoinPool = (ForkJoinPool) delegate; registry.registerGauge(MetricRegistry.name(name, "tasks.stolen"), @@ -223,6 +228,20 @@ public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedExc return delegate.awaitTermination(l, timeUnit); } + private class InstrumentedRejectedExecutionHandler implements RejectedExecutionHandler { + private final RejectedExecutionHandler delegateHandler; + + public InstrumentedRejectedExecutionHandler(RejectedExecutionHandler delegateHandler) { + this.delegateHandler = delegateHandler; + } + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + rejected.inc(); + this.delegateHandler.rejectedExecution(r, executor); + } + } + private class InstrumentedRunnable implements Runnable { private final Runnable task; private final Timer.Context idleContext; diff --git a/metrics-core/src/test/java/io/dropwizard/metrics5/InstrumentedExecutorServiceTest.java b/metrics-core/src/test/java/io/dropwizard/metrics5/InstrumentedExecutorServiceTest.java index 29a750cf82..1bc18b896d 100644 --- a/metrics-core/src/test/java/io/dropwizard/metrics5/InstrumentedExecutorServiceTest.java +++ b/metrics-core/src/test/java/io/dropwizard/metrics5/InstrumentedExecutorServiceTest.java @@ -8,14 +8,17 @@ import java.time.Duration; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; class InstrumentedExecutorServiceTest { @@ -166,6 +169,32 @@ void reportsTasksInformationForThreadPoolExecutor() throws Exception { assertThat(poolSize.getValue()).isEqualTo(1); } + @Test + public void reportsRejectedTasksForThreadPoolExecutor() throws Exception { + executor = new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1)); + instrumentedExecutorService = new InstrumentedExecutorService(executor, registry, "tp"); + final Counter rejected = registry.counter("tp.rejected"); + assertThat(rejected.getCount()).isEqualTo(0); + + final CountDownLatch latch = new CountDownLatch(1); + + Runnable runnable = () -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }; + + Future executingFuture = instrumentedExecutorService.submit(runnable); + Future queuedFuture = instrumentedExecutorService.submit(runnable); + assertThatThrownBy(() -> instrumentedExecutorService.submit(runnable)) + .isInstanceOf(RejectedExecutionException.class); + latch.countDown(); + assertThat(rejected.getCount()).isEqualTo(1); + } + @Test public void removesMetricsAfterShutdownForThreadPoolExecutor() { executor = new ThreadPoolExecutor(4, 16,