Skip to content

Commit 4070642

Browse files
committed
chore: mappers also rename source default cursor and source defined pk. (#14002)
1 parent 559dc25 commit 4070642

File tree

4 files changed

+75
-6
lines changed

4 files changed

+75
-6
lines changed

airbyte-mappers/src/main/kotlin/io/airbyte/mappers/transformations/DestinationCatalogGenerator.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ class DestinationCatalogGenerator(
5454
stream.stream.jsonSchema = Jsons.deserialize(jsonSchema)
5555
stream.cursorField = updateFields.cursor
5656
stream.primaryKey = updateFields.primaryKey
57+
stream.stream.sourceDefinedPrimaryKey = updateFields.sourceDefinedPrimaryKey
58+
stream.stream.defaultCursorField = updateFields.sourceDefaultCursor
5759

5860
return errors
5961
}
@@ -79,7 +81,13 @@ class DestinationCatalogGenerator(
7981
}
8082
.fold(
8183
MapperToFieldAccumulator(
82-
SlimStream(stream.fields ?: listOf(), stream.cursorField, stream.primaryKey),
84+
SlimStream(
85+
fields = stream.fields ?: listOf(),
86+
cursor = stream.cursorField,
87+
primaryKey = stream.primaryKey,
88+
sourceDefinedPrimaryKey = stream.stream.sourceDefinedPrimaryKey,
89+
sourceDefaultCursor = stream.stream.defaultCursorField,
90+
),
8391
listOf(),
8492
mapOf(),
8593
),

airbyte-mappers/src/main/kotlin/io/airbyte/mappers/transformations/SlimStream.kt

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,18 @@ import io.airbyte.config.FieldType
66
/**
77
* Subset of Stream configuration that is relevant to mappers.
88
*/
9-
class SlimStream(fields: List<Field>, cursor: List<String>? = null, primaryKey: List<List<String>>? = null) {
9+
class SlimStream(
10+
fields: List<Field>,
11+
cursor: List<String>? = null,
12+
primaryKey: List<List<String>>? = null,
13+
sourceDefaultCursor: List<String>? = null,
14+
sourceDefinedPrimaryKey: List<List<String>>? = null,
15+
) {
1016
private val _fields: MutableList<Field> = fields.toMutableList()
1117
private val _cursor: MutableList<String>? = cursor?.toMutableList()
1218
private val _primaryKey: MutableList<MutableList<String>>? = primaryKey?.map { it.toMutableList() }?.toMutableList()
19+
private val _sourceDefaultCursor: MutableList<String>? = sourceDefaultCursor?.toMutableList()
20+
private val _sourceDefinedPrimaryKey: MutableList<MutableList<String>>? = sourceDefinedPrimaryKey?.map { it.toMutableList() }?.toMutableList()
1321

1422
val fields: List<Field>
1523
get() = _fields.toList()
@@ -20,16 +28,26 @@ class SlimStream(fields: List<Field>, cursor: List<String>? = null, primaryKey:
2028
val primaryKey: List<List<String>>?
2129
get() = _primaryKey?.map { it.toList() }?.toList()
2230

31+
val sourceDefaultCursor: List<String>?
32+
get() = _sourceDefaultCursor?.toList()
33+
34+
val sourceDefinedPrimaryKey: List<List<String>>?
35+
get() = _sourceDefinedPrimaryKey?.map { it.toList() }?.toList()
36+
2337
fun deepCopy(
2438
fields: List<Field>? = null,
2539
cursor: List<String>? = null,
2640
primaryKey: List<List<String>>? = null,
41+
sourceDefaultCursor: List<String>? = null,
42+
sourceDefinedPrimaryKey: List<List<String>>? = null,
2743
): SlimStream =
2844
// This relies on the fact that the constructor is copying into a mutable list
2945
SlimStream(
3046
fields = fields ?: _fields.map { it.copy() },
3147
cursor = cursor ?: _cursor,
3248
primaryKey = primaryKey ?: _primaryKey,
49+
sourceDefaultCursor = sourceDefaultCursor ?: _sourceDefaultCursor,
50+
sourceDefinedPrimaryKey = sourceDefinedPrimaryKey ?: _sourceDefinedPrimaryKey,
3351
)
3452

3553
/**
@@ -63,13 +81,30 @@ class SlimStream(fields: List<Field>, cursor: List<String>? = null, primaryKey:
6381
throw IllegalStateException("Field $oldName not found in stream fields")
6482
}
6583

66-
_cursor?.apply {
84+
_cursor?.apply { renameInSimpleList(this, oldName, newName) }
85+
_primaryKey?.apply { renameInNestedList(this, oldName, newName) }
86+
_sourceDefaultCursor?.apply { renameInSimpleList(this, oldName, newName) }
87+
_sourceDefinedPrimaryKey?.apply { renameInNestedList(this, oldName, newName) }
88+
}
89+
90+
private fun renameInSimpleList(
91+
list: MutableList<String>,
92+
oldName: String,
93+
newName: String,
94+
) {
95+
list.apply {
6796
if (this == listOf(oldName)) {
6897
this[0] = newName
6998
}
7099
}
100+
}
71101

72-
_primaryKey?.apply {
102+
private fun renameInNestedList(
103+
list: MutableList<MutableList<String>>,
104+
oldName: String,
105+
newName: String,
106+
) {
107+
list.apply {
73108
this.replaceAll {
74109
if (it == listOf(oldName)) mutableListOf(newName) else it
75110
}

airbyte-mappers/src/test/kotlin/io/airbyte/mappers/transformations/DestinationCatalogGeneratorTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ class DestinationCatalogGeneratorTest {
280280
"additionalProperties": true
281281
},
282282
"default_cursor_field": [
283-
"modified_at"
283+
"modified_at_test"
284284
],
285285
"supported_sync_modes": [
286286
"full_refresh",
@@ -289,7 +289,7 @@ class DestinationCatalogGeneratorTest {
289289
"source_defined_cursor": true,
290290
"source_defined_primary_key": [
291291
[
292-
"id"
292+
"id_test"
293293
]
294294
]
295295
},

airbyte-mappers/src/test/kotlin/io/airbyte/mappers/transformations/SlimStreamTest.kt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,13 @@ class SlimStreamTest {
2323
Field(CURSOR_NAME, FieldType.INTEGER),
2424
Field(PRIMARY_KEY_NAME, FieldType.STRING),
2525
Field(PRIMARY_KEY_OTHER_ATTR_NAME, FieldType.STRING),
26+
Field(SOURCE_CURSOR_NAME, FieldType.STRING),
27+
Field(SOURCE_PRIMARY_KEY_NAME, FieldType.STRING),
2628
),
2729
cursor = listOf(CURSOR_NAME),
2830
primaryKey = listOf(listOf(PRIMARY_KEY_NAME), listOf(PRIMARY_KEY_OTHER_ATTR_NAME)),
31+
sourceDefaultCursor = listOf(SOURCE_CURSOR_NAME),
32+
sourceDefinedPrimaryKey = listOf(listOf(SOURCE_PRIMARY_KEY_NAME), listOf(PRIMARY_KEY_OTHER_ATTR_NAME)),
2933
)
3034
}
3135

@@ -83,11 +87,33 @@ class SlimStreamTest {
8387
assertEquals(listOf(listOf(renamedPrimaryKey), listOf(PRIMARY_KEY_OTHER_ATTR_NAME)), slimStream.primaryKey)
8488
}
8589

90+
@Test
91+
fun `renaming a field also updates source default cursor if relevant`() {
92+
val renamedSourceFieldKey = "newSourceField"
93+
slimStream.redefineField(SOURCE_CURSOR_NAME, renamedSourceFieldKey)
94+
95+
assertTrue(slimStream.fields.contains(Field(renamedSourceFieldKey, FieldType.STRING)))
96+
assertFalse(slimStream.fields.any { it.name == SOURCE_CURSOR_NAME })
97+
assertEquals(listOf(renamedSourceFieldKey), slimStream.sourceDefaultCursor)
98+
}
99+
100+
@Test
101+
fun `renaming a field also updates source defined PK if relevant`() {
102+
val renamedSourceFieldKey = "newSourceField"
103+
slimStream.redefineField(SOURCE_PRIMARY_KEY_NAME, renamedSourceFieldKey)
104+
105+
assertTrue(slimStream.fields.contains(Field(renamedSourceFieldKey, FieldType.STRING)))
106+
assertFalse(slimStream.fields.any { it.name == SOURCE_PRIMARY_KEY_NAME })
107+
assertEquals(listOf(listOf(renamedSourceFieldKey), listOf(PRIMARY_KEY_OTHER_ATTR_NAME)), slimStream.sourceDefinedPrimaryKey)
108+
}
109+
86110
companion object {
87111
const val FIELD1_NAME = "field1"
88112
const val FIELD2_NAME = "field2"
89113
const val CURSOR_NAME = "cursor_field"
90114
const val PRIMARY_KEY_NAME = "primary_key"
91115
const val PRIMARY_KEY_OTHER_ATTR_NAME = "primary_key_other"
116+
const val SOURCE_CURSOR_NAME = "source_cursor_field"
117+
const val SOURCE_PRIMARY_KEY_NAME = "source_primary_key"
92118
}
93119
}

0 commit comments

Comments
 (0)