Skip to content

Commit 83b9b4f

Browse files
Source Kafka: added option for giving polling time when consuming records using consumer (#12903)
* added option for giving polling time using spec * add doc and seed file Co-authored-by: marcosmarxm <[email protected]>
1 parent 16de42f commit 83b9b4f

File tree

6 files changed

+12
-4
lines changed

6 files changed

+12
-4
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@
446446
- name: Kafka
447447
sourceDefinitionId: d917a47b-8537-4d0d-8c10-36a9928d4265
448448
dockerRepository: airbyte/source-kafka
449-
dockerImageTag: 0.1.5
449+
dockerImageTag: 0.1.6
450450
documentationUrl: https://docs.airbyte.io/integrations/sources/kafka
451451
icon: kafka.svg
452452
sourceType: database

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3969,7 +3969,7 @@
39693969
supportsNormalization: false
39703970
supportsDBT: false
39713971
supported_destination_sync_modes: []
3972-
- dockerImage: "airbyte/source-kafka:0.1.5"
3972+
- dockerImage: "airbyte/source-kafka:0.1.6"
39733973
spec:
39743974
documentationUrl: "https://docs.airbyte.io/integrations/sources/kafka"
39753975
connectionSpecification:

airbyte-integrations/connectors/source-kafka/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-kafka
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.1.5
19+
LABEL io.airbyte.version=0.1.6
2020
LABEL io.airbyte.name=airbyte/source-kafka

airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,9 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final C
9090

9191
final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0;
9292
int pollCount = 0;
93+
final int polling_time = config.has("polling_time") ? config.get("polling_time").intValue() : 100;
9394
while (true) {
94-
final ConsumerRecords<String, JsonNode> consumerRecords = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
95+
final ConsumerRecords<String, JsonNode> consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS));
9596
if (consumerRecords.count() == 0) {
9697
pollCount++;
9798
if (pollCount > retry) {

airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@
7878
"type": "integer",
7979
"default": 500
8080
},
81+
"polling_time": {
82+
"title": "Polling Time",
83+
"description": "Amount of time Kafka connector should try to poll for messages.",
84+
"type": "integer",
85+
"default": 100
86+
},
8187
"protocol": {
8288
"title": "Protocol",
8389
"type": "object",

docs/integrations/sources/kafka.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ The Kafka source connector supports the following[sync modes](https://docs.airby
4444

4545
| Version | Date | Pull Request | Subject |
4646
| :------ | :-------- | :------------------------------------------------------| :---------------------------------------- |
47+
| 0.1.6 | 2022-05-29 | [12903](https://github.com/airbytehq/airbyte/pull/12903) | Add Polling Time to Specification (default 100 ms) |
4748
| 0.1.5 | 2022-04-19 | [12134](https://github.com/airbytehq/airbyte/pull/12134) | Add PLAIN Auth |
4849
| 0.1.4 | 2022-02-15 | [10186](https://github.com/airbytehq/airbyte/pull/10186) | Add SCRAM-SHA-512 Auth |
4950
| 0.1.3 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |

0 commit comments

Comments
 (0)