Skip to content

Commit 418801a

Browse files
committed
Create credentials upload plugins for GCP and AWS (#438)
1 parent 6eaf2db commit 418801a

File tree

2 files changed

+93
-0
lines changed

2 files changed

+93
-0
lines changed

dask_cloudprovider/aws/plugins.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import logging
2+
import os
3+
from pathlib import Path
4+
5+
from distributed import WorkerPlugin
6+
7+
logger = logging.getLogger(__name__)
8+
9+
class UploadAWSCredentials(WorkerPlugin):
10+
# """Automatically upload a GCP key to the worker."""
11+
12+
name = "upload_aws_credentials"
13+
14+
def __init__(self):
15+
"""
16+
Initialize the plugin by reading in the data from the given file.
17+
"""
18+
config_path = os.getenv("AWS_CONFIG_FILE", Path.home() / Path(".aws/config"))
19+
credentials_path = os.getenv(
20+
"AWS_SHARED_CREDENTIALS_FILE", Path.home() / Path(".aws/credentials")
21+
)
22+
config_path, credentials_path = Path(config_path), Path(credentials_path)
23+
24+
if not config_path.exists():
25+
raise ValueError(
26+
f"Config file {config_path} does not exist. If you store AWS config "
27+
"in a different location, please set AWS_CONFIG_FILE environment variable."
28+
)
29+
30+
if not credentials_path.exists():
31+
raise ValueError(
32+
f"Credentials file {credentials_path} does not exist. If you store AWS credentials "
33+
"in a different location, please set AWS_SHARED_CREDENTIALS_FILE environment variable."
34+
)
35+
36+
self.config_filename = config_path.name
37+
self.credentials_filename = credentials_path.name
38+
39+
with open(config_path, "rb") as f:
40+
self.config = f.read()
41+
with open(credentials_path, "rb") as f:
42+
self.credentials = f.read()
43+
44+
async def setup(self, worker):
45+
await worker.upload_file(
46+
filename=self.config_filename, data=self.config, load=False
47+
)
48+
worker_config_path = Path(worker.local_directory) / self.config_filename
49+
os.environ["AWS_CONFIG_FILE"] = str(worker_config_path)
50+
51+
await worker.upload_file(
52+
filename=self.credentials_filename, data=self.credentials, load=False
53+
)
54+
worker_credentials_path = (
55+
Path(worker.local_directory) / self.credentials_filename
56+
)
57+
os.environ["AWS_SHARED_CREDENTIALS_FILE"] = str(worker_credentials_path)

dask_cloudprovider/gcp/plugins.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import logging
2+
import os
3+
from pathlib import Path
4+
5+
from distributed import WorkerPlugin
6+
from google.auth._cloud_sdk import get_application_default_credentials_path
7+
8+
logger = logging.getLogger(__name__)
9+
10+
class UploadGCPKey(WorkerPlugin):
11+
"""Automatically upload a GCP key to the worker."""
12+
13+
name = "upload_gcp_key"
14+
15+
def __init__(self):
16+
"""
17+
Initialize the plugin by reading in the data from the given file.
18+
"""
19+
key_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
20+
if key_path is None:
21+
key_path = Path(get_application_default_credentials_path())
22+
if not key_path.exists():
23+
raise ValueError("GOOGLE_APPLICATION_CREDENTIALS is not set or `gcloud auth application-default login` wasn't run.")
24+
25+
key_path = Path(key_path)
26+
self.filename = key_path.name
27+
28+
logger.info("Uploading GCP key from %s.", str(key_path))
29+
30+
with open(key_path, "rb") as f:
31+
self.data = f.read()
32+
33+
async def setup(self, worker):
34+
await worker.upload_file(filename=self.filename, data=self.data, load=False)
35+
worker_key_path = Path(worker.local_directory) / self.filename
36+
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = str(worker_key_path)

0 commit comments

Comments
 (0)