Skip to content

Commit 3d9f9ec

Browse files
authored
Cache schema during discoverSchema (#10820)
* Make SchedulerHandler store schema after fetching it
* Add `disable_cache` parameter to discover_schema API
* Return cached catalog if it already exists
* Address code review comments
* Add tests for caching of catalog in SchedulerHandler
* Format fixes
* Fix Acceptance tests
* New code review fixes
  - Use upper case for global variable
  - Inline definition and assignment of variable
1 parent a050688 commit 3d9f9ec

File tree

8 files changed: +158 -15 lines changed

airbyte-api/src/main/openapi/config.yaml

+10 -1
Original file line number | Diff line number | Diff line change
@@ -592,7 +592,7 @@ paths:
592592
content:
593593
application/json:
594594
schema:
595-
$ref: "#/components/schemas/SourceIdRequestBody"
595+
$ref: "#/components/schemas/SourceDiscoverSchemaRequestBody"
596596
required: true
597597
responses:
598598
"200":
@@ -2298,6 +2298,15 @@ components:
22982298
$ref: "#/components/schemas/WorkspaceId"
22992299
name:
23002300
type: string
2301+
SourceDiscoverSchemaRequestBody:
2302+
type: object
2303+
required:
2304+
- sourceId
2305+
properties:
2306+
sourceId:
2307+
$ref: "#/components/schemas/SourceId"
2308+
disable_cache:
2309+
type: boolean
23012310
SourceUpdate:
23022311
type: object
23032312
required:

airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java

+3 -2
Original file line number | Diff line number | Diff line change
@@ -63,6 +63,7 @@
6363
import io.airbyte.api.model.SourceDefinitionSpecificationRead;
6464
import io.airbyte.api.model.SourceDefinitionUpdate;
6565
import io.airbyte.api.model.SourceDiscoverSchemaRead;
66+
import io.airbyte.api.model.SourceDiscoverSchemaRequestBody;
6667
import io.airbyte.api.model.SourceIdRequestBody;
6768
import io.airbyte.api.model.SourceOauthConsentRequest;
6869
import io.airbyte.api.model.SourceRead;
@@ -432,8 +433,8 @@ public CheckConnectionRead checkConnectionToSourceForUpdate(final SourceUpdate s
432433
}
433434

434435
@Override
435-
public SourceDiscoverSchemaRead discoverSchemaForSource(final SourceIdRequestBody sourceIdRequestBody) {
436-
return execute(() -> schedulerHandler.discoverSchemaForSourceFromSourceId(sourceIdRequestBody));
436+
public SourceDiscoverSchemaRead discoverSchemaForSource(final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody) {
437+
return execute(() -> schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaRequestBody));
437438
}
438439

439440
// DB MIGRATION

airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java

+37 -4
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,10 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88
import com.google.common.annotations.VisibleForTesting;
9+
import com.google.common.base.Charsets;
910
import com.google.common.collect.Lists;
11+
import com.google.common.hash.HashFunction;
12+
import com.google.common.hash.Hashing;
1013
import io.airbyte.api.model.AdvancedAuth;
1114
import io.airbyte.api.model.AuthSpecification;
1215
import io.airbyte.api.model.CheckConnectionRead;
@@ -19,17 +22,23 @@
1922
import io.airbyte.api.model.DestinationIdRequestBody;
2023
import io.airbyte.api.model.DestinationSyncMode;
2124
import io.airbyte.api.model.DestinationUpdate;
25+
import io.airbyte.api.model.JobConfigType;
2226
import io.airbyte.api.model.JobIdRequestBody;
2327
import io.airbyte.api.model.JobInfoRead;
28+
import io.airbyte.api.model.LogRead;
2429
import io.airbyte.api.model.SourceCoreConfig;
2530
import io.airbyte.api.model.SourceDefinitionIdRequestBody;
2631
import io.airbyte.api.model.SourceDefinitionSpecificationRead;
2732
import io.airbyte.api.model.SourceDiscoverSchemaRead;
33+
import io.airbyte.api.model.SourceDiscoverSchemaRequestBody;
2834
import io.airbyte.api.model.SourceIdRequestBody;
2935
import io.airbyte.api.model.SourceUpdate;
36+
import io.airbyte.api.model.SynchronousJobRead;
3037
import io.airbyte.commons.docker.DockerUtils;
3138
import io.airbyte.commons.enums.Enums;
3239
import io.airbyte.commons.features.FeatureFlags;
40+
import io.airbyte.commons.json.Jsons;
41+
import io.airbyte.config.ActorCatalog;
3342
import io.airbyte.config.Configs.WorkerEnvironment;
3443
import io.airbyte.config.DestinationConnection;
3544
import io.airbyte.config.JobConfig.ConfigType;
@@ -68,6 +77,7 @@
6877
import io.temporal.api.workflowservice.v1.RequestCancelWorkflowExecutionRequest;
6978
import io.temporal.serviceclient.WorkflowServiceStubs;
7079
import java.io.IOException;
80+
import java.util.ArrayList;
7181
import java.util.List;
7282
import java.util.Optional;
7383
import java.util.UUID;
@@ -77,6 +87,7 @@
7787
public class SchedulerHandler {
7888

7989
private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerHandler.class);
90+
private static final HashFunction HASH_FUNCTION = Hashing.md5();
8091

8192
private final ConfigRepository configRepository;
8293
private final SecretsRepositoryWriter secretsRepositoryWriter;
@@ -241,13 +252,35 @@ public CheckConnectionRead checkDestinationConnectionFromDestinationIdForUpdate(
241252
return checkDestinationConnectionFromDestinationCreate(destinationCoreConfig);
242253
}
243254

244-
public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final SourceIdRequestBody sourceIdRequestBody)
255+
public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
245256
throws ConfigNotFoundException, IOException, JsonValidationException {
246-
final SourceConnection source = configRepository.getSourceConnection(sourceIdRequestBody.getSourceId());
257+
final SourceConnection source = configRepository.getSourceConnection(discoverSchemaRequestBody.getSourceId());
247258
final StandardSourceDefinition sourceDef = configRepository.getStandardSourceDefinition(source.getSourceDefinitionId());
248259
final String imageName = DockerUtils.getTaggedImageName(sourceDef.getDockerRepository(), sourceDef.getDockerImageTag());
249-
final SynchronousResponse<AirbyteCatalog> response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName);
250-
return discoverJobToOutput(response);
260+
261+
final String configHash = HASH_FUNCTION.hashBytes(Jsons.serialize(source.getConfiguration()).getBytes(
262+
Charsets.UTF_8)).toString();
263+
final String connectorVersion = sourceDef.getDockerImageTag();
264+
final Optional<ActorCatalog> currentCatalog =
265+
configRepository.getSourceCatalog(discoverSchemaRequestBody.getSourceId(), configHash, connectorVersion);
266+
final boolean bustActorCatalogCache = discoverSchemaRequestBody.getDisableCache() != null && discoverSchemaRequestBody.getDisableCache();
267+
if (currentCatalog.isEmpty() || bustActorCatalogCache) {
268+
final SynchronousResponse<AirbyteCatalog> response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName);
269+
configRepository.writeActorCatalogFetchEvent(response.getOutput(), source.getSourceId(), configHash, connectorVersion);
270+
return discoverJobToOutput(response);
271+
}
272+
final AirbyteCatalog airbyteCatalog = Jsons.object(currentCatalog.get().getCatalog(), AirbyteCatalog.class);
273+
final SynchronousJobRead emptyJob = new SynchronousJobRead()
274+
.configId("NoConfiguration")
275+
.configType(JobConfigType.DISCOVER_SCHEMA)
276+
.id(UUID.randomUUID())
277+
.createdAt(0L)
278+
.endedAt(0L)
279+
.logs(new LogRead().logLines(new ArrayList<>()))
280+
.succeeded(true);
281+
return new SourceDiscoverSchemaRead()
282+
.catalog(CatalogConverter.toApi(airbyteCatalog))
283+
.jobInfo(emptyJob);
251284
}
252285

