1
1
package com.amplitude.core.platform
2
2
3
+ import okhttp3.internal.wait
3
4
import org.junit.jupiter.api.Assertions.assertEquals
4
5
import org.junit.jupiter.api.Test
5
6
import org.junit.jupiter.api.Timeout
6
7
import java.util.concurrent.CopyOnWriteArrayList
8
+ import java.util.concurrent.CountDownLatch
7
9
import java.util.concurrent.TimeUnit
8
10
import java.util.concurrent.atomic.AtomicInteger
9
11
import kotlin.concurrent.thread
@@ -26,7 +28,7 @@ class MediatorTest {
26
28
27
29
@Test
28
30
@Timeout(3 , unit = TimeUnit .SECONDS )
29
- fun `two threads that call flush twice on two destination plugins` () {
31
+ fun `call flush twice on two destination plugins` () {
30
32
val fakeDestinationPlugins = List (2 ) { FakeDestinationPlugin () }
31
33
fakeDestinationPlugins.forEach {
32
34
mediator.add(it)
@@ -51,4 +53,43 @@ class MediatorTest {
51
53
assertEquals(2 , it.amountOfWorkDone.get())
52
54
}
53
55
}
56
+
57
+ @Test
58
+ @Timeout(5 , unit = TimeUnit .SECONDS )
59
+ fun `flush, add a new plugin, and flush again on two destination plugins` () {
60
+ val fakeDestinationPlugin1 = FakeDestinationPlugin ()
61
+ val fakeDestinationPlugin2 = FakeDestinationPlugin ()
62
+
63
+ mediator.add(fakeDestinationPlugin1)
64
+
65
+ val work = {
66
+ mediator.applyClosure {
67
+ (it as EventPlugin ).flush()
68
+ }
69
+ }
70
+
71
+ // flush and add
72
+ val latch = CountDownLatch (2 )
73
+ val t1 = thread {
74
+ work()
75
+ latch.countDown()
76
+ }
77
+ val t2 = thread {
78
+ // add plugin 2, work() should catch up with the newly added plugin
79
+ mediator.add(fakeDestinationPlugin2)
80
+ latch.countDown()
81
+ }
82
+ latch.await()
83
+
84
+ // flush again
85
+ val t3 = thread {
86
+ work()
87
+ }
88
+ t1.join()
89
+ t2.join()
90
+ t3.join()
91
+
92
+ assertEquals(2 , fakeDestinationPlugin1.amountOfWorkDone.get())
93
+ assertEquals(2 , fakeDestinationPlugin2.amountOfWorkDone.get())
94
+ }
54
95
}
0 commit comments