Skip to content

Bmoric/refacto secret processor #11362

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessorFactory;
import io.airbyte.db.Database;
import io.airbyte.db.instance.DatabaseMigrator;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
Expand Down Expand Up @@ -86,9 +87,13 @@ public BootloaderApp() {
final Database configDatabase =
new ConfigsDatabaseInstance(configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl())
.getAndInitialize();
final JsonSecretsProcessor jsonSecretsProcessor = new JsonSecretsProcessor();
final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessorFactory.builder()
.maskSecrets(!featureFlags.exposeSecretsInExport())
.copySecrets(true)
.build()
.createJsonSecretsProcessor();
final ConfigPersistence configPersistence =
DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor, featureFlags);
DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor);
configPersistence.loadData(YamlSeedConfigPersistence.getDefault());
LOGGER.info("Loaded seed data..");
} catch (final IOException e) {
Expand All @@ -115,8 +120,12 @@ public void load() throws Exception {
runFlywayMigration(configs, configDatabase, jobDatabase);
LOGGER.info("Ran Flyway migrations...");

final JsonSecretsProcessor jsonSecretsProcessor = new JsonSecretsProcessor();
final ConfigPersistence configPersistence = DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor, featureFlags);
final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessorFactory.builder()
.maskSecrets(!featureFlags.exposeSecretsInExport())
.copySecrets(true)
.build()
.createJsonSecretsProcessor();
final ConfigPersistence configPersistence = DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor);
final ConfigRepository configRepository =
new ConfigRepository(configPersistence, configDatabase);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.commons.version.AirbyteVersion;
Expand Down Expand Up @@ -82,7 +81,6 @@ public class DatabaseConfigPersistence implements ConfigPersistence {

private final ExceptionWrappingDatabase database;
private final JsonSecretsProcessor jsonSecretsProcessor;
private final FeatureFlags featureFlags;
private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseConfigPersistence.class);

/**
Expand All @@ -91,20 +89,16 @@ public class DatabaseConfigPersistence implements ConfigPersistence {
*
* @param database - database where configs are stored
* @param jsonSecretsProcessor - for filtering secrets in export
* @param featureFlags - feature flags that govern secret export behavior
* @return database config persistence wrapped in validation decorators
*/
public static ConfigPersistence createWithValidation(final Database database,
final JsonSecretsProcessor jsonSecretsProcessor,
final FeatureFlags featureFlags) {
return new ClassEnforcingConfigPersistence(
new ValidatingConfigPersistence(new DatabaseConfigPersistence(database, jsonSecretsProcessor, featureFlags)));
final JsonSecretsProcessor jsonSecretsProcessor) {
return new ValidatingConfigPersistence(new DatabaseConfigPersistence(database, jsonSecretsProcessor));
}