253286
public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceCreate(final SourceCoreConfig sourceCreate)

airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java

+3 -2
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@
3030
import io.airbyte.api.model.OperationReadList;
3131
import io.airbyte.api.model.OperationUpdate;
3232
import io.airbyte.api.model.SourceDiscoverSchemaRead;
33+
import io.airbyte.api.model.SourceDiscoverSchemaRequestBody;
3334
import io.airbyte.api.model.SourceIdRequestBody;
3435
import io.airbyte.api.model.SourceRead;
3536
import io.airbyte.api.model.WebBackendConnectionCreate;
@@ -181,8 +182,8 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
181182
final ConnectionRead connection = connectionsHandler.getConnection(connectionIdRequestBody.getConnectionId());
182183

183184
if (MoreBooleans.isTruthy(webBackendConnectionRequestBody.getWithRefreshedCatalog())) {
184-
final SourceIdRequestBody sourceId = new SourceIdRequestBody().sourceId(connection.getSourceId());
185-
final SourceDiscoverSchemaRead discoverSchema = schedulerHandler.discoverSchemaForSourceFromSourceId(sourceId);
185+
final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = new SourceDiscoverSchemaRequestBody().sourceId(connection.getSourceId());
186+
final SourceDiscoverSchemaRead discoverSchema = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq);
186187

