Skip to content

Commit e95dc48

Browse files
Merge pull request #7 from astronomer/ssis
SSIS Demo
2 parents d64feec + 2ce5c50 commit e95dc48

File tree

4 files changed

+1526
-0
lines changed

4 files changed

+1526
-0
lines changed

orbiter_translations/ssis/__init__.py

Whitespace-only changes.

orbiter_translations/ssis/xml_demo.py

Lines changed: 396 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,396 @@
1+
"""
2+
## Demo `translation_ruleset` Example
3+
4+
```pycon
5+
>>> translation_ruleset.test(input_value='''<?xml version="1.0"?>
6+
... <DTS:Executable xmlns:DTS="www.microsoft.com/SqlServer/Dts" DTS:ExecutableType="SSIS.Package.2">
7+
... <DTS:Executable DTS:ExecutableType="SSIS.Pipeline.2">
8+
... <DTS:Property DTS:Name="ObjectName">Extract Sample Currency Data</DTS:Property>
9+
... <DTS:Property DTS:Name="Description">Data Flow Task</DTS:Property>
10+
... <DTS:ObjectData>
11+
... <pipeline>
12+
... <components>
13+
... <component id="1" name="Extract Sample Currency Data">
14+
... <connections><connection id="6" name="FlatFileConnection" connectionManagerID="{EA76C836-FF8B-4E34-B273-81D4F67FCB3D}" /></connections>
15+
... <outputs><output id="2" name="Flat File Source Output" /><output id="3" name="Flat File Source Error Output" /></outputs>
16+
... </component>
17+
... <component id="30" name="Lookup CurrencyKey" componentClassID="{27648839-180F-45E6-838D-AFF53DF682D2}">
18+
... <properties><property name="SqlCommand">select * from (select * from [dbo].[DimCurrency]) as refTable where [refTable].[CurrencyAlternateKey] = 'ARS' OR [refTable].[CurrencyAlternateKey] = 'VEB'</property></properties>
19+
... <inputs><input id="31" /></inputs>
20+
... <outputs><output id="32" /><output id="266" name="Lookup No Match Output" /><output id="42" name="Lookup Error Output" /></outputs>
21+
... </component>
22+
... <component id="66" name="Lookup DateKey" componentClassID="{27648839-180F-45E6-838D-AFF53DF682D2}">
23+
... <properties><property id="69" name="SqlCommand">select * from [dbo].[DimTime]</property></properties>
24+
... <inputs><input id="67" name="Lookup Input" /></inputs>
25+
... <outputs><output id="68" name="Lookup Match Output" /><output id="270" name="Lookup No Match Output" /><output id="78" name="Lookup Error Output"/></outputs>
26+
... </component>
27+
... <component id="100" name="Sample OLE DB Destination" componentClassID="{5A0B62E8-D91D-49F5-94A5-7BE58DE508F0}">
28+
... <inputs><input id="113" name="OLE DB Destination Input"/></inputs>
29+
... <outputs />
30+
... </component>
31+
... </components>
32+
... <paths><path startId="2" endId="31" /><path startId="32" endId="67" /><path startId="68" endId="113" /></paths>
33+
... </pipeline>
34+
... </DTS:ObjectData>
35+
... </DTS:Executable>
36+
... <DTS:Property DTS:Name="ObjectName">Demo</DTS:Property>
37+
... </DTS:Executable>
38+
... ''').dags['demo.extract_sample_currency_data']
39+
... # doctest: +ELLIPSIS
40+
from airflow import DAG
41+
from airflow.operators.empty import EmptyOperator
42+
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
43+
from pendulum import DateTime, Timezone
44+
with DAG(dag_id='demo.extract_sample_currency_data', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, doc_md=...):
45+
extract_sample_currency_data_task = EmptyOperator(task_id='extract_sample_currency_data', doc_md='Input did not translate: `{"@id": "1", "@name": "Extract Sample Currency Data", "connections": [{"connection": [{"@id": "6", "@name": "FlatFileConnection", "@connectionManagerID": "{EA76C836-FF8B-4E34-B273-81D4F67FCB3D}"}]}], "outputs": [{"output": [{"@id": "2", "@name": "Flat File Source Output"}, {"@id": "3", "@name": "Flat File Source Error Output"}]}]}`')
46+
lookup_currency_key_task = SQLExecuteQueryOperator(task_id='lookup_currency_key', conn_id='mssql_default', sql="select * from (select * from [dbo].[DimCurrency]) as refTable where [refTable].[CurrencyAlternateKey] = 'ARS' OR [refTable].[CurrencyAlternateKey] = 'VEB'")
47+
lookup_date_key_task = SQLExecuteQueryOperator(task_id='lookup_date_key', conn_id='mssql_default', sql='select * from [dbo].[DimTime]')
48+
sample_ole__db_destination_task = EmptyOperator(task_id='sample_ole__db_destination', doc_md='Input did not translate: `{"@id": "100", "@name": "Sample OLE DB Destination", "@componentClassID": "{5A0B62E8-D91D-49F5-94A5-7BE58DE508F0}", "inputs": [{"input": [{"@id": "113", "@name": "OLE DB Destination Input"}]}], "outputs": null}`')
49+
extract_sample_currency_data_task >> lookup_currency_key_task
50+
lookup_currency_key_task >> lookup_date_key_task
51+
lookup_date_key_task >> sample_ole__db_destination_task
52+
53+
```
54+
""" # noqa: E501
55+
56+
from __future__ import annotations
57+
from copy import deepcopy
58+
from pathlib import Path
59+
60+
import inflection
61+
import jq
62+
63+
from orbiter import FileType, clean_value
64+
from orbiter.objects import conn_id
65+
from orbiter.objects.dag import OrbiterDAG
66+
from orbiter.objects.operators.sql import OrbiterSQLExecuteQueryOperator
67+
from orbiter.objects.task import OrbiterTaskDependency
68+
from orbiter.rules import (
69+
task_dependency_rule,
70+
dag_filter_rule,
71+
dag_rule,
72+
task_filter_rule,
73+
task_rule,
74+
cannot_map_rule,
75+
)
76+
from orbiter.rules.rulesets import (
77+
TranslationRuleset,
78+
TaskDependencyRuleset,
79+
DAGFilterRuleset,
80+
DAGRuleset,
81+
TaskFilterRuleset,
82+
TaskRuleset,
83+
PostProcessingRuleset,
84+
)
85+
86+
87+
# noinspection t
88+
@dag_filter_rule
89+
def dag_filter_rule(val) -> list[dict] | None:
90+
"""
91+
Filter to top-level <DTS:Executable DTS:ExecutableType="SSIS.Package*"> elements,
92+
then filter to <DTS:Executable DTS:ExecutableType="SSIS.Pipeline*">
93+
94+
!!! note
95+
96+
Input is modified. 'Package' entry (without pipelines) is attached to 'Pipeline' entry with new "@package" key
97+
98+
```pycon
99+
>>> dag_filter_rule(val={'DTS:Executable': [{
100+
... '@xmlns:DTS': 'www.microsoft.com/SqlServer/Dts',
101+
... '@DTS:ExecutableType': 'SSIS.Package.2',
102+
... 'DTS:Property': [{
103+
... '#text': 'Demo', '@DTS:Name': 'ObjectName'
104+
... }],
105+
... 'DTS:ConnectionManager': [{"__": "Omitted for brevity"}],
106+
... 'DTS:Executable': [{
107+
... '@DTS:ExecutableType': 'SSIS.Pipeline.2',
108+
... 'DTS:ObjectData': [{'pipeline': [{
109+
... 'components': [{'component': [{"__": "Omitted for brevity"}]}],
110+
... 'paths': [{'path': [{'@endId': '31', '@id': '45', '@name': 'XYZ', '@startId': '2'}]}]
111+
... }]}],
112+
... 'DTS:Property': [
113+
... {'#text': 'Extract Sample Currency Data', '@DTS:Name': 'ObjectName'},
114+
... {'#text': 'Data Flow Task', '@DTS:Name': 'Description'},
115+
... ]
116+
... }]
117+
... }]})
118+
... # doctest: +ELLIPSIS
119+
[{'@DTS:ExecutableType': 'SSIS.Pipeline.2', 'DTS:ObjectData': [{'pipeline':...'@package': {'@xmlns:DTS': 'www.microsoft.com/SqlServer/Dts'...}}]
120+
121+
```
122+
""" # noqa: E501
123+
dags = []
124+
for package in val.get("DTS:Executable", []): # package is top-level
125+
if "SSIS.Package" in package.get(
126+
"@DTS:ExecutableType", ""
127+
): # check for 'package' as type
128+
pipelines = deepcopy(package.get("DTS:Executable", []))
129+
if pipelines:
130+
del package["DTS:Executable"]
131+
for pipeline in pipelines: # pipeline is inside of that
132+
if "SSIS.Pipeline" in pipeline.get("@DTS:ExecutableType", ""):
133+
pipeline["@package"] = package # Attach package as '@package'
134+
dags.append(pipeline)
135+
return dags or None
136+
137+
138+
@dag_rule
139+
def basic_dag_rule(val: dict) -> OrbiterDAG | None:
140+
"""
141+
Translate input into an `OrbiterDAG`
142+
143+
Pipeline name is <DTS:Property><DTS:Name="ObjectName">...</DTS:Name>
144+
Package name is in (modified) @package property, and is <DTS:Property><DTS:Name="ObjectName">...</DTS:Name>
145+
146+
```pycon
147+
>>> basic_dag_rule(val={
148+
... '@DTS:ExecutableType': 'SSIS.Pipeline.2',
149+
... 'DTS:ObjectData': [{'pipeline': [{
150+
... 'components': [{'component': [{"__": "Omitted for brevity, tasks would be here"}]}],
151+
... 'paths': [{'path': [{"__": "Omitted for brevity, task dependencies would be here"}]}]
152+
... }]}],
153+
... 'DTS:Property': [
154+
... {'#text': 'Extract Sample Currency Data', '@DTS:Name': 'ObjectName'},
155+
... {'#text': 'Data Flow Task', '@DTS:Name': 'Description'},
156+
... ],
157+
... '@package': {
158+
... '@xmlns:DTS': 'www.microsoft.com/SqlServer/Dts',
159+
... '@DTS:ExecutableType': 'SSIS.Package.2',
160+
... 'DTS:Property': [
161+
... {'#text': 'Demo', '@DTS:Name': 'ObjectName'},
162+
... {'#text': '0', '@DTS:Name': 'SuppressConfigurationWarnings'},
163+
... {'#text': 'ComputerName', '@DTS:Name': 'CreatorComputerName'},
164+
... {'#text': '8/29/2005 1:15:48 PM', '@DTS:DataType': '7', '@DTS:Name': 'CreationDate'}
165+
... ],
166+
... 'DTS:ConnectionManager': [{"__": "Omitted for brevity, connections would be here"}],
167+
... }
168+
... })
169+
... # doctest: +ELLIPSIS
170+
from airflow import DAG
171+
from pendulum import DateTime, Timezone
172+
with DAG(dag_id='demo.extract_sample_currency_data', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, doc_md=...):
173+
174+
```
175+
""" # noqa: E501
176+
if isinstance(val, dict):
177+
try:
178+
# Pipeline name is <DTS:Property><DTS:Name="ObjectName">...</DTS:Name>
179+
pipeline_name = (
180+
jq.compile(
181+
"""."DTS:Property"[]? | select(."@DTS:Name" == "ObjectName") | ."#text" """
182+
)
183+
.input_value(val)
184+
.first()
185+
)
186+
187+
# Package is in modified @package property, and is <DTS:Property><DTS:Name="ObjectName">...</DTS:Name>
188+
package_name = (
189+
jq.compile(
190+
"""."@package"?."DTS:Property"[]? | select(."@DTS:Name" == "ObjectName") | ."#text" """
191+
)
192+
.input_value(val)
193+
.first()
194+
)
195+
dag_id = f"{package_name}.{pipeline_name}"
196+
return OrbiterDAG(
197+
dag_id=dag_id,
198+
file_path=Path(f"{inflection.underscore(dag_id)}.py"),
199+
doc_md="**Created via [Orbiter](https://astronomer.github.io/orbiter) w/ Demo Translation Ruleset**.\n"
200+
"Contact Astronomer @ [[email protected]](mailto:[email protected]) "
201+
"or at [astronomer.io/contact](https://www.astronomer.io/contact/) for more!",
202+
)
203+
except StopIteration:
204+
pass
205+
return None
206+
207+
208+
def task_common_args(val: dict) -> dict:
209+
"""
210+
Common mappings for all tasks
211+
"""
212+
params = {"task_id": val.get("@name", "UNKNOWN").replace(" ", "_")}
213+
return params
214+
215+
216+
# noinspection t
217+
@task_filter_rule
218+
def task_filter_rule(val: dict) -> list[dict] | None:
219+
"""
220+
The key is 'components' and it has details of pipeline components.
221+
222+
```pycon
223+
>>> task_filter_rule(val={
224+
... '@DTS:ExecutableType': 'SSIS.Pipeline.2',
225+
... 'DTS:ObjectData': [{'pipeline': [{
226+
... 'components': [{'component': [{
227+
... '@name': 'Lookup CurrencyKey',
228+
... 'properties': [{'property': [
229+
... {'@id': '33', '@name': 'SqlCommand', '#text': "select * from foo"}
230+
... ]}],
231+
... }]}],
232+
... 'paths': [{'path': [{"__": "Omitted for brevity, task dependencies would be here"}]}]
233+
... }]}],
234+
... 'DTS:Property': [],
235+
... '@package': {
236+
... '@DTS:ExecutableType': 'SSIS.Package.2',
237+
... 'DTS:Property': [
238+
... {'#text': 'Demo', '@DTS:Name': 'ObjectName'},
239+
... ],
240+
... 'DTS:ConnectionManager': [{"__": "Omitted for brevity, connections would be here"}],
241+
... }
242+
... })
243+
... # doctest: +ELLIPSIS
244+
[{'@name': 'Lookup CurrencyKey', 'properties': [{'property': [{'@id': '33', '@name': 'SqlCommand', '#text': 'select * from foo'}]}]}]
245+
246+
```
247+
""" # noqa: E501
248+
if isinstance(val, dict):
249+
try:
250+
return (
251+
jq.compile(
252+
"""."DTS:ObjectData"[]?.pipeline[]?.components[]?.component[]"""
253+
)
254+
.input_value(val)
255+
.all()
256+
)
257+
except StopIteration:
258+
pass
259+
return None
260+
261+
262+
@task_rule(priority=2)
263+
def sql_command_rule(val) -> OrbiterSQLExecuteQueryOperator | None:
264+
"""
265+
For SQLQueryOperator.
266+
267+
```pycon
268+
>>> sql_command_rule(val={
269+
... '@name': 'Lookup CurrencyKey',
270+
... 'properties': [{'property': [
271+
... {'@id': '33', '@name': 'SqlCommand', '#text': "select * from (select * from [dbo].[DimCurrency]) as refTable where [refTable].[CurrencyAlternateKey] = 'ARS' OR [refTable].[CurrencyAlternateKey] = 'AUD'"}
272+
... ]}],
273+
... })
274+
... # doctest: +ELLIPSIS
275+
lookup_currency_key_task = SQLExecuteQueryOperator(task_id='lookup_currency_key', conn_id='mssql_default', sql="select * from (select * from [dbo].[DimCurrency]) as refTable where [refTable].[CurrencyAlternateKey] = 'ARS'...")
276+
277+
```
278+
''
279+
""" # noqa: E501
280+
try:
281+
sql: str = (
282+
jq.compile(
283+
""".properties[]?.property[]? | select(."@name" == "SqlCommand") | ."#text" """
284+
)
285+
.input_value(val)
286+
.first()
287+
)
288+
if sql:
289+
return OrbiterSQLExecuteQueryOperator(
290+
sql=sql,
291+
**conn_id(conn_id="mssql_default", conn_type="mssql"),
292+
**task_common_args(val),
293+
)
294+
except StopIteration:
295+
pass
296+
return None
297+
298+
299+
@task_rule(priority=1)
300+
def _cannot_map_rule(val):
301+
"""Add task_ids on top of common 'cannot_map_rule'"""
302+
task = cannot_map_rule(val)
303+
task.task_id = clean_value(task_common_args(val)["task_id"])
304+
return task
305+
306+
307+
@task_dependency_rule
308+
def simple_task_dependencies(
309+
val: OrbiterDAG,
310+
) -> list[OrbiterTaskDependency] | None:
311+
"""
312+
Map all task input elements to task_id
313+
<inputs><input id='...'> -> task_id
314+
315+
and map all task output elements to task_id
316+
<outputs><output id='...'> -> task_id
317+
318+
then map all path elements to task dependencies
319+
<paths><path startId='...' endId='...'> -> OrbiterTaskDependency(task_id=start.task_id, downstream=end.task_id)
320+
321+
```pycon
322+
>>> from orbiter.objects.operators.empty import OrbiterEmptyOperator
323+
>>> simple_task_dependencies(val=OrbiterDAG(
324+
... dag_id=".", file_path=".",
325+
... tasks={
326+
... "foo": OrbiterEmptyOperator(task_id="foo", orbiter_kwargs={"val": {"outputs": [{"output": [{"@id": "1"}]}]}}),
327+
... "bar": OrbiterEmptyOperator(task_id="bar", orbiter_kwargs={"val": {"inputs": [{"input": [{"@id": "2"}]}]}}),
328+
... },
329+
... orbiter_kwargs={"val": {
330+
... 'DTS:ObjectData': [{'pipeline': [{
331+
... 'paths': [{'path': [{'@startId': '1', '@endId': '2'}]}]
332+
... }]}],
333+
... }}
334+
... ))
335+
[foo >> bar]
336+
337+
```
338+
""" # noqa: E501
339+
# Descend through all the tasks, and get their `<outputs><output id='...'>` elements
340+
341+
# Descend through all the tasks, and get their `<inputs><input id='...'>` elements
342+
output_to_task_id = {
343+
# @id -> task_id
344+
output_id: task.task_id
345+
for task in (val.tasks.values() or [])
346+
# e.g. {"val": {"outputs": [{"output": {"@id": "1"}}]}}
347+
for output in (task.orbiter_kwargs.get("val", {}).get("outputs", []) or [])
348+
for o in (output.get("output", []) or [])
349+
if (output_id := o.get("@id"))
350+
}
351+
input_to_task_id = {
352+
# @id -> task_id
353+
input_id: task.task_id
354+
for task in (val.tasks.values() or [])
355+
# e.g. {"val": {"inputs": [{"input": [{"@id": "1"}]}]}}
356+
for _input in (task.orbiter_kwargs.get("val", {}).get("inputs", []) or [])
357+
for i in (_input.get("input", []) or [])
358+
if (input_id := i.get("@id"))
359+
}
360+
361+
dag_original_input = val.orbiter_kwargs.get("val", {})
362+
# noinspection PyUnboundLocalVariable
363+
return [
364+
OrbiterTaskDependency(
365+
task_id=output_to_task_id[start],
366+
downstream=input_to_task_id[end],
367+
)
368+
# <DTS:ObjectData><pipeline><paths><path startId='...' endId='...'>
369+
for path in jq.compile("""."DTS:ObjectData"[]?.pipeline[]?.paths[]?.path[]""")
370+
.input_value(dag_original_input)
371+
.all()
372+
if (start := path.get("@startId"))
373+
and (end := path.get("@endId"))
374+
and start in output_to_task_id
375+
and end in input_to_task_id
376+
] or None
377+
378+
379+
translation_ruleset: TranslationRuleset = TranslationRuleset(
380+
file_type=FileType.XML,
381+
dag_filter_ruleset=DAGFilterRuleset(ruleset=[dag_filter_rule]),
382+
dag_ruleset=DAGRuleset(ruleset=[basic_dag_rule]),
383+
task_filter_ruleset=TaskFilterRuleset(ruleset=[task_filter_rule]),
384+
task_ruleset=TaskRuleset(ruleset=[sql_command_rule, _cannot_map_rule]),
385+
task_dependency_ruleset=TaskDependencyRuleset(ruleset=[simple_task_dependencies]),
386+
post_processing_ruleset=PostProcessingRuleset(ruleset=[]),
387+
)
388+
389+
if __name__ == "__main__":
390+
import doctest
391+
392+
doctest.testmod(
393+
optionflags=doctest.ELLIPSIS
394+
| doctest.NORMALIZE_WHITESPACE
395+
| doctest.IGNORE_EXCEPTION_DETAIL
396+
)

0 commit comments

Comments
 (0)