Skip to content

Commit e065437

Browse files
feat(workflows): add sample 'execute_with_arguments' and update 'conftest.py' (#13290)
* feat(workflows): update the Workflow definition to accept optional arguments * feat(workflows): add pass_data_in_execution_request sample * feat(workflows): add test for pass_data sample, and fix remaining tests so they work well together * feat(workflows): update README.md * feat(workflows): delete conflicting region tag and cleanup for linting * feat(workflows): use a f-string for uuid interpolation * feat(workflows): migrate previous region tag and rename new region tag * feat(workflows): remove unnecessary context in comment. * feat(workflows): apply feedback from PR review PR review #13290 (comment) - Replace texts in README.md * feat(workflows): remove reference to 'Platform' in 'Google Cloud' product name - As per https://cloud.google.com/guides/style/branding-list#cloud-name-list
1 parent a5a7807 commit e065437

7 files changed

+185
-32
lines changed

workflows/cloud-client/README.md

+9-6
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,26 @@
1-
<img src="https://avatars2.githubusercontent.com/u/2810941?v=3&s=96" alt="Google Cloud Platform logo" title="Google Cloud Platform" align="right" height="96" width="96"/>
1+
<img src="https://avatars2.githubusercontent.com/u/2810941?v=3&s=96" alt="Google Cloud logo" title="Google Cloud" align="right" height="96" width="96"/>
22

33
# Cloud Workflows Quickstart – Python
44

5-
This sample shows how to execute a Cloud Workflow and wait for the workflow execution results using the Python client libraries.
5+
This sample shows how to execute Cloud Workflows and wait for the result
6+
of a workflow execution using the Python client libraries.
67

78
## Setup
89

910
1. Deploy the workflow, `myFirstWorkflow`:
1011

11-
1. Copy the YAML from this file: https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/myFirstWorkflow.workflows.yaml
12+
1. Copy the YAML from this file: https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/workflows/cloud-client/myFirstWorkflow.workflows.yaml
1213
1. Paste the YAML into a file called `myFirstWorkflow.workflows.yaml`.
1314
1. Run the command: `gcloud workflows deploy myFirstWorkflow --source myFirstWorkflow.workflows.yaml`
1415

1516
## Run the Quickstart
1617

1718
Install [`pip`][pip] and [`virtualenv`][virtualenv] if you do not already have them.
1819

19-
You may want to refer to the [`Python Development Environment Setup Guide`][setup] for Google Cloud Platform for instructions.
20+
For more information, refer to the
21+
[Python Development Environment Setup Guide][setup] for Google Cloud.
2022

21-
1. Create a virtualenv. Samples are compatible with Python 2.7 and 3.4+.
23+
1. Create a virtualenv. Samples are compatible with Python 3.9+.
2224

2325
```sh
2426
virtualenv env
@@ -40,7 +42,8 @@ You may want to refer to the [`Python Development Environment Setup Guide`][setu
4042

4143
1. Observe the results:
4244

43-
In stdout, you should see a JSON response from your workflow like the following:
45+
In `stdout`, you should see a JSON response from your workflow like the following,
46+
depending on the current weekday in Amsterdam.
4447

4548
```json
4649
["Wednesday","Wednesday Night Wars","Wednesday 13","Wednesday Addams","Wednesday Campanella","Wednesdayite","Wednesday Martin","Wednesday Campanella discography","Wednesday Night Hockey (American TV program)","Wednesday Morning, 3 A.M."]

workflows/cloud-client/conftest.py

+11-9
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@
2323

2424
PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
2525
LOCATION = "us-central1"
26-
WORKFLOW_ID = "myFirstWorkflow_" + str(uuid.uuid4())
26+
WORKFLOW_ID_BASE = "myFirstWorkflow"
2727

2828

29-
def workflow_exists(client: workflows_v1.WorkflowsClient) -> bool:
29+
def workflow_exists(client: workflows_v1.WorkflowsClient, workflow_id: str) -> bool:
3030
"""Returns True if the workflow exists in this project."""
3131
try:
3232
workflow_name = client.workflow_path(
33-
PROJECT_ID, LOCATION, WORKFLOW_ID
33+
PROJECT_ID, LOCATION, workflow_id
3434
)
3535
client.get_workflow(request={"name": workflow_name})
3636
return True
@@ -56,13 +56,15 @@ def location() -> str:
5656
return LOCATION
5757

5858

59-
@pytest.fixture(scope="module")
59+
@pytest.fixture(scope="function")
6060
def workflow_id(client: workflows_v1.WorkflowsClient) -> str:
61+
workflow_id_str = f"{WORKFLOW_ID_BASE}_{uuid.uuid4()}"
62+
6163
creating_workflow = False
6264
backoff_delay = 1 # Start wait with delay of 1 second.
6365

6466
# Create the workflow if it doesn't exist.
65-
while not workflow_exists(client):
67+
while not workflow_exists(client, workflow_id_str):
6668
if not creating_workflow:
6769
# Create the workflow.
6870
workflow_file = open("myFirstWorkflow.workflows.yaml").read()
@@ -72,9 +74,9 @@ def workflow_id(client: workflows_v1.WorkflowsClient) -> str:
7274
client.create_workflow(
7375
request={
7476
"parent": parent,
75-
"workflow_id": WORKFLOW_ID,
77+
"workflow_id": workflow_id_str,
7678
"workflow": {
77-
"name": WORKFLOW_ID,
79+
"name": workflow_id_str,
7880
"source_contents": workflow_file
7981
},
8082
}
@@ -88,11 +90,11 @@ def workflow_id(client: workflows_v1.WorkflowsClient) -> str:
8890
# Double the delay to provide exponential backoff.
8991
backoff_delay *= 2
9092

91-
yield WORKFLOW_ID
93+
yield workflow_id_str
9294

9395
# Delete the workflow.
9496
workflow_full_name = client.workflow_path(
95-
PROJECT_ID, LOCATION, WORKFLOW_ID
97+
PROJECT_ID, LOCATION, workflow_id_str
9698
)
9799

98100
client.delete_workflow(

workflows/cloud-client/main.py

+2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ def execute_workflow(
3737
The execution response.
3838
"""
3939

40+
# [START workflows_execute_without_arguments]
4041
# [START workflows_api_quickstart]
4142
import time
4243

@@ -86,6 +87,7 @@ def execute_workflow(
8687
print(f"Execution results: {execution.result}")
8788
# [END workflows_api_quickstart_execution]
8889
# [END workflows_api_quickstart]
90+
# [END workflows_execute_without_arguments]
8991
return execution
9092

9193

workflows/cloud-client/main_test.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import main
2020

2121

22-
@backoff.on_exception(backoff.expo, AssertionError, max_time=60)
22+
@backoff.on_exception(backoff.expo, AssertionError, max_tries=5)
2323
def test_workflow_execution(project_id: str, location: str, workflow_id: str) -> None:
2424
result = main.execute_workflow(project_id, location, workflow_id)
2525
assert result.state == executions.Execution.State.SUCCEEDED
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2021 Google LLC
1+
# Copyright 2025 Google LLC
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -12,18 +12,36 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
- getCurrentTime:
16-
call: http.get
17-
args:
18-
url: https://timeapi.io/api/Time/current/zone?timeZone=Europe/Amsterdam
19-
result: currentTime
20-
- readWikipedia:
21-
call: http.get
22-
args:
23-
url: https://en.wikipedia.org/w/api.php
24-
query:
25-
action: opensearch
26-
search: ${currentTime.body.dayOfWeek}
27-
result: wikiResult
28-
- returnResult:
29-
return: ${wikiResult.body[1]}
15+
# [START workflows_myfirstworkflow_yaml_python]
16+
# This workflow accepts an optional "searchTerm" argument for the Wikipedia API.
17+
# If no input arguments are provided or "searchTerm" is absent,
18+
# it will fetch the day of the week in Amsterdam and use it as the search term.
19+
20+
main:
21+
params: [input]
22+
steps:
23+
- validateSearchTermAndRedirectToReadWikipedia:
24+
switch:
25+
- condition: '${map.get(input, "searchTerm") != null}'
26+
assign:
27+
- searchTerm: '${input.searchTerm}'
28+
next: readWikipedia
29+
- getCurrentTime:
30+
call: http.get
31+
args:
32+
url: https://timeapi.io/api/Time/current/zone?timeZone=Europe/Amsterdam
33+
result: currentTime
34+
- setFromCallResult:
35+
assign:
36+
- searchTerm: '${currentTime.body.dayOfWeek}'
37+
- readWikipedia:
38+
call: http.get
39+
args:
40+
url: 'https://en.wikipedia.org/w/api.php'
41+
query:
42+
action: opensearch
43+
search: '${searchTerm}'
44+
result: wikiResult
45+
- returnOutput:
46+
return: '${wikiResult.body[1]}'
47+
# [END workflows_myfirstworkflow_yaml_python]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import os
16+
17+
from google.cloud.workflows.executions_v1 import Execution
18+
19+
20+
def execute_workflow_with_argument(
21+
project_id: str,
22+
location: str,
23+
workflow_id: str
24+
) -> Execution:
25+
"""Execute a workflow and print the execution results.
26+
27+
Args:
28+
project: The ID of the Google Cloud project
29+
which contains the workflow to execute.
30+
location: The location for the workflow.
31+
workflow: The ID of the workflow to execute.
32+
33+
Returns:
34+
The execution response.
35+
"""
36+
37+
# [START workflows_execute_with_arguments]
38+
import time
39+
40+
from google.cloud import workflows_v1
41+
from google.cloud.workflows import executions_v1
42+
43+
from google.cloud.workflows.executions_v1.types import executions
44+
45+
# TODO(developer): Update and uncomment the following lines.
46+
# project_id = "YOUR_PROJECT_ID"
47+
# location = "YOUR_LOCATION" # For example: us-central1
48+
# workflow_id = "YOUR_WORKFLOW_ID" # For example: myFirstWorkflow
49+
50+
# Initialize API clients.
51+
execution_client = executions_v1.ExecutionsClient()
52+
workflows_client = workflows_v1.WorkflowsClient()
53+
54+
# Construct the fully qualified location path.
55+
parent = workflows_client.workflow_path(project_id, location, workflow_id)
56+
57+
# Execute the workflow.
58+
# Find more information about the Execution object here:
59+
# https://cloud.google.com/python/docs/reference/workflows/latest/google.cloud.workflows.executions_v1.types.Execution
60+
execution = executions_v1.Execution(
61+
name=parent,
62+
argument='{"searchTerm": "Cloud"}',
63+
)
64+
65+
response = execution_client.create_execution(
66+
parent=parent,
67+
execution=execution,
68+
)
69+
print(f"Created execution: {response.name}")
70+
71+
# Wait for execution to finish, then print results.
72+
execution_finished = False
73+
backoff_delay = 1 # Start wait with delay of 1 second.
74+
print("Poll for result...")
75+
76+
while not execution_finished:
77+
execution = execution_client.get_execution(
78+
request={"name": response.name}
79+
)
80+
execution_finished = execution.state != executions.Execution.State.ACTIVE
81+
82+
# If we haven't seen the result yet, keep waiting.
83+
if not execution_finished:
84+
print("- Waiting for results...")
85+
time.sleep(backoff_delay)
86+
# Double the delay to provide exponential backoff.
87+
backoff_delay *= 2
88+
else:
89+
print(f"Execution finished with state: {execution.state.name}")
90+
print(f"Execution results: {execution.result}")
91+
# [END workflows_execute_with_arguments]
92+
return execution
93+
94+
95+
if __name__ == "__main__":
96+
PROJECT = os.getenv("GOOGLE_CLOUD_PROJECT")
97+
assert PROJECT, "'GOOGLE_CLOUD_PROJECT' environment variable not set."
98+
99+
execute_workflow_with_argument(PROJECT, "us-central1", "myFirstWorkflow")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Copyright 2021 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import backoff
16+
17+
from google.cloud.workflows.executions_v1.types import executions
18+
19+
import pass_data_in_execution_request
20+
21+
22+
@backoff.on_exception(backoff.expo, AssertionError, max_tries=5)
23+
def test_workflow_execution_with_arguments(project_id: str, location: str, workflow_id: str) -> None:
24+
execution_result = pass_data_in_execution_request.execute_workflow_with_argument(
25+
project_id, location, workflow_id
26+
)
27+
assert execution_result.state == executions.Execution.State.SUCCEEDED
28+
assert "searchTerm" in execution_result.argument
29+
assert "Cloud" in execution_result.result

0 commit comments

Comments
 (0)