|
14 | 14 |
|
15 | 15 | 'use strict';
|
16 | 16 |
|
17 |
| -// [START dataproc_quickstart] |
18 |
| -const dataproc = require('@google-cloud/dataproc'); |
19 |
| -const client = new dataproc.v1.ClusterControllerClient(); |
20 |
| - |
21 |
| -async function quickstart() { |
22 |
| - const projectId = await client.getProjectId(); |
23 |
| - const request = { |
24 |
| - region: 'global', |
25 |
| - projectId, |
26 |
| - }; |
27 |
| - const [resources] = await client.listClusters(request); |
28 |
| - console.log('Total resources:', resources.length); |
29 |
| - for (const resource of resources) { |
30 |
| - console.log(resource); |
31 |
| - } |
/**
 * Dataproc quickstart: creates a cluster, submits a PySpark job, waits for
 * the job to reach a terminal state (with a timeout), prints the driver
 * output from the cluster's staging bucket, then deletes the cluster.
 *
 * @param {string} projectId GCP project ID, e.g. 'my-project'.
 * @param {string} region Dataproc region, e.g. 'us-central1'.
 * @param {string} clusterName Name for the cluster to create/delete.
 * @param {string} jobFilePath GCS URI of the PySpark main file,
 *     e.g. 'gs://my-bucket/job.py'.
 */
function main(projectId, region, clusterName, jobFilePath) {
  // [START dataproc_quickstart]
  const dataproc = require('@google-cloud/dataproc').v1;
  const {Storage} = require('@google-cloud/storage');

  // Promise-based delay. Replaces the third-party `sleep` package, whose
  // sleep.sleep() BLOCKS the event loop — a bug inside an async poll loop.
  const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

  // Create a cluster client with the endpoint set to the desired cluster region
  const clusterClient = new dataproc.ClusterControllerClient({
    apiEndpoint: `${region}-dataproc.googleapis.com`,
  });

  // Create a job client with the endpoint set to the desired cluster region.
  // NOTE: `dataproc` above is already the `.v1` namespace, so the original
  // `new dataproc.v1.JobControllerClient(...)` dereferenced `undefined` and
  // threw at runtime; the constructor lives directly on `dataproc`.
  const jobClient = new dataproc.JobControllerClient({
    apiEndpoint: `${region}-dataproc.googleapis.com`,
  });

  async function quickstart() {
    // TODO(developer): Uncomment and set the following variables
    // projectId = 'YOUR_PROJECT_ID'
    // region = 'YOUR_CLUSTER_REGION'
    // clusterName = 'YOUR_CLUSTER_NAME'
    // jobFilePath = 'YOUR_JOB_FILE_PATH'

    // Create the cluster config
    const cluster = {
      projectId: projectId,
      region: region,
      cluster: {
        clusterName: clusterName,
        config: {
          masterConfig: {
            numInstances: 1,
            machineTypeUri: 'n1-standard-1',
          },
          workerConfig: {
            numInstances: 2,
            machineTypeUri: 'n1-standard-1',
          },
        },
      },
    };

    // Create the cluster (long-running operation: await the LRO handle,
    // then await its completion).
    const [operation] = await clusterClient.createCluster(cluster);
    const [response] = await operation.promise();

    // Output a success message
    console.log(`Cluster created successfully: ${response.clusterName}`);

    const job = {
      projectId: projectId,
      region: region,
      job: {
        placement: {
          clusterName: clusterName,
        },
        pysparkJob: {
          mainPythonFileUri: jobFilePath,
        },
      },
    };

    let [jobResp] = await jobClient.submitJob(job);
    const jobId = jobResp.reference.jobId;

    console.log(`Submitted job "${jobId}".`);

    // Terminal states for a job
    const terminalStates = new Set(['DONE', 'ERROR', 'CANCELLED']);

    // Create a timeout such that the job gets cancelled if not
    // in a terminal state after a fixed period of time (10 minutes).
    const timeout = 600000;
    const start = new Date();

    // Wait for the job to finish.
    const jobReq = {
      projectId: projectId,
      region: region,
      jobId: jobId,
    };

    // Poll once per second until the job reaches a terminal state or the
    // timeout elapses, in which case the job is cancelled.
    while (!terminalStates.has(jobResp.status.state)) {
      if (new Date() - timeout > start) {
        await jobClient.cancelJob(jobReq);
        console.log(
          `Job ${jobId} timed out after threshold of ` +
            `${timeout / 60000} minutes.`
        );
        break;
      }
      await delay(1000);
      [jobResp] = await jobClient.getJob(jobReq);
    }

    const clusterReq = {
      projectId: projectId,
      region: region,
      clusterName: clusterName,
    };

    const [clusterResp] = await clusterClient.getCluster(clusterReq);

    const storage = new Storage();

    // The driver output is staged by Dataproc in the cluster's config
    // bucket under a path keyed by cluster UUID and job ID.
    const output = await storage
      .bucket(clusterResp.config.configBucket)
      .file(
        `google-cloud-dataproc-metainfo/${clusterResp.clusterUuid}/` +
          `jobs/${jobId}/driveroutput.000000000`
      )
      .download();

    // Output a success message.
    console.log(
      `Job ${jobId} finished with state ${jobResp.status.state}:\n${output}`
    );

    // Delete the cluster once the job has terminated.
    const [deleteOperation] = await clusterClient.deleteCluster(clusterReq);
    await deleteOperation.promise();

    // Output a success message
    console.log(`Cluster ${clusterName} successfully deleted.`);
  }

  // Surface failures instead of leaving a floating promise whose rejection
  // would otherwise be unhandled.
  quickstart().catch(console.error);
  // [END dataproc_quickstart]
}

main(...process.argv.slice(2));
0 commit comments