Skip to content

fix: Mediator flaky test #256

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 17, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions core/src/test/kotlin/com/amplitude/core/platform/MediatorTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

private const val UNIT_OF_WORK_IN_MS = 100L

class MediatorTest {
private val mediator = Mediator(CopyOnWriteArrayList())

Expand All @@ -20,20 +22,19 @@ class MediatorTest {

override fun flush() {
super.flush()
Thread.sleep(1_000)
Thread.sleep(UNIT_OF_WORK_IN_MS)
amountOfWorkDone.incrementAndGet()
}
}

@Test
@Timeout(3, unit = TimeUnit.SECONDS)
fun `call flush twice on two destination plugins`() {
@Timeout(UNIT_OF_WORK_IN_MS * 3, unit = TimeUnit.MILLISECONDS)
fun `does work twice on two destination plugins`() {
val fakeDestinationPlugins = List(2) { FakeDestinationPlugin() }
fakeDestinationPlugins.forEach {
mediator.add(it)
}

// simulate 2 threads executing flush on 2 different DestinationPlugins
// simulate 2 threads executing work on 2 different DestinationPlugins
val work = {
mediator.applyClosure {
(it as EventPlugin).flush()
Expand All @@ -54,8 +55,8 @@ class MediatorTest {
}

@Test
@Timeout(5, unit = TimeUnit.SECONDS)
fun `flush, add a new plugin, and flush again on two destination plugins`() {
@Timeout(UNIT_OF_WORK_IN_MS * 6, unit = TimeUnit.MILLISECONDS)
fun `work, add a new plugin and work, and work again on two destination plugins`() {
val fakeDestinationPlugin1 = FakeDestinationPlugin()
val fakeDestinationPlugin2 = FakeDestinationPlugin()

Expand All @@ -67,28 +68,31 @@ class MediatorTest {
}
}

// flush and add
// work and add, work again
val latch = CountDownLatch(2)
val t1 = thread {
work()
work()
latch.countDown()
}
val t2 = thread {
// add plugin 2, work() should catch up with the newly added plugin
// give time for the first work() to start
Thread.sleep(UNIT_OF_WORK_IN_MS / 2)
// add plugin 2, 2nd work() should catch up with the newly added plugin
mediator.add(fakeDestinationPlugin2)
latch.countDown()
}
t1.join()
t2.join()
latch.await()

// flush again
// work again
val t3 = thread {
work()
}
t1.join()
t2.join()
t3.join()

assertEquals(2, fakeDestinationPlugin1.amountOfWorkDone.get())
assertEquals(3, fakeDestinationPlugin1.amountOfWorkDone.get())
assertEquals(2, fakeDestinationPlugin2.amountOfWorkDone.get())
}
}