187188
final AirbyteCatalog original = connection.getSyncCatalog();
188189
final AirbyteCatalog discovered = discoverSchema.getCatalog();

airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java

+89 -3
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
1515
import static org.mockito.Mockito.doReturn;
1616
import static org.mockito.Mockito.mock;
17+
import static org.mockito.Mockito.never;
1718
import static org.mockito.Mockito.spy;
1819
import static org.mockito.Mockito.verify;
1920
import static org.mockito.Mockito.when;
@@ -32,13 +33,15 @@
3233
import io.airbyte.api.model.SourceDefinitionIdRequestBody;
3334
import io.airbyte.api.model.SourceDefinitionSpecificationRead;
3435
import io.airbyte.api.model.SourceDiscoverSchemaRead;
36+
import io.airbyte.api.model.SourceDiscoverSchemaRequestBody;
3537
import io.airbyte.api.model.SourceIdRequestBody;
3638
import io.airbyte.api.model.SourceUpdate;
3739
import io.airbyte.commons.docker.DockerUtils;
3840
import io.airbyte.commons.enums.Enums;
3941
import io.airbyte.commons.features.FeatureFlags;
4042
import io.airbyte.commons.json.Jsons;
4143
import io.airbyte.commons.lang.Exceptions;
44+
import io.airbyte.config.ActorCatalog;
4245
import io.airbyte.config.ActorDefinitionResourceRequirements;
4346
import io.airbyte.config.Configs.WorkerEnvironment;
4447
import io.airbyte.config.DestinationConnection;
@@ -381,12 +384,14 @@ void testCheckDestinationConnectionFromUpdate() throws IOException, JsonValidati
381384
@Test
382385
void testDiscoverSchemaForSourceFromSourceId() throws IOException, JsonValidationException, ConfigNotFoundException {
383386
final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID());
384-
final SourceIdRequestBody request = new SourceIdRequestBody().sourceId(source.getSourceId());
387+
final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId());
385388

386389
final SynchronousResponse<AirbyteCatalog> discoverResponse = (SynchronousResponse<AirbyteCatalog>) jobResponse;
387390
final SynchronousJobMetadata metadata = mock(SynchronousJobMetadata.class);
388391
when(discoverResponse.isSuccess()).thenReturn(true);
389-
when(discoverResponse.getOutput()).thenReturn(CatalogHelpers.createAirbyteCatalog("shoes", Field.of("sku", JsonSchemaType.STRING)));
392+
final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog("shoes",
393+
Field.of("sku", JsonSchemaType.STRING));
394+
when(discoverResponse.getOutput()).thenReturn(airbyteCatalog);
390395
when(discoverResponse.getMetadata()).thenReturn(metadata);
391396
when(metadata.isSucceeded()).thenReturn(true);
392397

