@@ -15,7 +15,7 @@ import java.util.concurrent.atomic.AtomicBoolean
15
15
import java.util.concurrent.atomic.AtomicLong
16
16
17
17
private const val DEFAULT_SESSION_ID = - 1L
18
- private const val DEFAULT = 0L
18
+ private const val DEFAULT_EVENT_ID_OR_TIME = 0L
19
19
20
20
class Timeline (
21
21
private val initialSessionId : Long? = null ,
@@ -29,9 +29,9 @@ class Timeline(
29
29
return _sessionId .get()
30
30
}
31
31
32
- internal var lastEventId: Long = DEFAULT
32
+ internal var lastEventId: Long = DEFAULT_EVENT_ID_OR_TIME
33
33
private set
34
- internal var lastEventTime: Long = DEFAULT
34
+ internal var lastEventTime: Long = DEFAULT_EVENT_ID_OR_TIME
35
35
private set
36
36
37
37
internal fun start () {
@@ -43,8 +43,8 @@ class Timeline(
43
43
if (initialSessionId == null ) {
44
44
_sessionId .set(storage.readLong(PREVIOUS_SESSION_ID , DEFAULT_SESSION_ID ))
45
45
}
46
- lastEventId = storage.readLong(LAST_EVENT_ID , DEFAULT )
47
- lastEventTime = storage.readLong(LAST_EVENT_TIME , DEFAULT )
46
+ lastEventId = storage.readLong(LAST_EVENT_ID , DEFAULT_EVENT_ID_OR_TIME )
47
+ lastEventTime = storage.readLong(LAST_EVENT_TIME , DEFAULT_EVENT_ID_OR_TIME )
48
48
49
49
for (message in eventMessageChannel) {
50
50
processEventMessage(message)
@@ -57,39 +57,58 @@ class Timeline(
57
57
this .eventMessageChannel.cancel()
58
58
}
59
59
60
+ /* *
61
+ * Enqueue an event to be processed by the timeline.
62
+ */
60
63
override fun process (incomingEvent : BaseEvent ) {
61
64
if (incomingEvent.timestamp == null ) {
62
65
incomingEvent.timestamp = System .currentTimeMillis()
63
66
}
64
67
65
- eventMessageChannel.trySend(EventQueueMessage (incomingEvent))
68
+ val result = eventMessageChannel.trySend(EventQueueMessage .Event (incomingEvent))
69
+ if (result.isFailure) {
70
+ amplitude.logger.error(" Failed to enqueue event: ${incomingEvent.eventType} . Channel is closed or full." )
71
+ }
66
72
}
67
73
68
74
internal fun onEnterForeground (timestamp : Long ) {
69
75
amplitude.amplitudeScope.launch(amplitude.storageIODispatcher) {
70
- val localSessionEvents = startNewSessionIfNeeded(timestamp)
71
- foreground.set(true )
72
-
73
- // Process any local session events
74
- processAndPersistEvents(localSessionEvents)
76
+ eventMessageChannel.trySend(EventQueueMessage .EnterForeground (timestamp))
75
77
}
76
78
}
77
79
78
80
internal fun onExitForeground (timestamp : Long ) {
79
81
amplitude.amplitudeScope.launch(amplitude.storageIODispatcher) {
80
- refreshSessionTime(timestamp)
81
- foreground.set(false )
82
+ eventMessageChannel.trySend(EventQueueMessage .ExitForeground (timestamp))
82
83
}
83
84
}
84
85
86
+ /* *
87
+ * Process an event message from the event queue.
88
+ */
85
89
private suspend fun processEventMessage (message : EventQueueMessage ) {
86
- val event = message.event
87
- val eventTimestamp = event.timestamp!! // Guaranteed non-null by process()
88
- val eventSessionId = event.sessionId
90
+ when (message) {
91
+ is EventQueueMessage .EnterForeground -> {
92
+ foreground.set(true )
93
+ val sessionEvents = startNewSessionIfNeeded(message.timestamp)
94
+ processAndPersistEvents(sessionEvents)
95
+ }
96
+ is EventQueueMessage .Event -> {
97
+ processEvent(message.event)
98
+ }
99
+ is EventQueueMessage .ExitForeground -> {
100
+ foreground.set(false )
101
+ refreshSessionTime(message.timestamp)
102
+ }
103
+ }
104
+ }
105
+
106
+ private suspend fun processEvent (event : BaseEvent ) {
107
+ val eventTimestamp = event.timestamp ? : System .currentTimeMillis()
89
108
90
109
when (event.eventType) {
91
110
START_SESSION_EVENT -> {
92
- setSessionId(eventSessionId ? : eventTimestamp)
111
+ setSessionId(event.sessionId ? : eventTimestamp)
93
112
refreshSessionTime(eventTimestamp)
94
113
}
95
114
@@ -99,17 +118,14 @@ class Timeline(
99
118
100
119
else -> {
101
120
if (! foreground.get()) {
102
- val localSessionEvents = startNewSessionIfNeeded(eventTimestamp)
103
- processAndPersistEvents(localSessionEvents )
121
+ val sessionEvents = startNewSessionIfNeeded(eventTimestamp)
122
+ processAndPersistEvents(sessionEvents )
104
123
} else {
105
124
refreshSessionTime(eventTimestamp)
106
125
}
107
126
}
108
127
}
109
128
110
- // Assign sessionId to the current event if it doesn't have one
111
- event.sessionId = event.sessionId ? : this .sessionId
112
-
113
129
// Process the incoming event
114
130
processAndPersistEvents(listOf (event))
115
131
}
@@ -119,6 +135,9 @@ class Timeline(
119
135
120
136
val initialLastEventId = lastEventId
121
137
for (event in events) {
138
+ // Assign sessionId to the current event if it doesn't have one
139
+ event.sessionId = event.sessionId ? : this .sessionId
140
+ // Increment and set eventId if it is not set
122
141
event.eventId = event.eventId ? : ++ lastEventId
123
142
super .process(event)
124
143
}
@@ -151,7 +170,7 @@ class Timeline(
151
170
if (trackingSessionEvents && inSession()) {
152
171
val sessionEndEvent = BaseEvent ()
153
172
sessionEndEvent.eventType = END_SESSION_EVENT
154
- sessionEndEvent.timestamp = lastEventTime.takeIf { lastEventTime > DEFAULT }
173
+ sessionEndEvent.timestamp = lastEventTime.takeIf { lastEventTime > DEFAULT_EVENT_ID_OR_TIME }
155
174
sessionEndEvent.sessionId = sessionId
156
175
sessionEvents.add(sessionEndEvent)
157
176
}
@@ -196,6 +215,10 @@ class Timeline(
196
215
}
197
216
}
198
217
199
- data class EventQueueMessage (
200
- val event : BaseEvent ,
201
- )
218
+ sealed class EventQueueMessage {
219
+ data class Event (val event : BaseEvent ) : EventQueueMessage()
220
+
221
+ data class EnterForeground (val timestamp : Long ) : EventQueueMessage()
222
+
223
+ data class ExitForeground (val timestamp : Long ) : EventQueueMessage()
224
+ }
0 commit comments