diff --git a/core/src/main/java/com/amplitude/core/platform/Mediator.kt b/core/src/main/java/com/amplitude/core/platform/Mediator.kt index 57a6bcf4..5fae5d8a 100644 --- a/core/src/main/java/com/amplitude/core/platform/Mediator.kt +++ b/core/src/main/java/com/amplitude/core/platform/Mediator.kt @@ -4,17 +4,18 @@ import com.amplitude.core.events.BaseEvent import com.amplitude.core.events.GroupIdentifyEvent import com.amplitude.core.events.IdentifyEvent import com.amplitude.core.events.RevenueEvent +import java.util.concurrent.CopyOnWriteArrayList -internal class Mediator(internal val plugins: MutableList) { - fun add(plugin: Plugin) = synchronized(plugins) { +internal class Mediator( + private val plugins: CopyOnWriteArrayList = CopyOnWriteArrayList() +) { + fun add(plugin: Plugin) { plugins.add(plugin) } - fun remove(plugin: Plugin) = synchronized(plugins) { - plugins.removeAll { it === plugin } - } + fun remove(plugin: Plugin) = plugins.removeAll { it === plugin } - fun execute(event: BaseEvent): BaseEvent? = synchronized(plugins) { + fun execute(event: BaseEvent): BaseEvent? { var result: BaseEvent? = event plugins.forEach { plugin -> @@ -53,9 +54,14 @@ internal class Mediator(internal val plugins: MutableList) { return result } - fun applyClosure(closure: (Plugin) -> Unit) = synchronized(plugins) { + fun applyClosure(closure: (Plugin) -> Unit) { plugins.forEach { closure(it) } } + + /** + * Only visible for testing + */ + internal fun size() = plugins.size } diff --git a/core/src/main/java/com/amplitude/core/platform/Timeline.kt b/core/src/main/java/com/amplitude/core/platform/Timeline.kt index 2afcce78..b47b19d5 100644 --- a/core/src/main/java/com/amplitude/core/platform/Timeline.kt +++ b/core/src/main/java/com/amplitude/core/platform/Timeline.kt @@ -5,10 +5,10 @@ import com.amplitude.core.events.BaseEvent open class Timeline { internal val plugins: Map = mapOf( - Plugin.Type.Before to Mediator(mutableListOf()), - Plugin.Type.Enrichment to Mediator(mutableListOf()), - Plugin.Type.Destination to Mediator(mutableListOf()), - Plugin.Type.Utility to Mediator(mutableListOf()) + Plugin.Type.Before to Mediator(), + Plugin.Type.Enrichment to Mediator(), + Plugin.Type.Destination to Mediator(), + Plugin.Type.Utility to Mediator() ) lateinit var amplitude: Amplitude diff --git a/core/src/test/kotlin/com/amplitude/core/AmplitudeTest.kt b/core/src/test/kotlin/com/amplitude/core/AmplitudeTest.kt index 03f795a5..7f93799d 100644 --- a/core/src/test/kotlin/com/amplitude/core/AmplitudeTest.kt +++ b/core/src/test/kotlin/com/amplitude/core/AmplitudeTest.kt @@ -316,10 +316,10 @@ internal class AmplitudeTest { override lateinit var amplitude: Amplitude } amplitude.add(middleware) - amplitude.timeline.plugins[Plugin.Type.Enrichment]?.plugins?.let { + amplitude.timeline.plugins[Plugin.Type.Enrichment]?.size()?.let { assertEquals( 2, - it.size + it ) } ?: fail() } @@ -332,10 +332,10 @@ internal class AmplitudeTest { } amplitude.add(middleware) amplitude.remove(middleware) - amplitude.timeline.plugins[Plugin.Type.Enrichment]?.plugins?.let { + amplitude.timeline.plugins[Plugin.Type.Enrichment]?.size()?.let { assertEquals( 1, // SegmentLog is the other added at startup - it.size + it ) } ?: fail() } diff --git a/core/src/test/kotlin/com/amplitude/core/platform/MediatorTest.kt b/core/src/test/kotlin/com/amplitude/core/platform/MediatorTest.kt new file mode 100644 index 00000000..0993939f --- /dev/null +++ b/core/src/test/kotlin/com/amplitude/core/platform/MediatorTest.kt @@ -0,0 +1,94 @@ +package com.amplitude.core.platform + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Timeout +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import kotlin.concurrent.thread + +class MediatorTest { + private val mediator = Mediator(CopyOnWriteArrayList()) + + /** + * Fake [DestinationPlugin] that does work on [flush] for 1 second + */ + private class FakeDestinationPlugin : DestinationPlugin() { + var amountOfWorkDone = AtomicInteger() + + override fun flush() { + super.flush() + Thread.sleep(1_000) + amountOfWorkDone.incrementAndGet() + } + } + + @Test + @Timeout(3, unit = TimeUnit.SECONDS) + fun `call flush twice on two destination plugins`() { + val fakeDestinationPlugins = List(2) { FakeDestinationPlugin() } + fakeDestinationPlugins.forEach { + mediator.add(it) + } + + // simulate 2 threads executing flush on 2 different DestinationPlugins + val work = { + mediator.applyClosure { + (it as EventPlugin).flush() + } + } + val t1 = thread { + work() + } + val t2 = thread { + work() + } + t1.join() + t2.join() + + fakeDestinationPlugins.forEach { + assertEquals(2, it.amountOfWorkDone.get()) + } + } + + @Test + @Timeout(5, unit = TimeUnit.SECONDS) + fun `flush, add a new plugin, and flush again on two destination plugins`() { + val fakeDestinationPlugin1 = FakeDestinationPlugin() + val fakeDestinationPlugin2 = FakeDestinationPlugin() + + mediator.add(fakeDestinationPlugin1) + + val work = { + mediator.applyClosure { + (it as EventPlugin).flush() + } + } + + // flush and add + val latch = CountDownLatch(2) + val t1 = thread { + work() + latch.countDown() + } + val t2 = thread { + // add plugin 2, work() should catch up with the newly added plugin + mediator.add(fakeDestinationPlugin2) + latch.countDown() + } + latch.await() + + // flush again + val t3 = thread { + work() + } + t1.join() + t2.join() + t3.join() + + assertEquals(2, fakeDestinationPlugin1.amountOfWorkDone.get()) + assertEquals(2, fakeDestinationPlugin2.amountOfWorkDone.get()) + } +}