|
28 | 28 | import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
|
29 | 29 | import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
|
30 | 30 | import io.airbyte.protocol.models.v0.SyncMode;
|
| 31 | +import java.util.ArrayList; |
31 | 32 | import java.util.Collections;
|
32 | 33 | import java.util.List;
|
33 | 34 | import java.util.Map;
|
34 | 35 | import java.util.Optional;
|
35 | 36 | import java.util.Set;
|
36 | 37 | import org.slf4j.Logger;
|
37 | 38 | import org.slf4j.LoggerFactory;
|
| 39 | +import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; |
38 | 40 |
|
39 | 41 | public class DynamodbSource extends BaseConnector implements Source {
|
40 | 42 |
|
@@ -70,23 +72,39 @@ public AirbyteConnectionStatus check(final JsonNode config) {
|
70 | 72 | public AirbyteCatalog discover(final JsonNode config) {
|
71 | 73 |
|
72 | 74 | final var dynamodbConfig = DynamodbConfig.createDynamodbConfig(config);
|
| 75 | + List<AirbyteStream> airbyteStreams = new ArrayList<>(); |
73 | 76 |
|
74 | 77 | try (final var dynamodbOperations = new DynamodbOperations(dynamodbConfig)) {
|
75 | 78 |
|
76 |
| - final var airbyteStreams = dynamodbOperations.listTables().stream() |
77 |
| - .map(tb -> new AirbyteStream() |
78 |
| - .withName(tb) |
79 |
| - .withJsonSchema(Jsons.jsonNode(ImmutableMap.builder() |
80 |
| - .put("type", "object") |
81 |
| - .put("properties", dynamodbOperations.inferSchema(tb, 1000)) |
82 |
| - .build())) |
83 |
| - .withSourceDefinedPrimaryKey(Collections.singletonList(dynamodbOperations.primaryKey(tb))) |
84 |
| - .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))) |
85 |
| - .toList(); |
86 |
| - |
87 |
| - return new AirbyteCatalog().withStreams(airbyteStreams); |
| 79 | + dynamodbOperations.listTables().forEach(table -> { |
| 80 | + try { |
| 81 | + airbyteStreams.add( |
| 82 | + new AirbyteStream() |
| 83 | + .withName(table) |
| 84 | + .withJsonSchema(Jsons.jsonNode(ImmutableMap.builder() |
| 85 | + .put("type", "object") |
| 86 | + // will throw DynamoDbException if it can't scan the table from missing read permissions |
| 87 | + .put("properties", dynamodbOperations.inferSchema(table, 1000)) |
| 88 | + .build())) |
| 89 | + .withSourceDefinedPrimaryKey(Collections.singletonList(dynamodbOperations.primaryKey(table))) |
| 90 | + .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))); |
| 91 | + } catch (DynamoDbException e) { |
| 92 | + if (dynamodbConfig.ignoreMissingPermissions()) { |
| 93 | + // fragile way to check for missing read access but there is no dedicated exception for missing |
| 94 | + // permissions. |
| 95 | + if (e.getMessage().contains("not authorized")) { |
| 96 | + LOGGER.warn("Connector doesn't have READ access for the table {}", table); |
| 97 | + } else { |
| 98 | + throw e; |
| 99 | + } |
| 100 | + } else { |
| 101 | + throw e; |
| 102 | + } |
| 103 | + } |
| 104 | + }); |
88 | 105 | }
|
89 | 106 |
|
| 107 | + return new AirbyteCatalog().withStreams(airbyteStreams); |
90 | 108 | }
|
91 | 109 |
|
92 | 110 | @Override
|
|
0 commit comments