
Commit 678aea9

HADOOP-19612 Rpc auth header (#7844)
* HADOOP-19612 Add RPC header for access token (#7803)

Add a new auth header to the rpc header proto for access token support. This should support different access tokens within the same connection. (backport)

Contributed-by: Tom McCormick <[email protected]>
1 parent 71cec71 commit 678aea9
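
For orientation, here is a minimal client-side sketch of how the thread-local introduced by this commit is meant to be used, mirroring the tests added below: the caller sets the header before issuing RPCs and clears it afterwards, and ProtoUtil.makeRpcRequestHeader copies it into the new authorizationHeader field of RpcRequestHeaderProto. The AuthorizationHeaderClientExample class, the mkdirWithAccessToken method, and the FileSystem/Path usage are illustrative assumptions, not part of the commit.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.AuthorizationContext;

// Hypothetical example class; not part of this commit.
public class AuthorizationHeaderClientExample {

  static void mkdirWithAccessToken(FileSystem fs, byte[] accessToken)
      throws Exception {
    try {
      // Attach the opaque token for this thread; ProtoUtil.makeRpcRequestHeader
      // copies it into RpcRequestHeaderProto.authorizationHeader on each call.
      AuthorizationContext.setCurrentAuthorizationHeader(accessToken);
      fs.mkdirs(new Path("/example"));  // this RPC carries the header
    } finally {
      // Clear the thread-local so later calls on this thread do not reuse it.
      AuthorizationContext.clear();
    }
  }
}

Because the header travels per call in RpcRequestHeaderProto rather than being fixed at connection setup, different access tokens can be sent over the same connection, which is the behavior the commit message describes; on the server, Handler.run restores the bytes into a per-call thread-local before dispatching.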

File tree

6 files changed: +266 -39 lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 66 additions & 39 deletions
@@ -136,6 +136,7 @@
 import org.apache.hadoop.thirdparty.protobuf.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.security.AuthorizationContext;

 /** An abstract IPC service. IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value. A service runs on
@@ -835,6 +836,7 @@ public static class Call implements Schedulable,
     final byte[] clientId;
     private final Span span; // the trace span on the server side
     private final CallerContext callerContext; // the call context
+    private final byte[] authHeader; // the auth header
     private boolean deferredResponse = false;
     private int priorityLevel;
     // the priority level assigned by scheduler, 0 by default
@@ -863,6 +865,11 @@ public Call(int id, int retryCount, Void ignore1, Void ignore2,

     Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId,
         Span span, CallerContext callerContext) {
+      this(id, retryCount, kind, clientId, span, callerContext, null);
+    }
+
+    Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId,
+        Span span, CallerContext callerContext, byte[] authHeader) {
       this.callId = id;
       this.retryCount = retryCount;
       this.timestampNanos = Time.monotonicNowNanos();
@@ -871,6 +878,7 @@ public Call(int id, int retryCount, Void ignore1, Void ignore2,
       this.clientId = clientId;
       this.span = span;
       this.callerContext = callerContext;
+      this.authHeader = authHeader;
       this.clientStateId = Long.MIN_VALUE;
       this.isCallCoordinated = false;
     }
@@ -1051,7 +1059,15 @@ private class RpcCall extends Call {
     RpcCall(Connection connection, int id, int retryCount,
         Writable param, RPC.RpcKind kind, byte[] clientId,
         Span span, CallerContext context) {
-      super(id, retryCount, kind, clientId, span, context);
+      this(connection, id, retryCount, param, kind, clientId,
+          span, context, new byte[0]);
+    }
+
+    @SuppressWarnings("checkstyle:parameterNumber")
+    RpcCall(Connection connection, int id, int retryCount,
+        Writable param, RPC.RpcKind kind, byte[] clientId,
+        Span span, CallerContext context, byte[] authHeader) {
+      super(id, retryCount, kind, clientId, span, context, authHeader);
       this.connection = connection;
       this.rpcRequest = param;
     }
@@ -2783,48 +2799,58 @@ private void processRpcRequest(RpcRequestHeaderProto header,
                 .build();
       }

-      RpcCall call = new RpcCall(this, header.getCallId(),
-          header.getRetryCount(), rpcRequest,
-          ProtoUtil.convert(header.getRpcKind()),
-          header.getClientId().toByteArray(), span, callerContext);
-
-      // Save the priority level assignment by the scheduler
-      call.setPriorityLevel(callQueue.getPriorityLevel(call));
-      call.markCallCoordinated(false);
-      if(alignmentContext != null && call.rpcRequest != null &&
-          (call.rpcRequest instanceof ProtobufRpcEngine2.RpcProtobufRequest)) {
-        // if call.rpcRequest is not RpcProtobufRequest, will skip the following
-        // step and treat the call as uncoordinated. As currently only certain
-        // ClientProtocol methods request made through RPC protobuf needs to be
-        // coordinated.
-        String methodName;
-        String protoName;
-        ProtobufRpcEngine2.RpcProtobufRequest req =
-            (ProtobufRpcEngine2.RpcProtobufRequest) call.rpcRequest;
-        try {
-          methodName = req.getRequestHeader().getMethodName();
-          protoName = req.getRequestHeader().getDeclaringClassProtocolName();
-          if (alignmentContext.isCoordinatedCall(protoName, methodName)) {
-            call.markCallCoordinated(true);
-            long stateId;
-            stateId = alignmentContext.receiveRequestState(
-                header, getMaxIdleTime());
-            call.setClientStateId(stateId);
+      // Set AuthorizationContext for this thread if present
+      byte[] authHeader = null;
+      try {
+        if (header.hasAuthorizationHeader()) {
+          authHeader = header.getAuthorizationHeader().toByteArray();
+        }
+
+        RpcCall call = new RpcCall(this, header.getCallId(),
+            header.getRetryCount(), rpcRequest,
+            ProtoUtil.convert(header.getRpcKind()),
+            header.getClientId().toByteArray(), span, callerContext, authHeader);
+
+        // Save the priority level assignment by the scheduler
+        call.setPriorityLevel(callQueue.getPriorityLevel(call));
+        call.markCallCoordinated(false);
+        if (alignmentContext != null && call.rpcRequest != null &&
+            (call.rpcRequest instanceof ProtobufRpcEngine2.RpcProtobufRequest)) {
+          // if call.rpcRequest is not RpcProtobufRequest, will skip the following
+          // step and treat the call as uncoordinated. As currently only certain
+          // ClientProtocol methods request made through RPC protobuf needs to be
+          // coordinated.
+          String methodName;
+          String protoName;
+          ProtobufRpcEngine2.RpcProtobufRequest req =
+              (ProtobufRpcEngine2.RpcProtobufRequest) call.rpcRequest;
+          try {
+            methodName = req.getRequestHeader().getMethodName();
+            protoName = req.getRequestHeader().getDeclaringClassProtocolName();
+            if (alignmentContext.isCoordinatedCall(protoName, methodName)) {
+              call.markCallCoordinated(true);
+              long stateId;
+              stateId = alignmentContext.receiveRequestState(
+                  header, getMaxIdleTime());
+              call.setClientStateId(stateId);
+            }
+          } catch (IOException ioe) {
+            throw new RpcServerException("Processing RPC request caught ", ioe);
           }
-        } catch (IOException ioe) {
-          throw new RpcServerException("Processing RPC request caught ", ioe);
         }
-      }

-      try {
-        internalQueueCall(call);
-      } catch (RpcServerException rse) {
-        throw rse;
-      } catch (IOException ioe) {
-        throw new FatalRpcServerException(
-            RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
+        try {
+          internalQueueCall(call);
+        } catch (RpcServerException rse) {
+          throw rse;
+        } catch (IOException ioe) {
+          throw new FatalRpcServerException(
+              RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
+        }
+        incRpcCount(); // Increment the rpc count
+      } finally {
+        AuthorizationContext.clear();
       }
-      incRpcCount(); // Increment the rpc count
     }

     /**
@@ -3046,6 +3072,7 @@ public void run() {
        }
        // always update the current call context
        CallerContext.setCurrent(call.callerContext);
+       AuthorizationContext.setCurrentAuthorizationHeader(call.authHeader);
        UserGroupInformation remoteUser = call.getRemoteUser();
        connDropped = !call.isOpen();
        if (remoteUser != null) {

AuthorizationContext.java (new file, package org.apache.hadoop.security)

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+/**
+ * Utility for managing a thread-local authorization header for RPC calls.
+ */
+public final class AuthorizationContext {
+  private static final ThreadLocal<byte[]> AUTH_HEADER = new ThreadLocal<>();
+
+  private AuthorizationContext() {}
+
+  public static void setCurrentAuthorizationHeader(byte[] header) {
+    AUTH_HEADER.set(header);
+  }
+
+  public static byte[] getCurrentAuthorizationHeader() {
+    return AUTH_HEADER.get();
+  }
+
+  public static void clear() {
+    AUTH_HEADER.remove();
+  }
+}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java

Lines changed: 7 additions & 0 deletions
@@ -32,6 +32,7 @@
 import org.apache.hadoop.tracing.Span;
 import org.apache.hadoop.tracing.Tracer;
 import org.apache.hadoop.tracing.TraceUtils;
+import org.apache.hadoop.security.AuthorizationContext;

 import org.apache.hadoop.thirdparty.protobuf.ByteString;

@@ -203,6 +204,12 @@ public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
       result.setCallerContext(contextBuilder);
     }

+    // Add authorization header if present
+    byte[] authzHeader = AuthorizationContext.getCurrentAuthorizationHeader();
+    if (authzHeader != null) {
+      result.setAuthorizationHeader(ByteString.copyFrom(authzHeader));
+    }
+
     // Add alignment context if it is not null
     if (alignmentContext != null) {
       alignmentContext.updateRequestState(result);

hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto

Lines changed: 2 additions & 0 deletions
@@ -95,6 +95,8 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
   // The client should not interpret these bytes, but only forward bytes
   // received from RpcResponseHeaderProto.routerFederatedState.
   optional bytes routerFederatedState = 9;
+  // Authorization header for passing opaque credentials or tokens
+  optional bytes authorizationHeader = 10;
 }

TestAuthorizationContext.java (new file, package org.apache.hadoop.security)

Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestAuthorizationContext {
+
+  @Test
+  public void testSetAndGetAuthorizationHeader() {
+    byte[] header = "my-auth-header".getBytes();
+    AuthorizationContext.setCurrentAuthorizationHeader(header);
+    Assertions.assertArrayEquals(header, AuthorizationContext.getCurrentAuthorizationHeader());
+    AuthorizationContext.clear();
+  }
+
+  @Test
+  public void testClearAuthorizationHeader() {
+    byte[] header = "clear-me".getBytes();
+    AuthorizationContext.setCurrentAuthorizationHeader(header);
+    AuthorizationContext.clear();
+    Assertions.assertNull(AuthorizationContext.getCurrentAuthorizationHeader());
+  }
+
+  @Test
+  public void testThreadLocalIsolation() throws Exception {
+    byte[] mainHeader = "main-thread".getBytes();
+    AuthorizationContext.setCurrentAuthorizationHeader(mainHeader);
+    Thread t = new Thread(() -> {
+      Assertions.assertNull(AuthorizationContext.getCurrentAuthorizationHeader());
+      byte[] threadHeader = "other-thread".getBytes();
+      AuthorizationContext.setCurrentAuthorizationHeader(threadHeader);
+      Assertions.assertArrayEquals(
+          threadHeader,
+          AuthorizationContext.getCurrentAuthorizationHeader());
+      AuthorizationContext.clear();
+      Assertions.assertNull(AuthorizationContext.getCurrentAuthorizationHeader());
+    });
+    t.start();
+    t.join();
+    // Main thread should still have its header
+    Assertions.assertArrayEquals(mainHeader, AuthorizationContext.getCurrentAuthorizationHeader());
+    AuthorizationContext.clear();
+  }
+
+  @Test
+  public void testNullAndEmptyHeader() {
+    AuthorizationContext.setCurrentAuthorizationHeader(null);
+    Assertions.assertNull(AuthorizationContext.getCurrentAuthorizationHeader());
+    byte[] empty = new byte[0];
+    AuthorizationContext.setCurrentAuthorizationHeader(empty);
+    Assertions.assertArrayEquals(empty, AuthorizationContext.getCurrentAuthorizationHeader());
+    AuthorizationContext.clear();
+  }
+}

TestAuthorizationHeaderPropagation.java (new file, package org.apache.hadoop.hdfs.server.namenode)

Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.AuthorizationContext;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class TestAuthorizationHeaderPropagation {
+
+  public static class HeaderCapturingAuditLogger implements AuditLogger {
+    public static final List<byte[]> CAPTURED_HEADERS = new ArrayList<>();
+    @Override
+    public void initialize(Configuration conf) {}
+    @Override
+    public void logAuditEvent(boolean succeeded, String userName, InetAddress addr,
+        String cmd, String src, String dst, FileStatus stat) {
+      byte[] header = AuthorizationContext.getCurrentAuthorizationHeader();
+      CAPTURED_HEADERS.add(header == null ? null : Arrays.copyOf(header, header.length));
+    }
+  }
+
+  @Test
+  public void testAuthorizationHeaderPerRpc() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFS_NAMENODE_AUDIT_LOGGERS_KEY, HeaderCapturingAuditLogger.class.getName());
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitClusterUp();
+      HeaderCapturingAuditLogger.CAPTURED_HEADERS.clear();
+      FileSystem fs = cluster.getFileSystem();
+      // First RPC with header1
+      byte[] header1 = "header-one".getBytes();
+      AuthorizationContext.setCurrentAuthorizationHeader(header1);
+      fs.mkdirs(new Path("/authz1"));
+      AuthorizationContext.clear();
+      // Second RPC with header2
+      byte[] header2 = "header-two".getBytes();
+      AuthorizationContext.setCurrentAuthorizationHeader(header2);
+      fs.mkdirs(new Path("/authz2"));
+      AuthorizationContext.clear();
+      // Third RPC with no header
+      fs.mkdirs(new Path("/authz3"));
+      // Now assert
+      assertArrayEquals(header1, HeaderCapturingAuditLogger.CAPTURED_HEADERS.get(0));
+      assertArrayEquals(header2, HeaderCapturingAuditLogger.CAPTURED_HEADERS.get(1));
+      assertNull(HeaderCapturingAuditLogger.CAPTURED_HEADERS.get(2));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}
