Skip to content

Commit ede5923

Browse files
committed
fix: remove message grouping for Ably Batch calls
The Kafka connector put every message sent to a single channel into one `BatchSpec`, meaning we treated them atomically and sent them as a single `ProtocolMessage`. This caused problems when the total `ProtocolMessage` size grew too large. Each message is now placed in its own `BatchSpec`.
1 parent fcdcd2d commit ede5923

File tree

7 files changed

+184
-233
lines changed

7 files changed

+184
-233
lines changed

src/main/java/com/ably/kafka/connect/batch/MessageGroup.java

Lines changed: 0 additions & 87 deletions
This file was deleted.

src/main/java/com/ably/kafka/connect/batch/MessageGrouper.java renamed to src/main/java/com/ably/kafka/connect/batch/MessageTransformer.java

Lines changed: 25 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,41 @@
11
package com.ably.kafka.connect.batch;
22

3+
import com.ably.kafka.connect.client.BatchSpec;
34
import com.ably.kafka.connect.config.ChannelSinkConnectorConfig;
45
import com.ably.kafka.connect.mapping.MessageConverter;
56
import com.ably.kafka.connect.mapping.RecordMapping;
67
import com.ably.kafka.connect.mapping.RecordMappingException;
7-
import com.google.common.collect.Lists;
88
import io.ably.lib.types.Message;
99
import org.apache.kafka.connect.sink.ErrantRecordReporter;
1010
import org.apache.kafka.connect.sink.SinkRecord;
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
1313

1414
import javax.annotation.Nullable;
15-
import java.util.HashMap;
15+
import java.util.Collections;
1616
import java.util.List;
17-
import java.util.Map;
17+
import java.util.Objects;
18+
import java.util.stream.Collectors;
1819

