diff --git a/android/src/main/java/com/segment/analytics/kotlin/android/Storage.kt b/android/src/main/java/com/segment/analytics/kotlin/android/Storage.kt index 24470443..9eae7a47 100644 --- a/android/src/main/java/com/segment/analytics/kotlin/android/Storage.kt +++ b/android/src/main/java/com/segment/analytics/kotlin/android/Storage.kt @@ -5,17 +5,13 @@ import android.content.SharedPreferences import com.segment.analytics.kotlin.android.utilities.AndroidKVS import com.segment.analytics.kotlin.core.Analytics import com.segment.analytics.kotlin.core.Storage -import com.segment.analytics.kotlin.core.Storage.Companion.MAX_PAYLOAD_SIZE import com.segment.analytics.kotlin.core.StorageProvider -import com.segment.analytics.kotlin.core.System -import com.segment.analytics.kotlin.core.UserInfo -import com.segment.analytics.kotlin.core.utilities.EventsFileManager +import com.segment.analytics.kotlin.core.utilities.FileEventStream +import com.segment.analytics.kotlin.core.utilities.StorageImpl import kotlinx.coroutines.CoroutineDispatcher import sovran.kotlin.Store -import sovran.kotlin.Subscriber -import java.io.File -// Android specific +@Deprecated("Use StorageProvider to create storage for Android instead") class AndroidStorage( context: Context, private val store: Store, @@ -23,107 +19,38 @@ class AndroidStorage( private val ioDispatcher: CoroutineDispatcher, directory: String? = null, subject: String? = null -) : Subscriber, Storage { +) : StorageImpl( + propertiesFile = AndroidKVS(context.getSharedPreferences("analytics-android-$writeKey", Context.MODE_PRIVATE)), + eventStream = FileEventStream(context.getDir(directory ?: "segment-disk-queue", Context.MODE_PRIVATE)), + store = store, + writeKey = writeKey, + fileIndexKey = if(subject == null) "segment.events.file.index.$writeKey" else "segment.events.file.index.$writeKey.$subject", + ioDispatcher = ioDispatcher +) - private val sharedPreferences: SharedPreferences = - context.getSharedPreferences("analytics-android-$writeKey", Context.MODE_PRIVATE) - override val storageDirectory: File = context.getDir(directory ?: "segment-disk-queue", Context.MODE_PRIVATE) - internal val eventsFile = - EventsFileManager(storageDirectory, writeKey, AndroidKVS(sharedPreferences), subject) - - override suspend fun subscribeToStore() { - store.subscribe( - this, - UserInfo::class, - initialState = true, - handler = ::userInfoUpdate, - queue = ioDispatcher - ) - store.subscribe( - this, - System::class, - initialState = true, - handler = ::systemUpdate, - queue = ioDispatcher - ) - } - - override suspend fun write(key: Storage.Constants, value: String) { - when (key) { - Storage.Constants.Events -> { - if (value.length < MAX_PAYLOAD_SIZE) { - // write to disk - eventsFile.storeEvent(value) - } else { - throw Exception("enqueued payload is too large") - } - } - else -> { - sharedPreferences.edit().putString(key.rawVal, value).apply() - } - } - } - - /** - * @returns the String value for the associated key - * for Constants.Events it will return a file url that can be used to read the contents of the events - */ - override fun read(key: Storage.Constants): String? { - return when (key) { - Storage.Constants.Events -> { - eventsFile.read().joinToString() - } - Storage.Constants.LegacyAppBuild -> { - // The legacy app build number was stored as an integer so we have to get it - // as an integer and convert it to a String. - val noBuild = -1 - val build = sharedPreferences.getInt(key.rawVal, noBuild) - if (build != noBuild) { - return build.toString() - } else { - return null - } - } - else -> { - sharedPreferences.getString(key.rawVal, null) - } - } - } - - override fun remove(key: Storage.Constants): Boolean { - return when (key) { - Storage.Constants.Events -> { - true - } - else -> { - sharedPreferences.edit().putString(key.rawVal, null).apply() - true - } +object AndroidStorageProvider : StorageProvider { + override fun createStorage(vararg params: Any): Storage { + + if (params.size < 2 || params[0] !is Analytics || params[1] !is Context) { + throw IllegalArgumentException(""" + Invalid parameters for AndroidStorageProvider. + AndroidStorageProvider requires at least 2 parameters. + The first argument has to be an instance of Analytics, + an the second argument has to be an instance of Context + """.trimIndent()) } - } - override fun removeFile(filePath: String): Boolean { - return eventsFile.remove(filePath) - } + val analytics = params[0] as Analytics + val context = params[1] as Context + val config = analytics.configuration - override suspend fun rollover() { - eventsFile.rollover() - } -} + val eventDirectory = context.getDir("segment-disk-queue", Context.MODE_PRIVATE) + val fileIndexKey = "segment.events.file.index.${config.writeKey}" + val sharedPreferences: SharedPreferences = + context.getSharedPreferences("analytics-android-${config.writeKey}", Context.MODE_PRIVATE) -object AndroidStorageProvider : StorageProvider { - override fun getStorage( - analytics: Analytics, - store: Store, - writeKey: String, - ioDispatcher: CoroutineDispatcher, - application: Any - ): Storage { - return AndroidStorage( - store = store, - writeKey = writeKey, - ioDispatcher = ioDispatcher, - context = application as Context, - ) + val propertiesFile = AndroidKVS(sharedPreferences) + val eventStream = FileEventStream(eventDirectory) + return StorageImpl(propertiesFile, eventStream, analytics.store, config.writeKey, fileIndexKey, analytics.fileIODispatcher) } } \ No newline at end of file diff --git a/android/src/main/java/com/segment/analytics/kotlin/android/utilities/AndroidKVS.kt b/android/src/main/java/com/segment/analytics/kotlin/android/utilities/AndroidKVS.kt index 4fe57c78..dfaf2b7c 100644 --- a/android/src/main/java/com/segment/analytics/kotlin/android/utilities/AndroidKVS.kt +++ b/android/src/main/java/com/segment/analytics/kotlin/android/utilities/AndroidKVS.kt @@ -6,10 +6,23 @@ import com.segment.analytics.kotlin.core.utilities.KVS /** * A key-value store wrapper for sharedPreferences on Android */ -class AndroidKVS(val sharedPreferences: SharedPreferences) : KVS { - override fun getInt(key: String, defaultVal: Int): Int = +class AndroidKVS(val sharedPreferences: SharedPreferences): KVS { + + + override fun get(key: String, defaultVal: Int) = sharedPreferences.getInt(key, defaultVal) - override fun putInt(key: String, value: Int): Boolean = + override fun get(key: String, defaultVal: String?) = + sharedPreferences.getString(key, defaultVal) ?: defaultVal + + override fun put(key: String, value: Int) = sharedPreferences.edit().putInt(key, value).commit() + + override fun put(key: String, value: String) = + sharedPreferences.edit().putString(key, value).commit() + + override fun remove(key: String): Boolean = + sharedPreferences.edit().remove(key).commit() + + override fun contains(key: String) = sharedPreferences.contains(key) } \ No newline at end of file diff --git a/android/src/test/java/com/segment/analytics/kotlin/android/AndroidContextCollectorTests.kt b/android/src/test/java/com/segment/analytics/kotlin/android/AndroidContextCollectorTests.kt index df6c0d45..8b851dca 100644 --- a/android/src/test/java/com/segment/analytics/kotlin/android/AndroidContextCollectorTests.kt +++ b/android/src/test/java/com/segment/analytics/kotlin/android/AndroidContextCollectorTests.kt @@ -149,24 +149,5 @@ class AndroidContextCollectorTests { } } - - - @Test - fun `storage directory can be customized`() { - val dir = "test" - val androidStorage = AndroidStorage( - appContext, - Store(), - "123", - UnconfinedTestDispatcher(), - dir - ) - - Assertions.assertTrue(androidStorage.storageDirectory.name.contains(dir)) - Assertions.assertTrue(androidStorage.eventsFile.directory.name.contains(dir)) - Assertions.assertTrue(androidStorage.storageDirectory.exists()) - Assertions.assertTrue(androidStorage.eventsFile.directory.exists()) - } - private fun JsonElement?.asString(): String? = this?.jsonPrimitive?.content } \ No newline at end of file diff --git a/android/src/test/java/com/segment/analytics/kotlin/android/StorageTests.kt b/android/src/test/java/com/segment/analytics/kotlin/android/StorageTests.kt index 1afdb3af..67499c7d 100644 --- a/android/src/test/java/com/segment/analytics/kotlin/android/StorageTests.kt +++ b/android/src/test/java/com/segment/analytics/kotlin/android/StorageTests.kt @@ -41,7 +41,7 @@ class StorageTests { @Nested inner class Android { private var store = Store() - private lateinit var androidStorage: AndroidStorage + private lateinit var androidStorage: Storage private var mockContext: Context = mockContext() init { @@ -74,7 +74,7 @@ class StorageTests { "123", UnconfinedTestDispatcher() ) - androidStorage.subscribeToStore() + androidStorage.initialize() } @@ -208,9 +208,12 @@ class StorageTests { } val stringified: String = Json.encodeToString(event) androidStorage.write(Storage.Constants.Events, stringified) - androidStorage.eventsFile.rollover() - val storagePath = androidStorage.eventsFile.read()[0] - val storageContents = File(storagePath).readText() + androidStorage.rollover() + val storagePath = androidStorage.read(Storage.Constants.Events)?.let{ + it.split(',')[0] + } + assertNotNull(storagePath) + val storageContents = File(storagePath!!).readText() val jsonFormat = Json.decodeFromString(JsonObject.serializer(), storageContents) assertEquals(1, jsonFormat["batch"]!!.jsonArray.size) } @@ -229,8 +232,8 @@ class StorageTests { e } assertNotNull(exception) - androidStorage.eventsFile.rollover() - assertTrue(androidStorage.eventsFile.read().isEmpty()) + androidStorage.rollover() + assertTrue(androidStorage.read(Storage.Constants.Events).isNullOrEmpty()) } @Test @@ -248,7 +251,7 @@ class StorageTests { val stringified: String = Json.encodeToString(event) androidStorage.write(Storage.Constants.Events, stringified) - androidStorage.eventsFile.rollover() + androidStorage.rollover() val fileUrl = androidStorage.read(Storage.Constants.Events) assertNotNull(fileUrl) fileUrl!!.let { @@ -270,7 +273,7 @@ class StorageTests { @Test fun `reading events with empty storage return empty list`() = runTest { - androidStorage.eventsFile.rollover() + androidStorage.rollover() val fileUrls = androidStorage.read(Storage.Constants.Events) assertTrue(fileUrls!!.isEmpty()) } diff --git a/android/src/test/java/com/segment/analytics/kotlin/android/utilities/AndroidKVSTest.kt b/android/src/test/java/com/segment/analytics/kotlin/android/utilities/AndroidKVSTest.kt new file mode 100644 index 00000000..97d7fbfc --- /dev/null +++ b/android/src/test/java/com/segment/analytics/kotlin/android/utilities/AndroidKVSTest.kt @@ -0,0 +1,44 @@ +package com.segment.analytics.kotlin.android.utilities + +import com.segment.analytics.kotlin.android.utils.MemorySharedPreferences +import com.segment.analytics.kotlin.core.utilities.KVS +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +class AndroidKVSTest { + + private lateinit var prefs: KVS + + @BeforeEach + fun setup(){ + val sharedPreferences = MemorySharedPreferences() + prefs = AndroidKVS(sharedPreferences) + prefs.put("int", 1) + prefs.put("string", "string") + } + + @Test + fun getTest() { + Assertions.assertEquals(1, prefs.get("int", 0)) + Assertions.assertEquals("string", prefs.get("string", null)) + Assertions.assertEquals(0, prefs.get("keyNotExists", 0)) + Assertions.assertEquals(null, prefs.get("keyNotExists", null)) + } + + @Test + fun putTest() { + prefs.put("int", 2) + prefs.put("string", "stringstring") + + Assertions.assertEquals(2, prefs.get("int", 0)) + Assertions.assertEquals("stringstring", prefs.get("string", null)) + } + + @Test + fun containsAndRemoveTest() { + Assertions.assertTrue(prefs.contains("int")) + prefs.remove("int") + Assertions.assertFalse(prefs.contains("int")) + } +} \ No newline at end of file diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt index d9812078..e17c27b3 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt @@ -43,14 +43,8 @@ open class Analytics protected constructor( } // use lazy to avoid the instance being leak before fully initialized - val storage: Storage by lazy { - configuration.storageProvider.getStorage( - analytics = this, - writeKey = configuration.writeKey, - ioDispatcher = fileIODispatcher, - store = store, - application = configuration.application!! - ) + open val storage: Storage by lazy { + configuration.storageProvider.createStorage(this, configuration.application!!) } internal var userInfo: UserInfo = UserInfo.defaultState(storage) @@ -134,7 +128,7 @@ open class Analytics protected constructor( it.provide(System.defaultState(configuration, storage)) // subscribe to store after state is provided - storage.subscribeToStore() + storage.initialize() Telemetry.subscribe(store) } diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Configuration.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Configuration.kt index 8ef1c8d2..f8f0f774 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Configuration.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Configuration.kt @@ -25,7 +25,7 @@ import sovran.kotlin.Store data class Configuration( val writeKey: String, var application: Any? = null, - val storageProvider: StorageProvider = ConcreteStorageProvider, + var storageProvider: StorageProvider = ConcreteStorageProvider, var collectDeviceId: Boolean = false, var trackApplicationLifecycleEvents: Boolean = false, var useLifecycleObserver: Boolean = false, diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Storage.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Storage.kt index a844cc69..9acf2f49 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Storage.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Storage.kt @@ -4,19 +4,16 @@ import kotlinx.coroutines.CoroutineDispatcher import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonObject import sovran.kotlin.Store -import java.io.File +import java.io.InputStream /** - * Storage interface that abstracts storage of - * - user data - * - segment settings - * - segment events - * - other configs - * - * Constraints: - * - Segment Events must be stored on a file, following the batch format - * - all storage is in terms of String (to make API simple) - * - storage is restricted to keys declared in `Storage.Constants` + * The protocol of how events are read and stored. + * Implement this interface if you wanna your events + * to be read and stored in the way you want (for + * example: from/to remote server, from/to local database + * from/to encrypted source). + * By default, we have implemented read and store events + * from/to memory and file storage. */ interface Storage { companion object { @@ -28,6 +25,8 @@ interface Storage { * is not present in payloads themselves, but is added later, such as `sentAt`, `integrations` and other json tokens. */ const val MAX_BATCH_SIZE = 475000 // 475KB. + + const val MAX_FILE_SIZE = 475_000 // 475KB } enum class Constants(val rawVal: String) { @@ -42,45 +41,64 @@ interface Storage { DeviceId("segment.device.id") } - val storageDirectory: File + /** + * Initialization of the storage. + * All prerequisite setups should be done in this method. + */ + suspend fun initialize() - suspend fun subscribeToStore() + /** + * Write a value of the Storage.Constants type to storage + * + * @param key The type of the value + * @param value Value + */ suspend fun write(key: Constants, value: String) - fun read(key: Constants): String? - fun remove(key: Constants): Boolean - fun removeFile(filePath: String): Boolean /** - * Direct writes to a new file, and close the current file. - * This function is useful in cases such as `flush`, that - * we want to finish writing the current file, and have it - * flushed to server. + * Write a key/value pair to prefs + * + * @param key Key + * @param value Value */ - suspend fun rollover() + fun writePrefs(key: Constants, value: String) - suspend fun userInfoUpdate(userInfo: UserInfo) { - write(Constants.AnonymousId, userInfo.anonymousId) + /** + * Read the value of a given type + * + * @param key The type of the value + * @return value of the given type + */ + fun read(key: Constants): String? - userInfo.userId?.let { - write(Constants.UserId, it) - } ?: run { - remove(Constants.UserId) - } + /** + * Read the given source stream as an InputStream + * + * @param source stream to read + * @return result as InputStream + */ + fun readAsStream(source: String): InputStream? - userInfo.traits?.let { - write(Constants.Traits, Json.encodeToString(JsonObject.serializer(), it)) - } ?: run { - remove(Constants.Traits) - } - } + /** + * Remove the data of a given type + * + * @param key type of the data to remove + * @return status of the operation + */ + fun remove(key: Constants): Boolean - suspend fun systemUpdate(system: System) { - system.settings?.let { - write(Constants.Settings, Json.encodeToString(Settings.serializer(), it)) - } ?: run { - remove(Constants.Settings) - } - } + /** + * Remove a stream + * + * @param filePath the fullname/identifier of a stream + * @return status of the operation + */ + fun removeFile(filePath: String): Boolean + + /** + * Close and finish the current stream and start a new one + */ + suspend fun rollover() } fun parseFilePaths(filePathStr: String?): List { @@ -98,11 +116,16 @@ fun parseFilePaths(filePathStr: String?): List { * provider via this interface */ interface StorageProvider { + @Deprecated("Deprecated in favor of create which takes vararg params", + ReplaceWith("createStorage(analytics, store, writeKey, ioDispatcher, application)") + ) fun getStorage( analytics: Analytics, store: Store, writeKey: String, ioDispatcher: CoroutineDispatcher, application: Any - ): Storage + ): Storage = createStorage(analytics, store, writeKey, ioDispatcher, application) + + fun createStorage(vararg params: Any): Storage } \ No newline at end of file diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/platform/EventPipeline.kt b/core/src/main/java/com/segment/analytics/kotlin/core/platform/EventPipeline.kt index ac700c9e..a133ad2e 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/platform/EventPipeline.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/platform/EventPipeline.kt @@ -16,8 +16,6 @@ import kotlinx.serialization.json.Json import kotlinx.serialization.json.encodeToJsonElement import kotlinx.serialization.json.jsonObject import kotlinx.serialization.json.jsonPrimitive -import java.io.File -import java.io.FileInputStream open class EventPipeline( private val analytics: Analytics, @@ -136,31 +134,29 @@ open class EventPipeline( val fileUrlList = parseFilePaths(storage.read(Storage.Constants.Events)) for (url in fileUrlList) { // upload event file - val file = File(url) - if (!file.exists()) continue - - var shouldCleanup = true - try { - val connection = httpClient.upload(apiHost) - connection.outputStream?.let { - // Write the payloads into the OutputStream. - val fileInputStream = FileInputStream(file) - fileInputStream.copyTo(connection.outputStream) - fileInputStream.close() - connection.outputStream.close() - - // Upload the payloads. - connection.close() + storage.readAsStream(url)?.let { data -> + var shouldCleanup = true + try { + val connection = httpClient.upload(apiHost) + connection.outputStream?.let { + // Write the payloads into the OutputStream + data.copyTo(connection.outputStream) + data.close() + connection.outputStream.close() + + // Upload the payloads. + connection.close() + } + // Cleanup uploaded payloads + analytics.log("$logTag uploaded $url") + } catch (e: Exception) { + analytics.reportInternalError(e) + shouldCleanup = handleUploadException(e, url) } - // Cleanup uploaded payloads - analytics.log("$logTag uploaded $url") - } catch (e: Exception) { - analytics.reportInternalError(e) - shouldCleanup = handleUploadException(e, file) - } - if (shouldCleanup) { - storage.removeFile(file.path) + if (shouldCleanup) { + storage.removeFile(url) + } } } } @@ -176,7 +172,7 @@ open class EventPipeline( - private fun handleUploadException(e: Exception, file: File): Boolean { + private fun handleUploadException(e: Exception, file: String): Boolean { var shouldCleanup = false if (e is HTTPException) { analytics.log("$logTag exception while uploading, ${e.message}") @@ -198,7 +194,7 @@ open class EventPipeline( Analytics.segmentLog( """ | Error uploading events from batch file - | fileUrl="${file.path}" + | fileUrl="${file}" | msg=${e.message} """.trimMargin(), kind = LogKind.ERROR ) diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/utilities/EventStream.kt b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/EventStream.kt new file mode 100644 index 00000000..5ee3ed00 --- /dev/null +++ b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/EventStream.kt @@ -0,0 +1,251 @@ +package com.segment.analytics.kotlin.core.utilities + +import java.io.File +import java.io.FileInputStream +import java.io.FileOutputStream +import java.io.InputStream +import java.lang.StringBuilder +import java.util.concurrent.ConcurrentHashMap + +/** + * The protocol of how events are read and stored. + * Implement this interface if you wanna your events + * to be read and stored in the way you want (for + * example: from/to remote server, from/to local database + * from/to encrypted source). + * By default, we have implemented read and store events + * from/to memory and file storage. + * + * A stream is defined as something that contains a batch of + * events. It can be in the form of any of the following: + * * a file + * * an in-memory entry + * * a table entry in database + */ +interface EventStream { + /** + * Length of current stream + */ + val length: Long + + /** + * Check if a stream is opened + */ + val isOpened: Boolean + + /** + * Open the stream with the given name. Creates a new one if not already exists. + * + * @param file name of the stream + * @return true if a new stream is created + */ + fun openOrCreate(file: String): Boolean + + /** + * Append content to the opening stream + * + * @param content Content to append + */ + fun write(content: String) + + /** + * Read the list of streams in directory + * @return a list of stream names in directory + */ + fun read(): List + + /** + * Remove the stream with the given name + * + * @param file name of stream to be removed + */ + fun remove(file: String) + + /** + * Close the current opening stream without finish it, + * so that the stream can be opened for future appends. + */ + fun close() + + /** + * Close and finish the current opening stream. + * Pass a withRename closure if you want to distinguish completed + * streams from ongoing stream + * + * @param withRename a callback that renames a finished stream + */ + fun finishAndClose(withRename: ((name: String) -> String)? = null) + + /** + * Read the stream with the given name as an InputStream. + * Needed for HTTPClient to upload data + * + * @param source the full name of a stream + */ + fun readAsStream(source: String): InputStream? +} + +open class InMemoryEventStream: EventStream { + protected val directory = ConcurrentHashMap() + + protected open var currFile: InMemoryFile? = null + + override val length: Long + get() = (currFile?.length ?: 0).toLong() + override val isOpened: Boolean + get() = currFile != null + + override fun openOrCreate(file: String): Boolean { + currFile?.let { + if (it.name != file) { + // the given file is different than the current one + // close the current one first + close() + } + } + + var newFile = false + if (currFile == null) { + newFile = !directory.containsKey(file) + currFile = if (newFile) InMemoryFile(file) else directory[file] + } + + currFile?.let { directory[file] = it } + + return newFile + } + + override fun write(content: String) { + currFile?.write(content) + } + + override fun read(): List = directory.keys().toList() + + override fun remove(file: String) { + directory.remove(file) + } + + override fun close() { + currFile = null + } + + override fun finishAndClose(withRename: ((name: String) -> String)?) { + currFile?.let { + withRename?.let { rename -> + directory.remove(it.name) + directory[rename(it.name)] = it + } + currFile = null + } + + } + + override fun readAsStream(source: String): InputStream? = directory[source]?.toStream() + + class InMemoryFile(val name: String) { + val fileStream: StringBuilder = StringBuilder() + + val length: Int + get() = fileStream.length + + fun write(content: String) = fileStream.append(content) + + fun toStream() = fileStream.toString().byteInputStream() + } +} + +open class FileEventStream( + val directory: File +): EventStream { + + init { + createDirectory(directory) + registerShutdownHook() + } + + protected open var fs: FileOutputStream? = null + + protected open var currFile: File? = null + + override val length: Long + get() = currFile?.length() ?: 0 + override val isOpened: Boolean + get() = currFile != null && fs != null + + override fun openOrCreate(file: String): Boolean { + currFile?.let { + if (!it.name.endsWith(file)) { + close() + } + } + + if (currFile == null) { + currFile = File(directory, file) + } + + var newFile = false + currFile?.let { + if (!it.exists()) { + it.createNewFile() + newFile = true + } + + fs = fs ?: FileOutputStream(it, true) + } + + return newFile + } + + override fun write(content: String) { + fs?.run { + write(content.toByteArray()) + flush() + } + } + + override fun read(): List = (directory.listFiles() ?: emptyArray()).map { it.absolutePath } + + /** + * Remove the given file from disk + * + * NOTE: file string has to be the full path of the file + * + * @param file full path of the file to be deleted + */ + override fun remove(file: String) { + File(file).delete() + } + + override fun close() { + fs?.close() + fs = null + currFile = null + } + + override fun finishAndClose(withRename: ((name: String) -> String)?) { + fs?.close() + fs = null + + currFile?.let { + withRename?.let { rename -> + it.renameTo(File(directory, rename(it.name))) + } + } + + currFile = null + } + + override fun readAsStream(source: String): InputStream? { + val file = File(source) + return if (file.exists()) FileInputStream(file) else null + } + + private fun registerShutdownHook() { + // close the stream if the app shuts down + Runtime.getRuntime().addShutdownHook(object : Thread() { + override fun run() { + fs?.close() + } + }) + } +} \ No newline at end of file diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/utilities/EventsFileManager.kt b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/EventsFileManager.kt index 5d6b4b7a..cb778aa8 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/utilities/EventsFileManager.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/EventsFileManager.kt @@ -31,6 +31,7 @@ import java.io.FileOutputStream * * remove() will delete the file path specified */ +@Deprecated("Deprecated in favor of EventStream") class EventsFileManager( val directory: File, private val writeKey: String, @@ -183,12 +184,4 @@ class EventsFileManager( block() semaphore.release() } -} - -/** - * Key-value store interface used by eventsFile - */ -interface KVS { - fun getInt(key: String, defaultVal: Int): Int - fun putInt(key: String, value: Int): Boolean -} +} \ No newline at end of file diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/utilities/FileUtils.kt b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/FileUtils.kt index cae0dbfb..b02370bc 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/utilities/FileUtils.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/FileUtils.kt @@ -12,4 +12,13 @@ fun createDirectory(location: File) { if (!(location.exists() || location.mkdirs() || location.isDirectory)) { throw IOException("Could not create directory at $location") } +} + +fun removeFileExtension(fileName: String): String { + val lastDotIndex = fileName.lastIndexOf('.') + return if (lastDotIndex != -1 && lastDotIndex > 0) { + fileName.substring(0, lastDotIndex) + } else { + fileName + } } \ No newline at end of file diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/utilities/KVS.kt b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/KVS.kt new file mode 100644 index 00000000..456421ae --- /dev/null +++ b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/KVS.kt @@ -0,0 +1,92 @@ +package com.segment.analytics.kotlin.core.utilities + +import java.util.concurrent.ConcurrentHashMap + + +/** + * Key-value store interface used by eventsFile + */ +interface KVS { + @Deprecated("Deprecated in favor of `get`", ReplaceWith("get(key, defaultVal)")) + fun getInt(key: String, defaultVal: Int): Int = get(key, defaultVal) + @Deprecated("Deprecated in favor of `put`", ReplaceWith("put(key, value)")) + fun putInt(key: String, value: Int): Boolean = put(key, value) + + /** + * Read the value of a given key as integer + * @param key Key + * @param defaultVal Fallback value to use + * @return Value + */ + fun get(key: String, defaultVal: Int): Int + + /** + * Store the key value pair + * @param key Key + * @param value Fallback value to use + * @return Status of the operation + */ + fun put(key: String, value: Int): Boolean + + /** + * Read the value of a given key as integer + * @param key Key + * @param defaultVal Fallback value to use + * @return Value + */ + fun get(key: String, defaultVal: String?): String? + + /** + * Store the key value pair + * @param key Key + * @param value Fallback value to use + * @return Status of the operation + */ + fun put(key: String, value: String): Boolean + + /** + * Remove a key/value pair by key + * + * @param key Key + * @return Status of the operation + */ + fun remove(key: String): Boolean + + /** + * checks if a given key exists + * + * @param Key + * @return Status of the operation + */ + fun contains(key: String): Boolean +} + +class InMemoryPrefs: KVS { + + private val cache = ConcurrentHashMap() + override fun get(key: String, defaultVal: Int): Int { + return if (cache[key] is Int) cache[key] as Int else defaultVal + } + + override fun get(key: String, defaultVal: String?): String? { + return if (cache[key] is String) cache[key] as String else defaultVal + } + + override fun put(key: String, value: Int): Boolean { + cache[key] = value + return true + } + + override fun put(key: String, value: String): Boolean { + cache[key] = value + return true + } + + override fun remove(key: String): Boolean { + cache.remove(key) + return true + } + + override fun contains(key: String) = cache.containsKey(key) + +} \ No newline at end of file diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/utilities/PropertiesFile.kt b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/PropertiesFile.kt index 88638a5c..2b4634dc 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/utilities/PropertiesFile.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/PropertiesFile.kt @@ -10,53 +10,68 @@ import java.util.Properties * conforming to {@link com.segment.analytics.kotlin.core.utilities.KVS} interface. * Ideal for use on JVM systems to store k-v pairs on a file. */ -class PropertiesFile(val directory: File, writeKey: String) : KVS { - private val underlyingProperties: Properties = Properties() - private val propertiesFileName = "analytics-kotlin-$writeKey.properties" - private val propertiesFile = File(directory, propertiesFileName) +class PropertiesFile(val file: File) : KVS { + private val properties: Properties = Properties() + + init { + load() + } /** * Check if underlying file exists, and load properties if true */ fun load() { - if (propertiesFile.exists()) { - FileInputStream(propertiesFile).use { - underlyingProperties.load(it) + if (file.exists()) { + FileInputStream(file).use { + properties.load(it) } } else { - propertiesFile.parentFile.mkdirs() - propertiesFile.createNewFile() + file.parentFile.mkdirs() + file.createNewFile() } } fun save() { - FileOutputStream(propertiesFile).use { - underlyingProperties.store(it, null) + FileOutputStream(file).use { + properties.store(it, null) } } - override fun getInt(key: String, defaultVal: Int): Int = - underlyingProperties.getProperty(key, "").toIntOrNull() ?: defaultVal + override fun get(key: String, defaultVal: Int): Int { + return properties.getProperty(key, "").toIntOrNull() ?: defaultVal + } + + override fun get(key: String, defaultVal: String?): String? { + return properties.getProperty(key, defaultVal) + } + + override fun put(key: String, value: Int): Boolean { + properties.setProperty(key, value.toString()) + save() + return true + } - override fun putInt(key: String, value: Int): Boolean { - underlyingProperties.setProperty(key, value.toString()) + override fun put(key: String, value: String): Boolean { + properties.setProperty(key, value) save() return true } fun putString(key: String, value: String): Boolean { - underlyingProperties.setProperty(key, value) + properties.setProperty(key, value) save() return true } fun getString(key: String, defaultVal: String?): String? = - underlyingProperties.getProperty(key, defaultVal) + properties.getProperty(key, defaultVal) - fun remove(key: String): Boolean { - underlyingProperties.remove(key) + override fun remove(key: String): Boolean { + properties.remove(key) save() return true } + + override fun contains(key: String) = properties.containsKey(key) } diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/utilities/StorageImpl.kt b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/StorageImpl.kt index a1be4287..f1b152d3 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/utilities/StorageImpl.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/StorageImpl.kt @@ -1,40 +1,50 @@ package com.segment.analytics.kotlin.core.utilities import com.segment.analytics.kotlin.core.Analytics +import com.segment.analytics.kotlin.core.Settings import com.segment.analytics.kotlin.core.Storage +import com.segment.analytics.kotlin.core.Storage.Companion.MAX_FILE_SIZE import com.segment.analytics.kotlin.core.Storage.Companion.MAX_PAYLOAD_SIZE import com.segment.analytics.kotlin.core.StorageProvider import com.segment.analytics.kotlin.core.System import com.segment.analytics.kotlin.core.UserInfo +import com.segment.analytics.kotlin.core.reportInternalError import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.sync.Semaphore +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonObject import sovran.kotlin.Store import sovran.kotlin.Subscriber import java.io.File +import java.io.InputStream /** * Storage implementation for JVM platform, uses {@link com.segment.analytics.kotlin.core.utilities.PropertiesFile} * for key-value storage and {@link com.segment.analytics.kotlin.core.utilities.EventsFileManager} * for events storage */ -class StorageImpl( +open class StorageImpl( + internal val propertiesFile: KVS, + internal val eventStream: EventStream, private val store: Store, - writeKey: String, - private val ioDispatcher: CoroutineDispatcher, - directory: String? = null, - subject: String? = null + private val writeKey: String, + internal val fileIndexKey: String, + private val ioDispatcher: CoroutineDispatcher ) : Subscriber, Storage { - override val storageDirectory = File(directory ?: "/tmp/analytics-kotlin/$writeKey") - private val storageDirectoryEvents = File(storageDirectory, "events") + private val semaphore = Semaphore(1) - internal val propertiesFile = PropertiesFile(storageDirectory, writeKey) - internal val eventsFile = EventsFileManager(storageDirectoryEvents, writeKey, propertiesFile, subject) + internal val begin = """{"batch":[""" - init { - propertiesFile.load() - } + internal val end + get() = """],"sentAt":"${SegmentInstant.now()}","writeKey":"$writeKey"}""" + + private val ext = "tmp" + + private val currentFile + get() = "$writeKey-${propertiesFile.get(fileIndexKey, 0)}.$ext" - override suspend fun subscribeToStore() { + override suspend fun initialize() { store.subscribe( this, UserInfo::class, @@ -51,34 +61,65 @@ class StorageImpl( ) } + suspend fun userInfoUpdate(userInfo: UserInfo) { + write(Storage.Constants.AnonymousId, userInfo.anonymousId) + + userInfo.userId?.let { + write(Storage.Constants.UserId, it) + } ?: run { + remove(Storage.Constants.UserId) + } + + userInfo.traits?.let { + write(Storage.Constants.Traits, Json.encodeToString(JsonObject.serializer(), it)) + } ?: run { + remove(Storage.Constants.Traits) + } + } + + suspend fun systemUpdate(system: System) { + system.settings?.let { + write(Storage.Constants.Settings, Json.encodeToString(Settings.serializer(), it)) + } ?: run { + remove(Storage.Constants.Settings) + } + } + override suspend fun write(key: Storage.Constants, value: String) { when (key) { Storage.Constants.Events -> { if (value.length < MAX_PAYLOAD_SIZE) { // write to disk - eventsFile.storeEvent(value) + storeEvent(value) } else { throw Exception("enqueued payload is too large") } } else -> { - propertiesFile.putString(key.rawVal, value) + writePrefs(key, value) } } } + override fun writePrefs(key: Storage.Constants, value: String) { + propertiesFile.put(key.rawVal, value) + } + override fun read(key: Storage.Constants): String? { return when (key) { Storage.Constants.Events -> { - val read = eventsFile.read() - read.joinToString() + eventStream.read().filter { !it.endsWith(".$ext") }.joinToString() } else -> { - propertiesFile.getString(key.rawVal, null) + propertiesFile.get(key.rawVal, null) } } } + override fun readAsStream(source: String): InputStream? { + return eventStream.readAsStream(source) + } + override fun remove(key: Storage.Constants): Boolean { return when (key) { Storage.Constants.Events -> { @@ -92,27 +133,105 @@ class StorageImpl( } override fun removeFile(filePath: String): Boolean { - return eventsFile.remove(filePath) + try { + eventStream.remove(filePath) + return true + } + catch (e: Exception) { + Analytics.reportInternalError(e) + return false + } } - override suspend fun rollover() { - eventsFile.rollover() + override suspend fun rollover() = withLock { + performRollover() } + /** + * closes existing file, if at capacity + * opens a new file, if current file is full or uncreated + * stores the event + */ + private suspend fun storeEvent(event: String) = withLock { + var newFile = eventStream.openOrCreate(currentFile) + if (newFile) { + eventStream.write(begin) + } + + // check if file is at capacity + if (eventStream.length > MAX_FILE_SIZE) { + performRollover() + + // open the next file + newFile = eventStream.openOrCreate(currentFile) + eventStream.write(begin) + } + + val contents = StringBuilder() + if (!newFile) { + contents.append(',') + } + contents.append(event) + eventStream.write(contents.toString()) + } + + private fun performRollover() { + if (!eventStream.isOpened) return + + eventStream.write(end) + eventStream.finishAndClose { + removeFileExtension(it) + } + incrementFileIndex() + } + + private fun incrementFileIndex() { + val index = propertiesFile.get(fileIndexKey, 0) + 1 + propertiesFile.put(fileIndexKey, index) + } + + private suspend fun withLock(block: () -> Unit) { + semaphore.acquire() + block() + semaphore.release() + } } object ConcreteStorageProvider : StorageProvider { - override fun getStorage( - analytics: Analytics, - store: Store, - writeKey: String, - ioDispatcher: CoroutineDispatcher, - application: Any - ): Storage { - return StorageImpl( - ioDispatcher = ioDispatcher, - writeKey = writeKey, - store = store - ) + + override fun createStorage(vararg params: Any): Storage { + if (params.isEmpty() || params[0] !is Analytics) { + throw IllegalArgumentException("Invalid parameters for ConcreteStorageProvider. ConcreteStorageProvider requires at least 1 parameter and the first argument has to be an instance of Analytics") + } + + val analytics = params[0] as Analytics + val config = analytics.configuration + + val directory = File("/tmp/analytics-kotlin/${config.writeKey}") + val eventDirectory = File(directory, "events") + val fileIndexKey = "segment.events.file.index.${config.writeKey}" + val userPrefs = File(directory, "analytics-kotlin-${config.writeKey}.properties") + + val propertiesFile = PropertiesFile(userPrefs) + val eventStream = FileEventStream(eventDirectory) + return StorageImpl(propertiesFile, eventStream, analytics.store, config.writeKey, fileIndexKey, analytics.fileIODispatcher) } +} + +class InMemoryStorageProvider: StorageProvider { + override fun createStorage(vararg params: Any): Storage { + if (params.isEmpty() || params[0] !is Analytics) { + throw IllegalArgumentException("Invalid parameters for InMemoryStorageProvider. InMemoryStorageProvider requires at least 1 parameter and the first argument has to be an instance of Analytics") + } + + val analytics = params[0] as Analytics + val config = analytics.configuration + + val userPrefs = InMemoryPrefs() + val eventStream = InMemoryEventStream() + val fileIndexKey = "segment.events.file.index.${config.writeKey}" + + return StorageImpl(userPrefs, eventStream, analytics.store, config.writeKey, fileIndexKey, analytics.fileIODispatcher) + } + } \ No newline at end of file diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/StorageTest.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/StorageTest.kt new file mode 100644 index 00000000..7c255b4a --- /dev/null +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/StorageTest.kt @@ -0,0 +1,216 @@ +package com.segment.analytics.kotlin.core + +import com.segment.analytics.kotlin.core.utilities.ConcreteStorageProvider +import com.segment.analytics.kotlin.core.utilities.EncodeDefaultsJson +import com.segment.analytics.kotlin.core.utilities.EventStream +import com.segment.analytics.kotlin.core.utilities.FileEventStream +import com.segment.analytics.kotlin.core.utilities.InMemoryEventStream +import com.segment.analytics.kotlin.core.utilities.InMemoryPrefs +import com.segment.analytics.kotlin.core.utilities.InMemoryStorageProvider +import com.segment.analytics.kotlin.core.utilities.KVS +import com.segment.analytics.kotlin.core.utilities.PropertiesFile +import com.segment.analytics.kotlin.core.utilities.StorageImpl +import com.segment.analytics.kotlin.core.utils.clearPersistentStorage +import com.segment.analytics.kotlin.core.utils.testAnalytics +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.test.TestScope +import kotlinx.coroutines.test.UnconfinedTestDispatcher +import kotlinx.coroutines.test.runTest +import kotlinx.serialization.encodeToString +import kotlinx.serialization.json.buildJsonObject +import kotlinx.serialization.json.put +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import sovran.kotlin.Store +import java.lang.StringBuilder +import java.util.Date + +class StorageTest { + @Nested + inner class StorageProviderTest { + private lateinit var analytics: Analytics + private val testDispatcher = UnconfinedTestDispatcher() + private val testScope = TestScope(testDispatcher) + + @BeforeEach + fun setup() { + clearPersistentStorage() + val config = Configuration( + writeKey = "123", + application = "Test" + ) + + analytics = testAnalytics(config, testScope, testDispatcher) + } + + @Test + fun concreteStorageProviderTest() { + val storage = ConcreteStorageProvider.createStorage(analytics) as StorageImpl + assertTrue(storage.eventStream is FileEventStream) + assertTrue(storage.propertiesFile is PropertiesFile) + + val eventStream = storage.eventStream as FileEventStream + val propertiesFile = (storage.propertiesFile as PropertiesFile).file + + val dir = "/tmp/analytics-kotlin/${analytics.configuration.writeKey}" + // we don't cache storage directory, but we can use the parent of the event storage to verify + assertEquals(dir, eventStream.directory.parent) + assertTrue(eventStream.directory.path.contains(dir)) + assertTrue(propertiesFile.path.contains(dir)) + assertTrue(eventStream.directory.exists()) + assertTrue(propertiesFile.exists()) + } + + @Test + fun inMemoryStorageProviderTest() { + val storage = InMemoryStorageProvider().createStorage(analytics) as StorageImpl + assertTrue(storage.eventStream is InMemoryEventStream) + assertTrue(storage.propertiesFile is InMemoryPrefs) + } + } + + @Nested + inner class StorageTest { + private lateinit var storage: StorageImpl + + private lateinit var prefs: KVS + + private lateinit var stream: EventStream + + private lateinit var payload: String + + @BeforeEach + fun setup() { + val trackEvent = TrackEvent( + event = "clicked", + properties = buildJsonObject { put("foo", "bar") }) + .apply { + messageId = "qwerty-1234" + anonymousId = "anonId" + integrations = emptyJsonObject + context = emptyJsonObject + timestamp = Date(0).toInstant().toString() + } + payload = EncodeDefaultsJson.encodeToString(trackEvent) + prefs = InMemoryPrefs() + stream = mockk(relaxed = true) + storage = StorageImpl(prefs, stream, mockk(), "test", "key", UnconfinedTestDispatcher()) + } + + @Test + fun writeNewFileTest() = runTest { + every { stream.openOrCreate(any()) } returns true + storage.write(Storage.Constants.Events, payload) + verify(exactly = 1) { + stream.write(storage.begin) + stream.write(payload) + } + } + + @Test + fun rolloverToNewFileTest() = runTest { + every { stream.openOrCreate(any()) } returns false andThen true + every { stream.length } returns Storage.MAX_FILE_SIZE + 1L + every { stream.isOpened } returns true + + storage.write(Storage.Constants.Events, payload) + assertEquals(1, prefs.get(storage.fileIndexKey, 0)) + verify (exactly = 1) { + stream.finishAndClose(any()) + stream.write(storage.begin) + stream.write(payload) + } + + verify (exactly = 3){ + stream.write(any()) + } + } + + @Test + fun largePayloadCauseExceptionTest() = runTest { + val letters = "abcdefghijklmnopqrstuvwxyz1234567890" + val largePayload = StringBuilder() + for (i in 0..1000) { + largePayload.append(letters) + } + + assertThrows { + storage.write(Storage.Constants.Events, largePayload.toString()) + } + } + + @Test + fun writePrefsAsyncTest() = runTest { + val expected = "userid" + assertNull(storage.read(Storage.Constants.UserId)) + storage.write(Storage.Constants.UserId, expected) + assertEquals(expected, storage.read(Storage.Constants.UserId)) + } + + @Test + fun writePrefsTest() { + val expected = "userId" + assertNull(storage.read(Storage.Constants.UserId)) + storage.writePrefs(Storage.Constants.UserId, expected) + assertEquals(expected, storage.read(Storage.Constants.UserId)) + } + + @Test + fun rolloverTest() = runTest { + every { stream.isOpened } returns true + + storage.rollover() + + verify (exactly = 1) { + stream.write(any()) + stream.finishAndClose(any()) + } + + assertEquals(1, prefs.get(storage.fileIndexKey, 0)) + } + + @Test + fun readTest() { + val files = listOf("test1.tmp", "test2", "test3.tmp", "test4") + every { stream.read() } returns files + prefs.put(Storage.Constants.UserId.rawVal, "userId") + + val actual = storage.read(Storage.Constants.Events) + assertEquals(listOf(files[1], files[3]).joinToString(), actual) + assertEquals("userId", storage.read(Storage.Constants.UserId)) + } + + @Test + fun removeTest() { + prefs.put(Storage.Constants.UserId.rawVal, "userId") + storage.remove(Storage.Constants.UserId) + + assertTrue(storage.remove(Storage.Constants.Events)) + assertNull(storage.read(Storage.Constants.UserId)) + } + + @Test + fun removeFileTest() { + storage.removeFile("file") + verify (exactly = 1) { + stream.remove("file") + } + + every { stream.remove(any()) } throws java.lang.Exception() + assertFalse(storage.removeFile("file")) + } + + @Test + fun readAsStream() { + storage.readAsStream("file") + verify (exactly = 1) { + stream.readAsStream(any()) + } + } + } +} \ No newline at end of file diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/platform/EventPipelineTest.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/platform/EventPipelineTest.kt index 43050efa..b8a28578 100644 --- a/core/src/test/kotlin/com/segment/analytics/kotlin/core/platform/EventPipelineTest.kt +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/platform/EventPipelineTest.kt @@ -58,12 +58,7 @@ internal class EventPipelineTest { analytics = mockAnalytics(testScope, testDispatcher) clearPersistentStorage(analytics.configuration.writeKey) - storage = spyk(ConcreteStorageProvider.getStorage( - analytics, - analytics.store, - analytics.configuration.writeKey, - analytics.fileIODispatcher, - this)) + storage = spyk(ConcreteStorageProvider.createStorage(analytics)) every { analytics.storage } returns storage pipeline = EventPipeline(analytics, diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/platform/plugins/SegmentDestinationTests.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/platform/plugins/SegmentDestinationTests.kt index ca80e1f1..19e832a5 100644 --- a/core/src/test/kotlin/com/segment/analytics/kotlin/core/platform/plugins/SegmentDestinationTests.kt +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/platform/plugins/SegmentDestinationTests.kt @@ -100,8 +100,8 @@ class SegmentDestinationTests { val expectedStringPayload = Json.encodeToString(expectedEvent) (analytics.storage as StorageImpl).run { - eventsFile.rollover() - val storagePath = eventsFile.read()[0] + rollover() + val storagePath = eventStream.read()[0] val storageContents = File(storagePath).readText() assertTrue( storageContents.contains( @@ -260,8 +260,8 @@ class SegmentDestinationTests { verify { errorUploading.set(true) } (analytics.storage as StorageImpl).run { // batch file doesn't get deleted - eventsFile.rollover() - assertEquals(1, eventsFile.read().size) + rollover() + assertEquals(1, eventStream.read().size) } } @@ -303,8 +303,8 @@ class SegmentDestinationTests { verify { errorUploading.set(true) } (analytics.storage as StorageImpl).run { // batch file doesn't get deleted - eventsFile.rollover() - assertEquals(1, eventsFile.read().size) + rollover() + assertEquals(1, eventStream.read().size) } } diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/EventStreamTest.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/EventStreamTest.kt new file mode 100644 index 00000000..5339e9e8 --- /dev/null +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/EventStreamTest.kt @@ -0,0 +1,270 @@ +package com.segment.analytics.kotlin.core.utilities + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test +import java.io.File +import java.util.UUID + +class EventStreamTest { + + @Nested + inner class InMemoryEventStreamTest { + + private lateinit var eventStream: EventStream; + + @BeforeEach + fun setup() { + eventStream = InMemoryEventStream() + } + + @Test + fun lengthTest() { + val str1 = "abc" + val str2 = "defgh" + + assertEquals(0, eventStream.length) + + eventStream.openOrCreate("test.tmp") + eventStream.write(str1) + assertEquals(str1.length * 1L, eventStream.length) + + eventStream.write(str2) + assertEquals(str1.length + str2.length * 1L, eventStream.length) + } + + @Test + fun isOpenTest() { + assertFalse(eventStream.isOpened) + + eventStream.openOrCreate("test.tmp") + assertTrue(eventStream.isOpened) + + eventStream.close() + assertFalse(eventStream.isOpened) + } + + @Test + fun openOrCreateTest() { + var actual = eventStream.openOrCreate("test.tmp") + assertTrue(actual) + + actual = eventStream.openOrCreate("test.tmp") + assertFalse(actual) + } + + @Test + fun writeAndReadStreamTest() { + val file = "test.tmp" + eventStream.openOrCreate(file) + val str1 = "abc" + val str2 = "defgh" + + assertEquals(0, eventStream.length) + + eventStream.write(str1) + assertEquals(str1, eventStream.readAsStream(file)!!.bufferedReader().use { it.readText() }) + eventStream.write(str2) + assertEquals((str1 + str2), eventStream.readAsStream(file)!!.bufferedReader().use { it.readText() }) + } + + @Test + fun readTest() { + val files = arrayOf("test1.tmp", "test2", "test3.tmp") + + eventStream.openOrCreate("test1.tmp") + + // open test2 without finish test1 + eventStream.openOrCreate("test2.tmp") + eventStream.finishAndClose { + removeFileExtension(it) + } + + // open test3 after finish test2 + eventStream.openOrCreate("test3.tmp") + // open test3 again + eventStream.openOrCreate("test3.tmp") + + val actual = HashSet(eventStream.read()) + assertEquals(files.size, actual.size) + assertTrue(actual.contains(files[0])) + assertTrue(actual.contains(files[1])) + assertTrue(actual.contains(files[2])) + } + + @Test + fun removeTest() { + eventStream.openOrCreate("test.tmp") + eventStream.finishAndClose { + removeFileExtension(it) + } + eventStream.remove("test") + val newFile = eventStream.openOrCreate("test.tmp") + + assertTrue(newFile) + } + + @Test + fun closeTest() { + eventStream.openOrCreate("test.tmp") + assertTrue(eventStream.isOpened) + + eventStream.close() + assertFalse(eventStream.isOpened) + } + + @Test + fun finishAndCloseTest() { + eventStream.openOrCreate("test.tmp") + eventStream.finishAndClose { + removeFileExtension(it) + } + + val files = eventStream.read() + assertEquals(1, files.size) + assertEquals("test", files[0]) + assertFalse(eventStream.isOpened) + } + } + + @Nested + inner class FileEventStreamTest { + private lateinit var eventStream: EventStream + + private lateinit var dir: File + + @BeforeEach + fun setup() { + dir = File(UUID.randomUUID().toString()) + eventStream = FileEventStream(dir) + } + + @AfterEach + fun tearDown() { + dir.deleteRecursively() + } + + + @Test + fun lengthTest() { + val str1 = "abc" + val str2 = "defgh" + + assertEquals(0, eventStream.length) + + eventStream.openOrCreate("test.tmp") + eventStream.write(str1) + assertEquals(str1.length * 1L, eventStream.length) + + eventStream.write(str2) + assertEquals(str1.length + str2.length * 1L, eventStream.length) + } + + @Test + fun isOpenTest() { + assertFalse(eventStream.isOpened) + + eventStream.openOrCreate("test.tmp") + assertTrue(eventStream.isOpened) + + eventStream.close() + assertFalse(eventStream.isOpened) + } + + @Test + fun openOrCreateTest() { + var actual = eventStream.openOrCreate("test.tmp") + assertTrue(actual) + assertTrue(File(dir, "test.tmp").exists()) + + actual = eventStream.openOrCreate("test.tmp") + assertFalse(actual) + } + + @Test + fun writeAndReadStreamTest() { + val str1 = "abc" + val str2 = "defgh" + + eventStream.openOrCreate("test.tmp") + assertEquals(0, eventStream.length) + var files = eventStream.read() + assertEquals(1, files.size) + eventStream.write(str1) + eventStream.close() + assertEquals(str1, eventStream.readAsStream(files[0])!!.bufferedReader().use { it.readText() }) + + eventStream.openOrCreate("test.tmp") + assertEquals(str1.length * 1L, eventStream.length) + files = eventStream.read() + assertEquals(1, files.size) + eventStream.write(str2) + eventStream.close() + assertEquals((str1 + str2), eventStream.readAsStream(files[0])!!.bufferedReader().use { it.readText() }) + } + + @Test + fun readTest() { + val files = arrayOf("test1.tmp", "test2", "test3.tmp") + + eventStream.openOrCreate("test1.tmp") + + // open test2 without finish test1 + eventStream.openOrCreate("test2.tmp") + eventStream.finishAndClose { + removeFileExtension(it) + } + + // open test3 after finish test2 + eventStream.openOrCreate("test3.tmp") + // open test3 again + eventStream.openOrCreate("test3.tmp") + + val actual = HashSet(eventStream.read().map { it.substring(it.lastIndexOf('/') + 1) }) + assertEquals(files.size, actual.size) + assertTrue(actual.contains(files[0])) + assertTrue(actual.contains(files[1])) + assertTrue(actual.contains(files[2])) + } + + @Test + fun removeTest() { + eventStream.openOrCreate("test.tmp") + eventStream.finishAndClose { + removeFileExtension(it) + } + assertTrue(File(dir, "test").exists()) + + eventStream.remove(File(dir, "test").absolutePath) + assertFalse(File(dir, "test").exists()) + + val newFile = eventStream.openOrCreate("test.tmp") + + assertTrue(newFile) + } + + @Test + fun closeTest() { + eventStream.openOrCreate("test.tmp") + assertTrue(eventStream.isOpened) + + eventStream.close() + assertFalse(eventStream.isOpened) + } + + @Test + fun finishAndCloseTest() { + eventStream.openOrCreate("test.tmp") + eventStream.finishAndClose { + removeFileExtension(it) + } + + val files = eventStream.read().map { it.substring(it.lastIndexOf('/') + 1) } + assertEquals(1, files.size) + assertEquals("test", files[0]) + assertFalse(eventStream.isOpened) + } + } +} \ No newline at end of file diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/EventsFileManagerTest.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/EventsFileManagerTest.kt deleted file mode 100644 index 5d3b7c50..00000000 --- a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/EventsFileManagerTest.kt +++ /dev/null @@ -1,250 +0,0 @@ -package com.segment.analytics.kotlin.core.utilities - -import com.segment.analytics.kotlin.core.TrackEvent -import com.segment.analytics.kotlin.core.emptyJsonObject -import io.mockk.every -import io.mockk.mockkObject -import io.mockk.mockkStatic -import kotlinx.coroutines.test.runTest -import kotlinx.serialization.encodeToString -import kotlinx.serialization.json.buildJsonObject -import kotlinx.serialization.json.put -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertFalse -import org.junit.jupiter.api.Assertions.assertTrue -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import java.io.File -import java.io.FileOutputStream -import java.time.Instant -import java.util.Date - -internal class EventsFileManagerTest{ - private val epochTimestamp = Date(0).toInstant().toString() - private val directory = File("/tmp/analytics-android-test/events") - private val kvStore = PropertiesFile(directory.parentFile, "123") - - init { - mockkObject(SegmentInstant) - every { SegmentInstant.now() } returns Date(0).toInstant().toString() - } - - @BeforeEach - fun setup() { - directory.deleteRecursively() - } - - @Test - fun `check if event is stored correctly and creates new file`() = runTest { - val file = EventsFileManager(directory, "123", kvStore) - val trackEvent = TrackEvent( - event = "clicked", - properties = buildJsonObject { put("behaviour", "good") }) - .apply { - messageId = "qwerty-1234" - anonymousId = "anonId" - integrations = emptyJsonObject - context = emptyJsonObject - timestamp = epochTimestamp - } - val eventString = EncodeDefaultsJson.encodeToString(trackEvent) - file.storeEvent(eventString) - - val expectedContents = """{"batch":[${eventString}""" - val newFile = File(directory, "123-0.tmp") - assertTrue(newFile.exists()) - val actualContents = newFile.readText() - assertEquals(expectedContents, actualContents) - } - - @Test - fun `check if filename includes subject`() = runTest { - val file = EventsFileManager(directory, "123", kvStore, "test") - val trackEvent = TrackEvent( - event = "clicked", - properties = buildJsonObject { put("behaviour", "good") }) - .apply { - messageId = "qwerty-1234" - anonymousId = "anonId" - integrations = emptyJsonObject - context = emptyJsonObject - timestamp = epochTimestamp - } - val eventString = EncodeDefaultsJson.encodeToString(trackEvent) - file.storeEvent(eventString) - file.rollover() - - assertEquals(1, kvStore.getInt("segment.events.file.index.123.test", -1)) - } - - @Test - fun `storeEvent stores in existing file if available`() = runTest { - val file = EventsFileManager(directory, "123", kvStore) - val trackEvent = TrackEvent( - event = "clicked", - properties = buildJsonObject { put("behaviour", "good") }) - .apply { - messageId = "qwerty-1234" - anonymousId = "anonId" - integrations = emptyJsonObject - context = emptyJsonObject - timestamp = epochTimestamp - } - val eventString = EncodeDefaultsJson.encodeToString(trackEvent) - file.storeEvent(eventString) - file.storeEvent(eventString) - - val expectedContents = """{"batch":[${eventString},${eventString}""" - val newFile = File(directory, "123-0.tmp") - assertTrue(newFile.exists()) - val actualContents = newFile.readText() - assertEquals(expectedContents, actualContents) - } - - @Test - fun `storeEvent creates new file when at capacity and closes other file`() = runTest { - val file = EventsFileManager(directory, "123", kvStore) - val trackEvent = TrackEvent( - event = "clicked", - properties = buildJsonObject { put("behaviour", "good") }) - .apply { - messageId = "qwerty-1234" - anonymousId = "anonId" - integrations = emptyJsonObject - context = emptyJsonObject - timestamp = epochTimestamp - } - val eventString = EncodeDefaultsJson.encodeToString(trackEvent) - file.storeEvent(eventString) - // artificially add 500kb of data to file - FileOutputStream(File(directory, "123-0.tmp"), true).write( - "A".repeat(475_000).toByteArray() - ) - - file.storeEvent(eventString) - assertFalse(File(directory, "123-0.tmp").exists()) - assertTrue(File(directory, "123-0").exists()) - val expectedContents = """{"batch":[${eventString}""" - val newFile = File(directory, "123-1.tmp") - assertTrue(newFile.exists()) - val actualContents = newFile.readText() - assertEquals(expectedContents, actualContents) - - } - - @Test - fun `read returns empty list when no events stored`() { - val file = EventsFileManager(directory, "123", kvStore) - assertTrue(file.read().isEmpty()) - } - - @Test - fun `read finishes open file and lists it`() = runTest { - val file = EventsFileManager(directory, "123", kvStore) - val trackEvent = TrackEvent( - event = "clicked", - properties = buildJsonObject { put("behaviour", "good") }) - .apply { - messageId = "qwerty-1234" - anonymousId = "anonId" - integrations = emptyJsonObject - context = emptyJsonObject - timestamp = epochTimestamp - } - val eventString = EncodeDefaultsJson.encodeToString(trackEvent) - file.storeEvent(eventString) - - file.rollover() - val fileUrls = file.read() - assertEquals(1, fileUrls.size) - val expectedContents = """ {"batch":[${eventString}],"sentAt":"$epochTimestamp","writeKey":"123"}""".trim() - val newFile = File(directory, "123-0") - assertTrue(newFile.exists()) - val actualContents = newFile.readText() - assertEquals(expectedContents, actualContents) - } - - @Test - fun `multiple reads doesnt create extra files`() = runTest { - val file = EventsFileManager(directory, "123", kvStore) - val trackEvent = TrackEvent( - event = "clicked", - properties = buildJsonObject { put("behaviour", "good") }) - .apply { - messageId = "qwerty-1234" - anonymousId = "anonId" - integrations = emptyJsonObject - context = emptyJsonObject - timestamp = epochTimestamp - } - val eventString = EncodeDefaultsJson.encodeToString(trackEvent) - file.storeEvent(eventString) - - file.rollover() - file.read().let { - assertEquals(1, it.size) - val expectedContents = - """ {"batch":[${eventString}],"sentAt":"$epochTimestamp","writeKey":"123"}""".trim() - val newFile = File(directory, "123-0") - assertTrue(newFile.exists()) - val actualContents = newFile.readText() - assertEquals(expectedContents, actualContents) - } - // second read is a no-op - file.read().let { - assertEquals(1, it.size) - assertEquals(1, directory.list()!!.size) - } - } - - @Test - fun `read lists all available files for writekey`() = runTest { - val trackEvent = TrackEvent( - event = "clicked", - properties = buildJsonObject { put("behaviour", "good") }) - .apply { - messageId = "qwerty-1234" - anonymousId = "anonId" - integrations = emptyJsonObject - context = emptyJsonObject - timestamp = epochTimestamp - } - val eventString = EncodeDefaultsJson.encodeToString(trackEvent) - - val file1 = EventsFileManager(directory, "123", kvStore) - val file2 = EventsFileManager(directory, "qwerty", kvStore) - - file1.storeEvent(eventString) - file2.storeEvent(eventString) - - file1.rollover() - file2.rollover() - - assertEquals(listOf("/tmp/analytics-android-test/events/123-0"), file1.read()) - assertEquals(listOf("/tmp/analytics-android-test/events/qwerty-0"), file2.read()) - } - - @Test - fun `remove deletes file`() = runTest { - val file = EventsFileManager(directory, "123", kvStore) - val trackEvent = TrackEvent( - event = "clicked", - properties = buildJsonObject { put("behaviour", "good") }) - .apply { - messageId = "qwerty-1234" - anonymousId = "anonId" - integrations = emptyJsonObject - context = emptyJsonObject - timestamp = epochTimestamp - } - val eventString = EncodeDefaultsJson.encodeToString(trackEvent) - file.storeEvent(eventString) - - file.rollover() - val list = file.read() - file.remove(list[0]) - - assertFalse(File(list[0]).exists()) - } - -} \ No newline at end of file diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/InMemoryStorageTest.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/InMemoryStorageTest.kt new file mode 100644 index 00000000..aafe2c8d --- /dev/null +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/InMemoryStorageTest.kt @@ -0,0 +1,312 @@ +package com.segment.analytics.kotlin.core.utilities + +import com.segment.analytics.kotlin.core.Configuration +import com.segment.analytics.kotlin.core.Settings +import com.segment.analytics.kotlin.core.Storage +import com.segment.analytics.kotlin.core.System +import com.segment.analytics.kotlin.core.TrackEvent +import com.segment.analytics.kotlin.core.UserInfo +import com.segment.analytics.kotlin.core.emptyJsonObject +import com.segment.analytics.kotlin.core.utils.testAnalytics +import kotlinx.coroutines.test.TestScope +import kotlinx.coroutines.test.UnconfinedTestDispatcher +import kotlinx.coroutines.test.runTest +import kotlinx.serialization.encodeToString +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.buildJsonObject +import kotlinx.serialization.json.jsonArray +import kotlinx.serialization.json.put +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test +import sovran.kotlin.Action +import sovran.kotlin.Store +import java.util.Date + +internal class InMemoryStorageTest { + + private val epochTimestamp = Date(0).toInstant().toString() + + private val testDispatcher = UnconfinedTestDispatcher() + + private val testScope = TestScope(testDispatcher) + + private lateinit var store: Store + + private lateinit var storage: StorageImpl + + @BeforeEach + fun setup() = runTest { + val config = Configuration( + writeKey = "123", + application = "Test", + apiHost = "local", + ) + val analytics = testAnalytics(config, testScope, testDispatcher) + store = analytics.store + storage = InMemoryStorageProvider().createStorage(analytics) as StorageImpl + storage.initialize() + } + + + @Test + fun `userInfo update calls write`() = runTest { + val action = object : Action { + override fun reduce(state: UserInfo): UserInfo { + return UserInfo( + anonymousId = "newAnonId", + userId = "newUserId", + traits = emptyJsonObject + ) + } + } + store.dispatch(action, UserInfo::class) + val userId = storage.read(Storage.Constants.UserId) + val anonId = storage.read(Storage.Constants.AnonymousId) + val traits = storage.read(Storage.Constants.Traits) + + assertEquals("newAnonId", anonId) + assertEquals("newUserId", userId) + assertEquals("{}", traits) + } + + @Test + fun `userInfo reset action removes userInfo`() = runTest { + store.dispatch(UserInfo.ResetAction(), UserInfo::class) + + val userId = storage.read(Storage.Constants.UserId) + val anonId = storage.read(Storage.Constants.AnonymousId) + val traits = storage.read(Storage.Constants.Traits) + + assertNotNull(anonId) + assertEquals(null, userId) + assertEquals(null, traits) + } + + @Test + fun `system update calls write for settings`() = runTest { + val action = object : Action { + override fun reduce(state: System): System { + return System( + configuration = state.configuration, + settings = Settings( + integrations = buildJsonObject { + put( + "Segment.io", + buildJsonObject { + put( + "apiKey", + "1vNgUqwJeCHmqgI9S1sOm9UHCyfYqbaQ" + ) + }) + }, + plan = emptyJsonObject, + edgeFunction = emptyJsonObject, + middlewareSettings = emptyJsonObject + ), + running = false, + initializedPlugins = setOf(), + enabled = true + ) + } + } + store.dispatch(action, System::class) + val settings = storage.read(Storage.Constants.Settings) ?: "" + + assertEquals( + Settings( + integrations = buildJsonObject { + put( + "Segment.io", + buildJsonObject { put("apiKey", "1vNgUqwJeCHmqgI9S1sOm9UHCyfYqbaQ") }) + }, + plan = emptyJsonObject, + edgeFunction = emptyJsonObject, + middlewareSettings = emptyJsonObject + ), Json.decodeFromString(Settings.serializer(), settings) + ) + } + + @Test + fun `system reset action removes system`() = runTest { + val action = object : Action { + override fun reduce(state: System): System { + return System(state.configuration, null, state.running, state.initializedPlugins, state.enabled) + } + } + store.dispatch(action, System::class) + + val settings = storage.read(Storage.Constants.Settings) + + assertEquals(null, settings) + } + + @Nested + inner class EventsStorage() { + + @Test + fun `writing events writes to eventsFile`() = runTest { + val event = TrackEvent( + event = "clicked", + properties = buildJsonObject { put("behaviour", "good") }) + .apply { + messageId = "qwerty-1234" + anonymousId = "anonId" + integrations = emptyJsonObject + context = emptyJsonObject + timestamp = epochTimestamp + } + val stringified: String = Json.encodeToString(event) + storage.write(Storage.Constants.Events, stringified) + storage.rollover() + val storagePath = storage.eventStream.read()[0] + val storageContents = (storage.eventStream as InMemoryEventStream).readAsStream(storagePath) + assertNotNull(storageContents) + val jsonFormat = Json.decodeFromString(JsonObject.serializer(), storageContents!!.bufferedReader().use { it.readText() }) + assertEquals(1, jsonFormat["batch"]!!.jsonArray.size) + } + + @Test + fun `cannot write more than 32kb as event`() = runTest { + val stringified: String = "A".repeat(32002) + val exception = try { + storage.write( + Storage.Constants.Events, + stringified + ) + null + } + catch (e : Exception) { + e + } + assertNotNull(exception) + assertTrue(storage.eventStream.read().isEmpty()) + } + + @Test + fun `reading events returns a non-null file handle with correct events`() = runTest { + val event = TrackEvent( + event = "clicked", + properties = buildJsonObject { put("behaviour", "good") }) + .apply { + messageId = "qwerty-1234" + anonymousId = "anonId" + integrations = emptyJsonObject + context = emptyJsonObject + timestamp = epochTimestamp + } + val stringified: String = Json.encodeToString(event) + storage.write(Storage.Constants.Events, stringified) + + storage.rollover() + val fileUrl = storage.read(Storage.Constants.Events) + assertNotNull(fileUrl) + fileUrl!!.let { + val storageContents = (storage.eventStream as InMemoryEventStream).readAsStream(it) + assertNotNull(storageContents) + val contentsStr = storageContents!!.bufferedReader().use { it.readText() } + val contentsJson: JsonObject = Json.decodeFromString(contentsStr) + assertEquals(3, contentsJson.size) // batch, sentAt, writeKey + assertTrue(contentsJson.containsKey("batch")) + assertTrue(contentsJson.containsKey("sentAt")) + assertTrue(contentsJson.containsKey("writeKey")) + assertEquals(1, contentsJson["batch"]?.jsonArray?.size) + val eventInFile = contentsJson["batch"]?.jsonArray?.get(0) + val eventInFile2 = Json.decodeFromString( + TrackEvent.serializer(), + Json.encodeToString(eventInFile) + ) + assertEquals(event, eventInFile2) + } + } + + @Test + fun `reading events with empty storage return empty list`() { + val fileUrls = storage.read(Storage.Constants.Events) + assertTrue(fileUrls!!.isEmpty()) + } + + @Test + fun `can write and read multiple events`() = runTest { + val event1 = TrackEvent( + event = "clicked", + properties = buildJsonObject { put("behaviour", "good") }) + .apply { + messageId = "qwerty-1234" + anonymousId = "anonId" + integrations = emptyJsonObject + context = emptyJsonObject + timestamp = epochTimestamp + } + val event2 = TrackEvent( + event = "clicked2", + properties = buildJsonObject { put("behaviour", "bad") }) + .apply { + messageId = "qwerty-12345" + anonymousId = "anonId" + integrations = emptyJsonObject + context = emptyJsonObject + timestamp = epochTimestamp + } + val stringified1: String = Json.encodeToString(event1) + val stringified2: String = Json.encodeToString(event2) + storage.write(Storage.Constants.Events, stringified1) + storage.write(Storage.Constants.Events, stringified2) + + storage.rollover() + val fileUrl = storage.read(Storage.Constants.Events) + assertNotNull(fileUrl) + fileUrl!!.let { + val storageContents = (storage.eventStream as InMemoryEventStream).readAsStream(it) + assertNotNull(storageContents) + val contentsStr = storageContents!!.bufferedReader().use { it.readText() } + val contentsJson: JsonObject = Json.decodeFromString(contentsStr) + assertEquals(3, contentsJson.size) // batch, sentAt, writeKey + assertTrue(contentsJson.containsKey("batch")) + assertTrue(contentsJson.containsKey("sentAt")) + assertTrue(contentsJson.containsKey("writeKey")) + assertEquals(2, contentsJson["batch"]?.jsonArray?.size) + val eventInFile = contentsJson["batch"]?.jsonArray?.get(0) + val eventInFile2 = Json.decodeFromString( + TrackEvent.serializer(), + Json.encodeToString(eventInFile) + ) + assertEquals(event1, eventInFile2) + + val event2InFile = contentsJson["batch"]?.jsonArray?.get(1) + val event2InFile2 = Json.decodeFromString( + TrackEvent.serializer(), + Json.encodeToString(event2InFile) + ) + assertEquals(event2, event2InFile2) + } + } + + @Test + fun remove() = runTest { + val action = object : Action { + override fun reduce(state: UserInfo): UserInfo { + return UserInfo( + anonymousId = "newAnonId", + userId = "newUserId", + traits = emptyJsonObject + ) + } + } + store.dispatch(action, UserInfo::class) + + val userId = storage.read(Storage.Constants.UserId) + assertEquals("newUserId", userId) + + storage.remove(Storage.Constants.UserId) + assertNull(storage.read(Storage.Constants.UserId)) + assertTrue(storage.remove(Storage.Constants.Events)) + } + } + +} \ No newline at end of file diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/KVSTest.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/KVSTest.kt new file mode 100644 index 00000000..33d00a2f --- /dev/null +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/KVSTest.kt @@ -0,0 +1,44 @@ +package com.segment.analytics.kotlin.core.utilities + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test + +class KVSTest { + @Nested + inner class InMemoryPrefsTest { + private lateinit var prefs: KVS + + @BeforeEach + fun setup(){ + prefs = InMemoryPrefs() + prefs.put("int", 1) + prefs.put("string", "string") + } + + @Test + fun getTest() { + assertEquals(1, prefs.get("int", 0)) + assertEquals("string", prefs.get("string", null)) + assertEquals(0, prefs.get("keyNotExists", 0)) + assertEquals(null, prefs.get("keyNotExists", null)) + } + + @Test + fun putTest() { + prefs.put("int", 2) + prefs.put("string", "stringstring") + + assertEquals(2, prefs.get("int", 0)) + assertEquals("stringstring", prefs.get("string", null)) + } + + @Test + fun containsAndRemoveTest() { + assertTrue(prefs.contains("int")) + prefs.remove("int") + assertFalse(prefs.contains("int")) + } + } +} \ No newline at end of file diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/PropertiesFileTest.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/PropertiesFileTest.kt index 8bfc5b2c..5f7c0fb2 100644 --- a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/PropertiesFileTest.kt +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/PropertiesFileTest.kt @@ -9,7 +9,7 @@ import java.io.File internal class PropertiesFileTest { private val directory = File("/tmp/analytics-test/123") - private val kvStore = PropertiesFile(directory, "123") + private val kvStore = PropertiesFile(File(directory, "123")) @BeforeEach internal fun setUp() { @@ -22,10 +22,10 @@ internal class PropertiesFileTest { kvStore.putString("string", "test") kvStore.putInt("int", 1) - assertEquals(kvStore.getString("string", ""), "test") - assertEquals(kvStore.getInt("int", 0), 1) + assertEquals(kvStore.get("string", ""), "test") + assertEquals(kvStore.get("int", 0), 1) kvStore.remove("int") - assertEquals(kvStore.getInt("int", 0), 0) + assertEquals(kvStore.get("int", 0), 0) } } \ No newline at end of file diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/StorageImplTest.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/StorageImplTest.kt index 9bd1d275..f8f8efa1 100644 --- a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/StorageImplTest.kt +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/StorageImplTest.kt @@ -3,6 +3,7 @@ package com.segment.analytics.kotlin.core.utilities import com.segment.analytics.kotlin.core.Configuration import com.segment.analytics.kotlin.core.Settings import com.segment.analytics.kotlin.core.Storage +import com.segment.analytics.kotlin.core.StorageProvider import com.segment.analytics.kotlin.core.System import com.segment.analytics.kotlin.core.TrackEvent import com.segment.analytics.kotlin.core.UserInfo @@ -60,12 +61,21 @@ internal class StorageImplTest { ) ) - storage = StorageImpl( - store, - "123", - UnconfinedTestDispatcher() - ) - storage.subscribeToStore() + val storageProvider = object : StorageProvider { + override fun createStorage(vararg params: Any): Storage { + val writeKey = "123" + val directory = File("/tmp/analytics-kotlin/${writeKey}") + val eventDirectory = File(directory, "events") + val fileIndexKey = "segment.events.file.index.${writeKey}" + val userPrefs = File(directory, "analytics-kotlin-${writeKey}.properties") + + val propertiesFile = PropertiesFile(userPrefs) + val eventStream = FileEventStream(eventDirectory) + return StorageImpl(propertiesFile, eventStream, store, writeKey, fileIndexKey, UnconfinedTestDispatcher()) + } + } + storage = storageProvider.createStorage() as StorageImpl + storage.initialize() } @@ -163,19 +173,29 @@ internal class StorageImplTest { @Test fun `storage directory can be customized`() { - storage = StorageImpl( - store, - "123", - UnconfinedTestDispatcher(), - "/tmp/test" - ) - - assertEquals("/tmp/test", storage.storageDirectory.path) - assertTrue(storage.eventsFile.directory.path.contains("/tmp/test")) - assertTrue(storage.propertiesFile.directory.path.contains("/tmp/test")) - assertTrue(storage.storageDirectory.exists()) - assertTrue(storage.eventsFile.directory.exists()) - assertTrue(storage.propertiesFile.directory.exists()) + val storageProvider = object : StorageProvider { + override fun createStorage(vararg params: Any): Storage { + val writeKey = "123" + val directory = File("/tmp/test") + val eventDirectory = File(directory, "events") + val fileIndexKey = "segment.events.file.index.${writeKey}" + val userPrefs = File(directory, "analytics-kotlin-${writeKey}.properties") + + val propertiesFile = PropertiesFile(userPrefs) + val eventStream = FileEventStream(eventDirectory) + return StorageImpl(propertiesFile, eventStream, store, writeKey, fileIndexKey, UnconfinedTestDispatcher()) + } + } + storage = storageProvider.createStorage() as StorageImpl + val eventStream = storage.eventStream as FileEventStream + val propertiesFile = (storage.propertiesFile as PropertiesFile).file + + // we don't cache storage directory, but we can use the parent of the event storage to verify + assertEquals("/tmp/test", eventStream.directory.parent) + assertTrue(eventStream.directory.path.contains("/tmp/test")) + assertTrue(propertiesFile.path.contains("/tmp/test")) + assertTrue(eventStream.directory.exists()) + assertTrue(propertiesFile.exists()) } @Nested @@ -196,7 +216,7 @@ internal class StorageImplTest { val stringified: String = Json.encodeToString(event) storage.write(Storage.Constants.Events, stringified) storage.rollover() - val storagePath = storage.eventsFile.read()[0] + val storagePath = storage.eventStream.read()[0] val storageContents = File(storagePath).readText() val jsonFormat = Json.decodeFromString(JsonObject.serializer(), storageContents) assertEquals(1, jsonFormat["batch"]!!.jsonArray.size) @@ -216,7 +236,7 @@ internal class StorageImplTest { e } assertNotNull(exception) - assertTrue(storage.eventsFile.read().isEmpty()) + assertTrue(storage.eventStream.read().isEmpty()) } @Test diff --git a/samples/kotlin-android-app/src/main/java/com/segment/analytics/next/EncryptedEventStream.kt b/samples/kotlin-android-app/src/main/java/com/segment/analytics/next/EncryptedEventStream.kt new file mode 100644 index 00000000..09487d1f --- /dev/null +++ b/samples/kotlin-android-app/src/main/java/com/segment/analytics/next/EncryptedEventStream.kt @@ -0,0 +1,192 @@ +package com.segment.analytics.next + +import android.content.Context +import android.content.SharedPreferences +import com.segment.analytics.kotlin.android.utilities.AndroidKVS +import com.segment.analytics.kotlin.core.Analytics +import com.segment.analytics.kotlin.core.Storage +import com.segment.analytics.kotlin.core.StorageProvider +import com.segment.analytics.kotlin.core.utilities.FileEventStream +import com.segment.analytics.kotlin.core.utilities.StorageImpl +import java.io.File +import java.io.InputStream +import java.security.SecureRandom +import javax.crypto.Cipher +import javax.crypto.CipherInputStream +import javax.crypto.spec.IvParameterSpec +import javax.crypto.spec.SecretKeySpec + + +class EncryptedEventStream( + directory: File, + val key: ByteArray +) : FileEventStream(directory) { + + private val ivSize = 16 + + override fun write(content: String) { + fs?.run { + // generate a different iv for every content + val iv = ByteArray(ivSize).apply { + SecureRandom().nextBytes(this) + } + val cipher = getCipher(Cipher.ENCRYPT_MODE, iv, key) + val encryptedContent = cipher.doFinal(content.toByteArray()) + + write(iv) + // write the size of the content, so decipher knows + // the length of the content + write(writeInt(encryptedContent.size)) + write(encryptedContent) + flush() + } + } + + override fun readAsStream(source: String): InputStream? { + val stream = super.readAsStream(source) + return if (stream == null) { + null + } else { + // the DecryptingInputStream decrypts the steam + // and uses a LimitedInputStream to read the exact + // bytes of a chunk of content + DecryptingInputStream(stream) + } + } + + + private fun getCipher(mode: Int, iv: ByteArray, key: ByteArray): Cipher { + val cipher = Cipher.getInstance("AES/CBC/PKCS5Padding") + val keySpec = SecretKeySpec(key, "AES") + val ivSpec = IvParameterSpec(iv) + cipher.init(mode, keySpec, ivSpec) + return cipher + } + + private fun writeInt(value: Int): ByteArray { + return byteArrayOf( + (value ushr 24).toByte(), + (value ushr 16).toByte(), + (value ushr 8).toByte(), + value.toByte() + ) + } + + private fun readInt(input: InputStream): Int { + val bytes = input.readNBytes(4) + return (bytes[0].toInt() and 0xFF shl 24) or + (bytes[1].toInt() and 0xFF shl 16) or + (bytes[2].toInt() and 0xFF shl 8) or + (bytes[3].toInt() and 0xFF) + } + + private inner class DecryptingInputStream(private val input: InputStream) : InputStream() { + private var currentCipherInputStream: CipherInputStream? = null + private var remainingBytes = 0 + private var endOfStream = false + + private fun setupNextBlock(): Boolean { + if (endOfStream) return false + + try { + // Read IV + val iv = input.readNBytes(ivSize) + if (iv.size < ivSize) { + endOfStream = true + return false + } + + // Read content size + remainingBytes = readInt(input) + if (remainingBytes <= 0) { + endOfStream = true + return false + } + + // Setup cipher + val cipher = getCipher(Cipher.DECRYPT_MODE, iv, key) + + // Create new cipher stream + currentCipherInputStream = CipherInputStream( + LimitedInputStream(input, remainingBytes.toLong()), + cipher + ) + return true + } catch (e: Exception) { + endOfStream = true + return false + } + } + + override fun read(): Int { + if (currentCipherInputStream == null && !setupNextBlock()) { + return -1 + } + + val byte = currentCipherInputStream?.read() ?: -1 + if (byte == -1) { + currentCipherInputStream = null + return read() // Try next block + } + return byte + } + + override fun close() { + currentCipherInputStream?.close() + input.close() + } + } + + // Helper class to limit reading to current encrypted block + private class LimitedInputStream( + private val input: InputStream, + private var remaining: Long + ) : InputStream() { + override fun read(): Int { + if (remaining <= 0) return -1 + val result = input.read() + if (result >= 0) remaining-- + return result + } + + override fun read(b: ByteArray, off: Int, len: Int): Int { + if (remaining <= 0) return -1 + val result = input.read(b, off, minOf(len, remaining.toInt())) + if (result >= 0) remaining -= result + return result + } + + override fun close() { + // Don't close the underlying stream + } + } +} + +class EncryptedStorageProvider(val key: ByteArray) : StorageProvider { + + override fun createStorage(vararg params: Any): Storage { + + if (params.size < 2 || params[0] !is Analytics || params[1] !is Context) { + throw IllegalArgumentException(""" + Invalid parameters for EncryptedStorageProvider. + EncryptedStorageProvider requires at least 2 parameters. + The first argument has to be an instance of Analytics, + an the second argument has to be an instance of Context + """.trimIndent()) + } + + val analytics = params[0] as Analytics + val context = params[1] as Context + val config = analytics.configuration + + val eventDirectory = context.getDir("segment-disk-queue", Context.MODE_PRIVATE) + val fileIndexKey = "segment.events.file.index.${config.writeKey}" + val sharedPreferences: SharedPreferences = + context.getSharedPreferences("analytics-android-${config.writeKey}", Context.MODE_PRIVATE) + + val propertiesFile = AndroidKVS(sharedPreferences) + // use the key from constructor or get it from share preferences + val eventStream = EncryptedEventStream(eventDirectory, key) + return StorageImpl(propertiesFile, eventStream, analytics.store, config.writeKey, fileIndexKey, analytics.fileIODispatcher) + } +} \ No newline at end of file diff --git a/samples/kotlin-android-app/src/main/java/com/segment/analytics/next/MainApplication.kt b/samples/kotlin-android-app/src/main/java/com/segment/analytics/next/MainApplication.kt index 91b6f1ec..8d21c584 100644 --- a/samples/kotlin-android-app/src/main/java/com/segment/analytics/next/MainApplication.kt +++ b/samples/kotlin-android-app/src/main/java/com/segment/analytics/next/MainApplication.kt @@ -22,16 +22,18 @@ class MainApplication : Application() { override fun onCreate() { super.onCreate() + // Replace it with your key to the encrypted storage + val secretKey = ByteArray(32) { 1 } + analytics = Analytics("tteOFND0bb5ugJfALOJWpF0wu1tcxYgr", applicationContext) { this.collectDeviceId = true this.trackApplicationLifecycleEvents = true this.trackDeepLinks = true this.flushPolicies = listOf( - CountBasedFlushPolicy(3), // Flush after 3 events - FrequencyFlushPolicy(5000), // Flush after 5 secs - UnmeteredFlushPolicy(applicationContext) // Flush if network is not metered + CountBasedFlushPolicy(100), // Flush after 3 events +// FrequencyFlushPolicy(60000), // Flush after 5 secs +// UnmeteredFlushPolicy(applicationContext) // Flush if network is not metered ) - this.flushPolicies = listOf(UnmeteredFlushPolicy(applicationContext)) this.requestFactory = object : RequestFactory() { override fun upload(apiHost: String): HttpURLConnection { val connection: HttpURLConnection = openConnection("https://$apiHost/b") @@ -41,6 +43,7 @@ class MainApplication : Application() { return connection } } + this.storageProvider = EncryptedStorageProvider(secretKey) } analytics.add(AndroidRecordScreenPlugin()) analytics.add(object : Plugin {