Skip to content

Commit 36698ce

Browse files
xiaohansongmidavadimetsybaevalafanecheregrishick
authored
Discover worker starts to use API to write schema result (#21875)
* api changes for writing discover catalog * api changes * format * worker change 1 * change return type of the API to return catalogId * worker to call api * typo * 🎉 Source GoogleSheets - migrated SAT to strictness level (#21399) * migrated SAT to strictness level * fixed expected records * revert file from another source * changed extension to txt * changed extension to txt * 🐛Destination-Bigquery: Added an explicit error message if sync fails due to a config issue (#21144) * [19998] Destination-Bigquery: Added an explicit error message in sync fails due to a config issue * ci-connector-ops: split workflows(#21474) * CI: nightly build alpha sources and destinations (#21562) * Revert "Change main class in strict-encrypt destination and bump versions on both destinations to keep them in sync (#21509)" (#21567) This reverts commit 1d202d1. * Fixes webhook updating logic (#21519) * ci_credentials: disable tooling test run by tox (#21580) * disable tox * rename steps * revert changes on experimental workflow * do not install tox * Revert "CI: nightly build alpha sources and destinations (#21562)" (#21589) This reverts commit 61f88f3. * Security update of default docker images (#21407) Because there is a lot of CVEs in those releases. Co-authored-by: Topher Lubaway <[email protected]> * 📝 add docs for how to add normalization (#21563) * add docs * add schema link * update based on feedback * 🪟 🚦 E2E tests: clean up matchers (#20887) * improve serviceTypeDropdownOption selector * add test ids to PathPopout component(s) * add unique id's to table dropdowns * extend submitButtonClick to support optional click options * update dropdown(pathPopout) matchers * add test-id to Overlay component * remove redundant function brackets * revert changes onSubmit button click * fix dropDown overlay issue * move all duplicated intercepters to beforeEach * add test id's to Connections, Sources and Destinations tables * add table helper functions * update source page actions * intercepter fixes * update createTestConnection function with optional replication settings * remove extra Connection name check * replace "cypress-postgres" with "pg-promise" npm package * update cypress config * Revert "update createTestConnection function with optional replication settings" This reverts commit 8e47c78. * Revert "remove extra Connection name check" This reverts commit dfb19c7. * replace openSourceDestinationFromGrid with specific selector * replace openSourceDestinationFromGrid with specific selector * turn on test * add test-id's * fix selectors * update test * update test snapshots * fix lost data-testid after resolve merge conflicts * remove extra check * move clickOnCellInTable helper to common.ts file * remove empty line and comments * fix dropdownType * replace partial string check with exact * extract interceptors and waiters to separate file * fix selector for predefined PK * fix selector * add comment regarding dropdown * 🪟 🎨 [Free connectors] Update modal copy (#21600) * move start/end time options out of optional block (#21541) * lingering fix * reflecting api changes * test fix * worker to call api to do discover work * recovered deleted html * self review * more converters refactor * fix connector test * fix test * fix * fix integration test * add unit test for converter * static fix * api client needs to have a timeout in case request does not get responded --------- Co-authored-by: midavadim <[email protected]> Co-authored-by: Eugene <[email protected]> Co-authored-by: Augustin <[email protected]> Co-authored-by: Greg Solovyev <[email protected]> Co-authored-by: Yatsuk Bogdan <[email protected]> Co-authored-by: Hervé Commowick <[email protected]> Co-authored-by: Topher Lubaway <[email protected]> Co-authored-by: Pedro S. Lopez <[email protected]> Co-authored-by: Vladimir <[email protected]> Co-authored-by: Joey Marshment-Howell <[email protected]> Co-authored-by: Lake Mossman <[email protected]>
1 parent 0cef7b0 commit 36698ce

File tree

9 files changed

+225
-44
lines changed

9 files changed

+225
-44
lines changed

airbyte-commons-worker/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.net.http.HttpClient;
2828
import java.net.http.HttpClient.Version;
2929
import java.security.interfaces.RSAPrivateKey;
30+
import java.time.Duration;
3031
import java.util.Date;
3132
import java.util.concurrent.TimeUnit;
3233
import lombok.extern.slf4j.Slf4j;
@@ -53,6 +54,8 @@ public ApiClient apiClient(
5354
.setPort(parsePort(airbyteApiHost))
5455
.setBasePath("/api")
5556
.setHttpClientBuilder(HttpClient.newBuilder().version(Version.HTTP_1_1))
57+
.setConnectTimeout(Duration.ofSeconds(30))
58+
.setReadTimeout(Duration.ofSeconds(30))
5659
.setRequestInterceptor(builder -> {
5760
builder.setHeader("User-Agent", "WorkerApp");
5861
// internalApiAuthToken is in BeanProvider because we want to create a new token each

airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@
1111

1212
import com.fasterxml.jackson.databind.JsonNode;
1313
import datadog.trace.api.Trace;
14+
import io.airbyte.api.client.AirbyteApiClient;
15+
import io.airbyte.api.client.model.generated.DiscoverCatalogResult;
16+
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaWriteRequestBody;
1417
import io.airbyte.commons.io.LineGobbler;
1518
import io.airbyte.commons.json.Jsons;
1619
import io.airbyte.config.ConnectorJobOutput;
1720
import io.airbyte.config.ConnectorJobOutput.OutputType;
1821
import io.airbyte.config.FailureReason;
1922
import io.airbyte.config.StandardDiscoverCatalogInput;
20-
import io.airbyte.config.persistence.ConfigRepository;
2123
import io.airbyte.metrics.lib.ApmTraceUtils;
2224
import io.airbyte.protocol.models.AirbyteCatalog;
2325
import io.airbyte.protocol.models.AirbyteControlConnectorConfigMessage;
@@ -26,6 +28,7 @@
2628
import io.airbyte.workers.WorkerConstants;
2729
import io.airbyte.workers.WorkerUtils;
2830
import io.airbyte.workers.exception.WorkerException;
31+
import io.airbyte.workers.helper.CatalogClientConverters;
2932
import io.airbyte.workers.helper.ConnectorConfigUpdater;
3033
import io.airbyte.workers.internal.AirbyteStreamFactory;
3134
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
@@ -43,29 +46,28 @@
4346
public class DefaultDiscoverCatalogWorker implements DiscoverCatalogWorker {
4447

4548
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDiscoverCatalogWorker.class);
46-
47-
private final ConfigRepository configRepository;
49+
private static final String WRITE_DISCOVER_CATALOG_LOGS_TAG = "call to write discover schema result";
4850

4951
private final IntegrationLauncher integrationLauncher;
5052
private final AirbyteStreamFactory streamFactory;
5153
private final ConnectorConfigUpdater connectorConfigUpdater;
52-
54+
private final AirbyteApiClient airbyteApiClient;
5355
private volatile Process process;
5456

55-
public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository,
57+
public DefaultDiscoverCatalogWorker(final AirbyteApiClient airbyteApiClient,
5658
final IntegrationLauncher integrationLauncher,
5759
final ConnectorConfigUpdater connectorConfigUpdater,
5860
final AirbyteStreamFactory streamFactory) {
59-
this.configRepository = configRepository;
61+
this.airbyteApiClient = airbyteApiClient;
6062
this.integrationLauncher = integrationLauncher;
6163
this.streamFactory = streamFactory;
6264
this.connectorConfigUpdater = connectorConfigUpdater;
6365
}
6466

65-
public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository,
67+
public DefaultDiscoverCatalogWorker(final AirbyteApiClient airbyteApiClient,
6668
final IntegrationLauncher integrationLauncher,
6769
final ConnectorConfigUpdater connectorConfigUpdater) {
68-
this(configRepository, integrationLauncher, connectorConfigUpdater, new DefaultAirbyteStreamFactory());
70+
this(airbyteApiClient, integrationLauncher, connectorConfigUpdater, new DefaultAirbyteStreamFactory());
6971
}
7072

7173
@Trace(operationName = WORKER_OPERATION_NAME)
@@ -108,14 +110,11 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
108110
}
109111

110112
if (catalog.isPresent()) {
111-
final UUID catalogId =
112-
configRepository.writeActorCatalogFetchEvent(catalog.get(),
113-
// NOTE: sourceId is marked required in the OpenAPI config but the code generator doesn't enforce
114-
// it, so we check again here.
115-
discoverSchemaInput.getSourceId() == null ? null : UUID.fromString(discoverSchemaInput.getSourceId()),
116-
discoverSchemaInput.getConnectorVersion(),
117-
discoverSchemaInput.getConfigHash());
118-
jobOutput.setDiscoverCatalogId(catalogId);
113+
final DiscoverCatalogResult result =
114+
AirbyteApiClient.retryWithJitter(() -> airbyteApiClient.getSourceApi()
115+
.writeDiscoverCatalogResult(buildSourceDiscoverSchemaWriteRequestBody(discoverSchemaInput, catalog.get())),
116+
WRITE_DISCOVER_CATALOG_LOGS_TAG);
117+
jobOutput.setDiscoverCatalogId(result.getCatalogId());
119118
} else if (failureReason.isEmpty()) {
120119
WorkerUtils.throwWorkerException("Integration failed to output a catalog struct and did not output a failure reason", process);
121120
}
@@ -129,6 +128,19 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
129128
}
130129
}
131130

131+
private SourceDiscoverSchemaWriteRequestBody buildSourceDiscoverSchemaWriteRequestBody(final StandardDiscoverCatalogInput discoverSchemaInput,
132+
final AirbyteCatalog catalog) {
133+
return new SourceDiscoverSchemaWriteRequestBody().catalog(
134+
CatalogClientConverters.toAirbyteCatalogClientApi(catalog)).sourceId(
135+
// NOTE: sourceId is marked required in the OpenAPI config but the code generator doesn't enforce
136+
// it, so we check again here.
137+
discoverSchemaInput.getSourceId() == null ? null : UUID.fromString(discoverSchemaInput.getSourceId()))
138+
.connectorVersion(
139+
discoverSchemaInput.getConnectorVersion())
140+
.configurationHash(
141+
discoverSchemaInput.getConfigHash());
142+
}
143+
132144
private Map<String, Object> generateTraceTags(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) {
133145
final Map<String, Object> tags = new HashMap<>();
134146

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.workers.helper;
6+
7+
import io.airbyte.commons.enums.Enums;
8+
import io.airbyte.commons.text.Names;
9+
import io.airbyte.protocol.models.AirbyteStream;
10+
import java.util.stream.Collectors;
11+
12+
/**
13+
* Utilities to convert Catalog protocol to Catalog API client. This class was similar to existing
14+
* logic in CatalogConverter.java; But code can't be shared because the protocol model is
15+
* essentially converted to two different api models. Thus, if we need to change logic on either
16+
* place we have to take care of the other one too.
17+
*/
18+
public class CatalogClientConverters {
19+
20+
/**
21+
* Converts a protocol AirbyteCatalog to an OpenAPI client versioned AirbyteCatalog.
22+
*/
23+
public static io.airbyte.api.client.model.generated.AirbyteCatalog toAirbyteCatalogClientApi(
24+
final io.airbyte.protocol.models.AirbyteCatalog catalog) {
25+
return new io.airbyte.api.client.model.generated.AirbyteCatalog()
26+
.streams(catalog.getStreams()
27+
.stream()
28+
.map(stream -> toAirbyteStreamClientApi(stream))
29+
.map(s -> new io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration()
30+
.stream(s)
31+
.config(generateDefaultConfiguration(s)))
32+
.collect(Collectors.toList()));
33+
}
34+
35+
private static io.airbyte.api.client.model.generated.AirbyteStreamConfiguration generateDefaultConfiguration(
36+
final io.airbyte.api.client.model.generated.AirbyteStream stream) {
37+
final io.airbyte.api.client.model.generated.AirbyteStreamConfiguration result =
38+
new io.airbyte.api.client.model.generated.AirbyteStreamConfiguration()
39+
.aliasName(Names.toAlphanumericAndUnderscore(stream.getName()))
40+
.cursorField(stream.getDefaultCursorField())
41+
.destinationSyncMode(io.airbyte.api.client.model.generated.DestinationSyncMode.APPEND)
42+
.primaryKey(stream.getSourceDefinedPrimaryKey())
43+
.selected(true);
44+
if (stream.getSupportedSyncModes().size() > 0) {
45+
result.setSyncMode(Enums.convertTo(stream.getSupportedSyncModes().get(0),
46+
io.airbyte.api.client.model.generated.SyncMode.class));
47+
} else {
48+
result.setSyncMode(io.airbyte.api.client.model.generated.SyncMode.INCREMENTAL);
49+
}
50+
return result;
51+
}
52+
53+
private static io.airbyte.api.client.model.generated.AirbyteStream toAirbyteStreamClientApi(
54+
final AirbyteStream stream) {
55+
return new io.airbyte.api.client.model.generated.AirbyteStream()
56+
.name(stream.getName())
57+
.jsonSchema(stream.getJsonSchema())
58+
.supportedSyncModes(Enums.convertListTo(stream.getSupportedSyncModes(),
59+
io.airbyte.api.client.model.generated.SyncMode.class))
60+
.sourceDefinedCursor(stream.getSourceDefinedCursor())
61+
.defaultCursorField(stream.getDefaultCursorField())
62+
.sourceDefinedPrimaryKey(stream.getSourceDefinedPrimaryKey())
63+
.namespace(stream.getNamespace());
64+
}
65+
66+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.workers.helper;
6+
7+
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
9+
import com.google.common.collect.Lists;
10+
import io.airbyte.commons.text.Names;
11+
import io.airbyte.protocol.models.AirbyteCatalog;
12+
import io.airbyte.protocol.models.AirbyteStream;
13+
import io.airbyte.protocol.models.CatalogHelpers;
14+
import io.airbyte.protocol.models.Field;
15+
import io.airbyte.protocol.models.JsonSchemaType;
16+
import io.airbyte.protocol.models.SyncMode;
17+
import java.util.Collections;
18+
import java.util.List;
19+
import org.junit.jupiter.api.Test;
20+
21+
class CatalogClientConvertersTest {
22+
23+
public static final String ID_FIELD_NAME = "id";
24+
private static final String STREAM_NAME = "users-data";
25+
private static final AirbyteStream STREAM = new AirbyteStream()
26+
.withName(STREAM_NAME)
27+
.withJsonSchema(
28+
CatalogHelpers.fieldsToJsonSchema(Field.of(ID_FIELD_NAME, JsonSchemaType.STRING)))
29+
.withDefaultCursorField(Lists.newArrayList(ID_FIELD_NAME))
30+
.withSourceDefinedCursor(false)
31+
.withSourceDefinedPrimaryKey(Collections.emptyList())
32+
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL));
33+
34+
private static final io.airbyte.api.client.model.generated.AirbyteStream CLIENT_STREAM =
35+
new io.airbyte.api.client.model.generated.AirbyteStream()
36+
.name(STREAM_NAME)
37+
.jsonSchema(CatalogHelpers.fieldsToJsonSchema(Field.of(ID_FIELD_NAME, JsonSchemaType.STRING)))
38+
.defaultCursorField(Lists.newArrayList(ID_FIELD_NAME))
39+
.sourceDefinedCursor(false)
40+
.sourceDefinedPrimaryKey(Collections.emptyList())
41+
.supportedSyncModes(List.of(io.airbyte.api.client.model.generated.SyncMode.FULL_REFRESH,
42+
io.airbyte.api.client.model.generated.SyncMode.INCREMENTAL));
43+
private static final io.airbyte.api.client.model.generated.AirbyteStreamConfiguration CLIENT_DEFAULT_STREAM_CONFIGURATION =
44+
new io.airbyte.api.client.model.generated.AirbyteStreamConfiguration()
45+
.syncMode(io.airbyte.api.client.model.generated.SyncMode.FULL_REFRESH)
46+
.cursorField(Lists.newArrayList(ID_FIELD_NAME))
47+
.destinationSyncMode(io.airbyte.api.client.model.generated.DestinationSyncMode.APPEND)
48+
.primaryKey(Collections.emptyList())
49+
.aliasName(Names.toAlphanumericAndUnderscore(STREAM_NAME))
50+
.selected(true);
51+
52+
private static final AirbyteCatalog BASIC_MODEL_CATALOG = new AirbyteCatalog().withStreams(
53+
Lists.newArrayList(STREAM));
54+
55+
private static final io.airbyte.api.client.model.generated.AirbyteCatalog EXPECTED_CLIENT_CATALOG =
56+
new io.airbyte.api.client.model.generated.AirbyteCatalog()
57+
.streams(Lists.newArrayList(
58+
new io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration()
59+
.stream(CLIENT_STREAM)
60+
.config(CLIENT_DEFAULT_STREAM_CONFIGURATION)));
61+
62+
@Test
63+
void testConvertToClientAPI() {
64+
assertEquals(EXPECTED_CLIENT_CATALOG,
65+
CatalogClientConverters.toAirbyteCatalogClientApi(BASIC_MODEL_CATALOG));
66+
}
67+
68+
}

airbyte-integrations/bases/standard-source-test/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import org.jsoup.Jsoup;
1313

1414
dependencies {
1515
implementation project(':airbyte-db:db-lib')
16+
implementation project(':airbyte-api')
1617
implementation project(':airbyte-commons-worker')
1718
implementation project(':airbyte-config:config-models')
1819
implementation project(':airbyte-config:config-persistence')

airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@
66

77
import static org.junit.jupiter.api.Assertions.assertFalse;
88
import static org.junit.jupiter.api.Assertions.assertNotNull;
9-
import static org.mockito.ArgumentMatchers.any;
109
import static org.mockito.Mockito.mock;
1110
import static org.mockito.Mockito.verify;
11+
import static org.mockito.Mockito.when;
1212

1313
import com.fasterxml.jackson.databind.JsonNode;
14+
import io.airbyte.api.client.AirbyteApiClient;
15+
import io.airbyte.api.client.generated.SourceApi;
16+
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaWriteRequestBody;
1417
import io.airbyte.commons.features.EnvVariableFeatureFlags;
1518
import io.airbyte.commons.json.Jsons;
1619
import io.airbyte.config.EnvConfigs;
@@ -21,7 +24,6 @@
2124
import io.airbyte.config.StandardDiscoverCatalogInput;
2225
import io.airbyte.config.State;
2326
import io.airbyte.config.WorkerSourceConfig;
24-
import io.airbyte.config.persistence.ConfigRepository;
2527
import io.airbyte.protocol.models.v0.AirbyteCatalog;
2628
import io.airbyte.protocol.models.v0.AirbyteMessage;
2729
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
@@ -112,7 +114,10 @@ public abstract class AbstractSourceConnectorTest {
112114

113115
private WorkerConfigs workerConfigs;
114116

115-
private ConfigRepository mConfigRepository;
117+
private AirbyteApiClient mAirbyteApiClient;
118+
119+
private SourceApi mSourceApi;
120+
116121
private ConnectorConfigUpdater mConnectorConfigUpdater;
117122

118123
// This has to be using the protocol version of the platform in order to capture the arg
@@ -123,6 +128,9 @@ protected AirbyteCatalog getLastPersistedCatalog() {
123128
return convertProtocolObject(lastPersistedCatalog.getValue(), AirbyteCatalog.class);
124129
}
125130

131+
private final ArgumentCaptor<SourceDiscoverSchemaWriteRequestBody> discoverWriteRequest =
132+
ArgumentCaptor.forClass(SourceDiscoverSchemaWriteRequestBody.class);
133+
126134
@BeforeEach
127135
public void setUpInternal() throws Exception {
128136
final Path testDir = Path.of("/tmp/airbyte_tests/");
@@ -133,7 +141,9 @@ public void setUpInternal() throws Exception {
133141
environment = new TestDestinationEnv(localRoot);
134142
setupEnvironment(environment);
135143
workerConfigs = new WorkerConfigs(new EnvConfigs());
136-
mConfigRepository = mock(ConfigRepository.class);
144+
mAirbyteApiClient = mock(AirbyteApiClient.class);
145+
mSourceApi = mock(SourceApi.class);
146+
when(mAirbyteApiClient.getSourceApi()).thenReturn(mSourceApi);
137147
mConnectorConfigUpdater = mock(ConnectorConfigUpdater.class);
138148
processFactory = new DockerProcessFactory(
139149
workerConfigs,
@@ -182,13 +192,13 @@ protected String runCheckAndGetStatusAsString(final JsonNode config) throws Exce
182192

183193
protected UUID runDiscover() throws Exception {
184194
final UUID toReturn = new DefaultDiscoverCatalogWorker(
185-
mConfigRepository,
195+
mAirbyteApiClient,
186196
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), null, false,
187197
new EnvVariableFeatureFlags()),
188198
mConnectorConfigUpdater)
189199
.run(new StandardDiscoverCatalogInput().withSourceId(SOURCE_ID.toString()).withConnectionConfiguration(getConfig()), jobRoot)
190200
.getDiscoverCatalogId();
191-
verify(mConfigRepository).writeActorCatalogFetchEvent(lastPersistedCatalog.capture(), any(), any(), any());
201+
verify(mSourceApi).writeDiscoverCatalogResult(discoverWriteRequest.capture());
192202
return toReturn;
193203
}
194204

airbyte-workers/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ dependencies {
9595

9696
testImplementation project(':airbyte-commons-docker')
9797
testImplementation project(':airbyte-test-utils')
98+
testImplementation project(':airbyte-api')
9899

99100
integrationTestJavaImplementation project(':airbyte-workers')
100101
integrationTestJavaImplementation libs.bundles.micronaut.test

airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private CheckedSupplier<Worker<StandardDiscoverCatalogInput, ConnectorJobOutput>
137137
Optional.empty());
138138
final ConnectorConfigUpdater connectorConfigUpdater =
139139
new ConnectorConfigUpdater(airbyteApiClient.getSourceApi(), airbyteApiClient.getDestinationApi());
140-
return new DefaultDiscoverCatalogWorker(configRepository, integrationLauncher, connectorConfigUpdater, streamFactory);
140+
return new DefaultDiscoverCatalogWorker(airbyteApiClient, integrationLauncher, connectorConfigUpdater, streamFactory);
141141
};
142142
}
143143

0 commit comments

Comments
 (0)