Skip to content

feat(workflows): add sample 'execute_with_arguments' and update 'conftest.py' #13290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
15 changes: 9 additions & 6 deletions workflows/cloud-client/README.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
<img src="https://avatars2.githubusercontent.com/u/2810941?v=3&s=96" alt="Google Cloud Platform logo" title="Google Cloud Platform" align="right" height="96" width="96"/>
<img src="https://avatars2.githubusercontent.com/u/2810941?v=3&s=96" alt="Google Cloud logo" title="Google Cloud" align="right" height="96" width="96"/>

# Cloud Workflows Quickstart – Python

This sample shows how to execute a Cloud Workflow and wait for the workflow execution results using the Python client libraries.
This sample shows how to execute Cloud Workflows and wait for the result
of a workflow execution using the Python client libraries.

## Setup

1. Deploy the workflow, `myFirstWorkflow`:

1. Copy the YAML from this file: https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/myFirstWorkflow.workflows.yaml
1. Copy the YAML from this file: https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/workflows/cloud-client/myFirstWorkflow.workflows.yaml
1. Paste the YAML into a file called `myFirstWorkflow.workflows.yaml`.
1. Run the command: `gcloud workflows deploy myFirstWorkflow --source myFirstWorkflow.workflows.yaml`

## Run the Quickstart

Install [`pip`][pip] and [`virtualenv`][virtualenv] if you do not already have them.

You may want to refer to the [`Python Development Environment Setup Guide`][setup] for Google Cloud Platform for instructions.
For more information, refer to the
[Python Development Environment Setup Guide][setup] for Google Cloud.

1. Create a virtualenv. Samples are compatible with Python 2.7 and 3.4+.
1. Create a virtualenv. Samples are compatible with Python 3.9+.

```sh
virtualenv env
Expand All @@ -40,7 +42,8 @@ You may want to refer to the [`Python Development Environment Setup Guide`][setu

1. Observe the results:

In stdout, you should see a JSON response from your workflow like the following:
In `stdout`, you should see a JSON response from your workflow like the following,
depending on the current weekday in Amsterdam.

```json
["Wednesday","Wednesday Night Wars","Wednesday 13","Wednesday Addams","Wednesday Campanella","Wednesdayite","Wednesday Martin","Wednesday Campanella discography","Wednesday Night Hockey (American TV program)","Wednesday Morning, 3 A.M."]
Expand Down
20 changes: 11 additions & 9 deletions workflows/cloud-client/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@

PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
LOCATION = "us-central1"
WORKFLOW_ID = "myFirstWorkflow_" + str(uuid.uuid4())
WORKFLOW_ID_BASE = "myFirstWorkflow"


def workflow_exists(client: workflows_v1.WorkflowsClient) -> bool:
def workflow_exists(client: workflows_v1.WorkflowsClient, workflow_id: str) -> bool:
"""Returns True if the workflow exists in this project."""
try:
workflow_name = client.workflow_path(
PROJECT_ID, LOCATION, WORKFLOW_ID
PROJECT_ID, LOCATION, workflow_id
)
client.get_workflow(request={"name": workflow_name})
return True
Expand All @@ -56,13 +56,15 @@ def location() -> str:
return LOCATION


@pytest.fixture(scope="module")
@pytest.fixture(scope="function")
def workflow_id(client: workflows_v1.WorkflowsClient) -> str:
workflow_id_str = f"{WORKFLOW_ID_BASE}_{uuid.uuid4()}"

creating_workflow = False
backoff_delay = 1 # Start wait with delay of 1 second.

