Skip to content

Commit 82961ad

Browse files
authored
Add support for binary _id field in source-mongodb (#38103)
1 parent 3525225 commit 82961ad

File tree

8 files changed

+183
-65
lines changed

8 files changed

+183
-65
lines changed

airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ data:
88
connectorSubtype: database
99
connectorType: source
1010
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
11-
dockerImageTag: 1.3.12
11+
dockerImageTag: 1.3.13
1212
dockerRepository: airbyte/source-mongodb-v2
1313
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
1414
githubIssueLabel: source-mongodb-v2

airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbInitialLoadRecordIterator.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
package io.airbyte.integrations.source.mongodb;
66

7+
import static io.airbyte.integrations.source.mongodb.state.IdType.idToStringRepresenation;
8+
import static io.airbyte.integrations.source.mongodb.state.IdType.parseBinaryIdString;
79
import static io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus.IN_PROGRESS;
810

911
import com.google.common.collect.AbstractIterator;
@@ -16,12 +18,7 @@
1618
import io.airbyte.integrations.source.mongodb.state.IdType;
1719
import io.airbyte.integrations.source.mongodb.state.MongoDbStreamState;
1820
import java.util.Optional;
19-
import org.bson.BsonDocument;
20-
import org.bson.BsonInt32;
21-
import org.bson.BsonInt64;
22-
import org.bson.BsonObjectId;
23-
import org.bson.BsonString;
24-
import org.bson.Document;
21+
import org.bson.*;
2522
import org.bson.conversions.Bson;
2623
import org.bson.types.ObjectId;
2724
import org.slf4j.Logger;
@@ -85,7 +82,8 @@ protected Document computeNext() {
8582
private Optional<MongoDbStreamState> getCurrentState(Object currentId) {
8683
final var idType = IdType.findByJavaType(currentId.getClass().getSimpleName())
8784
.orElseThrow(() -> new ConfigErrorException("Unsupported _id type " + currentId.getClass().getSimpleName()));
88-
final var state = new MongoDbStreamState(currentId.toString(),
85+
86+
final var state = new MongoDbStreamState(idToStringRepresenation(currentId, idType),
8987
IN_PROGRESS,
9088
idType);
9189
return Optional.of(state);
@@ -130,6 +128,7 @@ private Bson buildFilter() {
130128
case OBJECT_ID -> new BsonObjectId(new ObjectId(state.id()));
131129
case INT -> new BsonInt32(Integer.parseInt(state.id()));
132130
case LONG -> new BsonInt64(Long.parseLong(state.id()));
131+
case BINARY -> parseBinaryIdString(state.id());
133132
}))
134133
// if nothing was found, return a new BsonDocument
135134
.orElseGet(BsonDocument::new);

airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/state/IdType.java

+54-18
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,27 @@
44

55
package io.airbyte.integrations.source.mongodb.state;
66

7-
import java.util.Arrays;
8-
import java.util.HashMap;
9-
import java.util.Map;
10-
import java.util.Optional;
11-
import java.util.function.Function;
7+
import static java.util.Base64.getEncoder;
8+
9+
import java.util.*;
1210
import java.util.stream.Collectors;
13-
import org.bson.types.ObjectId;
11+
import org.bson.BsonBinary;
12+
import org.bson.BsonBinarySubType;
13+
import org.bson.UuidRepresentation;
14+
import org.bson.internal.UuidHelper;
15+
import org.bson.types.Binary;
1416

1517
/**
1618
* _id field types that are currently supported, potential types are defined <a href=
1719
* "https://www.mongodb.com/docs/manual/reference/operator/query/type/#std-label-document-type-available-types">here</a>
1820
*/
1921
public enum IdType {
2022

21-
OBJECT_ID("objectId", "ObjectId", ObjectId::new),
22-
STRING("string", "String", s -> s),
23-
INT("int", "Integer", Integer::valueOf),
24-
LONG("long", "Long", Long::valueOf);
23+
OBJECT_ID("objectId", "ObjectId"),
24+
STRING("string", "String"),
25+
INT("int", "Integer"),
26+
LONG("long", "Long"),
27+
BINARY("binData", "Binary");
2528

2629
private static final Map<String, IdType> byBsonType = new HashMap<>();
2730
static {
@@ -49,17 +52,10 @@ public enum IdType {
4952
private final String bsonType;
5053
/** Java class name type */
5154
private final String javaType;
52-
/** Converter for converting a string value into an appropriate MongoDb type. */
53-
private final Function<String, Object> converter;
5455

55-
IdType(final String bsonType, final String javaType, final Function<String, Object> converter) {
56+
IdType(final String bsonType, final String javaType) {
5657
this.bsonType = bsonType;
5758
this.javaType = javaType;
58-
this.converter = converter;
59-
}
60-
61-
public Object convert(final String t) {
62-
return converter.apply(t);
6359
}
6460

6561
public static Optional<IdType> findByBsonType(final String bsonType) {
@@ -76,4 +72,44 @@ public static Optional<IdType> findByJavaType(final String javaType) {
7672
return Optional.ofNullable(byJavaType.get(javaType.toLowerCase()));
7773
}
7874

75+
/**
76+
* Convers a collection id to a string representation for use in a saved state. Most types will be
77+
* converted to a string, except for Binary types which will be converted to a Base64 encoded
78+
* string. and UUIDs which will be converted to a human-readable string.
79+
*
80+
* @param id an _id field value
81+
* @param idType the type of the _id field
82+
* @return a string representation of the _id field
83+
*/
84+
public static String idToStringRepresenation(final Object id, final IdType idType) {
85+
final String strId;
86+
if (idType == IdType.BINARY) {
87+
final var binLastId = (Binary) id;
88+
if (binLastId.getType() == BsonBinarySubType.UUID_STANDARD.getValue()) {
89+
strId = UuidHelper.decodeBinaryToUuid(binLastId.getData(), binLastId.getType(), UuidRepresentation.STANDARD).toString();
90+
} else {
91+
strId = getEncoder().encodeToString(binLastId.getData());
92+
}
93+
} else {
94+
strId = id.toString();
95+
}
96+
97+
return strId;
98+
}
99+
100+
/**
101+
* Parse a string representation of a binary _id field into a BsonBinary object. The string can be a
102+
* UUID or a Base64 encoded string.
103+
*
104+
* @param id a string representation of an _id field
105+
* @return a BsonBinary object
106+
*/
107+
public static BsonBinary parseBinaryIdString(final String id) {
108+
try {
109+
return new BsonBinary(UUID.fromString(id));
110+
} catch (final IllegalArgumentException ex) {
111+
return new BsonBinary(Base64.getDecoder().decode(id));
112+
}
113+
}
114+
79115
}

airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/state/MongoDbStateManager.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.integrations.source.mongodb.state;
66

7+
import static io.airbyte.integrations.source.mongodb.state.IdType.idToStringRepresenation;
78
import static io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus.FULL_REFRESH;
89
import static io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus.IN_PROGRESS;
910
import static io.airbyte.protocol.models.v0.SyncMode.INCREMENTAL;
@@ -23,11 +24,7 @@
2324
import io.airbyte.protocol.models.v0.*;
2425
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
2526
import java.time.Instant;
26-
import java.util.HashMap;
27-
import java.util.List;
28-
import java.util.Map;
29-
import java.util.Objects;
30-
import java.util.Optional;
27+
import java.util.*;
3128
import java.util.stream.Collectors;
3229
import org.bson.Document;
3330
import org.slf4j.Logger;
@@ -302,7 +299,7 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
302299
final var finalStateStatus = InitialSnapshotStatus.COMPLETE;
303300
final var idType = IdType.findByJavaType(lastId.getClass().getSimpleName())
304301
.orElseThrow(() -> new ConfigErrorException("Unsupported _id type " + lastId.getClass().getSimpleName()));
305-
final var state = new MongoDbStreamState(lastId.toString(), finalStateStatus, idType);
302+
final var state = new MongoDbStreamState(idToStringRepresenation(lastId, idType), finalStateStatus, idType);
306303

307304
updateStreamState(stream.getStream().getName(), stream.getStream().getNamespace(), state);
308305
}

airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/state/MongoDbStreamState.java

+1-13
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,4 @@
44

55
package io.airbyte.integrations.source.mongodb.state;
66

7-
public record MongoDbStreamState(String id, InitialSnapshotStatus status, IdType idType) {
8-
9-
/**
10-
* Takes a value converting it to the appropriate MongoDb type based on the IdType of this record.
11-
*
12-
* @param value the value to convert
13-
* @return a converted value.
14-
*/
15-
public Object idTypeAsMongoDbType(final String value) {
16-
return idType.convert(value);
17-
}
18-
19-
}
7+
public record MongoDbStreamState(String id, InitialSnapshotStatus status, IdType idType) {}

0 commit comments

Comments
 (0)