Skip to content

Commit 394b8c9

Browse files
authored
Bmoric/refacto secret processor (#11362)
Follow up on the comments on #11296 It is reactoring the JsonProcessor by making it being generated through a factory. This allow to avoid propagated the feature flag inside the JsonProcessor.
1 parent bf66d32 commit 394b8c9

File tree

19 files changed

+310
-188
lines changed

19 files changed

+310
-188
lines changed

airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,12 @@ public BootloaderApp() {
8686
final Database configDatabase =
8787
new ConfigsDatabaseInstance(configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl())
8888
.getAndInitialize();
89-
final JsonSecretsProcessor jsonSecretsProcessor = new JsonSecretsProcessor();
89+
final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder()
90+
.maskSecrets(!featureFlags.exposeSecretsInExport())
91+
.copySecrets(true)
92+
.build();
9093
final ConfigPersistence configPersistence =
91-
DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor, featureFlags);
94+
DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor);
9295
configPersistence.loadData(YamlSeedConfigPersistence.getDefault());
9396
LOGGER.info("Loaded seed data..");
9497
} catch (final IOException e) {
@@ -115,8 +118,11 @@ public void load() throws Exception {
115118
runFlywayMigration(configs, configDatabase, jobDatabase);
116119
LOGGER.info("Ran Flyway migrations...");
117120

118-
final JsonSecretsProcessor jsonSecretsProcessor = new JsonSecretsProcessor();
119-
final ConfigPersistence configPersistence = DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor, featureFlags);
121+
final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder()
122+
.maskSecrets(!featureFlags.exposeSecretsInExport())
123+
.copySecrets(false)
124+
.build();
125+
final ConfigPersistence configPersistence = DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor);
120126
final ConfigRepository configRepository =
121127
new ConfigRepository(configPersistence, configDatabase);
122128

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.google.common.annotations.VisibleForTesting;
2323
import com.google.common.collect.Sets;
2424
import io.airbyte.commons.enums.Enums;
25-
import io.airbyte.commons.features.FeatureFlags;
2625
import io.airbyte.commons.json.Jsons;
2726
import io.airbyte.commons.util.MoreIterators;
2827
import io.airbyte.commons.version.AirbyteVersion;
@@ -81,7 +80,6 @@ public class DatabaseConfigPersistence implements ConfigPersistence {
8180

8281
private final ExceptionWrappingDatabase database;
8382
private final JsonSecretsProcessor jsonSecretsProcessor;
84-
private final FeatureFlags featureFlags;
8583
private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseConfigPersistence.class);
8684

8785
/**
@@ -90,20 +88,16 @@ public class DatabaseConfigPersistence implements ConfigPersistence {
9088
*
9189
* @param database - database where configs are stored
9290
* @param jsonSecretsProcessor - for filtering secrets in export
93-
* @param featureFlags - feature flags that govern secret export behavior
9491
* @return database config persistence wrapped in validation decorators
9592
*/
9693
public static ConfigPersistence createWithValidation(final Database database,
97-
final JsonSecretsProcessor jsonSecretsProcessor,
98-
final FeatureFlags featureFlags) {
99-
return new ClassEnforcingConfigPersistence(
100-
new ValidatingConfigPersistence(new DatabaseConfigPersistence(database, jsonSecretsProcessor, featureFlags)));
94+
final JsonSecretsProcessor jsonSecretsProcessor) {
95+
return new ValidatingConfigPersistence(new DatabaseConfigPersistence(database, jsonSecretsProcessor));
10196
}
10297

