Skip to content

Commit c7e2b69

Browse files
Adding configuration, updating interfaces.
1 parent d16dd56 commit c7e2b69

File tree

5 files changed

+166
-59
lines changed

5 files changed

+166
-59
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package com.uber.cadence.largeblob;
2+
3+
import com.uber.cadence.converter.DataConverter;
4+
import com.uber.cadence.converter.JsonDataConverter;
5+
6+
import java.time.Duration;
7+
8+
public class Configuration {
9+
10+
private Storage storage;
11+
private DataConverter dataConverter;
12+
private Long maxBytes;
13+
private Duration ttl;
14+
15+
private Configuration() {
16+
}
17+
18+
public static Builder newBuilder() {
19+
return new Builder();
20+
}
21+
22+
public static class Builder {
23+
24+
private Storage storage;
25+
private DataConverter dataConverter = JsonDataConverter.getInstance();
26+
private Duration ttl;
27+
private Long maxBytes = 4096L;
28+
29+
public Configuration build() {
30+
if (storage == null) {
31+
throw new IllegalArgumentException("storage must be provided");
32+
}
33+
34+
Configuration configuration = new Configuration();
35+
configuration.storage = this.storage;
36+
configuration.dataConverter = this.dataConverter;
37+
configuration.ttl = this.ttl;
38+
configuration.maxBytes = this.maxBytes;
39+
return configuration;
40+
}
41+
42+
public Builder setDataConverter(DataConverter dataConverter) {
43+
this.dataConverter = dataConverter;
44+
return this;
45+
}
46+
47+
public Builder setStorage(Storage storage) {
48+
this.storage = storage;
49+
return this;
50+
}
51+
52+
public Builder setTtl(Duration ttl) {
53+
this.ttl = ttl;
54+
return this;
55+
}
56+
57+
public Builder setMaxBytes(Long maxBytes) {
58+
this.maxBytes = maxBytes;
59+
return this;
60+
}
61+
}
62+
63+
public Storage getStorage() {
64+
return storage;
65+
}
66+
67+
public DataConverter getDataConverter() {
68+
return dataConverter;
69+
}
70+
71+
public Duration getTtl() {
72+
return ttl;
73+
}
74+
75+
public Long getMaxBytes() {
76+
return maxBytes;
77+
}
78+
}

src/main/java/com/uber/cadence/largeblob/Future.java

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,64 +15,65 @@
1515

1616
package com.uber.cadence.largeblob;
1717

18+
import com.uber.cadence.converter.DataConverterException;
19+
1820
import java.io.IOException;
1921

