Skip to content

Commit 6e43e60

Browse files
HDFS-17762. [ARR] Reset CallerContext information when async handler thread done. (#7539). Contributed by hfutatzhanghb.
Reviewed-by: Jian Zhang <[email protected]>
1 parent e9a7a0b commit 6e43e60

File tree

3 files changed

+45
-0
lines changed

3 files changed

+45
-0
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ThreadLocalContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public void transfer() {
7878
if (call != null) {
7979
Server.getCurCall().set(call);
8080
}
81+
CallerContext.setCurrent(null);
8182
if (context != null) {
8283
CallerContext.setCurrent(context);
8384
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2385,4 +2385,45 @@ public void testGetListingOrder() throws Exception {
23852385
fileSystem1.delete(new Path(testPath2), true);
23862386
}
23872387
}
2388+
2389+
@Test
2390+
public void testCallerContextNotResetByAsyncHandler() throws IOException {
2391+
GenericTestUtils.LogCapturer auditLog =
2392+
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);
2393+
String dirPath = "/test";
2394+
2395+
// The reason we start this child thread is that CallContext use InheritableThreadLocal.
2396+
Thread t1 = new Thread(() -> {
2397+
// Set flag async:true.
2398+
CallerContext.setCurrent(
2399+
new CallerContext.Builder("async:true").build());
2400+
// Issue some RPCs via the router to populate the CallerContext of async handler thread.
2401+
for (int i = 0; i < 10; i++) {
2402+
try {
2403+
routerProtocol.mkdirs(dirPath, new FsPermission("755"), false);
2404+
assertTrue(verifyFileExists(routerFS, dirPath));
2405+
routerProtocol.delete(dirPath, true);
2406+
assertFalse(verifyFileExists(routerFS, dirPath));
2407+
} catch (Exception e) {
2408+
throw new RuntimeException(e);
2409+
}
2410+
}
2411+
2412+
// The audit log should contains async:true.
2413+
assertTrue(auditLog.getOutput().contains("async:true"));
2414+
auditLog.clearOutput();
2415+
assertFalse(auditLog.getOutput().contains("async:true"));
2416+
});
2417+
2418+
t1.start();
2419+
try {
2420+
t1.join();
2421+
} catch (InterruptedException e) {
2422+
throw new RuntimeException(e);
2423+
}
2424+
2425+
routerProtocol.getFileInfo(dirPath);
2426+
// The audit log should not contain async:true.
2427+
assertFalse(auditLog.getOutput().contains("async:true"));
2428+
}
23882429
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.TimeUnit;
3333

3434
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
35+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
3536
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS;
3637
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
3738
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -59,6 +60,7 @@ public static void globalSetUp() throws Exception {
5960
routerConf.setClass(DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
6061
RouterAsyncRpcFairnessPolicyController.class,
6162
RouterRpcFairnessPolicyController.class);
63+
routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 2);
6264
setUp(routerConf);
6365
}
6466

@@ -80,4 +82,5 @@ public void testgetGroupsForUser() throws Exception {
8082
String[] result = syncReturn(String[].class);
8183
assertArrayEquals(group, result);
8284
}
85+
8386
}

0 commit comments

Comments
 (0)