Skip to content

Commit 6897a60

Browse files
authored
Choose optimal sync modes by default on UI (#12411)
* fix CatalogSectionInner component padding * fix React key error in StreamFieldTable component * Update choosing the optimal sync mode logic. Sync Mode priority: 1. Incremental(cursor defined) => Append Dedup 2. Full Refresh => Overwrite 3. Incremental => Append 4. Full Refresh => Append * Fixes after comments to PR: - replace func "isEmpty" with "[].length" - replace named export with direct export - update "config" type for updateStreamConfig func - rename "getOptimalSyncMode" to "setOptimalSyncMode" * remove fp-ts package * fix wrong condition * cover formConfigHelpers with tests * add npm script: run tests with coverage * tiny fix: test assert
1 parent e8406a2 commit 6897a60

File tree

6 files changed

+217
-51
lines changed

6 files changed

+217
-51
lines changed

airbyte-webapp/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
"start": "react-scripts start",
1010
"build": "react-scripts build",
1111
"test": "react-scripts test",
12+
"test:coverage": "npm test -- --coverage --watchAll=false",
1213
"format": "prettier --write 'src/**/*.{ts,tsx}'",
1314
"storybook": "start-storybook -p 9009 -s public --quiet",
1415
"lint": "eslint --ext js,ts,tsx src",

airbyte-webapp/src/views/Connection/CatalogTree/CatalogSection.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import { flatten, getPathType } from "./utils";
2828
const Section = styled.div<{ error?: boolean; isSelected: boolean }>`
2929
border: 1px solid ${(props) => (props.error ? props.theme.dangerColor : "none")};
3030
background: ${({ theme, isSelected }) => (isSelected ? "rgba(97, 94, 255, 0.1);" : theme.greyColor0)};
31+
padding: 2px;
3132
3233
&:first-child {
3334
border-radius: 8px 8px 0 0;

airbyte-webapp/src/views/Connection/CatalogTree/StreamFieldTable.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ export const StreamFieldTable: React.FC<StreamFieldTableProps> = (props) => {
3838
</TreeRowWrapper>
3939
<RowsContainer>
4040
{props.syncSchemaFields.map((field) => (
41-
<TreeRowWrapper depth={1} key={field.key}>
41+
<TreeRowWrapper depth={1} key={pathDisplayName(field.path)}>
4242
<FieldRow
4343
path={field.path}
4444
name={pathDisplayName(field.path)}

airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx

Lines changed: 8 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,11 @@
1-
import { setIn } from "formik";
21
import { useMemo } from "react";
32
import { useIntl } from "react-intl";
43
import * as yup from "yup";
54

65
import { DropDownRow } from "components";
76

87
import FrequencyConfig from "config/FrequencyConfig.json";
9-
import {
10-
AirbyteStreamConfiguration,
11-
DestinationSyncMode,
12-
SyncMode,
13-
SyncSchema,
14-
SyncSchemaStream,
15-
} from "core/domain/catalog";
8+
import { DestinationSyncMode, SyncMode, SyncSchema, SyncSchemaStream } from "core/domain/catalog";
169
import { Connection, ScheduleProperties } from "core/domain/connection";
1710
import { ConnectionNamespaceDefinition, ConnectionSchedule } from "core/domain/connection";
1811
import {
@@ -28,6 +21,8 @@ import { SOURCE_NAMESPACE_TAG } from "core/domain/connector/source";
2821
import { ValuesProps } from "hooks/services/useConnectionHook";
2922
import { useCurrentWorkspace } from "services/workspaces/WorkspacesService";
3023

24+
import { getOptimalSyncMode, verifyConfigCursorField, verifySupportedSyncModes } from "./formConfigHelpers";
25+
3126
type FormikConnectionFormValues = {
3227
schedule?: ScheduleProperties | null;
3328
prefix: string;
@@ -197,54 +192,17 @@ function mapFormPropsToOperation(
197192
return newOperations;
198193
}
199194

200-
function getDefaultCursorField(streamNode: SyncSchemaStream): string[] {
201-
if (streamNode.stream.defaultCursorField.length) {
202-
return streamNode.stream.defaultCursorField;
203-
}
204-
return streamNode.config.cursorField;
205-
}
206-
207-
const useInitialSchema = (schema: SyncSchema): SyncSchema =>
195+
const useInitialSchema = (schema: SyncSchema, supportedDestinationSyncModes: DestinationSyncMode[]): SyncSchema =>
208196
useMemo<SyncSchema>(
209197
() => ({
210198
streams: schema.streams.map<SyncSchemaStream>((apiNode, id) => {
211199
const nodeWithId: SyncSchemaStream = { ...apiNode, id: id.toString() };
200+
const nodeStream = verifyConfigCursorField(verifySupportedSyncModes(nodeWithId));
212201

213-
// If the value in supportedSyncModes is empty assume the only supported sync mode is FULL_REFRESH.
214-
// Otherwise, it supports whatever sync modes are present.
215-
const streamNode = nodeWithId.stream.supportedSyncModes?.length
216-
? nodeWithId
217-
: setIn(nodeWithId, "stream.supportedSyncModes", [SyncMode.FullRefresh]);
218-
219-
// If syncMode isn't null - don't change item
220-
if (streamNode.config.syncMode) {
221-
return streamNode;
222-
}
223-
224-
const updateStreamConfig = (config: Partial<AirbyteStreamConfiguration>): SyncSchemaStream => ({
225-
...streamNode,
226-
config: { ...streamNode.config, ...config },
227-
});
228-
229-
const supportedSyncModes = streamNode.stream.supportedSyncModes;
230-
231-
// Prefer INCREMENTAL sync mode over other sync modes
232-
if (supportedSyncModes.includes(SyncMode.Incremental)) {
233-
return updateStreamConfig({
234-
cursorField: streamNode.config.cursorField.length
235-
? streamNode.config.cursorField
236-
: getDefaultCursorField(streamNode),
237-
syncMode: SyncMode.Incremental,
238-
});
239-
}
240-
241-
// If source don't support INCREMENTAL and FULL_REFRESH - set first value from supportedSyncModes list
242-
return updateStreamConfig({
243-
syncMode: streamNode.stream.supportedSyncModes[0],
244-
});
202+
return getOptimalSyncMode(nodeStream, supportedDestinationSyncModes);
245203
}),
246204
}),
247-
[schema.streams]
205+
[schema.streams, supportedDestinationSyncModes]
248206
);
249207

250208
const getInitialTransformations = (operations: Operation[]): Transformation[] => operations.filter(isDbtTransformation);
@@ -266,7 +224,7 @@ const useInitialValues = (
266224
destDefinition: DestinationDefinitionSpecification,
267225
isEditMode?: boolean
268226
): FormikConnectionFormValues => {
269-
const initialSchema = useInitialSchema(connection.syncCatalog);
227+
const initialSchema = useInitialSchema(connection.syncCatalog, destDefinition.supportedDestinationSyncModes);
270228

271229
return useMemo(() => {
272230
const initialValues: FormikConnectionFormValues = {
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import { DestinationSyncMode, SyncMode, SyncSchemaStream } from "../../../core/domain/catalog";
2+
import { getOptimalSyncMode, verifyConfigCursorField, verifySupportedSyncModes } from "./formConfigHelpers";
3+
4+
const mockedStreamNode: SyncSchemaStream = {
5+
stream: {
6+
name: "test",
7+
supportedSyncModes: [],
8+
jsonSchema: {},
9+
sourceDefinedCursor: null,
10+
sourceDefinedPrimaryKey: [],
11+
defaultCursorField: [],
12+
},
13+
config: {
14+
cursorField: [],
15+
primaryKey: [],
16+
selected: true,
17+
syncMode: SyncMode.FullRefresh,
18+
destinationSyncMode: DestinationSyncMode.Append,
19+
aliasName: "",
20+
},
21+
id: "1",
22+
};
23+
24+
describe("formConfigHelpers", () => {
25+
describe("verifySupportedSyncModes", () => {
26+
const streamNodeWithDefinedSyncMode: SyncSchemaStream = {
27+
...mockedStreamNode,
28+
stream: { ...mockedStreamNode.stream, supportedSyncModes: [SyncMode.Incremental] },
29+
};
30+
31+
test("should not change supportedSyncModes if it's not empty", () => {
32+
const streamNode = verifySupportedSyncModes(streamNodeWithDefinedSyncMode);
33+
34+
expect(streamNode.stream.supportedSyncModes).toStrictEqual([SyncMode.Incremental]);
35+
});
36+
37+
test("should set default supportedSyncModes if it's empty", () => {
38+
const streamNodeWithEmptySyncMode = {
39+
...streamNodeWithDefinedSyncMode,
40+
stream: { ...mockedStreamNode.stream, supportedSyncModes: [] },
41+
};
42+
const streamNode = verifySupportedSyncModes(streamNodeWithEmptySyncMode);
43+
44+
expect(streamNode.stream.supportedSyncModes).toStrictEqual([SyncMode.FullRefresh]);
45+
});
46+
});
47+
48+
describe("verifyConfigCursorField", () => {
49+
const streamWithDefinedCursorField: SyncSchemaStream = {
50+
...mockedStreamNode,
51+
config: { ...mockedStreamNode.config, cursorField: ["id"] },
52+
stream: { ...mockedStreamNode.stream, defaultCursorField: ["name"] },
53+
};
54+
55+
test("should leave cursorField value as is if it's defined", () => {
56+
const streamNode = verifyConfigCursorField(streamWithDefinedCursorField);
57+
58+
expect(streamNode.config.cursorField).toStrictEqual(["id"]);
59+
});
60+
61+
test("should set defaultCursorField if cursorField is not defined", () => {
62+
const streamNodeWithoutDefinedCursor = {
63+
...streamWithDefinedCursorField,
64+
config: { ...mockedStreamNode.config, cursorField: [] },
65+
};
66+
const streamNode = verifyConfigCursorField(streamNodeWithoutDefinedCursor);
67+
68+
expect(streamNode.config.cursorField).toStrictEqual(["name"]);
69+
});
70+
71+
test("should leave cursorField empty if defaultCursorField not defined", () => {
72+
const streamNode = verifyConfigCursorField(mockedStreamNode);
73+
74+
expect(streamNode.config.cursorField).toStrictEqual([]);
75+
});
76+
});
77+
78+
describe("getOptimalSyncMode", () => {
79+
test("should get 'Incremental(cursor defined) => Append dedup' mode", () => {
80+
const streamNodeWithIncrDedupMode = {
81+
...mockedStreamNode,
82+
stream: { ...mockedStreamNode.stream, supportedSyncModes: [SyncMode.Incremental], sourceDefinedCursor: true },
83+
};
84+
const nodeStream = getOptimalSyncMode(streamNodeWithIncrDedupMode, [DestinationSyncMode.Dedupted]);
85+
86+
expect(nodeStream.config.syncMode).toBe(SyncMode.Incremental);
87+
expect(nodeStream.config.destinationSyncMode).toBe(DestinationSyncMode.Dedupted);
88+
});
89+
90+
test("should get 'FullRefresh => Overwrite' mode", () => {
91+
const nodeStream = getOptimalSyncMode(mockedStreamNode, [DestinationSyncMode.Overwrite]);
92+
93+
expect(nodeStream.config.syncMode).toBe(SyncMode.FullRefresh);
94+
expect(nodeStream.config.destinationSyncMode).toBe(DestinationSyncMode.Overwrite);
95+
});
96+
97+
test("should get 'Incremental => Append' mode", () => {
98+
const streamNodeWithIncrAppendMode = {
99+
...mockedStreamNode,
100+
stream: { ...mockedStreamNode.stream, supportedSyncModes: [SyncMode.Incremental] },
101+
};
102+
const nodeStream = getOptimalSyncMode(streamNodeWithIncrAppendMode, [DestinationSyncMode.Append]);
103+
104+
expect(nodeStream.config.syncMode).toBe(SyncMode.Incremental);
105+
expect(nodeStream.config.destinationSyncMode).toBe(DestinationSyncMode.Append);
106+
});
107+
108+
test("should get 'FullRefresh => Append' mode", () => {
109+
const nodeStream = getOptimalSyncMode(mockedStreamNode, [DestinationSyncMode.Append]);
110+
111+
expect(nodeStream.config.syncMode).toBe(SyncMode.FullRefresh);
112+
expect(nodeStream.config.destinationSyncMode).toBe(DestinationSyncMode.Append);
113+
});
114+
test("should return untouched nodeStream", () => {
115+
const nodeStream = getOptimalSyncMode(mockedStreamNode, []);
116+
117+
expect(nodeStream).toBe(mockedStreamNode);
118+
});
119+
});
120+
});
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import {
2+
AirbyteStreamConfiguration,
3+
DestinationSyncMode,
4+
SyncMode,
5+
SyncSchemaStream,
6+
} from "../../../core/domain/catalog";
7+
8+
const getDefaultCursorField = (streamNode: SyncSchemaStream): string[] => {
9+
if (streamNode.stream.defaultCursorField.length) {
10+
return streamNode.stream.defaultCursorField;
11+
}
12+
return streamNode.config.cursorField;
13+
};
14+
15+
export const verifySupportedSyncModes = (streamNode: SyncSchemaStream): SyncSchemaStream => {
16+
const {
17+
stream: { supportedSyncModes },
18+
} = streamNode;
19+
20+
if (supportedSyncModes?.length) return streamNode;
21+
return { ...streamNode, stream: { ...streamNode.stream, supportedSyncModes: [SyncMode.FullRefresh] } };
22+
};
23+
24+
export const verifyConfigCursorField = (streamNode: SyncSchemaStream): SyncSchemaStream => {
25+
const { config } = streamNode;
26+
27+
return {
28+
...streamNode,
29+
config: {
30+
...config,
31+
cursorField: config.cursorField?.length ? config.cursorField : getDefaultCursorField(streamNode),
32+
},
33+
};
34+
};
35+
36+
export const getOptimalSyncMode = (
37+
streamNode: SyncSchemaStream,
38+
supportedDestinationSyncModes: DestinationSyncMode[]
39+
): SyncSchemaStream => {
40+
const updateStreamConfig = (
41+
config: Pick<AirbyteStreamConfiguration, "syncMode" | "destinationSyncMode">
42+
): SyncSchemaStream => ({
43+
...streamNode,
44+
config: { ...streamNode.config, ...config },
45+
});
46+
47+
const {
48+
stream: { supportedSyncModes, sourceDefinedCursor },
49+
} = streamNode;
50+
51+
if (
52+
supportedSyncModes.includes(SyncMode.Incremental) &&
53+
supportedDestinationSyncModes.includes(DestinationSyncMode.Dedupted) &&
54+
sourceDefinedCursor
55+
) {
56+
return updateStreamConfig({
57+
syncMode: SyncMode.Incremental,
58+
destinationSyncMode: DestinationSyncMode.Dedupted,
59+
});
60+
}
61+
62+
if (supportedDestinationSyncModes.includes(DestinationSyncMode.Overwrite)) {
63+
return updateStreamConfig({
64+
syncMode: SyncMode.FullRefresh,
65+
destinationSyncMode: DestinationSyncMode.Overwrite,
66+
});
67+
}
68+
69+
if (
70+
supportedSyncModes.includes(SyncMode.Incremental) &&
71+
supportedDestinationSyncModes.includes(DestinationSyncMode.Append)
72+
) {
73+
return updateStreamConfig({
74+
syncMode: SyncMode.Incremental,
75+
destinationSyncMode: DestinationSyncMode.Append,
76+
});
77+
}
78+
79+
if (supportedDestinationSyncModes.includes(DestinationSyncMode.Append)) {
80+
return updateStreamConfig({
81+
syncMode: SyncMode.FullRefresh,
82+
destinationSyncMode: DestinationSyncMode.Append,
83+
});
84+
}
85+
return streamNode;
86+
};

0 commit comments

Comments
 (0)