Skip to content

Commit 39a93de

Browse files
etsybaevandriikorotkov
authored andcommitted
Source MySQL\MsSql\Postgres: added RDS base performance tests (airbytehq#8215)
* Added RDS base performance tests for source-postgres, source-mssql and source-mysql * updated perfomance test with cpu and memory limit Co-authored-by: andriikorotkov <[email protected]>
1 parent 926d7e9 commit 39a93de

File tree

16 files changed

+738
-260
lines changed

16 files changed

+738
-260
lines changed

airbyte-integrations/bases/standard-source-test/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ dependencies {
2222
runtimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.4.2'
2323
implementation 'org.junit.platform:junit-platform-launcher:1.7.0'
2424
implementation 'org.junit.jupiter:junit-jupiter-api:5.4.2'
25+
implementation 'org.junit.jupiter:junit-jupiter-params:5.8.1'
2526
}
2627

2728
def getFullPath(String className) {

airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java

+46-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import com.fasterxml.jackson.databind.JsonNode;
1111
import io.airbyte.config.JobGetSpecConfig;
12+
import io.airbyte.config.ResourceRequirements;
1213
import io.airbyte.config.StandardCheckConnectionInput;
1314
import io.airbyte.config.StandardCheckConnectionOutput;
1415
import io.airbyte.config.StandardDiscoverCatalogInput;
@@ -24,6 +25,7 @@
2425
import io.airbyte.workers.DefaultDiscoverCatalogWorker;
2526
import io.airbyte.workers.DefaultGetSpecWorker;
2627
import io.airbyte.workers.WorkerException;
28+
import io.airbyte.workers.WorkerUtils;
2729
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
2830
import io.airbyte.workers.process.DockerProcessFactory;
2931
import io.airbyte.workers.process.ProcessFactory;
@@ -33,18 +35,22 @@
3335
import java.nio.file.Files;
3436
import java.nio.file.Path;
3537
import java.util.ArrayList;
38+
import java.util.HashMap;
3639
import java.util.List;
3740
import java.util.Map;
3841
import java.util.Optional;
3942
import org.junit.jupiter.api.AfterEach;
4043
import org.junit.jupiter.api.BeforeEach;
44+
import org.slf4j.Logger;
45+
import org.slf4j.LoggerFactory;
4146

4247
/**
4348
* This abstract class contains helpful functionality and boilerplate for testing a source
4449
* connector.
4550
*/
4651
public abstract class AbstractSourceConnectorTest {
4752

53+
protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractSourceConnectorTest.class);
4854
private TestDestinationEnv environment;
4955
private Path jobRoot;
5056
protected Path localRoot;
@@ -53,6 +59,11 @@ public abstract class AbstractSourceConnectorTest {
5359
private static final String JOB_ID = String.valueOf(0L);
5460
private static final int JOB_ATTEMPT = 0;
5561

62+
private static final String CPU_REQUEST_FIELD_NAME = "cpuRequest";
63+
private static final String CPU_LIMIT_FIELD_NAME = "cpuLimit";
64+
private static final String MEMORY_REQUEST_FIELD_NAME = "memoryRequest";
65+
private static final String MEMORY_LIMIT_FIELD_NAME = "memoryLimit";
66+
5667
/**
5768
* Name of the docker image that the tests will run against.
5869
*
@@ -169,7 +180,9 @@ protected Map<String, Integer> runReadVerifyNumberOfReceivedMsgs(final Configure
169180
.withState(state == null ? null : new State().withState(state))
170181
.withCatalog(catalog);
171182

172-
final AirbyteSource source = new DefaultAirbyteSource(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory));
183+
final Map<String, String> mapOfResourceRequirementsParams = prepareResourceRequestMapBySystemProperties();
184+
final AirbyteSource source =
185+
prepareAirbyteSource(!mapOfResourceRequirementsParams.isEmpty() ? prepareResourceRequirements(mapOfResourceRequirementsParams) : null);
173186
source.start(sourceConfig, jobRoot);
174187

175188
while (!source.isFinished()) {
@@ -186,4 +199,36 @@ protected Map<String, Integer> runReadVerifyNumberOfReceivedMsgs(final Configure
186199
return mapOfExpectedRecordsCount;
187200
}
188201

202+
protected ResourceRequirements prepareResourceRequirements(Map<String, String> mapOfResourceRequirementsParams) {
203+
return new ResourceRequirements().withCpuRequest(mapOfResourceRequirementsParams.get(CPU_REQUEST_FIELD_NAME))
204+
.withCpuLimit(mapOfResourceRequirementsParams.get(CPU_LIMIT_FIELD_NAME))
205+
.withMemoryRequest(mapOfResourceRequirementsParams.get(MEMORY_REQUEST_FIELD_NAME))
206+
.withMemoryLimit(mapOfResourceRequirementsParams.get(MEMORY_LIMIT_FIELD_NAME));
207+
}
208+
209+
private AirbyteSource prepareAirbyteSource(ResourceRequirements resourceRequirements) {
210+
var integrationLauncher = resourceRequirements == null ? new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory)
211+
: new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, resourceRequirements);
212+
return new DefaultAirbyteSource(integrationLauncher);
213+
}
214+
215+
private static Map<String, String> prepareResourceRequestMapBySystemProperties() {
216+
var cpuLimit = System.getProperty(CPU_LIMIT_FIELD_NAME);
217+
var memoryLimit = System.getProperty(MEMORY_LIMIT_FIELD_NAME);
218+
if (cpuLimit.isBlank() || cpuLimit.isEmpty()) {
219+
cpuLimit = WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS.getCpuLimit();
220+
}
221+
if (memoryLimit.isBlank() || memoryLimit.isEmpty()) {
222+
memoryLimit = WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS.getMemoryLimit();
223+
}
224+
LOGGER.error("cpu limit -->> {}", cpuLimit);
225+
LOGGER.error("memory limit -->> {}", memoryLimit);
226+
Map<String, String> result = new HashMap<>();
227+
result.put(CPU_REQUEST_FIELD_NAME, WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS.getCpuRequest());
228+
result.put(CPU_LIMIT_FIELD_NAME, cpuLimit);
229+
result.put(MEMORY_REQUEST_FIELD_NAME, WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS.getMemoryRequest());
230+
result.put(MEMORY_LIMIT_FIELD_NAME, memoryLimit);
231+
return result;
232+
}
233+
189234
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.standardtest.source.performancetest;
6+
7+
import io.airbyte.integrations.standardtest.source.AbstractSourceConnectorTest;
8+
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
9+
10+
/**
11+
* This abstract class contains common methods for both steams - Fill Db scripts and Performance
12+
* tests.
13+
*/
14+
public abstract class AbstractSourceBasePerformanceTest extends AbstractSourceConnectorTest {
15+
16+
private static final String TEST_COLUMN_NAME = "test_column";
17+
private static final String TEST_STREAM_NAME_TEMPLATE = "test_%S";
18+
19+
/**
20+
* The column name will be used for a test column in the test tables. Override it if default name is
21+
* not valid for your source.
22+
*
23+
* @return Test column name
24+
*/
25+
protected String getTestColumnName() {
26+
return TEST_COLUMN_NAME;
27+
}
28+
29+
/**
30+
* The stream name template will be used for a test tables. Override it if default name is not valid
31+
* for your source.
32+
*
33+
* @return Test steam name template
34+
*/
35+
protected String getTestStreamNameTemplate() {
36+
return TEST_STREAM_NAME_TEMPLATE;
37+
}
38+
39+
@Override
40+
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
41+
// DO NOTHING. Mandatory to override. DB will be setup as part of each test
42+
}
43+
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.standardtest.source.performancetest;
6+
7+
import io.airbyte.db.Database;
8+
import java.util.StringJoiner;
9+
import java.util.stream.Stream;
10+
import org.junit.jupiter.api.Disabled;
11+
import org.junit.jupiter.params.ParameterizedTest;
12+
import org.junit.jupiter.params.provider.Arguments;
13+
import org.junit.jupiter.params.provider.MethodSource;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
16+
17+
/**
18+
* This abstract class contains common methods for Fill Db scripts.
19+
*/
20+
public abstract class AbstractSourceFillDbWithTestData extends AbstractSourceBasePerformanceTest {
21+
22+
private static final String CREATE_DB_TABLE_TEMPLATE = "CREATE TABLE %s.%s(id INTEGER PRIMARY KEY, %s)";
23+
private static final String INSERT_INTO_DB_TABLE_QUERY_TEMPLATE = "INSERT INTO %s.%s (%s) VALUES %s";
24+
private static final String TEST_DB_FIELD_TYPE = "varchar(10)";
25+
26+
protected static final Logger c = LoggerFactory.getLogger(AbstractSourceFillDbWithTestData.class);
27+
private static final String TEST_VALUE_TEMPLATE_POSTGRES = "\'Value id_placeholder\'";
28+
protected static Stream testArgs;
29+
30+
/**
31+
* Setup the test database. All tables and data described in the registered tests will be put there.
32+
*
33+
* @return configured test database
34+
* @throws Exception - might throw any exception during initialization.
35+
*/
36+
protected abstract Database setupDatabase(String dbName) throws Exception;
37+
38+
/**
39+
* The test added test data to a new DB. 1. Set DB creds in static variables above 2. Set desired
40+
* number for streams, coolumns and records 3. Run the test
41+
*/
42+
@Disabled
43+
@ParameterizedTest
44+
@MethodSource("provideParameters")
45+
public void addTestData(String dbName,
46+
String schemaName,
47+
int numberOfDummyRecords,
48+
int numberOfBatches,
49+
int numberOfColumns,
50+
int numberOfStreams)
51+
throws Exception {
52+
53+
final Database database = setupDatabase(dbName);
54+
55+
database.query(ctx -> {
56+
for (int currentSteamNumber = 0; currentSteamNumber < numberOfStreams; currentSteamNumber++) {
57+
58+
String currentTableName = String.format(getTestStreamNameTemplate(), currentSteamNumber);
59+
60+
ctx.fetch(prepareCreateTableQuery(schemaName, numberOfColumns, currentTableName));
61+
for (int i = 0; i < numberOfBatches; i++) {
62+
String insertQueryTemplate = prepareInsertQueryTemplate(schemaName, i,
63+
numberOfColumns,
64+
numberOfDummyRecords);
65+
ctx.fetch(String.format(insertQueryTemplate, currentTableName));
66+
}
67+
68+
c.info("Finished processing for stream " + currentSteamNumber);
69+
}
70+
return null;
71+
});
72+
73+
database.close();
74+
75+
}
76+
77+
/**
78+
* This is a data provider for fill DB script,, Each argument's group would be ran as a separate
79+
* test. Set the "testArgs" in test class of your DB in @BeforeTest method.
80+
*
81+
* 1st arg - a name of DB that will be used in jdbc connection string. 2nd arg - a schemaName that
82+
* will be ised as a NameSpace in Configured Airbyte Catalog. 3rd arg - a number of expected records
83+
* retrieved in each stream. 4th arg - a number of columns in each stream\table that will be use for
84+
* Airbyte Cataloq configuration 5th arg - a number of streams to read in configured airbyte
85+
* Catalog. Each stream\table in DB should be names like "test_0", "test_1",..., test_n.
86+
*
87+
* Stream.of( Arguments.of("your_db_name", "your_schema_name", 100, 2, 240, 1000) );
88+
*/
89+
private static Stream<Arguments> provideParameters() {
90+
return testArgs;
91+
}
92+
93+
protected String prepareCreateTableQuery(final String dbSchemaName,
94+
final int numberOfColumns,
95+
final String currentTableName) {
96+
97+
StringJoiner sj = new StringJoiner(",");
98+
for (int i = 0; i < numberOfColumns; i++) {
99+
sj.add(String.format(" %s%s %s", getTestColumnName(), i, TEST_DB_FIELD_TYPE));
100+
}
101+
102+
return String.format(CREATE_DB_TABLE_TEMPLATE, dbSchemaName, currentTableName, sj.toString());
103+
}
104+
105+
protected String prepareInsertQueryTemplate(final String dbSchemaName,
106+
final int batchNumber,
107+
final int numberOfColumns,
108+
final int recordsNumber) {
109+
110+
StringJoiner fieldsNames = new StringJoiner(",");
111+
fieldsNames.add("id");
112+
113+
StringJoiner baseInsertQuery = new StringJoiner(",");
114+
baseInsertQuery.add("id_placeholder");
115+
116+
for (int i = 0; i < numberOfColumns; i++) {
117+
fieldsNames.add(getTestColumnName() + i);
118+
baseInsertQuery.add(TEST_VALUE_TEMPLATE_POSTGRES);
119+
}
120+
121+
StringJoiner insertGroupValuesJoiner = new StringJoiner(",");
122+
123+
int batchMessages = batchNumber * 100;
124+
125+
for (int currentRecordNumber = batchMessages;
126+
currentRecordNumber < recordsNumber + batchMessages;
127+
currentRecordNumber++) {
128+
insertGroupValuesJoiner
129+
.add("(" + baseInsertQuery.toString()
130+
.replaceAll("id_placeholder", String.valueOf(currentRecordNumber)) + ")");
131+
}
132+
133+
return String
134+
.format(INSERT_INTO_DB_TABLE_QUERY_TEMPLATE, dbSchemaName, "%s", fieldsNames.toString(),
135+
insertGroupValuesJoiner.toString());
136+
}
137+
138+
}

0 commit comments

Comments
 (0)