1
1
from __future__ import annotations
2
- from copy import deepcopy
3
2
from pathlib import Path
4
3
5
4
import inflection
6
- import jq
7
5
8
6
from orbiter .file_types import FileTypeJSON
9
7
from orbiter .objects .include import OrbiterInclude
36
34
def dag_filter_rule (val ) -> list [dict ] | None :
37
35
38
36
if isinstance (val , dict ) and "userFlow" in val :
39
- return [val ["userFlow" ]]
37
+ return [val ["userFlow" ]]
40
38
return None
41
39
42
40
@@ -61,7 +59,7 @@ def task_common_args(val: dict) -> dict:
61
59
.get ("properties" , {})
62
60
.get ("$componentMetadata" , {})
63
61
.get ("name" , "UNKNOWN" ))
64
-
62
+
65
63
params = {
66
64
"task_id" : task_id .replace (" " , "_" ),
67
65
}
@@ -71,7 +69,7 @@ def task_common_args(val: dict) -> dict:
71
69
# noinspection t
72
70
@task_filter_rule
73
71
def task_filter_rule (val : dict ) -> list [dict ] | None :
74
-
72
+
75
73
if isinstance (val , dict ) and "pipelines" in val :
76
74
components = []
77
75
for pipeline in val ["pipelines" ]:
@@ -82,7 +80,7 @@ def task_filter_rule(val: dict) -> list[dict] | None:
82
80
83
81
@task_rule (priority = 2 )
84
82
def python_task_rule (val : dict ) -> OrbiterPythonOperator | None :
85
-
83
+
86
84
if "python3" in val .get ("data" , {}).get ("properties" , {}).get ("$componentMetadata" , {}).get ("technicalType" , "" ).lower ():
87
85
code = val .get ("data" , {}).get ("properties" , {}).get ("configuration" , {}).get ("pythonCode" , "" )
88
86
name = val .get ("data" , {}).get ("properties" , {}).get ("$componentMetadata" , {}).get ("name" , "" ).lower ().replace (" " , "_" )
@@ -123,7 +121,7 @@ def simple_task_dependencies(
123
121
"""Extract task dependencies from Talend pipeline steps."""
124
122
dependencies = []
125
123
pipeline = val .orbiter_kwargs .get ("val" , {}).get ("pipelines" , [{}])[0 ]
126
-
124
+
127
125
component_to_task_name = {}
128
126
for component in pipeline .get ("components" , []):
129
127
name = (component .get ("data" , {})
@@ -139,27 +137,27 @@ def simple_task_dependencies(
139
137
.get ("properties" , {})
140
138
.get ("type" ))
141
139
port_to_component [port ["id" ]] = (port ["nodeId" ], port_type )
142
-
140
+
143
141
for step in pipeline .get ("steps" , []):
144
142
source_port = step ["sourceId" ]
145
143
target_port = step ["targetId" ]
146
-
144
+
147
145
if source_port in port_to_component and target_port in port_to_component :
148
146
source_component_id , source_type = port_to_component [source_port ]
149
147
target_component_id , target_type = port_to_component [target_port ]
150
-
148
+
151
149
if source_type != "OUTGOING" or target_type != "INCOMING" :
152
150
continue
153
-
151
+
154
152
source_task_name = component_to_task_name .get (source_component_id )
155
153
target_task_name = component_to_task_name .get (target_component_id )
156
-
154
+
157
155
if source_task_name and target_task_name :
158
156
dependencies .append (OrbiterTaskDependency (
159
157
task_id = source_task_name ,
160
158
downstream = target_task_name
161
159
))
162
-
160
+
163
161
return dependencies if dependencies else None
164
162
165
163
0 commit comments