Skip to content

Commit 9fb4bf7

Browse files
committed
Include join.left source as an upstream source connection
1 parent 4379920 commit 9fb4bf7

File tree

3 files changed

+102
-77
lines changed

3 files changed

+102
-77
lines changed

frontend/src/lib/api/utils.ts

Lines changed: 58 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ import {
55
type ILineageResponse,
66
type ILogicalNode,
77
type INodeKey,
8+
type ISource,
89
type NodeGraph
910
} from '../types/codegen';
1011

1112
/** Convert Join to LineageResponse by walking joinParts */
12-
export function joinToLineage(join: IJoin): ILineageResponse {
13+
export function joinToLineage(join: IJoin, excludeLeft = false): ILineageResponse {
1314
// Use `InternMap` insteaad of `Map` to support object keys (instances will be different once serialized/fetched from API) - https://d3js.org/d3-array/intern
1415
// @ts-expect-error: Bad typing
1516
const connections: NodeGraph['connections'] = new InternMap([], JSON.stringify);
@@ -26,6 +27,10 @@ export function joinToLineage(join: IJoin): ILineageResponse {
2627
const joinParents: INodeKey[] = [];
2728
connections.set(joinNodeKey, { parents: joinParents });
2829

30+
if (join.left && !excludeLeft) {
31+
processSource(join.left, infoMap, connections, joinParents);
32+
}
33+
2934
for (const jp of join.joinParts ?? []) {
3035
if (jp.groupBy) {
3136
const groupByNodeKey: INodeKey = {
@@ -41,52 +46,7 @@ export function joinToLineage(join: IJoin): ILineageResponse {
4146
connections.set(groupByNodeKey, { parents: groupByParents });
4247

4348
for (const source of jp.groupBy?.sources ?? []) {
44-
if (source.entities) {
45-
const entityNodeKey: INodeKey = {
46-
name: source.entities.snapshotTable,
47-
logicalType: LogicalType.TABULAR_DATA // TODO: Are all sources tabular data?
48-
};
49-
infoMap.set(entityNodeKey, {
50-
conf: source.entities as ILogicalNode
51-
});
52-
groupByParents.push(entityNodeKey);
53-
}
54-
55-
if (source.events) {
56-
const eventNodeKey: INodeKey = {
57-
name: source.events.table,
58-
logicalType: LogicalType.TABULAR_DATA // TODO: Are all sources tabular data?
59-
};
60-
infoMap.set(eventNodeKey, {
61-
conf: source.events as ILogicalNode
62-
});
63-
groupByParents.push(eventNodeKey);
64-
}
65-
66-
if (source.joinSource) {
67-
const joinNodeKey: INodeKey = {
68-
name: source.joinSource.join?.metaData?.name,
69-
logicalType: LogicalType.TABULAR_DATA // TODO: Are all sources tabular data?
70-
};
71-
infoMap.set(joinNodeKey, {
72-
conf: source.joinSource as ILogicalNode
73-
});
74-
groupByParents.push(joinNodeKey);
75-
76-
// Transfer connections and infoMap from joinSource join to root join graph
77-
const joinSourceLineage = joinToLineage(source.joinSource.join as IJoin);
78-
79-
for (const [key, nodeConnections] of joinSourceLineage.nodeGraph?.connections ?? []) {
80-
connections.set(
81-
key === joinSourceLineage.mainNode ? joinNodeKey : key,
82-
nodeConnections
83-
);
84-
}
85-
86-
for (const [key, info] of joinSourceLineage.nodeGraph?.infoMap ?? []) {
87-
infoMap.set(key, info);
88-
}
89-
}
49+
processSource(source, infoMap, connections, groupByParents);
9050
}
9151
}
9252
}
@@ -99,3 +59,54 @@ export function joinToLineage(join: IJoin): ILineageResponse {
9959
mainNode: joinNodeKey
10060
};
10161
}
62+
63+
function processSource(
64+
source: ISource,
65+
infoMap: NonNullable<NodeGraph['infoMap']>,
66+
connections: NonNullable<NodeGraph['connections']>,
67+
parents: INodeKey[]
68+
) {
69+
if (source.entities) {
70+
const entityNodeKey: INodeKey = {
71+
name: source.entities.snapshotTable,
72+
logicalType: LogicalType.TABULAR_DATA // TODO: Are all sources tabular data?
73+
};
74+
infoMap.set(entityNodeKey, {
75+
conf: source.entities as ILogicalNode
76+
});
77+
parents.push(entityNodeKey);
78+
}
79+
80+
if (source.events) {
81+
const eventNodeKey: INodeKey = {
82+
name: source.events.table,
83+
logicalType: LogicalType.TABULAR_DATA // TODO: Are all sources tabular data?
84+
};
85+
infoMap.set(eventNodeKey, {
86+
conf: source.events as ILogicalNode
87+
});
88+
parents.push(eventNodeKey);
89+
}
90+
91+
if (source.joinSource) {
92+
const joinNodeKey: INodeKey = {
93+
name: source.joinSource.join?.metaData?.name,
94+
logicalType: LogicalType.TABULAR_DATA // TODO: Are all sources tabular data?
95+
};
96+
infoMap.set(joinNodeKey, {
97+
conf: source.joinSource as ILogicalNode
98+
});
99+
parents.push(joinNodeKey);
100+
101+
// Transfer connections and infoMap from joinSource join to root join graph
102+
const joinSourceLineage = joinToLineage(source.joinSource.join as IJoin);
103+
104+
for (const [key, nodeConnections] of joinSourceLineage.nodeGraph?.connections ?? []) {
105+
connections.set(key === joinSourceLineage.mainNode ? joinNodeKey : key, nodeConnections);
106+
}
107+
108+
for (const [key, info] of joinSourceLineage.nodeGraph?.infoMap ?? []) {
109+
infoMap.set(key, info);
110+
}
111+
}
112+
}

frontend/src/lib/types/LogicalNode.ts

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,27 +38,34 @@ export const logicalNodeConfig = {
3838
};
3939

4040
/** Determine node type from it's properties */
41-
export function getLogicalNodeType(node: { key: INodeKey; value: INodeInfo }) {
42-
if ('joinParts' in node.value.conf!) {
41+
export function getLogicalNodeType(node: CombinedLogicalNode) {
42+
if ('joinParts' in node) {
4343
return LogicalType.JOIN;
44-
} else if ('sources' in node.value.conf!) {
44+
} else if ('sources' in node) {
4545
return LogicalType.GROUP_BY;
46-
} else if ('query' in node.value.conf! && typeof node.value.conf.query === 'string') {
46+
} else if ('query' in node && typeof node.query === 'string') {
4747
return LogicalType.STAGING_QUERY;
48-
} else if ('modelType' in node.value.conf!) {
48+
} else if ('modelType' in node) {
4949
return LogicalType.MODEL;
5050
} else {
5151
return LogicalType.TABULAR_DATA;
5252
}
5353
}
5454

55+
/**
56+
* Returns the logical source type of a node (`source.entities` or `source.events`) if node is a source, else `null`
57+
*/
58+
export function getLogicalNodeSourceType(node: CombinedLogicalNode) {
59+
return 'snapshotTable' in node ? 'entity' : 'table' in node ? 'event' : null;
60+
}
61+
5562
export function getLogicalNodeConfig(node: { key: INodeKey; value: INodeInfo }) {
56-
const nodeType = getLogicalNodeType(node);
63+
const nodeType = getLogicalNodeType(node.value.conf as CombinedLogicalNode); // TODO: Is not `ILogicalNode` with nested `join` | `groupBy` | etc properties
5764
if (nodeType === LogicalType.TABULAR_DATA) {
5865
return {
5966
...logicalNodeConfig[LogicalType.TABULAR_DATA],
6067
icon:
61-
getLogicalNodeSubtype(node.value.conf as CombinedLogicalNode) === 'entity'
68+
getLogicalNodeSourceType(node.value.conf as CombinedLogicalNode) === 'entity'
6269
? IconTableCells
6370
: IconSignal
6471
};
@@ -67,13 +74,6 @@ export function getLogicalNodeConfig(node: { key: INodeKey; value: INodeInfo })
6774
}
6875
}
6976

70-
/**
71-
* Returns the logical source type of a node (`source.entities` or `source.events`) if node is a source, else `null`
72-
*/
73-
export function getLogicalNodeSubtype(node: CombinedLogicalNode) {
74-
return 'snapshotTable' in node ? 'entity' : 'table' in node ? 'event' : null;
75-
}
76-
7777
/**
7878
* Returns true if the node is a streaming node
7979
*/
@@ -87,15 +87,13 @@ export function isStreaming(node: CombinedLogicalNode): boolean {
8787
// Check if any upstream sources are streaming
8888
return (
8989
node.sources?.some((source) => {
90-
let result = false;
9190
if ('entities' in source && source.entities) {
92-
result = isStreaming(source.entities);
91+
return isStreaming(source.entities);
9392
} else if ('events' in source && source.events) {
94-
result = isStreaming(source.events);
93+
return isStreaming(source.events);
9594
} else if ('joinSource' in source && source.joinSource) {
96-
result = isStreaming(source.joinSource as CombinedLogicalNode);
95+
return isStreaming(source.joinSource as CombinedLogicalNode);
9796
}
98-
return result;
9997
}) ?? false
10098
);
10199
} else if ('join' in node) {

frontend/src/routes/joins/[slug]/overview/+page.svelte

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616
import { type Node } from '@dagrejs/dagre';
1717
1818
import TransformControls from '$lib/components/charts/TransformControls.svelte';
19-
import { type IJoin, type INodeInfo, type INodeKey } from '$src/lib/types/codegen';
19+
import { LogicalType, type IJoin, type INodeInfo, type INodeKey } from '$src/lib/types/codegen';
2020
import {
2121
getLogicalNodeConfig,
22+
getLogicalNodeType,
2223
isStreaming,
2324
type CombinedLogicalNode
2425
} from '$src/lib/types/LogicalNode.js';
@@ -139,6 +140,7 @@
139140
nodeWidth={300}
140141
nodeSeparation={100}
141142
rankSeparation={100}
143+
edgeSeparation={20}
142144
bind:graph
143145
let:nodes
144146
let:edges
@@ -149,20 +151,34 @@
149151
const source = nodesById.get(e.v) as CustomNode;
150152
const target = nodesById.get(e.w) as CustomNode;
151153

154+
const sourceType = getLogicalNodeType(source.value.conf as CombinedLogicalNode);
155+
const targetType = getLogicalNodeType(target.value.conf as CombinedLogicalNode);
156+
157+
const sourcePoint = {
158+
x: source.x + source.width / 2,
159+
y: source.y
160+
};
161+
const targetPoint = {
162+
x: target.x - target.width / 2,
163+
y: target.y
164+
};
165+
166+
let points: Array<{ x: number; y: number }> = [];
167+
if (sourceType === LogicalType.TABULAR_DATA && targetType === LogicalType.JOIN) {
168+
// Is `join.left` and use dagre-defined edge to route around other nodes (typically at the top)
169+
170+
points = e.points;
171+
points[0] = sourcePoint;
172+
points[points.length - 1] = targetPoint;
173+
} else {
174+
points = [sourcePoint, targetPoint];
175+
}
176+
152177
return {
153178
...e,
154179
source,
155180
target,
156-
points: [
157-
{
158-
x: source.x + source.width / 2,
159-
y: source.y
160-
},
161-
{
162-
x: target.x - target.width / 2,
163-
y: target.y
164-
}
165-
]
181+
points
166182
};
167183
})}
168184

0 commit comments

Comments
 (0)