# Create the workflow if it doesn't exist.
while not workflow_exists(client):
while not workflow_exists(client, workflow_id_str):
if not creating_workflow:
# Create the workflow.
workflow_file = open("myFirstWorkflow.workflows.yaml").read()
Expand All @@ -72,9 +74,9 @@ def workflow_id(client: workflows_v1.WorkflowsClient) -> str:
client.create_workflow(
request={
"parent": parent,
"workflow_id": WORKFLOW_ID,
"workflow_id": workflow_id_str,
"workflow": {
"name": WORKFLOW_ID,
"name": workflow_id_str,
"source_contents": workflow_file
},
}
Expand All @@ -88,11 +90,11 @@ def workflow_id(client: workflows_v1.WorkflowsClient) -> str:
# Double the delay to provide exponential backoff.
backoff_delay *= 2

yield WORKFLOW_ID
yield workflow_id_str

# Delete the workflow.
workflow_full_name = client.workflow_path(
PROJECT_ID, LOCATION, WORKFLOW_ID
PROJECT_ID, LOCATION, workflow_id_str
)

client.delete_workflow(
Expand Down
2 changes: 2 additions & 0 deletions workflows/cloud-client/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def execute_workflow(
The execution response.
"""

# [START workflows_execute_without_arguments]
# [START workflows_api_quickstart]
import time

Expand Down Expand Up @@ -86,6 +87,7 @@ def execute_workflow(
print(f"Execution results: {execution.result}")
# [END workflows_api_quickstart_execution]
# [END workflows_api_quickstart]
# [END workflows_execute_without_arguments]
return execution


Expand Down
2 changes: 1 addition & 1 deletion workflows/cloud-client/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import main


@backoff.on_exception(backoff.expo, AssertionError, max_time=60)
@backoff.on_exception(backoff.expo, AssertionError, max_tries=5)
def test_workflow_execution(project_id: str, location: str, workflow_id: str) -> None:
result = main.execute_workflow(project_id, location, workflow_id)
assert result.state == executions.Execution.State.SUCCEEDED
Expand Down
50 changes: 34 additions & 16 deletions workflows/cloud-client/myFirstWorkflow.workflows.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2021 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -12,18 +12,36 @@
# See the License for the specific language governing permissions and
# limitations under the License.

- getCurrentTime:
call: http.get
args:
url: https://timeapi.io/api/Time/current/zone?timeZone=Europe/Amsterdam
result: currentTime
- readWikipedia:
call: http.get
args:
url: https://en.wikipedia.org/w/api.php
query:
action: opensearch
search: ${currentTime.body.dayOfWeek}
result: wikiResult
- returnResult:
return: ${wikiResult.body[1]}
# [START workflows_myfirstworkflow_yaml_python]
# This workflow accepts an optional "searchTerm" argument for the Wikipedia API.
# If no input arguments are provided or "searchTerm" is absent,
# it will fetch the day of the week in Amsterdam and use it as the search term.

main:
params: [input]
steps:
- validateSearchTermAndRedirectToReadWikipedia:
switch:
- condition: '${map.get(input, "searchTerm") != null}'
assign:
- searchTerm: '${input.searchTerm}'
next: readWikipedia
- getCurrentTime:
call: http.get
args:
url: https://timeapi.io/api/Time/current/zone?timeZone=Europe/Amsterdam
result: currentTime
- setFromCallResult:
assign:
- searchTerm: '${currentTime.body.dayOfWeek}'
- readWikipedia:
call: http.get
args:
url: 'https://en.wikipedia.org/w/api.php'
query:
action: opensearch
search: '${searchTerm}'
result: wikiResult
- returnOutput:
return: '${wikiResult.body[1]}'
# [END workflows_myfirstworkflow_yaml_python]
99 changes: 99 additions & 0 deletions workflows/cloud-client/pass_data_in_execution_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from google.cloud.workflows.executions_v1 import Execution


def execute_workflow_with_argument(
project_id: str,
location: str,
workflow_id: str
) -> Execution:
"""Execute a workflow and print the execution results.

Args:
project: The ID of the Google Cloud project
which contains the workflow to execute.
location: The location for the workflow.
workflow: The ID of the workflow to execute.

Returns:
The execution response.
"""

# [START workflows_execute_with_arguments]
import time

from google.cloud import workflows_v1
from google.cloud.workflows import executions_v1

from google.cloud.workflows.executions_v1.types import executions

# TODO(developer): Update and uncomment the following lines.
# project_id = "YOUR_PROJECT_ID"
# location = "YOUR_LOCATION" # For example: us-central1
# workflow_id = "YOUR_WORKFLOW_ID" # For example: myFirstWorkflow

# Initialize API clients.
execution_client = executions_v1.ExecutionsClient()
workflows_client = workflows_v1.WorkflowsClient()

# Construct the fully qualified location path.
parent = workflows_client.workflow_path(project_id, location, workflow_id)

# Execute the workflow.
# Find more information about the Execution object here:
# https://cloud.google.com/python/docs/reference/workflows/latest/google.cloud.workflows.executions_v1.types.Execution
execution = executions_v1.Execution(
name=parent,
argument='{"searchTerm": "Cloud"}',
)

response = execution_client.create_execution(
parent=parent,
execution=execution,
)
print(f"Created execution: {response.name}")

# Wait for execution to finish, then print results.
execution_finished = False
backoff_delay = 1 # Start wait with delay of 1 second.
print("Poll for result...")

while not execution_finished:
execution = execution_client.get_execution(
request={"name": response.name}
)
execution_finished = execution.state != executions.Execution.State.ACTIVE

# If we haven't seen the result yet, keep waiting.
if not execution_finished:
print("- Waiting for results...")
time.sleep(backoff_delay)
# Double the delay to provide exponential backoff.
backoff_delay *= 2
else:
print(f"Execution finished with state: {execution.state.name}")
print(f"Execution results: {execution.result}")
# [END workflows_execute_with_arguments]
return execution


if __name__ == "__main__":
PROJECT = os.getenv("GOOGLE_CLOUD_PROJECT")
assert PROJECT, "'GOOGLE_CLOUD_PROJECT' environment variable not set."

execute_workflow_with_argument(PROJECT, "us-central1", "myFirstWorkflow")
29 changes: 29 additions & 0 deletions workflows/cloud-client/pass_data_in_execution_request_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import backoff

from google.cloud.workflows.executions_v1.types import executions

import pass_data_in_execution_request


@backoff.on_exception(backoff.expo, AssertionError, max_tries=5)
def test_workflow_execution_with_arguments(project_id: str, location: str, workflow_id: str) -> None:
execution_result = pass_data_in_execution_request.execute_workflow_with_argument(
project_id, location, workflow_id
)
assert execution_result.state == executions.Execution.State.SUCCEEDED
assert "searchTerm" in execution_result.argument
assert "Cloud" in execution_result.result