@@ -17,25 +17,35 @@ import kotlinx.coroutines.delay
17
17
import kotlinx.coroutines.launch
18
18
19
19
internal class InMemoryResponseHandler (
20
- val eventPipeline : EventPipeline ,
21
- val configuration : Configuration ,
22
- val scope : CoroutineScope ,
23
- val dispatcher : CoroutineDispatcher
20
+ private val eventPipeline : EventPipeline ,
21
+ private val configuration : Configuration ,
22
+ private val scope : CoroutineScope ,
23
+ private val storageDispatcher : CoroutineDispatcher ,
24
24
) : ResponseHandler {
25
25
26
26
companion object {
27
27
const val BACK_OFF : Long = 30000
28
28
}
29
29
30
- override fun handleSuccessResponse (successResponse : SuccessResponse , events : Any , eventsString : String ) {
31
- triggerEventsCallback(events as List <BaseEvent >, HttpStatus .SUCCESS .code, " Event sent success." )
30
+ override fun handleSuccessResponse (
31
+ successResponse : SuccessResponse ,
32
+ events : Any ,
33
+ eventsString : String ,
34
+ ) {
35
+ triggerEventsCallback(
36
+ events as List <BaseEvent >, HttpStatus .SUCCESS .code, " Event sent success."
37
+ )
32
38
}
33
39
34
- override fun handleBadRequestResponse (badRequestResponse : BadRequestResponse , events : Any , eventsString : String ) {
40
+ override fun handleBadRequestResponse (
41
+ badRequestResponse : BadRequestResponse ,
42
+ events : Any ,
43
+ eventsString : String ,
44
+ ): Boolean {
35
45
val eventsList = events as List <BaseEvent >
36
46
if (eventsList.size == 1 || badRequestResponse.isInvalidApiKeyResponse()) {
37
47
triggerEventsCallback(eventsList, HttpStatus .BAD_REQUEST .code, badRequestResponse.error)
38
- return
48
+ return false
39
49
}
40
50
val droppedIndices = badRequestResponse.getEventIndicesToDrop()
41
51
val eventsToDrop = mutableListOf<BaseEvent >()
@@ -51,12 +61,19 @@ internal class InMemoryResponseHandler(
51
61
eventsToRetry.forEach {
52
62
eventPipeline.put(it)
53
63
}
64
+ return eventsToDrop.isEmpty()
54
65
}
55
66
56
- override fun handlePayloadTooLargeResponse (payloadTooLargeResponse : PayloadTooLargeResponse , events : Any , eventsString : String ) {
67
+ override fun handlePayloadTooLargeResponse (
68
+ payloadTooLargeResponse : PayloadTooLargeResponse ,
69
+ events : Any ,
70
+ eventsString : String ,
71
+ ) {
57
72
val eventsList = events as List <BaseEvent >
58
73
if (eventsList.size == 1 ) {
59
- triggerEventsCallback(eventsList, HttpStatus .PAYLOAD_TOO_LARGE .code, payloadTooLargeResponse.error)
74
+ triggerEventsCallback(
75
+ eventsList, HttpStatus .PAYLOAD_TOO_LARGE .code, payloadTooLargeResponse.error
76
+ )
60
77
return
61
78
}
62
79
eventPipeline.flushSizeDivider.incrementAndGet()
@@ -65,7 +82,11 @@ internal class InMemoryResponseHandler(
65
82
}
66
83
}
67
84
68
- override fun handleTooManyRequestsResponse (tooManyRequestsResponse : TooManyRequestsResponse , events : Any , eventsString : String ) {
85
+ override fun handleTooManyRequestsResponse (
86
+ tooManyRequestsResponse : TooManyRequestsResponse ,
87
+ events : Any ,
88
+ eventsString : String ,
89
+ ) {
69
90
val eventsList = events as List <BaseEvent >
70
91
val eventsToDrop = mutableListOf<BaseEvent >()
71
92
val eventsToRetryNow = mutableListOf<BaseEvent >()
@@ -79,29 +100,39 @@ internal class InMemoryResponseHandler(
79
100
eventsToRetryNow.add(event)
80
101
}
81
102
}
82
- triggerEventsCallback(eventsToDrop, HttpStatus .TOO_MANY_REQUESTS .code, tooManyRequestsResponse.error)
103
+ triggerEventsCallback(
104
+ eventsToDrop, HttpStatus .TOO_MANY_REQUESTS .code, tooManyRequestsResponse.error
105
+ )
83
106
eventsToRetryNow.forEach {
84
107
eventPipeline.put(it)
85
108
}
86
- scope.launch(dispatcher ) {
109
+ scope.launch(storageDispatcher ) {
87
110
delay(BACK_OFF )
88
111
eventsToRetryLater.forEach {
89
112
eventPipeline.put(it)
90
113
}
91
114
}
92
115
}
93
116
94
- override fun handleTimeoutResponse (timeoutResponse : TimeoutResponse , events : Any , eventsString : String ) {
117
+ override fun handleTimeoutResponse (
118
+ timeoutResponse : TimeoutResponse ,
119
+ events : Any ,
120
+ eventsString : String ,
121
+ ) {
95
122
val eventsList = events as List <BaseEvent >
96
- scope.launch(dispatcher ) {
123
+ scope.launch(storageDispatcher ) {
97
124
delay(BACK_OFF )
98
125
eventsList.forEach {
99
126
eventPipeline.put(it)
100
127
}
101
128
}
102
129
}
103
130
104
- override fun handleFailedResponse (failedResponse : FailedResponse , events : Any , eventsString : String ) {
131
+ override fun handleFailedResponse (
132
+ failedResponse : FailedResponse ,
133
+ events : Any ,
134
+ eventsString : String ,
135
+ ) {
105
136
val eventsList = events as List <BaseEvent >
106
137
val eventsToDrop = mutableListOf<BaseEvent >()
107
138
val eventsToRetry = mutableListOf<BaseEvent >()
@@ -118,7 +149,11 @@ internal class InMemoryResponseHandler(
118
149
}
119
150
}
120
151
121
- private fun triggerEventsCallback (events : List <BaseEvent >, status : Int , message : String ) {
152
+ private fun triggerEventsCallback (
153
+ events : List <BaseEvent >,
154
+ status : Int ,
155
+ message : String ,
156
+ ) {
122
157
events.forEach { event ->
123
158
configuration.callback?.let {
124
159
it(event, status, message)
0 commit comments