Skip to content

Commit 6d1dbc3

Browse files
committed
add retry upload logic on EventPipeline
- uses ExponentialBackoffRetryHandler to handle most of the logic
1 parent b280a86 commit 6d1dbc3

File tree

2 files changed

+135
-27
lines changed

2 files changed

+135
-27
lines changed

core/src/main/java/com/amplitude/core/platform/EventPipeline.kt

+54-27
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package com.amplitude.core.platform
22

33
import com.amplitude.core.Amplitude
4+
import com.amplitude.core.Storage
45
import com.amplitude.core.events.BaseEvent
6+
import com.amplitude.core.utilities.ExponentialBackoffRetryHandler
57
import com.amplitude.core.utilities.http.HttpClient
68
import com.amplitude.core.utilities.http.HttpClientInterface
9+
import com.amplitude.core.utilities.http.ResponseHandler
710
import com.amplitude.core.utilities.logWithStackTrace
11+
import kotlinx.coroutines.CoroutineScope
812
import kotlinx.coroutines.channels.Channel
913
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
1014
import kotlinx.coroutines.channels.consumeEach
@@ -17,21 +21,24 @@ import java.util.concurrent.atomic.AtomicInteger
1721

1822
class EventPipeline(
1923
private val amplitude: Amplitude,
20-
) {
21-
private val writeChannel: Channel<WriteQueueMessage>
22-
private val uploadChannel: Channel<String>
23-
private val eventCount: AtomicInteger = AtomicInteger(0)
24+
private val eventCount: AtomicInteger = AtomicInteger(0),
2425
private val httpClient: HttpClientInterface = amplitude.configuration.httpClient
25-
?: HttpClient(amplitude.configuration)
26-
private val storage get() = amplitude.storage
27-
private val scope get() = amplitude.amplitudeScope
26+
?: HttpClient(amplitude.configuration),
27+
private val retryUploadHandler: ExponentialBackoffRetryHandler =
28+
ExponentialBackoffRetryHandler(),
29+
private val storage: Storage = amplitude.storage,
30+
private val scope: CoroutineScope = amplitude.amplitudeScope,
31+
private val writeChannel: Channel<WriteQueueMessage> = Channel(UNLIMITED),
32+
private var uploadChannel: Channel<String> = Channel(UNLIMITED),
33+
overrideResponseHandler: ResponseHandler? = null,
34+
) {
2835

2936
private var running: Boolean
3037
private var scheduled: Boolean
3138
var flushSizeDivider: AtomicInteger = AtomicInteger(1)
3239

3340
private val responseHandler by lazy {
34-
storage.getResponseHandler(
41+
overrideResponseHandler ?: storage.getResponseHandler(
3542
this@EventPipeline,
3643
amplitude.configuration,
3744
scope,
@@ -40,16 +47,13 @@ class EventPipeline(
4047
}
4148

4249
companion object {
43-
internal const val UPLOAD_SIG = "#!upload"
50+
private const val UPLOAD_SIG = "#!upload"
4451
}
4552

4653
init {
4754
running = false
4855
scheduled = false
4956

50-
writeChannel = Channel(UNLIMITED)
51-
uploadChannel = Channel(UNLIMITED)
52-
5357
registerShutdownHook()
5458
}
5559

@@ -107,7 +111,7 @@ class EventPipeline(
107111

108112
private fun upload() =
109113
scope.launch(amplitude.networkIODispatcher) {
110-
uploadChannel.consumeEach {
114+
uploadChannel.consumeEach { signal ->
111115
withContext(amplitude.storageIODispatcher) {
112116
try {
113117
storage.rollover()
@@ -118,25 +122,48 @@ class EventPipeline(
118122
}
119123
}
120124

121-
val eventsData = storage.readEventsContent()
122-
for (events in eventsData) {
123-
try {
124-
val eventsString = storage.getEventsString(events)
125-
if (eventsString.isEmpty()) continue
125+
uploadNextEventFile()
126+
}
127+
}
126128

127-
val diagnostics = amplitude.diagnostics.extractDiagnostics()
128-
val response = httpClient.upload(eventsString, diagnostics)
129-
responseHandler.handle(response, events, eventsString)
130-
} catch (e: FileNotFoundException) {
131-
e.message?.let {
132-
amplitude.logger.warn("Event storage file not found: $it")
133-
}
134-
} catch (e: Exception) {
135-
e.logWithStackTrace(amplitude.logger, "Error when uploading event")
129+
private suspend fun uploadNextEventFile() {
130+
try {
131+
// only get first event file, we want to upload them one by one, in order
132+
val eventFile = storage.readEventsContent().firstOrNull() ?: return
133+
val eventsString = storage.getEventsString(eventFile)
134+
if (eventsString.isEmpty()) return
135+
136+
val diagnostics = amplitude.diagnostics.extractDiagnostics()
137+
val response = httpClient.upload(eventsString, diagnostics)
138+
val handled = responseHandler.handle(response, eventFile, eventsString)
139+
140+
when {
141+
handled -> retryUploadHandler.reset()
142+
!handled && retryUploadHandler.canRetry() -> {
143+
retryUploadHandler.retryWithDelay {
144+
uploadChannel.trySend(UPLOAD_SIG)
136145
}
137146
}
147+
else -> {
148+
amplitude.logger.error(
149+
"Upload failed ${retryUploadHandler.maxRetryAttempt} times. " +
150+
"Cancel all uploads as we might be offline."
151+
)
152+
153+
// cancel all previous signals in the upload channel and reset
154+
uploadChannel.cancel()
155+
uploadChannel = Channel(UNLIMITED)
156+
retryUploadHandler.reset()
157+
}
158+
}
159+
} catch (e: FileNotFoundException) {
160+
e.message?.let {
161+
amplitude.logger.warn("Event storage file not found: $it")
138162
}
163+
} catch (e: Exception) {
164+
e.logWithStackTrace(amplitude.logger, "Error when uploading event")
139165
}
166+
}
140167

141168
private fun getFlushCount(): Int {
142169
val count = amplitude.configuration.flushQueueSize / flushSizeDivider.get()

core/src/test/kotlin/com/amplitude/core/platform/EventPipelineTest.kt

+81
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,30 @@ import com.amplitude.core.Configuration
55
import com.amplitude.core.State
66
import com.amplitude.core.events.BaseEvent
77
import com.amplitude.core.utilities.ConsoleLoggerProvider
8+
import com.amplitude.core.utilities.ExponentialBackoffRetryHandler
89
import com.amplitude.core.utilities.InMemoryStorageProvider
10+
import com.amplitude.core.utilities.http.BadRequestResponse
11+
import com.amplitude.core.utilities.http.FailedResponse
12+
import com.amplitude.core.utilities.http.HttpStatus
13+
import com.amplitude.core.utilities.http.PayloadTooLargeResponse
14+
import com.amplitude.core.utilities.http.ResponseHandler
15+
import com.amplitude.core.utilities.http.SuccessResponse
16+
import com.amplitude.core.utilities.http.TimeoutResponse
17+
import com.amplitude.core.utilities.http.TooManyRequestsResponse
918
import com.amplitude.id.IMIdentityStorageProvider
19+
import io.mockk.coVerify
20+
import io.mockk.every
21+
import io.mockk.mockk
1022
import io.mockk.spyk
1123
import io.mockk.verify
1224
import kotlinx.coroutines.ExperimentalCoroutinesApi
25+
import kotlinx.coroutines.channels.Channel
26+
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
1327
import kotlinx.coroutines.test.StandardTestDispatcher
1428
import kotlinx.coroutines.test.TestScope
1529
import kotlinx.coroutines.test.advanceUntilIdle
1630
import kotlinx.coroutines.test.runTest
31+
import org.junit.jupiter.api.Assertions.assertTrue
1732
import org.junit.jupiter.api.Test
1833

1934
@ExperimentalCoroutinesApi
@@ -37,6 +52,7 @@ class EventPipelineTest {
3752
storageIODispatcher = testDispatcher,
3853
retryDispatcher = testDispatcher
3954
)
55+
private val fakeResponseHandler: ResponseHandler = mockk(relaxed= true)
4056

4157
@Test
4258
fun `should not flush when put and offline`() = runTest(testDispatcher) {
@@ -79,4 +95,69 @@ class EventPipelineTest {
7995

8096
verify(exactly = 1) { eventPipeline.flush() }
8197
}
98+
99+
@Test
100+
fun `should reset retry handler on successful upload`() = runTest(testDispatcher) {
101+
amplitude.isBuilt.await()
102+
val retryUploadHandler = spyk(ExponentialBackoffRetryHandler())
103+
val eventPipeline = EventPipeline(
104+
amplitude,
105+
retryUploadHandler = retryUploadHandler,
106+
overrideResponseHandler = fakeResponseHandler
107+
)
108+
every { fakeResponseHandler.handle(any(), any(), any()) } returns true
109+
val event = BaseEvent().apply { eventType = "test_event" }
110+
111+
eventPipeline.start()
112+
eventPipeline.put(event)
113+
advanceUntilIdle()
114+
115+
verify { retryUploadHandler.reset() }
116+
}
117+
118+
@Test
119+
fun `should retry on failure`() = runTest(testDispatcher) {
120+
amplitude.isBuilt.await()
121+
val retryUploadHandler = spyk(ExponentialBackoffRetryHandler())
122+
val eventPipeline = EventPipeline(
123+
amplitude,
124+
retryUploadHandler = retryUploadHandler,
125+
overrideResponseHandler = fakeResponseHandler
126+
)
127+
val event = BaseEvent().apply { eventType = "test_event" }
128+
129+
eventPipeline.start()
130+
eventPipeline.put(event)
131+
advanceUntilIdle()
132+
133+
coVerify { retryUploadHandler.retryWithDelay(any<suspend () -> Unit>()) }
134+
}
135+
136+
@Test
137+
fun `should reset after max retry attempts`() = runTest(testDispatcher) {
138+
amplitude.isBuilt.await()
139+
val retryUploadHandler = spyk(
140+
ExponentialBackoffRetryHandler(
141+
maxRetryAttempt = 0
142+
)
143+
)
144+
val uploadChannel = Channel<String>(UNLIMITED)
145+
uploadChannel.trySend("test1")
146+
uploadChannel.trySend("test2")
147+
val eventPipeline = EventPipeline(
148+
amplitude,
149+
retryUploadHandler = retryUploadHandler,
150+
overrideResponseHandler = fakeResponseHandler,
151+
uploadChannel = uploadChannel
152+
)
153+
val event = BaseEvent().apply { eventType = "test_event" }
154+
155+
eventPipeline.start()
156+
eventPipeline.put(event)
157+
advanceUntilIdle()
158+
159+
verify { retryUploadHandler.reset() }
160+
assertTrue(uploadChannel.isClosedForSend)
161+
assertTrue(uploadChannel.isClosedForReceive)
162+
}
82163
}

0 commit comments

Comments
 (0)