@@ -323,26 +323,38 @@ class Applicable:
323
323
324
324
def __init__ (
325
325
self ,
326
- fn : Callable ,
326
+ fn : Union [ Callable , str , None ] ,
327
327
args : Tuple [Union [Any , SingleDependency ], ...],
328
328
kwargs : Dict [str , Union [Any , SingleDependency ]],
329
+ target_fn : Union [Callable , str , None ] = None ,
329
330
_resolvers : List [ConfigResolver ] = None ,
330
331
_name : Optional [str ] = None ,
331
332
_namespace : Union [str , None , EllipsisType ] = ...,
332
333
_target : base .TargetType = None ,
333
334
):
334
335
"""Instantiates an Applicable.
335
336
336
- :param fn: Function it takes in
337
+ We allow fn=None for the use-cases where we want to store the Applicable config (i.e. .when* family, namespace, target, etc.)
338
+ but do not yet the access to the actual function we are turning into the Applicable. In addition, in case the target nodes come
339
+ from a function (using extract_columns/extract_fields) we can pass target_fn to have access to its pointer that we can decorate
340
+ programmatically. See `apply_to` and `mutate` for an example.
341
+
337
342
:param args: Args (*args) to pass to the function
338
- :param kwargs: Kwargs (**kwargs) to pass to the function
343
+ :param fn: Function it takes in. Can be None to create an Applicable placeholder with delayed choice of function.
344
+ :param target_fn: Function the applicable will be applied to
339
345
:param _resolvers: Resolvers to use for the function
340
346
:param _name: Name of the node to be created
341
347
:param _namespace: Namespace of the node to be created -- currently only single-level namespaces are supported
342
348
:param _target: Selects which target nodes it will be appended onto. By default all.
343
349
:param kwargs: Kwargs (**kwargs) to pass to the function
344
350
"""
351
+
352
+ if isinstance (fn , str ) or isinstance (target_fn , str ):
353
+ raise TypeError ("Strings are not supported currently. Please provide function pointer." )
354
+
345
355
self .fn = fn
356
+ self .target_fn = target_fn
357
+
346
358
if "_name" in kwargs :
347
359
raise ValueError ("Cannot pass in _name as a kwarg" )
348
360
@@ -363,6 +375,7 @@ def _with_resolvers(self, *additional_resolvers: ConfigResolver) -> "Applicable"
363
375
_namespace = self .namespace ,
364
376
args = self .args ,
365
377
kwargs = self .kwargs ,
378
+ target_fn = self .target_fn ,
366
379
)
367
380
368
381
def when (self , ** key_value_pairs ) -> "Applicable" :
@@ -415,6 +428,7 @@ def namespaced(self, namespace: NamespaceType) -> "Applicable":
415
428
_namespace = namespace ,
416
429
args = self .args ,
417
430
kwargs = self .kwargs ,
431
+ target_fn = self .target_fn ,
418
432
)
419
433
420
434
def resolves (self , config : Dict [str , Any ]) -> bool :
@@ -452,6 +466,7 @@ def named(self, name: str, namespace: NamespaceType = ...) -> "Applicable":
452
466
),
453
467
args = self .args ,
454
468
kwargs = self .kwargs ,
469
+ target_fn = self .target_fn ,
455
470
)
456
471
457
472
def on_output (self , target : base .TargetType ) -> "Applicable" :
@@ -1174,3 +1189,204 @@ def chain_transforms(
1174
1189
)
1175
1190
first_arg = raw_node .name
1176
1191
return nodes , first_arg
1192
+
1193
+
1194
+ def apply_to (fn_ : Union [Callable , str ], ** mutating_fn_kwargs : Union [SingleDependency , Any ]):
1195
+ """Creates an applicable placeholder with potential kwargs that will be applied to a node (or a subcomponent of a node).
1196
+ See documentation for `mutate` to see how this is used. It de facto allows a postponed `step`.
1197
+
1198
+ We pass fn=None here as this will be the function we are decorating and need to delay passing it in. The target
1199
+ function is the one we wish to mutate and we store it for later access.
1200
+
1201
+ :param fn: Function the applicable will be applied to
1202
+ :param mutating_fn_kwargs: Kwargs (**kwargs) to pass to the mutator function. Must be validly called as f(**kwargs), and have a 1:1 mapping of kwargs to parameters.
1203
+ :return: an applicable placeholder with the target function
1204
+ """
1205
+ return Applicable (fn = None , args = (), kwargs = mutating_fn_kwargs , target_fn = fn_ , _resolvers = [])
1206
+
1207
+
1208
+ class NotSameModuleError (Exception ):
1209
+ def __init__ (self , fn : Callable , target_fn : Callable ):
1210
+ super ().__init__ (
1211
+ f"The functions have to be in the same module... "
1212
+ f"The target function { target_fn .__name__ } is in module { target_fn .__module__ } and "
1213
+ f"the mutator function { fn .__name__ } is in module { fn .__module__ } ./n"
1214
+ "Use power user setting to disable this restriction."
1215
+ )
1216
+
1217
+
1218
+ class mutate :
1219
+ """Running a transformation on the outputs of a series of functions.
1220
+
1221
+ This is closely related to `pipe_output` as it effectively allows you to run transformations on the output of a node without touching that node.
1222
+ We choose which target functions we wish to mutate by the transformation we are decorating. For now, the target functions, that will be mutated,
1223
+ have to be in the same module (come speak to us if you need this capability over multiple modules).
1224
+
1225
+ If we wish to apply `_transform1` to the output of A and B and `_transform2` only to the output
1226
+ of node B, we can do this like
1227
+
1228
+ .. code-block:: python
1229
+ :name: Simple @mutate example
1230
+
1231
+ def A(...):
1232
+ return ...
1233
+
1234
+ def B(...):
1235
+ return ...
1236
+
1237
+ @mutate(A,B)
1238
+ def _transform1(...):
1239
+ return ...
1240
+
1241
+ @mutate(B)
1242
+ def _transform2(...):
1243
+ return ...
1244
+
1245
+ we obtain the new pipe-like subDAGs **A_raw --> _transform1 --> A** and **B_raw --> _transform1 --> _transform2 --> B**,
1246
+ where the behavior is the same as `pipe_output`.
1247
+
1248
+ While it is generally reasonable to contain these constructs within a node's function,
1249
+ you should consider `mutate` in the following scenarios:
1250
+
1251
+ 1. Loading data and applying pre-cleaning step.
1252
+ 2. Feature engineering via joining, filtering, sorting, etc.
1253
+ 3. Experimenting with different transformations across nodes by selectively turning transformations on / off.
1254
+
1255
+ We assume the first argument of the decorated function to be the output of the function we are targeting.
1256
+ For transformations with multiple arguments you can use key word arguments coupled with `step` or `value`
1257
+ the same as with other `pipe`-family decorators
1258
+
1259
+ .. code-block:: python
1260
+ :name: Simple @mutate example with multiple arguments
1261
+
1262
+ @mutate(A,B,arg2=step('upstream_node'),arg3=value(some_literal),...)
1263
+ def _transform1(output_from_target:correct_type, arg2:arg2_type, arg3:arg3_type,...):
1264
+ return ...
1265
+
1266
+ You can also select individual args that will be applied to each target node by adding `apply_to(...)`
1267
+
1268
+ .. code-block:: python
1269
+ :name: Simple @mutate example with multiple arguments allowing individual actions
1270
+
1271
+ @mutate(
1272
+ apply_to(A,arg2=step('upstream_node_1'),arg3=value(some_literal_1)),
1273
+ apply_to(B,arg2=step('upstream_node_2'),arg3=value(some_literal_2)),
1274
+ )
1275
+ def _transform1(output_from_target:correct_type, arg2:arg2_type, arg3:arg3_type,...):
1276
+ return ...
1277
+
1278
+ In case of multiple output nodes, for example after extract_field / extract_columns we can also specify the output node that we wish to mutate.
1279
+ The following would mutate all columns of *A* individually while in the case of function *B* only "field_1"
1280
+
1281
+ .. code-block:: python
1282
+ :name: @mutate example targeting specific nodes local
1283
+
1284
+ @extract_columns("col_1", "col_2")
1285
+ def A(...):
1286
+ return ...
1287
+
1288
+ @extract_fields(
1289
+ {"field_1":int, "field_2":int, "field_3":int}
1290
+ )
1291
+ def B(...):
1292
+ return ...
1293
+
1294
+ @mutate(
1295
+ apply_to(A),
1296
+ apply_to(B).on_output("field_1"),
1297
+ )
1298
+
1299
+ def foo(a:int)->Dict[str,int]:
1300
+ return {"field_1":1, "field_2":2, "field_3":3}
1301
+ """
1302
+
1303
+ def __init__ (
1304
+ self ,
1305
+ * target_functions : Union [Applicable , Callable ],
1306
+ collapse : bool = False ,
1307
+ _chain : bool = False ,
1308
+ ** mutating_function_kwargs : Union [SingleDependency , Any ],
1309
+ ):
1310
+ """Instantiates a `\@mutate` decorator.
1311
+
1312
+ We assume the first argument of the decorated function to be the output of the function we are targeting.
1313
+
1314
+ :param target_functions: functions we wish to mutate the output of
1315
+ :param collapse: Whether to collapse this into a single node. This is not currently supported.
1316
+ :param _chain: Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. @flow will make use of this.
1317
+ :param \*\*mutating_function_kwargs: other kwargs that the decorated function has. Must be validly called as f(kwargs), and have a 1-to-1 mapping of kwargs to parameters. This will be applied for all `target_function`, unless `apply_to` already has the mutator function kwargs, in which case it takes those.
1318
+ """
1319
+ self .collapse = collapse
1320
+ self .chain = _chain
1321
+ # keeping it here once it gets implemented maybe nice to have options
1322
+ if self .collapse :
1323
+ raise NotImplementedError (
1324
+ "Collapsing functions as one node is not yet implemented for mutate(). Please reach out if you want this feature."
1325
+ )
1326
+ if self .chain :
1327
+ raise NotImplementedError ("@flow() is not yet supported -- this is " )
1328
+
1329
+ self .remote_applicables = tuple (
1330
+ [apply_to (fn ) if isinstance (fn , Callable ) else fn for fn in target_functions ]
1331
+ )
1332
+ self .mutating_function_kwargs = mutating_function_kwargs
1333
+
1334
+ # Cross module will require some thought so we are restricting mutate to single module for now
1335
+ self .restrict_to_single_module = True
1336
+
1337
+ def validate_same_module (self , mutating_fn : Callable ):
1338
+ """Validates target functions are in the same module as the mutator function.
1339
+
1340
+ :param mutating_fn: Function to validate against
1341
+ :return: Nothing, raises exception if not valid.
1342
+ """
1343
+ local_module = mutating_fn .__module__
1344
+ for remote_applicable in self .remote_applicables :
1345
+ if remote_applicable .target_fn .__module__ != local_module :
1346
+ raise NotSameModuleError (fn = mutating_fn , target_fn = remote_applicable .target_fn )
1347
+
1348
+ def _create_step (self , mutating_fn : Callable , remote_applicable_builder : Applicable ):
1349
+ """Adds the correct function for the applicable and resolves kwargs"""
1350
+
1351
+ if not remote_applicable_builder .kwargs :
1352
+ remote_applicable_builder .kwargs = self .mutating_function_kwargs
1353
+
1354
+ remote_applicable_builder .fn = mutating_fn
1355
+
1356
+ return remote_applicable_builder
1357
+
1358
+ def __call__ (self , mutating_fn : Callable ):
1359
+ """Adds to an existing pipe_output or creates a new pipe_output.
1360
+
1361
+ This is a new type of decorator that builds `pipe_output` for multiple nodes in the DAG. It does
1362
+ not fit in the current decorator framework since it does not decorate the node function in the DAG
1363
+ but allows us to "remotely decorate" multiple nodes at once, which needs to happen before the
1364
+ NodeTransformLifecycle gets applied / resolved.
1365
+
1366
+ :param mutating_fn: function that will be used in pipe_output to transform target function
1367
+ :return: mutating_fn, to guarantee function works even when Hamilton driver is not used
1368
+ """
1369
+
1370
+ if not mutating_fn .__name__ .startswith ("_" ):
1371
+ mutating_fn .__name__ = "" .join (("_" , mutating_fn .__name__ ))
1372
+
1373
+ if self .restrict_to_single_module :
1374
+ self .validate_same_module (mutating_fn = mutating_fn )
1375
+
1376
+ for remote_applicable in self .remote_applicables :
1377
+ new_pipe_step = self ._create_step (
1378
+ mutating_fn = mutating_fn , remote_applicable_builder = remote_applicable
1379
+ )
1380
+ found_pipe_output = False
1381
+ if hasattr (remote_applicable .target_fn , "transform" ):
1382
+ for decorator in remote_applicable .target_fn .transform :
1383
+ if isinstance (decorator , pipe_output ):
1384
+ decorator .transforms = decorator .transforms + (new_pipe_step ,)
1385
+ found_pipe_output = True
1386
+
1387
+ if not found_pipe_output :
1388
+ remote_applicable .target_fn = pipe_output (
1389
+ new_pipe_step , collapse = self .collapse , _chain = self .chain
1390
+ )(remote_applicable .target_fn )
1391
+
1392
+ return mutating_fn
0 commit comments