Skip to content

fix: 4xx-handling-when-on-proxy #262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private const val FLUSH_MAX_RETRIES = 3

@ExperimentalCoroutinesApi
@RunWith(RobolectricTestRunner::class)
class ResponseHandlerTest {
class ResponseHandlerIntegrationTest {
private lateinit var server: MockWebServer
private lateinit var amplitude: Amplitude

Expand Down Expand Up @@ -203,7 +203,7 @@ class ResponseHandlerTest {
}

@Test
fun `test handle bad request response`() = runTest {
fun `test handle bad request response - missing field`() = runTest {
setAmplitudeDispatchers(amplitude, testScheduler)
val badRequestResponseBody = """
{
Expand All @@ -216,7 +216,7 @@ class ResponseHandlerTest {
},
"events_with_missing_fields": {
"event_type": [
2
0
]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,11 @@ class EventPipeline(

val diagnostics = amplitude.diagnostics.extractDiagnostics()
val response = httpClient.upload(eventsString, diagnostics)
responseHandler.handle(response, eventFile, eventsString)
val shouldRetryUploadOnFailure = responseHandler.handle(response, eventFile, eventsString)

// if we encounter a retryable error, we retry with delay and
// restart the loop to get the newest event files
if (response.status.shouldRetryUploadOnFailure == true) {
if (shouldRetryUploadOnFailure == true) {
retryUploadHandler.attemptRetry { canRetry ->
val retrySignal = if (canRetry) UPLOAD_SIG else MAX_RETRY_ATTEMPT_SIG
uploadChannel.trySend(retrySignal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,7 @@ class FileResponseHandler(
) {
val eventFilePath = events as String
logger?.debug("Handle response, status: ${successResponse.status}")
val eventsList: List<BaseEvent>
try {
eventsList = JSONArray(eventsString).toEvents()
} catch (e: JSONException) {
scope.launch(storageDispatcher) {
storage.removeFile(eventFilePath)
}
removeCallbackByInsertId(eventsString)
throw e
}
val eventsList = parseEvents(eventsString, eventFilePath).toEvents()
triggerEventsCallback(eventsList, HttpStatus.SUCCESS.code, "Event sent success.")
scope.launch(storageDispatcher) {
storage.removeFile(eventFilePath)
Expand All @@ -54,27 +45,18 @@ class FileResponseHandler(
badRequestResponse: BadRequestResponse,
events: Any,
eventsString: String,
) {
): Boolean {
logger?.debug(
"Handle response, status: ${badRequestResponse.status}, error: ${badRequestResponse.error}"
)
val eventFilePath = events as String
val eventsList: List<BaseEvent>
try {
eventsList = JSONArray(eventsString).toEvents()
} catch (e: JSONException) {
scope.launch(storageDispatcher) {
storage.removeFile(eventFilePath)
}
removeCallbackByInsertId(eventsString)
throw e
}
if (eventsList.size == 1 || badRequestResponse.isInvalidApiKeyResponse()) {
val eventsList = parseEvents(eventsString, eventFilePath).toEvents()
if (badRequestResponse.isInvalidApiKeyResponse()) {
triggerEventsCallback(eventsList, HttpStatus.BAD_REQUEST.code, badRequestResponse.error)
scope.launch(storageDispatcher) {
storage.removeFile(eventFilePath)
}
return
return false
}
val droppedIndices = badRequestResponse.getEventIndicesToDrop()
val eventsToDrop = mutableListOf<BaseEvent>()
Expand All @@ -93,6 +75,10 @@ class FileResponseHandler(
scope.launch(storageDispatcher) {
storage.removeFile(eventFilePath)
}

// shouldRetryUploadOnFailure is true if there are NO events to drop, this happens
// when connected to a proxy and it returns 400 with w/o the eventsToDrop fields
return eventsToDrop.isEmpty()
}

override fun handlePayloadTooLargeResponse(
Expand All @@ -104,16 +90,7 @@ class FileResponseHandler(
"Handle response, status: ${payloadTooLargeResponse.status}, error: ${payloadTooLargeResponse.error}"
)
val eventFilePath = events as String
val rawEvents: JSONArray
try {
rawEvents = JSONArray(eventsString)
} catch (e: JSONException) {
scope.launch(storageDispatcher) {
storage.removeFile(eventFilePath)
}
removeCallbackByInsertId(eventsString)
throw e
}
val rawEvents = parseEvents(eventsString, eventFilePath)
if (rawEvents.length() == 1) {
val eventsList = rawEvents.toEvents()
triggerEventsCallback(
Expand Down Expand Up @@ -168,6 +145,28 @@ class FileResponseHandler(
}
}

/**
* Parse events from the [eventsString] at the given [eventFilePath].
* If parsing fails, this removes the file at [eventFilePath], and
* remove the callback by insert ID, and throws a [JSONException].
*/
private fun parseEvents(
eventsString: String,
eventFilePath: String,
): JSONArray {
val rawEvents: JSONArray
try {
rawEvents = JSONArray(eventsString)
} catch (e: JSONException) {
scope.launch(storageDispatcher) {
storage.removeFile(eventFilePath)
}
removeCallbackByInsertId(eventsString)
throw e
}
return rawEvents
}

private fun triggerEventsCallback(
events: List<BaseEvent>,
status: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,35 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

internal class InMemoryResponseHandler(
val eventPipeline: EventPipeline,
val configuration: Configuration,
val scope: CoroutineScope,
val dispatcher: CoroutineDispatcher
private val eventPipeline: EventPipeline,
private val configuration: Configuration,
private val scope: CoroutineScope,
private val storageDispatcher: CoroutineDispatcher,
) : ResponseHandler {

companion object {
const val BACK_OFF: Long = 30000
}

override fun handleSuccessResponse(successResponse: SuccessResponse, events: Any, eventsString: String) {
triggerEventsCallback(events as List<BaseEvent>, HttpStatus.SUCCESS.code, "Event sent success.")
override fun handleSuccessResponse(
successResponse: SuccessResponse,
events: Any,
eventsString: String,
) {
triggerEventsCallback(
events as List<BaseEvent>, HttpStatus.SUCCESS.code, "Event sent success."
)
}

override fun handleBadRequestResponse(badRequestResponse: BadRequestResponse, events: Any, eventsString: String) {
override fun handleBadRequestResponse(
badRequestResponse: BadRequestResponse,
events: Any,
eventsString: String,
): Boolean {
val eventsList = events as List<BaseEvent>
if (eventsList.size == 1 || badRequestResponse.isInvalidApiKeyResponse()) {
triggerEventsCallback(eventsList, HttpStatus.BAD_REQUEST.code, badRequestResponse.error)
return
return false
}
val droppedIndices = badRequestResponse.getEventIndicesToDrop()
val eventsToDrop = mutableListOf<BaseEvent>()
Expand All @@ -51,12 +61,19 @@ internal class InMemoryResponseHandler(
eventsToRetry.forEach {
eventPipeline.put(it)
}
return eventsToDrop.isEmpty()
}

override fun handlePayloadTooLargeResponse(payloadTooLargeResponse: PayloadTooLargeResponse, events: Any, eventsString: String) {
override fun handlePayloadTooLargeResponse(
payloadTooLargeResponse: PayloadTooLargeResponse,
events: Any,
eventsString: String,
) {
val eventsList = events as List<BaseEvent>
if (eventsList.size == 1) {
triggerEventsCallback(eventsList, HttpStatus.PAYLOAD_TOO_LARGE.code, payloadTooLargeResponse.error)
triggerEventsCallback(
eventsList, HttpStatus.PAYLOAD_TOO_LARGE.code, payloadTooLargeResponse.error
)
return
}
eventPipeline.flushSizeDivider.incrementAndGet()
Expand All @@ -65,7 +82,11 @@ internal class InMemoryResponseHandler(
}
}

override fun handleTooManyRequestsResponse(tooManyRequestsResponse: TooManyRequestsResponse, events: Any, eventsString: String) {
override fun handleTooManyRequestsResponse(
tooManyRequestsResponse: TooManyRequestsResponse,
events: Any,
eventsString: String,
) {
val eventsList = events as List<BaseEvent>
val eventsToDrop = mutableListOf<BaseEvent>()
val eventsToRetryNow = mutableListOf<BaseEvent>()
Expand All @@ -79,29 +100,39 @@ internal class InMemoryResponseHandler(
eventsToRetryNow.add(event)
}
}
triggerEventsCallback(eventsToDrop, HttpStatus.TOO_MANY_REQUESTS.code, tooManyRequestsResponse.error)
triggerEventsCallback(
eventsToDrop, HttpStatus.TOO_MANY_REQUESTS.code, tooManyRequestsResponse.error
)
eventsToRetryNow.forEach {
eventPipeline.put(it)
}
scope.launch(dispatcher) {
scope.launch(storageDispatcher) {
delay(BACK_OFF)
eventsToRetryLater.forEach {
eventPipeline.put(it)
}
}
}

override fun handleTimeoutResponse(timeoutResponse: TimeoutResponse, events: Any, eventsString: String) {
override fun handleTimeoutResponse(
timeoutResponse: TimeoutResponse,
events: Any,
eventsString: String,
) {
val eventsList = events as List<BaseEvent>
scope.launch(dispatcher) {
scope.launch(storageDispatcher) {
delay(BACK_OFF)
eventsList.forEach {
eventPipeline.put(it)
}
}
}

override fun handleFailedResponse(failedResponse: FailedResponse, events: Any, eventsString: String) {
override fun handleFailedResponse(
failedResponse: FailedResponse,
events: Any,
eventsString: String,
) {
val eventsList = events as List<BaseEvent>
val eventsToDrop = mutableListOf<BaseEvent>()
val eventsToRetry = mutableListOf<BaseEvent>()
Expand All @@ -118,7 +149,11 @@ internal class InMemoryResponseHandler(
}
}

private fun triggerEventsCallback(events: List<BaseEvent>, status: Int, message: String) {
private fun triggerEventsCallback(
events: List<BaseEvent>,
status: Int,
message: String,
) {
events.forEach { event ->
configuration.callback?.let {
it(event, status, message)
Expand Down
Loading
Loading