# Autogenerated file. DO NOT EDIT.
#
# Copyright (C) 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
variable "on_delete" {
type = string
description = "One of \"drain\" or \"cancel\". Specifies behavior of deletion during terraform destroy."
}
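
# Optional hardening sketch (not part of the generated module): Terraform can
# enforce the two allowed values with a validation block placed inside the
# variable declaration above, e.g.:
#
#   validation {
#     condition     = contains(["drain", "cancel"], var.on_delete)
#     error_message = "on_delete must be either \"drain\" or \"cancel\"."
#   }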
variable "project" {
type = string
description = "The Google Cloud Project ID within which this module provisions resources."
}
variable "region" {
type = string
description = "The region in which the created job should run."
}
variable "mongoDbUri" {
type = string
description = "MongoDB connection URI in the format `mongodb+srv://:@`."
}
variable "database" {
type = string
description = "Database in MongoDB to read the collection from. (Example: my-db)"
}
variable "collection" {
type = string
description = "Name of the collection inside MongoDB database. (Example: my-collection)"
}
variable "userOption" {
type = string
description = "User option: `FLATTEN` or `NONE`. `FLATTEN` flattens the documents to the single level. `NONE` stores the whole document as a JSON string. Defaults to: NONE."
default = "NONE"
}
variable "KMSEncryptionKey" {
type = string
description = "Cloud KMS Encryption Key to decrypt the mongodb uri connection string. If Cloud KMS key is passed in, the mongodb uri connection string must all be passed in encrypted. (Example: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key)"
default = null
}
variable "useStorageWriteApi" {
type = bool
description = "If enabled (set to true) the pipeline will use Storage Write API when writing the data to BigQuery (see https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). Defaults to: false."
default = null
}
variable "useStorageWriteApiAtLeastOnce" {
type = bool
description = <<EOT
This parameter takes effect only if "Use BigQuery Storage Write API" is enabled. If enabled the at-least-once semantics will be used for Storage Write API, otherwise exactly-once semantics will be used. Defaults to: false.
EOT
default = null
}
variable "outputTableSpec" {
type = string
description = "BigQuery table location to write the output to. The name should be in the format `<project>:<dataset>.<table_name>`. The table's schema must match input objects."
}
variable "javascriptDocumentTransformGcsPath" {
type = string
description = "The Cloud Storage path pattern for the JavaScript code containing your user-defined functions. (Example: gs://your-bucket/your-transforms/*.js)"
default = null
}
variable "javascriptDocumentTransformFunctionName" {
type = string
description = "The function name should only contain letters, digits and underscores. Example: 'transform' or 'transform_udf1'. (Example: transform)"
default = null
}
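
# For reference, a minimal terraform.tfvars for this module might look like the
# sketch below. All values are hypothetical placeholders, not defaults shipped
# with this module; substitute your own project, job, and connection details.
#
#   project         = "my-project-id"
#   region          = "us-central1"
#   name            = "mongodb-to-bigquery"
#   on_delete       = "drain"
#   mongoDbUri      = "mongodb+srv://user:password@cluster.example.mongodb.net"
#   database        = "my-db"
#   collection      = "my-collection"
#   outputTableSpec = "my-project-id:my_dataset.my_table"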
provider "google" {
project = var.project
}
provider "google-beta" {
project = var.project
}
variable "additional_experiments" {
type = set(string)
description = "List of experiments that should be used by the job. An example value is 'enable_stackdriver_agent_metrics'."
default = null
}
variable "autoscaling_algorithm" {
type = string
description = "The algorithm to use for autoscaling"
default = null
}
variable "enable_streaming_engine" {
type = bool
description = "Indicates if the job should use the streaming engine feature."
default = null
}
variable "ip_configuration" {
type = string
description = "The configuration for VM IPs. Options are 'WORKER_IP_PUBLIC' or 'WORKER_IP_PRIVATE'."
default = null
}
variable "kms_key_name" {
type = string
description = "The name for the Cloud KMS key for the job. Key format is: projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY"
default = null
}
variable "labels" {
type = map(string)
description = "User labels to be specified for the job. Keys and values should follow the restrictions specified in the labeling restrictions page. NOTE: This field is non-authoritative, and will only manage the labels present in your configuration. Please refer to the field 'effective_labels' for all of the labels present on the resource."
default = null
}
variable "launcher_machine_type" {
type = string
description = "The machine type to use for launching the job. The default is n1-standard-1."
default = null
}
variable "machine_type" {
type = string
description = "The machine type to use for the job."
default = null
}
variable "max_workers" {
type = number
description = "The maximum number of Google Compute Engine instances to be made available to your pipeline during execution, from 1 to 1000."
default = null
}
variable "name" {
type = string
}
variable "network" {
type = string
description = "The network to which VMs will be assigned. If it is not provided, 'default' will be used."
default = null
}
variable "num_workers" {
type = number
description = "The initial number of Google Compute Engine instances for the job."
default = null
}
variable "sdk_container_image" {
type = string
description = "Docker registry location of container image to use for the 'worker harness. Default is the container for the version of the SDK. Note this field is only valid for portable pipelines."
default = null
}
variable "service_account_email" {
type = string
description = "The Service Account email used to create the job."
default = null
}
variable "skip_wait_on_job_termination" {
type = bool
description = "If true, treat DRAINING and CANCELLING as terminal job states and do not wait for further changes before removing from terraform state and moving on. WARNING: this will lead to job name conflicts if you do not ensure that the job names are different, e.g. by embedding a release ID or by using a random_id."
default = null
}
variable "staging_location" {
type = string
description = "The Cloud Storage path to use for staging files. Must be a valid Cloud Storage URL, beginning with gs://."
default = null
}
variable "subnetwork" {
type = string
description = "The subnetwork to which VMs will be assigned. Should be of the form 'regions/REGION/subnetworks/SUBNETWORK'."
default = null
}
variable "temp_location" {
type = string
description = "The Cloud Storage path to use for temporary files. Must be a valid Cloud Storage URL, beginning with gs://."
default = null
}
resource "google_project_service" "required" {
service = "dataflow.googleapis.com"
disable_on_destroy = false
}
resource "google_dataflow_flex_template_job" "generated" {
depends_on = [google_project_service.required]
provider = google-beta
container_spec_gcs_path = "gs://dataflow-templates-${var.region}/latest/flex/MongoDB_to_BigQuery"
parameters = {
mongoDbUri = var.mongoDbUri
database = var.database
collection = var.collection
userOption = var.userOption
KMSEncryptionKey = var.KMSEncryptionKey
useStorageWriteApi = tostring(var.useStorageWriteApi)
useStorageWriteApiAtLeastOnce = tostring(var.useStorageWriteApiAtLeastOnce)
outputTableSpec = var.outputTableSpec
javascriptDocumentTransformGcsPath = var.javascriptDocumentTransformGcsPath
javascriptDocumentTransformFunctionName = var.javascriptDocumentTransformFunctionName
}
additional_experiments = var.additional_experiments
autoscaling_algorithm = var.autoscaling_algorithm
enable_streaming_engine = var.enable_streaming_engine
ip_configuration = var.ip_configuration
kms_key_name = var.kms_key_name
labels = var.labels
launcher_machine_type = var.launcher_machine_type
machine_type = var.machine_type
max_workers = var.max_workers
name = var.name
network = var.network
num_workers = var.num_workers
sdk_container_image = var.sdk_container_image
service_account_email = var.service_account_email
skip_wait_on_job_termination = var.skip_wait_on_job_termination
staging_location = var.staging_location
subnetwork = var.subnetwork
temp_location = var.temp_location
region = var.region
}
output "dataflow_job_url" {
value = "https://console.cloud.google.com/dataflow/jobs/${var.region}/${google_dataflow_flex_template_job.generated.job_id}"
}
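
# A typical invocation (assuming the terraform.tfvars sketched above) would be:
#
#   terraform init
#   terraform apply -var-file="terraform.tfvars"
#
# On success, the dataflow_job_url output points at the running job in the
# Cloud Console.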