1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one or more
3+ * contributor license agreements. See the NOTICE file distributed with
4+ * this work for additional information regarding copyright ownership.
5+ * The ASF licenses this file to You under the Apache License, Version 2.0
6+ * (the "License"); you may not use this file except in compliance with
7+ * the License. You may obtain a copy of the License at
8+ *
9+ * http://www.apache.org/licenses/LICENSE-2.0
10+ *
11+ * Unless required by applicable law or agreed to in writing, software
12+ * distributed under the License is distributed on an "AS IS" BASIS,
13+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+ * See the License for the specific language governing permissions and
15+ * limitations under the License.
16+ */
17+
18+ package org .apache .hadoop .fs .cosn .auth ;
19+
20+ import com .qcloud .cos .auth .BasicSessionCredentials ;
21+ import com .qcloud .cos .auth .COSCredentials ;
22+ import com .qcloud .cos .auth .COSCredentialsProvider ;
23+
24+ import org .apache .hadoop .conf .Configuration ;
25+ import org .apache .hadoop .fs .cosn .CosNConfigKeys ;
26+
27+ import com .tencentcloudapi .common .Credential ;
28+ import com .tencentcloudapi .common .profile .ClientProfile ;
29+ import com .tencentcloudapi .common .profile .HttpProfile ;
30+ import com .tencentcloudapi .sts .v20180813 .StsClient ;
31+ import com .tencentcloudapi .sts .v20180813 .models .GetFederationTokenRequest ;
32+ import com .tencentcloudapi .sts .v20180813 .models .GetFederationTokenResponse ;
33+ import org .slf4j .Logger ;
34+ import org .slf4j .LoggerFactory ;
35+
36+ import java .io .IOException ;
37+ import java .util .concurrent .atomic .AtomicReference ;
38+
39+ /**
40+ * A COSCredentialsProvider that generates temporary credentials from Tencent Cloud STS.
41+ * This provider requires a long-term secret ID and key with permission to call
42+ * the STS GetFederationToken action.
43+ */
44+ public class DynamicTemporaryCosnCredentialsProvider implements COSCredentialsProvider {
45+ private static final Logger LOG =
46+ LoggerFactory .getLogger (DynamicTemporaryCosnCredentialsProvider .class );
47+
48+ public static final String STS_SECRET_ID_KEY = "fs.cosn.auth.sts.secret.id" ;
49+ public static final String STS_SECRET_KEY_KEY = "fs.cosn.auth.sts.secret.key" ;
50+ public static final String STS_ENDPOINT_KEY = "fs.cosn.auth.sts.endpoint" ;
51+ public static final String DEFAULT_STS_ENDPOINT = "sts.tencentcloudapi.com" ;
52+ public static final String TOKEN_DURATION_SECONDS_KEY = "fs.cosn.auth.sts.token.duration.seconds" ;
53+ public static final int DEFAULT_TOKEN_DURATION_SECONDS = 900 ; // 15 minutes
54+
55+ private final String longTermSecretId ;
56+ private final String longTermSecretKey ;
57+ private final String stsEndpoint ;
58+ private final String region ;
59+ private final String bucketName ;
60+ private final long durationSeconds ;
61+
62+ private final AtomicReference <ExpiringCredentials > expiringCredentialsRef =
63+ new AtomicReference <>();
64+
65+ public DynamicTemporaryCosnCredentialsProvider (Configuration conf ) throws IOException {
66+ this .longTermSecretId = conf .get (STS_SECRET_ID_KEY );
67+ this .longTermSecretKey = conf .get (STS_SECRET_KEY_KEY );
68+ this .stsEndpoint = conf .get (STS_ENDPOINT_KEY , DEFAULT_STS_ENDPOINT );
69+ this .region = conf .get (CosNConfigKeys .COSN_REGION_KEY );
70+ this .bucketName = conf .get ("fs.defaultFS" ).replace ("cosn://" , "" );
71+ this .durationSeconds = conf .getLong (TOKEN_DURATION_SECONDS_KEY , DEFAULT_TOKEN_DURATION_SECONDS );
72+
73+ if (this .longTermSecretId == null || this .longTermSecretKey == null ) {
74+ throw new IOException (
75+ "Long-term STS credentials not provided in configuration. Please set " + STS_SECRET_ID_KEY
76+ + " and " + STS_SECRET_KEY_KEY );
77+ }
78+ if (this .region == null || this .bucketName == null ) {
79+ throw new IOException ("Bucket region or name not configured." );
80+ }
81+ }
82+
83+ @ Override
84+ public COSCredentials getCredentials () {
85+ ExpiringCredentials current = expiringCredentialsRef .get ();
86+ // Refresh if credentials are not present, or are within 60 seconds of expiry.
87+ if (current == null
88+ || System .currentTimeMillis () >= current .getExpirationTimeMillis () - 60000 ) {
89+ LOG .info ("STS credentials expired or not found, requesting new token." );
90+ refresh ();
91+ }
92+ return expiringCredentialsRef .get ().getCredentials ();
93+ }
94+
95+ @ Override
96+ public void refresh () {
97+ try {
98+ Credential cred = new Credential (this .longTermSecretId , this .longTermSecretKey );
99+ HttpProfile httpProfile = new HttpProfile ();
100+ httpProfile .setEndpoint (this .stsEndpoint );
101+ ClientProfile clientProfile = new ClientProfile ();
102+ clientProfile .setHttpProfile (httpProfile );
103+
104+ StsClient client = new StsClient (cred , this .region , clientProfile );
105+ GetFederationTokenRequest req = new GetFederationTokenRequest ();
106+
107+ String policyTemplate = "{\" version\" :\" 2.0\" ,\" statement\" :[{\" action\" :[\" cos:*\" ],"
108+ + "\" effect\" :\" allow\" ,\" resource\" :[\" qcs::cos:%s:uid/%s:%s/*\" ]}]}" ;
109+ String policy =
110+ String .format (policyTemplate , this .region , getAppIdFromBucket (this .bucketName ),
111+ this .bucketName );
112+ req .setPolicy (policy );
113+
114+ req .setDurationSeconds (this .durationSeconds );
115+ req .setName ("HadoopCosNContractTest" );
116+
117+ GetFederationTokenResponse resp = client .GetFederationToken (req );
118+
119+ long expirationTimeMillis = (resp .getExpiredTime () * 1000 );
120+ BasicSessionCredentials credentials =
121+ new BasicSessionCredentials (resp .getCredentials ().getTmpSecretId (),
122+ resp .getCredentials ().getTmpSecretKey (), resp .getCredentials ().getToken ());
123+
124+ expiringCredentialsRef .set (new ExpiringCredentials (credentials , expirationTimeMillis ));
125+ LOG .info ("Successfully refreshed STS credentials. Expiration: {}" ,
126+ new java .util .Date (expirationTimeMillis ));
127+
128+ } catch (Exception e ) {
129+ LOG .error ("Failed to get token from STS: {}" , e .toString ());
130+ throw new RuntimeException ("Failed to get token from STS" , e );
131+ }
132+ }
133+
134+ private String getAppIdFromBucket (String bucket ) {
135+ int lastDash = bucket .lastIndexOf ('-' );
136+ if (lastDash != -1 && lastDash < bucket .length () - 1 ) {
137+ return bucket .substring (lastDash + 1 );
138+ }
139+ throw new IllegalArgumentException ("Could not determine AppID from bucket name: " + bucket );
140+ }
141+
142+ /**
143+ * Helper class to hold credentials and their expiration time.
144+ */
145+ private static class ExpiringCredentials {
146+ private final BasicSessionCredentials credentials ;
147+ private final long expirationTimeMillis ;
148+
149+ ExpiringCredentials (BasicSessionCredentials credentials , long expirationTimeMillis ) {
150+ this .credentials = credentials ;
151+ this .expirationTimeMillis = expirationTimeMillis ;
152+ }
153+
154+ BasicSessionCredentials getCredentials () {
155+ return credentials ;
156+ }
157+
158+ long getExpirationTimeMillis () {
159+ return expirationTimeMillis ;
160+ }
161+ }
162+ }
0 commit comments