Skip to content

Commit 1e31474

Browse files
authored
scaffold for catalog diff, needs fixing on type handling and tests (#13786)
1 parent f483396 commit 1e31474

14 files changed

+556
-21
lines changed

airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java

+147-17
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,25 @@
88
import com.google.common.annotations.VisibleForTesting;
99
import com.google.common.collect.ImmutableMap;
1010
import com.google.common.collect.Lists;
11+
import com.google.common.collect.Sets;
12+
import io.airbyte.commons.json.JsonSchemas;
13+
import io.airbyte.commons.json.JsonSchemas.FieldNameOrList;
1114
import io.airbyte.commons.json.Jsons;
15+
import io.airbyte.commons.util.MoreIterators;
16+
import io.airbyte.commons.util.MoreLists;
17+
import io.airbyte.protocol.models.transform_models.FieldTransform;
18+
import io.airbyte.protocol.models.transform_models.StreamTransform;
19+
import io.airbyte.protocol.models.transform_models.UpdateFieldTransform;
20+
import io.airbyte.protocol.models.transform_models.UpdateStreamTransform;
1221
import java.util.ArrayList;
1322
import java.util.Arrays;
1423
import java.util.HashSet;
15-
import java.util.Iterator;
1624
import java.util.List;
1725
import java.util.Map;
26+
import java.util.Optional;
1827
import java.util.Set;
1928
import java.util.stream.Collectors;
29+
import org.apache.commons.lang3.tuple.Pair;
2030

2131
/**
2232
* Helper class for Catalog and Stream related operations. Generally only used in tests.
@@ -124,31 +134,151 @@ public static Set<String> getTopLevelFieldNames(final ConfiguredAirbyteStream st
124134
}
125135

126136
/**
127-
* @param node any json node
137+
* @param jsonSchema - a JSONSchema node
128138
* @return a set of all keys for all objects within the node
129139
*/
130140
@VisibleForTesting
131-
protected static Set<String> getAllFieldNames(final JsonNode node) {
132-
final Set<String> allFieldNames = new HashSet<>();
133-
134-
if (node.has("properties")) {
135-
final JsonNode properties = node.get("properties");
136-
final Iterator<String> fieldNames = properties.fieldNames();
137-
while (fieldNames.hasNext()) {
138-
final String fieldName = fieldNames.next();
139-
allFieldNames.add(fieldName);
140-
final JsonNode fieldValue = properties.get(fieldName);
141-
if (fieldValue.isObject()) {
142-
allFieldNames.addAll(getAllFieldNames(fieldValue));
143-
}
141+
protected static Set<String> getAllFieldNames(final JsonNode jsonSchema) {
142+
return getFullyQualifiedFieldNamesWithTypes(jsonSchema)
143+
.stream()
144+
.map(Pair::getLeft)
145+
// only need field name, not fully qualified name
146+
.map(MoreLists::last)
147+
.flatMap(Optional::stream)
148+
.collect(Collectors.toSet());
149+
}
150+
151+
/**
152+
* Extracts all fields and their schemas from a JSONSchema. This method returns values in
153+
* depth-first search preorder. It short circuits at oneOfs--in other words, child fields of a oneOf
154+
* are not returned.
155+
*
156+
* @param jsonSchema - a JSONSchema node
157+
* @return a list of all keys for all objects within the node. ordered in depth-first search
158+
* preorder.
159+
*/
160+
@VisibleForTesting
161+
protected static List<Pair<List<String>, JsonNode>> getFullyQualifiedFieldNamesWithTypes(final JsonNode jsonSchema) {
162+
// if this were ever a performance issue, it could be replaced with a trie. this seems unlikely
163+
// however.
164+
final Set<List<String>> fieldNamesThatAreOneOfs = new HashSet<>();
165+
166+
return JsonSchemas.traverseJsonSchemaWithCollector(jsonSchema, (node, basicPath) -> {
167+
final List<String> fieldName = basicPath.stream().filter(fieldOrList -> !fieldOrList.isList()).map(FieldNameOrList::getFieldName).toList();
168+
return Pair.of(fieldName, node);
169+
})
170+
.stream()
171+
// first node is the original object.
172+
.skip(1)
173+
.filter(fieldWithSchema -> filterChildrenOfFoneOneOf(fieldWithSchema.getLeft(), fieldWithSchema.getRight(), fieldNamesThatAreOneOfs))
174+
.toList();
175+
}
176+
177+
/**
178+
* Predicate that checks if a field is a CHILD of a oneOf field. If child of a oneOf, returns false.
179+
* Otherwise, true. This method as side effects. It assumes that it will be run in order on field
180+
* names returned in depth-first search preoorder. As it encounters oneOfs it adds them to a
181+
* collection. It then checks if subsequent field names are prefix matches to the field that are
182+
* oneOfs.
183+
*
184+
* @param fieldName - field to investigate
185+
* @param schema - schema of field
186+
* @param oneOfFieldNameAccumulator - collection of fields that are oneOfs
187+
* @return If child of a oneOf, returns false. Otherwise, true.
188+
*/
189+
private static boolean filterChildrenOfFoneOneOf(final List<String> fieldName,
190+
final JsonNode schema,
191+
final Set<List<String>> oneOfFieldNameAccumulator) {
192+
if (isOneOfField(schema)) {
193+
oneOfFieldNameAccumulator.add(fieldName);
194+
// return early because we know it is a oneOf and therefore cannot be a child of a oneOf.
195+
return true;
196+
}
197+
198+
// leverage that nodes are returned in depth-first search preorder. this means the parent field for
199+
// the oneOf will be present in the list BEFORE any of its children.
200+
for (final List<String> oneOfFieldName : oneOfFieldNameAccumulator) {
201+
final String oneOfFieldNameString = String.join(".", oneOfFieldName);
202+
final String fieldNameString = String.join(".", fieldName);
203+
204+
if (fieldNameString.startsWith(oneOfFieldNameString)) {
205+
return false;
144206
}
145207
}
208+
return true;
209+
}
146210

147-
return allFieldNames;
211+
private static boolean isOneOfField(final JsonNode schema) {
212+
return !MoreIterators.toSet(schema.fieldNames()).contains("type");
148213
}
149214

150-
private static boolean isObjectWithSubFields(Field field) {
215+
private static boolean isObjectWithSubFields(final Field field) {
151216
return field.getType() == JsonSchemaType.OBJECT && field.getSubFields() != null && !field.getSubFields().isEmpty();
152217
}
153218

219+
public static StreamDescriptor extractStreamDescriptor(final AirbyteStream airbyteStream) {
220+
return new StreamDescriptor().withName(airbyteStream.getName()).withNamespace(airbyteStream.getNamespace());
221+
}
222+
223+
private static Map<StreamDescriptor, AirbyteStream> streamDescriptorToMap(final AirbyteCatalog catalog) {
224+
return catalog.getStreams()
225+
.stream()
226+
.collect(Collectors.toMap(CatalogHelpers::extractStreamDescriptor, s -> s));
227+
}
228+
229+
/**
230+
* Returns difference between two provided catalogs.
231+
*
232+
* @param oldCatalog - old catalog
233+
* @param newCatalog - new catalog
234+
* @return difference between old and new catalogs
235+
*/
236+
public static Set<StreamTransform> getCatalogDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog) {
237+
final Set<StreamTransform> streamTransforms = new HashSet<>();
238+
239+
final Map<StreamDescriptor, AirbyteStream> descriptorToStreamOld = streamDescriptorToMap(oldCatalog);
240+
final Map<StreamDescriptor, AirbyteStream> descriptorToStreamNew = streamDescriptorToMap(newCatalog);
241+
242+
Sets.difference(descriptorToStreamOld.keySet(), descriptorToStreamNew.keySet())
243+
.forEach(descriptor -> streamTransforms.add(StreamTransform.createRemoveStreamTransform(descriptor)));
244+
Sets.difference(descriptorToStreamNew.keySet(), descriptorToStreamOld.keySet())
245+
.forEach(descriptor -> streamTransforms.add(StreamTransform.createAddStreamTransform(descriptor)));
246+
Sets.intersection(descriptorToStreamOld.keySet(), descriptorToStreamNew.keySet())
247+
.forEach(descriptor -> {
248+
final AirbyteStream streamOld = descriptorToStreamOld.get(descriptor);
249+
final AirbyteStream streamNew = descriptorToStreamNew.get(descriptor);
250+
if (!streamOld.equals(streamNew)) {
251+
streamTransforms.add(StreamTransform.createUpdateStreamTransform(getStreamDiff(descriptor, streamOld, streamNew)));
252+
}
253+
});
254+
255+
return streamTransforms;
256+
}
257+
258+
private static UpdateStreamTransform getStreamDiff(final StreamDescriptor descriptor,
259+
final AirbyteStream streamOld,
260+
final AirbyteStream streamNew) {
261+
final Set<FieldTransform> fieldTransforms = new HashSet<>();
262+
final Map<List<String>, JsonNode> fieldNameToTypeOld = getFullyQualifiedFieldNamesWithTypes(streamOld.getJsonSchema())
263+
.stream()
264+
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
265+
final Map<List<String>, JsonNode> fieldNameToTypeNew = getFullyQualifiedFieldNamesWithTypes(streamNew.getJsonSchema())
266+
.stream()
267+
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
268+
269+
Sets.difference(fieldNameToTypeOld.keySet(), fieldNameToTypeNew.keySet())
270+
.forEach(fieldName -> fieldTransforms.add(FieldTransform.createRemoveFieldTransform(fieldName, fieldNameToTypeOld.get(fieldName))));
271+
Sets.difference(fieldNameToTypeNew.keySet(), fieldNameToTypeOld.keySet())
272+
.forEach(fieldName -> fieldTransforms.add(FieldTransform.createAddFieldTransform(fieldName, fieldNameToTypeNew.get(fieldName))));
273+
Sets.intersection(fieldNameToTypeOld.keySet(), fieldNameToTypeNew.keySet()).forEach(fieldName -> {
274+
final JsonNode oldType = fieldNameToTypeOld.get(fieldName);
275+
final JsonNode newType = fieldNameToTypeNew.get(fieldName);
276+
277+
if (!oldType.equals(newType)) {
278+
fieldTransforms.add(FieldTransform.createUpdateFieldTransform(new UpdateFieldTransform(fieldName, oldType, newType)));
279+
}
280+
});
281+
return new UpdateStreamTransform(descriptor, fieldTransforms);
282+
}
283+
154284
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.protocol.models.transform_models;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import lombok.AllArgsConstructor;
11+
import lombok.EqualsAndHashCode;
12+
import lombok.ToString;
13+
14+
/**
15+
* Represents the addition of a field to an {@link io.airbyte.protocol.models.AirbyteStream}.
16+
*/
17+
@AllArgsConstructor
18+
@EqualsAndHashCode
19+
@ToString
20+
public class AddFieldTransform {
21+
22+
private final List<String> fieldName;
23+
private final JsonNode schema;
24+
25+
public List<String> getFieldName() {
26+
return new ArrayList<>(fieldName);
27+
}
28+
29+
public JsonNode getSchema() {
30+
return schema;
31+
}
32+
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.protocol.models.transform_models;
6+
7+
import io.airbyte.protocol.models.StreamDescriptor;
8+
import lombok.AllArgsConstructor;
9+
import lombok.EqualsAndHashCode;
10+
import lombok.ToString;
11+
12+
/**
13+
* Represents the addition of an {@link io.airbyte.protocol.models.AirbyteStream} to a
14+
* {@link io.airbyte.protocol.models.AirbyteCatalog}.
15+
*/
16+
@AllArgsConstructor
17+
@EqualsAndHashCode
18+
@ToString
19+
public class AddStreamTransform {
20+
21+
private final StreamDescriptor streamDescriptor;
22+
23+
public StreamDescriptor getStreamDescriptor() {
24+
return streamDescriptor;
25+
}
26+
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.protocol.models.transform_models;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import java.util.List;
9+
import lombok.AllArgsConstructor;
10+
import lombok.EqualsAndHashCode;
11+
import lombok.ToString;
12+
13+
/**
14+
* Represents the diff between two fields.
15+
*/
16+
@AllArgsConstructor
17+
@EqualsAndHashCode
18+
@ToString
19+
public final class FieldTransform {
20+
21+
private final FieldTransformType transformType;
22+
private final AddFieldTransform addFieldTransform;
23+
private final RemoveFieldTransform removeFieldTransform;
24+
private final UpdateFieldTransform updateFieldTransform;
25+
26+
public static FieldTransform createAddFieldTransform(final List<String> fieldName, final JsonNode schema) {
27+
return createAddFieldTransform(new AddFieldTransform(fieldName, schema));
28+
}
29+
30+
public static FieldTransform createAddFieldTransform(final AddFieldTransform addFieldTransform) {
31+
return new FieldTransform(FieldTransformType.ADD_FIELD, addFieldTransform, null, null);
32+
}
33+
34+
public static FieldTransform createRemoveFieldTransform(final List<String> fieldName, final JsonNode schema) {
35+
return createRemoveFieldTransform(new RemoveFieldTransform(fieldName, schema));
36+
}
37+
38+
public static FieldTransform createRemoveFieldTransform(final RemoveFieldTransform removeFieldTransform) {
39+
return new FieldTransform(FieldTransformType.REMOVE_FIELD, null, removeFieldTransform, null);
40+
}
41+
42+
public static FieldTransform createUpdateFieldTransform(final UpdateFieldTransform updateFieldTransform) {
43+
return new FieldTransform(FieldTransformType.UPDATE_FIELD, null, null, updateFieldTransform);
44+
}
45+
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.protocol.models.transform_models;
6+
7+
/**
8+
* Types of transformations possible for a field.
9+
*/
10+
public enum FieldTransformType {
11+
ADD_FIELD,
12+
REMOVE_FIELD,
13+
UPDATE_FIELD
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.protocol.models.transform_models;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import lombok.AllArgsConstructor;
11+
import lombok.EqualsAndHashCode;
12+
import lombok.ToString;
13+
14+
/**
15+
* Represents the removal of a field to an {@link io.airbyte.protocol.models.AirbyteStream}.
16+
*/
17+
@AllArgsConstructor
18+
@EqualsAndHashCode
19+
@ToString
20+
public class RemoveFieldTransform {
21+
22+
private final List<String> fieldName;
23+
private final JsonNode schema;
24+
25+
public List<String> getFieldName() {
26+
return new ArrayList<>(fieldName);
27+
}
28+
29+
public JsonNode getSchema() {
30+
return schema;
31+
}
32+
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.protocol.models.transform_models;
6+
7+
import io.airbyte.protocol.models.StreamDescriptor;
8+
import lombok.AllArgsConstructor;
9+
import lombok.EqualsAndHashCode;
10+
import lombok.ToString;
11+
12+
/**
13+
* Represents the removal of an {@link io.airbyte.protocol.models.AirbyteStream} to a
14+
* {@link io.airbyte.protocol.models.AirbyteCatalog}.
15+
*/
16+
@AllArgsConstructor
17+
@EqualsAndHashCode
18+
@ToString
19+
public class RemoveStreamTransform {
20+
21+
private final StreamDescriptor streamDescriptor;
22+
23+
}

0 commit comments

Comments
 (0)