Skip to content

Commit a8b999e

Browse files
feat: add support for cos storage type for dlq (#67)
* feat: add support for cos storage type for dlq * fix: fix checkstyle * fix: fix checkstyle * fix: fix checkstyle * fix: fix checkstyle * fix: fix checkstyle * feat: improve error handling and instrumentation in cos dlq * chore: bump firehose version to 0.12.0 * fix: fix checkstyle errors * fix: fix checkstyle errors * test: add unit tests for COS DLQ * fix: fix checkstyle errors * refactor: fix failing unit tests * refactor: fix failing unit tests * refactor: fix failing unit tests * fix: fix checkstyle errors * refactor: remove unused methods * refactor: change cos type placeholder string * refactor: extract retry delay param to cos configs
1 parent d2abe23 commit a8b999e

15 files changed

+1625
-3
lines changed

build.gradle

+4-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ lombok {
3333
}
3434

3535
group 'com.gotocompany'
36-
version '0.11.9'
36+
version '0.12.0'
3737

3838
def projName = "firehose"
3939

@@ -104,10 +104,13 @@ dependencies {
104104
implementation group: 'com.gotocompany', name: 'depot', version: '0.10.6'
105105
implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'
106106
implementation 'dev.cel:cel:0.5.2'
107+
implementation 'com.qcloud:cos_api:5.6.227'
108+
implementation 'com.qcloud:cos-sts_api:3.1.0'
107109

108110
testImplementation group: 'junit', name: 'junit', version: '4.11'
109111
testImplementation 'org.hamcrest:hamcrest-all:1.3'
110112
testImplementation 'org.mockito:mockito-core:4.5.1'
113+
testImplementation 'org.mockito:mockito-inline:4.5.1'
111114
testImplementation "com.github.tomakehurst:wiremock:2.3.1"
112115
testImplementation group: 'io.opentracing', name: 'opentracing-mock', version: '0.33.0'
113116
testImplementation group: 'org.mock-server', name: 'mockserver-netty', version: '3.10.5'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.gotocompany.firehose.config;
2+
3+
import org.aeonbits.owner.Config;
4+
5+
public interface CloudObjectStorageConfig extends Config {
6+
@Key("${COS_TYPE}_COS_REGION")
7+
String getCosRegion();
8+
9+
@Key("${COS_TYPE}_COS_BUCKET_NAME")
10+
String getCosBucketName();
11+
12+
@Key("${COS_TYPE}_COS_DIRECTORY_PREFIX")
13+
String getCosDirectoryPrefix();
14+
15+
@Key("${COS_TYPE}_COS_SECRET_ID")
16+
String getCosSecretId();
17+
18+
@Key("${COS_TYPE}_COS_SECRET_KEY")
19+
String getCosSecretKey();
20+
21+
@Key("${COS_TYPE}_COS_TEMP_CREDENTIAL_VALIDITY_SECONDS")
22+
@DefaultValue("1800")
23+
Integer getCosTempCredentialValiditySeconds();
24+
25+
@Key("${COS_TYPE}_COS_APPID")
26+
String getCosAppId();
27+
28+
@Key("${COS_TYPE}_COS_RETRY_MAX_ATTEMPTS")
29+
@DefaultValue("10")
30+
Integer getCosRetryMaxAttempts();
31+
32+
@Key("${COS_TYPE}_COS_RETRY_INITIAL_DELAY_MS")
33+
@DefaultValue("1000")
34+
Long getCosRetryInitialDelayMS();
35+
36+
@Key("${COS_TYPE}_COS_RETRY_MAX_DELAY_MS")
37+
@DefaultValue("30000")
38+
Long getCosRetryMaxDelayMS();
39+
40+
@Key("${COS_TYPE}_COS_RETRY_TOTAL_TIMEOUT_MS")
41+
@DefaultValue("120000")
42+
Long getCosRetryTotalTimeoutMS();
43+
44+
@Key("${COS_TYPE}_COS_CONNECTION_TIMEOUT_MS")
45+
@DefaultValue("5000")
46+
Long getCosConnectionTimeoutMS();
47+
48+
@Key("${COS_TYPE}_COS_SOCKET_TIMEOUT_MS")
49+
@DefaultValue("50000")
50+
Long getCosSocketTimeoutMS();
51+
52+
@Key("${COS_TYPE}_COS_RETRY_DELAY_MS")
53+
@DefaultValue("1000")
54+
Long getCosRetryDelayMS();
55+
}

src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java

+3
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ public static BlobStorage createSinkObjectStorage(BlobSinkConfig sinkConfig, Map
7373
case OSS:
7474
configuration.put("OSS_TYPE", "SINK_BLOB");
7575
break;
76+
case COS:
77+
configuration.put("COS_TYPE", "SINK_BLOB");
78+
break;
7679
default:
7780
throw new IllegalArgumentException("Sink Blob Storage type " + sinkConfig.getBlobStorageType() + "is not supported");
7881
}

src/main/java/com/gotocompany/firehose/sink/common/blobstorage/BlobStorageFactory.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package com.gotocompany.firehose.sink.common.blobstorage;
22

3+
import com.gotocompany.firehose.config.CloudObjectStorageConfig;
34
import com.gotocompany.firehose.config.GCSConfig;
45
import com.gotocompany.firehose.config.ObjectStorageServiceConfig;
56
import com.gotocompany.firehose.config.S3Config;
7+
import com.gotocompany.firehose.sink.common.blobstorage.cos.CloudObjectStorage;
68
import com.gotocompany.firehose.sink.common.blobstorage.gcs.GoogleCloudStorage;
79
import com.gotocompany.firehose.sink.common.blobstorage.oss.ObjectStorageService;
810
import com.gotocompany.firehose.sink.common.blobstorage.s3.S3;
@@ -26,7 +28,7 @@ public static BlobStorage createObjectStorage(BlobStorageType storageType, Map<S
2628
try {
2729
S3Config s3Config = ConfigFactory.create(S3Config.class, config);
2830
return new S3(s3Config);
29-
} catch (Exception e) {
31+
} catch (Exception e) {
3032
throw new IllegalArgumentException("Exception while creating S3 Storage", e);
3133
}
3234
case OSS:
@@ -36,6 +38,13 @@ public static BlobStorage createObjectStorage(BlobStorageType storageType, Map<S
3638
} catch (Exception e) {
3739
throw new IllegalArgumentException("Exception while creating OSS Storage", e);
3840
}
41+
case COS:
42+
try {
43+
CloudObjectStorageConfig cloudObjectStorageConfig = ConfigFactory.create(CloudObjectStorageConfig.class, config);
44+
return new CloudObjectStorage(cloudObjectStorageConfig);
45+
} catch (Exception e) {
46+
throw new IllegalArgumentException("Exception while creating COS Storage", e);
47+
}
3948
default:
4049
throw new IllegalArgumentException("Blob Storage Type " + storageType + " is not supported");
4150
}

src/main/java/com/gotocompany/firehose/sink/common/blobstorage/BlobStorageType.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@
33
public enum BlobStorageType {
44
GCS,
55
S3,
6-
OSS
6+
OSS,
7+
COS
78
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package com.gotocompany.firehose.sink.common.blobstorage.cos;
2+
3+
import com.gotocompany.firehose.config.CloudObjectStorageConfig;
4+
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorage;
5+
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageException;
6+
import com.gotocompany.firehose.sink.common.blobstorage.cos.auth.TencentCredentialManager;
7+
import com.gotocompany.firehose.sink.common.blobstorage.cos.service.TencentObjectOperations;
8+
import com.qcloud.cos.COSClient;
9+
import com.qcloud.cos.ClientConfig;
10+
import com.qcloud.cos.exception.CosClientException;
11+
import com.qcloud.cos.exception.CosServiceException;
12+
import com.qcloud.cos.model.BucketReplicationConfiguration;
13+
import com.qcloud.cos.model.ReplicationRule;
14+
import com.qcloud.cos.region.Region;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
18+
public class CloudObjectStorage implements BlobStorage {
19+
private static final Logger LOGGER = LoggerFactory.getLogger(CloudObjectStorage.class);
20+
21+
private final TencentObjectOperations tencentObjectOperations;
22+
private final TencentCredentialManager credentialManager;
23+
private final COSClient cosClient;
24+
private final CloudObjectStorageConfig config;
25+
26+
public CloudObjectStorage(CloudObjectStorageConfig config) {
27+
this.config = config;
28+
this.credentialManager = new TencentCredentialManager(config);
29+
ClientConfig clientConfig = createDefaultClientConfig(config);
30+
this.cosClient = new COSClient(credentialManager.getCredentials(), clientConfig);
31+
this.tencentObjectOperations = new TencentObjectOperations(cosClient, config);
32+
checkBucket();
33+
logRetentionPolicy();
34+
}
35+
36+
CloudObjectStorage(CloudObjectStorageConfig config, TencentCredentialManager credentialManager, COSClient cosClient) {
37+
this.config = config;
38+
this.credentialManager = credentialManager;
39+
this.cosClient = cosClient;
40+
this.tencentObjectOperations = new TencentObjectOperations(cosClient, config);
41+
checkBucket();
42+
logRetentionPolicy();
43+
}
44+
45+
private static ClientConfig createDefaultClientConfig(CloudObjectStorageConfig config) {
46+
ClientConfig clientConfig = new ClientConfig(new Region(config.getCosRegion()));
47+
clientConfig.setMaxErrorRetry(config.getCosRetryMaxAttempts());
48+
clientConfig.setConnectionTimeout(config.getCosConnectionTimeoutMS().intValue());
49+
clientConfig.setSocketTimeout(config.getCosSocketTimeoutMS().intValue());
50+
return clientConfig;
51+
}
52+
53+
void checkBucket() {
54+
String bucketName = config.getCosBucketName();
55+
if (bucketName == null || bucketName.trim().isEmpty()) {
56+
throw new IllegalArgumentException("Bucket name cannot be null or empty");
57+
}
58+
String region = config.getCosRegion();
59+
if (region == null || region.trim().isEmpty()) {
60+
throw new IllegalArgumentException("Region cannot be null or empty");
61+
}
62+
try {
63+
if (!cosClient.doesBucketExist(bucketName)) {
64+
LOGGER.error("Bucket does not exist: {}", bucketName);
65+
LOGGER.error("Please create COS bucket before running firehose: {}", bucketName);
66+
throw new IllegalArgumentException("COS Bucket not found: " + bucketName);
67+
}
68+
LOGGER.info("Successfully verified COS bucket exists: {}", bucketName);
69+
} catch (CosServiceException e) {
70+
LOGGER.error("Failed to check bucket existence: {} - {} ({})",
71+
bucketName, e.getErrorMessage(), e.getStatusCode(), e);
72+
throw new IllegalArgumentException("Failed to verify COS bucket: " + e.getMessage(), e);
73+
} catch (CosClientException e) {
74+
LOGGER.error("Client error while checking bucket: {}", bucketName, e);
75+
throw new IllegalArgumentException("Failed to verify COS bucket due to client error", e);
76+
}
77+
}
78+
79+
private void logRetentionPolicy() {
80+
String bucketName = config.getCosBucketName();
81+
try {
82+
BucketReplicationConfiguration replication = cosClient.getBucketReplicationConfiguration(bucketName);
83+
if (replication != null && replication.getRules() != null) {
84+
LOGGER.info("Retention Policy for bucket: {}", bucketName);
85+
for (ReplicationRule rule : replication.getRules()) {
86+
LOGGER.info("Rule ID: {}, Status: {}", rule.getID(), rule.getStatus());
87+
}
88+
} else {
89+
LOGGER.info("No retention policy configured for bucket: {}", bucketName);
90+
}
91+
} catch (CosServiceException e) {
92+
LOGGER.warn("Unable to fetch retention policy for bucket {}: {} ({})",
93+
bucketName, e.getErrorMessage(), e.getStatusCode());
94+
} catch (CosClientException e) {
95+
LOGGER.warn("Client error while fetching retention policy for bucket {}: {}",
96+
bucketName, e.getMessage());
97+
}
98+
}
99+
100+
public void store(String objectName, String filePath) throws BlobStorageException {
101+
LOGGER.info("Storing file to COS: {} -> {}", filePath, objectName);
102+
tencentObjectOperations.uploadObject(objectName, filePath);
103+
}
104+
105+
public void store(String objectName, byte[] content) throws BlobStorageException {
106+
if (content == null) {
107+
throw new IllegalArgumentException("Content cannot be null");
108+
}
109+
LOGGER.info("Storing content to COS: {} ({} bytes)", objectName, content.length);
110+
tencentObjectOperations.uploadObject(objectName, content);
111+
}
112+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package com.gotocompany.firehose.sink.common.blobstorage.cos.auth;
2+
3+
import com.gotocompany.firehose.config.CloudObjectStorageConfig;
4+
import com.qcloud.cos.auth.BasicCOSCredentials;
5+
import com.qcloud.cos.auth.COSCredentials;
6+
import com.qcloud.cos.auth.COSCredentialsProvider;
7+
import com.tencent.cloud.CosStsClient;
8+
import com.tencent.cloud.Response;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import java.util.TreeMap;
13+
14+
public class TencentCredentialManager implements COSCredentialsProvider {
15+
private static final Logger LOGGER = LoggerFactory.getLogger(TencentCredentialManager.class);
16+
private static final int CREDENTIAL_REFRESH_THRESHOLD_MS = 1000;
17+
18+
private final CloudObjectStorageConfig config;
19+
private COSCredentials credentials;
20+
private long lastUpdateTime;
21+
22+
public TencentCredentialManager(CloudObjectStorageConfig config) {
23+
this.config = config;
24+
this.credentials = null;
25+
this.lastUpdateTime = 0;
26+
}
27+
28+
@Override
29+
public COSCredentials getCredentials() {
30+
if (shouldRefreshCredentials()) {
31+
refreshCredentials();
32+
}
33+
return credentials;
34+
}
35+
36+
@Override
37+
public void refresh() {
38+
refreshCredentials();
39+
}
40+
41+
private boolean shouldRefreshCredentials() {
42+
return credentials == null || isCredentialsExpired();
43+
}
44+
45+
private boolean isCredentialsExpired() {
46+
return (System.currentTimeMillis() - lastUpdateTime) / CREDENTIAL_REFRESH_THRESHOLD_MS >= config.getCosTempCredentialValiditySeconds();
47+
}
48+
49+
private void refreshCredentials() {
50+
try {
51+
TreeMap<String, Object> configMap = new TreeMap<>();
52+
configMap.put("secretId", this.config.getCosSecretId());
53+
configMap.put("secretKey", this.config.getCosSecretKey());
54+
configMap.put("durationSeconds", this.config.getCosTempCredentialValiditySeconds());
55+
configMap.put("bucket", this.config.getCosBucketName());
56+
configMap.put("region", this.config.getCosRegion());
57+
configMap.put("allowPrefix", "*");
58+
String[] allowActions = new String[] {
59+
"cos:PutObject",
60+
"cos:DeleteObject",
61+
"cos:GetObject",
62+
"cos:HeadObject",
63+
"cos:ListParts",
64+
"cos:ListObjects"
65+
};
66+
configMap.put("allowActions", allowActions);
67+
68+
Response response = CosStsClient.getCredential(configMap);
69+
if (response == null || response.credentials == null) {
70+
throw new RuntimeException("Failed to refresh COS credentials: null response or credentials");
71+
}
72+
credentials = new BasicCOSCredentials(response.credentials.tmpSecretId, response.credentials.tmpSecretKey);
73+
lastUpdateTime = System.currentTimeMillis();
74+
LOGGER.info("Successfully refreshed COS credentials");
75+
} catch (Exception e) {
76+
LOGGER.error("Failed to refresh COS credentials", e);
77+
throw new RuntimeException("Failed to refresh COS credentials", e);
78+
}
79+
}
80+
}

0 commit comments

Comments
 (0)