Skip to content

Commit 7ecb1d3

Browse files
authored
Destination Postgres: improve handling for column name truncation (#36805)
1 parent ed8cebe commit 7ecb1d3

File tree

12 files changed

+308
-63
lines changed

12 files changed

+308
-63
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+2
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ Maven and Gradle will automatically reference the correct (pinned) version of th
144144

145145
| Version | Date | Pull Request | Subject |
146146
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
147+
| 0.29.10 | 2024-04-10 | [\#36805](https://github.com/airbytehq/airbyte/pull/36805) | Destinations: Enhance CatalogParser name collision handling; add DV2 tests for long identifiers |
148+
| 0.29.9 | 2024-04-09 | [\#36047](https://github.com/airbytehq/airbyte/pull/36047) | Destinations: CDK updates for raw-only destinations |
147149
| 0.29.8 | 2024-04-08 | [\#36868](https://github.com/airbytehq/airbyte/pull/36868) | Destinations: s3-destinations Compilation fixes for connector |
148150
| 0.29.7 | 2024-04-08 | [\#36768](https://github.com/airbytehq/airbyte/pull/36768) | Destinations: Make destination state fetch/commit logic more resilient to errors |
149151
| 0.29.6 | 2024-04-05 | [\#36577](https://github.com/airbytehq/airbyte/pull/36577) | Do not send system_error trace message for config exceptions. |
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.29.9
1+
version=0.29.10

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt

+113-43
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addStr
88
import io.airbyte.cdk.integrations.base.JavaBaseConstants
99
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
1010
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
11-
import java.util.*
11+
import java.util.Optional
1212
import java.util.function.Consumer
1313
import org.apache.commons.codec.digest.DigestUtils
1414
import org.slf4j.Logger
@@ -50,31 +50,31 @@ constructor(
5050
// We're taking a hash of the quoted namespace and the unquoted stream name
5151
val hash =
5252
DigestUtils.sha1Hex(
53-
originalStreamConfig.id!!.finalNamespace + "&airbyte&" + originalName
53+
"${originalStreamConfig.id.finalNamespace}&airbyte&$originalName"
5454
)
5555
.substring(0, 3)
56-
val newName = originalName + "_" + hash
56+
val newName = "${originalName}_$hash"
5757
actualStreamConfig =
5858
StreamConfig(
5959
sqlGenerator.buildStreamId(originalNamespace, newName, rawNamespace),
6060
originalStreamConfig.syncMode,
6161
originalStreamConfig.destinationSyncMode,
6262
originalStreamConfig.primaryKey,
6363
originalStreamConfig.cursor,
64-
originalStreamConfig.columns
64+
originalStreamConfig.columns,
6565
)
6666
} else {
6767
actualStreamConfig = originalStreamConfig
6868
}
6969
streamConfigs.add(actualStreamConfig)
7070

7171
// Populate some interesting strings into the exception handler string deinterpolator
72-
addStringForDeinterpolation(actualStreamConfig.id!!.rawNamespace)
73-
addStringForDeinterpolation(actualStreamConfig.id!!.rawName)
74-
addStringForDeinterpolation(actualStreamConfig.id!!.finalNamespace)
75-
addStringForDeinterpolation(actualStreamConfig.id!!.finalName)
76-
addStringForDeinterpolation(actualStreamConfig.id!!.originalNamespace)
77-
addStringForDeinterpolation(actualStreamConfig.id!!.originalName)
72+
addStringForDeinterpolation(actualStreamConfig.id.rawNamespace)
73+
addStringForDeinterpolation(actualStreamConfig.id.rawName)
74+
addStringForDeinterpolation(actualStreamConfig.id.finalNamespace)
75+
addStringForDeinterpolation(actualStreamConfig.id.finalName)
76+
addStringForDeinterpolation(actualStreamConfig.id.originalNamespace)
77+
addStringForDeinterpolation(actualStreamConfig.id.originalName)
7878
actualStreamConfig.columns!!
7979
.keys
8080
.forEach(
@@ -101,19 +101,14 @@ constructor(
101101
return ParsedCatalog(streamConfigs)
102102
}
103103

104-
// TODO maybe we should extract the column collision stuff to a separate method, since that's
105-
// the
106-
// interesting bit
107104
@VisibleForTesting
108105
fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig {
109106
val schema: AirbyteType = AirbyteType.Companion.fromJsonSchema(stream.stream.jsonSchema)
110107
val airbyteColumns =
111-
if (schema is Struct) {
112-
schema.properties
113-
} else if (schema is Union) {
114-
schema.asColumns()
115-
} else {
116-
throw IllegalArgumentException("Top-level schema must be an object")
108+
when (schema) {
109+
is Struct -> schema.properties
110+
is Union -> schema.asColumns()
111+
else -> throw IllegalArgumentException("Top-level schema must be an object")
117112
}
118113

119114
require(!stream.primaryKey.stream().anyMatch { key: List<String?> -> key.size > 1 }) {
@@ -126,21 +121,38 @@ constructor(
126121
.toList()
127122

128123
require(stream.cursorField.size <= 1) { "Only top-level cursors are supported" }
129-
val cursor: Optional<ColumnId>
130-
if (stream.cursorField.size > 0) {
131-
cursor = Optional.of(sqlGenerator.buildColumnId(stream.cursorField[0])!!)
132-
} else {
133-
cursor = Optional.empty()
134-
}
124+
val cursor: Optional<ColumnId> =
125+
if (stream.cursorField.isNotEmpty()) {
126+
Optional.of(sqlGenerator.buildColumnId(stream.cursorField[0]))
127+
} else {
128+
Optional.empty()
129+
}
130+
131+
val columns = resolveColumnCollisions(airbyteColumns, stream)
132+
133+
return StreamConfig(
134+
sqlGenerator.buildStreamId(stream.stream.namespace, stream.stream.name, rawNamespace),
135+
stream.syncMode,
136+
stream.destinationSyncMode,
137+
primaryKey,
138+
cursor,
139+
columns
140+
)
141+
}
135142

136-
// this code is really bad and I'm not convinced we need to preserve this behavior.
137-
// as with the tablename collisions thing above - we're trying to preserve legacy
138-
// normalization's
139-
// naming conventions here.
143+
/**
144+
* This code is really bad and I'm not convinced we need to preserve this behavior. As with the
145+
* tablename collisions thing above - we're trying to preserve legacy normalization's naming
146+
* conventions here.
147+
*/
148+
private fun resolveColumnCollisions(
149+
airbyteColumns: LinkedHashMap<String, AirbyteType>,
150+
stream: ConfiguredAirbyteStream
151+
): LinkedHashMap<ColumnId, AirbyteType> {
140152
val columns = LinkedHashMap<ColumnId, AirbyteType>()
141153
for ((key, value) in airbyteColumns) {
142154
val originalColumnId = sqlGenerator.buildColumnId(key)
143-
var columnId: ColumnId?
155+
var columnId: ColumnId
144156
if (
145157
columns.keys.stream().noneMatch { c: ColumnId ->
146158
c.canonicalName == originalColumnId.canonicalName
@@ -154,14 +166,31 @@ constructor(
154166
"Detected column name collision for {}.{}.{}",
155167
stream.stream.namespace,
156168
stream.stream.name,
157-
key
169+
key,
158170
)
159171
// One of the existing columns has the same name. We need to handle this collision.
160172
// Append _1, _2, _3, ... to the column name until we find one that doesn't collide.
161173
var i = 1
162174
while (true) {
163175
columnId = sqlGenerator.buildColumnId(key, "_$i")
164-
val canonicalName = columnId!!.canonicalName
176+
177+
// Verify that we're making progress, e.g. we haven't immediately truncated away
178+
// the suffix.
179+
if (columnId.canonicalName == originalColumnId.canonicalName) {
180+
// If we're not making progress, do a more powerful mutation instead of
181+
// appending numbers.
182+
// Assume that we're being truncated, and that the column ID's name is the
183+
// maximum length.
184+
columnId =
185+
superResolveColumnCollisions(
186+
originalColumnId,
187+
columns,
188+
originalColumnId.name.length
189+
)
190+
break
191+
}
192+
193+
val canonicalName = columnId.canonicalName
165194
if (
166195
columns.keys.stream().noneMatch { c: ColumnId ->
167196
c.canonicalName == canonicalName
@@ -176,23 +205,64 @@ constructor(
176205
// JSON records.
177206
columnId =
178207
ColumnId(
179-
columnId!!.name,
180-
originalColumnId!!.originalName,
181-
columnId.canonicalName
208+
columnId.name,
209+
originalColumnId.originalName,
210+
columnId.canonicalName,
182211
)
183212
}
184213

185214
columns[columnId] = value
186215
}
216+
return columns
217+
}
187218

188-
return StreamConfig(
189-
sqlGenerator.buildStreamId(stream.stream.namespace, stream.stream.name, rawNamespace),
190-
stream.syncMode,
191-
stream.destinationSyncMode,
192-
primaryKey,
193-
cursor,
194-
columns
195-
)
219+
/**
220+
* Generate a name of the format `<prefix><length><suffix>`. E.g. for affixLength=3:
221+
* "veryLongName" -> "ver6ame". This is based on the "i18n"-ish naming convention.
222+
*
223+
* @param columnId The column that we're trying to add
224+
* @param columns The columns that we've already added
225+
*/
226+
private fun superResolveColumnCollisions(
227+
columnId: ColumnId,
228+
columns: LinkedHashMap<ColumnId, AirbyteType>,
229+
maximumColumnNameLength: Int
230+
): ColumnId {
231+
val originalColumnName = columnId.originalName
232+
233+
var newColumnId = columnId
234+
// Assume that the <length> portion can be expressed in at most 5 characters.
235+
// If someone is giving us a column name that's longer than 99999 characters,
236+
// that's just being silly.
237+
val affixLength = (maximumColumnNameLength - 5) / 2
238+
// If, after reserving 5 characters for the length, we can't fit the affixes,
239+
// just give up. That means the destination is trying to restrict us to a
240+
// 6-character column name, which is just silly.
241+
if (affixLength <= 0) {
242+
throw IllegalArgumentException(
243+
"Cannot solve column name collision: ${newColumnId.originalName}. We recommend removing this column to continue syncing."
244+
)
245+
}
246+
val prefix = originalColumnName.substring(0, affixLength)
247+
val suffix =
248+
originalColumnName.substring(
249+
originalColumnName.length - affixLength,
250+
originalColumnName.length
251+
)
252+
val length = originalColumnName.length - 2 * affixLength
253+
newColumnId = sqlGenerator.buildColumnId("$prefix$length$suffix")
254+
// if there's _still_ a collision after this, just give up.
255+
// we could try to be more clever, but this is already a pretty rare case.
256+
if (
257+
columns.keys.stream().anyMatch { c: ColumnId ->
258+
c.canonicalName == newColumnId.canonicalName
259+
}
260+
) {
261+
throw IllegalArgumentException(
262+
"Cannot solve column name collision: ${newColumnId.originalName}. We recommend removing this column to continue syncing."
263+
)
264+
}
265+
return newColumnId
196266
}
197267

198268
companion object {

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt

+69-12
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,31 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
1010
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
1111
import java.util.List
1212
import org.junit.jupiter.api.Assertions
13+
import org.junit.jupiter.api.Assertions.assertAll
1314
import org.junit.jupiter.api.BeforeEach
1415
import org.junit.jupiter.api.Test
1516
import org.mockito.Mockito
1617
import org.mockito.invocation.InvocationOnMock
1718
import org.mockito.kotlin.any
19+
import org.mockito.kotlin.whenever
1820

1921
internal class CatalogParserTest {
2022
private lateinit var sqlGenerator: SqlGenerator
21-
private var parser: CatalogParser? = null
23+
private lateinit var parser: CatalogParser
2224

2325
@BeforeEach
2426
fun setup() {
2527
sqlGenerator = Mockito.mock(SqlGenerator::class.java)
2628
// noop quoting logic
29+
Mockito.`when`(sqlGenerator.buildColumnId(any(), any())).thenAnswer {
30+
invocation: InvocationOnMock ->
31+
val fieldName = invocation.getArgument<String>(0)
32+
val suffix = invocation.getArgument<String>(1)
33+
ColumnId(fieldName + suffix, fieldName + suffix, fieldName + suffix)
34+
}
2735
Mockito.`when`(sqlGenerator.buildColumnId(any())).thenAnswer { invocation: InvocationOnMock
2836
->
29-
val fieldName = invocation.getArgument<String>(0)
30-
ColumnId(fieldName, fieldName, fieldName)
37+
sqlGenerator.buildColumnId(invocation.getArgument<String>(0), "")
3138
}
3239
Mockito.`when`(sqlGenerator.buildStreamId(any(), any(), any())).thenAnswer {
3340
invocation: InvocationOnMock ->
@@ -46,7 +53,7 @@ internal class CatalogParserTest {
4653
*/
4754
@Test
4855
fun finalNameCollision() {
49-
Mockito.`when`(sqlGenerator!!.buildStreamId(any(), any(), any())).thenAnswer {
56+
Mockito.`when`(sqlGenerator.buildStreamId(any(), any(), any())).thenAnswer {
5057
invocation: InvocationOnMock ->
5158
val originalNamespace = invocation.getArgument<String>(0)
5259
val originalName = (invocation.getArgument<String>(1))
@@ -67,11 +74,18 @@ internal class CatalogParserTest {
6774
ConfiguredAirbyteCatalog()
6875
.withStreams(List.of(stream("a", "foobarfoo"), stream("a", "foofoo")))
6976

70-
val parsedCatalog = parser!!.parseCatalog(catalog)
77+
val parsedCatalog = parser.parseCatalog(catalog)
7178

72-
Assertions.assertNotEquals(
73-
parsedCatalog.streams.get(0).id.finalName,
74-
parsedCatalog.streams.get(1).id.finalName
79+
assertAll(
80+
{ Assertions.assertEquals("a_abab_foofoo", parsedCatalog.streams.get(0).id.rawName) },
81+
{ Assertions.assertEquals("foofoo", parsedCatalog.streams.get(0).id.finalName) },
82+
{
83+
Assertions.assertEquals(
84+
"a_abab_foofoo_3fd",
85+
parsedCatalog.streams.get(1).id.rawName
86+
)
87+
},
88+
{ Assertions.assertEquals("foofoo_3fd", parsedCatalog.streams.get(1).id.finalName) }
7589
)
7690
}
7791

@@ -81,9 +95,9 @@ internal class CatalogParserTest {
8195
*/
8296
@Test
8397
fun columnNameCollision() {
84-
Mockito.`when`(sqlGenerator!!.buildColumnId(any(), any())).thenAnswer {
98+
Mockito.`when`(sqlGenerator.buildColumnId(any(), any())).thenAnswer {
8599
invocation: InvocationOnMock ->
86-
val originalName = invocation.getArgument<String>(0)
100+
val originalName = invocation.getArgument<String>(0) + invocation.getArgument<String>(1)
87101
// emulate quoting logic that causes a name collision
88102
val quotedName = originalName.replace("bar".toRegex(), "")
89103
ColumnId(quotedName, originalName, quotedName)
@@ -103,9 +117,52 @@ internal class CatalogParserTest {
103117
)
104118
val catalog = ConfiguredAirbyteCatalog().withStreams(List.of(stream("a", "a", schema)))
105119

106-
val parsedCatalog = parser!!.parseCatalog(catalog)
120+
val parsedCatalog = parser.parseCatalog(catalog)
121+
val columnsList = parsedCatalog.streams[0].columns!!.keys.toList()
122+
123+
assertAll(
124+
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns!!.size) },
125+
{ Assertions.assertEquals("foofoo", columnsList[0].name) },
126+
{ Assertions.assertEquals("foofoo_1", columnsList[1].name) }
127+
)
128+
}
129+
130+
/**
131+
* Test behavior when the sqlgenerator truncates column names. We should end up generating new names
132+
* that still avoid collision.
133+
*/
134+
@Test
135+
fun truncatingColumnNameCollision() {
136+
whenever(sqlGenerator.buildColumnId(any(), any())).thenAnswer { invocation: InvocationOnMock
137+
->
138+
val originalName = invocation.getArgument<String>(0) + invocation.getArgument<String>(1)
139+
// truncate to 10 characters
140+
val truncatedName = originalName.substring(0, 10.coerceAtMost(originalName.length))
141+
ColumnId(truncatedName, originalName, truncatedName)
142+
}
143+
val schema =
144+
Jsons.deserialize(
145+
"""
146+
{
147+
"type": "object",
148+
"properties": {
149+
"aVeryLongColumnName": {"type": "string"},
150+
"aVeryLongColumnNameWithMoreTextAfterward": {"type": "string"}
151+
}
152+
}
153+
154+
""".trimIndent()
155+
)
156+
val catalog = ConfiguredAirbyteCatalog().withStreams(listOf(stream("a", "a", schema)))
157+
158+
val parsedCatalog = parser.parseCatalog(catalog)
159+
val columnsList = parsedCatalog.streams[0].columns!!.keys.toList()
107160

108-
Assertions.assertEquals(2, parsedCatalog.streams.get(0).columns!!.size)
161+
assertAll(
162+
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns!!.size) },
163+
{ Assertions.assertEquals("aVeryLongC", columnsList[0].name) },
164+
{ Assertions.assertEquals("aV36rd", columnsList[1].name) }
165+
)
109166
}
110167

111168
companion object {

0 commit comments

Comments
 (0)