19-
public class MessageGrouper {
20-
private static final Logger logger = LoggerFactory.getLogger(MessageGrouper.class);
20+
public class MessageTransformer {
21+
private static final Logger logger = LoggerFactory.getLogger(MessageTransformer.class);
2122

2223
private final RecordMapping channelMapping;
2324
private final RecordMapping messageNameMapping;
2425
private final ChannelSinkConnectorConfig.FailedRecordMappingAction actionOnFailure;
25-
@Nullable private final ErrantRecordReporter dlqReporter;
26+
@Nullable
27+
private final ErrantRecordReporter dlqReporter;
2628

2729
/**
28-
* Construct a new message grouper, for generating Ably BatchSpecs and converting
30+
* Construct a new message transformer, for generating Ably BatchSpecs and converting
2931
* records to messages as needed.
3032
*
31-
* @param channelMapping The RecordMapping to use to generate Ably channel names
33+
* @param channelMapping The RecordMapping to use to generate Ably channel names
3234
* @param messageNameMapping The RecordMapping to use to generate Ably Message names
33-
* @param actionOnFailure Action to perform when a message mapping attempt fails
34-
* @param dlqReporter dead letter queue for reporting bad records, or null if not in use
35+
* @param actionOnFailure Action to perform when a message mapping attempt fails
36+
* @param dlqReporter dead letter queue for reporting bad records, or null if not in use
3537
*/
36-
public MessageGrouper(
38+
public MessageTransformer(
3739
RecordMapping channelMapping,
3840
RecordMapping messageNameMapping,
3941
ChannelSinkConnectorConfig.FailedRecordMappingAction actionOnFailure,
@@ -45,44 +47,32 @@ public MessageGrouper(
4547
}
4648

4749
/**
48-
* Construct a message group for an incoming batch of Kafka records
50+
* Construct Ably messages for an incoming batch of Kafka records
4951
*
50-
* @param records Kafka sink records to group by channel and transform to Ably messages
51-
* @return MessageGroup for outgoing message batch
52+
* @param records Kafka sink records to transform to Ably messages
53+
* @return List of Kafka sink records with transformed Ably messages
5254
* @throws FatalBatchProcessingException if a fatal error occurred processing records
5355
*/
54-
public MessageGroup group(List<SinkRecord> records) throws FatalBatchProcessingException {
55-
final Map<String, List<MessageGroup.RecordMessagePair>> groupedRecords = new HashMap<>();
56-
for (SinkRecord record : records) {
56+
public List<RecordMessagePair> transform(List<SinkRecord> records) throws FatalBatchProcessingException {
57+
return records.stream().map(record -> {
5758
try {
58-
final String channel = channelMapping.map(record);
59-
final String messageName = messageNameMapping.map(record);
60-
final Message message = MessageConverter.toAblyMessage(messageName, record);
61-
62-
groupedRecords.compute(channel, (ch, recs) -> {
63-
final MessageGroup.RecordMessagePair pair = new MessageGroup.RecordMessagePair(record, message);
64-
if (recs != null) {
65-
recs.add(pair);
66-
return recs;
67-
} else {
68-
return Lists.newArrayList(pair);
69-
}
70-
});
71-
59+
String channel = channelMapping.map(record);
60+
String messageName = messageNameMapping.map(record);
61+
Message message = MessageConverter.toAblyMessage(messageName, record);
62+
return new RecordMessagePair(record, new BatchSpec(Collections.singleton(channel), Collections.singletonList(message)));
7263
} catch (RecordMappingException mappingError) {
7364
handleMappingFailure(record, mappingError);
65+
return null;
7466
}
75-
}
76-
77-
return new MessageGroup(groupedRecords);
67+
}).filter(Objects::nonNull).collect(Collectors.toList());
7868
}
7969

8070

8171
/**
8272
* Process a record that we're unable to forward to Ably due to a failed channel or
8373
* message name mapping according to the configured handling behaviour.
8474
*
85-
* @param record The SinkRecord we weren't able to map
75+
* @param record The SinkRecord we weren't able to map
8676
* @param mappingError The error raised by the RecordMapping
8777
*/
8878
private void handleMappingFailure(
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.ably.kafka.connect.batch;
2+
3+
import com.ably.kafka.connect.client.BatchSpec;
4+
import io.ably.lib.types.Message;
5+
import org.apache.kafka.connect.sink.SinkRecord;
6+
7+
/**
8+
* Kafka Records with outgoing Ably Batch Spec.
9+
*/
10+
public class RecordMessagePair {
11+
private final SinkRecord kafkaRecord;
12+
private final BatchSpec batchSpec;
13+
/**
14+
* Construct a new record-message pairing.
15+
*/
16+
RecordMessagePair(SinkRecord kafkaRecord, BatchSpec batchSpec) {
17+
this.kafkaRecord = kafkaRecord;
18+
this.batchSpec = batchSpec;
19+
}
20+
21+
/**
22+
* Returns the incoming Kafka SinkRecord
23+
*/
24+
public SinkRecord getKafkaRecord() {
25+
return kafkaRecord;
26+
}
27+
28+
/**
29+
* Returns the outgoing Ably Message
30+
*/
31+
public BatchSpec getBatchSpec() {
32+
return batchSpec;
33+
}
34+
35+
}

src/main/java/com/ably/kafka/connect/client/BatchSpec.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.ably.lib.types.Message;
44

5+
import java.util.Collections;
56
import java.util.List;
67
import java.util.Objects;
78
import java.util.Set;
@@ -18,6 +19,9 @@ public BatchSpec(Set<String> channels, List<Message> messages) {
1819
this.channels = channels;
1920
this.messages = messages;
2021
}
22+
/**
 * Convenience constructor for a spec carrying a single message destined
 * for a single channel — the shape used since messages are no longer
 * grouped per channel.
 *
 * @param channel the Ably channel name to publish to
 * @param message the single Ably message to publish
 */
public BatchSpec(String channel, Message message) {
    this(Collections.singleton(channel), Collections.singletonList(message));
}
2125
public Set<String> getChannels() {
2226
return channels;
2327
}

0 commit comments

Comments
 (0)