diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/JobTracker.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/JobTracker.java index 8c96856f571c9..83a977824ae7d 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/JobTracker.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/JobTracker.java @@ -285,7 +285,12 @@ private Map generateSyncMetadata(final UUID connectionId) throws operationUsage.put(OPERATION + operation.getOperatorType(), usageCount + 1); } } - return MoreMaps.merge(TrackingMetadata.generateSyncMetadata(standardSync), operationUsage); + + final Map streamCountData = new HashMap<>(); + final Integer streamCount = standardSync.getCatalog().getStreams().size(); + streamCountData.put("number_of_streams", streamCount); + + return MoreMaps.merge(TrackingMetadata.generateSyncMetadata(standardSync), operationUsage, streamCountData); } private static ImmutableMap generateStateMetadata(final JobState jobState) { diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java index b00582ca14c07..cd680412585fb 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java @@ -39,10 +39,13 @@ import io.airbyte.config.StandardWorkspace; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.DestinationSyncMode; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.SyncMode; import io.airbyte.scheduler.models.Attempt; import io.airbyte.scheduler.models.Job; @@ -112,6 +115,9 @@ class JobTrackerTest { .put("table_prefix", false) .put("operation_count", 0) .build(); + private static final ConfiguredAirbyteCatalog CATALOG = CatalogHelpers + .createConfiguredAirbyteCatalog("stream_name", "stream_namespace", + Field.of("int_field", JsonSchemaType.NUMBER)); private static final ConnectorSpecification SOURCE_SPEC; private static final ConnectorSpecification DESTINATION_SPEC; @@ -269,7 +275,9 @@ void testAsynchronous(final ConfigType configType, final Map add final ImmutableMap metadata = getJobMetadata(configType, jobId); final Job job = getJobMock(configType, jobId); // test when frequency is manual. - when(configRepository.getStandardSync(CONNECTION_ID)).thenReturn(new StandardSync().withConnectionId(CONNECTION_ID).withManual(true)); + + when(configRepository.getStandardSync(CONNECTION_ID)) + .thenReturn(new StandardSync().withConnectionId(CONNECTION_ID).withManual(true).withCatalog(CATALOG)); when(configRepository.getStandardWorkspace(WORKSPACE_ID, true)) .thenReturn(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withName(WORKSPACE_NAME)); final Map manualMetadata = MoreMaps.merge( @@ -280,7 +288,7 @@ void testAsynchronous(final ConfigType configType, final Map add // test when frequency is scheduled. when(configRepository.getStandardSync(CONNECTION_ID)) - .thenReturn(new StandardSync().withConnectionId(CONNECTION_ID).withManual(false) + .thenReturn(new StandardSync().withConnectionId(CONNECTION_ID).withManual(false).withCatalog(CATALOG) .withSchedule(new Schedule().withUnits(1L).withTimeUnit(TimeUnit.MINUTES))); final Map scheduledMetadata = MoreMaps.merge( metadata, @@ -387,7 +395,8 @@ void testAsynchronousAttempt(final ConfigType configType, final Job job, final M final ImmutableMap metadata = getJobMetadata(configType, LONG_JOB_ID); // test when frequency is manual. - when(configRepository.getStandardSync(CONNECTION_ID)).thenReturn(new StandardSync().withConnectionId(CONNECTION_ID).withManual(true)); + when(configRepository.getStandardSync(CONNECTION_ID)) + .thenReturn(new StandardSync().withConnectionId(CONNECTION_ID).withManual(true).withCatalog(CATALOG)); when(workspaceHelper.getWorkspaceForJobIdIgnoreExceptions(LONG_JOB_ID)).thenReturn(WORKSPACE_ID); when(configRepository.getStandardWorkspace(WORKSPACE_ID, true)) .thenReturn(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withName(WORKSPACE_NAME)); @@ -556,6 +565,7 @@ private ImmutableMap getJobMetadata(final ConfigType configType, .put("namespace_definition", NamespaceDefinitionType.SOURCE) .put("table_prefix", false) .put("operation_count", 0) + .put("number_of_streams", 1) .build(); }