Skip to content

refactor: remove-previous-retry-logic #258

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.amplitude.android.storage

import android.content.Context
import android.content.SharedPreferences
import androidx.core.content.edit
import com.amplitude.android.utilities.AndroidKVS
import com.amplitude.common.Logger
import com.amplitude.core.Amplitude
Expand Down Expand Up @@ -62,11 +63,15 @@ class AndroidStorageV2(
key: Storage.Constants,
value: String,
) {
sharedPreferences.edit().putString(key.rawVal, value).apply()
sharedPreferences.edit {
putString(key.rawVal, value)
}
}

override suspend fun remove(key: Storage.Constants) {
sharedPreferences.edit().remove(key.rawVal).apply()
sharedPreferences.edit {
remove(key.rawVal)
}
}

override suspend fun rollover() {
Expand All @@ -85,8 +90,8 @@ class AndroidStorageV2(
eventsFile.release(filePath)
}

override suspend fun getEventsString(content: Any): String {
return eventsFile.getEventString(content as String)
override suspend fun getEventsString(filePath: Any): String {
return eventsFile.getEventString(filePath as String)
}

override fun getResponseHandler(
Expand Down Expand Up @@ -136,7 +141,8 @@ class AndroidEventsStorageProviderV2 : StorageProvider {
): Storage {
val configuration = amplitude.configuration as com.amplitude.android.Configuration
val sharedPreferencesName = "amplitude-events-${configuration.instanceName}"
val sharedPreferences = configuration.context.getSharedPreferences(sharedPreferencesName, Context.MODE_PRIVATE)
val sharedPreferences =
configuration.context.getSharedPreferences(sharedPreferencesName, Context.MODE_PRIVATE)
return AndroidStorageV2(
configuration.instanceName,
configuration.loggerProvider.getLogger(amplitude),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ class AndroidStorage(
storageV2.releaseFile(filePath)
}

override suspend fun getEventsString(content: Any): String {
return storageV2.getEventsString(content)
override suspend fun getEventsString(filePath: Any): String {
return storageV2.getEventsString(filePath)
}

override fun getResponseHandler(
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/com/amplitude/core/Amplitude.kt
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,10 @@ open class Amplitude internal constructor(
options: EventOptions? = null,
callback: EventCallBack? = null,
): Amplitude {
options ?. let {
options?.let {
event.mergeEventOptions(it)
}
callback ?. let {
callback?.let {
event.callback = it
}
process(event)
Expand Down
36 changes: 11 additions & 25 deletions core/src/main/java/com/amplitude/core/platform/EventPipeline.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import com.amplitude.core.Amplitude
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.utilities.http.HttpClient
import com.amplitude.core.utilities.http.HttpClientInterface
import com.amplitude.core.utilities.http.ResponseHandler
import com.amplitude.core.utilities.logWithStackTrace
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
Expand All @@ -20,35 +19,30 @@ class EventPipeline(
private val amplitude: Amplitude,
) {
private val writeChannel: Channel<WriteQueueMessage>

private val uploadChannel: Channel<String>

private val eventCount: AtomicInteger = AtomicInteger(0)

private val httpClient: HttpClientInterface = amplitude.configuration.httpClient
?: HttpClient(amplitude.configuration)

private val storage get() = amplitude.storage

private val scope get() = amplitude.amplitudeScope

var flushInterval = amplitude.configuration.flushIntervalMillis.toLong()
var flushQueueSize = amplitude.configuration.flushQueueSize

private var running: Boolean

private var scheduled: Boolean

var flushSizeDivider: AtomicInteger = AtomicInteger(1)

var exceededRetries = false
private val responseHandler by lazy {
storage.getResponseHandler(
this@EventPipeline,
amplitude.configuration,
scope,
amplitude.retryDispatcher,
)
}

companion object {
internal const val UPLOAD_SIG = "#!upload"
}

private val responseHandler: ResponseHandler

init {
running = false
scheduled = false
Expand All @@ -57,14 +51,6 @@ class EventPipeline(
uploadChannel = Channel(UNLIMITED)

registerShutdownHook()

responseHandler =
storage.getResponseHandler(
this@EventPipeline,
amplitude.configuration,
scope,
amplitude.retryDispatcher,
)
}

fun put(event: BaseEvent) {
Expand Down Expand Up @@ -153,15 +139,15 @@ class EventPipeline(
}

private fun getFlushCount(): Int {
val count = flushQueueSize / flushSizeDivider.get()
val count = amplitude.configuration.flushQueueSize / flushSizeDivider.get()
return count.takeUnless { it == 0 } ?: 1
}

private fun schedule() =
scope.launch(amplitude.storageIODispatcher) {
if (isActive && running && !scheduled && !exceededRetries) {
if (isActive && running && !scheduled) {
scheduled = true
delay(flushInterval)
delay(amplitude.configuration.flushIntervalMillis.toLong())
flush()
scheduled = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import kotlinx.coroutines.sync.withLock
import org.json.JSONArray
import org.json.JSONException
import org.json.JSONObject
import java.io.BufferedReader
import java.io.File
import java.io.FileNotFoundException
import java.io.FileOutputStream
Expand All @@ -28,8 +27,8 @@ class EventsFileManager(
) {
private val fileIndexKey = "amplitude.events.file.index.$storageKey"
private val storageVersionKey = "amplitude.events.file.version.$storageKey"
val filePathSet: MutableSet<String> = Collections.newSetFromMap(ConcurrentHashMap())
val curFile: MutableMap<String, File> = ConcurrentHashMap<String, File>()
private val filePathSet: MutableSet<String> = Collections.newSetFromMap(ConcurrentHashMap())
private val curFile: MutableMap<String, File> = ConcurrentHashMap<String, File>()

companion object {
const val MAX_FILE_SIZE = 975_000 // 975KB
Expand All @@ -38,7 +37,7 @@ class EventsFileManager(
val readMutexMap = ConcurrentHashMap<String, Mutex>()
}

val writeMutex = writeMutexMap.getOrPut(storageKey) { Mutex() }
private val writeMutex = writeMutexMap.getOrPut(storageKey) { Mutex() }
private val readMutex = readMutexMap.getOrPut(storageKey) { Mutex() }

init {
Expand Down Expand Up @@ -100,20 +99,24 @@ class EventsFileManager(
*/
fun read(): List<String> {
// we need to filter out .temp file, since it's operating on the writing thread
val fileList =
directory.listFiles { _, name ->
name.contains(storageKey) && !name.endsWith(".tmp") && !name.endsWith(".properties")
} ?: emptyArray()
return fileList.sortedBy { it ->
getSortKeyForFile(it)
val fileList = directory.listFiles { _, name ->
name.contains(storageKey) && !name.endsWith(".tmp") && !name.endsWith(".properties")
} ?: emptyArray()

return fileList.sortedBy { file ->
val name = file.nameWithoutExtension.replace("$storageKey-", "")

val dashIndex = name.indexOf('-')
if (dashIndex >= 0) {
name.substring(0, dashIndex).padStart(10, '0') + name.substring(dashIndex)
} else {
name
}
}.map {
it.absolutePath
}
}

/**
* deletes the file at filePath
*/
fun remove(filePath: String): Boolean {
filePathSet.remove(filePath)
return File(filePath).delete()
Expand Down Expand Up @@ -160,8 +163,8 @@ class EventsFileManager(
return@withLock ""
}
filePathSet.add(filePath)
File(filePath).bufferedReader().use<BufferedReader, String> {
val content = it.readText()
File(filePath).bufferedReader().use { reader ->
val content = reader.readText()
val isCurrentVersion = content.endsWith(DELIMITER)
if (isCurrentVersion) {
// handle current version
Expand Down Expand Up @@ -247,15 +250,6 @@ class EventsFileManager(
return curFile[storageKey]!!
}

private fun getSortKeyForFile(file: File): String {
val name = file.nameWithoutExtension.replace("$storageKey-", "")
val dashIndex = name.indexOf('-')
if (dashIndex >= 0) {
return name.substring(0, dashIndex).padStart(10, '0') + name.substring(dashIndex)
}
return name
}

// write to underlying file
private fun writeToFile(
content: ByteArray,
Expand Down
Loading