Skip to content

Commit cda221e

Browse files
committed
HADOOP-19670 Replace Thread with SubjectPreservingThread to restore pre JDK22 Subject behaviour in Threads
1 parent 3d6110b commit cda221e

File tree

322 files changed

+1057
-748
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

322 files changed

+1057
-748
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.hadoop.util.Preconditions;
2323
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
2424
import org.apache.hadoop.util.Time;
25+
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
2526
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
@@ -105,15 +106,16 @@ public Collection<PropertyChange> getChangedProperties(
105106
/**
106107
* A background thread to apply configuration changes.
107108
*/
108-
private static class ReconfigurationThread extends Thread {
109+
private static class ReconfigurationThread extends SubjectInheritingThread {
109110
private ReconfigurableBase parent;
110111

111112
ReconfigurationThread(ReconfigurableBase base) {
113+
super();
112114
this.parent = base;
113115
}
114116

115117
// See {@link ReconfigurationServlet#applyChanges}
116-
public void run() {
118+
public void work() {
117119
LOG.info("Starting reconfiguration task.");
118120
final Configuration oldConf = parent.getConf();
119121
final Configuration newConf = parent.getNewConf();

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.hadoop.classification.InterfaceAudience;
2121
import org.apache.hadoop.classification.InterfaceStability;
2222
import org.apache.hadoop.classification.VisibleForTesting;
23+
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
2526

@@ -107,7 +108,7 @@ void init() {
107108
*/
108109
private void initRefreshThread(boolean runImmediately) {
109110
if (refreshInterval > 0) {
110-
refreshUsed = new Thread(new RefreshThread(this, runImmediately),
111+
refreshUsed = new SubjectInheritingThread(new RefreshThread(this, runImmediately),
111112
"refreshUsed-" + dirPath);
112113
refreshUsed.setDaemon(true);
113114
refreshUsed.start();

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.hadoop.security.token.Token;
3131
import org.apache.hadoop.security.token.TokenIdentifier;
3232
import org.apache.hadoop.util.Time;
33+
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
3334
import org.slf4j.Logger;
3435
import org.slf4j.LoggerFactory;
3536

@@ -38,7 +39,7 @@
3839
*/
3940
@InterfaceAudience.Private
4041
public class DelegationTokenRenewer
41-
extends Thread {
42+
extends SubjectInheritingThread {
4243
private static final Logger LOG = LoggerFactory
4344
.getLogger(DelegationTokenRenewer.class);
4445

@@ -263,7 +264,7 @@ public <T extends FileSystem & Renewable> void removeRenewAction(
263264
}
264265

265266
@Override
266-
public void run() {
267+
public void work() {
267268
for(;;) {
268269
RenewAction<?> action = null;
269270
try {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.apache.hadoop.util.ReflectionUtils;
8282
import org.apache.hadoop.util.ShutdownHookManager;
8383
import org.apache.hadoop.util.StringUtils;
84+
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
8485
import org.apache.hadoop.tracing.Tracer;
8586
import org.apache.hadoop.tracing.TraceScope;
8687
import org.apache.hadoop.util.Preconditions;
@@ -4087,7 +4088,7 @@ private interface StatisticsAggregator<T> {
40874088
static {
40884089
STATS_DATA_REF_QUEUE = new ReferenceQueue<>();
40894090
// start a single daemon cleaner thread
4090-
STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
4091+
STATS_DATA_CLEANER = new SubjectInheritingThread(new StatisticsDataReferenceCleaner());
40914092
STATS_DATA_CLEANER.
40924093
setName(StatisticsDataReferenceCleaner.class.getName());
40934094
STATS_DATA_CLEANER.setDaemon(true);

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/StreamPumper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.ha;
1919

20+
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
2021
import org.slf4j.Logger;
2122

2223
import java.io.BufferedReader;
@@ -50,7 +51,7 @@ enum StreamType {
5051
this.stream = stream;
5152
this.type = type;
5253

53-
thread = new Thread(new Runnable() {
54+
thread = new SubjectInheritingThread(new Runnable() {
5455
@Override
5556
public void run() {
5657
try {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.hadoop.util.StringUtils;
5555
import org.apache.hadoop.util.Time;
5656
import org.apache.hadoop.util.concurrent.AsyncGet;
57+
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
5758
import org.apache.hadoop.tracing.Span;
5859
import org.apache.hadoop.tracing.Tracer;
5960
import org.slf4j.Logger;
@@ -407,7 +408,7 @@ public synchronized void setRpcResponse(Writable rpcResponse) {
407408
/** Thread that reads responses and notifies callers. Each connection owns a
408409
* socket connected to a remote address. Calls are multiplexed through this
409410
* socket: responses may be delivered out of order. */
410-
private class Connection extends Thread {
411+
private class Connection extends SubjectInheritingThread {
411412
private InetSocketAddress server; // server ip:port
412413
private final ConnectionId remoteId; // connection id
413414
private AuthMethod authMethod; // authentication method
@@ -448,7 +449,7 @@ private class Connection extends Thread {
448449
Consumer<Connection> removeMethod) {
449450
this.remoteId = remoteId;
450451
this.server = remoteId.getAddress();
451-
this.rpcRequestThread = new Thread(new RpcRequestSender(),
452+
this.rpcRequestThread = new SubjectInheritingThread(new RpcRequestSender(),
452453
"IPC Parameter Sending Thread for " + remoteId);
453454
this.rpcRequestThread.setDaemon(true);
454455

@@ -1126,7 +1127,7 @@ private synchronized void sendPing() throws IOException {
11261127
}
11271128

11281129
@Override
1129-
public void run() {
1130+
public void work() {
11301131
try {
11311132
// Don't start the ipc parameter sending thread until we start this
11321133
// thread, because the shutdown logic only gets triggered if this

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@
124124
import org.apache.hadoop.util.ProtoUtil;
125125
import org.apache.hadoop.util.StringUtils;
126126
import org.apache.hadoop.util.Time;
127+
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
128+
127129
import java.util.concurrent.atomic.AtomicBoolean;
128130
import org.apache.hadoop.tracing.Span;
129131
import org.apache.hadoop.tracing.SpanContext;
@@ -1471,7 +1473,7 @@ public String toString() {
14711473
}
14721474

14731475
/** Listens on the socket. Creates jobs for the handler threads*/
1474-
private class Listener extends Thread {
1476+
private class Listener extends SubjectInheritingThread {
14751477

14761478
private ServerSocketChannel acceptChannel = null; //the accept channel
14771479
private Selector selector = null; //the selector that we use for the server
@@ -1520,7 +1522,7 @@ void setIsAuxiliary() {
15201522
this.isOnAuxiliaryPort = true;
15211523
}
15221524

1523-
private class Reader extends Thread {
1525+
private class Reader extends SubjectInheritingThread {
15241526
final private BlockingQueue<Connection> pendingConnections;
15251527
private final Selector readSelector;
15261528

@@ -1533,7 +1535,7 @@ private class Reader extends Thread {
15331535
}
15341536

15351537
@Override
1536-
public void run() {
1538+
public void work() {
15371539
LOG.info("Starting " + Thread.currentThread().getName());
15381540
try {
15391541
doRunLoop();
@@ -1612,7 +1614,7 @@ void shutdown() {
16121614
}
16131615

16141616
@Override
1615-
public void run() {
1617+
public void work() {
16161618
LOG.info(Thread.currentThread().getName() + ": starting");
16171619
SERVER.set(Server.this);
16181620
connectionManager.startIdleScan();
@@ -1760,7 +1762,7 @@ Reader getReader() {
17601762
}
17611763

17621764
// Sends responses of RPC back to clients.
1763-
private class Responder extends Thread {
1765+
private class Responder extends SubjectInheritingThread {
17641766
private final Selector writeSelector;
17651767
private int pending; // connections waiting to register
17661768

@@ -1772,7 +1774,7 @@ private class Responder extends Thread {
17721774
}
17731775

17741776
@Override
1775-
public void run() {
1777+
public void work() {
17761778
LOG.info(Thread.currentThread().getName() + ": starting");
17771779
SERVER.set(Server.this);
17781780
try {
@@ -3219,15 +3221,15 @@ private void internalQueueCall(Call call, boolean blocking)
32193221
}
32203222

32213223
/** Handles queued calls . */
3222-
private class Handler extends Thread {
3224+
private class Handler extends SubjectInheritingThread {
32233225
public Handler(int instanceNumber) {
32243226
this.setDaemon(true);
32253227
this.setName("IPC Server handler "+ instanceNumber +
32263228
" on default port " + port);
32273229
}
32283230

32293231
@Override
3230-
public void run() {
3232+
public void work() {
32313233
LOG.debug("{}: starting", Thread.currentThread().getName());
32323234
SERVER.set(Server.this);
32333235
while (running) {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.hadoop.metrics2.MetricsFilter;
3535
import org.apache.hadoop.metrics2.MetricsSink;
3636
import org.apache.hadoop.util.Time;
37+
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
3738
import org.slf4j.Logger;
3839
import org.slf4j.LoggerFactory;
3940

@@ -48,7 +49,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
4849
private final MetricsSink sink;
4950
private final MetricsFilter sourceFilter, recordFilter, metricFilter;
5051
private final SinkQueue<MetricsBuffer> queue;
51-
private final Thread sinkThread;
52+
private final SubjectInheritingThread sinkThread;
5253
private volatile boolean stopping = false;
5354
private volatile boolean inError = false;
5455
private final int periodMs, firstRetryDelay, retryCount;
@@ -84,8 +85,8 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
8485
"Dropped updates per sink", 0);
8586
qsize = registry.newGauge("Sink_"+ name + "Qsize", "Queue size", 0);
8687

87-
sinkThread = new Thread() {
88-
@Override public void run() {
88+
sinkThread = new SubjectInheritingThread() {
89+
@Override public void work() {
8990
publishMetricsFromQueue();
9091
}
9192
};

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.apache.hadoop.util.NativeCodeLoader;
3737
import org.apache.hadoop.classification.VisibleForTesting;
3838
import org.apache.hadoop.util.Preconditions;
39-
39+
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
4040
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
4141
import org.slf4j.Logger;
4242
import org.slf4j.LoggerFactory;
@@ -440,7 +440,7 @@ private void sendCallbackAndRemove(String caller,
440440
}
441441

442442
@VisibleForTesting
443-
final Thread watcherThread = new Thread(new Runnable() {
443+
final Thread watcherThread = new SubjectInheritingThread(new Runnable() {
444444
@Override
445445
public void run() {
446446
if (LOG.isDebugEnabled()) {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@
8989
import org.apache.hadoop.util.Shell;
9090
import org.apache.hadoop.util.StringUtils;
9191
import org.apache.hadoop.util.Time;
92-
92+
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
9393
import org.slf4j.Logger;
9494
import org.slf4j.LoggerFactory;
9595

@@ -930,7 +930,7 @@ private void executeAutoRenewalTask(final String userName,
930930
new ThreadFactory() {
931931
@Override
932932
public Thread newThread(Runnable r) {
933-
Thread t = new Thread(r);
933+
Thread t = new SubjectInheritingThread(r);
934934
t.setDaemon(true);
935935
t.setName("TGT Renewer for " + userName);
936936
return t;

0 commit comments

Comments
 (0)