Skip to content

Commit 31d9386

Browse files
committed
use CopyOnWriteArrayList to remove the synchronized blocks
1 parent 1232043 commit 31d9386

File tree

3 files changed

+88
-12
lines changed

3 files changed

+88
-12
lines changed

core/src/main/java/com/amplitude/core/platform/Mediator.kt

+7-8
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,16 @@ import com.amplitude.core.events.BaseEvent
44
import com.amplitude.core.events.GroupIdentifyEvent
55
import com.amplitude.core.events.IdentifyEvent
66
import com.amplitude.core.events.RevenueEvent
7+
import java.util.concurrent.CopyOnWriteArrayList
78

8-
internal class Mediator(private val plugins: MutableList<Plugin>) {
9-
fun add(plugin: Plugin) = synchronized(plugins) {
9+
internal class Mediator(private val plugins: CopyOnWriteArrayList<Plugin>) {
10+
fun add(plugin: Plugin) {
1011
plugins.add(plugin)
1112
}
1213

13-
fun remove(plugin: Plugin) = synchronized(plugins) {
14-
plugins.removeAll { it === plugin }
15-
}
14+
fun remove(plugin: Plugin) = plugins.removeAll { it === plugin }
1615

17-
fun execute(event: BaseEvent): BaseEvent? = synchronized(plugins) {
16+
fun execute(event: BaseEvent): BaseEvent? {
1817
var result: BaseEvent? = event
1918

2019
plugins.forEach { plugin ->
@@ -23,7 +22,7 @@ internal class Mediator(private val plugins: MutableList<Plugin>) {
2322
result = when (plugin) {
2423
is DestinationPlugin -> {
2524
try {
26-
plugin.process(eventToBeProcessed)
25+
plugin.process(result)
2726
} catch (e: Exception) {
2827
e.printStackTrace()
2928
null
@@ -46,7 +45,7 @@ internal class Mediator(private val plugins: MutableList<Plugin>) {
4645
return result
4746
}
4847

49-
fun applyClosure(closure: (Plugin) -> Unit) = synchronized(plugins) {
48+
fun applyClosure(closure: (Plugin) -> Unit) {
5049
plugins.forEach {
5150
closure(it)
5251
}

core/src/main/java/com/amplitude/core/platform/Timeline.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ package com.amplitude.core.platform
22

33
import com.amplitude.core.Amplitude
44
import com.amplitude.core.events.BaseEvent
5+
import java.util.concurrent.CopyOnWriteArrayList
56

67
open class Timeline {
78
internal val plugins: Map<Plugin.Type, Mediator> = mapOf(
8-
Plugin.Type.Before to Mediator(mutableListOf()),
9-
Plugin.Type.Enrichment to Mediator(mutableListOf()),
10-
Plugin.Type.Destination to Mediator(mutableListOf()),
11-
Plugin.Type.Utility to Mediator(mutableListOf())
9+
Plugin.Type.Before to Mediator(CopyOnWriteArrayList()),
10+
Plugin.Type.Enrichment to Mediator(CopyOnWriteArrayList()),
11+
Plugin.Type.Destination to Mediator(CopyOnWriteArrayList()),
12+
Plugin.Type.Utility to Mediator(CopyOnWriteArrayList())
1213
)
1314
lateinit var amplitude: Amplitude
1415

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package com.amplitude.core.platform
2+
3+
import com.amplitude.core.Amplitude
4+
import org.junit.jupiter.api.Assertions.assertTrue
5+
import org.junit.jupiter.api.Test
6+
import org.junit.jupiter.api.Timeout
7+
import java.util.concurrent.CopyOnWriteArrayList
8+
import java.util.concurrent.Executors
9+
import java.util.concurrent.TimeUnit
10+
import kotlin.concurrent.thread
11+
12+
class MediatorTest {
13+
private val mediator = Mediator(CopyOnWriteArrayList())
14+
15+
/**
16+
* Fake [DestinationPlugin] that does work on [flush] for 1 second
17+
*/
18+
private class FakeDestinationPlugin : DestinationPlugin() {
19+
var workDone = false
20+
21+
override fun flush() {
22+
super.flush()
23+
println("$this start work ${System.currentTimeMillis()} ${Thread.currentThread().name}")
24+
Thread.sleep(1_000)
25+
println("$this end work ${System.currentTimeMillis()} ${Thread.currentThread().name}")
26+
workDone = true
27+
}
28+
}
29+
30+
@Test
31+
@Timeout(2, unit = TimeUnit.SECONDS)
32+
fun `multiple threads that call flush on destination plugin`() {
33+
val fakeDestinationPlugins = List(10) { FakeDestinationPlugin() }
34+
fakeDestinationPlugins.forEach {
35+
mediator.add(it)
36+
}
37+
38+
// simulate 10 threads executing flush on 10 different [DestinationPlugin]s
39+
val executor = Executors.newFixedThreadPool(10)
40+
fakeDestinationPlugins.forEach {
41+
executor.submit {
42+
it.flush()
43+
}
44+
}
45+
executor.shutdown()
46+
executor.awaitTermination(2, TimeUnit.SECONDS)
47+
48+
assertTrue {
49+
fakeDestinationPlugins.all { it.workDone }
50+
}
51+
}
52+
53+
@Test
54+
@Timeout(2, unit = TimeUnit.SECONDS)
55+
fun `two threads that call flush on destination plugin`() {
56+
val fakeDestinationPlugins = List(2) { FakeDestinationPlugin() }
57+
fakeDestinationPlugins.forEach {
58+
mediator.add(it)
59+
}
60+
61+
// simulate 2 threads executing flush on 2 different DestinationPlugins
62+
val t1 = thread {
63+
fakeDestinationPlugins[0].flush()
64+
}
65+
val t2 = thread {
66+
fakeDestinationPlugins[1].flush()
67+
}
68+
t1.join()
69+
t2.join()
70+
71+
assertTrue {
72+
fakeDestinationPlugins.all { it.workDone }
73+
}
74+
}
75+
76+
}

0 commit comments

Comments
 (0)