Skip to content

Commit 06d4c19

Browse files
authored
[embedded-elt] Allow custom op names when using @sling_assets asset decorators (#20253)
## Summary & Motivation Allows the use of custom op names when using the @sling_assets decorator, allowing users to have multiple invocations of the sling asset. By default, uses the name of the calling function ## How I Tested These Changes unit tests
1 parent d8f8b5d commit 06d4c19

File tree

2 files changed

+28
-6
lines changed

2 files changed

+28
-6
lines changed

python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ def sling_assets(
2727
*,
2828
replication_config: SlingReplicationParam,
2929
dagster_sling_translator: DagsterSlingTranslator = DagsterSlingTranslator(),
30+
name: Optional[str] = None,
3031
partitions_def: Optional[PartitionsDefinition] = None,
3132
backfill_policy: Optional[BackfillPolicy] = None,
3233
op_tags: Optional[Mapping[str, Any]] = None,
@@ -38,13 +39,14 @@ def sling_assets(
3839
spec and descriptions, see `Sling's Documentation <https://docs.slingdata.io/sling-cli/run/configuration>`_.
3940
4041
Args:
41-
replication_config: Union[Mapping[str, Any], str, Path]: A path to a Sling replication config, or a dictionary
42+
replication_config (Union[Mapping[str, Any], str, Path]): A path to a Sling replication config, or a dictionary
4243
of a replication config.
43-
dagster_sling_translator: DagsterSlingTranslator: Allows customization of how to map a Sling stream to a Dagster
44+
dagster_sling_translator: (DagsterSlingTranslator): Allows customization of how to map a Sling stream to a Dagster
4445
AssetKey.
45-
partitions_def: Optional[PartitionsDefinition]: The partitions definition for this asset.
46-
backfill_policy: Optional[BackfillPolicy]: The backfill policy for this asset.
47-
op_tags: Optional[Mapping[str, Any]]: The tags for this asset.
46+
name (Optional[str]: The name of the op.
47+
partitions_def (Optional[PartitionsDefinition]): The partitions definition for this asset.
48+
backfill_policy (Optional[BackfillPolicy]): The backfill policy for this asset.
49+
op_tags (Optional[Mapping[str, Any]]): The tags for this asset.
4850
4951
Examples:
5052
Running a sync by providing a path to a Sling Replication config:
@@ -100,7 +102,7 @@ def my_assets(context, sling: SlingResource):
100102

101103
def inner(fn) -> AssetsDefinition:
102104
asset_definition = multi_asset(
103-
name="sling_asset_definition",
105+
name=name,
104106
compute_kind="sling",
105107
partitions_def=partitions_def,
106108
can_subset=False,

python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,23 @@ def my_sling_assets(sling: SlingResource):
8282
assert res.success
8383
counts = sqlite_connection.execute("SELECT count(1) FROM main.tbl").fetchone()[0]
8484
assert counts == 3
85+
86+
87+
def test_with_custom_name(replication_config: SlingReplicationParam):
88+
@sling_assets(replication_config=replication_config)
89+
def my_sling_assets():
90+
...
91+
92+
assert my_sling_assets.op.name == "my_sling_assets"
93+
94+
@sling_assets(replication_config=replication_config)
95+
def my_other_assets():
96+
...
97+
98+
assert my_other_assets.op.name == "my_other_assets"
99+
100+
@sling_assets(replication_config=replication_config, name="custom_name")
101+
def my_third_sling_assets():
102+
...
103+
104+
assert my_third_sling_assets.op.name == "custom_name"

0 commit comments

Comments
 (0)