From a39e15ac9eef53cf1c32452fbf8c2cf4fb258381 Mon Sep 17 00:00:00 2001 From: "paul.intal" Date: Thu, 13 Mar 2025 11:01:32 -0700 Subject: [PATCH 1/5] change visibility of plugins to private --- .../src/main/java/com/amplitude/core/platform/Mediator.kt | 7 ++++++- core/src/test/kotlin/com/amplitude/core/AmplitudeTest.kt | 8 ++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/com/amplitude/core/platform/Mediator.kt b/core/src/main/java/com/amplitude/core/platform/Mediator.kt index 57a6bcf4..9c79eb3d 100644 --- a/core/src/main/java/com/amplitude/core/platform/Mediator.kt +++ b/core/src/main/java/com/amplitude/core/platform/Mediator.kt @@ -5,7 +5,7 @@ import com.amplitude.core.events.GroupIdentifyEvent import com.amplitude.core.events.IdentifyEvent import com.amplitude.core.events.RevenueEvent -internal class Mediator(internal val plugins: MutableList) { +internal class Mediator(private val plugins: MutableList) { fun add(plugin: Plugin) = synchronized(plugins) { plugins.add(plugin) } @@ -58,4 +58,9 @@ internal class Mediator(internal val plugins: MutableList) { closure(it) } } + + /** + * Only visible for testing + */ + internal fun size() = plugins.size } diff --git a/core/src/test/kotlin/com/amplitude/core/AmplitudeTest.kt b/core/src/test/kotlin/com/amplitude/core/AmplitudeTest.kt index 03f795a5..7f93799d 100644 --- a/core/src/test/kotlin/com/amplitude/core/AmplitudeTest.kt +++ b/core/src/test/kotlin/com/amplitude/core/AmplitudeTest.kt @@ -316,10 +316,10 @@ internal class AmplitudeTest { override lateinit var amplitude: Amplitude } amplitude.add(middleware) - amplitude.timeline.plugins[Plugin.Type.Enrichment]?.plugins?.let { + amplitude.timeline.plugins[Plugin.Type.Enrichment]?.size()?.let { assertEquals( 2, - it.size + it ) } ?: fail() } @@ -332,10 +332,10 @@ internal class AmplitudeTest { } amplitude.add(middleware) amplitude.remove(middleware) - amplitude.timeline.plugins[Plugin.Type.Enrichment]?.plugins?.let { + amplitude.timeline.plugins[Plugin.Type.Enrichment]?.size()?.let { assertEquals( 1, // SegmentLog is the other added at startup - it.size + it ) } ?: fail() } From a0294ebdb52f6a3ed963514ff283d81a7d00058c Mon Sep 17 00:00:00 2001 From: "paul.intal" Date: Thu, 13 Mar 2025 11:05:09 -0700 Subject: [PATCH 2/5] use CopyOnWriteArrayList to remove the synchronized blocks --- .../com/amplitude/core/platform/Mediator.kt | 13 ++-- .../com/amplitude/core/platform/Timeline.kt | 9 ++- .../amplitude/core/platform/MediatorTest.kt | 74 +++++++++++++++++++ 3 files changed, 85 insertions(+), 11 deletions(-) create mode 100644 core/src/test/kotlin/com/amplitude/core/platform/MediatorTest.kt diff --git a/core/src/main/java/com/amplitude/core/platform/Mediator.kt b/core/src/main/java/com/amplitude/core/platform/Mediator.kt index 9c79eb3d..129f7e4f 100644 --- a/core/src/main/java/com/amplitude/core/platform/Mediator.kt +++ b/core/src/main/java/com/amplitude/core/platform/Mediator.kt @@ -4,17 +4,16 @@ import com.amplitude.core.events.BaseEvent import com.amplitude.core.events.GroupIdentifyEvent import com.amplitude.core.events.IdentifyEvent import com.amplitude.core.events.RevenueEvent +import java.util.concurrent.CopyOnWriteArrayList -internal class Mediator(private val plugins: MutableList) { - fun add(plugin: Plugin) = synchronized(plugins) { +internal class Mediator(private val plugins: CopyOnWriteArrayList) { + fun add(plugin: Plugin) { plugins.add(plugin) } - fun remove(plugin: Plugin) = synchronized(plugins) { - plugins.removeAll { it === plugin } - } + fun remove(plugin: Plugin) = plugins.removeAll { it === plugin } - fun execute(event: BaseEvent): BaseEvent? = synchronized(plugins) { + fun execute(event: BaseEvent): BaseEvent? { var result: BaseEvent? = event plugins.forEach { plugin -> @@ -53,7 +52,7 @@ internal class Mediator(private val plugins: MutableList) { return result } - fun applyClosure(closure: (Plugin) -> Unit) = synchronized(plugins) { + fun applyClosure(closure: (Plugin) -> Unit) { plugins.forEach { closure(it) } diff --git a/core/src/main/java/com/amplitude/core/platform/Timeline.kt b/core/src/main/java/com/amplitude/core/platform/Timeline.kt index 2afcce78..e2940132 100644 --- a/core/src/main/java/com/amplitude/core/platform/Timeline.kt +++ b/core/src/main/java/com/amplitude/core/platform/Timeline.kt @@ -2,13 +2,14 @@ package com.amplitude.core.platform import com.amplitude.core.Amplitude import com.amplitude.core.events.BaseEvent +import java.util.concurrent.CopyOnWriteArrayList open class Timeline { internal val plugins: Map = mapOf( - Plugin.Type.Before to Mediator(mutableListOf()), - Plugin.Type.Enrichment to Mediator(mutableListOf()), - Plugin.Type.Destination to Mediator(mutableListOf()), - Plugin.Type.Utility to Mediator(mutableListOf()) + Plugin.Type.Before to Mediator(CopyOnWriteArrayList()), + Plugin.Type.Enrichment to Mediator(CopyOnWriteArrayList()), + Plugin.Type.Destination to Mediator(CopyOnWriteArrayList()), + Plugin.Type.Utility to Mediator(CopyOnWriteArrayList()) ) lateinit var amplitude: Amplitude diff --git a/core/src/test/kotlin/com/amplitude/core/platform/MediatorTest.kt b/core/src/test/kotlin/com/amplitude/core/platform/MediatorTest.kt new file mode 100644 index 00000000..5d9dbcd7 --- /dev/null +++ b/core/src/test/kotlin/com/amplitude/core/platform/MediatorTest.kt @@ -0,0 +1,74 @@ +package com.amplitude.core.platform + +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Timeout +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import kotlin.concurrent.thread + +class MediatorTest { + private val mediator = Mediator(CopyOnWriteArrayList()) + + /** + * Fake [DestinationPlugin] that does work on [flush] for 1 second + */ + private class FakeDestinationPlugin : DestinationPlugin() { + var workDone = false + + override fun flush() { + super.flush() + println("$this start work ${System.currentTimeMillis()} ${Thread.currentThread().name}") + Thread.sleep(1_000) + println("$this end work ${System.currentTimeMillis()} ${Thread.currentThread().name}") + workDone = true + } + } + + @Test + @Timeout(2, unit = TimeUnit.SECONDS) + fun `multiple threads that call flush on destination plugin`() { + val fakeDestinationPlugins = List(10) { FakeDestinationPlugin() } + fakeDestinationPlugins.forEach { + mediator.add(it) + } + + // simulate 10 threads executing flush on 10 different [DestinationPlugin]s + val executor = Executors.newFixedThreadPool(10) + fakeDestinationPlugins.forEach { + executor.submit { + it.flush() + } + } + executor.shutdown() + executor.awaitTermination(2, TimeUnit.SECONDS) + + assertTrue { + fakeDestinationPlugins.all { it.workDone } + } + } + + @Test + @Timeout(2, unit = TimeUnit.SECONDS) + fun `two threads that call flush on destination plugin`() { + val fakeDestinationPlugins = List(2) { FakeDestinationPlugin() } + fakeDestinationPlugins.forEach { + mediator.add(it) + } + + // simulate 2 threads executing flush on 2 different DestinationPlugins + val t1 = thread { + fakeDestinationPlugins[0].flush() + } + val t2 = thread { + fakeDestinationPlugins[1].flush() + } + t1.join() + t2.join() + + assertTrue { + fakeDestinationPlugins.all { it.workDone } + } + } +} From 7abecac88b067dab4ac9de604c6af47e975e4726 Mon Sep 17 00:00:00 2001 From: "paul.intal" Date: Thu, 13 Mar 2025 11:48:47 -0700 Subject: [PATCH 3/5] use a default list constructor on mediator --- .../main/java/com/amplitude/core/platform/Mediator.kt | 4 +++- .../main/java/com/amplitude/core/platform/Timeline.kt | 9 ++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/com/amplitude/core/platform/Mediator.kt b/core/src/main/java/com/amplitude/core/platform/Mediator.kt index 129f7e4f..5fae5d8a 100644 --- a/core/src/main/java/com/amplitude/core/platform/Mediator.kt +++ b/core/src/main/java/com/amplitude/core/platform/Mediator.kt @@ -6,7 +6,9 @@ import com.amplitude.core.events.IdentifyEvent import com.amplitude.core.events.RevenueEvent import java.util.concurrent.CopyOnWriteArrayList -internal class Mediator(private val plugins: CopyOnWriteArrayList) { +internal class Mediator( + private val plugins: CopyOnWriteArrayList = CopyOnWriteArrayList() +) { fun add(plugin: Plugin) { plugins.add(plugin) } diff --git a/core/src/main/java/com/amplitude/core/platform/Timeline.kt b/core/src/main/java/com/amplitude/core/platform/Timeline.kt index e2940132..b47b19d5 100644 --- a/core/src/main/java/com/amplitude/core/platform/Timeline.kt +++ b/core/src/main/java/com/amplitude/core/platform/Timeline.kt @@ -2,14 +2,13 @@ package com.amplitude.core.platform import com.amplitude.core.Amplitude import com.amplitude.core.events.BaseEvent -import java.util.concurrent.CopyOnWriteArrayList open class Timeline { internal val plugins: Map = mapOf( - Plugin.Type.Before to Mediator(CopyOnWriteArrayList()), - Plugin.Type.Enrichment to Mediator(CopyOnWriteArrayList()), - Plugin.Type.Destination to Mediator(CopyOnWriteArrayList()), - Plugin.Type.Utility to Mediator(CopyOnWriteArrayList()) + Plugin.Type.Before to Mediator(), + Plugin.Type.Enrichment to Mediator(), + Plugin.Type.Destination to Mediator(), + Plugin.Type.Utility to Mediator() ) lateinit var amplitude: Amplitude From 44b4ff7734e2a39a2c7e7eeec3c5ce41e8ae31e6 Mon Sep 17 00:00:00 2001 From: "paul.intal" Date: Thu, 13 Mar 2025 12:36:18 -0700 Subject: [PATCH 4/5] update to test to actually use mediator --- .../amplitude/core/platform/MediatorTest.kt | 50 ++++++------------- 1 file changed, 15 insertions(+), 35 deletions(-) diff --git a/core/src/test/kotlin/com/amplitude/core/platform/MediatorTest.kt b/core/src/test/kotlin/com/amplitude/core/platform/MediatorTest.kt index 5d9dbcd7..5fe14eda 100644 --- a/core/src/test/kotlin/com/amplitude/core/platform/MediatorTest.kt +++ b/core/src/test/kotlin/com/amplitude/core/platform/MediatorTest.kt @@ -1,11 +1,11 @@ package com.amplitude.core.platform -import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import java.util.concurrent.CopyOnWriteArrayList -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger import kotlin.concurrent.thread class MediatorTest { @@ -15,60 +15,40 @@ class MediatorTest { * Fake [DestinationPlugin] that does work on [flush] for 1 second */ private class FakeDestinationPlugin : DestinationPlugin() { - var workDone = false + var amountOfWorkDone = AtomicInteger() override fun flush() { super.flush() - println("$this start work ${System.currentTimeMillis()} ${Thread.currentThread().name}") Thread.sleep(1_000) - println("$this end work ${System.currentTimeMillis()} ${Thread.currentThread().name}") - workDone = true + amountOfWorkDone.incrementAndGet() } } @Test - @Timeout(2, unit = TimeUnit.SECONDS) - fun `multiple threads that call flush on destination plugin`() { - val fakeDestinationPlugins = List(10) { FakeDestinationPlugin() } - fakeDestinationPlugins.forEach { - mediator.add(it) - } - - // simulate 10 threads executing flush on 10 different [DestinationPlugin]s - val executor = Executors.newFixedThreadPool(10) - fakeDestinationPlugins.forEach { - executor.submit { - it.flush() - } - } - executor.shutdown() - executor.awaitTermination(2, TimeUnit.SECONDS) - - assertTrue { - fakeDestinationPlugins.all { it.workDone } - } - } - - @Test - @Timeout(2, unit = TimeUnit.SECONDS) - fun `two threads that call flush on destination plugin`() { + @Timeout(3, unit = TimeUnit.SECONDS) + fun `two threads that call flush twice on two destination plugins`() { val fakeDestinationPlugins = List(2) { FakeDestinationPlugin() } fakeDestinationPlugins.forEach { mediator.add(it) } // simulate 2 threads executing flush on 2 different DestinationPlugins + val work = { + mediator.applyClosure { + (it as EventPlugin).flush() + } + } val t1 = thread { - fakeDestinationPlugins[0].flush() + work() } val t2 = thread { - fakeDestinationPlugins[1].flush() + work() } t1.join() t2.join() - assertTrue { - fakeDestinationPlugins.all { it.workDone } + fakeDestinationPlugins.forEach { + assertEquals(2, it.amountOfWorkDone.get()) } } } From 04aeba8f6c826f8ca730cf2e6379c5bf9025dcd4 Mon Sep 17 00:00:00 2001 From: "paul.intal" Date: Fri, 14 Mar 2025 13:13:42 -0700 Subject: [PATCH 5/5] add a new test with add plugin in between --- .../amplitude/core/platform/MediatorTest.kt | 42 ++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/core/src/test/kotlin/com/amplitude/core/platform/MediatorTest.kt b/core/src/test/kotlin/com/amplitude/core/platform/MediatorTest.kt index 5fe14eda..0993939f 100644 --- a/core/src/test/kotlin/com/amplitude/core/platform/MediatorTest.kt +++ b/core/src/test/kotlin/com/amplitude/core/platform/MediatorTest.kt @@ -4,6 +4,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import kotlin.concurrent.thread @@ -26,7 +27,7 @@ class MediatorTest { @Test @Timeout(3, unit = TimeUnit.SECONDS) - fun `two threads that call flush twice on two destination plugins`() { + fun `call flush twice on two destination plugins`() { val fakeDestinationPlugins = List(2) { FakeDestinationPlugin() } fakeDestinationPlugins.forEach { mediator.add(it) @@ -51,4 +52,43 @@ class MediatorTest { assertEquals(2, it.amountOfWorkDone.get()) } } + + @Test + @Timeout(5, unit = TimeUnit.SECONDS) + fun `flush, add a new plugin, and flush again on two destination plugins`() { + val fakeDestinationPlugin1 = FakeDestinationPlugin() + val fakeDestinationPlugin2 = FakeDestinationPlugin() + + mediator.add(fakeDestinationPlugin1) + + val work = { + mediator.applyClosure { + (it as EventPlugin).flush() + } + } + + // flush and add + val latch = CountDownLatch(2) + val t1 = thread { + work() + latch.countDown() + } + val t2 = thread { + // add plugin 2, work() should catch up with the newly added plugin + mediator.add(fakeDestinationPlugin2) + latch.countDown() + } + latch.await() + + // flush again + val t3 = thread { + work() + } + t1.join() + t2.join() + t3.join() + + assertEquals(2, fakeDestinationPlugin1.amountOfWorkDone.get()) + assertEquals(2, fakeDestinationPlugin2.amountOfWorkDone.get()) + } }