15
15
*/
16
16
package dev .knative .eventing .kafka .broker .receiverloom ;
17
17
18
+ import com .google .common .util .concurrent .Uninterruptibles ;
18
19
import dev .knative .eventing .kafka .broker .core .ReactiveKafkaProducer ;
19
20
import dev .knative .eventing .kafka .broker .core .tracing .kafka .ProducerTracer ;
21
+ import io .opentelemetry .context .Context ;
20
22
import io .vertx .core .Future ;
21
23
import io .vertx .core .Promise ;
22
24
import io .vertx .core .Vertx ;
23
25
import io .vertx .core .impl .ContextInternal ;
24
26
import io .vertx .core .impl .VertxInternal ;
27
+ import io .vertx .core .impl .future .PromiseInternal ;
25
28
import io .vertx .core .tracing .TracingPolicy ;
26
29
import java .util .Objects ;
27
- import java .util .concurrent .BlockingQueue ;
28
- import java .util .concurrent .LinkedBlockingQueue ;
29
- import java .util .concurrent .TimeUnit ;
30
+ import java .util .concurrent .ExecutorService ;
31
+ import java .util .concurrent .Executors ;
30
32
import java .util .concurrent .atomic .AtomicBoolean ;
31
33
import org .apache .kafka .clients .producer .Producer ;
32
34
import org .apache .kafka .clients .producer .ProducerRecord ;
@@ -41,17 +43,15 @@ public class LoomKafkaProducer<K, V> implements ReactiveKafkaProducer<K, V> {
41
43
42
44
private final Producer <K , V > producer ;
43
45
44
- private final BlockingQueue < RecordPromise < K , V >> eventQueue ;
46
+ private final ExecutorService executorService ;
45
47
private final AtomicBoolean isClosed ;
46
48
private final ProducerTracer <?> tracer ;
47
49
private final VertxInternal vertx ;
48
- private final Thread sendFromQueueThread ;
49
50
private final Promise <Void > closePromise = Promise .promise ();
50
51
51
52
public LoomKafkaProducer (Vertx v , Producer <K , V > producer ) {
52
53
Objects .requireNonNull (v , "Vertx cannot be null" );
53
54
this .producer = producer ;
54
- this .eventQueue = new LinkedBlockingQueue <>();
55
55
this .isClosed = new AtomicBoolean (false );
56
56
this .vertx = (VertxInternal ) v ;
57
57
final var ctxInt = ((ContextInternal ) v .getOrCreateContext ()).unwrap ();
@@ -62,73 +62,54 @@ public LoomKafkaProducer(Vertx v, Producer<K, V> producer) {
62
62
this .tracer = null ;
63
63
}
64
64
65
+ ExecutorService executorService ;
65
66
if (Boolean .parseBoolean (System .getenv ("ENABLE_VIRTUAL_THREADS" ))) {
66
- this . sendFromQueueThread = Thread . ofVirtual (). start ( this :: sendFromQueue );
67
+ executorService = Executors . newVirtualThreadPerTaskExecutor ( );
67
68
} else {
68
- this .sendFromQueueThread = new Thread (this ::sendFromQueue );
69
- this .sendFromQueueThread .start ();
69
+ executorService = Executors .newSingleThreadExecutor ();
70
70
}
71
+ this .executorService = Context .taskWrapping (executorService );
71
72
}
72
73
73
74
@ Override
74
75
public Future <RecordMetadata > send (ProducerRecord <K , V > record ) {
75
- final Promise <RecordMetadata > promise = Promise .promise ();
76
76
if (isClosed .get ()) {
77
- promise .fail ("Producer is closed" );
78
- } else {
79
- eventQueue .add (new RecordPromise <>(record , this .vertx .getOrCreateContext (), promise ));
77
+ return Future .failedFuture ("Producer is closed" );
80
78
}
79
+ PromiseInternal <RecordMetadata > promise = vertx .promise ();
80
+ executorService .execute (() -> sendFromQueue (new RecordPromise <>(record , promise )));
81
81
return promise .future ();
82
82
}
83
83
84
- private void sendFromQueue () {
85
- // Process queue elements until this is closed and the tasks queue is empty
86
- while (!isClosed .get () || !eventQueue .isEmpty ()) {
87
- try {
88
- final var recordPromise = eventQueue .poll (2000 , TimeUnit .MILLISECONDS );
89
- if (recordPromise == null ) {
90
- continue ;
84
+ private void sendFromQueue (RecordPromise <K , V > recordPromise ) {
85
+ final var startedSpan = this .tracer == null
86
+ ? null
87
+ : this .tracer .prepareSendMessage (recordPromise .context (), recordPromise .record );
88
+
89
+ recordPromise
90
+ .promise
91
+ .future ()
92
+ .onComplete (v -> {
93
+ if (startedSpan != null ) {
94
+ startedSpan .finish (recordPromise .context ());
95
+ }
96
+ })
97
+ .onFailure (cause -> {
98
+ if (startedSpan != null ) {
99
+ startedSpan .fail (recordPromise .context (), cause );
100
+ }
101
+ });
102
+ try {
103
+ producer .send (recordPromise .record , (metadata , exception ) -> {
104
+ if (exception != null ) {
105
+ recordPromise .fail (exception );
106
+ return ;
91
107
}
92
-
93
- final var startedSpan = this .tracer == null
94
- ? null
95
- : this .tracer .prepareSendMessage (recordPromise .getContext (), recordPromise .getRecord ());
96
-
97
- recordPromise
98
- .getPromise ()
99
- .future ()
100
- .onComplete (v -> {
101
- if (startedSpan != null ) {
102
- startedSpan .finish (recordPromise .getContext ());
103
- }
104
- })
105
- .onFailure (cause -> {
106
- if (startedSpan != null ) {
107
- startedSpan .fail (recordPromise .getContext (), cause );
108
- }
109
- });
110
- try {
111
- producer .send (
112
- recordPromise .getRecord (),
113
- (metadata , exception ) -> recordPromise .getContext ().runOnContext (v -> {
114
- if (exception != null ) {
115
- recordPromise .getPromise ().fail (exception );
116
- return ;
117
- }
118
- recordPromise .getPromise ().complete (metadata );
119
- }));
120
- } catch (final KafkaException exception ) {
121
- recordPromise
122
- .getContext ()
123
- .runOnContext (v -> recordPromise .getPromise ().fail (exception ));
124
- }
125
- } catch (InterruptedException e ) {
126
- logger .debug ("Interrupted while waiting for event queue to be populated." );
127
- break ;
128
- }
108
+ recordPromise .complete (metadata );
109
+ });
110
+ } catch (final KafkaException exception ) {
111
+ recordPromise .fail (exception );
129
112
}
130
-
131
- logger .debug ("Background thread completed." );
132
113
}
133
114
134
115
@ Override
@@ -141,12 +122,9 @@ public Future<Void> close() {
141
122
142
123
Thread .ofVirtual ().start (() -> {
143
124
try {
144
- while (!eventQueue .isEmpty ()) {
145
- logger .debug ("Waiting for the eventQueue to become empty" );
146
- Thread .sleep (2000L );
147
- }
148
- logger .debug ("Waiting for sendFromQueueThread thread to complete" );
149
- sendFromQueueThread .join ();
125
+ executorService .shutdown ();
126
+ logger .debug ("Waiting for tasks to complete" );
127
+ Uninterruptibles .awaitTerminationUninterruptibly (executorService );
150
128
logger .debug ("Closing the producer" );
151
129
producer .close ();
152
130
closePromise .complete ();
@@ -178,35 +156,29 @@ public Producer<K, V> unwrap() {
178
156
}
179
157
180
158
private static class RecordPromise <K , V > {
181
- private final ProducerRecord <K , V > record ;
182
- private final ContextInternal context ;
183
- private final Promise <RecordMetadata > promise ;
159
+ final ProducerRecord <K , V > record ;
160
+ final PromiseInternal <RecordMetadata > promise ;
184
161
185
- private RecordPromise (ProducerRecord <K , V > record , ContextInternal context , Promise <RecordMetadata > promise ) {
162
+ RecordPromise (ProducerRecord <K , V > record , PromiseInternal <RecordMetadata > promise ) {
186
163
this .record = record ;
187
- this .context = context ;
188
164
this .promise = promise ;
189
165
}
190
166
191
- public ProducerRecord < K , V > getRecord () {
192
- return record ;
167
+ ContextInternal context () {
168
+ return promise . context () ;
193
169
}
194
170
195
- public Promise < RecordMetadata > getPromise ( ) {
196
- return promise ;
171
+ void complete ( RecordMetadata result ) {
172
+ promise . complete ( result ) ;
197
173
}
198
174
199
- public ContextInternal getContext ( ) {
200
- return context ;
175
+ void fail ( Throwable cause ) {
176
+ promise . fail ( cause ) ;
201
177
}
202
178
}
203
179
204
180
// Function needed for testing
205
181
public boolean isSendFromQueueThreadAlive () {
206
- return sendFromQueueThread .isAlive ();
207
- }
208
-
209
- public int getEventQueueSize () {
210
- return eventQueue .size ();
182
+ return !executorService .isTerminated ();
211
183
}
212
184
}
0 commit comments