Skip to content

Commit 4174406

Browse files
committed
chore: hashing mapper populate meta.changes on errors (#13919)
1 parent 06af5db commit 4174406

File tree

6 files changed

+103
-8
lines changed

6 files changed

+103
-8
lines changed

airbyte-config/config-models/src/main/kotlin/io/airbyte/config/adapters/AirbyteRecord.kt

+15
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,15 @@ import io.airbyte.config.StreamDescriptor
44
import io.airbyte.protocol.models.AirbyteMessage
55

66
interface AirbyteRecord {
7+
enum class Change {
8+
NULLED,
9+
TRUNCATED,
10+
}
11+
12+
enum class Reason {
13+
PLATFORM_SERIALIZATION_ERROR,
14+
}
15+
716
val streamDescriptor: StreamDescriptor
817
val asProtocol: AirbyteMessage
918

@@ -17,6 +26,12 @@ interface AirbyteRecord {
1726
fieldName: String,
1827
value: T,
1928
)
29+
30+
fun trackFieldError(
31+
fieldName: String,
32+
change: Change,
33+
reason: Reason,
34+
)
2035
}
2136

2237
interface Value {

airbyte-config/config-models/src/main/kotlin/io/airbyte/config/adapters/JsonAdapters.kt

+37
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode
88
import com.fasterxml.jackson.databind.node.TextNode
99
import io.airbyte.config.StreamDescriptor
1010
import io.airbyte.protocol.models.AirbyteMessage
11+
import io.airbyte.protocol.models.AirbyteRecordMessageMeta
12+
import io.airbyte.protocol.models.AirbyteRecordMessageMetaChange
1113

1214
class JsonValueAdapter(private val node: JsonNode) : Value {
1315
override fun asBoolean(): Boolean = node.asBoolean()
@@ -38,6 +40,30 @@ data class AirbyteJsonRecordAdapter(private val message: AirbyteMessage) : Airby
3840
data.set<JsonNode>(fieldName, createNode(value))
3941
}
4042

43+
override fun trackFieldError(
44+
fieldName: String,
45+
change: AirbyteRecord.Change,
46+
reason: AirbyteRecord.Reason,
47+
) {
48+
val metaChange =
49+
AirbyteRecordMessageMetaChange()
50+
.withChange(change.toProtocol())
51+
.withField(fieldName)
52+
.withReason(reason.toProtocol())
53+
54+
// handling all the cascading layers of potential null objects
55+
// very thread-unsafe
56+
if (message.record != null) {
57+
if (message.record.meta == null) {
58+
message.record.withMeta(AirbyteRecordMessageMeta().withChanges(mutableListOf()))
59+
}
60+
if (message.record.meta.changes == null) {
61+
message.record.meta.withChanges(mutableListOf())
62+
}
63+
message.record.meta.changes.add(metaChange)
64+
}
65+
}
66+
4167
private fun <T : Any> createNode(value: T): JsonNode =
4268
when (value) {
4369
is Boolean -> BooleanNode.valueOf(value)
@@ -46,4 +72,15 @@ data class AirbyteJsonRecordAdapter(private val message: AirbyteMessage) : Airby
4672
is String -> TextNode.valueOf(value)
4773
else -> TODO("Unsupported type ${value::class.java.name}")
4874
}
75+
76+
private fun AirbyteRecord.Change.toProtocol() =
77+
when (this) {
78+
AirbyteRecord.Change.NULLED -> AirbyteRecordMessageMetaChange.Change.NULLED
79+
AirbyteRecord.Change.TRUNCATED -> AirbyteRecordMessageMetaChange.Change.TRUNCATED
80+
}
81+
82+
private fun AirbyteRecord.Reason.toProtocol() =
83+
when (this) {
84+
AirbyteRecord.Reason.PLATFORM_SERIALIZATION_ERROR -> AirbyteRecordMessageMetaChange.Reason.PLATFORM_SERIALIZATION_ERROR
85+
}
4986
}

airbyte-config/config-models/src/test/kotlin/io/airbyte/config/adapters/JsonRecordAdapterTest.kt

+15
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package io.airbyte.config.adapters
33
import io.airbyte.commons.json.Jsons
44
import io.airbyte.config.StreamDescriptor
55
import io.airbyte.protocol.models.AirbyteMessage
6+
import io.airbyte.protocol.models.AirbyteRecordMessageMetaChange
67
import org.junit.jupiter.api.Assertions.assertEquals
78
import org.junit.jupiter.api.Test
89

@@ -49,6 +50,20 @@ class JsonRecordAdapterTest {
4950
assertEquals("4.2", adapter.get(NUMBER_FIELD).asString())
5051
}
5152

53+
@Test
54+
fun `tracking meta changes`() {
55+
val adapter = getAdapterFromRecord(jsonRecordString)
56+
57+
adapter.trackFieldError("test", AirbyteRecord.Change.TRUNCATED, AirbyteRecord.Reason.PLATFORM_SERIALIZATION_ERROR)
58+
59+
val expectedMetaChange =
60+
AirbyteRecordMessageMetaChange()
61+
.withField("test")
62+
.withChange(AirbyteRecordMessageMetaChange.Change.TRUNCATED)
63+
.withReason(AirbyteRecordMessageMetaChange.Reason.PLATFORM_SERIALIZATION_ERROR)
64+
assertEquals(listOf(expectedMetaChange), adapter.asProtocol.record.meta.changes)
65+
}
66+
5267
@Test
5368
fun `verify modify then serialize`() {
5469
val adapter = getAdapterFromRecord(jsonRecordString)

airbyte-mappers/src/main/kotlin/io/airbyte/mappers/transformations/HashingMapper.kt

+12-5
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,20 @@ class HashingMapper : Mapper {
102102
record: AirbyteRecord,
103103
) {
104104
val (targetField, method, fieldNameSuffix) = getConfigValues(config.config)
105+
val outputFieldName = "$targetField$fieldNameSuffix"
105106

106107
if (record.has(targetField)) {
107-
val data = record.get(targetField).asString().toByteArray()
108-
109-
val hashedAndEncodeValue: String = hashAndEncodeData(method, data)
110-
record.set(targetField + fieldNameSuffix, hashedAndEncodeValue)
111-
record.remove(targetField)
108+
try {
109+
val data = record.get(targetField).asString().toByteArray()
110+
111+
val hashedAndEncodeValue: String = hashAndEncodeData(method, data)
112+
record.set(outputFieldName, hashedAndEncodeValue)
113+
} catch (e: Exception) {
114+
// TODO We should use a more precise Reason once available in the protocol
115+
record.trackFieldError(outputFieldName, AirbyteRecord.Change.NULLED, AirbyteRecord.Reason.PLATFORM_SERIALIZATION_ERROR)
116+
} finally {
117+
record.remove(targetField)
118+
}
112119
}
113120
}
114121

airbyte-mappers/src/test/kotlin/io/airbyte/config/adapters/TestRecordAdapter.kt

+14
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,13 @@ class TestValueAdapter(private val value: Any) : Value {
1212
}
1313

1414
class TestRecordAdapter(override val streamDescriptor: StreamDescriptor, data: Map<String, Any>) : AirbyteRecord {
15+
data class Change(val fieldName: String, val change: AirbyteRecord.Change, val reason: AirbyteRecord.Reason)
16+
1517
private val data: MutableMap<String, Any> = data.toMutableMap()
18+
private val _changes: MutableList<Change> = mutableListOf()
19+
20+
val changes: List<Change>
21+
get(): List<Change> = _changes.toList()
1622

1723
override val asProtocol: AirbyteMessage
1824
get() = TODO("Not yet implemented")
@@ -31,4 +37,12 @@ class TestRecordAdapter(override val streamDescriptor: StreamDescriptor, data: M
3137
) {
3238
data[fieldName] = value
3339
}
40+
41+
override fun trackFieldError(
42+
fieldName: String,
43+
change: AirbyteRecord.Change,
44+
reason: AirbyteRecord.Reason,
45+
) {
46+
_changes.add(Change(fieldName, change, reason))
47+
}
3448
}

airbyte-mappers/src/test/kotlin/io/airbyte/mappers/transformations/HashingMapperTest.kt

+10-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import io.airbyte.config.ConfiguredMapper
44
import io.airbyte.config.Field
55
import io.airbyte.config.FieldType
66
import io.airbyte.config.StreamDescriptor
7+
import io.airbyte.config.adapters.AirbyteRecord
78
import io.airbyte.config.adapters.TestRecordAdapter
89
import io.airbyte.mappers.transformations.HashingMapper.Companion.supportedMethods
910
import io.mockk.spyk
@@ -113,9 +114,15 @@ class HashingMapperTest {
113114
),
114115
)
115116

116-
assertThrows(IllegalArgumentException::class.java) {
117-
hashingMapper.map(config, TestRecordAdapter(StreamDescriptor().withName("any"), mapOf("field1" to "anything")))
118-
}
117+
val record = TestRecordAdapter(StreamDescriptor().withName("any"), mapOf("field1" to "anything"))
118+
hashingMapper.map(config, record)
119+
120+
assertFalse(record.has("field1"))
121+
assertFalse(record.has("field1_hashed"))
122+
assertEquals(
123+
listOf(TestRecordAdapter.Change("field1_hashed", AirbyteRecord.Change.NULLED, AirbyteRecord.Reason.PLATFORM_SERIALIZATION_ERROR)),
124+
record.changes,
125+
)
119126
}
120127

121128
@Test

0 commit comments

Comments
 (0)