2022
/**
2123
* Future is used for passing around potentially large parameters between activities. Future can never be used in Workflow code because it uses an external storage and that will make
2224
* workflow code non deterministic. Small amounts of data are stored in the future instance itself, while larger amounts of data are stored in an external storage.
2325
*/
24-
public class Future {
26+
public class Future<T> {
2527

2628
private byte[] encoded;
2729
private String url;
28-
private final Storage storage;
29-
private final long maxBytesInMemory;
30+
private final Configuration config;
31+
private final Class<T> clazz;
3032

31-
public Future(Storage storage, long maxBytesInMemory, byte[] encoded) {
32-
this.storage = storage;
33-
this.maxBytesInMemory = maxBytesInMemory;
33+
public Future(Configuration config, Class<T> clazz, byte[] encoded) {
34+
this.config = config;
3435
this.encoded = encoded;
36+
this.clazz = clazz;
3537
}
3638

37-
public Future(Storage storage, long maxBytesInMemory, String url) {
39+
public Future(Configuration config, Class<T> clazz, String url) {
3840
this.url = url;
39-
this.storage = storage;
40-
this.maxBytesInMemory = maxBytesInMemory;
41+
this.config = config;
42+
this.clazz = clazz;
4143
}
4244

43-
public Future(Storage storage, long maxBytesInMemory) {
44-
if (storage == null) {
45-
throw new IllegalArgumentException("storage can't be null");
45+
public Future(T obj, Configuration configuration) throws IOException {
46+
byte[] bytes;
47+
try {
48+
bytes = configuration.getDataConverter().toData(obj);
49+
} catch (DataConverterException e) {
50+
throw new IOException(e);
4651
}
4752

48-
this.storage = storage;
49-
this.maxBytesInMemory = maxBytesInMemory;
53+
this.config = configuration;
54+
this.clazz = (Class<T>) obj.getClass();
55+
if (bytes.length <= configuration.getMaxBytes()) {
56+
this.encoded = bytes;
57+
} else {
58+
this.url = configuration.getStorage().put(bytes);
59+
}
5060
}
5161

52-
public byte[] get() throws IOException {
62+
public T get() throws IOException {
5363
if (encoded != null) {
54-
return encoded;
64+
return config.getDataConverter().fromData(encoded, clazz, clazz);
5565
}
5666

5767
if (url != null) {
58-
return storage.get(url);
68+
return config .getDataConverter().fromData(config.getStorage().get(url), clazz, clazz);
5969
}
6070

6171
return null;
6272
}
6373

64-
public void put(String url, byte[] bytes) throws IOException {
65-
this.url = url;
66-
if (bytes.length < maxBytesInMemory) {
67-
this.encoded = bytes;
68-
} else {
69-
storage.put(url, bytes);
70-
}
71-
}
72-
7374
public void delete() throws IOException {
7475
if (this.encoded == null) {
75-
storage.delete(url);
76+
config.getStorage().delete(url);
7677
} else {
7778
this.encoded = null;
7879
}

src/main/java/com/uber/cadence/largeblob/Storage.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.uber.cadence.largeblob;
1717

1818
import java.io.IOException;
19+
import java.time.Duration;
1920

2021
/**
2122
* Storage is an abstraction for storing large parameters to access inside of activities.0
@@ -32,11 +33,26 @@ public interface Storage {
3233

3334
/**
3435
* Stores data based on uri provided.
35-
* @param uri uri.
3636
* @param bytes bytes.
3737
* @throws IOException should be thrown in any implementation class in case of problems with the datastore
3838
*/
39-
void put(String uri, byte[] bytes) throws IOException;
39+
String put(byte[] bytes) throws IOException;
40+
41+
/**
42+
* Stores data based on uri provided.
43+
* @param bytes bytes.
44+
* @param ttl ttl is used for storages like s3 to define the total time to store the object.
45+
* @throws IOException should be thrown in any implementation class in case of problems with the datastore
46+
*/
47+
String put(byte[] bytes, Duration ttl) throws IOException;
48+
49+
/**
50+
* Stores data based on uri provided.
51+
* @param key of the data.
52+
* @param bytes bytes.
53+
* @throws IOException should be thrown in any implementation class in case of problems with the datastore
54+
*/
55+
String put(String key, byte[] bytes) throws IOException;
4056

4157
/**
4258
* Deletes data based on uri provided.

src/main/java/com/uber/cadence/largeblob/impl/InMemoryStorage.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
import com.uber.cadence.largeblob.Storage;
1919
import java.io.IOException;
20+
import java.time.Duration;
2021
import java.util.HashMap;
2122
import java.util.Map;
23+
import java.util.UUID;
2224

2325
public class InMemoryStorage implements Storage {
2426

@@ -30,8 +32,21 @@ public byte[] get(String uri) throws IOException {
3032
}
3133

3234
@Override
33-
public void put(String uri, byte[] bytes) throws IOException {
34-
storage.put(uri, bytes);
35+
public String put(byte[] bytes) throws IOException {
36+
String uuid = UUID.randomUUID().toString();
37+
storage.put(uuid, bytes);
38+
return uuid;
39+
}
40+
41+
@Override
42+
public String put(byte[] bytes, Duration ttl) throws IOException {
43+
return put(bytes);
44+
}
45+
46+
@Override
47+
public String put(String key, byte[] bytes) throws IOException {
48+
storage.put(key, bytes);
49+
return key;
3550
}
3651

3752
@Override

src/test/java/com/uber/cadence/internal/largeblob/FutureTest.java

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package com.uber.cadence.internal.largeblob;
1717

18+
import com.uber.cadence.largeblob.Configuration;
1819
import com.uber.cadence.largeblob.Future;
1920
import com.uber.cadence.largeblob.Storage;
2021
import com.uber.cadence.largeblob.impl.InMemoryStorage;
@@ -25,46 +26,42 @@
2526
import java.nio.charset.StandardCharsets;
2627

2728
import static org.junit.Assert.assertEquals;
29+
import static org.junit.Assert.assertNotNull;
2830
import static org.junit.Assert.assertNull;
2931

3032
@RunWith(MockitoJUnitRunner.class)
3133
public class FutureTest {
3234

33-
private Storage storage = new InMemoryStorage();
35+
private final Storage storage = new InMemoryStorage();
3436

3537
@Test
3638
public void testPutSmallElement() throws Exception {
37-
Future future = new Future(storage, 10);
39+
Configuration config = Configuration.newBuilder().setMaxBytes(20L).setStorage(storage).build();
40+
Future<String> future = new Future<String>("testValue", config);
3841

39-
String testValue = "testValue";
40-
future.put("test", testValue.getBytes(StandardCharsets.UTF_8));
41-
42-
assertEquals("testValue", new String(future.get(), StandardCharsets.UTF_8));
43-
assertNull(storage.get("test"));
42+
assertEquals("testValue", future.get());
43+
assertNull(future.getUrl());
4444
}
4545

4646
@Test
4747
public void testLargerValuesGetPutInStorage() throws Exception {
48-
Future future = new Future(storage, 10);
49-
48+
Configuration config = Configuration.newBuilder().setMaxBytes(20L).setStorage(storage).build();
5049
String testValue = "testValuetestValuetestValue";
51-
future.put("test", testValue.getBytes(StandardCharsets.UTF_8));
50+
Future future = new Future(testValue, config);
5251

53-
assertEquals("testValuetestValuetestValue", new String(future.get(), StandardCharsets.UTF_8));
54-
assertEquals(
55-
"testValuetestValuetestValue", new String(storage.get("test"), StandardCharsets.UTF_8));
52+
assertEquals("testValuetestValuetestValue", future.get());
53+
assertNotNull(storage.get(future.getUrl()));
5654
}
5755

5856
@Test
5957
public void testDeleteValueFromStorage() throws Exception {
60-
Future future = new Future(storage, 10);
61-
58+
Configuration config = Configuration.newBuilder().setMaxBytes(20L).setStorage(storage).build();
6259
String testValue = "testValuetestValuetestValue";
63-
future.put("test", testValue.getBytes(StandardCharsets.UTF_8));
60+
Future<String> future = new Future<>(testValue, config);
6461

65-
assertEquals("testValuetestValuetestValue", new String(future.get(), StandardCharsets.UTF_8));
62+
assertEquals("testValuetestValuetestValue", future.get());
6663
assertEquals(
67-
"testValuetestValuetestValue", new String(storage.get("test"), StandardCharsets.UTF_8));
64+
"testValuetestValuetestValue", config.getDataConverter().fromData(storage.get(future.getUrl()), String.class, String.class));
6865

6966
future.delete();
7067

@@ -75,33 +72,33 @@ public void testDeleteValueFromStorage() throws Exception {
7572

7673
@Test
7774
public void testSmallValueIsDelete() throws Exception {
78-
Future future = new Future(storage, 10);
79-
80-
String testValue = "testValue";
81-
future.put("test", testValue.getBytes(StandardCharsets.UTF_8));
75+
Configuration config = Configuration.newBuilder().setMaxBytes(20L).setStorage(storage).build();
76+
Future<String> future = new Future<>("testValue", config);
8277

83-
assertEquals("testValue", new String(future.get(), StandardCharsets.UTF_8));
84-
assertNull(storage.get("test"));
78+
assertEquals("testValue", future.get());
79+
assertNull(storage.get(future.getUrl()));
8580

8681
future.delete();
8782
assertNull(future.get());
8883
}
8984

9085
@Test
9186
public void getWorksWhenInitialisingWithEncodedData() throws Exception {
92-
Future future = new Future(storage, 10, "testValue".getBytes(StandardCharsets.UTF_8));
87+
Configuration configuration = Configuration.newBuilder().setStorage(storage).build();
88+
Future<String> future = new Future<>(configuration, String.class, "testValue".getBytes(StandardCharsets.UTF_8));
9389

94-
assertEquals("testValue", new String(future.get(), StandardCharsets.UTF_8));
90+
assertEquals("testValue", future.get());
9591
}
9692

9793
@Test
9894
public void getWorksWhenInitialisingWithUrl() throws Exception {
99-
Future future = new Future(storage, 10, "test");
95+
Configuration configuration = Configuration.newBuilder().setStorage(storage).build();
96+
Future<String> future = new Future<>(configuration, String.class, "test");
10097

10198
assertNull(future.get());
10299

103100
storage.put("test", "testValue".getBytes(StandardCharsets.UTF_8));
104101

105-
assertEquals("testValue", new String(future.get(), StandardCharsets.UTF_8));
102+
assertEquals("testValue", future.get());
106103
}
107104
}

0 commit comments

Comments
 (0)