
Commit b97ee31

[yaml]: Phase2 High Usage Yaml examples (#35279)
* add infrastructure to run Phase 2 test cases
* yapf
* add input data
* add new examples
* change dataflow link
1 parent 6d0e00e commit b97ee31

File tree: 6 files changed, +400 −14 lines


sdks/python/apache_beam/yaml/examples/testing/examples_test.py

Lines changed: 109 additions & 10 deletions
@@ -17,6 +17,7 @@
 #
 # pytype: skip-file
 import glob
+import json
 import logging
 import os
 import random
@@ -124,11 +125,35 @@ def test_kafka_read(
       | beam.Map(lambda element: beam.Row(payload=element.encode('utf-8'))))


+@beam.ptransform.ptransform_fn
+def test_pubsub_read(
+    pbegin,
+    topic: Optional[str] = None,
+    subscription: Optional[str] = None,
+    format: Optional[str] = None,
+    schema: Optional[Any] = None,
+    attributes: Optional[List[str]] = None,
+    attributes_map: Optional[str] = None,
+    id_attribute: Optional[str] = None,
+    timestamp_attribute: Optional[str] = None):
+
+  pubsub_messages = input_data.pubsub_messages_data()
+
+  return (
+      pbegin
+      | beam.Create([json.loads(msg.data) for msg in pubsub_messages])
+      | beam.Map(lambda element: beam.Row(**element)))
+
+
 TEST_PROVIDERS = {
-    'TestEnrichment': test_enrichment, 'TestReadFromKafka': test_kafka_read
+    'TestEnrichment': test_enrichment,
+    'TestReadFromKafka': test_kafka_read,
+    'TestReadFromPubSub': test_pubsub_read
 }
-
-INPUT_TRANSFORM_TEST_PROVIDERS = ['TestReadFromKafka']
+"""
+Transforms not requiring inputs.
+"""
+INPUT_TRANSFORM_TEST_PROVIDERS = ['TestReadFromKafka', 'TestReadFromPubSub']


 def check_output(expected: List[str]):
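The new TestReadFromPubSub provider makes the streaming examples testable without a live Pub/Sub resource: the fake source decodes each fixture message's JSON payload and re-emits it as a schema'd Row. A standalone sketch of that decode path, assuming only apache_beam and the standard library (the payload bytes below mirror the new pubsub_messages_data() fixture):

import json

import apache_beam as beam

# Payloads shaped like the PubsubMessage fixtures added in input_data.py.
payloads = [b'{"label": "37a", "rank": 1}', b'{"label": "37b", "rank": 4}']

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([json.loads(data) for data in payloads])
      | beam.Map(lambda element: beam.Row(**element))
      | beam.Map(print))  # e.g. Row(label='37a', rank=1)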
@@ -186,6 +211,7 @@ def test_yaml_example(self):
           f"Missing '# Expected:' tag in example file '{pipeline_spec_file}'")
       for i, line in enumerate(expected):
         expected[i] = line.replace('# ', '').replace('\n', '')
+      expected = [line for line in expected if line]
       pipeline_spec = yaml.load(
           ''.join(lines), Loader=yaml_transform.SafeLineLoader)
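The added comprehension drops empty expectation lines, so a stray "# " comment at the bottom of an example no longer produces a spurious empty expected string. A minimal sketch of this parsing step on a made-up "# Expected:" block (the real harness locates the tag within the full file; the slicing here is simplified):

# Hypothetical tail of an example YAML file, as raw lines.
lines = [
    '# Expected:\n',
    "# Row(label='37a', rank=1)\n",
    '# \n',  # stray blank comment line
    "# Row(label='37b', rank=4)\n",
]

expected = lines[1:]  # everything after the '# Expected:' tag
for i, line in enumerate(expected):
  expected[i] = line.replace('# ', '').replace('\n', '')
expected = [line for line in expected if line]  # new: drop empty lines
print(expected)  # ["Row(label='37a', rank=1)", "Row(label='37b', rank=4)"]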

@@ -418,7 +444,11 @@ def _kafka_test_preprocessor(
     'test_kafka_yaml',
     'test_spanner_read_yaml',
     'test_spanner_write_yaml',
-    'test_enrich_spanner_with_bigquery_yaml'
+    'test_enrich_spanner_with_bigquery_yaml',
+    'test_pubsub_topic_to_bigquery_yaml',
+    'test_pubsub_subscription_to_bigquery_yaml',
+    'test_jdbc_to_bigquery_yaml',
+    'test_spanner_to_avro_yaml'
 ])
 def _io_write_test_preprocessor(
     test_spec: dict, expected: List[str], env: TestEnvironment):
@@ -527,8 +557,11 @@ def _iceberg_io_read_test_preprocessor(
   return test_spec


-@YamlExamplesTestSuite.register_test_preprocessor(
-    ['test_spanner_read_yaml', 'test_enrich_spanner_with_bigquery_yaml'])
+@YamlExamplesTestSuite.register_test_preprocessor([
+    'test_spanner_read_yaml',
+    'test_enrich_spanner_with_bigquery_yaml',
+    "test_spanner_to_avro_yaml"
+])
 def _spanner_io_read_test_preprocessor(
     test_spec: dict, expected: List[str], env: TestEnvironment):
   """
@@ -607,6 +640,71 @@ def _enrichment_test_preprocessor(
   return test_spec


+@YamlExamplesTestSuite.register_test_preprocessor([
+    'test_pubsub_topic_to_bigquery_yaml',
+    'test_pubsub_subscription_to_bigquery_yaml'
+])
+def _pubsub_io_read_test_preprocessor(
+    test_spec: dict, expected: List[str], env: TestEnvironment):
+  """
+  Preprocessor for tests that involve reading from Pub/Sub.
+
+  This preprocessor replaces any ReadFromPubSub transform with a Create
+  transform that reads from a predefined in-memory list of messages.
+  This allows the test to verify the pipeline's correctness without relying
+  on an active Pub/Sub subscription or topic.
+  """
+  if pipeline := test_spec.get('pipeline', None):
+    for transform in pipeline.get('transforms', []):
+      if transform.get('type', '') == 'ReadFromPubSub':
+        transform['type'] = 'TestReadFromPubSub'
+
+  return test_spec
+
+
+@YamlExamplesTestSuite.register_test_preprocessor([
+    'test_jdbc_to_bigquery_yaml',
+])
+def _jdbc_io_read_test_preprocessor(
+    test_spec: dict, expected: List[str], env: TestEnvironment):
+  """
+  Preprocessor for tests that involve reading from JDBC.
+
+  This preprocessor replaces any ReadFromJdbc transform with a Create
+  transform that reads from a predefined in-memory list of records.
+  This allows the test to verify the pipeline's correctness without
+  relying on an active JDBC connection.
+  """
+  if pipeline := test_spec.get('pipeline', None):
+    for transform in pipeline.get('transforms', []):
+      if transform.get('type', '').startswith('ReadFromJdbc'):
+        config = transform['config']
+        url = config['url']
+        database = url.split('/')[-1]
+        if (table := config.get('table', None)) is None:
+          table = config.get('query', '').split('FROM')[-1].strip()
+        transform['type'] = 'Create'
+        transform['config'] = {
+            k: v
+            for k, v in config.items() if k.startswith('__')
+        }
+        elements = INPUT_TABLES[("Jdbc", database, table)]
+        if config.get('query', None):
+          config['query'].replace('select ',
+                                  'SELECT ').replace(' from ', ' FROM ')
+          columns = set(
+              ''.join(config['query'].split('SELECT ')[1:]).split(
+                  ' FROM', maxsplit=1)[0].split(', '))
+          if columns != {'*'}:
+            elements = [{
+                column: element[column]
+                for column in element if column in columns
+            } for element in elements]
+        transform['config']['elements'] = elements
+
+  return test_spec
+
+
 INPUT_FILES = {'products.csv': input_data.products_csv()}
 INPUT_TABLES = {
     ('shipment-test', 'shipment', 'shipments'): input_data.
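The Pub/Sub rewrite above is just a type swap on the parsed YAML spec; the transform keeps its config and is then resolved through TEST_PROVIDERS to the fake test_pubsub_read source. A minimal sketch with a hypothetical one-transform pipeline spec:

# Hypothetical parsed spec; its shape mirrors the example YAML files.
test_spec = {
    'pipeline': {
        'transforms': [{
            'type': 'ReadFromPubSub',
            'config': {'topic': 'projects/p/topics/t', 'format': 'JSON'},
        }]
    }
}

# Same substitution the preprocessor performs above.
if pipeline := test_spec.get('pipeline', None):
  for transform in pipeline.get('transforms', []):
    if transform.get('type', '') == 'ReadFromPubSub':
      transform['type'] = 'TestReadFromPubSub'

print(test_spec['pipeline']['transforms'][0]['type'])  # TestReadFromPubSub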
@@ -616,16 +714,17 @@ def _enrichment_test_preprocessor(
     ('db', 'users', 'NY'): input_data.iceberg_dynamic_destinations_users_data(),
     ('BigTable', 'beam-test', 'bigtable-enrichment-test'): input_data.
     bigtable_data(),
-    ('BigQuery', 'ALL_TEST', 'customers'): input_data.bigquery_data()
+    ('BigQuery', 'ALL_TEST', 'customers'): input_data.bigquery_data(),
+    ('Jdbc', 'shipment', 'shipments'): input_data.jdbc_shipments_data()
 }
 YAML_DOCS_DIR = os.path.join(os.path.dirname(__file__))

 AggregationTest = YamlExamplesTestSuite(
     'AggregationExamplesTest',
     os.path.join(YAML_DOCS_DIR, '../transforms/aggregation/*.yaml')).run()
-BlueprintsTest = YamlExamplesTestSuite(
-    'BlueprintsExamplesTest',
-    os.path.join(YAML_DOCS_DIR, '../transforms/blueprints/*.yaml')).run()
+BlueprintTest = YamlExamplesTestSuite(
+    'BlueprintExamplesTest',
+    os.path.join(YAML_DOCS_DIR, '../transforms/blueprint/*.yaml')).run()
 ElementWiseTest = YamlExamplesTestSuite(
     'ElementwiseExamplesTest',
     os.path.join(YAML_DOCS_DIR, '../transforms/elementwise/*.yaml')).run()
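The new ('Jdbc', 'shipment', 'shipments') entry is what the JDBC preprocessor looks up. A small sketch, with illustrative values only, of how that key and the projected columns are derived from the jdbc-to-bigquery example's connection URL and query, mirroring the string handling in _jdbc_io_read_test_preprocessor:

# Values taken from the new jdbc-to-bigquery example below.
config = {
    'url': 'jdbc:mysql://my-host:3306/shipment',
    'query': 'SELECT * FROM shipments',
}

database = config['url'].split('/')[-1]             # 'shipment'
table = config['query'].split('FROM')[-1].strip()   # 'shipments'
key = ('Jdbc', database, table)                      # key into INPUT_TABLES

columns = set(
    ''.join(config['query'].split('SELECT ')[1:]).split(
        ' FROM', maxsplit=1)[0].split(', '))         # {'*'}

# {'*'} keeps every field of each jdbc_shipments_data() record; a query such
# as 'SELECT shipment_id, customer_id FROM shipments' would instead project
# each record down to those two keys.
print(key, columns)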

sdks/python/apache_beam/yaml/examples/testing/input_data.py

Lines changed: 67 additions & 4 deletions
@@ -16,10 +16,10 @@
 # limitations under the License.
 #

-"""
-This file contains the input data to be requested by the example tests, if
-needed.
-"""
+from apache_beam.io.gcp.pubsub import PubsubMessage
+
+# This file contains the input data to be requested by the example tests, if
+# needed.


 def text_data():
@@ -129,6 +129,57 @@ def spanner_shipments_data():
   }]


+def jdbc_shipments_data():
+  return [{
+      'shipment_id': 'S1',
+      'customer_id': 'C1',
+      'shipment_date': '2023-05-01',
+      'shipment_cost': 150.0,
+      'customer_name': 'Alice',
+      'customer_email': '[email protected]'
+  },
+  {
+      'shipment_id': 'S2',
+      'customer_id': 'C2',
+      'shipment_date': '2023-06-12',
+      'shipment_cost': 300.0,
+      'customer_name': 'Bob',
+      'customer_email': '[email protected]'
+  },
+  {
+      'shipment_id': 'S3',
+      'customer_id': 'C1',
+      'shipment_date': '2023-05-10',
+      'shipment_cost': 20.0,
+      'customer_name': 'Alice',
+      'customer_email': '[email protected]'
+  },
+  {
+      'shipment_id': 'S4',
+      'customer_id': 'C4',
+      'shipment_date': '2024-07-01',
+      'shipment_cost': 150.0,
+      'customer_name': 'Derek',
+      'customer_email': '[email protected]'
+  },
+  {
+      'shipment_id': 'S5',
+      'customer_id': 'C5',
+      'shipment_date': '2023-05-09',
+      'shipment_cost': 300.0,
+      'customer_name': 'Erin',
+      'customer_email': '[email protected]'
+  },
+  {
+      'shipment_id': 'S6',
+      'customer_id': 'C4',
+      'shipment_date': '2024-07-02',
+      'shipment_cost': 150.0,
+      'customer_name': 'Derek',
+      'customer_email': '[email protected]'
+  }]
+
+
 def bigtable_data():
   return [{
       'product_id': '1', 'product_name': 'pixel 5', 'product_stock': '2'
@@ -165,3 +216,15 @@ def bigquery_data():
       'customer_name': 'Claire',
       'customer_email': '[email protected]'
   }]
+
+
+def pubsub_messages_data():
+  """
+  Provides a list of PubsubMessage objects for testing.
+  """
+  return [
+      PubsubMessage(data=b"{\"label\": \"37a\", \"rank\": 1}", attributes={}),
+      PubsubMessage(data=b"{\"label\": \"37b\", \"rank\": 4}", attributes={}),
+      PubsubMessage(data=b"{\"label\": \"37c\", \"rank\": 3}", attributes={}),
+      PubsubMessage(data=b"{\"label\": \"37d\", \"rank\": 2}", attributes={}),
+  ]
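A quick sketch of how one of these fixtures is consumed at test time: test_pubsub_read in examples_test.py decodes the JSON payload of each PubsubMessage and turns it into a Row, which is what the "# Expected:" blocks in the Pub/Sub examples assert against. Assuming only apache_beam and the standard library:

import json

from apache_beam.io.gcp.pubsub import PubsubMessage

msg = PubsubMessage(data=b'{"label": "37a", "rank": 1}', attributes={})
record = json.loads(msg.data)  # {'label': '37a', 'rank': 1}
print(record['label'], record['rank'])  # 37a 1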
Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This is an example of a Beam YAML pipeline that reads records from a JDBC
# database and writes them to BigQuery. This matches the Dataflow Template
# located here - https://cloud.google.com/dataflow/docs/guides/templates/provided/jdbc-to-bigquery

pipeline:
  type: composite
  transforms:
    # Step 1: Read shipment data from the JDBC database
    - type: ReadFromJdbc
      name: ReadShipments
      config:
        url: "jdbc:mysql://my-host:3306/shipment"
        driver_class_name: "org.sqlite.JDBC"
        query: "SELECT * FROM shipments"
    # Step 2: Write successful records out to BigQuery
    - type: WriteToBigQuery
      name: WriteShipments
      input: ReadShipments
      config:
        table: "apache-beam-testing.yaml_test.shipments"
        create_disposition: "CREATE_NEVER"
        write_disposition: "WRITE_APPEND"
        error_handling:
          output: "deadLetterQueue"
        num_streams: 1
    # Step 3: Write messages that failed the BigQuery write to a dead letter queue JSON file
    - type: WriteToJson
      input: WriteShipments.deadLetterQueue
      config:
        path: "gs://my-bucket/yaml-123/writingToBigQueryErrors.json"


# Expected:
# Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='[email protected]')
# Row(shipment_id='S2', customer_id='C2', shipment_date='2023-06-12', shipment_cost=300.0, customer_name='Bob', customer_email='[email protected]')
# Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='[email protected]')
# Row(shipment_id='S4', customer_id='C4', shipment_date='2024-07-01', shipment_cost=150.0, customer_name='Derek', customer_email='[email protected]')
# Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='[email protected]')
# Row(shipment_id='S6', customer_id='C4', shipment_date='2024-07-02', shipment_cost=150.0, customer_name='Derek', customer_email='[email protected]')
Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This is an example of a Beam YAML pipeline that reads messages from Pub/Sub
# and writes them to BigQuery. This matches the Dataflow Template located
# here - https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-subscription-to-bigquery

pipeline:
  type: composite
  transforms:
    # Step 1: Reading messages from Pub/Sub subscription
    - type: ReadFromPubSub
      name: ReadMessages
      config:
        subscription: "projects/apache-beam-testing/subscription/my-subscription"
        format: JSON
        schema:
          type: object
          properties:
            data: {type: BYTES}
            attributes: {type: object}
    # Step 2: Write successful records out to BigQuery
    - type: WriteToBigQuery
      name: WriteMessages
      input: ReadMessages
      config:
        table: "apache-beam-testing.yaml_test.order_data"
        create_disposition: "CREATE_NEVER"
        write_disposition: "WRITE_APPEND"
        error_handling:
          output: "deadLetterQueue"
        num_streams: 1
    # Step 3: Write the failed messages to BQ to a JSON file
    - type: WriteToJson
      input: WriteMessages.deadLetterQueue
      config:
        path: "gs://my-bucket/yaml-123/writingToBigQueryErrors.json"

options:
  streaming: true


# Expected:
# Row(label='37a', rank=1)
# Row(label='37b', rank=4)
# Row(label='37c', rank=3)
# Row(label='37d', rank=2)
