@@ -38,79 +38,87 @@ class TestEndToEndForecasting(e2e_base.TestEndToEnd):
     def test_end_to_end_forecasting(self, shared_state):
         """Builds a dataset, trains models, and gets batch predictions."""
 
-        # Collection of resources generated by this test, to be deleted during
-        # teardown.
-        shared_state["resources"] = []
-
         aiplatform.init(
             project=e2e_base._PROJECT,
             location=e2e_base._LOCATION,
             staging_bucket=shared_state["staging_bucket_name"],
         )
-
-        # Create and import to single managed dataset for both training jobs.
-        ds = aiplatform.TimeSeriesDataset.create(
-            display_name=self._make_display_name("dataset"),
-            bq_source=[_TRAINING_DATASET_BQ_PATH],
-            sync=False,
-            create_request_timeout=180.0,
-        )
-        shared_state["resources"].extend([ds])
-
-        time_column = "date"
-        time_series_identifier_column = "store_name"
-        target_column = "sale_dollars"
-        column_specs = {
-            time_column: "timestamp",
-            target_column: "numeric",
-            "city": "categorical",
-            "zip_code": "categorical",
-            "county": "categorical",
-        }
-
-        # Define both training jobs
-        # TODO(humichael): Add seq2seq job.
-        automl_job = aiplatform.AutoMLForecastingTrainingJob(
-            display_name=self._make_display_name("train-housing-automl"),
-            optimization_objective="minimize-rmse",
-            column_specs=column_specs,
-        )
-
-        # Kick off both training jobs, AutoML job will take approx one hour to
-        # run.
-        automl_model = automl_job.run(
-            dataset=ds,
-            target_column=target_column,
-            time_column=time_column,
-            time_series_identifier_column=time_series_identifier_column,
-            available_at_forecast_columns=[time_column],
-            unavailable_at_forecast_columns=[target_column],
-            time_series_attribute_columns=["city", "zip_code", "county"],
-            forecast_horizon=30,
-            context_window=30,
-            data_granularity_unit="day",
-            data_granularity_count=1,
-            budget_milli_node_hours=1000,
-            model_display_name=self._make_display_name("automl-liquor-model"),
-            sync=False,
-        )
-        shared_state["resources"].extend([automl_job, automl_model])
-
-        automl_batch_prediction_job = automl_model.batch_predict(
-            job_display_name=self._make_display_name("automl-liquor-model"),
-            instances_format="bigquery",
-            machine_type="n1-standard-4",
-            bigquery_source=_PREDICTION_DATASET_BQ_PATH,
-            gcs_destination_prefix=(
-                f'gs://{shared_state["staging_bucket_name"]}/bp_results/'
-            ),
-            sync=False,
-        )
-        shared_state["resources"].append(automl_batch_prediction_job)
-
-        automl_batch_prediction_job.wait()
-
-        assert automl_job.state == pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
-        assert (
-            automl_batch_prediction_job.state == job_state.JobState.JOB_STATE_SUCCEEDED
-        )
+        try:
+            # Create and import to single managed dataset for both training
+            # jobs.
+            ds = aiplatform.TimeSeriesDataset.create(
+                display_name=self._make_display_name("dataset"),
+                bq_source=[_TRAINING_DATASET_BQ_PATH],
+                sync=False,
+                create_request_timeout=180.0,
+            )
+
+            time_column = "date"
+            time_series_identifier_column = "store_name"
+            target_column = "sale_dollars"
+            column_specs = {
+                time_column: "timestamp",
+                target_column: "numeric",
+                "city": "categorical",
+                "zip_code": "categorical",
+                "county": "categorical",
+            }
+
+            # Define both training jobs
+            # TODO(humichael): Add seq2seq job.
+            automl_job = aiplatform.AutoMLForecastingTrainingJob(
+                display_name=self._make_display_name("train-housing-automl"),
+                optimization_objective="minimize-rmse",
+                column_specs=column_specs,
+            )
+
+            # Kick off both training jobs, AutoML job will take approx one hour
+            # to run.
+            automl_model = automl_job.run(
+                dataset=ds,
+                target_column=target_column,
+                time_column=time_column,
+                time_series_identifier_column=time_series_identifier_column,
+                available_at_forecast_columns=[time_column],
+                unavailable_at_forecast_columns=[target_column],
+                time_series_attribute_columns=["city", "zip_code", "county"],
+                forecast_horizon=30,
+                context_window=30,
+                data_granularity_unit="day",
+                data_granularity_count=1,
+                budget_milli_node_hours=1000,
+                model_display_name=self._make_display_name("automl-liquor-model"),
+                sync=False,
+            )
+
+            automl_batch_prediction_job = automl_model.batch_predict(
+                job_display_name=self._make_display_name("automl-liquor-model"),
+                instances_format="bigquery",
+                machine_type="n1-standard-4",
+                bigquery_source=_PREDICTION_DATASET_BQ_PATH,
+                gcs_destination_prefix=(
+                    f'gs://{shared_state["staging_bucket_name"]}/bp_results/'
+                ),
+                sync=False,
+            )
+
+            automl_batch_prediction_job.wait()
+
+            assert (
+                automl_job.state
+                == pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
+            )
+            assert (
+                automl_batch_prediction_job.state
+                == job_state.JobState.JOB_STATE_SUCCEEDED
+            )
+        finally:
+            resources = [
+                "ds",
+                "automl_job",
+                "automl_model",
+                "automl_batch_prediction_job",
+            ]
+            for resource in resources:
+                if resource in locals():
+                    locals()[resource].delete()
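For reference, a minimal standalone sketch of the try/finally cleanup pattern this change adopts: resources are created inside the try block, and the finally block deletes only those whose names were actually bound before a failure. `FakeResource` and `create_model` below are hypothetical stand-ins for illustration only, not part of the SDK or of this test.

```python
# Sketch only: shows that a mid-test failure still triggers cleanup of the
# resources created so far, because only bound names appear in locals().
class FakeResource:
    def __init__(self, name):
        self.name = name

    def delete(self):
        print(f"deleted {self.name}")


def create_model():
    # Hypothetical step that fails partway through the test.
    raise RuntimeError("training failed")


def run():
    try:
        ds = FakeResource("dataset")
        model = create_model()  # raises, so "model" is never bound
    finally:
        # Delete only the resources that were actually created.
        for name in ["ds", "model"]:
            if name in locals():
                locals()[name].delete()


if __name__ == "__main__":
    try:
        run()
    except RuntimeError:
        pass  # "deleted dataset" is printed before the error propagates
```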