public DatabaseConfigPersistence(final Database database, final JsonSecretsProcessor jsonSecretsProcessor, final FeatureFlags featureFlags) {
public DatabaseConfigPersistence(final Database database, final JsonSecretsProcessor jsonSecretsProcessor) {
this.database = new ExceptionWrappingDatabase(database);
this.jsonSecretsProcessor = jsonSecretsProcessor;
this.featureFlags = featureFlags;
}

@Override
Expand Down Expand Up @@ -1562,10 +1556,6 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
sourceConnectionWithMetadata
.stream()
.map(configWithMetadata -> {
if (featureFlags.exposeSecretsInExport()) {
return Jsons.jsonNode(configWithMetadata.getConfig());
}

try {
final UUID sourceDefinitionId = configWithMetadata.getConfig().getSourceDefinitionId();
final StandardSourceDefinition standardSourceDefinition = getConfig(
Expand All @@ -1585,10 +1575,6 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
final Stream<JsonNode> jsonNodeStream = destinationConnectionWithMetadata
.stream()
.map(configWithMetadata -> {
if (featureFlags.exposeSecretsInExport()) {
return Jsons.jsonNode(configWithMetadata.getConfig());
}

try {
final UUID destinationDefinition = configWithMetadata.getConfig().getDestinationDefinitionId();
final StandardDestinationDefinition standardDestinationDefinition = getConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.airbyte.commons.json.Jsons;
import io.airbyte.validation.json.JsonSchemaValidator;
import java.util.HashSet;
Expand All @@ -19,24 +18,20 @@
import java.util.Queue;
import java.util.Set;
import lombok.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonSecretsProcessor {
public abstract class JsonSecretsProcessor {

private static final Logger LOGGER = LoggerFactory.getLogger(JsonSecretsProcessor.class);

public static final String AIRBYTE_SECRET_FIELD = "airbyte_secret";
public static final String PROPERTIES_FIELD = "properties";
public static final String TYPE_FIELD = "type";
public static final String ARRAY_TYPE_FIELD = "array";
public static final String ITEMS_FIELD = "items";

private static final JsonSchemaValidator VALIDATOR = new JsonSchemaValidator();
protected static final JsonSchemaValidator VALIDATOR = new JsonSchemaValidator();

@VisibleForTesting
static String SECRETS_MASK = "**********";

static final String AIRBYTE_SECRET_FIELD = "airbyte_secret";
static final String PROPERTIES_FIELD = "properties";
static final String TYPE_FIELD = "type";
static final String ARRAY_TYPE_FIELD = "array";
static final String ITEMS_FIELD = "items";

/**
* Returns a copy of the input object wherein any fields annotated with "airbyte_secret" in the
* input schema are masked.
Expand All @@ -47,20 +42,27 @@ public class JsonSecretsProcessor {
* @param schema Schema containing secret annotations
* @param obj Object containing potentially secret fields
*/
// todo: fix bug where this doesn't handle non-oneof nesting or just arrays
// see: https://github.com/airbytehq/airbyte/issues/6393
public JsonNode maskSecrets(final JsonNode obj, final JsonNode schema) {
// if schema is an object and has a properties field
if (!canBeProcessed(schema)) {
return obj;
}
Preconditions.checkArgument(schema.isObject());
public abstract JsonNode maskSecrets(final JsonNode obj, final JsonNode schema);

final SecretKeys secretKeys = getAllSecretKeys(schema);
return maskAllSecrets(obj, secretKeys);
/**
* Returns a copy of the destination object in which any secret fields (as denoted by the input
* schema) found in the source object are added.
* <p>
* This method absorbs secrets both at the top level of the configuration object and in nested
* properties in a oneOf.
*
* @param src The object potentially containing secrets
* @param dst The object to absorb secrets into
* @param schema
* @return
*/
public abstract JsonNode copySecrets(final JsonNode src, final JsonNode dst, final JsonNode schema);

static boolean isSecret(final JsonNode obj) {
return obj.isObject() && obj.has(AIRBYTE_SECRET_FIELD) && obj.get(AIRBYTE_SECRET_FIELD).asBoolean();
}

private JsonNode maskAllSecrets(final JsonNode obj, final SecretKeys secretKeys) {
protected JsonNode maskAllSecrets(final JsonNode obj, final SecretKeys secretKeys) {
final JsonNode copiedObj = obj.deepCopy();
final Queue<JsonNode> toProcess = new LinkedList<>();
toProcess.add(copiedObj);
Expand Down Expand Up @@ -91,14 +93,14 @@ private JsonNode maskAllSecrets(final JsonNode obj, final SecretKeys secretKeys)
}

@Value
private class SecretKeys {
protected class SecretKeys {

private final Set<String> fieldSecretKey;
private final Set<String> arraySecretKey;

}

private SecretKeys getAllSecretKeys(final JsonNode schema) {
protected SecretKeys getAllSecretKeys(final JsonNode schema) {
final Set<String> fieldSecretKeys = new HashSet<>();
final Set<String> arraySecretKeys = new HashSet<>();

Expand All @@ -115,7 +117,7 @@ private SecretKeys getAllSecretKeys(final JsonNode schema) {
} else {
toProcess.add(arrayItems);
}
} else if (isSecret(currentNode.get(key))) {
} else if (JsonSecretsProcessor.isSecret(currentNode.get(key))) {
fieldSecretKeys.add(key);
} else if (currentNode.get(key).isObject()) {
toProcess.add(currentNode.get(key));
Expand All @@ -140,77 +142,6 @@ public static Optional<String> findJsonCombinationNode(final JsonNode node) {
return Optional.empty();
}

/**
* Returns a copy of the destination object in which any secret fields (as denoted by the input
* schema) found in the source object are added.
* <p>
* This method absorbs secrets both at the top level of the configuration object and in nested
* properties in a oneOf.
*
* @param src The object potentially containing secrets
* @param dst The object to absorb secrets into
* @param schema
* @return
*/
public JsonNode copySecrets(final JsonNode src, final JsonNode dst, final JsonNode schema) {
if (!canBeProcessed(schema)) {
return dst;
}
Preconditions.checkArgument(dst.isObject());
Preconditions.checkArgument(src.isObject());

final ObjectNode dstCopy = dst.deepCopy();

final ObjectNode properties = (ObjectNode) schema.get(PROPERTIES_FIELD);
for (final String key : Jsons.keys(properties)) {
// If the source object doesn't have this key then we have nothing to copy, so we should skip to the
// next key.
if (!src.has(key)) {
continue;
}

final JsonNode fieldSchema = properties.get(key);
// We only copy the original secret if the destination object isn't attempting to overwrite it
// i.e: if the value of the secret isn't set to the mask
if (isSecret(fieldSchema) && dst.has(key) && dst.get(key).asText().equals(SECRETS_MASK)) {
dstCopy.set(key, src.get(key));
} else if (dstCopy.has(key)) {
// If the destination has this key, then we should consider copying it

// Check if this schema is a combination node; if it is, find a matching sub-schema and copy based
// on that sub-schema
final var combinationKey = findJsonCombinationNode(fieldSchema);
if (combinationKey.isPresent()) {
var combinationCopy = dstCopy.get(key);
final var arrayNode = (ArrayNode) fieldSchema.get(combinationKey.get());
for (int i = 0; i < arrayNode.size(); i++) {
final JsonNode childSchema = arrayNode.get(i);
/*
* when traversing a oneOf or anyOf if multiple schema in the oneOf or anyOf have the SAME key, but
* a different type, then, without this test, we can try to apply the wrong schema to the object
* resulting in errors because of type mismatches.
*/
if (VALIDATOR.test(childSchema, combinationCopy)) {
// Absorb field values if any of the combination option is declaring it as secrets
combinationCopy = copySecrets(src.get(key), combinationCopy, childSchema);
}
}
dstCopy.set(key, combinationCopy);
} else {
// Otherwise, this is just a plain old json object; recurse into it.
final JsonNode copiedField = copySecrets(src.get(key), dstCopy.get(key), fieldSchema);
dstCopy.set(key, copiedField);
}
}
}

return dstCopy;
}

public static boolean isSecret(final JsonNode obj) {
return obj.isObject() && obj.has(AIRBYTE_SECRET_FIELD) && obj.get(AIRBYTE_SECRET_FIELD).asBoolean();
}

public static boolean canBeProcessed(final JsonNode schema) {
return schema.isObject() && schema.has(PROPERTIES_FIELD) && schema.get(PROPERTIES_FIELD).isObject();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.persistence.split_secrets;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.json.Jsons;
import lombok.Builder;

@Builder
public class JsonSecretsProcessorFactory {

@Builder.Default
private boolean maskSecrets = true;

@Builder.Default
private boolean copySecrets = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this ever set to false in production? (and why is it necessary for tests?)


public JsonSecretsProcessor createJsonSecretsProcessor() {
return new JsonSecretsProcessor() {

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure I understand why these methods need to be overridden here. I.e. why not just move maskSecrets and copySecrets into JsonSecretsProcessor and make it a concrete class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It comes from a previous PR: #11296. In order to still be able to use the export functionality, we want to sometime not mask the secret. That's why we have this behavior of having a flag to mask or not mask the secrets.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think my question is more about why JsonSecretsProcessor needs to be an abstract class, and why we need to create an anonymous inner class in the factory. I.e. could JsonSecretsProcessor look something like this:

// note the non-abstract class
public class JsonSecretsProcessor {
  private final boolean maskSecrets;
  private final boolean copySecrets;

  public JsonNode maskSecrets(final JsonNode obj, final JsonNode schema) {
    // do stuff...
  }

  public JsonNode copySecrets(final JsonNode src, final JsonNode dst, final JsonNode schema) {
    // do other stuff...
  }
}

and then JsonSecretsProcessorFactory#createJsonSecretsProcessor would just be return new JsonSecretsProcessor(maskSecrets, copySecrets);

public JsonNode maskSecrets(final JsonNode obj, final JsonNode schema) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: it seems a bit weird that maskSecrets doesn't actually mask secrets sometimes. Not sure what a better name would be though - maybe prepareSecretsForOutput or something? (I'm very bad at naming :P )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, I have updated it.

if (maskSecrets) {
// if schema is an object and has a properties field
if (!canBeProcessed(schema)) {
return obj;
}
Preconditions.checkArgument(schema.isObject());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: canBeProcessed is already checking that schema is an object, so this check is unnecessary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


final SecretKeys secretKeys = getAllSecretKeys(schema);
return maskAllSecrets(obj, secretKeys);
}

return obj;
}

@Override
public JsonNode copySecrets(final JsonNode src, final JsonNode dst, final JsonNode schema) {
if (copySecrets) {
if (!canBeProcessed(schema)) {
return dst;
}
Preconditions.checkArgument(dst.isObject());
Preconditions.checkArgument(src.isObject());

final ObjectNode dstCopy = dst.deepCopy();

final ObjectNode properties = (ObjectNode) schema.get(PROPERTIES_FIELD);
for (final String key : Jsons.keys(properties)) {
// If the source object doesn't have this key then we have nothing to copy, so we should skip to the
// next key.
if (!src.has(key)) {
continue;
}

final JsonNode fieldSchema = properties.get(key);
// We only copy the original secret if the destination object isn't attempting to overwrite it
// i.e: if the value of the secret isn't set to the mask
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// i.e: if the value of the secret isn't set to the mask
// I.e. if the destination object's value is set to the mask, then we can copy the original secret

nitpick: match the comment ("copy if equal to mask") with what the branch is actually doing (if (dst.get(key) == MASK) { dst.set(src.get(key)) })

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if (JsonSecretsProcessor.isSecret(fieldSchema) && dst.has(key) && dst.get(key).asText().equals(SECRETS_MASK)) {
dstCopy.set(key, src.get(key));
} else if (dstCopy.has(key)) {
// If the destination has this key, then we should consider copying it

// Check if this schema is a combination node; if it is, find a matching sub-schema and copy based
// on that sub-schema
final var combinationKey = findJsonCombinationNode(fieldSchema);
if (combinationKey.isPresent()) {
var combinationCopy = dstCopy.get(key);
final var arrayNode = (ArrayNode) fieldSchema.get(combinationKey.get());
for (int i = 0; i < arrayNode.size(); i++) {
final JsonNode childSchema = arrayNode.get(i);
/*
* when traversing a oneOf or anyOf if multiple schema in the oneOf or anyOf have the SAME key, but
* a different type, then, without this test, we can try to apply the wrong schema to the object
* resulting in errors because of type mismatches.
*/
if (VALIDATOR.test(childSchema, combinationCopy)) {
// Absorb field values if any of the combination option is declaring it as secrets
combinationCopy = copySecrets(src.get(key), combinationCopy, childSchema);
}
}
dstCopy.set(key, combinationCopy);
} else {
// Otherwise, this is just a plain old json object; recurse into it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I wrote this else-case originally so this is a bit awkward, but this would be a more helpful comment (right?):

Suggested change
// Otherwise, this is just a plain old json object; recurse into it.
// Otherwise, this is just a plain old json node; recurse into it. If it's not actually an object, the recursive call will exit immediately.

final JsonNode copiedField = copySecrets(src.get(key), dstCopy.get(key), fieldSchema);
dstCopy.set(key, copiedField);
}
}
}

return dstCopy;
}

return src;
}

};
}

}
Loading