Skip to content

Commit

Permalink
Fix ConsumeDriver running status
Browse files Browse the repository at this point in the history
  • Loading branch information
JoeCqupt committed Jan 17, 2025
1 parent 3549537 commit 228f9d1
Showing 1 changed file with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* Pool of consumers <p> Created by wusheng on 2016/10/25.
*/
public class ConsumeDriver<T> implements IDriver {
private boolean running;
private volatile boolean running;
private ConsumerThread[] consumerThreads;
private Channels<T> channels;
private ReentrantLock lock;
Expand Down Expand Up @@ -88,6 +88,9 @@ public void begin(Channels channels) {
}
lock.lock();
try {
if (running){
return;
}
this.allocateBuffer2Thread();
for (ConsumerThread consumerThread : consumerThreads) {
consumerThread.start();
Expand Down Expand Up @@ -124,8 +127,14 @@ private void allocateBuffer2Thread() {

@Override
public void close(Channels channels) {
if (!running) {
return;
}
lock.lock();
try {
if (!running) {
return;
}
this.running = false;
for (ConsumerThread consumerThread : consumerThreads) {
consumerThread.shutdown();
Expand Down

0 comments on commit 228f9d1

Please sign in to comment.