Skip to content

Commit a270564

Browse files
committed
HADOOP-19574 Restore Subject propagation semantics for Java 22+
1 parent 56a538f commit a270564

File tree

330 files changed

+1272
-747
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

330 files changed

+1272
-747
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.hadoop.util.Preconditions;
2323
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
2424
import org.apache.hadoop.util.Time;
25+
import org.apache.hadoop.util.concurrent.HadoopThread;
2526
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
@@ -105,15 +106,16 @@ public Collection<PropertyChange> getChangedProperties(
105106
/**
106107
* A background thread to apply configuration changes.
107108
*/
108-
private static class ReconfigurationThread extends Thread {
109+
private static class ReconfigurationThread extends HadoopThread {
109110
private ReconfigurableBase parent;
110111

111112
ReconfigurationThread(ReconfigurableBase base) {
113+
super();
112114
this.parent = base;
113115
}
114116

115117
// See {@link ReconfigurationServlet#applyChanges}
116-
public void run() {
118+
public void work() {
117119
LOG.info("Starting reconfiguration task.");
118120
final Configuration oldConf = parent.getConf();
119121
final Configuration newConf = parent.getNewConf();

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.hadoop.classification.InterfaceAudience;
2121
import org.apache.hadoop.classification.InterfaceStability;
2222
import org.apache.hadoop.classification.VisibleForTesting;
23+
import org.apache.hadoop.util.concurrent.HadoopThread;
2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
2526

@@ -107,7 +108,7 @@ void init() {
107108
*/
108109
private void initRefreshThread(boolean runImmediately) {
109110
if (refreshInterval > 0) {
110-
refreshUsed = new Thread(new RefreshThread(this, runImmediately),
111+
refreshUsed = new HadoopThread(new RefreshThread(this, runImmediately),
111112
"refreshUsed-" + dirPath);
112113
refreshUsed.setDaemon(true);
113114
refreshUsed.start();

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.hadoop.security.token.Token;
3131
import org.apache.hadoop.security.token.TokenIdentifier;
3232
import org.apache.hadoop.util.Time;
33+
import org.apache.hadoop.util.concurrent.HadoopThread;
3334
import org.slf4j.Logger;
3435
import org.slf4j.LoggerFactory;
3536

@@ -38,7 +39,7 @@
3839
*/
3940
@InterfaceAudience.Private
4041
public class DelegationTokenRenewer
41-
extends Thread {
42+
extends HadoopThread {
4243
private static final Logger LOG = LoggerFactory
4344
.getLogger(DelegationTokenRenewer.class);
4445

@@ -263,7 +264,7 @@ public <T extends FileSystem & Renewable> void removeRenewAction(
263264
}
264265

265266
@Override
266-
public void run() {
267+
public void work() {
267268
for(;;) {
268269
RenewAction<?> action = null;
269270
try {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.apache.hadoop.util.ReflectionUtils;
8282
import org.apache.hadoop.util.ShutdownHookManager;
8383
import org.apache.hadoop.util.StringUtils;
84+
import org.apache.hadoop.util.concurrent.HadoopThread;
8485
import org.apache.hadoop.tracing.Tracer;
8586
import org.apache.hadoop.tracing.TraceScope;
8687
import org.apache.hadoop.util.Preconditions;
@@ -4087,7 +4088,7 @@ private interface StatisticsAggregator<T> {
40874088
static {
40884089
STATS_DATA_REF_QUEUE = new ReferenceQueue<>();
40894090
// start a single daemon cleaner thread
4090-
STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
4091+
STATS_DATA_CLEANER = new HadoopThread(new StatisticsDataReferenceCleaner());
40914092
STATS_DATA_CLEANER.
40924093
setName(StatisticsDataReferenceCleaner.class.getName());
40934094
STATS_DATA_CLEANER.setDaemon(true);

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ public void uncaughtException(Thread t, Throwable e) {
283283
}
284284

285285
@Override
286-
public void run() {
286+
public void work() {
287287
while (shouldRun) {
288288
try {
289289
loopUntilConnected();

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/StreamPumper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.ha;
1919

20+
import org.apache.hadoop.util.concurrent.HadoopThread;
2021
import org.slf4j.Logger;
2122

2223
import java.io.BufferedReader;
@@ -50,7 +51,7 @@ enum StreamType {
5051
this.stream = stream;
5152
this.type = type;
5253

53-
thread = new Thread(new Runnable() {
54+
thread = new HadoopThread(new Runnable() {
5455
@Override
5556
public void run() {
5657
try {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ void tryStart() {
158158
if (running.compareAndSet(null, current)) {
159159
final Daemon daemon = new Daemon() {
160160
@Override
161-
public void run() {
161+
public void work() {
162162
for (; isRunning(this);) {
163163
final long waitTime = checkCalls();
164164
tryStop(this);

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.hadoop.util.StringUtils;
5555
import org.apache.hadoop.util.Time;
5656
import org.apache.hadoop.util.concurrent.AsyncGet;
57+
import org.apache.hadoop.util.concurrent.HadoopThread;
5758
import org.apache.hadoop.tracing.Span;
5859
import org.apache.hadoop.tracing.Tracer;
5960
import org.slf4j.Logger;
@@ -407,7 +408,7 @@ public synchronized void setRpcResponse(Writable rpcResponse) {
407408
/** Thread that reads responses and notifies callers. Each connection owns a
408409
* socket connected to a remote address. Calls are multiplexed through this
409410
* socket: responses may be delivered out of order. */
410-
private class Connection extends Thread {
411+
private class Connection extends HadoopThread {
411412
private InetSocketAddress server; // server ip:port
412413
private final ConnectionId remoteId; // connection id
413414
private AuthMethod authMethod; // authentication method
@@ -448,7 +449,7 @@ private class Connection extends Thread {
448449
Consumer<Connection> removeMethod) {
449450
this.remoteId = remoteId;
450451
this.server = remoteId.getAddress();
451-
this.rpcRequestThread = new Thread(new RpcRequestSender(),
452+
this.rpcRequestThread = new HadoopThread(new RpcRequestSender(),
452453
"IPC Parameter Sending Thread for " + remoteId);
453454
this.rpcRequestThread.setDaemon(true);
454455

@@ -1126,7 +1127,7 @@ private synchronized void sendPing() throws IOException {
11261127
}
11271128

11281129
@Override
1129-
public void run() {
1130+
public void work() {
11301131
try {
11311132
// Don't start the ipc parameter sending thread until we start this
11321133
// thread, because the shutdown logic only gets triggered if this

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@
124124
import org.apache.hadoop.util.ProtoUtil;
125125
import org.apache.hadoop.util.StringUtils;
126126
import org.apache.hadoop.util.Time;
127+
import org.apache.hadoop.util.concurrent.HadoopThread;
128+
127129
import java.util.concurrent.atomic.AtomicBoolean;
128130
import org.apache.hadoop.tracing.Span;
129131
import org.apache.hadoop.tracing.SpanContext;
@@ -1471,7 +1473,7 @@ public String toString() {
14711473
}
14721474

14731475
/** Listens on the socket. Creates jobs for the handler threads*/
1474-
private class Listener extends Thread {
1476+
private class Listener extends HadoopThread {
14751477

14761478
private ServerSocketChannel acceptChannel = null; //the accept channel
14771479
private Selector selector = null; //the selector that we use for the server
@@ -1520,7 +1522,7 @@ void setIsAuxiliary() {
15201522
this.isOnAuxiliaryPort = true;
15211523
}
15221524

1523-
private class Reader extends Thread {
1525+
private class Reader extends HadoopThread {
15241526
final private BlockingQueue<Connection> pendingConnections;
15251527
private final Selector readSelector;
15261528

@@ -1533,7 +1535,7 @@ private class Reader extends Thread {
15331535
}
15341536

15351537
@Override
1536-
public void run() {
1538+
public void work() {
15371539
LOG.info("Starting " + Thread.currentThread().getName());
15381540
try {
15391541
doRunLoop();
@@ -1612,7 +1614,7 @@ void shutdown() {
16121614
}
16131615

16141616
@Override
1615-
public void run() {
1617+
public void work() {
16161618
LOG.info(Thread.currentThread().getName() + ": starting");
16171619
SERVER.set(Server.this);
16181620
connectionManager.startIdleScan();
@@ -1760,7 +1762,7 @@ Reader getReader() {
17601762
}
17611763

17621764
// Sends responses of RPC back to clients.
1763-
private class Responder extends Thread {
1765+
private class Responder extends HadoopThread {
17641766
private final Selector writeSelector;
17651767
private int pending; // connections waiting to register
17661768

@@ -1772,7 +1774,7 @@ private class Responder extends Thread {
17721774
}
17731775

17741776
@Override
1775-
public void run() {
1777+
public void work() {
17761778
LOG.info(Thread.currentThread().getName() + ": starting");
17771779
SERVER.set(Server.this);
17781780
try {
@@ -3219,15 +3221,15 @@ private void internalQueueCall(Call call, boolean blocking)
32193221
}
32203222

32213223
/** Handles queued calls . */
3222-
private class Handler extends Thread {
3224+
private class Handler extends HadoopThread {
32233225
public Handler(int instanceNumber) {
32243226
this.setDaemon(true);
32253227
this.setName("IPC Server handler "+ instanceNumber +
32263228
" on default port " + port);
32273229
}
32283230

32293231
@Override
3230-
public void run() {
3232+
public void work() {
32313233
LOG.debug("{}: starting", Thread.currentThread().getName());
32323234
SERVER.set(Server.this);
32333235
while (running) {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.hadoop.metrics2.MetricsFilter;
3535
import org.apache.hadoop.metrics2.MetricsSink;
3636
import org.apache.hadoop.util.Time;
37+
import org.apache.hadoop.util.concurrent.HadoopThread;
3738
import org.slf4j.Logger;
3839
import org.slf4j.LoggerFactory;
3940

@@ -48,7 +49,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
4849
private final MetricsSink sink;
4950
private final MetricsFilter sourceFilter, recordFilter, metricFilter;
5051
private final SinkQueue<MetricsBuffer> queue;
51-
private final Thread sinkThread;
52+
private final HadoopThread sinkThread;
5253
private volatile boolean stopping = false;
5354
private volatile boolean inError = false;
5455
private final int periodMs, firstRetryDelay, retryCount;
@@ -84,8 +85,8 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
8485
"Dropped updates per sink", 0);
8586
qsize = registry.newGauge("Sink_"+ name + "Qsize", "Queue size", 0);
8687

87-
sinkThread = new Thread() {
88-
@Override public void run() {
88+
sinkThread = new HadoopThread() {
89+
@Override public void work() {
8990
publishMetricsFromQueue();
9091
}
9192
};

0 commit comments

Comments
 (0)