Skip to content

fix: retry upload order #259

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Apr 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
731bfcd
fix warnings on file storage and add KDocs
polbins Mar 20, 2025
f312e00
remove previous retry logic
polbins Mar 20, 2025
4676d06
fix ktlint
polbins Mar 20, 2025
bc741f4
fix failing test
polbins Mar 20, 2025
f647d5b
add return value for handled responses
polbins Mar 20, 2025
addb2a3
add tests to file response handler
polbins Mar 20, 2025
f82dd6c
add retry w/ exponential backoff for cases where an upload request fa…
polbins Mar 21, 2025
c035d24
simplify setup of pipeline test
polbins Mar 21, 2025
cd89a80
add retry upload logic on EventPipeline
polbins Mar 21, 2025
d5319eb
ktlint fixes
polbins Mar 21, 2025
5d3b8df
integration test fixes, use a test dispatcher and fix timing/order
polbins Mar 24, 2025
7460a90
clean-up EventsFileManagerTest
polbins Mar 26, 2025
31e8581
fix sorting for file names with varying length
polbins Mar 26, 2025
f93b2d9
Revert "add return value for handled responses"
polbins Mar 27, 2025
9bd00a5
move HttpStatus to AnalyticsResponse where all of its usages are in
polbins Mar 27, 2025
3776394
make AnalyticsResponse a sealed class
polbins Mar 27, 2025
ecb102c
update retry handling on event pipeline based on review comments
polbins Mar 27, 2025
c0300d6
fix visibility and update docs
polbins Mar 27, 2025
38437db
use configuration.flushMaxRetries for retry handler
polbins Mar 27, 2025
fc37c31
remove retryDispatcher in favor of storageDispatcher
polbins Mar 28, 2025
befdcb7
re-introduce file response handler right after an upload
polbins Mar 28, 2025
6594203
use storageDispatcher for all storage related operations
polbins Mar 28, 2025
1b5889c
add a ceiling value to maxDelay and compute it on the handler itself
polbins Apr 1, 2025
8561db9
add more comments to test
polbins Apr 1, 2025
445aadb
fix lint
polbins Apr 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ class AndroidStorageV2(
eventPipeline: EventPipeline,
configuration: Configuration,
scope: CoroutineScope,
dispatcher: CoroutineDispatcher,
storageDispatcher: CoroutineDispatcher,
): ResponseHandler {
return FileResponseHandler(
this,
eventPipeline,
configuration,
scope,
dispatcher,
storageDispatcher,
logger,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ class AndroidStorage(
eventPipeline: EventPipeline,
configuration: Configuration,
scope: CoroutineScope,
dispatcher: CoroutineDispatcher,
storageDispatcher: CoroutineDispatcher,
): ResponseHandler {
return storageV2.getResponseHandler(eventPipeline, configuration, scope, dispatcher)
return storageV2.getResponseHandler(eventPipeline, configuration, scope, storageDispatcher)
}

override fun removeFile(filePath: String): Boolean {
Expand Down
159 changes: 116 additions & 43 deletions android/src/test/java/com/amplitude/android/ResponseHandlerTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import io.mockk.every
import io.mockk.mockkConstructor
import io.mockk.mockkStatic
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.TestCoroutineScheduler
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import okhttp3.mockwebserver.MockResponse
import okhttp3.mockwebserver.MockWebServer
import okhttp3.mockwebserver.RecordedRequest
Expand All @@ -26,12 +31,15 @@ import org.junit.runner.RunWith
import org.robolectric.RobolectricTestRunner
import java.util.concurrent.TimeUnit

private const val FLUSH_INTERVAL_IN_MS = 150
private const val FLUSH_MAX_RETRIES = 3

@ExperimentalCoroutinesApi
@RunWith(RobolectricTestRunner::class)
class ResponseHandlerTest {
private lateinit var server: MockWebServer
private lateinit var amplitude: Amplitude

@ExperimentalCoroutinesApi
@Before
fun setup() {
server = MockWebServer()
Expand All @@ -46,9 +54,9 @@ class ResponseHandlerTest {
context = context,
serverUrl = server.url("/").toString(),
autocapture = setOf(),
flushIntervalMillis = 150,
flushIntervalMillis = FLUSH_INTERVAL_IN_MS,
identifyBatchIntervalMillis = 1000,
flushMaxRetries = 3,
flushMaxRetries = FLUSH_MAX_RETRIES,
identityStorageProvider = IMIdentityStorageProvider(),
)
)
Expand Down Expand Up @@ -84,29 +92,37 @@ class ResponseHandlerTest {
}

@Test
fun `test handle on rate limit`() {
fun `test handle on rate limit`() = runTest {
setAmplitudeDispatchers(amplitude, testScheduler)
val expectedEventsSize = FLUSH_MAX_RETRIES
val rateLimitBody = """
{
"code": 429,
"error": "Too many requests for some devices and users",
"eps_threshold": 30
}
""".trimIndent()
for (i in 1..6) {
repeat(FLUSH_MAX_RETRIES) {
server.enqueue(MockResponse().setBody(rateLimitBody).setResponseCode(429))
}
for (k in 1..4) {
server.enqueue(MockResponse().setResponseCode(200))

amplitude.isBuilt.await()
for (k in 1..expectedEventsSize) {
amplitude.track("test event $k")
runRequest()
}
Thread.sleep(100)
advanceUntilIdle()

// verify the total request count when reaching max retries
assertEquals(6, server.requestCount)
assertEquals(1 + FLUSH_MAX_RETRIES, server.requestCount)
}

@Test
fun `test handle payload too large with only one event`() {
server.enqueue(MockResponse().setBody("{\"code\": \"413\", \"error\": \"payload too large\"}").setResponseCode(413))
fun `test handle payload too large with only one event`() = runTest {
server.enqueue(
MockResponse().setBody("{\"code\": \"413\", \"error\": \"payload too large\"}")
.setResponseCode(413)
)
val options = EventOptions()
var statusCode = 0
var callFinished = false
Expand All @@ -128,40 +144,67 @@ class ResponseHandlerTest {
}

@Test
fun `test handle payload too large`() {
server.enqueue(MockResponse().setBody("{\"code\": \"413\", \"error\": \"payload too large\"}").setResponseCode(413))
server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200))
server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200))
server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200))
fun `test handle payload too large`() = runTest {
setAmplitudeDispatchers(amplitude, testScheduler)
val expectedSuccesEvents = 5
server.enqueue(
MockResponse().setBody("{\"code\": \"413\", \"error\": \"payload too large\"}")
.setResponseCode(413)
)
repeat(expectedSuccesEvents) {
server.enqueue(MockResponse().setBody("{\"code\": \"200\"}").setResponseCode(200))
}
var eventCompleteCount = 0
val statusMap = mutableMapOf<Int, Int>()
val options = EventOptions()
options.callback = { _: BaseEvent, status: Int, _: String ->
eventCompleteCount++
statusMap.put(status, statusMap.getOrDefault(status, 0) + 1)
statusMap[status] = statusMap.getOrDefault(status, 0) + 1
}

// send 2 events so the event size will be greater than 1 and call split event file
amplitude.isBuilt.await()
amplitude.track("test event 1", options = options)
amplitude.track("test event 2", options = options)
advanceTimeBy(1_000)

// verify first request that hit 413
val request = runRequest()
requireNotNull(request)
val failedEvents = getEventsFromRequest(request)
assertEquals(2, failedEvents.size)

// send succeeding events
amplitude.track("test event 3", options = options)
amplitude.track("test event 4", options = options)
val request = runRequest()
// verify the first request hit 413
assertNotNull(request)
val events = getEventsFromRequest(request!!)
assertEquals(4, events.size)
amplitude.track("test event 5", options = options)
runRequest()
runRequest()
runRequest()
Thread.sleep(150)
advanceUntilIdle()

// verify next requests after split event file
val splitRequest1 = runRequest()
requireNotNull(splitRequest1)
val splitEvents1 = getEventsFromRequest(splitRequest1)
assertEquals(1, splitEvents1.size)
val splitRequest2 = runRequest()
requireNotNull(splitRequest2)
val splitEvents2 = getEventsFromRequest(splitRequest2)
assertEquals(1, splitEvents2.size)

// verify we completed processing for the events after file split
val afterSplitRequest = runRequest()
requireNotNull(afterSplitRequest)
val afterSplitEvents = getEventsFromRequest(afterSplitRequest)
assertEquals(3, afterSplitEvents.size)

// verify we completed processing for the events after file split
assertEquals(4, server.requestCount)
assertEquals(5, statusMap.get(200))
assertEquals(5, eventCompleteCount)
assertEquals(expectedSuccesEvents, statusMap[200])
assertEquals(expectedSuccesEvents, eventCompleteCount)
}

@Test
fun `test handle bad request response`() {
fun `test handle bad request response`() = runTest {
setAmplitudeDispatchers(amplitude, testScheduler)
val badRequestResponseBody = """
{
"code": 400,
Expand All @@ -185,27 +228,37 @@ class ResponseHandlerTest {
val options = EventOptions()
options.callback = { _: BaseEvent, status: Int, _: String ->
eventCompleteCount++
statusMap.put(status, statusMap.getOrDefault(status, 0) + 1)
statusMap[status] = statusMap.getOrDefault(status, 0) + 1
}

amplitude.isBuilt.await()
amplitude.track("test event 1", options = options)
advanceUntilIdle()

// verify first request that hit 400
val request = runRequest()
requireNotNull(request)
val failedEvents = getEventsFromRequest(request)
assertEquals(1, failedEvents.size)

// send succeeding events
amplitude.track("test event 2", options = options)
amplitude.track("test event 3", options = options)
amplitude.track("test event 4", options = options)
// verify first request take 4 events hit 400
val request = runRequest()
assertNotNull(request)
val events = getEventsFromRequest(request!!)
assertEquals(4, events.size)
// verify second request take 2 events after removing 2 bad events
val request2 = runRequest()
assertNotNull(request2)
val events2 = getEventsFromRequest(request2!!)
assertEquals(2, events2.size)
advanceUntilIdle()

// verify next request that the 3 events with 200
val successRequest = runRequest()
requireNotNull(successRequest)
val successfulEvents = getEventsFromRequest(successRequest)
assertEquals(3, successfulEvents.size)

assertEquals(2, server.requestCount)
Thread.sleep(10)

// verify the processed status
assertEquals(2, statusMap.get(400))
assertEquals(2, statusMap.get(200))
assertEquals(1, statusMap[400])
assertEquals(3, statusMap[200])
assertEquals(4, eventCompleteCount)
}

Expand Down Expand Up @@ -264,7 +317,7 @@ class ResponseHandlerTest {

private fun runRequest(): RecordedRequest? {
return try {
server.takeRequest(5, TimeUnit.SECONDS)
server.takeRequest(1, TimeUnit.SECONDS)
} catch (e: InterruptedException) {
null
}
Expand All @@ -287,4 +340,24 @@ class ResponseHandlerTest {
every { anyConstructed<AndroidContextProvider>().mostRecentLocation } returns null
every { anyConstructed<AndroidContextProvider>().appSetId } returns ""
}

companion object {
private fun setAmplitudeDispatchers(
amplitude: com.amplitude.core.Amplitude,
testCoroutineScheduler: TestCoroutineScheduler,
) {
// inject these dispatcher fields with reflection, as the field is val (read-only)
listOf(
"amplitudeDispatcher",
"networkIODispatcher",
"storageIODispatcher"
).forEach { dispatcherField ->
com.amplitude.core.Amplitude::class.java.getDeclaredField(dispatcherField)
.apply {
isAccessible = true
set(amplitude, StandardTestDispatcher(testCoroutineScheduler))
}
}
}
}
}
3 changes: 1 addition & 2 deletions core/src/main/java/com/amplitude/core/Amplitude.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ open class Amplitude internal constructor(
val amplitudeScope: CoroutineScope = CoroutineScope(SupervisorJob()),
val amplitudeDispatcher: CoroutineDispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher(),
val networkIODispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher(),
val storageIODispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher(),
val retryDispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher(),
val storageIODispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
) {
val timeline: Timeline
lateinit var storage: Storage
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/com/amplitude/core/Storage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ interface Storage {
eventPipeline: EventPipeline,
configuration: Configuration,
scope: CoroutineScope,
dispatcher: CoroutineDispatcher,
storageDispatcher: CoroutineDispatcher,
): ResponseHandler
}

Expand Down
Loading