Skip to content

Commit eef7413

Browse files
committed
wip
1 parent 1be53b3 commit eef7413

File tree

6 files changed

+14
-24
lines changed

6 files changed

+14
-24
lines changed

airbyte-integrations/connectors/destination-bigquery/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ airbyteJavaConnector {
1111
'gcs-destinations',
1212
'core',
1313
]
14-
useLocalCdk = false
14+
useLocalCdk = true
1515
}
1616

1717
java {

airbyte-integrations/connectors/destination-bigquery/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: destination
77
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
8-
dockerImageTag: 2.6.1
8+
dockerImageTag: 2.6.2
99
dockerRepository: airbyte/destination-bigquery
1010
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
1111
githubIssueLabel: destination-bigquery

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryConsumerFactory.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ object BigQueryConsumerFactory {
3333
catalog = catalog,
3434
bufferManager =
3535
BufferManager(
36+
defaultNamespace,
3637
(Runtime.getRuntime().maxMemory() * 0.4).toLong(),
3738
),
38-
defaultNamespace = Optional.of(defaultNamespace),
3939
)
4040
}
4141

@@ -59,9 +59,9 @@ object BigQueryConsumerFactory {
5959
catalog = catalog,
6060
bufferManager =
6161
BufferManager(
62+
defaultNamespace,
6263
(Runtime.getRuntime().maxMemory() * 0.5).toLong(),
6364
),
64-
defaultNamespace = Optional.of(defaultNamespace),
6565
)
6666
}
6767
}

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ import java.io.ByteArrayInputStream
5656
import java.io.IOException
5757
import java.util.*
5858
import java.util.function.Consumer
59-
import org.apache.commons.lang3.StringUtils
6059

6160
private val log = KotlinLogging.logger {}
6261

@@ -190,7 +189,6 @@ class BigQueryDestination : BaseConnector(), Destination {
190189
): SerializedAirbyteMessageConsumer {
191190
val uploadingMethod = getLoadingMethod(config)
192191
val defaultNamespace = getDatasetId(config)
193-
setDefaultStreamNamespace(catalog, defaultNamespace)
194192
val disableTypeDedupe = getDisableTypeDedupFlag(config)
195193
val datasetLocation = getDatasetLocation(config)
196194
val projectId = config[bqConstants.CONFIG_PROJECT_ID].asText()
@@ -222,6 +220,7 @@ class BigQueryDestination : BaseConnector(), Destination {
222220
val parsedCatalog =
223221
parseCatalog(
224222
sqlGenerator,
223+
defaultNamespace,
225224
rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE),
226225
catalog,
227226
)
@@ -309,28 +308,18 @@ class BigQueryDestination : BaseConnector(), Destination {
309308
)
310309
}
311310

312-
private fun setDefaultStreamNamespace(catalog: ConfiguredAirbyteCatalog, namespace: String) {
313-
// Set the default originalNamespace on streams with null originalNamespace. This means we
314-
// don't
315-
// need to repeat this
316-
// logic in the rest of the connector.
317-
// (record messages still need to handle null namespaces though, which currently happens in
318-
// e.g.
319-
// AsyncStreamConsumer#accept)
320-
// This probably should be shared logic amongst destinations eventually.
321-
for (stream in catalog.streams) {
322-
if (StringUtils.isEmpty(stream.stream.namespace)) {
323-
stream.stream.withNamespace(namespace)
324-
}
325-
}
326-
}
327-
328311
private fun parseCatalog(
329312
sqlGenerator: BigQuerySqlGenerator,
313+
defaultNamespace: String,
330314
rawNamespaceOverride: String,
331315
catalog: ConfiguredAirbyteCatalog
332316
): ParsedCatalog {
333-
val catalogParser = CatalogParser(sqlGenerator, rawNamespaceOverride)
317+
val catalogParser =
318+
CatalogParser(
319+
sqlGenerator,
320+
defaultNamespace = defaultNamespace,
321+
rawNamespace = rawNamespaceOverride,
322+
)
334323

335324
return catalogParser.parseCatalog(catalog)
336325
}

airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public void testBuildColumnId() {
3535

3636
@Test
3737
void columnCollision() {
38-
final CatalogParser parser = new CatalogParser(generator);
38+
final CatalogParser parser = new CatalogParser(generator, "default_ns");
3939
assertEquals(
4040
new StreamConfig(
4141
new StreamId("bar", "foo", "airbyte_internal", "bar_raw__stream_foo", "bar", "foo"),

docs/integrations/destinations/bigquery.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ tutorials:
220220

221221
| Version | Date | Pull Request | Subject |
222222
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
223+
| 2.6.2 | 2024-05-30 | [38331](https://github.com/airbytehq/airbyte/pull/38331) | Internal code changes in preparation for future feature release |
223224
| 2.6.1 | 2024-05-29 | [38770](https://github.com/airbytehq/airbyte/pull/38770) | Internal code change (switch to CDK artifact) |
224225
| 2.6.0 | 2024-05-28 | [38359](https://github.com/airbytehq/airbyte/pull/38359) | Propagate airbyte_meta from sources; add generation_id column |
225226
| 2.5.1 | 2024-05-22 | [38591](https://github.com/airbytehq/airbyte/pull/38591) | Bugfix to include forward-slash when cleaning up stage |

0 commit comments

Comments
 (0)