From 228f9d131bd7c36c55bea185dbbba360c5454a89 Mon Sep 17 00:00:00 2001 From: joecqupt Date: Fri, 17 Jan 2025 23:00:55 +0800 Subject: [PATCH] Fix ConsumeDriver running status --- .../commons/datacarrier/consumer/ConsumeDriver.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java index a6d808fead..dcf1dee807 100644 --- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java +++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java @@ -27,7 +27,7 @@ * Pool of consumers

Created by wusheng on 2016/10/25. */ public class ConsumeDriver implements IDriver { - private boolean running; + private volatile boolean running; private ConsumerThread[] consumerThreads; private Channels channels; private ReentrantLock lock; @@ -88,6 +88,9 @@ public void begin(Channels channels) { } lock.lock(); try { + if (running){ + return; + } this.allocateBuffer2Thread(); for (ConsumerThread consumerThread : consumerThreads) { consumerThread.start(); @@ -124,8 +127,14 @@ private void allocateBuffer2Thread() { @Override public void close(Channels channels) { + if (!running) { + return; + } lock.lock(); try { + if (!running) { + return; + } this.running = false; for (ConsumerThread consumerThread : consumerThreads) { consumerThread.shutdown();