Skip to content

fix: reformatting and warning fixes for upload ordering ticket #257

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions core/src/main/java/com/amplitude/core/platform/EventPipeline.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class EventPipeline(

private val eventCount: AtomicInteger = AtomicInteger(0)

private val httpClient: HttpClientInterface = amplitude.configuration.httpClient ?: HttpClient(amplitude.configuration)
private val httpClient: HttpClientInterface = amplitude.configuration.httpClient
?: HttpClient(amplitude.configuration)

private val storage get() = amplitude.storage

Expand All @@ -34,11 +35,9 @@ class EventPipeline(
var flushInterval = amplitude.configuration.flushIntervalMillis.toLong()
var flushQueueSize = amplitude.configuration.flushQueueSize

var running: Boolean
private set
private var running: Boolean

var scheduled: Boolean
private set
private var scheduled: Boolean

var flushSizeDivider: AtomicInteger = AtomicInteger(1)

Expand All @@ -48,7 +47,7 @@ class EventPipeline(
internal const val UPLOAD_SIG = "#!upload"
}

val responseHandler: ResponseHandler
private val responseHandler: ResponseHandler

init {
running = false
Expand Down Expand Up @@ -98,7 +97,10 @@ class EventPipeline(
try {
storage.writeEvent(message.event)
} catch (e: Exception) {
e.logWithStackTrace(amplitude.logger, "Error when writing event to pipeline")
e.logWithStackTrace(
amplitude.logger,
"Error when writing event to pipeline"
)
}
}

Expand Down Expand Up @@ -155,15 +157,11 @@ class EventPipeline(
return count.takeUnless { it == 0 } ?: 1
}

private fun getFlushIntervalInMillis(): Long {
return flushInterval
}

private fun schedule() =
scope.launch(amplitude.storageIODispatcher) {
if (isActive && running && !scheduled && !exceededRetries) {
scheduled = true
delay(getFlushIntervalInMillis())
delay(flushInterval)
flush()
scheduled = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,18 @@ import java.lang.Exception

internal object HttpResponse {
fun createHttpResponse(code: Int, responseBody: String?): AnalyticsResponse {
when (code) {
HttpStatus.SUCCESS.code -> {
return SuccessResponse()
}

HttpStatus.BAD_REQUEST.code -> {
return BadRequestResponse(JSONObject(responseBody))
}

HttpStatus.PAYLOAD_TOO_LARGE.code -> {
return PayloadTooLargeResponse(JSONObject(responseBody))
}

HttpStatus.TOO_MANY_REQUESTS.code -> {
return TooManyRequestsResponse(JSONObject(responseBody))
}

HttpStatus.TIMEOUT.code -> {
return TimeoutResponse()
}

else -> {
return FailedResponse(parseResponseBodyOrGetDefault(responseBody))
}
return when (code) {
HttpStatus.SUCCESS.code -> SuccessResponse()

HttpStatus.BAD_REQUEST.code -> BadRequestResponse(JSONObject(responseBody))

HttpStatus.PAYLOAD_TOO_LARGE.code -> PayloadTooLargeResponse(JSONObject(responseBody))

HttpStatus.TOO_MANY_REQUESTS.code -> TooManyRequestsResponse(JSONObject(responseBody))

HttpStatus.TIMEOUT.code -> TimeoutResponse()

else -> FailedResponse(parseResponseBodyOrGetDefault(responseBody))
}
}

Expand Down Expand Up @@ -61,17 +49,17 @@ interface AnalyticsResponse {
}
}

class SuccessResponse() : AnalyticsResponse {
class SuccessResponse : AnalyticsResponse {
override val status: HttpStatus = HttpStatus.SUCCESS
}

class BadRequestResponse(response: JSONObject) : AnalyticsResponse {
override val status: HttpStatus = HttpStatus.BAD_REQUEST
val error: String = response.getStringWithDefault("error", "")
var eventsWithInvalidFields: Set<Int> = setOf()
var eventsWithMissingFields: Set<Int> = setOf()
var silencedEvents: Set<Int> = setOf()
var silencedDevices: Set<String> = setOf()
private var eventsWithInvalidFields: Set<Int> = setOf()
private var eventsWithMissingFields: Set<Int> = setOf()
private var silencedEvents: Set<Int> = setOf()
private var silencedDevices: Set<String> = setOf()

init {
if (response.has("events_with_invalid_fields")) {
Expand Down Expand Up @@ -99,10 +87,12 @@ class BadRequestResponse(response: JSONObject) : AnalyticsResponse {
}

fun isEventSilenced(event: BaseEvent): Boolean {
event.deviceId?.let {
return silencedDevices.contains(it)
} ?: let {
return false
val eventDeviceId = event.deviceId

return if (eventDeviceId != null) {
silencedDevices.contains(eventDeviceId)
} else {
false
}
}

Expand Down Expand Up @@ -150,7 +140,7 @@ class TooManyRequestsResponse(response: JSONObject) : AnalyticsResponse {
}
}

class TimeoutResponse() : AnalyticsResponse {
class TimeoutResponse : AnalyticsResponse {
override val status: HttpStatus = HttpStatus.TIMEOUT
}

Expand All @@ -162,33 +152,32 @@ class FailedResponse(response: JSONObject) : AnalyticsResponse {
interface ResponseHandler {
fun handle(response: AnalyticsResponse, events: Any, eventsString: String) {
when (response) {
is SuccessResponse -> {
is SuccessResponse ->
handleSuccessResponse(response, events, eventsString)
}

is BadRequestResponse -> {
is BadRequestResponse ->
handleBadRequestResponse(response, events, eventsString)
}

is PayloadTooLargeResponse -> {
is PayloadTooLargeResponse ->
handlePayloadTooLargeResponse(response, events, eventsString)
}

is TooManyRequestsResponse -> {
is TooManyRequestsResponse ->
handleTooManyRequestsResponse(response, events, eventsString)
}

is TimeoutResponse -> {
is TimeoutResponse ->
handleTimeoutResponse(response, events, eventsString)
}

else -> {
else ->
handleFailedResponse(response as FailedResponse, events, eventsString)
}
}
}

fun handleSuccessResponse(successResponse: SuccessResponse, events: Any, eventsString: String)
fun handleSuccessResponse(
successResponse: SuccessResponse,
events: Any,
eventsString: String
)

fun handleBadRequestResponse(
badRequestResponse: BadRequestResponse,
events: Any,
Expand All @@ -207,6 +196,15 @@ interface ResponseHandler {
eventsString: String
)

fun handleTimeoutResponse(timeoutResponse: TimeoutResponse, events: Any, eventsString: String)
fun handleFailedResponse(failedResponse: FailedResponse, events: Any, eventsString: String)
fun handleTimeoutResponse(
timeoutResponse: TimeoutResponse,
events: Any,
eventsString: String
)

fun handleFailedResponse(
failedResponse: FailedResponse,
events: Any,
eventsString: String
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ internal class HttpClient(
connection.outputStream.close()

val responseCode: Int = connection.responseCode
var responseBody: String?
val responseBody: String?
var inputStream: InputStream? = null
try {
inputStream = getInputStream(connection)
Expand All @@ -52,7 +52,7 @@ internal class HttpClient(
}
}

internal fun getApiKey(): String {
private fun getApiKey(): String {
return configuration.apiKey
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import okhttp3.MediaType.Companion.toMediaTypeOrNull
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.RequestBody
import okhttp3.RequestBody.Companion.toRequestBody
import java.io.IOException

class CustomOkHttpClient : HttpClientInterface {
Expand All @@ -26,14 +27,17 @@ class CustomOkHttpClient : HttpClientInterface {
diagnostics = diagnostics,
minIdLength = configuration.minIdLength
)
val formBody: RequestBody = RequestBody.create(mediaType, ampRequest.getBodyStr())
val request: Request =
Request.Builder().url(configuration.getApiHost()).post(formBody).build()
val formBody: RequestBody = ampRequest.getBodyStr()
.toRequestBody(mediaType)
val request: Request = Request.Builder()
.url(configuration.getApiHost())
.post(formBody)
.build()

try {
val response = okHttpClient.newCall(request).execute()
return AnalyticsResponse.create(response.code, response.body?.string())
// Do something with the response.
return okHttpClient.newCall(request).execute().use { response ->
AnalyticsResponse.create(response.code, response.body?.string())
}
} catch (e: IOException) {
e.printStackTrace()
return AnalyticsResponse.create(500, null)
Expand Down
Loading