|
9 | 9 | import com.fasterxml.jackson.databind.JsonNode;
|
10 | 10 | import com.google.common.base.Preconditions;
|
11 | 11 | import com.google.common.collect.Lists;
|
| 12 | +import io.airbyte.commons.exceptions.ConfigErrorException; |
12 | 13 | import io.airbyte.commons.exceptions.ConnectionErrorException;
|
13 | 14 | import io.airbyte.commons.features.EnvVariableFeatureFlags;
|
14 | 15 | import io.airbyte.commons.features.FeatureFlags;
|
|
26 | 27 | import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
|
27 | 28 | import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
|
28 | 29 | import io.airbyte.integrations.base.Source;
|
29 |
| -import io.airbyte.integrations.source.relationaldb.InvalidCursorException.InvalidCursorInfo; |
| 30 | +import io.airbyte.integrations.source.relationaldb.InvalidCursorInfoUtil.InvalidCursorInfo; |
30 | 31 | import io.airbyte.integrations.source.relationaldb.models.DbState;
|
31 | 32 | import io.airbyte.integrations.source.relationaldb.state.StateManager;
|
32 | 33 | import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory;
|
@@ -142,56 +143,42 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception {
|
142 | 143 | */
|
143 | 144 | @Override
|
144 | 145 | public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
|
145 |
| - final ConfiguredAirbyteCatalog catalog, |
146 |
| - final JsonNode state) |
147 |
| - throws Exception { |
148 |
| - try { |
149 |
| - final StateManager stateManager = |
150 |
| - StateManagerFactory.createStateManager(getSupportedStateType(config), |
151 |
| - deserializeInitialState(state, config), catalog); |
152 |
| - final Instant emittedAt = Instant.now(); |
153 |
| - |
154 |
| - final Database database = createDatabaseInternal(config); |
155 |
| - |
156 |
| - final Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo = |
157 |
| - discoverWithoutSystemTables(database) |
158 |
| - .stream() |
159 |
| - .collect(Collectors.toMap(t -> String.format("%s.%s", t.getNameSpace(), t.getName()), |
160 |
| - Function |
161 |
| - .identity())); |
162 |
| - |
163 |
| - validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog); |
164 |
| - |
165 |
| - final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators = |
166 |
| - getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, |
167 |
| - emittedAt); |
168 |
| - final List<AutoCloseableIterator<AirbyteMessage>> fullRefreshIterators = |
169 |
| - getFullRefreshIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, |
170 |
| - emittedAt); |
171 |
| - final List<AutoCloseableIterator<AirbyteMessage>> iteratorList = Stream |
172 |
| - .of(incrementalIterators, fullRefreshIterators) |
173 |
| - .flatMap(Collection::stream) |
174 |
| - .collect(Collectors.toList()); |
175 |
| - |
176 |
| - return AutoCloseableIterators |
177 |
| - .appendOnClose(AutoCloseableIterators.concatWithEagerClose(iteratorList), () -> { |
178 |
| - LOGGER.info("Closing database connection pool."); |
179 |
| - Exceptions.toRuntime(this::close); |
180 |
| - LOGGER.info("Closed database connection pool."); |
181 |
| - }); |
182 |
| - } catch (final Exception exception) { |
183 |
| - if (isConfigError(exception)) { |
184 |
| - AirbyteTraceMessageUtility.emitConfigErrorTrace(exception, exception.getMessage()); |
185 |
| - } |
186 |
| - throw exception; |
187 |
| - } |
188 |
| - } |
| 146 | + final ConfiguredAirbyteCatalog catalog, |
| 147 | + final JsonNode state) |
| 148 | + throws Exception { |
| 149 | + final StateManager stateManager = |
| 150 | + StateManagerFactory.createStateManager(getSupportedStateType(config), |
| 151 | + deserializeInitialState(state, config), catalog); |
| 152 | + final Instant emittedAt = Instant.now(); |
| 153 | + |
| 154 | + final Database database = createDatabaseInternal(config); |
| 155 | + |
| 156 | + final Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo = |
| 157 | + discoverWithoutSystemTables(database) |
| 158 | + .stream() |
| 159 | + .collect(Collectors.toMap(t -> String.format("%s.%s", t.getNameSpace(), t.getName()), |
| 160 | + Function |
| 161 | + .identity())); |
| 162 | + |
| 163 | + validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog); |
| 164 | + |
| 165 | + final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators = |
| 166 | + getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, |
| 167 | + emittedAt); |
| 168 | + final List<AutoCloseableIterator<AirbyteMessage>> fullRefreshIterators = |
| 169 | + getFullRefreshIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, |
| 170 | + emittedAt); |
| 171 | + final List<AutoCloseableIterator<AirbyteMessage>> iteratorList = Stream |
| 172 | + .of(incrementalIterators, fullRefreshIterators) |
| 173 | + .flatMap(Collection::stream) |
| 174 | + .collect(Collectors.toList()); |
189 | 175 |
|
190 |
| - private boolean isConfigError(final Exception exception) { |
191 |
| - // For now, enhanced error details should only be shown for InvalidCursorException. In the future, |
192 |
| - // enhanced error messages will exist for |
193 |
| - // additional error types. |
194 |
| - return exception instanceof InvalidCursorException; |
| 176 | + return AutoCloseableIterators |
| 177 | + .appendOnClose(AutoCloseableIterators.concatWithEagerClose(iteratorList), () -> { |
| 178 | + LOGGER.info("Closing database connection pool."); |
| 179 | + Exceptions.toRuntime(this::close); |
| 180 | + LOGGER.info("Closed database connection pool."); |
| 181 | + }); |
195 | 182 | }
|
196 | 183 |
|
197 | 184 | private void validateCursorFieldForIncrementalTables(
|
@@ -230,7 +217,8 @@ private void validateCursorFieldForIncrementalTables(
|
230 | 217 | }
|
231 | 218 |
|
232 | 219 | if (!tablesWithInvalidCursor.isEmpty()) {
|
233 |
| - throw new InvalidCursorException(tablesWithInvalidCursor); |
| 220 | + throw new ConfigErrorException( |
| 221 | + InvalidCursorInfoUtil.getInvalidCursorConfigMessage(tablesWithInvalidCursor)) ; |
234 | 222 | }
|
235 | 223 | }
|
236 | 224 |
|
|
0 commit comments