diff --git a/android/src/main/java/com/amplitude/android/storage/AndroidStorageV2.kt b/android/src/main/java/com/amplitude/android/storage/AndroidStorageV2.kt index 715bc825..13b3497c 100644 --- a/android/src/main/java/com/amplitude/android/storage/AndroidStorageV2.kt +++ b/android/src/main/java/com/amplitude/android/storage/AndroidStorageV2.kt @@ -98,14 +98,14 @@ class AndroidStorageV2( eventPipeline: EventPipeline, configuration: Configuration, scope: CoroutineScope, - dispatcher: CoroutineDispatcher, + storageDispatcher: CoroutineDispatcher, ): ResponseHandler { return FileResponseHandler( this, eventPipeline, configuration, scope, - dispatcher, + storageDispatcher, logger, ) } diff --git a/android/src/main/java/com/amplitude/android/utilities/AndroidStorage.kt b/android/src/main/java/com/amplitude/android/utilities/AndroidStorage.kt index 9abf6d06..6b040dc0 100644 --- a/android/src/main/java/com/amplitude/android/utilities/AndroidStorage.kt +++ b/android/src/main/java/com/amplitude/android/utilities/AndroidStorage.kt @@ -82,9 +82,9 @@ class AndroidStorage( eventPipeline: EventPipeline, configuration: Configuration, scope: CoroutineScope, - dispatcher: CoroutineDispatcher, + storageDispatcher: CoroutineDispatcher, ): ResponseHandler { - return storageV2.getResponseHandler(eventPipeline, configuration, scope, dispatcher) + return storageV2.getResponseHandler(eventPipeline, configuration, scope, storageDispatcher) } override fun removeFile(filePath: String): Boolean { diff --git a/android/src/test/java/com/amplitude/android/ResponseHandlerTest.kt b/android/src/test/java/com/amplitude/android/ResponseHandlerTest.kt index 3f5d4643..5a67a806 100644 --- a/android/src/test/java/com/amplitude/android/ResponseHandlerTest.kt +++ b/android/src/test/java/com/amplitude/android/ResponseHandlerTest.kt @@ -12,6 +12,11 @@ import io.mockk.every import io.mockk.mockkConstructor import io.mockk.mockkStatic import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.StandardTestDispatcher +import kotlinx.coroutines.test.TestCoroutineScheduler +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runTest import okhttp3.mockwebserver.MockResponse import okhttp3.mockwebserver.MockWebServer import okhttp3.mockwebserver.RecordedRequest @@ -26,12 +31,15 @@ import org.junit.runner.RunWith import org.robolectric.RobolectricTestRunner import java.util.concurrent.TimeUnit +private const val FLUSH_INTERVAL_IN_MS = 150 +private const val FLUSH_MAX_RETRIES = 3 + +@ExperimentalCoroutinesApi @RunWith(RobolectricTestRunner::class) class ResponseHandlerTest { private lateinit var server: MockWebServer private lateinit var amplitude: Amplitude - @ExperimentalCoroutinesApi @Before fun setup() { server = MockWebServer() @@ -46,9 +54,9 @@ class ResponseHandlerTest { context = context, serverUrl = server.url("/").toString(), autocapture = setOf(), - flushIntervalMillis = 150, + flushIntervalMillis = FLUSH_INTERVAL_IN_MS, identifyBatchIntervalMillis = 1000, - flushMaxRetries = 3, + flushMaxRetries = FLUSH_MAX_RETRIES, identityStorageProvider = IMIdentityStorageProvider(), ) ) @@ -84,7 +92,9 @@ class ResponseHandlerTest { } @Test - fun `test handle on rate limit`() { + fun `test handle on rate limit`() = runTest { + setAmplitudeDispatchers(amplitude, testScheduler) + val expectedEventsSize = FLUSH_MAX_RETRIES val rateLimitBody = """ { "code": 429, @@ -92,21 +102,27 @@ class ResponseHandlerTest { "eps_threshold": 30 } """.trimIndent() - for (i in 1..6) { + repeat(FLUSH_MAX_RETRIES) { server.enqueue(MockResponse().setBody(rateLimitBody).setResponseCode(429)) } - for (k in 1..4) { + server.enqueue(MockResponse().setResponseCode(200)) + + amplitude.isBuilt.await() + for (k in 1..expectedEventsSize) { amplitude.track("test event $k") - runRequest() } - Thread.sleep(100) + advanceUntilIdle() + // verify the total request count when reaching max retries - assertEquals(6, server.requestCount) + assertEquals(1 + FLUSH_MAX_RETRIES, server.requestCount) } @Test - fun `test handle payload too large with only one event`() { - server.enqueue(MockResponse().setBody("{\"code\": \"413\", \"error\": \"payload too large\"}").setResponseCode(413)) + fun `test handle payload too large with only one event`() = runTest { + server.enqueue( + MockResponse().setBody("{\"code\": \"413\", \"error\": \"payload too large\"}") + .setResponseCode(413) + ) val options = EventOptions() var statusCode = 0 var callFinished = false @@ -128,40 +144,67 @@ class ResponseHandlerTest { } @Test - fun `test handle payload too large`() { - server.enqueue(MockResponse().setBody("{\"code\": \"413\", \"error\": \"payload too large\"}").setResponseCode(413)) - server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200)) - server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200)) - server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200)) + fun `test handle payload too large`() = runTest { + setAmplitudeDispatchers(amplitude, testScheduler) + val expectedSuccesEvents = 5 + server.enqueue( + MockResponse().setBody("{\"code\": \"413\", \"error\": \"payload too large\"}") + .setResponseCode(413) + ) + repeat(expectedSuccesEvents) { + server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200)) + } var eventCompleteCount = 0 val statusMap = mutableMapOf() val options = EventOptions() options.callback = { _: BaseEvent, status: Int, _: String -> eventCompleteCount++ - statusMap.put(status, statusMap.getOrDefault(status, 0) + 1) + statusMap[status] = statusMap.getOrDefault(status, 0) + 1 } + + // send 2 events so the event size will be greater than 1 and call split event file + amplitude.isBuilt.await() amplitude.track("test event 1", options = options) amplitude.track("test event 2", options = options) + advanceTimeBy(1_000) + + // verify first request that hit 413 + val request = runRequest() + requireNotNull(request) + val failedEvents = getEventsFromRequest(request) + assertEquals(2, failedEvents.size) + + // send succeeding events amplitude.track("test event 3", options = options) amplitude.track("test event 4", options = options) - val request = runRequest() - // verify the first request hit 413 - assertNotNull(request) - val events = getEventsFromRequest(request!!) - assertEquals(4, events.size) amplitude.track("test event 5", options = options) - runRequest() - runRequest() - runRequest() - Thread.sleep(150) + advanceUntilIdle() + + // verify next requests after split event file + val splitRequest1 = runRequest() + requireNotNull(splitRequest1) + val splitEvents1 = getEventsFromRequest(splitRequest1) + assertEquals(1, splitEvents1.size) + val splitRequest2 = runRequest() + requireNotNull(splitRequest2) + val splitEvents2 = getEventsFromRequest(splitRequest2) + assertEquals(1, splitEvents2.size) + + // verify we completed processing for the events after file split + val afterSplitRequest = runRequest() + requireNotNull(afterSplitRequest) + val afterSplitEvents = getEventsFromRequest(afterSplitRequest) + assertEquals(3, afterSplitEvents.size) + // verify we completed processing for the events after file split assertEquals(4, server.requestCount) - assertEquals(5, statusMap.get(200)) - assertEquals(5, eventCompleteCount) + assertEquals(expectedSuccesEvents, statusMap[200]) + assertEquals(expectedSuccesEvents, eventCompleteCount) } @Test - fun `test handle bad request response`() { + fun `test handle bad request response`() = runTest { + setAmplitudeDispatchers(amplitude, testScheduler) val badRequestResponseBody = """ { "code": 400, @@ -185,27 +228,37 @@ class ResponseHandlerTest { val options = EventOptions() options.callback = { _: BaseEvent, status: Int, _: String -> eventCompleteCount++ - statusMap.put(status, statusMap.getOrDefault(status, 0) + 1) + statusMap[status] = statusMap.getOrDefault(status, 0) + 1 } + + amplitude.isBuilt.await() amplitude.track("test event 1", options = options) + advanceUntilIdle() + + // verify first request that hit 400 + val request = runRequest() + requireNotNull(request) + val failedEvents = getEventsFromRequest(request) + assertEquals(1, failedEvents.size) + + // send succeeding events amplitude.track("test event 2", options = options) amplitude.track("test event 3", options = options) amplitude.track("test event 4", options = options) - // verify first request take 4 events hit 400 - val request = runRequest() - assertNotNull(request) - val events = getEventsFromRequest(request!!) - assertEquals(4, events.size) - // verify second request take 2 events after removing 2 bad events - val request2 = runRequest() - assertNotNull(request2) - val events2 = getEventsFromRequest(request2!!) - assertEquals(2, events2.size) + advanceUntilIdle() + + // verify next request that the 3 events with 200 + val successRequest = runRequest() + requireNotNull(successRequest) + val successfulEvents = getEventsFromRequest(successRequest) + assertEquals(3, successfulEvents.size) + assertEquals(2, server.requestCount) Thread.sleep(10) + // verify the processed status - assertEquals(2, statusMap.get(400)) - assertEquals(2, statusMap.get(200)) + assertEquals(1, statusMap[400]) + assertEquals(3, statusMap[200]) assertEquals(4, eventCompleteCount) } @@ -264,7 +317,7 @@ class ResponseHandlerTest { private fun runRequest(): RecordedRequest? { return try { - server.takeRequest(5, TimeUnit.SECONDS) + server.takeRequest(1, TimeUnit.SECONDS) } catch (e: InterruptedException) { null } @@ -287,4 +340,24 @@ class ResponseHandlerTest { every { anyConstructed().mostRecentLocation } returns null every { anyConstructed().appSetId } returns "" } + + companion object { + private fun setAmplitudeDispatchers( + amplitude: com.amplitude.core.Amplitude, + testCoroutineScheduler: TestCoroutineScheduler, + ) { + // inject these dispatcher fields with reflection, as the field is val (read-only) + listOf( + "amplitudeDispatcher", + "networkIODispatcher", + "storageIODispatcher" + ).forEach { dispatcherField -> + com.amplitude.core.Amplitude::class.java.getDeclaredField(dispatcherField) + .apply { + isAccessible = true + set(amplitude, StandardTestDispatcher(testCoroutineScheduler)) + } + } + } + } } diff --git a/core/src/main/java/com/amplitude/core/Amplitude.kt b/core/src/main/java/com/amplitude/core/Amplitude.kt index fc43b4c1..2a3f2bf0 100644 --- a/core/src/main/java/com/amplitude/core/Amplitude.kt +++ b/core/src/main/java/com/amplitude/core/Amplitude.kt @@ -47,8 +47,7 @@ open class Amplitude internal constructor( val amplitudeScope: CoroutineScope = CoroutineScope(SupervisorJob()), val amplitudeDispatcher: CoroutineDispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher(), val networkIODispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher(), - val storageIODispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher(), - val retryDispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher(), + val storageIODispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() ) { val timeline: Timeline lateinit var storage: Storage diff --git a/core/src/main/java/com/amplitude/core/Storage.kt b/core/src/main/java/com/amplitude/core/Storage.kt index 9f9ee843..54fa2ae6 100644 --- a/core/src/main/java/com/amplitude/core/Storage.kt +++ b/core/src/main/java/com/amplitude/core/Storage.kt @@ -38,7 +38,7 @@ interface Storage { eventPipeline: EventPipeline, configuration: Configuration, scope: CoroutineScope, - dispatcher: CoroutineDispatcher, + storageDispatcher: CoroutineDispatcher, ): ResponseHandler } diff --git a/core/src/main/java/com/amplitude/core/platform/EventPipeline.kt b/core/src/main/java/com/amplitude/core/platform/EventPipeline.kt index 5f18528e..0e85caff 100644 --- a/core/src/main/java/com/amplitude/core/platform/EventPipeline.kt +++ b/core/src/main/java/com/amplitude/core/platform/EventPipeline.kt @@ -1,10 +1,14 @@ package com.amplitude.core.platform import com.amplitude.core.Amplitude +import com.amplitude.core.Storage import com.amplitude.core.events.BaseEvent +import com.amplitude.core.utilities.ExponentialBackoffRetryHandler import com.amplitude.core.utilities.http.HttpClient import com.amplitude.core.utilities.http.HttpClientInterface +import com.amplitude.core.utilities.http.ResponseHandler import com.amplitude.core.utilities.logWithStackTrace +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED import kotlinx.coroutines.channels.consumeEach @@ -17,39 +21,42 @@ import java.util.concurrent.atomic.AtomicInteger class EventPipeline( private val amplitude: Amplitude, -) { - private val writeChannel: Channel - private val uploadChannel: Channel - private val eventCount: AtomicInteger = AtomicInteger(0) + private val eventCount: AtomicInteger = AtomicInteger(0), private val httpClient: HttpClientInterface = amplitude.configuration.httpClient - ?: HttpClient(amplitude.configuration) - private val storage get() = amplitude.storage - private val scope get() = amplitude.amplitudeScope + ?: HttpClient(amplitude.configuration), + private val retryUploadHandler: ExponentialBackoffRetryHandler = + ExponentialBackoffRetryHandler( + maxRetryAttempt = amplitude.configuration.flushMaxRetries + ), + private val storage: Storage = amplitude.storage, + private val scope: CoroutineScope = amplitude.amplitudeScope, + private val writeChannel: Channel = Channel(UNLIMITED), + private var uploadChannel: Channel = Channel(UNLIMITED), + overrideResponseHandler: ResponseHandler? = null, +) { private var running: Boolean private var scheduled: Boolean var flushSizeDivider: AtomicInteger = AtomicInteger(1) private val responseHandler by lazy { - storage.getResponseHandler( + overrideResponseHandler ?: storage.getResponseHandler( this@EventPipeline, amplitude.configuration, scope, - amplitude.retryDispatcher, + amplitude.storageIODispatcher, ) } companion object { - internal const val UPLOAD_SIG = "#!upload" + private const val UPLOAD_SIG = "#!upload" + private const val MAX_RETRY_ATTEMPT_SIG = "#!maxRetryAttemptReached" } init { running = false scheduled = false - writeChannel = Channel(UNLIMITED) - uploadChannel = Channel(UNLIMITED) - registerShutdownHook() } @@ -107,7 +114,7 @@ class EventPipeline( private fun upload() = scope.launch(amplitude.networkIODispatcher) { - uploadChannel.consumeEach { + uploadChannel.consumeEach { signal -> withContext(amplitude.storageIODispatcher) { try { storage.rollover() @@ -118,15 +125,36 @@ class EventPipeline( } } - val eventsData = storage.readEventsContent() - for (events in eventsData) { + if (signal == MAX_RETRY_ATTEMPT_SIG) { + amplitude.logger.debug( + "Max retries ${retryUploadHandler.maxRetryAttempt} reached, temporarily stop consuming upload signals." + ) + // Use the max delay when retry attempt is reached + delay(retryUploadHandler.maxDelayInMs) + retryUploadHandler.reset() + amplitude.logger.debug("Enable consuming of upload signals again.") + } + + val eventFiles = storage.readEventsContent() + for (eventFile in eventFiles) { try { - val eventsString = storage.getEventsString(events) + val eventsString = storage.getEventsString(eventFile) if (eventsString.isEmpty()) continue val diagnostics = amplitude.diagnostics.extractDiagnostics() val response = httpClient.upload(eventsString, diagnostics) - responseHandler.handle(response, events, eventsString) + responseHandler.handle(response, eventFile, eventsString) + + // if we encounter a retryable error, we retry with delay and + // restart the loop to get the newest event files + if (response.status.shouldRetryUploadOnFailure == true) { + retryUploadHandler.attemptRetry { canRetry -> + val retrySignal = if (canRetry) UPLOAD_SIG else MAX_RETRY_ATTEMPT_SIG + uploadChannel.trySend(retrySignal) + } + break + } + retryUploadHandler.reset() // always reset when we've successfully uploaded } catch (e: FileNotFoundException) { e.message?.let { amplitude.logger.warn("Event storage file not found: $it") diff --git a/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt b/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt index dcb4479b..a8fb4e01 100644 --- a/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt +++ b/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt @@ -35,6 +35,8 @@ class EventsFileManager( const val DELIMITER = "\u0000" val writeMutexMap = ConcurrentHashMap() val readMutexMap = ConcurrentHashMap() + + private const val FILE_NAME_DESIRED_PADDED_LENGTH = 10 } private val writeMutex = writeMutexMap.getOrPut(storageKey) { Mutex() } @@ -95,7 +97,7 @@ class EventsFileManager( } /** - * Returns a comma-separated list of file paths that are not yet uploaded + * Returns a sorted list of file paths that are not yet uploaded */ fun read(): List { // we need to filter out .temp file, since it's operating on the writing thread @@ -103,18 +105,23 @@ class EventsFileManager( name.contains(storageKey) && !name.endsWith(".tmp") && !name.endsWith(".properties") } ?: emptyArray() - return fileList.sortedBy { file -> - val name = file.nameWithoutExtension.replace("$storageKey-", "") + return fileList + .sortedBy { file -> + val name = file.nameWithoutExtension.replace("$storageKey-", "") - val dashIndex = name.indexOf('-') - if (dashIndex >= 0) { - name.substring(0, dashIndex).padStart(10, '0') + name.substring(dashIndex) - } else { - name + // we're padding file name with 0s to ensure they are sorted in the correct order, + // even for file names with varying lengths and split files (e.g. "1-1", "2", "21") + val dashIndex = name.indexOf('-') + if (dashIndex >= 0) { + name.substring(0, dashIndex) + .padStart(FILE_NAME_DESIRED_PADDED_LENGTH, '0') + name.substring(dashIndex) + } else { + name.padStart(FILE_NAME_DESIRED_PADDED_LENGTH, '0') + } + } + .map { + it.absolutePath } - }.map { - it.absolutePath - } } fun remove(filePath: String): Boolean { @@ -192,7 +199,9 @@ class EventsFileManager( return@use jsonArray.toString() } catch (e: JSONException) { diagnostics.addMalformedEvent(normalizedContent) - logger.error("Failed to parse events: $normalizedContent, dropping file: $filePath") + logger.error( + "Failed to parse events: $normalizedContent, dropping file: $filePath" + ) this.remove(filePath) return@use normalizedContent } @@ -333,7 +342,9 @@ class EventsFileManager( finish(it) } } catch (e: JSONException) { - logger.error("Failed to parse events: $normalizedContent, dropping file: ${it.path}") + logger.error( + "Failed to parse events: $normalizedContent, dropping file: ${it.path}" + ) this.remove(it.path) } } diff --git a/core/src/main/java/com/amplitude/core/utilities/ExponentialBackoffRetryHandler.kt b/core/src/main/java/com/amplitude/core/utilities/ExponentialBackoffRetryHandler.kt new file mode 100644 index 00000000..decad634 --- /dev/null +++ b/core/src/main/java/com/amplitude/core/utilities/ExponentialBackoffRetryHandler.kt @@ -0,0 +1,66 @@ +package com.amplitude.core.utilities + +import kotlinx.coroutines.delay +import java.util.concurrent.atomic.AtomicInteger +import kotlin.math.pow + +/** + * A utility class to handle exponential backoff retry logic. + * + * Usage: + * - call [attemptRetry] to attempt retry with exponential backoff delay + * - always call [reset] at the end of your session to reset the retry attempt counter. + */ +class ExponentialBackoffRetryHandler( + val maxRetryAttempt: Int = MAX_RETRY_ATTEMPT, + private val baseDelayInMs: Int = 1_000, + private val factor: Double = 2.0, +) { + /** + * The current exponential backoff delay in milliseconds. Formula is [baseDelayInMs] * ([factor]^[attempt]) + * + * e.g. for the default values, it will be: 1, 2, 4, 8, 16 seconds + */ + private val exponentialBackOffDelayInMs + get() = (baseDelayInMs * factor.pow(attempt.get())).toLong() + + /** + * After we've reached [maxRetryAttempt], we will stop retrying for a longer period and use this + * value. We apply a ceiling of 60 seconds [MAX_DELAY_IN_MILLIS] to the delay to avoid waiting too long. + */ + val maxDelayInMs: Long + get() = MAX_DELAY_IN_MILLIS.coerceAtMost( + baseDelayInMs * factor.pow(maxRetryAttempt + 1).toInt() + ).toLong() + + internal var attempt = AtomicInteger(0) + + private fun canRetry() = attempt.get() < maxRetryAttempt + + /** + * Attempt retry with exponential backoff delay. see [exponentialBackOffDelayInMs] + * @param block a lambda to execute the retry logic. The lambda will receive a boolean parameter to indicate if the retry logic should be executed. + * If boolean parameter is false, [maxRetryAttempt] was reached and you should stop retrying and handle the failure. + */ + suspend fun attemptRetry(block: (Boolean) -> Unit) { + if (!canRetry()) { + block(false) + return + } + delay(exponentialBackOffDelayInMs) + block(true) + attempt.incrementAndGet() + } + + /** + * Reset the retry attempt counter. + */ + fun reset() { + attempt.set(0) + } + + companion object { + private const val MAX_RETRY_ATTEMPT = 5 + private const val MAX_DELAY_IN_MILLIS = 60_000 + } +} diff --git a/core/src/main/java/com/amplitude/core/utilities/FileResponseHandler.kt b/core/src/main/java/com/amplitude/core/utilities/FileResponseHandler.kt index 2bf9aee5..a3990e57 100644 --- a/core/src/main/java/com/amplitude/core/utilities/FileResponseHandler.kt +++ b/core/src/main/java/com/amplitude/core/utilities/FileResponseHandler.kt @@ -23,7 +23,7 @@ class FileResponseHandler( private val eventPipeline: EventPipeline, private val configuration: Configuration, private val scope: CoroutineScope, - private val dispatcher: CoroutineDispatcher, + private val storageDispatcher: CoroutineDispatcher, private val logger: Logger?, ) : ResponseHandler { @@ -38,12 +38,14 @@ class FileResponseHandler( try { eventsList = JSONArray(eventsString).toEvents() } catch (e: JSONException) { - storage.removeFile(eventFilePath) + scope.launch(storageDispatcher) { + storage.removeFile(eventFilePath) + } removeCallbackByInsertId(eventsString) throw e } triggerEventsCallback(eventsList, HttpStatus.SUCCESS.code, "Event sent success.") - scope.launch(dispatcher) { + scope.launch(storageDispatcher) { storage.removeFile(eventFilePath) } } @@ -61,13 +63,17 @@ class FileResponseHandler( try { eventsList = JSONArray(eventsString).toEvents() } catch (e: JSONException) { - storage.removeFile(eventFilePath) + scope.launch(storageDispatcher) { + storage.removeFile(eventFilePath) + } removeCallbackByInsertId(eventsString) throw e } if (eventsList.size == 1 || badRequestResponse.isInvalidApiKeyResponse()) { triggerEventsCallback(eventsList, HttpStatus.BAD_REQUEST.code, badRequestResponse.error) - storage.removeFile(eventFilePath) + scope.launch(storageDispatcher) { + storage.removeFile(eventFilePath) + } return } val droppedIndices = badRequestResponse.getEventIndicesToDrop() @@ -84,7 +90,7 @@ class FileResponseHandler( eventsToRetry.forEach { eventPipeline.put(it) } - scope.launch(dispatcher) { + scope.launch(storageDispatcher) { storage.removeFile(eventFilePath) } } @@ -102,7 +108,9 @@ class FileResponseHandler( try { rawEvents = JSONArray(eventsString) } catch (e: JSONException) { - storage.removeFile(eventFilePath) + scope.launch(storageDispatcher) { + storage.removeFile(eventFilePath) + } removeCallbackByInsertId(eventsString) throw e } @@ -111,13 +119,13 @@ class FileResponseHandler( triggerEventsCallback( eventsList, HttpStatus.PAYLOAD_TOO_LARGE.code, payloadTooLargeResponse.error ) - scope.launch(dispatcher) { + scope.launch(storageDispatcher) { storage.removeFile(eventFilePath) } return } // split file into two - scope.launch(dispatcher) { + scope.launch(storageDispatcher) { storage.splitEventFile(eventFilePath, rawEvents) } } @@ -130,8 +138,9 @@ class FileResponseHandler( logger?.debug( "Handle response, status: ${tooManyRequestsResponse.status}, error: ${tooManyRequestsResponse.error}" ) - // trigger exponential backoff - storage.releaseFile(events as String) + scope.launch(storageDispatcher) { + storage.releaseFile(events as String) + } } override fun handleTimeoutResponse( @@ -140,8 +149,9 @@ class FileResponseHandler( eventsString: String, ) { logger?.debug("Handle response, status: ${timeoutResponse.status}") - // trigger exponential backoff - storage.releaseFile(events as String) + scope.launch(storageDispatcher) { + storage.releaseFile(events as String) + } } override fun handleFailedResponse( @@ -153,8 +163,9 @@ class FileResponseHandler( "Handle response, status: ${failedResponse.status}, error: ${failedResponse.error}" ) // wait for next time to try again - // trigger exponential backoff - storage.releaseFile(events as String) + scope.launch(storageDispatcher) { + storage.releaseFile(events as String) + } } private fun triggerEventsCallback( @@ -167,9 +178,11 @@ class FileResponseHandler( it(event, status, message) } event.insertId?.let { insertId -> - storage.getEventCallback(insertId)?.let { - it(event, status, message) - storage.removeEventCallback(insertId) + scope.launch(storageDispatcher) { + storage.getEventCallback(insertId)?.let { + it(event, status, message) + storage.removeEventCallback(insertId) + } } } } @@ -178,7 +191,9 @@ class FileResponseHandler( private fun removeCallbackByInsertId(eventsString: String) { val regex = """"insert_id":"(.{36})",""".toRegex() regex.findAll(eventsString).forEach { - storage.removeEventCallback(it.groupValues[1]) + scope.launch(storageDispatcher) { + storage.removeEventCallback(it.groupValues[1]) + } } } } diff --git a/core/src/main/java/com/amplitude/core/utilities/FileStorage.kt b/core/src/main/java/com/amplitude/core/utilities/FileStorage.kt index 7f945905..90082330 100644 --- a/core/src/main/java/com/amplitude/core/utilities/FileStorage.kt +++ b/core/src/main/java/com/amplitude/core/utilities/FileStorage.kt @@ -93,14 +93,14 @@ class FileStorage( eventPipeline: EventPipeline, configuration: Configuration, scope: CoroutineScope, - dispatcher: CoroutineDispatcher, + storageDispatcher: CoroutineDispatcher, ): ResponseHandler { return FileResponseHandler( this, eventPipeline, configuration, scope, - dispatcher, + storageDispatcher, logger, ) } diff --git a/core/src/main/java/com/amplitude/core/utilities/InMemoryStorage.kt b/core/src/main/java/com/amplitude/core/utilities/InMemoryStorage.kt index 04c00829..408d5595 100644 --- a/core/src/main/java/com/amplitude/core/utilities/InMemoryStorage.kt +++ b/core/src/main/java/com/amplitude/core/utilities/InMemoryStorage.kt @@ -57,9 +57,9 @@ class InMemoryStorage : Storage { eventPipeline: EventPipeline, configuration: Configuration, scope: CoroutineScope, - dispatcher: CoroutineDispatcher + storageDispatcher: CoroutineDispatcher ): ResponseHandler { - return InMemoryResponseHandler(eventPipeline, configuration, scope, dispatcher) + return InMemoryResponseHandler(eventPipeline, configuration, scope, storageDispatcher) } fun removeEvents() { diff --git a/core/src/main/java/com/amplitude/core/utilities/http/AnalyticsResponse.kt b/core/src/main/java/com/amplitude/core/utilities/http/AnalyticsResponse.kt index 99162b25..78a80127 100644 --- a/core/src/main/java/com/amplitude/core/utilities/http/AnalyticsResponse.kt +++ b/core/src/main/java/com/amplitude/core/utilities/http/AnalyticsResponse.kt @@ -8,7 +8,10 @@ import org.json.JSONObject import java.lang.Exception internal object HttpResponse { - fun createHttpResponse(code: Int, responseBody: String?): AnalyticsResponse { + fun createHttpResponse( + code: Int, + responseBody: String?, + ): AnalyticsResponse { return when (code) { HttpStatus.SUCCESS.code -> SuccessResponse() @@ -39,22 +42,20 @@ internal object HttpResponse { } } -interface AnalyticsResponse { - val status: HttpStatus - +sealed class AnalyticsResponse(val status: HttpStatus) { companion object { - fun create(responseCode: Int, responseBody: String?): AnalyticsResponse { + fun create( + responseCode: Int, + responseBody: String?, + ): AnalyticsResponse { return HttpResponse.createHttpResponse(responseCode, responseBody) } } } -class SuccessResponse : AnalyticsResponse { - override val status: HttpStatus = HttpStatus.SUCCESS -} +class SuccessResponse : AnalyticsResponse(HttpStatus.SUCCESS) -class BadRequestResponse(response: JSONObject) : AnalyticsResponse { - override val status: HttpStatus = HttpStatus.BAD_REQUEST +class BadRequestResponse(response: JSONObject) : AnalyticsResponse(HttpStatus.BAD_REQUEST) { val error: String = response.getStringWithDefault("error", "") private var eventsWithInvalidFields: Set = setOf() private var eventsWithMissingFields: Set = setOf() @@ -101,19 +102,20 @@ class BadRequestResponse(response: JSONObject) : AnalyticsResponse { } } -class PayloadTooLargeResponse(response: JSONObject) : AnalyticsResponse { - override val status: HttpStatus = HttpStatus.PAYLOAD_TOO_LARGE +class PayloadTooLargeResponse(response: JSONObject) : + AnalyticsResponse(HttpStatus.PAYLOAD_TOO_LARGE) { val error: String = response.getStringWithDefault("error", "") } -class TooManyRequestsResponse(response: JSONObject) : AnalyticsResponse { - override val status: HttpStatus = HttpStatus.TOO_MANY_REQUESTS +class TooManyRequestsResponse(response: JSONObject) : + AnalyticsResponse(HttpStatus.TOO_MANY_REQUESTS) { + private var exceededDailyQuotaUsers: Set = setOf() + private var exceededDailyQuotaDevices: Set = setOf() + private var throttledDevices: Set = setOf() + private var throttledUsers: Set = setOf() + val error: String = response.getStringWithDefault("error", "") - var exceededDailyQuotaUsers: Set = setOf() - var exceededDailyQuotaDevices: Set = setOf() - var throttledEvents: Set = setOf() - var throttledDevices: Set = setOf() - var throttledUsers: Set = setOf() + var throttledEvents = setOf() init { if (response.has("exceeded_daily_quota_users")) { @@ -140,17 +142,18 @@ class TooManyRequestsResponse(response: JSONObject) : AnalyticsResponse { } } -class TimeoutResponse : AnalyticsResponse { - override val status: HttpStatus = HttpStatus.TIMEOUT -} +class TimeoutResponse : AnalyticsResponse(HttpStatus.TIMEOUT) -class FailedResponse(response: JSONObject) : AnalyticsResponse { - override val status: HttpStatus = HttpStatus.FAILED +class FailedResponse(response: JSONObject) : AnalyticsResponse(HttpStatus.FAILED) { val error: String = response.getStringWithDefault("error", "") } interface ResponseHandler { - fun handle(response: AnalyticsResponse, events: Any, eventsString: String) { + fun handle( + response: AnalyticsResponse, + events: Any, + eventsString: String, + ) { when (response) { is SuccessResponse -> handleSuccessResponse(response, events, eventsString) @@ -175,36 +178,55 @@ interface ResponseHandler { fun handleSuccessResponse( successResponse: SuccessResponse, events: Any, - eventsString: String + eventsString: String, ) fun handleBadRequestResponse( badRequestResponse: BadRequestResponse, events: Any, - eventsString: String + eventsString: String, ) fun handlePayloadTooLargeResponse( payloadTooLargeResponse: PayloadTooLargeResponse, events: Any, - eventsString: String + eventsString: String, ) fun handleTooManyRequestsResponse( tooManyRequestsResponse: TooManyRequestsResponse, events: Any, - eventsString: String + eventsString: String, ) fun handleTimeoutResponse( timeoutResponse: TimeoutResponse, events: Any, - eventsString: String + eventsString: String, ) fun handleFailedResponse( failedResponse: FailedResponse, events: Any, - eventsString: String + eventsString: String, ) } + +/** + * Enum class to represent the HTTP status codes and whether the upload should be retried on failure. + * A request requires a retry if the event file/s are still present and we want to attempt to upload them again. + */ +enum class HttpStatus( + val code: Int, + val shouldRetryUploadOnFailure: Boolean? = null, +) { + SUCCESS(200), + + /** should NOT retry as bad event files will be removed and there's nothing to retry */ + BAD_REQUEST(400, false), + TIMEOUT(408, true), + /** should retry as large event files will be split and retried individually */ + PAYLOAD_TOO_LARGE(413, true), + TOO_MANY_REQUESTS(429, true), + FAILED(500, true), +} diff --git a/core/src/main/java/com/amplitude/core/utilities/http/HttpClient.kt b/core/src/main/java/com/amplitude/core/utilities/http/HttpClient.kt index b25e8c5c..dfaba720 100644 --- a/core/src/main/java/com/amplitude/core/utilities/http/HttpClient.kt +++ b/core/src/main/java/com/amplitude/core/utilities/http/HttpClient.kt @@ -64,12 +64,3 @@ internal class HttpClient( } } } - -enum class HttpStatus(val code: Int) { - SUCCESS(200), - BAD_REQUEST(400), - TIMEOUT(408), - PAYLOAD_TOO_LARGE(413), - TOO_MANY_REQUESTS(429), - FAILED(500), -} diff --git a/core/src/test/kotlin/com/amplitude/core/IdentifyInterceptTest.kt b/core/src/test/kotlin/com/amplitude/core/IdentifyInterceptTest.kt index f0bfb435..5a966cd7 100644 --- a/core/src/test/kotlin/com/amplitude/core/IdentifyInterceptTest.kt +++ b/core/src/test/kotlin/com/amplitude/core/IdentifyInterceptTest.kt @@ -53,7 +53,6 @@ class IdentifyInterceptTest { testDispatcher, testDispatcher, testDispatcher, - testDispatcher ) } diff --git a/core/src/test/kotlin/com/amplitude/core/platform/EventPipelineTest.kt b/core/src/test/kotlin/com/amplitude/core/platform/EventPipelineTest.kt index 8d60a984..447b1adf 100644 --- a/core/src/test/kotlin/com/amplitude/core/platform/EventPipelineTest.kt +++ b/core/src/test/kotlin/com/amplitude/core/platform/EventPipelineTest.kt @@ -5,82 +5,211 @@ import com.amplitude.core.Configuration import com.amplitude.core.State import com.amplitude.core.events.BaseEvent import com.amplitude.core.utilities.ConsoleLoggerProvider +import com.amplitude.core.utilities.ExponentialBackoffRetryHandler import com.amplitude.core.utilities.InMemoryStorageProvider +import com.amplitude.core.utilities.http.AnalyticsResponse +import com.amplitude.core.utilities.http.BadRequestResponse +import com.amplitude.core.utilities.http.HttpClientInterface +import com.amplitude.core.utilities.http.PayloadTooLargeResponse +import com.amplitude.core.utilities.http.ResponseHandler +import com.amplitude.core.utilities.http.SuccessResponse +import com.amplitude.core.utilities.http.TimeoutResponse import com.amplitude.id.IMIdentityStorageProvider +import io.mockk.coVerify +import io.mockk.mockk import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.StandardTestDispatcher -import kotlinx.coroutines.test.TestDispatcher import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.advanceUntilIdle import kotlinx.coroutines.test.runTest -import org.junit.jupiter.api.BeforeEach +import org.json.JSONObject import org.junit.jupiter.api.Test @ExperimentalCoroutinesApi class EventPipelineTest { - private lateinit var amplitude: Amplitude - private lateinit var testScope: TestScope - private lateinit var testDispatcher: TestDispatcher + private lateinit var fakeResponse: AnalyticsResponse private val config = Configuration( apiKey = "API_KEY", flushIntervalMillis = 1, + /** + * Take note that [com.amplitude.core.utilities.InMemoryStorage] will not persist data and + * clear the buffer after a call to `readEventsContent()` is made. + */ storageProvider = InMemoryStorageProvider(), loggerProvider = ConsoleLoggerProvider(), identifyInterceptStorageProvider = InMemoryStorageProvider(), identityStorageProvider = IMIdentityStorageProvider(), + httpClient = object : HttpClientInterface { + override fun upload( + events: String, + diagnostics: String?, + ): AnalyticsResponse = fakeResponse + } + ) + private val testDispatcher = StandardTestDispatcher() + private val testScope = TestScope(testDispatcher) + private val amplitude: Amplitude = Amplitude( + configuration = config, + store = State(), + amplitudeScope = testScope, + amplitudeDispatcher = testDispatcher, + networkIODispatcher = testDispatcher, + storageIODispatcher = testDispatcher ) + private val fakeResponseHandler: ResponseHandler = mockk(relaxed = true) + + @Test + fun `should not flush when put and offline`() = runTest(testDispatcher) { + amplitude.isBuilt.await() + amplitude.configuration.offline = true + val eventPipeline = spyk(EventPipeline(amplitude)) + val event = BaseEvent().apply { eventType = "test_event" } + + eventPipeline.start() + eventPipeline.put(event) + advanceUntilIdle() - @BeforeEach - fun setup() { - testDispatcher = StandardTestDispatcher() - testScope = TestScope(testDispatcher) - amplitude = Amplitude(config, State(), testScope, testDispatcher, testDispatcher, testDispatcher, testDispatcher) + verify(exactly = 0) { eventPipeline.flush() } } @Test - fun `should not flush when put and offline`() = - runTest(testDispatcher) { - amplitude.isBuilt.await() - amplitude.configuration.offline = true - val eventPipeline = spyk(EventPipeline(amplitude)) - val event = BaseEvent().apply { eventType = "test_event" } - - eventPipeline.start() - eventPipeline.put(event) - advanceUntilIdle() - - verify(exactly = 0) { eventPipeline.flush() } - } + fun `should flush when put and online`() = runTest(testDispatcher) { + amplitude.isBuilt.await() + amplitude.configuration.offline = false + val eventPipeline = spyk(EventPipeline(amplitude)) + val event = BaseEvent().apply { eventType = "test_event" } + + eventPipeline.start() + eventPipeline.put(event) + advanceUntilIdle() + + verify(exactly = 1) { eventPipeline.flush() } + } @Test - fun `should flush when put and online`() = - runTest(testDispatcher) { - amplitude.isBuilt.await() - amplitude.configuration.offline = false - val eventPipeline = spyk(EventPipeline(amplitude)) - val event = BaseEvent().apply { eventType = "test_event" } - - eventPipeline.start() - eventPipeline.put(event) - advanceUntilIdle() - - verify(exactly = 1) { eventPipeline.flush() } - } + fun `should flush when put and offline is disabled`() = runTest(testDispatcher) { + amplitude.isBuilt.await() + amplitude.configuration.offline = null + val eventPipeline = spyk(EventPipeline(amplitude)) + val event = BaseEvent().apply { eventType = "test_event" } + + eventPipeline.start() + eventPipeline.put(event) + advanceUntilIdle() + + verify(exactly = 1) { eventPipeline.flush() } + } @Test - fun `should flush when put and offline is disabled`() = - runTest(testDispatcher) { - amplitude.isBuilt.await() - amplitude.configuration.offline = null - val eventPipeline = spyk(EventPipeline(amplitude)) - val event = BaseEvent().apply { eventType = "test_event" } - - eventPipeline.start() - eventPipeline.put(event) - advanceUntilIdle() - - verify(exactly = 1) { eventPipeline.flush() } - } + fun `should reset retry handler on successful upload`() = runTest(testDispatcher) { + amplitude.isBuilt.await() + val retryUploadHandler = spyk(ExponentialBackoffRetryHandler()) + val eventPipeline = EventPipeline( + amplitude, + retryUploadHandler = retryUploadHandler, + overrideResponseHandler = fakeResponseHandler + ) + fakeResponse = SuccessResponse() + val event = BaseEvent().apply { eventType = "test_event" } + + eventPipeline.start() + eventPipeline.put(event) + advanceUntilIdle() + + coVerify(exactly = 0) { retryUploadHandler.attemptRetry(any<(Boolean) -> Unit>()) } + verify { retryUploadHandler.reset() } + verify { fakeResponseHandler.handle(fakeResponse, any(), any()) } + } + + @Test + fun `should NOT retry on non-retryable error - bad request`() = runTest(testDispatcher) { + amplitude.isBuilt.await() + val retryUploadHandler = spyk(ExponentialBackoffRetryHandler()) + val eventPipeline = EventPipeline( + amplitude, + retryUploadHandler = retryUploadHandler, + overrideResponseHandler = fakeResponseHandler + ) + val event = BaseEvent().apply { eventType = "test_event" } + + fakeResponse = BadRequestResponse(JSONObject()) + eventPipeline.start() + eventPipeline.put(event) + advanceUntilIdle() + + coVerify(exactly = 0) { retryUploadHandler.attemptRetry(any<(Boolean) -> Unit>()) } + verify { retryUploadHandler.reset() } + verify { fakeResponseHandler.handle(fakeResponse, any(), any()) } + } + + @Test + fun `should retry on retryable reason - timeout`() = runTest(testDispatcher) { + amplitude.isBuilt.await() + val retryUploadHandler = spyk(ExponentialBackoffRetryHandler()) + val eventPipeline = EventPipeline( + amplitude, + retryUploadHandler = retryUploadHandler, + overrideResponseHandler = fakeResponseHandler + ) + val event = BaseEvent().apply { eventType = "test_event" } + + fakeResponse = TimeoutResponse() + eventPipeline.start() + eventPipeline.put(event) + advanceUntilIdle() + + coVerify { retryUploadHandler.attemptRetry(any<(Boolean) -> Unit>()) } + verify { fakeResponseHandler.handle(fakeResponse, any(), any()) } + verify(exactly = 0) { retryUploadHandler.reset() } + } + + @Test + fun `should retry on retryable reason - too large`() = runTest(testDispatcher) { + amplitude.isBuilt.await() + val retryUploadHandler = spyk(ExponentialBackoffRetryHandler()) + val eventPipeline = EventPipeline( + amplitude, + retryUploadHandler = retryUploadHandler, + overrideResponseHandler = fakeResponseHandler + ) + val event = BaseEvent().apply { eventType = "test_event" } + + fakeResponse = PayloadTooLargeResponse(JSONObject()) + eventPipeline.start() + eventPipeline.put(event) + advanceUntilIdle() + + coVerify { retryUploadHandler.attemptRetry(any<(Boolean) -> Unit>()) } + verify { fakeResponseHandler.handle(fakeResponse, any(), any()) } + verify(exactly = 0) { retryUploadHandler.reset() } + } + + @Test + fun `should send MAX_RETRY_ATTEMPT_SIG after max retry attempts`() = runTest(testDispatcher) { + amplitude.isBuilt.await() + val retryUploadHandler = spyk( + ExponentialBackoffRetryHandler( + maxRetryAttempt = 0 + ) + ) + val eventPipeline = EventPipeline( + amplitude, + retryUploadHandler = retryUploadHandler, + overrideResponseHandler = fakeResponseHandler + ) + + fakeResponse = PayloadTooLargeResponse(JSONObject()) + val event = BaseEvent().apply { eventType = "test_event" } + eventPipeline.start() + eventPipeline.put(event) + advanceUntilIdle() + + coVerify { retryUploadHandler.attemptRetry(any<(Boolean) -> Unit>()) } + // this will be called on the MAX_RETRY_ATTEMPT_SIG block on upload(), + // this is because the InMemoryStorage will clear the buffer and the second call to + // readEventsContent() will return an empty list and will stop the processing + verify(exactly = 1) { retryUploadHandler.reset() } + } } diff --git a/core/src/test/kotlin/com/amplitude/core/utilities/EventsFileManagerTest.kt b/core/src/test/kotlin/com/amplitude/core/utilities/EventsFileManagerTest.kt index 3c11aea6..24ed7e81 100644 --- a/core/src/test/kotlin/com/amplitude/core/utilities/EventsFileManagerTest.kt +++ b/core/src/test/kotlin/com/amplitude/core/utilities/EventsFileManagerTest.kt @@ -1,7 +1,9 @@ package com.amplitude.core.utilities import com.amplitude.common.jvm.ConsoleLogger +import com.amplitude.core.Configuration import com.amplitude.id.utilities.PropertiesFile +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.json.JSONArray @@ -12,34 +14,42 @@ import org.junit.jupiter.api.io.TempDir import java.io.File import kotlin.concurrent.thread -class EventsFileManagerTest { - @TempDir - lateinit var tempDir: File +private const val STORAGE_KEY = Configuration.DEFAULT_INSTANCE +class EventsFileManagerTest { + @TempDir lateinit var tempDir: File + private val propertiesFile by lazy { + PropertiesFile( + tempDir, "test-prefix-$STORAGE_KEY", logger + ) + } + private val logger = ConsoleLogger() private val testDiagnostics = Diagnostics() + private val eventsFileManager: EventsFileManager by lazy { + EventsFileManager( + directory = tempDir, + storageKey = STORAGE_KEY, + kvs = propertiesFile, + logger = logger, + diagnostics = testDiagnostics + ) + } @Test - fun `test store event and read`() { - val logger = ConsoleLogger() - val storageKey = "storageKey" - val propertiesFile = PropertiesFile(tempDir, "test-prefix-$storageKey", logger) - val eventsFileManager = - EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) - runBlocking { - eventsFileManager.storeEvent(createEvent("test1")) - eventsFileManager.storeEvent(createEvent("test2")) - eventsFileManager.rollover() - eventsFileManager.storeEvent(createEvent("test3")) - eventsFileManager.storeEvent(createEvent("test4")) - eventsFileManager.rollover() - eventsFileManager.storeEvent(createEvent("test5")) - } + fun `store event and read`() = runBlocking { + eventsFileManager.storeEvent(createEvent("test1")) + eventsFileManager.storeEvent(createEvent("test2")) + eventsFileManager.rollover() + eventsFileManager.storeEvent(createEvent("test3")) + eventsFileManager.storeEvent(createEvent("test4")) + eventsFileManager.rollover() + eventsFileManager.storeEvent(createEvent("test5")) val filePaths = eventsFileManager.read() assertEquals(2, filePaths.size) filePaths.withIndex().forEach { (index, filePath) -> // verify file name and raw content val file = File(filePath) - assertEquals("$storageKey-$index", file.name) + assertEquals("$STORAGE_KEY-$index", file.name) val content = file.readText() val lines = content.split(EventsFileManager.DELIMITER) assertEquals(3, lines.size) @@ -48,36 +58,52 @@ class EventsFileManagerTest { assertEquals("", lines[2]) } - runBlocking { - // verify the content read from the file - val eventsString0 = eventsFileManager.getEventString(filePaths[0]) - val eventsString1 = eventsFileManager.getEventString(filePaths[1]) - val events0 = JSONArray(eventsString0) - val events1 = JSONArray(eventsString1) - assertEquals(2, events0.length()) - assertEquals(2, events1.length()) - assertEquals("test1", events0.getJSONObject(0).getString("eventType")) - assertEquals("test2", events0.getJSONObject(1).getString("eventType")) - assertEquals("test3", events1.getJSONObject(0).getString("eventType")) - assertEquals("test4", events1.getJSONObject(1).getString("eventType")) - } + // verify the content read from the file + val eventsString0 = eventsFileManager.getEventString(filePaths[0]) + val eventsString1 = eventsFileManager.getEventString(filePaths[1]) + val events0 = JSONArray(eventsString0) + val events1 = JSONArray(eventsString1) + assertEquals(2, events0.length()) + assertEquals(2, events1.length()) + assertEquals("test1", events0.getJSONObject(0).getString("eventType")) + assertEquals("test2", events0.getJSONObject(1).getString("eventType")) + assertEquals("test3", events1.getJSONObject(0).getString("eventType")) + assertEquals("test4", events1.getJSONObject(1).getString("eventType")) } @Test - fun `rollover should finish current non-empty temp file`() { - val logger = ConsoleLogger() - val storageKey = "storageKey" - val propertiesFile = PropertiesFile(tempDir, "test-prefix-$storageKey", logger) - val eventsFileManager = - EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) - runBlocking { - eventsFileManager.storeEvent(createEvent("test1")) - } + fun `read should respect file name order`() { + val tempFiles = listOf("3.properties") + val splitFiles1 = listOf("1-1", "1-2") + val unsentFiles = listOf("6", "7", "8", "9") + val splitFiles2 = listOf("32-1", "32-2") + val latestUnsentFiles = (33..41).toList().map { it.toString() } + + // explicitly scramble the order of the files + (splitFiles2 + unsentFiles + latestUnsentFiles + splitFiles1 + tempFiles) + // append storage key prefix + .map { STORAGE_KEY + '-' + it } + .forEach { fileName -> + File(tempDir, fileName).createNewFile() + } + + val eventFiles = eventsFileManager.read() + + val expectedFiles = + // .properties files should be ignored + (splitFiles1 + unsentFiles + splitFiles2 + latestUnsentFiles) + // append full directory path and storage key prefix + .map { tempDir.toString() + "/" + STORAGE_KEY + '-' + it } + + assertEquals(eventFiles, expectedFiles) + } + + @Test + fun `rollover should finish current non-empty temp file`() = runBlocking { + eventsFileManager.storeEvent(createEvent("test1")) val filePaths = eventsFileManager.read() assertEquals(0, filePaths.size) - runBlocking { - eventsFileManager.rollover() - } + eventsFileManager.rollover() val filePathsAfterRollover = eventsFileManager.read() assertEquals(1, filePathsAfterRollover.size) val file = File(filePathsAfterRollover[0]) @@ -86,39 +112,23 @@ class EventsFileManagerTest { assertEquals(2, lines.size) assertEquals(createEvent("test1"), lines[0]) assertEquals("", lines[1]) - runBlocking { - val eventsString = eventsFileManager.getEventString(filePathsAfterRollover[0]) - val events = JSONArray(eventsString) - assertEquals(1, events.length()) - assertEquals("test1", events.getJSONObject(0).getString("eventType")) - } + val eventsString = eventsFileManager.getEventString(filePathsAfterRollover[0]) + val events = JSONArray(eventsString) + assertEquals(1, events.length()) + assertEquals("test1", events.getJSONObject(0).getString("eventType")) } @Test - fun `rollover should ignore current empty temp file`() { - val logger = ConsoleLogger() - val storageKey = "storageKey" - val propertiesFile = PropertiesFile(tempDir, "test-prefix-$storageKey", logger) - val eventsFileManager = - EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) - runBlocking { - eventsFileManager.rollover() - } + fun `rollover should ignore current empty temp file`() = runBlocking { + eventsFileManager.rollover() val filePathsAfterRollover = eventsFileManager.read() assertEquals(0, filePathsAfterRollover.size) } @Test - fun `remove should delete a file`() { - val logger = ConsoleLogger() - val storageKey = "storageKey" - val propertiesFile = PropertiesFile(tempDir, "test-prefix-$storageKey", logger) - val eventsFileManager = - EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) - runBlocking { - eventsFileManager.storeEvent(createEvent("test1")) - eventsFileManager.rollover() - } + fun `remove should delete a file`() = runBlocking { + eventsFileManager.storeEvent(createEvent("test1")) + eventsFileManager.rollover() val filePaths = eventsFileManager.read() assertEquals(1, filePaths.size) eventsFileManager.remove(filePaths[0]) @@ -127,33 +137,28 @@ class EventsFileManagerTest { } @Test - fun `test split`() { - val logger = ConsoleLogger() - val storageKey = "storageKey" - val propertiesFile = PropertiesFile(tempDir, "test-prefix-$storageKey", logger) - val eventsFileManager = - EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) - runBlocking { - eventsFileManager.storeEvent(createEvent("test1")) - eventsFileManager.storeEvent(createEvent("test2")) - eventsFileManager.rollover() - } + fun `split event files`() = runBlocking { + eventsFileManager.storeEvent(createEvent("test1")) + eventsFileManager.storeEvent(createEvent("test2")) + eventsFileManager.rollover() + val filePaths = eventsFileManager.read() assertEquals(1, filePaths.size) - runBlocking { - val eventsString = eventsFileManager.getEventString(filePaths[0]) - val events = JSONArray(eventsString) - assertEquals(2, events.length()) - eventsFileManager.splitFile(filePaths[0], events) - } + val eventsString = eventsFileManager.getEventString(filePaths[0]) + val events = JSONArray(eventsString) + + assertEquals(2, events.length()) + eventsFileManager.splitFile(filePaths[0], events) val filePathsAfterSplit = eventsFileManager.read() assertEquals(2, filePathsAfterSplit.size) + val file0 = File(filePathsAfterSplit[0]) val content0 = file0.readText() val lines0 = content0.split(EventsFileManager.DELIMITER) assertEquals(2, lines0.size) assertEquals(createEvent("test1"), lines0[0]) assertEquals("", lines0[1]) + val file1 = File(filePathsAfterSplit[1]) val content1 = file1.readText() val lines1 = content1.split(EventsFileManager.DELIMITER) @@ -163,333 +168,267 @@ class EventsFileManagerTest { } @Test - fun `verify delimiter handled gracefully`() { - val file0 = File(tempDir, "storageKey-0") + fun `verify delimiter handled gracefully`() = runBlocking { + val file0 = File(tempDir, "$STORAGE_KEY-0") file0.writeText("{\"eventType\":\"test1\"}\u0000{\"eventType\":\"test2\"}\u0000") - val logger = ConsoleLogger() - val storageKey = "storageKey" - val propertiesFile = PropertiesFile(tempDir, "test-prefix-$storageKey", logger) - val eventsFileManager = - EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) - runBlocking { - val filePaths = eventsFileManager.read() - assertEquals(1, filePaths.size) - val eventsString = eventsFileManager.getEventString(filePaths[0]) - val events = JSONArray(eventsString) - assertEquals(2, events.length()) - assertEquals("test1", events.getJSONObject(0).getString("eventType")) - assertEquals("test2", events.getJSONObject(1).getString("eventType")) - } + val filePaths = eventsFileManager.read() + assertEquals(1, filePaths.size) + val eventsString = eventsFileManager.getEventString(filePaths[0]) + val events = JSONArray(eventsString) + assertEquals(2, events.length()) + assertEquals("test1", events.getJSONObject(0).getString("eventType")) + assertEquals("test2", events.getJSONObject(1).getString("eventType")) } @Test - fun `verify malformed event shows up in diagnostics`() { - val file0 = File(tempDir, "storageKey-0") - file0.writeText("{\"eventType\":\"test1\"}\u0000{\"eventType\":\"test2\"}\u0000{\"eventType\":\"test3\"\u0000") + fun `verify malformed event shows up in diagnostics`() = runBlocking { + val file0 = File(tempDir, "$STORAGE_KEY-0") + file0.writeText( + "{\"eventType\":\"test1\"}\u0000{\"eventType\":\"test2\"}\u0000{\"eventType\":\"test3\"\u0000" + ) val logger = ConsoleLogger() - val storageKey = "storageKey" - val propertiesFile = PropertiesFile(tempDir, "test-prefix-$storageKey", logger) + val propertiesFile = PropertiesFile(tempDir, "test-prefix-$STORAGE_KEY", logger) val diagnostics = Diagnostics() val eventsFileManager = - EventsFileManager(tempDir, storageKey, propertiesFile, logger, diagnostics) - runBlocking { - val filePaths = eventsFileManager.read() - assertEquals(1, filePaths.size) - val eventsString = eventsFileManager.getEventString(filePaths[0]) - val events = JSONArray(eventsString) - assertEquals(2, events.length()) - assertEquals("test1", events.getJSONObject(0).getString("eventType")) - assertEquals("test2", events.getJSONObject(1).getString("eventType")) - assertEquals("{\"malformed_events\":[\"{\\\"eventType\\\":\\\"test3\\\"\"]}", diagnostics.extractDiagnostics()) - } + EventsFileManager(tempDir, STORAGE_KEY, propertiesFile, logger, diagnostics) + val filePaths = eventsFileManager.read() + assertEquals(1, filePaths.size) + val eventsString = eventsFileManager.getEventString(filePaths[0]) + val events = JSONArray(eventsString) + assertEquals(2, events.length()) + assertEquals("test1", events.getJSONObject(0).getString("eventType")) + assertEquals("test2", events.getJSONObject(1).getString("eventType")) + assertEquals( + "{\"malformed_events\":[\"{\\\"eventType\\\":\\\"test3\\\"\"]}", + diagnostics.extractDiagnostics() + ) } @Test - fun `verify delimiter in event names`() { - val logger = ConsoleLogger() - val storageKey = "storageKey" - val propertiesFile = PropertiesFile(tempDir, "test-prefix-$storageKey", logger) - val eventsFileManager = - EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) - runBlocking { - eventsFileManager.storeEvent(createEvent("test1")) - eventsFileManager.storeEvent(createEvent("test2\u0000")) - eventsFileManager.rollover() - val filePaths = eventsFileManager.read() - assertEquals(1, filePaths.size) - val eventsString = eventsFileManager.getEventString(filePaths[0]) - val events = JSONArray(eventsString) - assertEquals(2, events.length()) - assertEquals("test1", events.getJSONObject(0).getString("eventType")) - assertEquals("test2", events.getJSONObject(1).getString("eventType")) - } + fun `verify delimiter in event names`() = runBlocking { + eventsFileManager.storeEvent(createEvent("test1")) + eventsFileManager.storeEvent(createEvent("test2\u0000")) + eventsFileManager.rollover() + val filePaths = eventsFileManager.read() + assertEquals(1, filePaths.size) + val eventsString = eventsFileManager.getEventString(filePaths[0]) + val events = JSONArray(eventsString) + assertEquals(2, events.length()) + assertEquals("test1", events.getJSONObject(0).getString("eventType")) + assertEquals("test2", events.getJSONObject(1).getString("eventType")) } @Test - fun `could handle earlier version of events file`() { + fun `could handle earlier version of events file`() = runBlocking { createEarlierVersionEventFiles() - val logger = ConsoleLogger() - val storageKey = "storageKey" - val propertiesFile = PropertiesFile(tempDir, "test-prefix-$storageKey", logger) - val eventsFileManager = - EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) val filePaths = eventsFileManager.read() assertEquals(7, filePaths.size) - runBlocking { - filePaths.withIndex().forEach { (index, filePath) -> - val file = File(filePath) - assertTrue(file.extension.isEmpty(), "file extension should be empty for v1 event files") - // verify file format updated to v2 - val content = file.readText() - val lines = content.split(EventsFileManager.DELIMITER) - if (index == 5) { - assertEquals(2, lines.size) - assertEquals("{\"eventType\":\"test11\"}", lines[0]) - } else { - assertEquals(3, lines.size) - assertEquals("{\"eventType\":\"test${index * 2 + 1}\"}", lines[0]) - assertEquals("{\"eventType\":\"test${index * 2 + 2}\"}", lines[1]) - } + filePaths.withIndex().forEach { (index, filePath) -> + val file = File(filePath) + assertTrue( + file.extension.isEmpty(), "file extension should be empty for v1 event files" + ) + // verify file format updated to v2 + val content = file.readText() + val lines = content.split(EventsFileManager.DELIMITER) + if (index == 5) { + assertEquals(2, lines.size) + assertEquals("{\"eventType\":\"test11\"}", lines[0]) + } else { + assertEquals(3, lines.size) + assertEquals("{\"eventType\":\"test${index * 2 + 1}\"}", lines[0]) + assertEquals("{\"eventType\":\"test${index * 2 + 2}\"}", lines[1]) + } - val eventsString = eventsFileManager.getEventString(filePath) - if (index == 5) { - assertEquals("[{\"eventType\":\"test11\"}]", eventsString) - } else { - val events = JSONArray(eventsString) - assertEquals(2, events.length()) - assertEquals( - "test${index * 2 + 1}", - events.getJSONObject(0).getString("eventType"), - ) - assertEquals( - "test${index * 2 + 2}", - events.getJSONObject(1).getString("eventType"), - ) - } + val eventsString = eventsFileManager.getEventString(filePath) + if (index == 5) { + assertEquals("[{\"eventType\":\"test11\"}]", eventsString) + } else { + val events = JSONArray(eventsString) + assertEquals(2, events.length()) + assertEquals( + "test${index * 2 + 1}", + events.getJSONObject(0).getString("eventType"), + ) + assertEquals( + "test${index * 2 + 2}", + events.getJSONObject(1).getString("eventType"), + ) } } } @Test - fun `could handle earlier versions with name conflict and new events`() { + fun `could handle earlier versions with name conflict and new events`() = runBlocking { createEarlierVersionEventFiles() - val file = File(tempDir, "storageKey-6") + val file = File(tempDir, "$STORAGE_KEY-6") file.writeText("{\"eventType\":\"test15\"},{\"eventType\":\"test16\"}]") - val logger = ConsoleLogger() - val storageKey = "storageKey" - val propertiesFile = PropertiesFile(tempDir, "test-prefix-$storageKey", logger) - val eventsFileManager = - EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) - runBlocking { - eventsFileManager.storeEvent(createEvent("test17")) - eventsFileManager.storeEvent(createEvent("test18")) - eventsFileManager.rollover() - } + eventsFileManager.storeEvent(createEvent("test17")) + eventsFileManager.storeEvent(createEvent("test18")) + eventsFileManager.rollover() var eventsCount = 0 val filePaths = eventsFileManager.read() - runBlocking { - filePaths.forEach { filePath -> - val eventsString = eventsFileManager.getEventString(filePath) - val events = JSONArray(eventsString) - eventsCount += events.length() - } + filePaths.forEach { filePath -> + val eventsString = eventsFileManager.getEventString(filePath) + val events = JSONArray(eventsString) + eventsCount += events.length() } assertEquals(17, eventsCount) } @Test - fun `could handle earlier versions with line break in event name`() { - val file = File(tempDir, "storageKey-6") + fun `could handle earlier versions with line break in event name`() = runBlocking { + val file = File(tempDir, "$STORAGE_KEY-6") file.writeText("{\"eventType\":\"test15\"},{\"eventType\":\"test16\\nsuffix\"}]") - val logger = ConsoleLogger() - val storageKey = "storageKey" - val propertiesFile = PropertiesFile(tempDir, "test-prefix-$storageKey", logger) - val eventsFileManager = - EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) - runBlocking { - val filePaths = eventsFileManager.read() - assertEquals(1, filePaths.size) - val eventsString = eventsFileManager.getEventString(filePaths[0]) - val events = JSONArray(eventsString) - assertEquals(2, events.length()) - assertEquals("test15", events.getJSONObject(0).getString("eventType")) - assertEquals("test16\nsuffix", events.getJSONObject(1).getString("eventType")) - } + val filePaths = eventsFileManager.read() + assertEquals(1, filePaths.size) + val eventsString = eventsFileManager.getEventString(filePaths[0]) + val events = JSONArray(eventsString) + assertEquals(2, events.length()) + assertEquals("test15", events.getJSONObject(0).getString("eventType")) + assertEquals("test16\nsuffix", events.getJSONObject(1).getString("eventType")) } @Test - fun `concurrent writes to the same event file manager instance`() { - val logger = ConsoleLogger() - val storageKey = "storageKey" - val propertiesFile = PropertiesFile(tempDir, "test-prefix-$storageKey", logger) - val eventsFileManager = - EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) - runBlocking { - val job1 = - kotlinx.coroutines.GlobalScope.launch { - eventsFileManager.storeEvent(createEvent("test1")) - eventsFileManager.storeEvent(createEvent("test2")) - eventsFileManager.rollover() - } - val job2 = - kotlinx.coroutines.GlobalScope.launch { - eventsFileManager.rollover() - eventsFileManager.storeEvent(createEvent("test3")) - eventsFileManager.storeEvent(createEvent("test4")) - eventsFileManager.rollover() - } - val job3 = - kotlinx.coroutines.GlobalScope.launch { - eventsFileManager.rollover() - eventsFileManager.storeEvent(createEvent("test5")) - eventsFileManager.storeEvent(createEvent("test6")) - eventsFileManager.rollover() - } - kotlinx.coroutines.joinAll(job1, job2, job3) + fun `concurrent writes to the same event file manager instance`() = runBlocking { + val job1 = GlobalScope.launch { + eventsFileManager.storeEvent(createEvent("test1")) + eventsFileManager.storeEvent(createEvent("test2")) + eventsFileManager.rollover() + } + val job2 = GlobalScope.launch { + eventsFileManager.rollover() + eventsFileManager.storeEvent(createEvent("test3")) + eventsFileManager.storeEvent(createEvent("test4")) + eventsFileManager.rollover() + } + val job3 = GlobalScope.launch { + eventsFileManager.rollover() + eventsFileManager.storeEvent(createEvent("test5")) + eventsFileManager.storeEvent(createEvent("test6")) + eventsFileManager.rollover() } + kotlinx.coroutines.joinAll(job1, job2, job3) val filePaths = eventsFileManager.read() var eventsCount = 0 - runBlocking { - filePaths.forEach { filePath -> - val eventsString = eventsFileManager.getEventString(filePath) - val events = JSONArray(eventsString) - eventsCount += events.length() - } + filePaths.forEach { filePath -> + val eventsString = eventsFileManager.getEventString(filePath) + val events = JSONArray(eventsString) + eventsCount += events.length() } assertEquals(6, eventsCount) } @Test - fun `concurrent write from multiple threads`() { - val logger = ConsoleLogger() - val storageKey = "storageKey" - val propertiesFile = PropertiesFile(tempDir, "test-prefix-$storageKey", logger) - val eventsFileManager = - EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) + fun `concurrent write from multiple threads`() = runBlocking { for (i in 0..100) { - val thread = - thread { - runBlocking { - for (d in 0..10) { - eventsFileManager.storeEvent(createEvent("test$i-$d")) - } - eventsFileManager.rollover() + val thread = thread { + runBlocking { + for (d in 0..10) { + eventsFileManager.storeEvent(createEvent("test$i-$d")) } + eventsFileManager.rollover() } + } thread.join() } val filePaths = eventsFileManager.read() var eventsCount = 0 - runBlocking { - filePaths.forEach { filePath -> - val eventsString = eventsFileManager.getEventString(filePath) - val events = JSONArray(eventsString) - eventsCount += events.length() - } + filePaths.forEach { filePath -> + val eventsString = eventsFileManager.getEventString(filePath) + val events = JSONArray(eventsString) + eventsCount += events.length() } assertEquals(101 * 11, eventsCount) } @Test - fun `concurrent write to two instances with same configuration`() { + fun `concurrent write to two instances with same configuration`() = runBlocking { val logger = ConsoleLogger() - val storageKey = "storageKey" - val propertiesFile1 = PropertiesFile(tempDir, "test-prefix-$storageKey", logger) - val propertiesFile2 = PropertiesFile(tempDir, "test-prefix-$storageKey", logger) + val propertiesFile1 = PropertiesFile(tempDir, "test-prefix-$STORAGE_KEY", logger) + val propertiesFile2 = PropertiesFile(tempDir, "test-prefix-$STORAGE_KEY", logger) val eventsFileManager1 = - EventsFileManager(tempDir, storageKey, propertiesFile1, logger, testDiagnostics) + EventsFileManager(tempDir, STORAGE_KEY, propertiesFile1, logger, testDiagnostics) val eventsFileManager2 = - EventsFileManager(tempDir, storageKey, propertiesFile2, logger, testDiagnostics) - runBlocking { - val job1 = - kotlinx.coroutines.GlobalScope.launch { - eventsFileManager1.storeEvent(createEvent("test1")) - eventsFileManager1.storeEvent(createEvent("test2")) - eventsFileManager1.rollover() - } - val job2 = - kotlinx.coroutines.GlobalScope.launch { - eventsFileManager2.rollover() - eventsFileManager2.storeEvent(createEvent("test3")) - eventsFileManager2.storeEvent(createEvent("test4")) - eventsFileManager2.rollover() - } - val job3 = - kotlinx.coroutines.GlobalScope.launch { - eventsFileManager1.rollover() - eventsFileManager1.storeEvent(createEvent("test5")) - eventsFileManager1.storeEvent(createEvent("test6")) - eventsFileManager1.rollover() - } - val job4 = - kotlinx.coroutines.GlobalScope.launch { - eventsFileManager2.rollover() - eventsFileManager2.storeEvent(createEvent("test7")) - eventsFileManager2.storeEvent(createEvent("test8")) - eventsFileManager2.rollover() - } - kotlinx.coroutines.joinAll(job1, job2, job3, job4) + EventsFileManager(tempDir, STORAGE_KEY, propertiesFile2, logger, testDiagnostics) + val job1 = GlobalScope.launch { + eventsFileManager1.storeEvent(createEvent("test1")) + eventsFileManager1.storeEvent(createEvent("test2")) + eventsFileManager1.rollover() + } + val job2 = GlobalScope.launch { + eventsFileManager2.rollover() + eventsFileManager2.storeEvent(createEvent("test3")) + eventsFileManager2.storeEvent(createEvent("test4")) + eventsFileManager2.rollover() + } + val job3 = GlobalScope.launch { + eventsFileManager1.rollover() + eventsFileManager1.storeEvent(createEvent("test5")) + eventsFileManager1.storeEvent(createEvent("test6")) + eventsFileManager1.rollover() + } + val job4 = GlobalScope.launch { + eventsFileManager2.rollover() + eventsFileManager2.storeEvent(createEvent("test7")) + eventsFileManager2.storeEvent(createEvent("test8")) + eventsFileManager2.rollover() } + kotlinx.coroutines.joinAll(job1, job2, job3, job4) val filePaths = eventsFileManager1.read() var eventsCount = 0 - runBlocking { - filePaths.forEach { filePath -> - val eventsString = eventsFileManager1.getEventString(filePath) - val events = JSONArray(eventsString) - eventsCount += events.length() - } + filePaths.forEach { filePath -> + val eventsString = eventsFileManager1.getEventString(filePath) + val events = JSONArray(eventsString) + eventsCount += events.length() } assertEquals(8, eventsCount) } @Test - fun `concurrent write from multiple threads on multiple instances`() { + fun `concurrent write from multiple threads on multiple instances`() = runBlocking { val logger = ConsoleLogger() - val storageKey = "storageKey" - val propertiesFile = PropertiesFile(tempDir, "test-prefix-$storageKey", logger) + val propertiesFile = PropertiesFile(tempDir, "test-prefix-$STORAGE_KEY", logger) for (i in 0..100) { val eventsFileManager = - EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) - val thread = - thread { - runBlocking { - for (d in 0..10) { - eventsFileManager.storeEvent(createEvent("test$i-$d")) - } - eventsFileManager.rollover() + EventsFileManager(tempDir, STORAGE_KEY, propertiesFile, logger, testDiagnostics) + val thread = thread { + runBlocking { + for (d in 0..10) { + eventsFileManager.storeEvent(createEvent("test$i-$d")) } + eventsFileManager.rollover() } + } thread.join() } val eventsFileManagerForRead = - EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) + EventsFileManager(tempDir, STORAGE_KEY, propertiesFile, logger, testDiagnostics) val filePaths = eventsFileManagerForRead.read() var eventsCount = 0 - runBlocking { - filePaths.forEach { filePath -> - val eventsString = eventsFileManagerForRead.getEventString(filePath) - val events = JSONArray(eventsString) - eventsCount += events.length() - } + filePaths.forEach { filePath -> + val eventsString = eventsFileManagerForRead.getEventString(filePath) + val events = JSONArray(eventsString) + eventsCount += events.length() } assertEquals(101 * 11, eventsCount) } private fun createEarlierVersionEventFiles() { - val file0 = File(tempDir, "storageKey-0") + val file0 = File(tempDir, "$STORAGE_KEY-0") file0.writeText("[{\"eventType\":\"test1\"},{\"eventType\":\"test2\"}]") - val file1 = File(tempDir, "storageKey-1") + val file1 = File(tempDir, "$STORAGE_KEY-1") file1.writeText(",{\"eventType\":\"test3\"},{\"eventType\":\"test4\"}]") - val file2 = File(tempDir, "storageKey-2") + val file2 = File(tempDir, "$STORAGE_KEY-2") file2.writeText("[[{\"eventType\":\"test5\"},{\"eventType\":\"test6\"}]]") - val file3 = File(tempDir, "storageKey-3") + val file3 = File(tempDir, "$STORAGE_KEY-3") file3.writeText("[{\"eventType\":\"test7\"},{\"eventType\":\"test8\"}]]") - val file4 = File(tempDir, "storageKey-4") + val file4 = File(tempDir, "$STORAGE_KEY-4") file4.writeText("{\"eventType\":\"test9\"},{\"eventType\":\"test10\"}]") - val file5 = File(tempDir, "storageKey-5") + val file5 = File(tempDir, "$STORAGE_KEY-5") file5.writeText("[{\"eventType\":\"test11\"}],{\"eventType\":\"test12\"}") - val file6 = File(tempDir, "storageKey-6.tmp") + val file6 = File(tempDir, "$STORAGE_KEY-6.tmp") file6.writeText("[{\"eventType\":\"test13\"},{\"eventType\":\"test14\"}") } diff --git a/core/src/test/kotlin/com/amplitude/core/utilities/ExponentialBackoffRetryHandlerTest.kt b/core/src/test/kotlin/com/amplitude/core/utilities/ExponentialBackoffRetryHandlerTest.kt new file mode 100644 index 00000000..5e78ecd8 --- /dev/null +++ b/core/src/test/kotlin/com/amplitude/core/utilities/ExponentialBackoffRetryHandlerTest.kt @@ -0,0 +1,101 @@ +package com.amplitude.core.utilities + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import kotlin.math.pow + +@OptIn(ExperimentalCoroutinesApi::class) +class ExponentialBackoffRetryHandlerTest { + + @Test + fun `attemptRetry returns properly until maxRetryAttempt is reached`() = runBlocking { + val handler = ExponentialBackoffRetryHandler( + maxRetryAttempt = 2, + baseDelayInMs = 10 + ) + repeat(3) { count -> + handler.attemptRetry { canRetry -> + if (count < 2) { + assertTrue(canRetry) + } else { + assertFalse(canRetry) + } + } + } + } + + @Test + fun `retryWithDelay increments attempts within max range`() = runTest { + val handler = ExponentialBackoffRetryHandler(maxRetryAttempt = 2) + + handler.attemptRetry {} + assertEquals(1, handler.attempt.get()) + handler.attemptRetry {} + assertEquals(2, handler.attempt.get()) + handler.attemptRetry {} + assertEquals(2, handler.attempt.get()) // max attempt reached + } + + @Test + fun `reset sets attempts to zero`() = runBlocking { + val handler = ExponentialBackoffRetryHandler(maxRetryAttempt = 3, baseDelayInMs = 10) + repeat(4) { count -> + handler.attemptRetry { canRetry -> + if (count < 3) { + assertTrue(canRetry) + } else { + assertFalse(canRetry) + } + } + } + handler.reset() + assertEquals(0, handler.attempt.get()) + handler.attemptRetry { canRetry -> + assertTrue(canRetry) + } + } + + @Test + fun `attemptRetry respects exponential backoff`() = runBlocking { + val baseDelayInMs = 10 + val attemptNumber = 4 + val handler = ExponentialBackoffRetryHandler( + baseDelayInMs = baseDelayInMs + ) + handler.attempt.set(attemptNumber) + + val startTime = System.currentTimeMillis() + handler.attemptRetry { canRetry -> + assertTrue(canRetry) + val endTime = System.currentTimeMillis() + val elapsedTime = endTime - startTime + + val expected = baseDelayInMs * 2.0.pow(attemptNumber) + assertTrue(elapsedTime >= expected) + } + } + + @Test + fun `maxDelay respects ceiling value and current attempt count`() { + val maxRetryAttempt = 10 + var handler = ExponentialBackoffRetryHandler( + maxRetryAttempt = maxRetryAttempt + ) + + // should be set to ceiling of 60 seconds + handler.attempt.set(10) + assertEquals(handler.maxDelayInMs, 60_000) + + // should be set to 2^5: 2^(maxRetryAttempt + 1) + handler = ExponentialBackoffRetryHandler( + maxRetryAttempt = 4 + ) + handler.attempt.set(4) + assertEquals(handler.maxDelayInMs, 32_000) + } +} diff --git a/core/src/test/kotlin/com/amplitude/core/utilities/FileResponseHandlerTest.kt b/core/src/test/kotlin/com/amplitude/core/utilities/FileResponseHandlerTest.kt index 4533aa18..1cfe4c5c 100644 --- a/core/src/test/kotlin/com/amplitude/core/utilities/FileResponseHandlerTest.kt +++ b/core/src/test/kotlin/com/amplitude/core/utilities/FileResponseHandlerTest.kt @@ -4,43 +4,353 @@ import com.amplitude.core.Configuration import com.amplitude.core.events.BaseEvent import com.amplitude.core.platform.EventPipeline import com.amplitude.core.utilities.http.BadRequestResponse +import com.amplitude.core.utilities.http.FailedResponse +import com.amplitude.core.utilities.http.PayloadTooLargeResponse +import com.amplitude.core.utilities.http.SuccessResponse +import com.amplitude.core.utilities.http.TimeoutResponse +import com.amplitude.core.utilities.http.TooManyRequestsResponse import io.mockk.every import io.mockk.mockk import io.mockk.verify +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.TestScope +import kotlinx.coroutines.test.UnconfinedTestDispatcher import org.json.JSONObject +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test +@OptIn(ExperimentalCoroutinesApi::class) class FileResponseHandlerTest { + private val storage = mockk() + private val pipeline = mockk(relaxed = true) + private val handler = FileResponseHandler( + storage = storage, + eventPipeline = pipeline, + configuration = Configuration( + apiKey = "test", + callback = { event: BaseEvent, _: Int, _: String -> + configCallBackEventTypes.add(event.eventType) + } + ), + scope = TestScope(), + storageDispatcher = UnconfinedTestDispatcher(), + logger = null + ) + private var configCallBackEventTypes = mutableListOf() + + init { + every { + storage.removeFile("file_path") + } returns true + every { + storage.splitEventFile("file_path", any()) + } returns Unit + every { + storage.releaseFile("file_path") + } returns Unit + } + + @Test + fun `success single event`() { + val response = SuccessResponse() + + val events = listOf( + generateBaseEvent("test1") + ) + handler.handleSuccessResponse( + successResponse = response, + events = "file_path", + eventsString = JSONUtil.eventsToString(events) + ) + + assertTrue(configCallBackEventTypes.contains("test1")) + verify(exactly = 1) { + storage.removeFile("file_path") + } + } + @Test - fun testBadResponseHandlerForInvalidApiKey() { + fun `success multiple events`() { + val response = SuccessResponse() + + val events = listOf( + generateBaseEvent("test1"), + generateBaseEvent("test2"), + generateBaseEvent("test3"), + ) + handler.handleSuccessResponse( + successResponse = response, + events = "file_path", + eventsString = JSONUtil.eventsToString(events) + ) + + val expectedEventTypes = events.map { it.eventType } + assertTrue(configCallBackEventTypes.containsAll(expectedEventTypes)) + verify(exactly = 1) { + storage.removeFile("file_path") + } + } + + @Test + fun `bad request single event`() { val response = BadRequestResponse( - JSONObject("{\"error\":\"Invalid API key\"}") + JSONObject("{\"error\":\"Some Error\"}") ) - val storage = mockk() - val pipeline = mockk() - val handler = - FileResponseHandler(storage, pipeline, Configuration("test"), mockk(), mockk(), null) - every { + handler.handleBadRequestResponse( + badRequestResponse = response, + events = "file_path", + eventsString = JSONUtil.eventsToString( + listOf( + generateBaseEvent("test1") + ) + ) + ) + + assertTrue(configCallBackEventTypes.contains("test1")) + verify(exactly = 1) { storage.removeFile("file_path") - } returns true + } + } + + @Test + fun `bad request for invalid API key`() { + val response = BadRequestResponse( + JSONObject("{\"error\":\"Invalid API key\"}") + ) handler.handleBadRequestResponse( - response, - "file_path", - JSONUtil.eventsToString( - listOf(generateBaseEvent("test1"), generateBaseEvent("test2")) + badRequestResponse = response, + events = "file_path", + eventsString = JSONUtil.eventsToString( + listOf( + generateBaseEvent("test1"), + generateBaseEvent("test2") + ) ) ) + assertTrue(configCallBackEventTypes.contains("test1")) + verify(exactly = 1) { + storage.removeFile("file_path") + } + } + + @Test + fun `bad request multiple events`() { + val response = BadRequestResponse( + JSONObject("{\"error\":\"Some Error\"}") + ) + + val events = listOf( + generateBaseEvent("test1"), + generateBaseEvent("test2"), + generateBaseEvent("test3"), + ) + handler.handleBadRequestResponse( + badRequestResponse = response, + events = "file_path", + eventsString = JSONUtil.eventsToString(events) + ) + + val expectedEventTypes = events.map { it.eventType } + expectedEventTypes.forEach { eventType -> + verify { + pipeline.put(match { it.eventType == eventType }) + } + } + verify(exactly = 1) { + storage.removeFile("file_path") + } + } + + @Test + fun `bad request multiple events with events_with_invalid_fields and retry`() { + val badRequestResponseBody = """ + { + "code": 400, + "error": "Request missing required field", + "events_with_invalid_fields": { + "time": [ + 0 + ] + } + } + """.trimIndent() + val response = BadRequestResponse( + JSONObject(badRequestResponseBody) + ) + + val events = listOf( + generateBaseEvent("test1"), + generateBaseEvent("test2"), + generateBaseEvent("test3"), + ) + handler.handleBadRequestResponse( + badRequestResponse = response, + events = "file_path", + eventsString = JSONUtil.eventsToString(events) + ) + + assertTrue(configCallBackEventTypes.contains("test1")) + verify { + pipeline.put(match { it.eventType == "test2" }) + } + verify { + pipeline.put(match { it.eventType == "test3" }) + } + verify { + storage.removeFile("file_path") + } + } + + @Test + fun `bad request multiple events with silenced_events and retry`() { + val badRequestResponseBody = """ + { + "code": 400, + "error": "Request missing required field", + "silenced_events": [ + 0 + ] + } + """.trimIndent() + val response = BadRequestResponse( + JSONObject(badRequestResponseBody) + ) + + val events = listOf( + generateBaseEvent("test1"), + generateBaseEvent("test2"), + generateBaseEvent("test3"), + ) + handler.handleBadRequestResponse( + badRequestResponse = response, + events = "file_path", + eventsString = JSONUtil.eventsToString(events) + ) + + assertTrue(configCallBackEventTypes.contains("test1")) + verify { + pipeline.put(match { it.eventType == "test2" }) + } + verify { + pipeline.put(match { it.eventType == "test3" }) + } verify(exactly = 1) { storage.removeFile("file_path") } } - private fun generateBaseEvent(eventType: String): BaseEvent { - val baseEvent = BaseEvent() - baseEvent.eventType = eventType - return baseEvent + @Test + fun `handle payload too large with single event`() { + val response = PayloadTooLargeResponse( + JSONObject("{\"error\":\"Payload too large\"}") + ) + + val events = listOf( + generateBaseEvent("test1") + ) + handler.handlePayloadTooLargeResponse( + payloadTooLargeResponse = response, + events = "file_path", + eventsString = JSONUtil.eventsToString(events) + ) + + assertTrue(configCallBackEventTypes.contains("test1")) + verify(exactly = 1) { + storage.removeFile("file_path") + } } + + @Test + fun `handle payload too large with multiple events`() { + val response = PayloadTooLargeResponse( + JSONObject("{\"error\":\"Payload too large\"}") + ) + + val events = listOf( + generateBaseEvent("test1"), + generateBaseEvent("test2"), + generateBaseEvent("test3") + ) + handler.handlePayloadTooLargeResponse( + payloadTooLargeResponse = response, + events = "file_path", + eventsString = JSONUtil.eventsToString(events) + ) + + verify(exactly = 1) { + storage.splitEventFile("file_path", any()) + } + } + + @Test + fun `handle too many requests with multiple events`() { + val response = TooManyRequestsResponse( + JSONObject("{\"error\":\"Too many requests\"}") + ) + + val events = listOf( + generateBaseEvent("test1"), + generateBaseEvent("test2"), + generateBaseEvent("test3") + ) + handler.handleTooManyRequestsResponse( + tooManyRequestsResponse = response, + events = "file_path", + eventsString = JSONUtil.eventsToString(events) + ) + + verify(exactly = 1) { + storage.releaseFile("file_path") + } + } + + @Test + fun `handle timeout with multiple events`() { + val response = TimeoutResponse() + + val events = listOf( + generateBaseEvent("test1"), + generateBaseEvent("test2"), + generateBaseEvent("test3") + ) + handler.handleTimeoutResponse( + timeoutResponse = response, + events = "file_path", + eventsString = JSONUtil.eventsToString(events) + ) + + verify(exactly = 1) { + storage.releaseFile("file_path") + } + } + + @Test + fun `handle failed response with multiple events`() { + val response = FailedResponse( + JSONObject("{\"error\":\"Request failed\"}") + ) + + val events = listOf( + generateBaseEvent("test1"), + generateBaseEvent("test2"), + generateBaseEvent("test3") + ) + handler.handleFailedResponse( + failedResponse = response, + events = "file_path", + eventsString = JSONUtil.eventsToString(events) + ) + + verify(exactly = 1) { + storage.releaseFile("file_path") + } + } + + private fun generateBaseEvent(eventType: String) = BaseEvent() + .apply { + this.eventType = eventType + } } diff --git a/core/src/test/kotlin/com/amplitude/core/utils/Mocks.kt b/core/src/test/kotlin/com/amplitude/core/utils/Mocks.kt index cb695f75..b65d29ef 100644 --- a/core/src/test/kotlin/com/amplitude/core/utils/Mocks.kt +++ b/core/src/test/kotlin/com/amplitude/core/utils/Mocks.kt @@ -11,5 +11,12 @@ import kotlinx.coroutines.test.UnconfinedTestDispatcher fun testAmplitude(configuration: Configuration): Amplitude { val testDispatcher = UnconfinedTestDispatcher() val testScope = TestScope(testDispatcher) - return object : Amplitude(configuration, State(), testScope, testDispatcher, testDispatcher, testDispatcher, testDispatcher) {} + return object : Amplitude( + configuration, + State(), + testScope, + testDispatcher, + testDispatcher, + testDispatcher + ) {} }