From 98f39687be344f9486dd784688dcc415ffc840af Mon Sep 17 00:00:00 2001 From: Mindaugas Barcauskas Date: Thu, 17 Nov 2022 14:02:29 +0200 Subject: [PATCH 1/4] Storage wrapper to enable large amount of data to be passed around between activities. --- .../cadence/internal/largeblob/Future.java | 63 ++++++++++++++ .../internal/largeblob/FutureTest.java | 86 +++++++++++++++++++ .../uber/cadence/internal/largeblob/README.md | 19 ++++ .../cadence/internal/largeblob/Storage.java | 27 ++++++ .../largeblob/impl/InMemoryStorage.java | 41 +++++++++ .../autoscaler/PollerAutoScalerTest.java | 17 ++++ 6 files changed, 253 insertions(+) create mode 100644 src/test/java/com/uber/cadence/internal/largeblob/Future.java create mode 100644 src/test/java/com/uber/cadence/internal/largeblob/FutureTest.java create mode 100644 src/test/java/com/uber/cadence/internal/largeblob/README.md create mode 100644 src/test/java/com/uber/cadence/internal/largeblob/Storage.java create mode 100644 src/test/java/com/uber/cadence/internal/largeblob/impl/InMemoryStorage.java diff --git a/src/test/java/com/uber/cadence/internal/largeblob/Future.java b/src/test/java/com/uber/cadence/internal/largeblob/Future.java new file mode 100644 index 000000000..4b164b4c3 --- /dev/null +++ b/src/test/java/com/uber/cadence/internal/largeblob/Future.java @@ -0,0 +1,63 @@ +/* + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.internal.largeblob; + +import java.io.IOException; + +public class Future { + + private byte[] encoded; + private String url; + private final Storage storage; + private final long maxBytesInMemory; + + public Future(Storage storage, long maxBytesInMemory) { + if (storage == null) { + throw new IllegalArgumentException("storage can't be null"); + } + this.storage = storage; + this.maxBytesInMemory = maxBytesInMemory; + } + + public byte[] get() throws IOException { + if (encoded != null) { + return encoded; + } + + if (url != null) { + return storage.get(url); + } + + return null; + } + + public void put(String url, byte[] bytes) throws IOException { + this.url = url; + if (bytes.length < maxBytesInMemory) { + this.encoded = bytes; + } else { + storage.put(url, bytes); + } + } + + public void delete() throws IOException { + if (this.encoded == null) { + storage.delete(url); + } else { + this.encoded = null; + } + } +} diff --git a/src/test/java/com/uber/cadence/internal/largeblob/FutureTest.java b/src/test/java/com/uber/cadence/internal/largeblob/FutureTest.java new file mode 100644 index 000000000..4f2e3913a --- /dev/null +++ b/src/test/java/com/uber/cadence/internal/largeblob/FutureTest.java @@ -0,0 +1,86 @@ +/* + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.internal.largeblob; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import com.uber.cadence.internal.largeblob.impl.InMemoryStorage; +import java.nio.charset.StandardCharsets; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class FutureTest { + + private Storage storage = new InMemoryStorage(); + + @Test + public void testPutSmallElement() throws Exception { + Future future = new Future(storage, 10); + + String testValue = "testValue"; + future.put("test", testValue.getBytes(StandardCharsets.UTF_8)); + + assertEquals("testValue", new String(future.get(), StandardCharsets.UTF_8)); + assertNull(storage.get("test")); + } + + @Test + public void testLargerValuesGetPutInStorage() throws Exception { + Future future = new Future(storage, 10); + + String testValue = "testValuetestValuetestValue"; + future.put("test", testValue.getBytes(StandardCharsets.UTF_8)); + + assertEquals("testValuetestValuetestValue", new String(future.get(), StandardCharsets.UTF_8)); + assertEquals( + "testValuetestValuetestValue", new String(storage.get("test"), StandardCharsets.UTF_8)); + } + + @Test + public void testDeleteValueFromStorage() throws Exception { + Future future = new Future(storage, 10); + + String testValue = "testValuetestValuetestValue"; + future.put("test", testValue.getBytes(StandardCharsets.UTF_8)); + + assertEquals("testValuetestValuetestValue", new String(future.get(), StandardCharsets.UTF_8)); + assertEquals( + "testValuetestValuetestValue", new String(storage.get("test"), StandardCharsets.UTF_8)); + + future.delete(); + + assertNull(storage.get("test")); + + assertNull(future.get()); + } + + @Test + public void testSmallValueIsDelete() throws Exception { + Future future = new Future(storage, 10); + + String testValue = "testValue"; + future.put("test", testValue.getBytes(StandardCharsets.UTF_8)); + + assertEquals("testValue", new String(future.get(), StandardCharsets.UTF_8)); + assertNull(storage.get("test")); + + future.delete(); + assertNull(future.get()); + } +} diff --git a/src/test/java/com/uber/cadence/internal/largeblob/README.md b/src/test/java/com/uber/cadence/internal/largeblob/README.md new file mode 100644 index 000000000..94a40c44f --- /dev/null +++ b/src/test/java/com/uber/cadence/internal/largeblob/README.md @@ -0,0 +1,19 @@ +This package contains an Activity-oriented Future, which can semi-transparently upload large amounts of data to +external stores, effectively avoiding Cadence's per-event / per-workflow size limits. + +Multiple Futures can be used in a single Activity's arguments or response values, e.g. in separate struct fields or +slices, but each Future will be unaware of the others. Any max-size limit you choose will not be shared between all +Futures (they each have their own limit), so the cumulative size of your response may be much larger. +Take care to keep WithMaxBytes limits low enough for all values, and be aware that the URLs that replace the data +also take space - dozens are fine, thousands are probably not. + +# Caveats + +This tool comes with some semi-severe caveats, but if they are acceptable for your use, it may allow you to easily +reduce your Workflow's history's data use: + +# Workflows will not have access to data + +By design, this tool does not allow you to access wrapped data in your workflows. You can use the type to forward data +between activities, but not inspect the contents - they may not exist, and you cannot safely perform the download in +your workflow. diff --git a/src/test/java/com/uber/cadence/internal/largeblob/Storage.java b/src/test/java/com/uber/cadence/internal/largeblob/Storage.java new file mode 100644 index 000000000..35ccb92b3 --- /dev/null +++ b/src/test/java/com/uber/cadence/internal/largeblob/Storage.java @@ -0,0 +1,27 @@ +/* + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.internal.largeblob; + +import java.io.IOException; + +public interface Storage { + + byte[] get(String uri) throws IOException; + + void put(String uri, byte[] bytes) throws IOException; + + void delete(String uri) throws IOException; +} diff --git a/src/test/java/com/uber/cadence/internal/largeblob/impl/InMemoryStorage.java b/src/test/java/com/uber/cadence/internal/largeblob/impl/InMemoryStorage.java new file mode 100644 index 000000000..c9fa61047 --- /dev/null +++ b/src/test/java/com/uber/cadence/internal/largeblob/impl/InMemoryStorage.java @@ -0,0 +1,41 @@ +/* + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.internal.largeblob.impl; + +import com.uber.cadence.internal.largeblob.Storage; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class InMemoryStorage implements Storage { + + private final Map storage = new HashMap<>(); + + @Override + public byte[] get(String uri) throws IOException { + return storage.get(uri); + } + + @Override + public void put(String uri, byte[] bytes) throws IOException { + storage.put(uri, bytes); + } + + @Override + public void delete(String uri) throws IOException { + storage.remove(uri); + } +} diff --git a/src/test/java/com/uber/cadence/internal/worker/autoscaler/PollerAutoScalerTest.java b/src/test/java/com/uber/cadence/internal/worker/autoscaler/PollerAutoScalerTest.java index dc9479e2c..c40137ed1 100644 --- a/src/test/java/com/uber/cadence/internal/worker/autoscaler/PollerAutoScalerTest.java +++ b/src/test/java/com/uber/cadence/internal/worker/autoscaler/PollerAutoScalerTest.java @@ -44,5 +44,22 @@ public void testAutoScalerScalesPollers() { pollerAutoScaler.resizePollers(); assertEquals(10, pollerAutoScaler.getSemaphoreSize()); + + pollerUsageEstimator.increaseActionableTaskCount(); + pollerUsageEstimator.increaseActionableTaskCount(); + + pollerAutoScaler.resizePollers(); + + assertEquals(100, pollerAutoScaler.getSemaphoreSize()); + + pollerUsageEstimator.increaseActionableTaskCount(); + pollerUsageEstimator.increaseActionableTaskCount(); + pollerUsageEstimator.increaseNoopTaskCount(); + pollerUsageEstimator.increaseNoopTaskCount(); + pollerUsageEstimator.increaseNoopTaskCount(); + + pollerAutoScaler.resizePollers(); + + assertEquals(80, pollerAutoScaler.getSemaphoreSize()); } } From 033757706ce8a176daa960adf17ffe37d2664245 Mon Sep 17 00:00:00 2001 From: Mindaugas Barcauskas Date: Fri, 18 Nov 2022 14:08:19 +0200 Subject: [PATCH 2/4] Refactoring, adding documentation. --- .../com/uber/cadence}/largeblob/Future.java | 27 +++++++++++++++- .../com/uber/cadence}/largeblob/README.md | 0 .../com/uber/cadence}/largeblob/Storage.java | 22 ++++++++++++- .../largeblob/impl/InMemoryStorage.java | 4 +-- .../internal/largeblob/FutureTest.java | 31 ++++++++++++++++--- 5 files changed, 75 insertions(+), 9 deletions(-) rename src/{test/java/com/uber/cadence/internal => main/java/com/uber/cadence}/largeblob/Future.java (64%) rename src/{test/java/com/uber/cadence/internal => main/java/com/uber/cadence}/largeblob/README.md (100%) rename src/{test/java/com/uber/cadence/internal => main/java/com/uber/cadence}/largeblob/Storage.java (51%) rename src/{test/java/com/uber/cadence/internal => main/java/com/uber/cadence}/largeblob/impl/InMemoryStorage.java (91%) diff --git a/src/test/java/com/uber/cadence/internal/largeblob/Future.java b/src/main/java/com/uber/cadence/largeblob/Future.java similarity index 64% rename from src/test/java/com/uber/cadence/internal/largeblob/Future.java rename to src/main/java/com/uber/cadence/largeblob/Future.java index 4b164b4c3..7cd11d795 100644 --- a/src/test/java/com/uber/cadence/internal/largeblob/Future.java +++ b/src/main/java/com/uber/cadence/largeblob/Future.java @@ -13,10 +13,14 @@ * permissions and limitations under the License. */ -package com.uber.cadence.internal.largeblob; +package com.uber.cadence.largeblob; import java.io.IOException; +/** + * Future is used for passing around potentially large parameters between activities. Future can never be used in Workflow code because it uses an external storage and that will make + * workflow code non deterministic. Small amounts of data are stored in the future instance itself, while larger amounts of data are stored in an external storage. + */ public class Future { private byte[] encoded; @@ -24,10 +28,23 @@ public class Future { private final Storage storage; private final long maxBytesInMemory; + public Future(Storage storage, long maxBytesInMemory, byte[] encoded) { + this.storage = storage; + this.maxBytesInMemory = maxBytesInMemory; + this.encoded = encoded; + } + + public Future(Storage storage, long maxBytesInMemory, String url) { + this.url = url; + this.storage = storage; + this.maxBytesInMemory = maxBytesInMemory; + } + public Future(Storage storage, long maxBytesInMemory) { if (storage == null) { throw new IllegalArgumentException("storage can't be null"); } + this.storage = storage; this.maxBytesInMemory = maxBytesInMemory; } @@ -60,4 +77,12 @@ public void delete() throws IOException { this.encoded = null; } } + + public byte[] getEncoded() { + return encoded; + } + + public String getUrl() { + return url; + } } diff --git a/src/test/java/com/uber/cadence/internal/largeblob/README.md b/src/main/java/com/uber/cadence/largeblob/README.md similarity index 100% rename from src/test/java/com/uber/cadence/internal/largeblob/README.md rename to src/main/java/com/uber/cadence/largeblob/README.md diff --git a/src/test/java/com/uber/cadence/internal/largeblob/Storage.java b/src/main/java/com/uber/cadence/largeblob/Storage.java similarity index 51% rename from src/test/java/com/uber/cadence/internal/largeblob/Storage.java rename to src/main/java/com/uber/cadence/largeblob/Storage.java index 35ccb92b3..2a96c89f1 100644 --- a/src/test/java/com/uber/cadence/internal/largeblob/Storage.java +++ b/src/main/java/com/uber/cadence/largeblob/Storage.java @@ -13,15 +13,35 @@ * permissions and limitations under the License. */ -package com.uber.cadence.internal.largeblob; +package com.uber.cadence.largeblob; import java.io.IOException; +/** + * Storage is an abstraction for storing large parameters to access inside of activities.0 + */ public interface Storage { + /** + * Gets the data based on uri provided + * @param uri uri. + * @return the data as a byte array. + * @throws IOException should be thrown in any implementation class in case of problems accessing the datastore. + */ byte[] get(String uri) throws IOException; + /** + * Stores data based on uri provided. + * @param uri uri. + * @param bytes bytes. + * @throws IOException should be thrown in any implementation class in case of problems with the datastore + */ void put(String uri, byte[] bytes) throws IOException; + /** + * Deletes data based on uri provided. + * @param uri uri. + * @throws IOException should be thrown in any implementation class in case of problems with the datastore + */ void delete(String uri) throws IOException; } diff --git a/src/test/java/com/uber/cadence/internal/largeblob/impl/InMemoryStorage.java b/src/main/java/com/uber/cadence/largeblob/impl/InMemoryStorage.java similarity index 91% rename from src/test/java/com/uber/cadence/internal/largeblob/impl/InMemoryStorage.java rename to src/main/java/com/uber/cadence/largeblob/impl/InMemoryStorage.java index c9fa61047..96516556d 100644 --- a/src/test/java/com/uber/cadence/internal/largeblob/impl/InMemoryStorage.java +++ b/src/main/java/com/uber/cadence/largeblob/impl/InMemoryStorage.java @@ -13,9 +13,9 @@ * permissions and limitations under the License. */ -package com.uber.cadence.internal.largeblob.impl; +package com.uber.cadence.largeblob.impl; -import com.uber.cadence.internal.largeblob.Storage; +import com.uber.cadence.largeblob.Storage; import java.io.IOException; import java.util.HashMap; import java.util.Map; diff --git a/src/test/java/com/uber/cadence/internal/largeblob/FutureTest.java b/src/test/java/com/uber/cadence/internal/largeblob/FutureTest.java index 4f2e3913a..c5629458f 100644 --- a/src/test/java/com/uber/cadence/internal/largeblob/FutureTest.java +++ b/src/test/java/com/uber/cadence/internal/largeblob/FutureTest.java @@ -15,15 +15,18 @@ package com.uber.cadence.internal.largeblob; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import com.uber.cadence.internal.largeblob.impl.InMemoryStorage; -import java.nio.charset.StandardCharsets; +import com.uber.cadence.largeblob.Future; +import com.uber.cadence.largeblob.Storage; +import com.uber.cadence.largeblob.impl.InMemoryStorage; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.runners.MockitoJUnitRunner; +import java.nio.charset.StandardCharsets; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + @RunWith(MockitoJUnitRunner.class) public class FutureTest { @@ -83,4 +86,22 @@ public void testSmallValueIsDelete() throws Exception { future.delete(); assertNull(future.get()); } + + @Test + public void getWorksWhenInitialisingWithEncodedData() throws Exception { + Future future = new Future(storage, 10, "testValue".getBytes(StandardCharsets.UTF_8)); + + assertEquals("testValue", new String(future.get(), StandardCharsets.UTF_8)); + } + + @Test + public void getWorksWhenInitialisingWithUrl() throws Exception { + Future future = new Future(storage, 10, "test"); + + assertNull(future.get()); + + storage.put("test", "testValue".getBytes(StandardCharsets.UTF_8)); + + assertEquals("testValue", new String(future.get(), StandardCharsets.UTF_8)); + } } From d16dd564f759afb53cc80727d8a1462d0d0f83cf Mon Sep 17 00:00:00 2001 From: Mindaugas Barcauskas Date: Fri, 18 Nov 2022 14:10:28 +0200 Subject: [PATCH 3/4] removing accidental change. --- .../worker/autoscaler/PollerAutoScalerTest.java | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/src/test/java/com/uber/cadence/internal/worker/autoscaler/PollerAutoScalerTest.java b/src/test/java/com/uber/cadence/internal/worker/autoscaler/PollerAutoScalerTest.java index c40137ed1..dc9479e2c 100644 --- a/src/test/java/com/uber/cadence/internal/worker/autoscaler/PollerAutoScalerTest.java +++ b/src/test/java/com/uber/cadence/internal/worker/autoscaler/PollerAutoScalerTest.java @@ -44,22 +44,5 @@ public void testAutoScalerScalesPollers() { pollerAutoScaler.resizePollers(); assertEquals(10, pollerAutoScaler.getSemaphoreSize()); - - pollerUsageEstimator.increaseActionableTaskCount(); - pollerUsageEstimator.increaseActionableTaskCount(); - - pollerAutoScaler.resizePollers(); - - assertEquals(100, pollerAutoScaler.getSemaphoreSize()); - - pollerUsageEstimator.increaseActionableTaskCount(); - pollerUsageEstimator.increaseActionableTaskCount(); - pollerUsageEstimator.increaseNoopTaskCount(); - pollerUsageEstimator.increaseNoopTaskCount(); - pollerUsageEstimator.increaseNoopTaskCount(); - - pollerAutoScaler.resizePollers(); - - assertEquals(80, pollerAutoScaler.getSemaphoreSize()); } } From c7e2b6930c451d99e681d555b4d68317488d6256 Mon Sep 17 00:00:00 2001 From: Mindaugas Barcauskas Date: Mon, 21 Nov 2022 14:59:13 +0200 Subject: [PATCH 4/4] Adding configuration, updating interfaces. --- .../uber/cadence/largeblob/Configuration.java | 78 +++++++++++++++++++ .../com/uber/cadence/largeblob/Future.java | 55 ++++++------- .../com/uber/cadence/largeblob/Storage.java | 20 ++++- .../largeblob/impl/InMemoryStorage.java | 19 ++++- .../internal/largeblob/FutureTest.java | 53 ++++++------- 5 files changed, 166 insertions(+), 59 deletions(-) create mode 100644 src/main/java/com/uber/cadence/largeblob/Configuration.java diff --git a/src/main/java/com/uber/cadence/largeblob/Configuration.java b/src/main/java/com/uber/cadence/largeblob/Configuration.java new file mode 100644 index 000000000..69d3e1fbe --- /dev/null +++ b/src/main/java/com/uber/cadence/largeblob/Configuration.java @@ -0,0 +1,78 @@ +package com.uber.cadence.largeblob; + +import com.uber.cadence.converter.DataConverter; +import com.uber.cadence.converter.JsonDataConverter; + +import java.time.Duration; + +public class Configuration { + + private Storage storage; + private DataConverter dataConverter; + private Long maxBytes; + private Duration ttl; + + private Configuration() { + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + + private Storage storage; + private DataConverter dataConverter = JsonDataConverter.getInstance(); + private Duration ttl; + private Long maxBytes = 4096L; + + public Configuration build() { + if (storage == null) { + throw new IllegalArgumentException("storage must be provided"); + } + + Configuration configuration = new Configuration(); + configuration.storage = this.storage; + configuration.dataConverter = this.dataConverter; + configuration.ttl = this.ttl; + configuration.maxBytes = this.maxBytes; + return configuration; + } + + public Builder setDataConverter(DataConverter dataConverter) { + this.dataConverter = dataConverter; + return this; + } + + public Builder setStorage(Storage storage) { + this.storage = storage; + return this; + } + + public Builder setTtl(Duration ttl) { + this.ttl = ttl; + return this; + } + + public Builder setMaxBytes(Long maxBytes) { + this.maxBytes = maxBytes; + return this; + } + } + + public Storage getStorage() { + return storage; + } + + public DataConverter getDataConverter() { + return dataConverter; + } + + public Duration getTtl() { + return ttl; + } + + public Long getMaxBytes() { + return maxBytes; + } +} diff --git a/src/main/java/com/uber/cadence/largeblob/Future.java b/src/main/java/com/uber/cadence/largeblob/Future.java index 7cd11d795..c32391196 100644 --- a/src/main/java/com/uber/cadence/largeblob/Future.java +++ b/src/main/java/com/uber/cadence/largeblob/Future.java @@ -15,64 +15,65 @@ package com.uber.cadence.largeblob; +import com.uber.cadence.converter.DataConverterException; + import java.io.IOException; /** * Future is used for passing around potentially large parameters between activities. Future can never be used in Workflow code because it uses an external storage and that will make * workflow code non deterministic. Small amounts of data are stored in the future instance itself, while larger amounts of data are stored in an external storage. */ -public class Future { +public class Future { private byte[] encoded; private String url; - private final Storage storage; - private final long maxBytesInMemory; + private final Configuration config; + private final Class clazz; - public Future(Storage storage, long maxBytesInMemory, byte[] encoded) { - this.storage = storage; - this.maxBytesInMemory = maxBytesInMemory; + public Future(Configuration config, Class clazz, byte[] encoded) { + this.config = config; this.encoded = encoded; + this.clazz = clazz; } - public Future(Storage storage, long maxBytesInMemory, String url) { + public Future(Configuration config, Class clazz, String url) { this.url = url; - this.storage = storage; - this.maxBytesInMemory = maxBytesInMemory; + this.config = config; + this.clazz = clazz; } - public Future(Storage storage, long maxBytesInMemory) { - if (storage == null) { - throw new IllegalArgumentException("storage can't be null"); + public Future(T obj, Configuration configuration) throws IOException { + byte[] bytes; + try { + bytes = configuration.getDataConverter().toData(obj); + } catch (DataConverterException e) { + throw new IOException(e); } - this.storage = storage; - this.maxBytesInMemory = maxBytesInMemory; + this.config = configuration; + this.clazz = (Class) obj.getClass(); + if (bytes.length <= configuration.getMaxBytes()) { + this.encoded = bytes; + } else { + this.url = configuration.getStorage().put(bytes); + } } - public byte[] get() throws IOException { + public T get() throws IOException { if (encoded != null) { - return encoded; + return config.getDataConverter().fromData(encoded, clazz, clazz); } if (url != null) { - return storage.get(url); + return config .getDataConverter().fromData(config.getStorage().get(url), clazz, clazz); } return null; } - public void put(String url, byte[] bytes) throws IOException { - this.url = url; - if (bytes.length < maxBytesInMemory) { - this.encoded = bytes; - } else { - storage.put(url, bytes); - } - } - public void delete() throws IOException { if (this.encoded == null) { - storage.delete(url); + config.getStorage().delete(url); } else { this.encoded = null; } diff --git a/src/main/java/com/uber/cadence/largeblob/Storage.java b/src/main/java/com/uber/cadence/largeblob/Storage.java index 2a96c89f1..13f2f89f4 100644 --- a/src/main/java/com/uber/cadence/largeblob/Storage.java +++ b/src/main/java/com/uber/cadence/largeblob/Storage.java @@ -16,6 +16,7 @@ package com.uber.cadence.largeblob; import java.io.IOException; +import java.time.Duration; /** * Storage is an abstraction for storing large parameters to access inside of activities.0 @@ -32,11 +33,26 @@ public interface Storage { /** * Stores data based on uri provided. - * @param uri uri. * @param bytes bytes. * @throws IOException should be thrown in any implementation class in case of problems with the datastore */ - void put(String uri, byte[] bytes) throws IOException; + String put(byte[] bytes) throws IOException; + + /** + * Stores data based on uri provided. + * @param bytes bytes. + * @param ttl ttl is used for storages like s3 to define the total time to store the object. + * @throws IOException should be thrown in any implementation class in case of problems with the datastore + */ + String put(byte[] bytes, Duration ttl) throws IOException; + + /** + * Stores data based on uri provided. + * @param key of the data. + * @param bytes bytes. + * @throws IOException should be thrown in any implementation class in case of problems with the datastore + */ + String put(String key, byte[] bytes) throws IOException; /** * Deletes data based on uri provided. diff --git a/src/main/java/com/uber/cadence/largeblob/impl/InMemoryStorage.java b/src/main/java/com/uber/cadence/largeblob/impl/InMemoryStorage.java index 96516556d..9894b8e7c 100644 --- a/src/main/java/com/uber/cadence/largeblob/impl/InMemoryStorage.java +++ b/src/main/java/com/uber/cadence/largeblob/impl/InMemoryStorage.java @@ -17,8 +17,10 @@ import com.uber.cadence.largeblob.Storage; import java.io.IOException; +import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.UUID; public class InMemoryStorage implements Storage { @@ -30,8 +32,21 @@ public byte[] get(String uri) throws IOException { } @Override - public void put(String uri, byte[] bytes) throws IOException { - storage.put(uri, bytes); + public String put(byte[] bytes) throws IOException { + String uuid = UUID.randomUUID().toString(); + storage.put(uuid, bytes); + return uuid; + } + + @Override + public String put(byte[] bytes, Duration ttl) throws IOException { + return put(bytes); + } + + @Override + public String put(String key, byte[] bytes) throws IOException { + storage.put(key, bytes); + return key; } @Override diff --git a/src/test/java/com/uber/cadence/internal/largeblob/FutureTest.java b/src/test/java/com/uber/cadence/internal/largeblob/FutureTest.java index c5629458f..91d51ef48 100644 --- a/src/test/java/com/uber/cadence/internal/largeblob/FutureTest.java +++ b/src/test/java/com/uber/cadence/internal/largeblob/FutureTest.java @@ -15,6 +15,7 @@ package com.uber.cadence.internal.largeblob; +import com.uber.cadence.largeblob.Configuration; import com.uber.cadence.largeblob.Future; import com.uber.cadence.largeblob.Storage; import com.uber.cadence.largeblob.impl.InMemoryStorage; @@ -25,46 +26,42 @@ import java.nio.charset.StandardCharsets; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @RunWith(MockitoJUnitRunner.class) public class FutureTest { - private Storage storage = new InMemoryStorage(); + private final Storage storage = new InMemoryStorage(); @Test public void testPutSmallElement() throws Exception { - Future future = new Future(storage, 10); + Configuration config = Configuration.newBuilder().setMaxBytes(20L).setStorage(storage).build(); + Future future = new Future("testValue", config); - String testValue = "testValue"; - future.put("test", testValue.getBytes(StandardCharsets.UTF_8)); - - assertEquals("testValue", new String(future.get(), StandardCharsets.UTF_8)); - assertNull(storage.get("test")); + assertEquals("testValue", future.get()); + assertNull(future.getUrl()); } @Test public void testLargerValuesGetPutInStorage() throws Exception { - Future future = new Future(storage, 10); - + Configuration config = Configuration.newBuilder().setMaxBytes(20L).setStorage(storage).build(); String testValue = "testValuetestValuetestValue"; - future.put("test", testValue.getBytes(StandardCharsets.UTF_8)); + Future future = new Future(testValue, config); - assertEquals("testValuetestValuetestValue", new String(future.get(), StandardCharsets.UTF_8)); - assertEquals( - "testValuetestValuetestValue", new String(storage.get("test"), StandardCharsets.UTF_8)); + assertEquals("testValuetestValuetestValue", future.get()); + assertNotNull(storage.get(future.getUrl())); } @Test public void testDeleteValueFromStorage() throws Exception { - Future future = new Future(storage, 10); - + Configuration config = Configuration.newBuilder().setMaxBytes(20L).setStorage(storage).build(); String testValue = "testValuetestValuetestValue"; - future.put("test", testValue.getBytes(StandardCharsets.UTF_8)); + Future future = new Future<>(testValue, config); - assertEquals("testValuetestValuetestValue", new String(future.get(), StandardCharsets.UTF_8)); + assertEquals("testValuetestValuetestValue", future.get()); assertEquals( - "testValuetestValuetestValue", new String(storage.get("test"), StandardCharsets.UTF_8)); + "testValuetestValuetestValue", config.getDataConverter().fromData(storage.get(future.getUrl()), String.class, String.class)); future.delete(); @@ -75,13 +72,11 @@ public void testDeleteValueFromStorage() throws Exception { @Test public void testSmallValueIsDelete() throws Exception { - Future future = new Future(storage, 10); - - String testValue = "testValue"; - future.put("test", testValue.getBytes(StandardCharsets.UTF_8)); + Configuration config = Configuration.newBuilder().setMaxBytes(20L).setStorage(storage).build(); + Future future = new Future<>("testValue", config); - assertEquals("testValue", new String(future.get(), StandardCharsets.UTF_8)); - assertNull(storage.get("test")); + assertEquals("testValue", future.get()); + assertNull(storage.get(future.getUrl())); future.delete(); assertNull(future.get()); @@ -89,19 +84,21 @@ public void testSmallValueIsDelete() throws Exception { @Test public void getWorksWhenInitialisingWithEncodedData() throws Exception { - Future future = new Future(storage, 10, "testValue".getBytes(StandardCharsets.UTF_8)); + Configuration configuration = Configuration.newBuilder().setStorage(storage).build(); + Future future = new Future<>(configuration, String.class, "testValue".getBytes(StandardCharsets.UTF_8)); - assertEquals("testValue", new String(future.get(), StandardCharsets.UTF_8)); + assertEquals("testValue", future.get()); } @Test public void getWorksWhenInitialisingWithUrl() throws Exception { - Future future = new Future(storage, 10, "test"); + Configuration configuration = Configuration.newBuilder().setStorage(storage).build(); + Future future = new Future<>(configuration, String.class, "test"); assertNull(future.get()); storage.put("test", "testValue".getBytes(StandardCharsets.UTF_8)); - assertEquals("testValue", new String(future.get(), StandardCharsets.UTF_8)); + assertEquals("testValue", future.get()); } }