4
4
5
5
package io .airbyte .scheduler .persistence .job_tracker ;
6
6
7
+ import static java .util .Collections .emptyMap ;
8
+ import static java .util .Collections .singletonMap ;
9
+
7
10
import com .fasterxml .jackson .databind .JsonNode ;
8
- import com .fasterxml .jackson .databind .node . ObjectNode ;
11
+ import com .fasterxml .jackson .databind .ObjectMapper ;
9
12
import com .google .common .annotations .VisibleForTesting ;
10
13
import com .google .common .base .Preconditions ;
11
14
import com .google .common .collect .ImmutableMap ;
12
15
import com .google .common .collect .ImmutableMap .Builder ;
13
16
import io .airbyte .analytics .TrackingClient ;
17
+ import io .airbyte .commons .json .Jsons ;
14
18
import io .airbyte .commons .lang .Exceptions ;
15
19
import io .airbyte .commons .map .MoreMaps ;
16
20
import io .airbyte .config .JobConfig ;
28
32
import io .airbyte .scheduler .models .Job ;
29
33
import io .airbyte .scheduler .persistence .JobPersistence ;
30
34
import io .airbyte .scheduler .persistence .WorkspaceHelper ;
35
+ import io .airbyte .validation .json .JsonSchemaValidator ;
31
36
import io .airbyte .validation .json .JsonValidationException ;
32
37
import java .io .IOException ;
33
- import java .util .Collections ;
34
38
import java .util .HashMap ;
35
39
import java .util .Iterator ;
36
40
import java .util .Map ;
41
+ import java .util .Map .Entry ;
37
42
import java .util .UUID ;
38
43
39
44
public class JobTracker {
@@ -50,6 +55,8 @@ public enum JobState {
50
55
public static final String OPERATION = "operation." ;
51
56
public static final String SET = "set" ;
52
57
58
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper ();
59
+
53
60
private final ConfigRepository configRepository ;
54
61
private final JobPersistence jobPersistence ;
55
62
private final WorkspaceHelper workspaceHelper ;
@@ -118,16 +125,21 @@ public void trackSync(final Job job, final JobState jobState) {
118
125
Preconditions .checkArgument (allowedJob , "Job type " + configType + " is not allowed!" );
119
126
final long jobId = job .getId ();
120
127
final UUID connectionId = UUID .fromString (job .getScope ());
121
- final UUID sourceDefinitionId = configRepository .getSourceDefinitionFromConnection (connectionId ).getSourceDefinitionId ();
122
- final UUID destinationDefinitionId = configRepository .getDestinationDefinitionFromConnection (connectionId ).getDestinationDefinitionId ();
128
+ final StandardSourceDefinition sourceDefinition = configRepository .getSourceDefinitionFromConnection (connectionId );
129
+ final UUID sourceDefinitionId = sourceDefinition .getSourceDefinitionId ();
130
+ final StandardDestinationDefinition destinationDefinition = configRepository .getDestinationDefinitionFromConnection (connectionId );
131
+ final UUID destinationDefinitionId = destinationDefinition .getDestinationDefinitionId ();
123
132
124
133
final Map <String , Object > jobMetadata = generateJobMetadata (String .valueOf (jobId ), configType , job .getAttemptsCount ());
125
134
final Map <String , Object > jobAttemptMetadata = generateJobAttemptMetadata (job .getId (), jobState );
126
135
final Map <String , Object > sourceDefMetadata = generateSourceDefinitionMetadata (sourceDefinitionId );
127
136
final Map <String , Object > destinationDefMetadata = generateDestinationDefinitionMetadata (destinationDefinitionId );
128
137
final Map <String , Object > syncMetadata = generateSyncMetadata (connectionId );
129
138
final Map <String , Object > stateMetadata = generateStateMetadata (jobState );
130
- final Map <String , Object > syncConfigMetadata = generateSyncConfigMetadata (job .getConfig ());
139
+ final Map <String , Object > syncConfigMetadata = generateSyncConfigMetadata (
140
+ job .getConfig (),
141
+ sourceDefinition .getSpec ().getConnectionSpecification (),
142
+ destinationDefinition .getSpec ().getConnectionSpecification ());
131
143
132
144
final UUID workspaceId = workspaceHelper .getWorkspaceForJobIdIgnoreExceptions (jobId );
133
145
track (workspaceId ,
@@ -142,18 +154,20 @@ public void trackSync(final Job job, final JobState jobState) {
142
154
});
143
155
}
144
156
145
- private Map <String , Object > generateSyncConfigMetadata (final JobConfig config ) {
157
+ private Map <String , Object > generateSyncConfigMetadata (final JobConfig config ,
158
+ final JsonNode sourceConfigSchema ,
159
+ final JsonNode destinationConfigSchema ) {
146
160
if (config .getConfigType () == ConfigType .SYNC ) {
147
161
final JsonNode sourceConfiguration = config .getSync ().getSourceConfiguration ();
148
162
final JsonNode destinationConfiguration = config .getSync ().getDestinationConfiguration ();
149
163
150
- final Map <String , Object > sourceMetadata = configToMetadata (CONFIG + ".source" , sourceConfiguration );
151
- final Map <String , Object > destinationMetadata = configToMetadata (CONFIG + ".destination" , destinationConfiguration );
164
+ final Map <String , Object > sourceMetadata = configToMetadata (CONFIG + ".source" , sourceConfiguration , sourceConfigSchema );
165
+ final Map <String , Object > destinationMetadata = configToMetadata (CONFIG + ".destination" , destinationConfiguration , destinationConfigSchema );
152
166
final Map <String , Object > catalogMetadata = getCatalogMetadata (config .getSync ().getConfiguredAirbyteCatalog ());
153
167
154
168
return MoreMaps .merge (sourceMetadata , destinationMetadata , catalogMetadata );
155
169
} else {
156
- return Collections . emptyMap ();
170
+ return emptyMap ();
157
171
}
158
172
}
159
173
@@ -168,30 +182,94 @@ private Map<String, Object> getCatalogMetadata(final ConfiguredAirbyteCatalog ca
168
182
return output ;
169
183
}
170
184
171
- protected static Map <String , Object > configToMetadata (final String jsonPath , final JsonNode config ) {
185
+ /**
186
+ * Flattens a config into a map. Uses the schema to determine which fields are const (i.e.
187
+ * non-sensitive). Non-const, non-boolean values are replaced with {@link #SET} to avoid leaking
188
+ * potentially-sensitive information.
189
+ * <p>
190
+ * anyOf/allOf schemas are treated as non-const values. These aren't (currently) used in config
191
+ * schemas anyway.
192
+ *
193
+ * @param jsonPath A prefix to add to all the keys in the returned map, with a period (`.`)
194
+ * separator
195
+ * @param schema The JSON schema that {@code config} conforms to
196
+ */
197
+ protected static Map <String , Object > configToMetadata (final String jsonPath , final JsonNode config , final JsonNode schema ) {
198
+ final Map <String , Object > metadata = configToMetadata (config , schema );
199
+ // Prepend all the keys with the root jsonPath
200
+ // But leave the values unchanged
172
201
final Map <String , Object > output = new HashMap <>();
202
+ Jsons .mergeMaps (output , jsonPath , metadata );
203
+ return output ;
204
+ }
205
+
206
+ /**
207
+ * Does the actually interesting bits of configToMetadata. If config is an object, returns a
208
+ * flattened map. If config is _not_ an object (i.e. it's a primitive string/number/etc, or it's an
209
+ * array) then returns a map of {null: toMetadataValue(config)}.
210
+ */
211
+ private static Map <String , Object > configToMetadata (final JsonNode config , final JsonNode schema ) {
212
+ if (schema .hasNonNull ("const" )) {
213
+ // If this schema is a const, then just dump it into a map:
214
+ // * If it's an object, flatten it
215
+ // * Otherwise, do some basic conversions to value-ish data.
216
+ // It would be a weird thing to declare const: null, but in that case we don't want to report null
217
+ // anyway, so explicitly use hasNonNull.
218
+ return Jsons .flatten (config );
219
+ } else if (schema .has ("oneOf" )) {
220
+ // If this schema is a oneOf, then find the first sub-schema which the config matches
221
+ // and use that sub-schema to convert the config to a map
222
+ final JsonSchemaValidator validator = new JsonSchemaValidator ();
223
+ for (final Iterator <JsonNode > it = schema .get ("oneOf" ).elements (); it .hasNext ();) {
224
+ final JsonNode subSchema = it .next ();
225
+ if (validator .test (subSchema , config )) {
226
+ return configToMetadata (config , subSchema );
227
+ }
228
+ }
229
+ // If we didn't match any of the subschemas, then something is wrong. Bail out silently.
230
+ return emptyMap ();
231
+ } else if (config .isObject ()) {
232
+ // If the schema is not a oneOf, but the config is an object (i.e. the schema has "type": "object")
233
+ // then we need to recursively convert each field of the object to a map.
234
+ final Map <String , Object > output = new HashMap <>();
235
+ final JsonNode maybeProperties = schema .get ("properties" );
236
+
237
+ // If additionalProperties is not set, or it's a boolean, then there's no schema for additional properties. Use the accept-all schema.
238
+ // Otherwise, it's an actual schema.
239
+ final JsonNode maybeAdditionalProperties = schema .get ("additionalProperties" );
240
+ final JsonNode additionalPropertiesSchema ;
241
+ if (maybeAdditionalProperties == null || maybeAdditionalProperties .isBoolean ()) {
242
+ additionalPropertiesSchema = OBJECT_MAPPER .createObjectNode ();
243
+ } else {
244
+ additionalPropertiesSchema = maybeAdditionalProperties ;
245
+ }
173
246
174
- if (config .isObject ()) {
175
- final ObjectNode node = (ObjectNode ) config ;
176
- for (final Iterator <Map .Entry <String , JsonNode >> it = node .fields (); it .hasNext ();) {
177
- final var entry = it .next ();
178
- final var field = entry .getKey ();
179
- final var fieldJsonPath = jsonPath + "." + field ;
180
- final var child = entry .getValue ();
181
-
182
- if (child .isBoolean ()) {
183
- output .put (fieldJsonPath , child .asBoolean ());
184
- } else if (!child .isNull ()) {
185
- if (child .isObject ()) {
186
- output .putAll (configToMetadata (fieldJsonPath , child ));
187
- } else if (!child .isTextual () || (child .isTextual () && !child .asText ().isEmpty ())) {
188
- output .put (fieldJsonPath , SET );
189
- }
247
+ for (final Iterator <Entry <String , JsonNode >> it = config .fields (); it .hasNext (); ) {
248
+ final Entry <String , JsonNode > entry = it .next ();
249
+ final String field = entry .getKey ();
250
+ final JsonNode value = entry .getValue ();
251
+
252
+ final JsonNode propertySchema ;
253
+ if (maybeProperties != null && maybeProperties .hasNonNull (field )) {
254
+ // If this property is explicitly declared, then use its schema
255
+ propertySchema = maybeProperties .get (field );
256
+ } else {
257
+ // otherwise, use the additionalProperties schema
258
+ propertySchema = additionalPropertiesSchema ;
190
259
}
260
+
261
+ Jsons .mergeMaps (output , field , configToMetadata (value , propertySchema ));
191
262
}
263
+ return output ;
264
+ } else if (config .isBoolean ()) {
265
+ return singletonMap (null , config .asBoolean ());
266
+ } else if ((!config .isTextual () && !config .isNull ()) || (config .isTextual () && !config .asText ().isEmpty ())) {
267
+ // This is either non-textual (e.g. integer, array, etc) or non-empty text
268
+ return singletonMap (null , SET );
269
+ } else {
270
+ // Otherwise, this is an empty string, so just ignore it
271
+ return emptyMap ();
192
272
}
193
-
194
- return output ;
195
273
}
196
274
197
275
private Map <String , Object > generateSyncMetadata (final UUID connectionId ) throws ConfigNotFoundException , IOException , JsonValidationException {
0 commit comments