1
1
package com.amplitude.core.platform
2
2
3
- import org.junit.jupiter.api.Assertions.assertTrue
3
+ import org.junit.jupiter.api.Assertions.assertEquals
4
4
import org.junit.jupiter.api.Test
5
5
import org.junit.jupiter.api.Timeout
6
6
import java.util.concurrent.CopyOnWriteArrayList
7
- import java.util.concurrent.Executors
8
7
import java.util.concurrent.TimeUnit
8
+ import java.util.concurrent.atomic.AtomicInteger
9
9
import kotlin.concurrent.thread
10
10
11
11
class MediatorTest {
@@ -15,60 +15,40 @@ class MediatorTest {
15
15
* Fake [DestinationPlugin] that does work on [flush] for 1 second
16
16
*/
17
17
private class FakeDestinationPlugin : DestinationPlugin () {
18
- var workDone = false
18
+ var amountOfWorkDone = AtomicInteger ()
19
19
20
20
override fun flush () {
21
21
super .flush()
22
- println (" $this start work ${System .currentTimeMillis()} ${Thread .currentThread().name} " )
23
22
Thread .sleep(1_000 )
24
- println (" $this end work ${System .currentTimeMillis()} ${Thread .currentThread().name} " )
25
- workDone = true
23
+ amountOfWorkDone.incrementAndGet()
26
24
}
27
25
}
28
26
29
27
@Test
30
- @Timeout(2 , unit = TimeUnit .SECONDS )
31
- fun `multiple threads that call flush on destination plugin` () {
32
- val fakeDestinationPlugins = List (10 ) { FakeDestinationPlugin () }
33
- fakeDestinationPlugins.forEach {
34
- mediator.add(it)
35
- }
36
-
37
- // simulate 10 threads executing flush on 10 different [DestinationPlugin]s
38
- val executor = Executors .newFixedThreadPool(10 )
39
- fakeDestinationPlugins.forEach {
40
- executor.submit {
41
- it.flush()
42
- }
43
- }
44
- executor.shutdown()
45
- executor.awaitTermination(2 , TimeUnit .SECONDS )
46
-
47
- assertTrue {
48
- fakeDestinationPlugins.all { it.workDone }
49
- }
50
- }
51
-
52
- @Test
53
- @Timeout(2 , unit = TimeUnit .SECONDS )
54
- fun `two threads that call flush on destination plugin` () {
28
+ @Timeout(3 , unit = TimeUnit .SECONDS )
29
+ fun `two threads that call flush twice on two destination plugins` () {
55
30
val fakeDestinationPlugins = List (2 ) { FakeDestinationPlugin () }
56
31
fakeDestinationPlugins.forEach {
57
32
mediator.add(it)
58
33
}
59
34
60
35
// simulate 2 threads executing flush on 2 different DestinationPlugins
36
+ val work = {
37
+ mediator.applyClosure {
38
+ (it as EventPlugin ).flush()
39
+ }
40
+ }
61
41
val t1 = thread {
62
- fakeDestinationPlugins[ 0 ].flush ()
42
+ work ()
63
43
}
64
44
val t2 = thread {
65
- fakeDestinationPlugins[ 1 ].flush ()
45
+ work ()
66
46
}
67
47
t1.join()
68
48
t2.join()
69
49
70
- assertTrue {
71
- fakeDestinationPlugins.all { it.workDone }
50
+ fakeDestinationPlugins.forEach {
51
+ assertEquals( 2 , it.amountOfWorkDone.get())
72
52
}
73
53
}
74
54
}
0 commit comments