@@ -396,6 +401,7 @@ void testDiscoverSchemaForSourceFromSourceId() throws IOException, JsonValidatio
396401
.withDockerImageTag(SOURCE_DOCKER_TAG)
397402
.withSourceDefinitionId(source.getSourceDefinitionId()));
398403
when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source);
404+
when(configRepository.getSourceCatalog(any(), any(), any())).thenReturn(Optional.empty());
399405
when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE))
400406
.thenReturn(discoverResponse);
401407

@@ -405,13 +411,93 @@ void testDiscoverSchemaForSourceFromSourceId() throws IOException, JsonValidatio
405411
assertNotNull(actual.getJobInfo());
406412
assertTrue(actual.getJobInfo().getSucceeded());
407413
verify(configRepository).getSourceConnection(source.getSourceId());
414+
verify(configRepository).getSourceCatalog(eq(request.getSourceId()), any(), eq(SOURCE_DOCKER_TAG));
415+
verify(configRepository).writeActorCatalogFetchEvent(eq(airbyteCatalog), eq(source.getSourceId()), any(), eq(SOURCE_DOCKER_TAG));
416+
verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE);
417+
}
418+
419+
@Test
420+
void testDiscoverSchemaForSourceFromSourceIdCachedCatalog() throws IOException, JsonValidationException, ConfigNotFoundException {
421+
final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID());
422+
final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId());
423+
424+
final SynchronousResponse<AirbyteCatalog> discoverResponse = (SynchronousResponse<AirbyteCatalog>) jobResponse;
425+
final SynchronousJobMetadata metadata = mock(SynchronousJobMetadata.class);
426+
when(discoverResponse.isSuccess()).thenReturn(true);
427+
final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog("shoes",
428+
Field.of("sku", JsonSchemaType.STRING));
429+
when(discoverResponse.getOutput()).thenReturn(airbyteCatalog);
430+
when(discoverResponse.getMetadata()).thenReturn(metadata);
431+
when(metadata.isSucceeded()).thenReturn(true);
432+
433+
when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId()))
434+
.thenReturn(new StandardSourceDefinition()
435+
.withDockerRepository(SOURCE_DOCKER_REPO)
436+
.withDockerImageTag(SOURCE_DOCKER_TAG)
437+
.withSourceDefinitionId(source.getSourceDefinitionId()));
438+
when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source);
439+
final ActorCatalog actorCatalog = new ActorCatalog()
440+
.withCatalog(Jsons.jsonNode(airbyteCatalog))
441+
.withCatalogHash("")
442+
.withId(UUID.randomUUID());
443+
when(configRepository.getSourceCatalog(any(), any(), any())).thenReturn(Optional.of(actorCatalog));
444+
when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE))
445+
.thenReturn(discoverResponse);
446+
447+
final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request);
448+
449+
assertNotNull(actual.getCatalog());
450+
assertNotNull(actual.getJobInfo());
451+
assertTrue(actual.getJobInfo().getSucceeded());
452+
verify(configRepository).getSourceConnection(source.getSourceId());
453+
verify(configRepository).getSourceCatalog(eq(request.getSourceId()), any(), any());
454+
verify(configRepository, never()).writeActorCatalogFetchEvent(any(), any(), any(), any());
455+
verify(synchronousSchedulerClient, never()).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE);
456+
}
457+
458+
@Test
459+
void testDiscoverSchemaForSourceFromSourceIdDisableCache() throws IOException, JsonValidationException, ConfigNotFoundException {
460+
final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID());
461+
final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).disableCache(true);
462+
463+
final SynchronousResponse<AirbyteCatalog> discoverResponse = (SynchronousResponse<AirbyteCatalog>) jobResponse;
464+
final SynchronousJobMetadata metadata = mock(SynchronousJobMetadata.class);
465+
when(discoverResponse.isSuccess()).thenReturn(true);
466+
final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog("shoes",
467+
Field.of("sku", JsonSchemaType.STRING));
468+
when(discoverResponse.getOutput()).thenReturn(airbyteCatalog);
469+
when(discoverResponse.getMetadata()).thenReturn(metadata);
470+
when(metadata.isSucceeded()).thenReturn(true);
471+
472+
when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId()))
473+
.thenReturn(new StandardSourceDefinition()
474+
.withDockerRepository(SOURCE_DOCKER_REPO)
475+
.withDockerImageTag(SOURCE_DOCKER_TAG)
476+
.withSourceDefinitionId(source.getSourceDefinitionId()));
477+
when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source);
478+
final ActorCatalog actorCatalog = new ActorCatalog()
479+
.withCatalog(Jsons.jsonNode(airbyteCatalog))
480+
.withCatalogHash("")
481+
.withId(UUID.randomUUID());
482+
when(configRepository.getSourceCatalog(any(), any(), any())).thenReturn(Optional.of(actorCatalog));
483+
when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE))
484+
.thenReturn(discoverResponse);
485+
486+
final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request);
487+
488+
assertNotNull(actual.getCatalog());
489+
assertNotNull(actual.getJobInfo());
490+
assertTrue(actual.getJobInfo().getSucceeded());
491+
verify(configRepository).getSourceConnection(source.getSourceId());
492+
verify(configRepository).getSourceCatalog(eq(request.getSourceId()), any(), any());
493+
verify(configRepository).writeActorCatalogFetchEvent(any(), any(), any(), any());
408494
verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE);
409495
}
410496

