Skip to content

Commit 819a27b

Browse files
committed
Initial crack at implementing StepInputExpressionRequirements.
scatter-stepfrom-wf test case passes - need to get integer inputs to workflows fixed up in order to test scatter-stepfrom2-wf.
1 parent 3696f71 commit 819a27b

File tree

8 files changed

+95
-6
lines changed

8 files changed

+95
-6
lines changed

lib/galaxy/managers/workflows.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -881,6 +881,7 @@ def __module_from_dict( self, trans, steps, steps_by_external_id, step_dict, **k
881881
step_input.name = input_dict["name"]
882882
step_input.merge_type = input_dict.get("merge_type", step_input.default_merge_type)
883883
step_input.scatter_type = input_dict.get("scatter_type", step_input.default_scatter_type)
884+
step_input.value_from = input_dict.get("value_from", None)
884885
step.inputs.append(step_input)
885886

886887
# Create the model class for the step

lib/galaxy/model/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3231,6 +3231,10 @@ def __init__(
32313231
if not populated:
32323232
self.populated_state = DatasetCollection.populated_states.NEW
32333233

3234+
@property
3235+
def allow_implicit_mapping(self):
3236+
return self.collection_type != "record"
3237+
32343238
@property
32353239
def populated( self ):
32363240
top_level_populated = self.populated_state == DatasetCollection.populated_states.OK

lib/galaxy/model/mapping.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -849,6 +849,8 @@
849849
Column( "name", Unicode( 255 ) ),
850850
Column( "merge_type", TEXT ),
851851
Column( "scatter_type", TEXT ),
852+
Column( "value_from", JSONType ),
853+
Column( "value_from_type", TEXT ),
852854
Column( "default_value", JSONType ) )
853855

854856

lib/galaxy/model/migrate/versions/0137_worklfow_step_input_definitions.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
Column( "name", Unicode( 255 ) ),
2121
Column( "merge_type", TEXT ),
2222
Column( "scatter_type", TEXT ),
23+
Column( "value_from", JSONType ),
24+
Column( "value_from_type", TEXT ),
2325
Column( "default_value", JSONType ),
2426
)
2527

lib/galaxy/tools/cwl/parser.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
"ShellCommandRequirement",
4444
"ScatterFeatureRequirement",
4545
"SubworkflowFeatureRequirement",
46+
"StepInputExpressionRequirement",
4647
"MultipleInputFeatureRequirement",
4748
]
4849

@@ -599,7 +600,7 @@ def cwl_input_to_galaxy_step(self, input, i):
599600
input_as_dict["type"] = "data_collection_input"
600601
input_as_dict["collection_type"] = "record"
601602
else:
602-
raise NotImplementedError()
603+
input_as_dict["type"] = "parameter_input"
603604

604605
return input_as_dict
605606

@@ -750,6 +751,9 @@ def to_dict(self):
750751
as_dict["merge_type"] = self._cwl_input["linkMerge"]
751752
if "scatterMethod" in self.step_proxy._step.tool:
752753
as_dict["scatter_method"] = self.step_proxy._step.tool["scatterMethod"]
754+
if "valueFrom" in self._cwl_input:
755+
# TODO: Add a table for expressions - mark the type as CWL 1.0 JavaScript.
756+
as_dict["value_from"] = self._cwl_input["valueFrom"]
753757
return as_dict
754758

755759

lib/galaxy/workflow/modules.py

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -863,6 +863,64 @@ def decode_runtime_state( self, runtime_state ):
863863
else:
864864
raise ToolMissingException( "Tool %s missing. Cannot recover runtime state." % self.tool_id )
865865

866+
def evaluate_value_from_expressions(self, step, execution_state):
867+
value_from_expressions = {}
868+
869+
for key, value in execution_state.inputs.items():
870+
step_input = step.inputs_by_name.get(key, None)
871+
if step_input and step_input.value_from is not None:
872+
value_from_expressions[key] = step_input.value_from
873+
874+
if not value_from_expressions:
875+
return
876+
877+
hda_references = []
878+
879+
def to_cwl(value):
880+
if isinstance(value, model.HistoryDatasetAssociation):
881+
hda_references.append(value)
882+
return {
883+
"class": "File",
884+
"location": "step_input://%d" % len(hda_references),
885+
}
886+
elif hasattr(value, "collection"):
887+
collection = value.collection
888+
if collection.collection_type == "list":
889+
return map(to_cwl, collection.dataset_instances)
890+
elif collection.collection_type == "record":
891+
rval = {}
892+
for element in collection.elements:
893+
rval[element.element_identifier] = to_cwl(element.element_object)
894+
return rval
895+
else:
896+
return value
897+
898+
def from_cwl(value):
899+
if isinstance(value, dict) and "class" in value and "location" in value:
900+
assert value["location"].startswith("step_input://")
901+
return hda_references[int(value["location"][len("step_input://"):])-1]
902+
elif isinstance(value, dict):
903+
raise NotImplementedError()
904+
else:
905+
return value
906+
907+
step_state = {}
908+
for key, value in execution_state.inputs.items():
909+
step_state[key] = to_cwl(value)
910+
911+
for key, value_from in value_from_expressions.items():
912+
from cwltool.expression import do_eval
913+
as_cwl_value = do_eval(
914+
value_from,
915+
step_state,
916+
[],
917+
None,
918+
None,
919+
{},
920+
context=step_state[key],
921+
)
922+
execution_state.inputs[key] = from_cwl(as_cwl_value)
923+
866924
def execute( self, trans, progress, invocation, step ):
867925
tool = trans.app.toolbox.get_tool( step.tool_id, tool_version=step.tool_version, tool_hash=step.tool_hash )
868926
tool_state = step.state
@@ -925,6 +983,10 @@ def callback( input, prefixed_name, **kwargs ):
925983
message = message_template % (tool.name, k.message)
926984
raise exceptions.MessageException( message )
927985