103-
public DatabaseConfigPersistence(final Database database, final JsonSecretsProcessor jsonSecretsProcessor, final FeatureFlags featureFlags) {
98+
public DatabaseConfigPersistence(final Database database, final JsonSecretsProcessor jsonSecretsProcessor) {
10499
this.database = new ExceptionWrappingDatabase(database);
105100
this.jsonSecretsProcessor = jsonSecretsProcessor;
106-
this.featureFlags = featureFlags;
107101
}
108102

109103
@Override
@@ -1431,18 +1425,15 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
14311425
sourceConnectionWithMetadata
14321426
.stream()
14331427
.map(configWithMetadata -> {
1434-
if (featureFlags.exposeSecretsInExport()) {
1435-
return Jsons.jsonNode(configWithMetadata.getConfig());
1436-
}
1437-
14381428
try {
14391429
final UUID sourceDefinitionId = configWithMetadata.getConfig().getSourceDefinitionId();
14401430
final StandardSourceDefinition standardSourceDefinition = getConfig(
14411431
ConfigSchema.STANDARD_SOURCE_DEFINITION,
14421432
sourceDefinitionId.toString(),
14431433
StandardSourceDefinition.class);
14441434
final JsonNode connectionSpecs = standardSourceDefinition.getSpec().getConnectionSpecification();
1445-
final JsonNode sanitizedConfig = jsonSecretsProcessor.maskSecrets(Jsons.jsonNode(configWithMetadata.getConfig()), connectionSpecs);
1435+
final JsonNode sanitizedConfig =
1436+
jsonSecretsProcessor.prepareSecretsForOutput(Jsons.jsonNode(configWithMetadata.getConfig()), connectionSpecs);
14461437
return sanitizedConfig;
14471438
} catch (final ConfigNotFoundException | JsonValidationException | IOException e) {
14481439
throw new RuntimeException(e);
@@ -1454,18 +1445,15 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
14541445
final Stream<JsonNode> jsonNodeStream = destinationConnectionWithMetadata
14551446
.stream()
14561447
.map(configWithMetadata -> {
1457-
if (featureFlags.exposeSecretsInExport()) {
1458-
return Jsons.jsonNode(configWithMetadata.getConfig());
1459-
}
1460-
14611448
try {
14621449
final UUID destinationDefinition = configWithMetadata.getConfig().getDestinationDefinitionId();
14631450
final StandardDestinationDefinition standardDestinationDefinition = getConfig(
14641451
ConfigSchema.STANDARD_DESTINATION_DEFINITION,
14651452
destinationDefinition.toString(),
14661453
StandardDestinationDefinition.class);
14671454
final JsonNode connectionSpec = standardDestinationDefinition.getSpec().getConnectionSpecification();
1468-
final JsonNode sanitizedConfig = jsonSecretsProcessor.maskSecrets(Jsons.jsonNode(configWithMetadata.getConfig()), connectionSpec);
1455+
final JsonNode sanitizedConfig =
1456+
jsonSecretsProcessor.prepareSecretsForOutput(Jsons.jsonNode(configWithMetadata.getConfig()), connectionSpec);
14691457
return sanitizedConfig;
14701458
} catch (final ConfigNotFoundException | JsonValidationException | IOException e) {
14711459
throw new RuntimeException(e);

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/split_secrets/JsonSecretsProcessor.java

Lines changed: 103 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,29 @@
1818
import java.util.Optional;
1919
import java.util.Queue;
2020
import java.util.Set;
21+
import lombok.Builder;
2122
import lombok.Value;
22-
import org.slf4j.Logger;
23-
import org.slf4j.LoggerFactory;
2423

24+
@Builder
2525
public class JsonSecretsProcessor {
2626

27-
private static final Logger LOGGER = LoggerFactory.getLogger(JsonSecretsProcessor.class);
27+
@Builder.Default
28+
private boolean maskSecrets = true;
2829

29-
public static final String AIRBYTE_SECRET_FIELD = "airbyte_secret";
30-
public static final String PROPERTIES_FIELD = "properties";
31-
public static final String TYPE_FIELD = "type";
32-
public static final String ARRAY_TYPE_FIELD = "array";
33-
public static final String ITEMS_FIELD = "items";
30+
@Builder.Default
31+
private boolean copySecrets = true;
3432

35-
private static final JsonSchemaValidator VALIDATOR = new JsonSchemaValidator();
33+
protected static final JsonSchemaValidator VALIDATOR = new JsonSchemaValidator();
3634

3735
@VisibleForTesting
3836
static String SECRETS_MASK = "**********";
3937

38+
static final String AIRBYTE_SECRET_FIELD = "airbyte_secret";
39+
static final String PROPERTIES_FIELD = "properties";
40+
static final String TYPE_FIELD = "type";
41+
static final String ARRAY_TYPE_FIELD = "array";
42+
static final String ITEMS_FIELD = "items";
43+
4044
/**
4145
* Returns a copy of the input object wherein any fields annotated with "airbyte_secret" in the
4246
* input schema are masked.
@@ -47,20 +51,97 @@ public class JsonSecretsProcessor {
4751
* @param schema Schema containing secret annotations
4852
* @param obj Object containing potentially secret fields
4953
*/
50-
// todo: fix bug where this doesn't handle non-oneof nesting or just arrays
51-
// see: https://github.com/airbytehq/airbyte/issues/6393
52-
public JsonNode maskSecrets(final JsonNode obj, final JsonNode schema) {
53-
// if schema is an object and has a properties field
54-
if (!canBeProcessed(schema)) {
55-
return obj;
54+
public JsonNode prepareSecretsForOutput(final JsonNode obj, final JsonNode schema) {
55+
if (maskSecrets) {
56+
// if schema is an object and has a properties field
57+
if (!canBeProcessed(schema)) {
58+
return obj;
59+
}
60+
61+
final SecretKeys secretKeys = getAllSecretKeys(schema);
62+
return maskAllSecrets(obj, secretKeys);
63+
}
64+
65+
return obj;
66+
}
67+
68+
/**
69+
* Returns a copy of the destination object in which any secret fields (as denoted by the input
70+
* schema) found in the source object are added.
71+
* <p>
72+
* This method absorbs secrets both at the top level of the configuration object and in nested
73+
* properties in a oneOf.
74+
*
75+
* @param src The object potentially containing secrets
76+
* @param dst The object to absorb secrets into
77+
* @param schema
78+
* @return
79+
*/
80+
public JsonNode copySecrets(final JsonNode src, final JsonNode dst, final JsonNode schema) {
81+
if (copySecrets) {
82+
if (!canBeProcessed(schema)) {
83+
return dst;
84+
}
85+
Preconditions.checkArgument(dst.isObject());
86+
Preconditions.checkArgument(src.isObject());
87+
88+
final ObjectNode dstCopy = dst.deepCopy();
89+
90+
final ObjectNode properties = (ObjectNode) schema.get(PROPERTIES_FIELD);
91+
for (final String key : Jsons.keys(properties)) {
92+
// If the source object doesn't have this key then we have nothing to copy, so we should skip to the
93+
// next key.
94+
if (!src.has(key)) {
95+
continue;
96+
}
97+
98+
final JsonNode fieldSchema = properties.get(key);
99+
// We only copy the original secret if the destination object isn't attempting to overwrite it
100+
// I.e. if the destination object's value is set to the mask, then we can copy the original secret
101+
if (JsonSecretsProcessor.isSecret(fieldSchema) && dst.has(key) && dst.get(key).asText().equals(SECRETS_MASK)) {
102+
dstCopy.set(key, src.get(key));
103+
} else if (dstCopy.has(key)) {
104+
// If the destination has this key, then we should consider copying it
105+
106+
// Check if this schema is a combination node; if it is, find a matching sub-schema and copy based
107+
// on that sub-schema
108+
final var combinationKey = findJsonCombinationNode(fieldSchema);
109+
if (combinationKey.isPresent()) {
110+
var combinationCopy = dstCopy.get(key);
111+
final var arrayNode = (ArrayNode) fieldSchema.get(combinationKey.get());
112+
for (int i = 0; i < arrayNode.size(); i++) {
113+
final JsonNode childSchema = arrayNode.get(i);
114+
/*
115+
* when traversing a oneOf or anyOf if multiple schema in the oneOf or anyOf have the SAME key, but
116+
* a different type, then, without this test, we can try to apply the wrong schema to the object
117+
* resulting in errors because of type mismatches.
118+
*/
119+
if (VALIDATOR.test(childSchema, combinationCopy)) {
120+
// Absorb field values if any of the combination option is declaring it as secrets
121+
combinationCopy = copySecrets(src.get(key), combinationCopy, childSchema);
122+
}
123+
}
124+
dstCopy.set(key, combinationCopy);
125+
} else {
126+
// Otherwise, this is just a plain old json node; recurse into it. If it's not actually an object,
127+
// the recursive call will exit immediately.
128+
final JsonNode copiedField = copySecrets(src.get(key), dstCopy.get(key), fieldSchema);
129+
dstCopy.set(key, copiedField);
130+
}
131+
}
132+
}
133+
134+
return dstCopy;
56135
}
57-
Preconditions.checkArgument(schema.isObject());
58136

59-
final SecretKeys secretKeys = getAllSecretKeys(schema);
60-
return maskAllSecrets(obj, secretKeys);
137+
return src;
61138
}
62139

63-
private JsonNode maskAllSecrets(final JsonNode obj, final SecretKeys secretKeys) {
140+
static boolean isSecret(final JsonNode obj) {
141+
return obj.isObject() && obj.has(AIRBYTE_SECRET_FIELD) && obj.get(AIRBYTE_SECRET_FIELD).asBoolean();
142+
}
143+
144+
protected JsonNode maskAllSecrets(final JsonNode obj, final SecretKeys secretKeys) {
64145
final JsonNode copiedObj = obj.deepCopy();
65146
final Queue<JsonNode> toProcess = new LinkedList<>();
66147
toProcess.add(copiedObj);
@@ -91,14 +172,14 @@ private JsonNode maskAllSecrets(final JsonNode obj, final SecretKeys secretKeys)
91172
}
92173

93174
@Value
94-
private class SecretKeys {
175+
protected class SecretKeys {
95176

96177
private final Set<String> fieldSecretKey;
97178
private final Set<String> arraySecretKey;
98179

99180
}
100181

101-
private SecretKeys getAllSecretKeys(final JsonNode schema) {
182+
protected SecretKeys getAllSecretKeys(final JsonNode schema) {
102183
final Set<String> fieldSecretKeys = new HashSet<>();
103184
final Set<String> arraySecretKeys = new HashSet<>();
104185

@@ -115,7 +196,7 @@ private SecretKeys getAllSecretKeys(final JsonNode schema) {
115196
} else {
116197
toProcess.add(arrayItems);
117198
}
118-
} else if (isSecret(currentNode.get(key))) {
199+
} else if (JsonSecretsProcessor.isSecret(currentNode.get(key))) {
119200
fieldSecretKeys.add(key);
120201
} else if (currentNode.get(key).isObject()) {
121202
toProcess.add(currentNode.get(key));
@@ -140,77 +221,6 @@ public static Optional<String> findJsonCombinationNode(final JsonNode node) {
140221
return Optional.empty();
141222
}
142223

143-
/**
144-
* Returns a copy of the destination object in which any secret fields (as denoted by the input
145-
* schema) found in the source object are added.
146-
* <p>
147-
* This method absorbs secrets both at the top level of the configuration object and in nested
148-
* properties in a oneOf.
149-
*
150-
* @param src The object potentially containing secrets
151-
* @param dst The object to absorb secrets into
152-
* @param schema
153-
* @return
154-
*/
155-
public JsonNode copySecrets(final JsonNode src, final JsonNode dst, final JsonNode schema) {
156-
if (!canBeProcessed(schema)) {
157-
return dst;
158-
}
159-
Preconditions.checkArgument(dst.isObject());
160-
Preconditions.checkArgument(src.isObject());
161-
162-
final ObjectNode dstCopy = dst.deepCopy();
163-
164-
final ObjectNode properties = (ObjectNode) schema.get(PROPERTIES_FIELD);
165-
for (final String key : Jsons.keys(properties)) {
166-
// If the source object doesn't have this key then we have nothing to copy, so we should skip to the
167-
// next key.
168-
if (!src.has(key)) {
169-
continue;
170-
}
171-
172-
final JsonNode fieldSchema = properties.get(key);
173-
// We only copy the original secret if the destination object isn't attempting to overwrite it
174-
// i.e: if the value of the secret isn't set to the mask
175-
if (isSecret(fieldSchema) && dst.has(key) && dst.get(key).asText().equals(SECRETS_MASK)) {
176-
dstCopy.set(key, src.get(key));
177-
} else if (dstCopy.has(key)) {
178-
// If the destination has this key, then we should consider copying it
179-
180-
// Check if this schema is a combination node; if it is, find a matching sub-schema and copy based
181-
// on that sub-schema
182-
final var combinationKey = findJsonCombinationNode(fieldSchema);
183-
if (combinationKey.isPresent()) {
184-
var combinationCopy = dstCopy.get(key);
185-
final var arrayNode = (ArrayNode) fieldSchema.get(combinationKey.get());
186-
for (int i = 0; i < arrayNode.size(); i++) {
187-
final JsonNode childSchema = arrayNode.get(i);
188-
/*
189-
* when traversing a oneOf or anyOf if multiple schema in the oneOf or anyOf have the SAME key, but
190-
* a different type, then, without this test, we can try to apply the wrong schema to the object
191-
* resulting in errors because of type mismatches.
192-
*/
193-
if (VALIDATOR.test(childSchema, combinationCopy)) {
194-
// Absorb field values if any of the combination option is declaring it as secrets
195-
combinationCopy = copySecrets(src.get(key), combinationCopy, childSchema);
196-
}
197-
}
198-
dstCopy.set(key, combinationCopy);
199-
} else {
200-
// Otherwise, this is just a plain old json object; recurse into it.
201-
final JsonNode copiedField = copySecrets(src.get(key), dstCopy.get(key), fieldSchema);
202-
dstCopy.set(key, copiedField);
203-
}
204-
}
205-
}
206-
207-
return dstCopy;
208-
}
209-
210-
public static boolean isSecret(final JsonNode obj) {
211-
return obj.isObject() && obj.has(AIRBYTE_SECRET_FIELD) && obj.get(AIRBYTE_SECRET_FIELD).asBoolean();
212-
}
213-
214224
public static boolean canBeProcessed(final JsonNode schema) {
215225
return schema.isObject() && schema.has(PROPERTIES_FIELD) && schema.get(PROPERTIES_FIELD).isObject();
216226
}

airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/BaseDatabaseConfigPersistenceTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import static org.mockito.Mockito.mock;
1111

1212
import com.fasterxml.jackson.databind.JsonNode;
13-
import io.airbyte.commons.features.FeatureFlags;
1413
import io.airbyte.config.ConfigSchema;
1514
import io.airbyte.config.DestinationConnection;
1615
import io.airbyte.config.SourceConnection;
@@ -45,7 +44,6 @@ public abstract class BaseDatabaseConfigPersistenceTest {
4544
protected static Database database;
4645
protected static DatabaseConfigPersistence configPersistence;
4746
protected static JsonSecretsProcessor jsonSecretsProcessor;
48-
protected static FeatureFlags featureFlags;
4947

5048
@BeforeAll
5149
public static void dbSetup() {
@@ -55,7 +53,6 @@ public static void dbSetup() {
5553
.withPassword("docker");
5654
container.start();
5755
jsonSecretsProcessor = mock(JsonSecretsProcessor.class);
58-
featureFlags = mock(FeatureFlags.class);
5956
}
6057

6158
@AfterAll

0 commit comments

Comments
 (0)