411497
@Test
412498
void testDiscoverSchemaForSourceFromSourceIdFailed() throws IOException, JsonValidationException, ConfigNotFoundException {
413499
final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID());
414-
final SourceIdRequestBody request = new SourceIdRequestBody().sourceId(source.getSourceId());
500+
final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId());
415501

416502
when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId()))
417503
.thenReturn(new StandardSourceDefinition()

airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java

+4 -1
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,7 @@
4242
import io.airbyte.api.model.OperationUpdate;
4343
import io.airbyte.api.model.ResourceRequirements;
4444
import io.airbyte.api.model.SourceDiscoverSchemaRead;
45+
import io.airbyte.api.model.SourceDiscoverSchemaRequestBody;
4546
import io.airbyte.api.model.SourceIdRequestBody;
4647
import io.airbyte.api.model.SourceRead;
4748
import io.airbyte.api.model.SyncMode;
@@ -196,7 +197,9 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE
196197

197198
final AirbyteCatalog modifiedCatalog = ConnectionHelpers.generateBasicApiCatalog();
198199

199-
when(schedulerHandler.discoverSchemaForSourceFromSourceId(sourceIdRequestBody)).thenReturn(
200+
final SourceDiscoverSchemaRequestBody sourceDiscoverSchema = new SourceDiscoverSchemaRequestBody();
201+
sourceDiscoverSchema.setSourceId(connectionRead.getSourceId());
202+
when(schedulerHandler.discoverSchemaForSourceFromSourceId(sourceDiscoverSchema)).thenReturn(
200203
new SourceDiscoverSchemaRead()
201204
.jobInfo(mock(SynchronousJobRead.class))
202205
.catalog(modifiedCatalog));

airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java

+2 -1
Original file line number | Diff line number | Diff line change
@@ -65,6 +65,7 @@
6565
import io.airbyte.api.client.model.SourceDefinitionIdRequestBody;
6666
import io.airbyte.api.client.model.SourceDefinitionRead;
6767
import io.airbyte.api.client.model.SourceDefinitionSpecificationRead;
68+
import io.airbyte.api.client.model.SourceDiscoverSchemaRequestBody;
6869
import io.airbyte.api.client.model.SourceIdRequestBody;
6970
import io.airbyte.api.client.model.SourceRead;
7071
import io.airbyte.api.client.model.SyncMode;
@@ -1145,7 +1146,7 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception
11451146
}
11461147

11471148
private AirbyteCatalog discoverSourceSchema(final UUID sourceId) throws ApiException {
1148-
return apiClient.getSourceApi().discoverSchemaForSource(new SourceIdRequestBody().sourceId(sourceId)).getCatalog();
1149+
return apiClient.getSourceApi().discoverSchemaForSource(new SourceDiscoverSchemaRequestBody().sourceId(sourceId)).getCatalog();
11491150
}
11501151

11511152
private void assertSourceAndDestinationDbInSync(final boolean withScdTable) throws Exception {

0 commit comments

Comments
 (0)