986+
self.evaluate_value_from_expressions(
987+
step, execution_state
988+
)
989+
928990
unmatched_input_connections = expected_replacement_keys - found_replacement_keys
929991
if unmatched_input_connections:
930992
log.warn("Failed to use input connections for inputs [%s]" % unmatched_input_connections)
@@ -997,22 +1059,22 @@ def callback( input, prefixed_name, **kwargs ):
9971059
is_data_param = isinstance( input, DataToolParameter )
9981060
if is_data_param and not input.multiple:
9991061
data = progress.replacement_for_tool_input( step, input, prefixed_name )
1000-
if hasattr( data, "collection" ):
1062+
if hasattr( data, "collection" ) and data.collection.allow_implicit_mapping:
10011063
collections_to_match.add( prefixed_name, data )
10021064
return
10031065
is_data_collection_param = isinstance( input, DataCollectionToolParameter )
10041066
if is_data_collection_param and not input.multiple:
10051067
data = progress.replacement_for_tool_input( step, input, prefixed_name )
10061068
history_query = input._history_query( self.trans )
1007-
if hasattr( data, "collection" ):
1069+
if hasattr( data, "collection" ) and data.collection.allow_implicit_mapping:
10081070
subcollection_type_description = history_query.can_map_over( data )
10091071
if subcollection_type_description:
10101072
collections_to_match.add( prefixed_name, data, subcollection_type=subcollection_type_description.collection_type )
10111073
return
10121074

10131075
data = progress.replacement_for_tool_input( step, input, prefixed_name )
10141076
if data is not NO_REPLACEMENT:
1015-
if hasattr( data, "collection" ):
1077+
if hasattr( data, "collection" ) and data.collection.allow_implicit_mapping:
10161078
collections_to_match.add( prefixed_name, data )
10171079

10181080
visit_input_values( tool.inputs, step.state.inputs, callback )

lib/galaxy/workflow/run.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ def replacement_for_tool_input( self, step, input, prefixed_name ):
295295
replacement = replacement[ 0 ]
296296
else:
297297
step_input = step.inputs_by_name.get(prefixed_name, None)
298+
298299
merge_type = model.WorkflowStepInput.default_merge_type
299300
if step_input:
300301
merge_type = step_input.merge_type
@@ -328,7 +329,6 @@ def replacement_for_tool_input( self, step, input, prefixed_name ):
328329

329330
inputs.append(input_from_connection)
330331

331-
332332
if input.type == "data_collection":
333333
# TODO: Implement more nested types here...
334334
assert input.collection_types == ["list"], input.collection_types
@@ -384,7 +384,7 @@ def replacement_for_tool_input( self, step, input, prefixed_name ):
384384
collection=collection,
385385
history=self.workflow_invocation.history,
386386
)
387-
return ephemeral_collection
387+
replacement = ephemeral_collection
388388

389389
return replacement
390390

test/unit/tools/test_cwl.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,20 @@ def test_workflow_multiple_input_merge_flattened():
200200
assert input["merge_type"] == "merge_flattened"
201201

202202

203+
def test_workflow_step_value_from():
204+
version = "v1.0"
205+
proxy = workflow_proxy(_cwl_tool_path("%s/step-valuefrom-wf.cwl" % version))
206+
207+
galaxy_workflow_dict = proxy.to_dict()
208+
assert len(galaxy_workflow_dict["steps"]) == 3
209+
210+
tool_step = galaxy_workflow_dict["steps"][2]
211+
assert "inputs" in tool_step
212+
inputs = tool_step["inputs"]
213+
assert len(inputs) == 1
214+
assert "value_from" in inputs[0]
215+
216+
203217
def test_load_proxy_simple():
204218
cat3 = _cwl_tool_path("draft3/cat3-tool.cwl")
205219
tool_source = get_tool_source(cat3)

0 commit comments

Comments
 (0)