Skip to content

Commit d71d59b

Browse files
authored
fix: remove synchronized on mediator (#254)
* change visibility of plugins to private * use CopyOnWriteArrayList to remove the synchronized blocks * use a default list constructor on mediator * update to test to actually use mediator * add a new test with add plugin in between
1 parent b6a035b commit d71d59b

File tree

4 files changed

+115
-15
lines changed

4 files changed

+115
-15
lines changed

core/src/main/java/com/amplitude/core/platform/Mediator.kt

+13-7
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,18 @@ import com.amplitude.core.events.BaseEvent
44
import com.amplitude.core.events.GroupIdentifyEvent
55
import com.amplitude.core.events.IdentifyEvent
66
import com.amplitude.core.events.RevenueEvent
7+
import java.util.concurrent.CopyOnWriteArrayList
78

8-
internal class Mediator(internal val plugins: MutableList<Plugin>) {
9-
fun add(plugin: Plugin) = synchronized(plugins) {
9+
internal class Mediator(
10+
private val plugins: CopyOnWriteArrayList<Plugin> = CopyOnWriteArrayList()
11+
) {
12+
fun add(plugin: Plugin) {
1013
plugins.add(plugin)
1114
}
1215

13-
fun remove(plugin: Plugin) = synchronized(plugins) {
14-
plugins.removeAll { it === plugin }
15-
}
16+
fun remove(plugin: Plugin) = plugins.removeAll { it === plugin }
1617

17-
fun execute(event: BaseEvent): BaseEvent? = synchronized(plugins) {
18+
fun execute(event: BaseEvent): BaseEvent? {
1819
var result: BaseEvent? = event
1920

2021
plugins.forEach { plugin ->
@@ -53,9 +54,14 @@ internal class Mediator(internal val plugins: MutableList<Plugin>) {
5354
return result
5455
}
5556

56-
fun applyClosure(closure: (Plugin) -> Unit) = synchronized(plugins) {
57+
fun applyClosure(closure: (Plugin) -> Unit) {
5758
plugins.forEach {
5859
closure(it)
5960
}
6061
}
62+
63+
/**
64+
* Only visible for testing
65+
*/
66+
internal fun size() = plugins.size
6167
}

core/src/main/java/com/amplitude/core/platform/Timeline.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ import com.amplitude.core.events.BaseEvent
55

66
open class Timeline {
77
internal val plugins: Map<Plugin.Type, Mediator> = mapOf(
8-
Plugin.Type.Before to Mediator(mutableListOf()),
9-
Plugin.Type.Enrichment to Mediator(mutableListOf()),
10-
Plugin.Type.Destination to Mediator(mutableListOf()),
11-
Plugin.Type.Utility to Mediator(mutableListOf())
8+
Plugin.Type.Before to Mediator(),
9+
Plugin.Type.Enrichment to Mediator(),
10+
Plugin.Type.Destination to Mediator(),
11+
Plugin.Type.Utility to Mediator()
1212
)
1313
lateinit var amplitude: Amplitude
1414

core/src/test/kotlin/com/amplitude/core/AmplitudeTest.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -316,10 +316,10 @@ internal class AmplitudeTest {
316316
override lateinit var amplitude: Amplitude
317317
}
318318
amplitude.add(middleware)
319-
amplitude.timeline.plugins[Plugin.Type.Enrichment]?.plugins?.let {
319+
amplitude.timeline.plugins[Plugin.Type.Enrichment]?.size()?.let {
320320
assertEquals(
321321
2,
322-
it.size
322+
it
323323
)
324324
} ?: fail()
325325
}
@@ -332,10 +332,10 @@ internal class AmplitudeTest {
332332
}
333333
amplitude.add(middleware)
334334
amplitude.remove(middleware)
335-
amplitude.timeline.plugins[Plugin.Type.Enrichment]?.plugins?.let {
335+
amplitude.timeline.plugins[Plugin.Type.Enrichment]?.size()?.let {
336336
assertEquals(
337337
1, // SegmentLog is the other added at startup
338-
it.size
338+
it
339339
)
340340
} ?: fail()
341341
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package com.amplitude.core.platform
2+
3+
import org.junit.jupiter.api.Assertions.assertEquals
4+
import org.junit.jupiter.api.Test
5+
import org.junit.jupiter.api.Timeout
6+
import java.util.concurrent.CopyOnWriteArrayList
7+
import java.util.concurrent.CountDownLatch
8+
import java.util.concurrent.TimeUnit
9+
import java.util.concurrent.atomic.AtomicInteger
10+
import kotlin.concurrent.thread
11+
12+
class MediatorTest {
13+
private val mediator = Mediator(CopyOnWriteArrayList())
14+
15+
/**
16+
* Fake [DestinationPlugin] that does work on [flush] for 1 second
17+
*/
18+
private class FakeDestinationPlugin : DestinationPlugin() {
19+
var amountOfWorkDone = AtomicInteger()
20+
21+
override fun flush() {
22+
super.flush()
23+
Thread.sleep(1_000)
24+
amountOfWorkDone.incrementAndGet()
25+
}
26+
}
27+
28+
@Test
29+
@Timeout(3, unit = TimeUnit.SECONDS)
30+
fun `call flush twice on two destination plugins`() {
31+
val fakeDestinationPlugins = List(2) { FakeDestinationPlugin() }
32+
fakeDestinationPlugins.forEach {
33+
mediator.add(it)
34+
}
35+
36+
// simulate 2 threads executing flush on 2 different DestinationPlugins
37+
val work = {
38+
mediator.applyClosure {
39+
(it as EventPlugin).flush()
40+
}
41+
}
42+
val t1 = thread {
43+
work()
44+
}
45+
val t2 = thread {
46+
work()
47+
}
48+
t1.join()
49+
t2.join()
50+
51+
fakeDestinationPlugins.forEach {
52+
assertEquals(2, it.amountOfWorkDone.get())
53+
}
54+
}
55+
56+
@Test
57+
@Timeout(5, unit = TimeUnit.SECONDS)
58+
fun `flush, add a new plugin, and flush again on two destination plugins`() {
59+
val fakeDestinationPlugin1 = FakeDestinationPlugin()
60+
val fakeDestinationPlugin2 = FakeDestinationPlugin()
61+
62+
mediator.add(fakeDestinationPlugin1)
63+
64+
val work = {
65+
mediator.applyClosure {
66+
(it as EventPlugin).flush()
67+
}
68+
}
69+
70+
// flush and add
71+
val latch = CountDownLatch(2)
72+
val t1 = thread {
73+
work()
74+
latch.countDown()
75+
}
76+
val t2 = thread {
77+
// add plugin 2, work() should catch up with the newly added plugin
78+
mediator.add(fakeDestinationPlugin2)
79+
latch.countDown()
80+
}
81+
latch.await()
82+
83+
// flush again
84+
val t3 = thread {
85+
work()
86+
}
87+
t1.join()
88+
t2.join()
89+
t3.join()
90+
91+
assertEquals(2, fakeDestinationPlugin1.amountOfWorkDone.get())
92+
assertEquals(2, fakeDestinationPlugin2.amountOfWorkDone.get())
93+
}
94+
}

0 commit comments

Comments
 (0)