Skip to content

Commit 7308c7f

Browse files
committed
wip
1 parent e02cc57 commit 7308c7f

File tree

3 files changed

+13
-22
lines changed

3 files changed

+13
-22
lines changed

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryConsumerFactory.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@ object BigQueryConsumerFactory {
3333
catalog = catalog,
3434
bufferManager =
3535
BufferManager(
36+
defaultNamespace,
3637
(Runtime.getRuntime().maxMemory() * 0.4).toLong(),
3738
),
38-
defaultNamespace = Optional.of(defaultNamespace),
39+
defaultNamespace = defaultNamespace,
3940
)
4041
}
4142

@@ -59,9 +60,10 @@ object BigQueryConsumerFactory {
5960
catalog = catalog,
6061
bufferManager =
6162
BufferManager(
63+
defaultNamespace,
6264
(Runtime.getRuntime().maxMemory() * 0.5).toLong(),
6365
),
64-
defaultNamespace = Optional.of(defaultNamespace),
66+
defaultNamespace = defaultNamespace,
6567
)
6668
}
6769
}

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ import java.io.ByteArrayInputStream
5656
import java.io.IOException
5757
import java.util.*
5858
import java.util.function.Consumer
59-
import org.apache.commons.lang3.StringUtils
6059

6160
private val log = KotlinLogging.logger {}
6261

@@ -186,7 +185,6 @@ class BigQueryDestination : BaseConnector(), Destination {
186185
): SerializedAirbyteMessageConsumer {
187186
val uploadingMethod = getLoadingMethod(config)
188187
val defaultNamespace = getDatasetId(config)
189-
setDefaultStreamNamespace(catalog, defaultNamespace)
190188
val disableTypeDedupe = getDisableTypeDedupFlag(config)
191189
val datasetLocation = getDatasetLocation(config)
192190
val projectId = config[bqConstants.CONFIG_PROJECT_ID].asText()
@@ -218,6 +216,7 @@ class BigQueryDestination : BaseConnector(), Destination {
218216
val parsedCatalog =
219217
parseCatalog(
220218
sqlGenerator,
219+
defaultNamespace,
221220
rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE),
222221
catalog,
223222
)
@@ -300,28 +299,18 @@ class BigQueryDestination : BaseConnector(), Destination {
300299
)
301300
}
302301

303-
private fun setDefaultStreamNamespace(catalog: ConfiguredAirbyteCatalog, namespace: String) {
304-
// Set the default originalNamespace on streams with null originalNamespace. This means we
305-
// don't
306-
// need to repeat this
307-
// logic in the rest of the connector.
308-
// (record messages still need to handle null namespaces though, which currently happens in
309-
// e.g.
310-
// AsyncStreamConsumer#accept)
311-
// This probably should be shared logic amongst destinations eventually.
312-
for (stream in catalog.streams) {
313-
if (StringUtils.isEmpty(stream.stream.namespace)) {
314-
stream.stream.withNamespace(namespace)
315-
}
316-
}
317-
}
318-
319302
private fun parseCatalog(
320303
sqlGenerator: BigQuerySqlGenerator,
304+
defaultNamespace: String,
321305
rawNamespaceOverride: String,
322306
catalog: ConfiguredAirbyteCatalog
323307
): ParsedCatalog {
324-
val catalogParser = CatalogParser(sqlGenerator, rawNamespaceOverride)
308+
val catalogParser =
309+
CatalogParser(
310+
sqlGenerator,
311+
defaultNamespace = defaultNamespace,
312+
rawNamespace = rawNamespaceOverride,
313+
)
325314

326315
return catalogParser.parseCatalog(catalog)
327316
}

airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public void testBuildColumnId() {
3535

3636
@Test
3737
void columnCollision() {
38-
final CatalogParser parser = new CatalogParser(generator);
38+
final CatalogParser parser = new CatalogParser(generator, "default_ns");
3939
assertEquals(
4040
new StreamConfig(
4141
new StreamId("bar", "foo", "airbyte_internal", "bar_raw__stream_foo", "bar", "foo"),

0 commit comments

Comments
 (0)