17
17
import io .airbyte .cdk .db .jdbc .DefaultJdbcDatabase ;
18
18
import io .airbyte .cdk .db .jdbc .JdbcDatabase ;
19
19
import io .airbyte .cdk .db .jdbc .JdbcUtils ;
20
+ import io .airbyte .cdk .integrations .base .JavaBaseConstants ;
20
21
import io .airbyte .cdk .integrations .destination .StandardNameTransformer ;
21
22
import io .airbyte .cdk .integrations .standardtest .destination .DestinationAcceptanceTest ;
22
23
import io .airbyte .commons .json .Jsons ;
23
24
import io .airbyte .commons .string .Strings ;
24
- import io .airbyte .integrations .destination .oracle .OracleDestination ;
25
25
import io .airbyte .integrations .destination .oracle .OracleNameTransformer ;
26
26
import java .sql .SQLException ;
27
+ import java .time .Duration ;
27
28
import java .util .ArrayList ;
28
29
import java .util .HashSet ;
29
30
import java .util .List ;
@@ -73,7 +74,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
73
74
return retrieveRecordsFromTable (namingResolver .getRawTableName (streamName ), namespace )
74
75
.stream ()
75
76
.map (r -> Jsons .deserialize (
76
- r .get (OracleDestination .COLUMN_NAME_DATA . replace ( " \" " , "" ) ).asText ()))
77
+ r .get (JavaBaseConstants .COLUMN_NAME_DATA ).asText ()))
77
78
.collect (Collectors .toList ());
78
79
}
79
80
@@ -113,16 +114,15 @@ protected List<String> resolveIdentifier(final String identifier) {
113
114
114
115
private List <JsonNode > retrieveRecordsFromTable (final String tableName , final String schemaName )
115
116
throws SQLException {
116
- final String query = String .format ("SELECT * FROM %s.%s ORDER BY %s ASC" , schemaName , tableName , OracleDestination .COLUMN_NAME_EMITTED_AT );
117
-
118
- try (final DSLContext dslContext = getDslContext (config )) {
119
- final List <org .jooq .Record > result = getDatabase (dslContext ).query (ctx -> ctx .fetch (query ).stream ().toList ());
120
- return result
121
- .stream ()
122
- .map (r -> r .formatJSON (JSON_FORMAT ))
123
- .map (Jsons ::deserialize )
124
- .collect (Collectors .toList ());
125
- }
117
+ final String query =
118
+ String .format ("SELECT * FROM %s.%s ORDER BY %s ASC" , schemaName , tableName , JavaBaseConstants .COLUMN_NAME_AB_EXTRACTED_AT .toUpperCase ());
119
+ final DSLContext dslContext = getDslContext (config );
120
+ final List <org .jooq .Record > result = getDatabase (dslContext ).query (ctx -> ctx .fetch (query ).stream ().toList ());
121
+ return result
122
+ .stream ()
123
+ .map (r -> r .formatJSON (JSON_FORMAT ))
124
+ .map (Jsons ::deserialize )
125
+ .collect (Collectors .toList ());
126
126
}
127
127
128
128
private static Database getDatabase (final DSLContext dslContext ) {
@@ -151,15 +151,13 @@ protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TES
151
151
db .start ();
152
152
153
153
config = getConfig (db );
154
+ final DSLContext dslContext = getDslContext (config );
155
+ final Database database = getDatabase (dslContext );
156
+ database .query (
157
+ ctx -> ctx .fetch (String .format ("CREATE USER %s IDENTIFIED BY %s" , schemaName , schemaName )));
158
+ database .query (ctx -> ctx .fetch (String .format ("GRANT ALL PRIVILEGES TO %s" , schemaName )));
154
159
155
- try (final DSLContext dslContext = getDslContext (config )) {
156
- final Database database = getDatabase (dslContext );
157
- database .query (
158
- ctx -> ctx .fetch (String .format ("CREATE USER %s IDENTIFIED BY %s" , schemaName , schemaName )));
159
- database .query (ctx -> ctx .fetch (String .format ("GRANT ALL PRIVILEGES TO %s" , schemaName )));
160
-
161
- ((ObjectNode ) config ).put (JdbcUtils .SCHEMA_KEY , dbName );
162
- }
160
+ ((ObjectNode ) config ).put (JdbcUtils .SCHEMA_KEY , dbName );
163
161
}
164
162
165
163
@ Override
@@ -182,7 +180,8 @@ public void testEncryption() throws SQLException {
182
180
config .get (JdbcUtils .PORT_KEY ).asInt (),
183
181
config .get ("sid" ).asText ()),
184
182
JdbcUtils .parseJdbcParameters ("oracle.net.encryption_client=REQUIRED;" +
185
- "oracle.net.encryption_types_client=( " + algorithm + " )" , ";" ));
183
+ "oracle.net.encryption_types_client=( " + algorithm + " )" , ";" ),
184
+ Duration .ofMinutes (5 ));
186
185
final JdbcDatabase database = new DefaultJdbcDatabase (dataSource );
187
186
188
187
final String networkServiceBanner =
@@ -208,7 +207,8 @@ public void testCheckProtocol() throws SQLException {
208
207
config .get (JdbcUtils .PORT_KEY ).asInt (),
209
208
config .get ("sid" ).asText ()),
210
209
JdbcUtils .parseJdbcParameters ("oracle.net.encryption_client=REQUIRED;" +
211
- "oracle.net.encryption_types_client=( " + algorithm + " )" , ";" ));
210
+ "oracle.net.encryption_types_client=( " + algorithm + " )" , ";" ),
211
+ Duration .ofMinutes (5 ));
212
212
final JdbcDatabase database = new DefaultJdbcDatabase (dataSource );
213
213
214
214
final String networkServiceBanner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual" ;
0 commit comments