Skip to content

Bmoric/refacto secret processor #11362

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ public BootloaderApp() {
final Database configDatabase =
new ConfigsDatabaseInstance(configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl())
.getAndInitialize();
final JsonSecretsProcessor jsonSecretsProcessor = new JsonSecretsProcessor();
final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder()
.maskSecrets(!featureFlags.exposeSecretsInExport())
.copySecrets(true)
.build();
final ConfigPersistence configPersistence =
DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor, featureFlags);
DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor);
configPersistence.loadData(YamlSeedConfigPersistence.getDefault());
LOGGER.info("Loaded seed data..");
} catch (final IOException e) {
Expand All @@ -115,8 +118,11 @@ public void load() throws Exception {
runFlywayMigration(configs, configDatabase, jobDatabase);
LOGGER.info("Ran Flyway migrations...");

final JsonSecretsProcessor jsonSecretsProcessor = new JsonSecretsProcessor();
final ConfigPersistence configPersistence = DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor, featureFlags);
final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder()
.maskSecrets(!featureFlags.exposeSecretsInExport())
.copySecrets(false)
.build();
final ConfigPersistence configPersistence = DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor);
final ConfigRepository configRepository =
new ConfigRepository(configPersistence, configDatabase);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.commons.version.AirbyteVersion;
Expand Down Expand Up @@ -81,7 +80,6 @@ public class DatabaseConfigPersistence implements ConfigPersistence {

private final ExceptionWrappingDatabase database;
private final JsonSecretsProcessor jsonSecretsProcessor;
private final FeatureFlags featureFlags;
private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseConfigPersistence.class);

/**
Expand All @@ -90,20 +88,16 @@ public class DatabaseConfigPersistence implements ConfigPersistence {
*
* @param database - database where configs are stored
* @param jsonSecretsProcessor - for filtering secrets in export
* @param featureFlags - feature flags that govern secret export behavior
* @return database config persistence wrapped in validation decorators
*/
public static ConfigPersistence createWithValidation(final Database database,
final JsonSecretsProcessor jsonSecretsProcessor,
final FeatureFlags featureFlags) {
return new ClassEnforcingConfigPersistence(
new ValidatingConfigPersistence(new DatabaseConfigPersistence(database, jsonSecretsProcessor, featureFlags)));
final JsonSecretsProcessor jsonSecretsProcessor) {
return new ValidatingConfigPersistence(new DatabaseConfigPersistence(database, jsonSecretsProcessor));
}

public DatabaseConfigPersistence(final Database database, final JsonSecretsProcessor jsonSecretsProcessor, final FeatureFlags featureFlags) {
public DatabaseConfigPersistence(final Database database, final JsonSecretsProcessor jsonSecretsProcessor) {
this.database = new ExceptionWrappingDatabase(database);
this.jsonSecretsProcessor = jsonSecretsProcessor;
this.featureFlags = featureFlags;
}

@Override
Expand Down Expand Up @@ -1431,18 +1425,15 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
sourceConnectionWithMetadata
.stream()
.map(configWithMetadata -> {
if (featureFlags.exposeSecretsInExport()) {
return Jsons.jsonNode(configWithMetadata.getConfig());
}

try {
final UUID sourceDefinitionId = configWithMetadata.getConfig().getSourceDefinitionId();
final StandardSourceDefinition standardSourceDefinition = getConfig(
ConfigSchema.STANDARD_SOURCE_DEFINITION,
sourceDefinitionId.toString(),
StandardSourceDefinition.class);
final JsonNode connectionSpecs = standardSourceDefinition.getSpec().getConnectionSpecification();
final JsonNode sanitizedConfig = jsonSecretsProcessor.maskSecrets(Jsons.jsonNode(configWithMetadata.getConfig()), connectionSpecs);
final JsonNode sanitizedConfig =
jsonSecretsProcessor.prepareSecretsForOutput(Jsons.jsonNode(configWithMetadata.getConfig()), connectionSpecs);
return sanitizedConfig;
} catch (final ConfigNotFoundException | JsonValidationException | IOException e) {
throw new RuntimeException(e);
Expand All @@ -1454,18 +1445,15 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
final Stream<JsonNode> jsonNodeStream = destinationConnectionWithMetadata
.stream()
.map(configWithMetadata -> {
if (featureFlags.exposeSecretsInExport()) {
return Jsons.jsonNode(configWithMetadata.getConfig());
}

try {
final UUID destinationDefinition = configWithMetadata.getConfig().getDestinationDefinitionId();
final StandardDestinationDefinition standardDestinationDefinition = getConfig(
ConfigSchema.STANDARD_DESTINATION_DEFINITION,
destinationDefinition.toString(),
StandardDestinationDefinition.class);
final JsonNode connectionSpec = standardDestinationDefinition.getSpec().getConnectionSpecification();
final JsonNode sanitizedConfig = jsonSecretsProcessor.maskSecrets(Jsons.jsonNode(configWithMetadata.getConfig()), connectionSpec);
final JsonNode sanitizedConfig =
jsonSecretsProcessor.prepareSecretsForOutput(Jsons.jsonNode(configWithMetadata.getConfig()), connectionSpec);
return sanitizedConfig;
} catch (final ConfigNotFoundException | JsonValidationException | IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,29 @@
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import lombok.Builder;
import lombok.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Builder
public class JsonSecretsProcessor {

private static final Logger LOGGER = LoggerFactory.getLogger(JsonSecretsProcessor.class);
@Builder.Default
private boolean maskSecrets = true;

public static final String AIRBYTE_SECRET_FIELD = "airbyte_secret";
public static final String PROPERTIES_FIELD = "properties";
public static final String TYPE_FIELD = "type";
public static final String ARRAY_TYPE_FIELD = "array";
public static final String ITEMS_FIELD = "items";
@Builder.Default
private boolean copySecrets = true;

private static final JsonSchemaValidator VALIDATOR = new JsonSchemaValidator();
protected static final JsonSchemaValidator VALIDATOR = new JsonSchemaValidator();

@VisibleForTesting
static String SECRETS_MASK = "**********";

static final String AIRBYTE_SECRET_FIELD = "airbyte_secret";
static final String PROPERTIES_FIELD = "properties";
static final String TYPE_FIELD = "type";
static final String ARRAY_TYPE_FIELD = "array";
static final String ITEMS_FIELD = "items";

/**
* Returns a copy of the input object wherein any fields annotated with "airbyte_secret" in the
* input schema are masked.
Expand All @@ -47,20 +51,97 @@ public class JsonSecretsProcessor {
* @param schema Schema containing secret annotations
* @param obj Object containing potentially secret fields
*/
// todo: fix bug where this doesn't handle non-oneof nesting or just arrays
// see: https://github.com/airbytehq/airbyte/issues/6393
public JsonNode maskSecrets(final JsonNode obj, final JsonNode schema) {
// if schema is an object and has a properties field
if (!canBeProcessed(schema)) {
return obj;
public JsonNode prepareSecretsForOutput(final JsonNode obj, final JsonNode schema) {
// Hand the input back untouched when masking is switched off, or when the schema is not an
// object carrying an object-valued "properties" field.
if (!maskSecrets || !canBeProcessed(schema)) {
return obj;
}

// Gather every secret key declared by the schema, then mask those keys.
return maskAllSecrets(obj, getAllSecretKeys(schema));
}

/**
* Returns a copy of the destination object in which any secret fields (as denoted by the input
* schema) found in the source object are added.
* <p>
* This method absorbs secrets both at the top level of the configuration object and in nested
* properties in a oneOf.
*
* @param src The object potentially containing secrets
* @param dst The object to absorb secrets into
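* @param schema Schema containing secret annotations; only keys listed under its "properties" field are inspected
* @return a copy of dst with masked secrets restored from src; dst itself when the schema cannot be processed; src when secret copying is disabled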
*/
public JsonNode copySecrets(final JsonNode src, final JsonNode dst, final JsonNode schema) {
if (copySecrets) {
if (!canBeProcessed(schema)) {
return dst;
}
Preconditions.checkArgument(dst.isObject());
Preconditions.checkArgument(src.isObject());

final ObjectNode dstCopy = dst.deepCopy();

final ObjectNode properties = (ObjectNode) schema.get(PROPERTIES_FIELD);
for (final String key : Jsons.keys(properties)) {
// If the source object doesn't have this key then we have nothing to copy, so we should skip to the
// next key.
if (!src.has(key)) {
continue;
}

final JsonNode fieldSchema = properties.get(key);
// We only copy the original secret if the destination object isn't attempting to overwrite it
// I.e. if the destination object's value is set to the mask, then we can copy the original secret
if (JsonSecretsProcessor.isSecret(fieldSchema) && dst.has(key) && dst.get(key).asText().equals(SECRETS_MASK)) {
dstCopy.set(key, src.get(key));
} else if (dstCopy.has(key)) {
// If the destination has this key, then we should consider copying it

// Check if this schema is a combination node; if it is, find a matching sub-schema and copy based
// on that sub-schema
final var combinationKey = findJsonCombinationNode(fieldSchema);
if (combinationKey.isPresent()) {
var combinationCopy = dstCopy.get(key);
final var arrayNode = (ArrayNode) fieldSchema.get(combinationKey.get());
for (int i = 0; i < arrayNode.size(); i++) {
final JsonNode childSchema = arrayNode.get(i);
/*
* when traversing a oneOf or anyOf if multiple schema in the oneOf or anyOf have the SAME key, but
* a different type, then, without this test, we can try to apply the wrong schema to the object
* resulting in errors because of type mismatches.
*/
if (VALIDATOR.test(childSchema, combinationCopy)) {
// Absorb field values if any of the combination option is declaring it as secrets
combinationCopy = copySecrets(src.get(key), combinationCopy, childSchema);
}
}
dstCopy.set(key, combinationCopy);
} else {
// Otherwise, this is just a plain old json node; recurse into it. If it's not actually an object,
// the recursive call will exit immediately.
final JsonNode copiedField = copySecrets(src.get(key), dstCopy.get(key), fieldSchema);
dstCopy.set(key, copiedField);
}
}
}

return dstCopy;
}
Preconditions.checkArgument(schema.isObject());

final SecretKeys secretKeys = getAllSecretKeys(schema);
return maskAllSecrets(obj, secretKeys);
return src;
}

private JsonNode maskAllSecrets(final JsonNode obj, final SecretKeys secretKeys) {
static boolean isSecret(final JsonNode obj) {
// Only an object node that carries the "airbyte_secret" annotation can be a secret.
if (!obj.isObject() || !obj.has(AIRBYTE_SECRET_FIELD)) {
return false;
}
// The annotation must also evaluate to true.
return obj.get(AIRBYTE_SECRET_FIELD).asBoolean();
}

protected JsonNode maskAllSecrets(final JsonNode obj, final SecretKeys secretKeys) {
final JsonNode copiedObj = obj.deepCopy();
final Queue<JsonNode> toProcess = new LinkedList<>();
toProcess.add(copiedObj);
Expand Down Expand Up @@ -91,14 +172,14 @@ private JsonNode maskAllSecrets(final JsonNode obj, final SecretKeys secretKeys)
}

@Value
private class SecretKeys {
protected class SecretKeys {

private final Set<String> fieldSecretKey;
private final Set<String> arraySecretKey;

}

private SecretKeys getAllSecretKeys(final JsonNode schema) {
protected SecretKeys getAllSecretKeys(final JsonNode schema) {
final Set<String> fieldSecretKeys = new HashSet<>();
final Set<String> arraySecretKeys = new HashSet<>();

Expand All @@ -115,7 +196,7 @@ private SecretKeys getAllSecretKeys(final JsonNode schema) {
} else {
toProcess.add(arrayItems);
}
} else if (isSecret(currentNode.get(key))) {
} else if (JsonSecretsProcessor.isSecret(currentNode.get(key))) {
fieldSecretKeys.add(key);
} else if (currentNode.get(key).isObject()) {
toProcess.add(currentNode.get(key));
Expand All @@ -140,77 +221,6 @@ public static Optional<String> findJsonCombinationNode(final JsonNode node) {
return Optional.empty();
}

/**
* Returns a copy of the destination object in which any secret fields (as denoted by the input
* schema) found in the source object are added.
* <p>
* This method absorbs secrets both at the top level of the configuration object and in nested
* properties in a oneOf.
* <p>
* A secret is only restored from the source when the destination holds the mask value for that
* key; any other destination value is kept as-is.
*
* @param src The object potentially containing secrets
* @param dst The object to absorb secrets into
* @param schema Schema containing secret annotations; only keys listed under its "properties"
*        field are inspected
* @return a deep copy of dst with masked secrets replaced by the values from src, or dst itself
*         (not a copy) when the schema cannot be processed
* @throws IllegalArgumentException if src or dst is not a JSON object (assuming
*         Preconditions.checkArgument is Guava's -- confirm against the file's imports)
*/
public JsonNode copySecrets(final JsonNode src, final JsonNode dst, final JsonNode schema) {
// Nothing to do unless the schema is an object with an object-valued "properties" field.
if (!canBeProcessed(schema)) {
return dst;
}
Preconditions.checkArgument(dst.isObject());
Preconditions.checkArgument(src.isObject());

// Work on a deep copy so the caller's dst is never mutated.
final ObjectNode dstCopy = dst.deepCopy();

final ObjectNode properties = (ObjectNode) schema.get(PROPERTIES_FIELD);
for (final String key : Jsons.keys(properties)) {
// If the source object doesn't have this key then we have nothing to copy, so we should skip to the
// next key.
if (!src.has(key)) {
continue;
}

final JsonNode fieldSchema = properties.get(key);
// We only copy the original secret if the destination object isn't attempting to overwrite it,
// i.e. if the destination's value for the secret is set to the mask.
if (isSecret(fieldSchema) && dst.has(key) && dst.get(key).asText().equals(SECRETS_MASK)) {
dstCopy.set(key, src.get(key));
} else if (dstCopy.has(key)) {
// If the destination has this key, then we should consider copying it

// Check if this schema is a combination node; if it is, find a matching sub-schema and copy based
// on that sub-schema
final var combinationKey = findJsonCombinationNode(fieldSchema);
if (combinationKey.isPresent()) {
var combinationCopy = dstCopy.get(key);
final var arrayNode = (ArrayNode) fieldSchema.get(combinationKey.get());
for (int i = 0; i < arrayNode.size(); i++) {
final JsonNode childSchema = arrayNode.get(i);
/*
* When traversing a oneOf or anyOf, if multiple schemas in the oneOf or anyOf have the SAME key but
* a different type, then, without this test, we could try to apply the wrong schema to the object,
* resulting in errors because of type mismatches.
*/
if (VALIDATOR.test(childSchema, combinationCopy)) {
// Absorb field values if any of the combination options declares them as secrets
combinationCopy = copySecrets(src.get(key), combinationCopy, childSchema);
}
}
dstCopy.set(key, combinationCopy);
} else {
// Otherwise, this is just a plain old json node; recurse into it. If its schema cannot be
// processed, the recursive call returns the destination value unchanged.
final JsonNode copiedField = copySecrets(src.get(key), dstCopy.get(key), fieldSchema);
dstCopy.set(key, copiedField);
}
}
}

return dstCopy;
}

public static boolean isSecret(final JsonNode obj) {
// Only an object node that carries the "airbyte_secret" annotation can be a secret.
if (!obj.isObject() || !obj.has(AIRBYTE_SECRET_FIELD)) {
return false;
}
// The annotation must also evaluate to true.
return obj.get(AIRBYTE_SECRET_FIELD).asBoolean();
}

public static boolean canBeProcessed(final JsonNode schema) {
// The schema itself must be an object that declares a "properties" field...
if (!schema.isObject() || !schema.has(PROPERTIES_FIELD)) {
return false;
}
// ...and that field must itself be an object.
return schema.get(PROPERTIES_FIELD).isObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import static org.mockito.Mockito.mock;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
Expand Down Expand Up @@ -45,7 +44,6 @@ public abstract class BaseDatabaseConfigPersistenceTest {
protected static Database database;
protected static DatabaseConfigPersistence configPersistence;
protected static JsonSecretsProcessor jsonSecretsProcessor;
protected static FeatureFlags featureFlags;

@BeforeAll
public static void dbSetup() {
Expand All @@ -55,7 +53,6 @@ public static void dbSetup() {
.withPassword("docker");
container.start();
jsonSecretsProcessor = mock(JsonSecretsProcessor.class);
featureFlags = mock(FeatureFlags.class);
}

@AfterAll
Expand Down
Loading