Skip to content

Commit d7e2b76

Browse files
itaseskiiitaseskimarcosmarxm
authored
🎉 New Destination: Redpanda (#18884)
* add Redpanda destination connector * change cluster config * improve test coverage * add redpanda to destination def Co-authored-by: itaseski <itaseski@debian-BULLSEYE-live-builder-AMD64> Co-authored-by: marcosmarxm <[email protected]> Co-authored-by: Marcos Marx <[email protected]>
1 parent 8c1c920 commit d7e2b76

File tree

24 files changed

+1262
-190
lines changed

24 files changed

+1262
-190
lines changed

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,12 @@
271271
memory_limit: "1Gi"
272272
memory_request: "1Gi"
273273
releaseStage: beta
274+
- name: Redpanda
275+
destinationDefinitionId: 825c5ee3-ed9a-4dd1-a2b6-79ed722f7b13
276+
dockerRepository: airbyte/destination-redpanda
277+
dockerImageTag: 0.1.0
278+
documentationUrl: https://docs.airbyte.com/integrations/destinations/redpanda
279+
releaseStage: alpha
274280
- name: Rockset
275281
destinationDefinitionId: 2c9d93a7-9a17-4789-9de9-f46f0097eb70
276282
dockerRepository: airbyte/destination-rockset

airbyte-config/init/src/main/resources/seed/destination_specs.yaml

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4793,6 +4793,98 @@
47934793
- "overwrite"
47944794
- "append"
47954795
- "append_dedup"
4796+
- dockerImage: "airbyte/destination-redpanda:0.1.0"
4797+
spec:
4798+
documentationUrl: "https://docs.airbyte.com/integrations/destinations/redpanda"
4799+
connectionSpecification:
4800+
$schema: "http://json-schema.org/draft-07/schema#"
4801+
title: "Redpanda destination connector"
4802+
type: "object"
4803+
required:
4804+
- "bootstrap_servers"
4805+
- "buffer_memory"
4806+
- "compression_type"
4807+
- "retries"
4808+
- "batch_size"
4809+
properties:
4810+
bootstrap_servers:
4811+
title: "Bootstrap Servers"
4812+
description: "A list of host/port pairs to use for establishing the initial\
4813+
\ connection to the Redpanda cluster. The client will make use of all\
4814+
\ servers irrespective of which servers are specified here for bootstrapping&mdash;this\
4815+
\ list only impacts the initial hosts used to discover the full set of\
4816+
\ servers. This list should be in the form <code>host1:port1,host2:port2,...</code>.\
4817+
\ Since these servers are just used for the initial connection to discover\
4818+
\ the full cluster membership (which may change dynamically), this list\
4819+
\ need not contain the full set of servers (you may want more than one,\
4820+
\ though, in case a server is down)."
4821+
type: "string"
4822+
examples:
4823+
- "redpanda-broker1:9092,redpanda-broker2:9092"
4824+
buffer_memory:
4825+
title: "Buffer Memory"
4826+
description: "The total bytes of memory the producer can use to buffer records\
4827+
\ waiting to be sent to the server."
4828+
type: "string"
4829+
examples: 33554432
4830+
compression_type:
4831+
title: "Compression Type"
4832+
description: "The compression type for all data generated by the producer."
4833+
type: "string"
4834+
default: "none"
4835+
enum:
4836+
- "none"
4837+
- "gzip"
4838+
- "snappy"
4839+
- "lz4"
4840+
- "zstd"
4841+
batch_size:
4842+
title: "Batch Size"
4843+
description: "The producer will attempt to batch records together into fewer\
4844+
\ requests whenever multiple records are being sent to the same partition."
4845+
type: "integer"
4846+
examples:
4847+
- 16384
4848+
retries:
4849+
title: "Retries"
4850+
description: "Setting a value greater than zero will cause the client to\
4851+
\ resend any record whose send fails with a potentially transient error."
4852+
type: "integer"
4853+
examples:
4854+
- 2147483647
4855+
topic_num_partitions:
4856+
title: "Number of topic partitions"
4857+
description: "The number of topic partitions which will be created on topic\
4858+
\ creation"
4859+
type: "integer"
4860+
examples:
4861+
- 10
4862+
topic_replication_factor:
4863+
title: "Topic replication factor"
4864+
description: "The number of topics to which messages will be replicated"
4865+
type: "integer"
4866+
examples:
4867+
- 10
4868+
socket_connection_setup_timeout_ms:
4869+
title: "Socket Connection Setup Timeout"
4870+
description: "The amount of time the client will wait for the socket connection\
4871+
\ to be established."
4872+
type: "integer"
4873+
examples:
4874+
- 10000
4875+
socket_connection_setup_timeout_max_ms:
4876+
title: "Socket Connection Setup Max Timeout"
4877+
description: "The maximum amount of time the client will wait for the socket\
4878+
\ connection to be established. The connection setup timeout will increase\
4879+
\ exponentially for each consecutive connection failure up to this maximum."
4880+
type: "integer"
4881+
examples:
4882+
- 30000
4883+
supportsIncremental: true
4884+
supportsNormalization: false
4885+
supportsDBT: false
4886+
supported_destination_sync_modes:
4887+
- "append"
47964888
- dockerImage: "airbyte/destination-rockset:0.1.4"
47974889
spec:
47984890
documentationUrl: "https://docs.airbyte.com/integrations/destinations/rockset"
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
*
2+
!Dockerfile
3+
!build
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
FROM airbyte/integration-base-java:dev AS build
2+
3+
WORKDIR /airbyte
4+
ENV APPLICATION destination-redpanda
5+
6+
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
7+
8+
RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar
9+
10+
FROM airbyte/integration-base-java:dev
11+
12+
WORKDIR /airbyte
13+
ENV APPLICATION destination-redpanda
14+
15+
COPY --from=build /airbyte /airbyte
16+
17+
LABEL io.airbyte.version=0.1.0
18+
LABEL io.airbyte.name=airbyte/destination-redpanda
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Destination Redpanda
2+
3+
This is the repository for the Redpanda destination connector in Java.
4+
For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/redpanda).
5+
6+
## Local development
7+
8+
#### Building via Gradle
9+
From the Airbyte repository root, run:
10+
```
11+
./gradlew :airbyte-integrations:connectors:destination-redpanda:build
12+
```
13+
14+
#### Create credentials
15+
**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`.
16+
Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information.
17+
18+
**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials.
19+
20+
### Locally running the connector docker image
21+
22+
#### Build
23+
Build the connector image via Gradle:
24+
```
25+
./gradlew :airbyte-integrations:connectors:destination-redpanda:airbyteDocker
26+
```
27+
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
28+
the Dockerfile.
29+
30+
#### Run
31+
Then run any of the connector commands as follows:
32+
```
33+
docker run --rm airbyte/destination-redpanda:dev spec
34+
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-redpanda:dev check --config /secrets/config.json
35+
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-redpanda:dev discover --config /secrets/config.json
36+
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-redpanda:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
37+
```
38+
39+
## Testing
40+
We use `JUnit` for Java tests.
41+
42+
### Unit and Integration Tests
43+
Place unit tests under `src/test/io/airbyte/integrations/destinations/redpanda`.
44+
45+
#### Acceptance Tests
46+
Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in
47+
`src/test-integration/java/io/airbyte/integrations/destinations/redpandaDestinationAcceptanceTest.java`.
48+
49+
### Using gradle to run tests
50+
All commands should be run from airbyte project root.
51+
To run unit tests:
52+
```
53+
./gradlew :airbyte-integrations:connectors:destination-redpanda:unitTest
54+
```
55+
To run acceptance and custom integration tests:
56+
```
57+
./gradlew :airbyte-integrations:connectors:destination-redpanda:integrationTest
58+
```
59+
60+
## Dependency Management
61+
62+
### Publishing a new version of the connector
63+
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
64+
1. Make sure your changes are passing unit and integration tests.
65+
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
66+
1. Create a Pull Request.
67+
1. Pat yourself on the back for being an awesome contributor.
68+
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
plugins {
2+
id 'application'
3+
id 'airbyte-docker'
4+
id 'airbyte-integration-test-java'
5+
}
6+
7+
application {
8+
mainClass = 'io.airbyte.integrations.destination.redpanda.RedpandaDestination'
9+
}
10+
11+
dependencies {
12+
implementation project(':airbyte-config:config-models')
13+
implementation project(':airbyte-protocol:protocol-models')
14+
implementation project(':airbyte-integrations:bases:base-java')
15+
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
16+
17+
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
18+
implementation 'org.apache.kafka:kafka-clients:3.3.1'
19+
implementation 'org.apache.kafka:connect-json:3.3.1'
20+
21+
testImplementation "org.testcontainers:redpanda:1.17.5"
22+
23+
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
24+
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-redpanda')
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package io.airbyte.integrations.destination.redpanda;
2+
3+
import com.fasterxml.jackson.databind.JsonNode;
4+
import java.util.Map;
5+
import java.util.Optional;
6+
import org.apache.kafka.clients.admin.Admin;
7+
import org.apache.kafka.clients.admin.AdminClient;
8+
import org.apache.kafka.clients.admin.AdminClientConfig;
9+
import org.apache.kafka.clients.producer.KafkaProducer;
10+
import org.apache.kafka.clients.producer.ProducerConfig;
11+
12+
public class RedpandaConfig {
13+
14+
//host1:port1,host2:port2,...
15+
private final String bootstrapServers;
16+
17+
private final long bufferMemory;
18+
19+
private final String compressionType;
20+
21+
private final int retries;
22+
23+
private final int batchSize;
24+
25+
private final Optional<Integer> topicNumPartitions;
26+
27+
private final Optional<Short> topicReplicationFactor;
28+
29+
private final int socketConnectionSetupTimeoutMs;
30+
31+
private final int socketConnectionSetupTimeoutMaxMs;
32+
33+
private RedpandaConfig(String bootstrapServers, long bufferMemory, String compressionType, int retries,
34+
int batchSize, Optional<Integer> topicNumPartitions, Optional<Short> topicReplicationFactor,
35+
int socketConnectionSetupTimeoutMs, int socketConnectionSetupTimeoutMaxMs) {
36+
this.bootstrapServers = bootstrapServers;
37+
this.bufferMemory = bufferMemory;
38+
this.compressionType = compressionType;
39+
this.retries = retries;
40+
this.batchSize = batchSize;
41+
this.topicNumPartitions = topicNumPartitions;
42+
this.topicReplicationFactor = topicReplicationFactor;
43+
this.socketConnectionSetupTimeoutMs = socketConnectionSetupTimeoutMs;
44+
this.socketConnectionSetupTimeoutMaxMs = socketConnectionSetupTimeoutMaxMs;
45+
}
46+
47+
public static RedpandaConfig createConfig(JsonNode jsonConfig) {
48+
return new RedpandaConfig(
49+
jsonConfig.get("bootstrap_servers").asText(),
50+
jsonConfig.get("buffer_memory").asLong(33554432L),
51+
jsonConfig.get("compression_type").asText("none"),
52+
jsonConfig.get("retries").asInt(5),
53+
jsonConfig.get("batch_size").asInt(16384),
54+
Optional.of(jsonConfig.get("topic_num_partitions").asInt(1)),
55+
Optional.of(((Integer) jsonConfig.get("topic_replication_factor").asInt(1)).shortValue()),
56+
jsonConfig.get("socket_connection_setup_timeout_ms").asInt(10000),
57+
jsonConfig.get("socket_connection_setup_timeout_max_ms").asInt(30000)
58+
);
59+
}
60+
61+
public KafkaProducer<String, JsonNode> createKafkaProducer() {
62+
return new KafkaProducer<>(Map.of(
63+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
64+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer",
65+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonSerializer",
66+
ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory,
67+
ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType,
68+
ProducerConfig.RETRIES_CONFIG, retries,
69+
ProducerConfig.BATCH_SIZE_CONFIG, batchSize,
70+
ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, socketConnectionSetupTimeoutMs,
71+
ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, socketConnectionSetupTimeoutMaxMs
72+
));
73+
74+
}
75+
76+
public Admin createAdminClient() {
77+
return AdminClient.create(Map.of(
78+
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
79+
AdminClientConfig.RETRIES_CONFIG, retries,
80+
AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, socketConnectionSetupTimeoutMs,
81+
AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, socketConnectionSetupTimeoutMaxMs
82+
));
83+
}
84+
85+
public Optional<Integer> topicNumPartitions() {
86+
return topicNumPartitions;
87+
}
88+
89+
public Optional<Short> topicReplicationFactor() {
90+
return topicReplicationFactor;
91+
}
92+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.redpanda;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import io.airbyte.commons.json.Jsons;
9+
import io.airbyte.integrations.BaseConnector;
10+
import io.airbyte.integrations.base.AirbyteMessageConsumer;
11+
import io.airbyte.integrations.base.Destination;
12+
import io.airbyte.integrations.base.IntegrationRunner;
13+
import io.airbyte.protocol.models.AirbyteConnectionStatus;
14+
import io.airbyte.protocol.models.AirbyteMessage;
15+
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
16+
import java.util.List;
17+
import java.util.Optional;
18+
import java.util.UUID;
19+
import java.util.function.Consumer;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
public class RedpandaDestination extends BaseConnector implements Destination {
24+
25+
private static final Logger LOGGER = LoggerFactory.getLogger(RedpandaDestination.class);
26+
27+
public static void main(String[] args) throws Exception {
28+
new IntegrationRunner(new RedpandaDestination()).run(args);
29+
}
30+
31+
@Override
32+
public AirbyteConnectionStatus check(JsonNode config) {
33+
String topicName = "namespace.stream";
34+
RedpandaOperations redpandaOperations = null;
35+
try {
36+
RedpandaConfig redpandaConfig = RedpandaConfig.createConfig(config);
37+
redpandaOperations = new RedpandaOperations(redpandaConfig);
38+
redpandaOperations.createTopic(
39+
List.of(new RedpandaOperations.TopicInfo(topicName, Optional.empty(), Optional.empty())));
40+
redpandaOperations.putRecordBlocking(topicName, UUID.randomUUID().toString(), Jsons.emptyObject());
41+
redpandaOperations.flush();
42+
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
43+
} catch (Exception e) {
44+
LOGGER.error("Error while trying to connect to Redpanda: ", e);
45+
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED);
46+
} finally {
47+
if (redpandaOperations != null) {
48+
try {
49+
redpandaOperations.deleteTopic(List.of(topicName));
50+
} catch (Exception e) {
51+
LOGGER.error("Error while deleting Redpanda topic: ", e);
52+
}
53+
redpandaOperations.close();
54+
}
55+
}
56+
}
57+
58+
@Override
59+
public AirbyteMessageConsumer getConsumer(JsonNode config,
60+
ConfiguredAirbyteCatalog configuredCatalog,
61+
Consumer<AirbyteMessage> outputRecordCollector) {
62+
RedpandaConfig redpandaConfig = RedpandaConfig.createConfig(config);
63+
return new RedpandaMessageConsumer(configuredCatalog, new RedpandaOperations(redpandaConfig), redpandaConfig,
64+
outputRecordCollector);
65+
}
66+
67+
}

0 commit comments

Comments
 (0)