Skip to content

Commit c3fb42d

Browse files
add py op callable fix
1 parent 5ffe976 commit c3fb42d

File tree

1 file changed

+60
-34
lines changed

1 file changed

+60
-34
lines changed

orbiter_translations/talend/talend_demo.py

+60-34
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def basic_dag_rule(val: dict) -> OrbiterDAG | None:
4646
return OrbiterDAG(
4747
dag_id=dag_id,
4848
file_path=Path(f"{dag_id}.py"),
49-
doc_md=val.get("description", "Created from Talend pipeline")
49+
doc_md=val.get("description", "Created from Talend pipeline"),
5050
)
5151
return None
5252

@@ -55,10 +55,12 @@ def task_common_args(val: dict) -> dict:
5555
"""
5656
Common mappings for all tasks
5757
"""
58-
task_id = (val.get("data", {})
59-
.get("properties", {})
60-
.get("$componentMetadata", {})
61-
.get("name", "UNKNOWN"))
58+
task_id = (
59+
val.get("data", {})
60+
.get("properties", {})
61+
.get("$componentMetadata", {})
62+
.get("name", "UNKNOWN")
63+
)
6264

6365
params = {
6466
"task_id": task_id.replace(" ", "_"),
@@ -81,27 +83,48 @@ def task_filter_rule(val: dict) -> list[dict] | None:
8183
@task_rule(priority=2)
8284
def python_task_rule(val: dict) -> OrbiterPythonOperator | None:
8385

84-
if "python3" in val.get("data", {}).get("properties", {}).get("$componentMetadata", {}).get("technicalType", "").lower():
85-
code = val.get("data", {}).get("properties", {}).get("configuration", {}).get("pythonCode", "")
86-
name = val.get("data", {}).get("properties", {}).get("$componentMetadata", {}).get("name", "").lower().replace(" ", "_")
86+
if (
87+
"python3"
88+
in val.get("data", {})
89+
.get("properties", {})
90+
.get("$componentMetadata", {})
91+
.get("technicalType", "")
92+
.lower()
93+
):
94+
code = (
95+
val.get("data", {})
96+
.get("properties", {})
97+
.get("configuration", {})
98+
.get("pythonCode", "")
99+
)
100+
name = (
101+
val.get("data", {})
102+
.get("properties", {})
103+
.get("$componentMetadata", {})
104+
.get("name", "")
105+
.lower()
106+
.replace(" ", "_")
107+
)
87108
if code and name:
109+
indented_code = "\n ".join([""] + code.splitlines())
110+
function_content = f"""def {name}(**context):
111+
{indented_code}
112+
"""
88113
return OrbiterPythonOperator(
89114
orbiter_includes={
90-
OrbiterInclude(
91-
filepath=f"include/{name}.py",
92-
contents=code,
93-
),
94-
},
95-
imports=[
96-
OrbiterRequirement(
97-
module=f"include.{name}", names=[f"{name}"]
98-
),
99-
OrbiterRequirement(
100-
module="airflow.operators.python", names=["PythonOperator"]
101-
),
102-
],
103-
python_callable=f"{name}",
104-
**task_common_args(val)
115+
OrbiterInclude(
116+
filepath=f"include/{name}.py",
117+
contents=function_content,
118+
),
119+
},
120+
imports=[
121+
OrbiterRequirement(module=f"include.{name}", names=[f"{name}"]),
122+
OrbiterRequirement(
123+
module="airflow.operators.python", names=["PythonOperator"]
124+
),
125+
],
126+
python_callable=f"{name}",
127+
**task_common_args(val),
105128
)
106129
return None
107130

@@ -124,18 +147,20 @@ def simple_task_dependencies(
124147

125148
component_to_task_name = {}
126149
for component in pipeline.get("components", []):
127-
name = (component.get("data", {})
128-
.get("properties", {})
129-
.get("$componentMetadata", {})
130-
.get("name"))
150+
name = (
151+
component.get("data", {})
152+
.get("properties", {})
153+
.get("$componentMetadata", {})
154+
.get("name")
155+
)
131156
if name:
132157
component_to_task_name[component["id"]] = name.replace(" ", "_").lower()
133158

134159
port_to_component = {}
135160
for port in pipeline.get("ports", []):
136-
port_type = (port.get("graphicalAttributes", {})
137-
.get("properties", {})
138-
.get("type"))
161+
port_type = (
162+
port.get("graphicalAttributes", {}).get("properties", {}).get("type")
163+
)
139164
port_to_component[port["id"]] = (port["nodeId"], port_type)
140165

141166
for step in pipeline.get("steps", []):
@@ -153,10 +178,11 @@ def simple_task_dependencies(
153178
target_task_name = component_to_task_name.get(target_component_id)
154179

155180
if source_task_name and target_task_name:
156-
dependencies.append(OrbiterTaskDependency(
157-
task_id=source_task_name,
158-
downstream=target_task_name
159-
))
181+
dependencies.append(
182+
OrbiterTaskDependency(
183+
task_id=source_task_name, downstream=target_task_name
184+
)
185+
)
160186

161187
return dependencies if dependencies else None
162188

0 commit comments

Comments
 (0)