10
10
import com .google .common .collect .ImmutableMap ;
11
11
import io .airbyte .cdk .integrations .standardtest .source .SourceAcceptanceTest ;
12
12
import io .airbyte .cdk .integrations .standardtest .source .TestDestinationEnv ;
13
+ import io .airbyte .cdk .integrations .util .HostPortResolver ;
13
14
import io .airbyte .commons .jackson .MoreMappers ;
14
15
import io .airbyte .commons .json .Jsons ;
15
16
import io .airbyte .commons .resources .MoreResources ;
17
+ import io .airbyte .commons .string .Strings ;
16
18
import io .airbyte .protocol .models .Field ;
17
19
import io .airbyte .protocol .models .JsonSchemaType ;
18
20
import io .airbyte .protocol .models .v0 .CatalogHelpers ;
22
24
import io .airbyte .protocol .models .v0 .SyncMode ;
23
25
import java .util .Collections ;
24
26
import java .util .HashMap ;
27
+ import java .util .List ;
25
28
import java .util .Map ;
26
29
import java .util .concurrent .ExecutionException ;
27
30
import org .apache .kafka .clients .admin .AdminClient ;
32
35
import org .apache .kafka .clients .producer .ProducerRecord ;
33
36
import org .apache .kafka .common .serialization .StringSerializer ;
34
37
import org .apache .kafka .connect .json .JsonSerializer ;
38
+ import org .junit .jupiter .api .BeforeAll ;
35
39
import org .testcontainers .containers .KafkaContainer ;
36
40
import org .testcontainers .utility .DockerImageName ;
37
41
38
42
public class KafkaSourceAcceptanceTest extends SourceAcceptanceTest {
39
43
40
44
private static final ObjectMapper mapper = MoreMappers .initMapper ();
41
- private static final String TOPIC_NAME = "test.topic" ;
42
45
43
46
private static KafkaContainer KAFKA ;
44
47
48
+ private String topicName ;
49
+
45
50
@ Override
46
51
protected String getImageName () {
47
52
return "airbyte/source-kafka:dev" ;
@@ -53,10 +58,11 @@ protected JsonNode getConfig() {
53
58
final ObjectNode subscriptionConfig = mapper .createObjectNode ();
54
59
protocolConfig .put ("security_protocol" , KafkaProtocol .PLAINTEXT .toString ());
55
60
subscriptionConfig .put ("subscription_type" , "subscribe" );
56
- subscriptionConfig .put ("topic_pattern" , TOPIC_NAME );
61
+ subscriptionConfig .put ("topic_pattern" , topicName );
57
62
63
+ var bootstrapServers = String .format ("PLAINTEXT://%s:%s" , HostPortResolver .resolveHost (KAFKA ), HostPortResolver .resolvePort (KAFKA ));
58
64
return Jsons .jsonNode (ImmutableMap .builder ()
59
- .put ("bootstrap_servers" , KAFKA . getBootstrapServers () )
65
+ .put ("bootstrap_servers" , bootstrapServers )
60
66
.put ("subscription" , subscriptionConfig )
61
67
.put ("client_dns_lookup" , "use_all_dns_ips" )
62
68
.put ("enable_auto_commit" , false )
@@ -67,11 +73,15 @@ protected JsonNode getConfig() {
67
73
.build ());
68
74
}
69
75
70
- @ Override
71
- protected void setupEnvironment ( final TestDestinationEnv environment ) throws Exception {
76
+ @ BeforeAll
77
+ static public void setupContainer () {
72
78
KAFKA = new KafkaContainer (DockerImageName .parse ("confluentinc/cp-kafka:6.2.0" ));
73
79
KAFKA .start ();
80
+ }
74
81
82
+ @ Override
83
+ protected void setupEnvironment (final TestDestinationEnv environment ) throws Exception {
84
+ topicName = Strings .addRandomSuffix ("topic.test" , "_" , 10 );
75
85
createTopic ();
76
86
sendEvent ();
77
87
}
@@ -87,7 +97,7 @@ private void sendEvent() throws ExecutionException, InterruptedException {
87
97
final ObjectNode event = mapper .createObjectNode ();
88
98
event .put ("test" , "value" );
89
99
90
- producer .send (new ProducerRecord <>(TOPIC_NAME , event ), (recordMetadata , exception ) -> {
100
+ producer .send (new ProducerRecord <>(topicName , event ), (recordMetadata , exception ) -> {
91
101
if (exception != null ) {
92
102
throw new RuntimeException ("Cannot send message to Kafka. Error: " + exception .getMessage (), exception );
93
103
}
@@ -96,14 +106,18 @@ private void sendEvent() throws ExecutionException, InterruptedException {
96
106
97
107
private void createTopic () throws Exception {
98
108
try (final var admin = AdminClient .create (Map .of (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , KAFKA .getBootstrapServers ()))) {
99
- final NewTopic topic = new NewTopic (TOPIC_NAME , 1 , (short ) 1 );
109
+ final NewTopic topic = new NewTopic (topicName , 1 , (short ) 1 );
100
110
admin .createTopics (Collections .singletonList (topic )).all ().get ();
101
111
}
102
112
}
103
113
104
114
@ Override
105
115
protected void tearDown (final TestDestinationEnv testEnv ) {
106
- KAFKA .close ();
116
+ try (final var admin = AdminClient .create (Map .of (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , KAFKA .getBootstrapServers ()))) {
117
+ admin .deleteTopics (List .of (topicName )).all ().get ();
118
+ } catch (Exception e ) {
119
+ throw new RuntimeException (e );
120
+ }
107
121
}
108
122
109
123
@ Override
@@ -114,7 +128,7 @@ protected ConnectorSpecification getSpec() throws Exception {
114
128
@ Override
115
129
protected ConfiguredAirbyteCatalog getConfiguredCatalog () throws Exception {
116
130
final ConfiguredAirbyteStream streams =
117
- CatalogHelpers .createConfiguredAirbyteStream (TOPIC_NAME , null , Field .of ("value" , JsonSchemaType .STRING ));
131
+ CatalogHelpers .createConfiguredAirbyteStream (topicName , null , Field .of ("value" , JsonSchemaType .STRING ));
118
132
streams .setSyncMode (SyncMode .FULL_REFRESH );
119
133
return new ConfiguredAirbyteCatalog ().withStreams (Collections .singletonList (streams ));
120
134
}
0 commit comments